BioImager  3.9.1
A .NET microscopy imaging library. Supports various microscopes by using imported libraries & GUI automation. Supported libraries include Prior® & Zeiss® & all devices supported by Micromanager 2.0 and python-microscope.
Loading...
Searching...
No Matches
device_server.py
1#!/usr/bin/env python3
2
3## Copyright (C) 2020 David Miguel Susano Pinto <carandraug@gmail.com>
4## Copyright (C) 2020 Mick Phillips <mick.phillips@gmail.com>
5##
6## This file is part of Microscope.
7##
8## Microscope is free software: you can redistribute it and/or modify
9## it under the terms of the GNU General Public License as published by
10## the Free Software Foundation, either version 3 of the License, or
11## (at your option) any later version.
12##
13## Microscope is distributed in the hope that it will be useful,
14## but WITHOUT ANY WARRANTY; without even the implied warranty of
15## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16## GNU General Public License for more details.
17##
18## You should have received a copy of the GNU General Public License
19## along with Microscope. If not, see <http://www.gnu.org/licenses/>.
20
21"""A class for serving microscope components.
22
23This module provides a server to make microscope control objects available
24over Pyro. When called from the command line, this module will serve devices
25defined in a specified config file.
26
27This module can be called as a program to serve devices over Pyro,
28like so::
29
30 python -m microscope.device_server CONFIG-FILEPATH
31
32where ``CONFIG-FILEPATH`` is the path for a python file that defines a
33``DEVICES = [device(...), ...]``
34
35"""
36
37import argparse
38import copy
39import importlib.machinery
40import importlib.util
41import logging
42import multiprocessing
43import signal
44import sys
45import time
46import typing
47from collections.abc import Iterable
48from dataclasses import dataclass
49from logging import StreamHandler
50from logging.handlers import RotatingFileHandler
51from threading import Thread
52
53import Pyro4
54
55import microscope.abc
56from microscope.abc import FloatingDeviceMixin
57
58
_logger = logging.getLogger(__name__)


# Python<3.8 on MacOSX High Sierra needs the 'spawn' start method
# (issue #106).
# FIXME: remove this once we are dependent on Python>=3.8
if sys.version_info < (3, 8) and sys.platform == "darwin":
    _logger.info("changing multiprocessing start method to 'spawn'")
    multiprocessing = multiprocessing.get_context("spawn")


# Pyro configuration.  pickle is selected because it is able to
# serialize numpy ndarrays.
Pyro4.config.SERIALIZER = "pickle"
Pyro4.config.SERIALIZERS_ACCEPTED.add("pickle")

# Our devices hold no private data, so in effect every attribute of
# the classes is exposed.  Private methods only signal an interface
# that is not meant for public usage; there is nothing secret or
# unsafe behind them.  Disabling REQUIRE_EXPOSE saves us from having
# to sprinkle Pyro4.expose all over the code (see issue #49).
Pyro4.config.REQUIRE_EXPOSE = False
79
80
def device(
    cls: typing.Callable,
    host: str,
    port: int,
    conf: typing.Optional[typing.Mapping[str, typing.Any]] = None,
    uid: typing.Optional[str] = None,
):
    """Define devices and where to serve them.

    A device definition for use in deviceserver config files.

    Args:
        cls: :class:`Device` class of device to serve or function that
            returns a map of `Device` instances to wanted Pyro ID.
            The device class will be constructed, or the function will
            be called, with the arguments in ``conf``.
        host: hostname or ip address serving the devices.
        port: port number used to serve the devices.  It is converted
            with ``int()``.
        conf: keyword arguments for ``cls``.  The device or function
            are effectively constructed or called with `cls(**conf)`.
            Defaults to no arguments.  The mapping is shallow-copied
            so that every definition owns its own ``conf`` dict.
        uid: used to identify "floating" devices (see documentation
            for :class:`FloatingDeviceMixin`).  This must be specified
            if ``cls`` is a floating device.

    Returns:
        A dict with the keys ``cls``, ``host``, ``port``, ``uid`` and
        ``conf``.

    Raises:
        TypeError: if ``cls`` is not callable, if ``cls`` is a floating
            device class and ``uid`` is missing, or if ``uid`` is given
            for a class that is not a floating device.

    Example

    .. code-block:: python

        def construct_devices() -> typing.Dict[str, Device]:
            camera = Camera(some, arguments)
            # ... any other configuration that might be wanted
            return {'RedCamera': camera}

        DEVICES = [
            # passing a function that returns devices
            device(construct_devices, '127.0.0.1', 8000),
            # passing a Device class
            device(Camera, '127.0.0.1', 8001,
                   conf={'kwarg1': some, 'kwarg2': arguments})
        ]

    """
    if not callable(cls):
        raise TypeError("cls must be a callable")
    if isinstance(cls, type):
        is_floating = issubclass(cls, FloatingDeviceMixin)
        if is_floating and uid is None:
            raise TypeError("uid must be specified for floating devices")
        if not is_floating and uid is not None:
            raise TypeError("uid must not be given for non floating devices")

    # Never hand out a dict shared between definitions: a shared
    # (default) dict stays shared even through copy.deepcopy of the
    # DEVICES list, so per-device values written into "conf" later
    # (such as "index") would overwrite each other.
    own_conf = {} if conf is None else dict(conf)
    return dict(cls=cls, host=host, port=int(port), uid=uid, conf=own_conf)
131
132
133def _create_log_formatter(name: str):
134 """Create a logging.Formatter for the device server.
135
136 Each device is served on its own process and each device has its
137 own log file. But the logs from all device servers also appear on
138 stderr where it will be difficult to figure out from which device
139 server a log message comes. This creates a logging.Formatter
140 which includes the device server name.
141
142 Args:
143 name: device name to be used on the log output.
144
145 """
146 return logging.Formatter(
147 "%%(asctime)s:%s (%%(name)s):%%(levelname)s"
148 ":PID %%(process)s: %%(message)s" % name
149 )
150
151
class Filter(logging.Filter):
    """Logging filter that throttles consecutive repeats of a message.

    Records are compared by their unformatted ``msg`` attribute, so
    records that only differ in their arguments count as repetitions.
    With the default thresholds:

    * the first two occurrences pass unchanged;
    * occurrence ``aggregate_at`` passes prefixed with
      ``Aggregating reps. of:``;
    * after that, every ``repeat_at``-th repetition below ``stop_at``
      passes prefixed with ``<repeat_at> times:``;
    * occurrence ``stop_at`` passes prefixed with
      ``Suppressing reps. of:`` and later repetitions are dropped
      until a different message arrives.

    """

    def __init__(self):
        # Initialise the base class so that the attributes it defines
        # (``name``, ``nlen``) exist on this filter too.
        super().__init__()
        self.last = None
        self.count = 1
        self.aggregate_at = 3
        self.repeat_at = 5
        self.stop_at = self.aggregate_at + 3 * self.repeat_at

    def filter(self, record):
        """Pass, aggregate or suppress consecutive repetitions of a log message.

        Args:
            record: the log record being considered.  Its ``msg`` may
                be rewritten in place to add a prefix.

        Returns:
            True if the record should be logged, False otherwise.

        """
        if self.last == record.msg:
            # Repeated message. Increment count.
            self.count += 1
        else:
            # New message. We've seen 1 instance of it.
            self.count = 1
        # Update self.last before record.msg is possibly rewritten
        # below, so that the comparison is always on the original text.
        self.last = record.msg

        # The message is wrapped in a one element tuple so that a
        # tuple used as message is not taken as the format arguments.
        if self.count < self.aggregate_at:
            return True
        if self.count == self.aggregate_at:
            record.msg = "Aggregating reps. of: %s" % (record.msg,)
            return True
        if (
            self.aggregate_at < self.count < self.stop_at
            and (self.count - self.aggregate_at) % self.repeat_at == 0
        ):
            record.msg = "%d times: %s" % (self.repeat_at, record.msg)
            return True
        if self.count == self.stop_at:
            record.msg = "Suppressing reps. of: %s" % (record.msg,)
            return True
        return False
187
188
@dataclass(frozen=True)
class DeviceServerOptions:
    """Class to define configuration for a device server.

    The different fields map to the different ``device-server``
    command line options.

    Attributes:
        config_fpath: path to the configuration file that defines
            ``DEVICES``.
        logging_level: numeric :mod:`logging` level that is set on
            the root logger.

    """

    config_fpath: str
    logging_level: int
200
201
def _check_autoproxy_feature() -> None:
    """Ensure the Pyro4 configuration is compatible with AUTOPROXY.

    Raises:
        Exception: if AUTOPROXY is disabled or if the selected
            serializer is ``marshal``.

    """
    pyro_config = Pyro4.config

    # AUTOPROXY is on by default, so someone must have turned it off
    # on purpose.  Fail loudly rather than quietly switching it on.
    if not pyro_config.AUTOPROXY:
        raise Exception(
            "serving of a ControllerDevice requires"
            " Pyro4 AUTOPROXY option enabled"
        )

    # The marshal serializer does not work with AUTOPROXY.  It is not
    # the default serializer, so if it is selected that was also done
    # on purpose and we do not override that choice.
    if pyro_config.SERIALIZER == "marshal":
        raise Exception(
            "Pyro's AUTOPROXY feature is required but the"
            " 'marshal' serializer is currently selected"
        )

    accepted = pyro_config.SERIALIZERS_ACCEPTED
    if "marshal" in accepted:
        accepted.remove("marshal")
        _logger.info("marshal was removed from accepted serializers")
223
224
def _register_device(pyro_daemon, device, obj_id=None) -> None:
    """Register a device, and any device nested in it, on a daemon.

    Args:
        pyro_daemon: daemon on which ``register`` is called.
        device: the device to register.
        obj_id: identifier passed to ``register`` for ``device``.
            Nested devices are always registered with ``None``.

    """
    pyro_daemon.register(device, obj_id)

    # Each pair is a device type that holds other devices and the
    # name of the attribute with the mapping to those devices.
    nested_groups = (
        (microscope.abc.Controller, "devices"),
        (microscope.abc.Stage, "axes"),
    )
    for base, attr_name in nested_groups:
        if not isinstance(device, base):
            continue
        _check_autoproxy_feature()
        for nested in getattr(device, attr_name).values():
            _register_device(pyro_daemon, nested, obj_id=None)
239
240
class DeviceServer(multiprocessing.Process):
    """Initialise a device and serve at host/port according to its id.

    Args:
        device_def: definition of the device.
        options: configuration for the device server.
        id_to_host: mapping of device identifiers to hostname.
        id_to_port: mapping of device identifiers to port number.
        exit_event: a shared event to signal that the process should
            quit.

    """

    def __init__(
        self,
        device_def,
        options: DeviceServerOptions,
        id_to_host: typing.Mapping[str, str],
        id_to_port: typing.Mapping[str, int],
        exit_event: typing.Optional[multiprocessing.Event] = None,
    ):
        # The device to serve.
        self._device_def = device_def
        self._options = options
        self._devices: typing.Dict[str, microscope.abc.Device] = {}
        # Where to serve it.
        self._id_to_host = id_to_host
        self._id_to_port = id_to_port
        # A shared event to allow clean shutdown.
        self.exit_event = exit_event
        super().__init__()
        self.daemon = True

    def clone(self):
        """Create new instance with same settings.

        This is useful to restart a device server.

        """
        return DeviceServer(
            self._device_def,
            self._options,
            self._id_to_host,
            self._id_to_port,
            exit_event=self.exit_event,
        )

    def run(self):
        """Construct the device(s), serve them over Pyro, then shut down.

        Returns early, without serving anything, if the exit event is
        set before a device class could be constructed.

        Raises:
            Exception: if the device is a floating device and its id
                is missing from the host or port mappings.

        """
        cls = self._device_def["cls"]
        cls_name = cls.__name__

        # If the multiprocessing start method is fork, the child
        # process gets a copy of the root logger.  The copy is
        # configured to sign the messages as "device-server", and
        # write to the main log file and stderr.  We remove those
        # handlers so that this DeviceServer is logged to a separate
        # file and the messages are signed with the device name.
        root_logger = logging.getLogger()
        # Get a new list of handlers because otherwise we are
        # iterating over the same list as removeHandler().
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)

        root_logger.setLevel(self._options.logging_level)

        # Later, we'll log to one file per server, with a filename
        # based on a unique identifier for the device.  Some devices
        # don't have UIDs available until after initialization, so
        # log to stderr until then.
        stderr_handler = StreamHandler(sys.stderr)
        stderr_handler.setFormatter(_create_log_formatter(cls_name))
        root_logger.addHandler(stderr_handler)
        root_logger.debug("Debugging messages on.")

        root_logger.addFilter(Filter())

        # The cls argument can either be a Device subclass, or it can
        # be a function that returns a map of names to devices.
        cls_is_type = isinstance(cls, type)

        if not cls_is_type:
            self._devices = cls(**self._device_def["conf"])
        else:
            # Keep trying until the device is constructed or we are
            # asked to exit.  Without an exit event we keep trying.
            device = None
            while self.exit_event is None or not self.exit_event.is_set():
                try:
                    device = cls(**self._device_def["conf"])
                except Exception as e:
                    _logger.info(
                        "Failed to start device. Retrying in 5s.", exc_info=e
                    )
                    time.sleep(5)
                else:
                    break
            if device is None:
                # Exit was requested before the device was ever
                # constructed, so there is nothing to serve or to
                # shut down.
                _logger.info(
                    "Exit requested before device was initialized."
                )
                return
            self._devices = {cls_name: device}

        if cls_is_type and issubclass(cls, FloatingDeviceMixin):
            uid = str(list(self._devices.values())[0].get_id())
            if uid not in self._id_to_host or uid not in self._id_to_port:
                raise Exception(
                    "Host or port not found for device %s" % (uid,)
                )
            host = self._id_to_host[uid]
            port = self._id_to_port[uid]
        else:
            host = self._device_def["host"]
            port = self._device_def["port"]

        pyro_daemon = Pyro4.Daemon(port=port, host=host)

        log_handler = RotatingFileHandler(
            "%s_%s_%s.log" % (cls_name, host, port)
        )
        log_handler.setFormatter(_create_log_formatter(cls_name))
        root_logger.addHandler(log_handler)

        _logger.info("Device initialized; starting daemon.")
        for obj_id, device in self._devices.items():
            _register_device(pyro_daemon, device, obj_id=obj_id)

        # Run the Pyro daemon in a separate thread so that we can do
        # clean shutdown under Windows.
        pyro_thread = Thread(target=pyro_daemon.requestLoop)
        pyro_thread.daemon = True
        pyro_thread.start()
        for device in self._devices.values():
            _logger.info("Serving %s", pyro_daemon.uriFor(device))
            if isinstance(device, FloatingDeviceMixin):
                _logger.info(
                    "Device UID on port %s is %s", port, device.get_id()
                )

        # Wait for termination event.  We should just be able to call
        # wait() on the exit_event, but this causes issues with locks
        # in multiprocessing - see http://bugs.python.org/issue30975 .
        # NOTE(review): without an exit_event this loop is skipped and
        # the daemon is shut down straight away - confirm that this is
        # the intended behaviour.
        while self.exit_event and not self.exit_event.is_set():
            # This thread waits for the termination event.
            try:
                time.sleep(5)
            except (KeyboardInterrupt, IOError):
                pass
        pyro_daemon.shutdown()
        pyro_thread.join()
        for device in self._devices.values():
            try:
                device.shutdown()
            except Exception as ex:
                # Catch errors so we get a chance of shutting down the
                # other devices.  The exception goes in exc_info: as
                # an extra positional argument it has no placeholder
                # in the message and makes the logging call itself
                # fail.
                _logger.error(
                    "Failure to shutdown device %s", device, exc_info=ex
                )
391
392
def serve_devices(devices, options: DeviceServerOptions, exit_event=None):
    """Serve each device on its own process until asked to exit.

    One :class:`DeviceServer` is started per device definition.  A
    helper thread restarts any server whose process dies.  This
    function blocks until ``exit_event`` is set and all the server
    processes are gone.

    Args:
        devices: iterable of device definitions, i.e., dicts with the
            keys ``cls``, ``host``, ``port``, ``uid``, and ``conf``.
            A deep copy is made so the caller's objects are not
            modified.
        options: configuration passed on to each device server.
        exit_event: event that signals the servers to quit.  A new
            event is created if none is given.

    """
    # We make changes to `devices` (would be great if we didn't have
    # to) so make a copy of it because we don't want to make those
    # changes on the caller.  See original issue on #211 and PRs #212
    # and #217 (most discussion happens on #212).
    devices = copy.deepcopy(devices)

    root_logger = logging.getLogger()

    log_handler = RotatingFileHandler("__MAIN__.log")
    log_handler.setFormatter(_create_log_formatter("device-server"))
    root_logger.addHandler(log_handler)

    # An event to trigger clean termination of subprocesses.  This is
    # the only way to ensure devices are shut down properly when
    # processes exit, as __del__ is not necessarily called when the
    # interpreter exits.
    if exit_event is None:
        exit_event = multiprocessing.Event()

    # DeviceServers instances that we need to wait for when exiting
    servers = []

    # Child processes inherit signal handling from the parent so we
    # need to make sure that only the parent process sets the exit
    # event and waits for the DeviceServers to exit.  See issue #9.
    # This won't work behind a Windows service wrapper, so we deal
    # with clean shutdown on win32 elsewhere.
    parent = multiprocessing.current_process()

    def term_func(sig, frame):
        """Terminate subprocesses cleanly."""
        if parent == multiprocessing.current_process():
            _logger.debug("Shutting down all servers.")
            exit_event.set()
            # Join keep_alive_thread so that it can't modify the list
            # of servers.
            keep_alive_thread.join()
            for this_server in servers:
                this_server.join()
            sys.exit()

    if sys.platform != "win32":
        signal.signal(signal.SIGTERM, term_func)
        signal.signal(signal.SIGINT, term_func)

    # Group devices by class.
    by_class = {}
    for dev in devices:
        by_class.setdefault(dev["cls"], []).append(dev)

    if not by_class:
        _logger.warning("No valid devices specified. Maybe an empty list?")

    for cls, devs in by_class.items():
        # Floating devices are devices that can only be identified
        # after having been initialized, so the constructor will
        # return any device that it supports.  To work around this we
        # map all device uid to host/port first.  After the
        # DeviceServer constructs the device, it can check on the map
        # where to serve it.  For non floating devices that
        # information is part of the device definition, no map is
        # needed.
        uid_to_host = {}
        uid_to_port = {}
        if isinstance(cls, type) and issubclass(cls, FloatingDeviceMixin):
            # In addition to the maps of uid to host/port, floating
            # devices SDKs need the number of devices to index them.
            for index, dev in enumerate(devs):
                uid = dev["uid"]
                uid_to_host[uid] = dev["host"]
                uid_to_port[uid] = dev["port"]
                # Build a new dict instead of writing into the
                # existing one: definitions may share one conf dict
                # (deepcopy keeps such sharing) and would then all
                # end up with the same index.
                dev["conf"] = dict(dev["conf"], index=index)

        for dev in devs:
            server = DeviceServer(
                dev,
                options,
                uid_to_host,
                uid_to_port,
                exit_event=exit_event,
            )
            servers.append(server)
            server.start()

    # Main thread must be idle to process signals correctly, so use
    # another thread to check DeviceServers, restarting them where
    # necessary.  Define the thread target here so that it can access
    # the variables of this function.
    def keep_alive():
        """Keep DeviceServers alive."""
        while not exit_event.is_set():
            # Iterate over a snapshot because dead servers are
            # replaced in `servers` from inside the loop.
            for s in list(servers):
                if exit_event.is_set():
                    # Servers are expected to die once exit has been
                    # requested, do not restart them.
                    break
                if s.is_alive():
                    continue
                _logger.info(
                    "DeviceServer Failure. Process %s is dead with"
                    " exitcode %s. Restarting...",
                    s.pid,
                    s.exitcode,
                )
                replacement = s.clone()
                servers.remove(s)
                servers.append(replacement)

                try:
                    s.join(30)
                except Exception:
                    _logger.error("... could not join PID %s.", s.pid)
                else:
                    replacement.start()
                    _logger.info(
                        "... DeviceServer with PID %s restarted"
                        " as PID %s.",
                        s.pid,
                        replacement.pid,
                    )
            if not servers:
                # Log and exit if no servers running.  May want to
                # change this if we add some interface to
                # interactively restart servers.
                _logger.info("No servers running. Exiting.")
                exit_event.set()
            else:
                try:
                    time.sleep(5)
                except (KeyboardInterrupt, IOError):
                    pass

    keep_alive_thread = Thread(target=keep_alive)
    keep_alive_thread.start()

    _logger.info("Device Server started. Press Ctrl+C to exit.")
    while not exit_event.is_set():
        try:
            time.sleep(5)
        except (KeyboardInterrupt, IOError):
            _logger.debug("KeyboardInterrupt or IOError")
            exit_event.set()

    _logger.debug("Shutting down servers ...")
    while servers:
        # Iterate over a snapshot: removing from the list that is
        # being iterated skips the element after each removal.
        for s in list(servers):
            if not s.is_alive():
                servers.remove(s)
        time.sleep(1)
    _logger.info(" ... No more servers running.")
    _logger.debug("Joining threads ...")
    keep_alive_thread.join()
    _logger.debug("... Threads joined. Exiting.")
549
550
def _parse_cmd_line_args(args: typing.Sequence[str]) -> DeviceServerOptions:
    """Parse the ``device-server`` command line arguments.

    Args:
        args: the arguments to parse, without the program name.

    Returns:
        The options selected on the command line, with the logging
        level name already converted to its :mod:`logging` constant.

    """
    level_names = ["debug", "info", "warning", "error", "critical"]

    cmd_parser = argparse.ArgumentParser(prog="device-server")
    cmd_parser.add_argument(
        "--logging-level",
        action="store",
        type=str,
        default="info",
        choices=level_names,
        help="Set logging level",
    )
    cmd_parser.add_argument(
        "config_fpath",
        action="store",
        type=str,
        metavar="CONFIG-FILEPATH",
        help="Path to the configuration file",
    )
    namespace = cmd_parser.parse_args(args)

    # The level names are the lowercase names of the logging module
    # level constants.
    level = getattr(logging, namespace.logging_level.upper())
    return DeviceServerOptions(
        config_fpath=namespace.config_fpath,
        logging_level=level,
    )
573
574
575def _load_source(filepath):
576 loader = importlib.machinery.SourceFileLoader("config", filepath)
577 spec = importlib.util.spec_from_loader("config", loader)
578 module = importlib.util.module_from_spec(spec)
579 spec.loader.exec_module(module)
580 return module
581
582
def validate_devices(configfile):
    """Load a config file and return its ``DEVICES``.

    Args:
        configfile: path to the Python file that defines ``DEVICES``.

    Returns:
        The value of ``DEVICES`` in the config file.

    Raises:
        Exception: if the file does not define ``DEVICES`` or if
            ``DEVICES`` is not an iterable.

    """
    config_module = _load_source(configfile)
    try:
        found = config_module.DEVICES
    except AttributeError:
        raise Exception("No 'DEVICES=...' in config file.")
    if isinstance(found, Iterable):
        return found
    raise Exception("Error in config: DEVICES should be an iterable.")
592
593
def main(argv: typing.Sequence[str]) -> int:
    """Run the device server program.

    Args:
        argv: the command line, including the program name as first
            element.

    Returns:
        Zero, once the devices are no longer being served.

    """
    options = _parse_cmd_line_args(argv[1:])

    # Log to stderr, signing the messages as "device-server".
    handler = StreamHandler(sys.stderr)
    handler.setFormatter(_create_log_formatter("device-server"))

    root = logging.getLogger()
    root.setLevel(options.logging_level)
    root.addHandler(handler)
    root.addFilter(Filter())

    device_defs = validate_devices(options.config_fpath)
    serve_devices(device_defs, options)
    return 0
611
612
def _setuptools_entry_point() -> int:
    """Call :func:`main` with ``sys.argv`` and return its result."""
    # setuptools requires a function as entry point, naming this
    # module is not enough even though the module works as a script.
    # Making sys.argv the default value of main() is not an option
    # either: when the documentation is generated (with Sphinx's
    # autodoc extension) the default would be shown as the value
    # sys.argv had at the time the docs were generated (see
    # https://stackoverflow.com/a/12087750 )
    exit_status = main(sys.argv)
    return exit_status
622
623
def __main__() -> None:
    """Run the setuptools entry point, discarding its return value.

    Kept for backwards compatibility: the setuptools scripts from
    older editable mode installations still call this.  Will be safe
    to remove soon.

    """
    _setuptools_entry_point()
629
630
# When run as a script, the return value of main() becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))