"""A class for serving microscope components.

This module provides a server to make microscope control objects available
over Pyro.  When called from the command line, this module will serve devices
defined in a specified config file.

This module can be called as a program to serve devices over Pyro,
where ``CONFIG-FILEPATH`` is the path for a python file that defines a
``DEVICES = [device(...), ...]``
"""
39import importlib.machinery
47from collections.abc import Iterable
48from dataclasses import dataclass
49from logging import StreamHandler
50from logging.handlers import RotatingFileHandler
51from threading import Thread
56from microscope.abc import FloatingDeviceMixin
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


# Needed for Python<3.8 in MacOSX High Sierra (issue #106)
# FIXME: remove this once we are dependent on Python>=3.8
if sys.platform == "darwin" and sys.version_info < (3, 8):
    _logger.info("changing multiprocessing start method to 'spawn'")
    # Rebind the module-level name to a "spawn" context so that every
    # later ``multiprocessing.<something>`` lookup in this module goes
    # through that context instead of the platform default.
    multiprocessing = multiprocessing.get_context("spawn")
# SECURITY NOTE(review): pickle can execute arbitrary code while
# deserializing.  Accepting it means the served daemons must only be
# reachable by trusted clients on a trusted network.
Pyro4.config.SERIALIZERS_ACCEPTED.add("pickle")
Pyro4.config.SERIALIZER = "pickle"

# NOTE(review): with REQUIRE_EXPOSE disabled, Pyro4 is documented to
# make members of registered objects remotely accessible without an
# explicit ``@Pyro4.expose`` -- confirm this is still intended.
Pyro4.config.REQUIRE_EXPOSE = False
def device(
    cls: typing.Callable,
    host: str,
    port: int,
    conf: typing.Mapping[str, typing.Any] = {},
    uid: typing.Optional[str] = None,
) -> typing.Dict[str, typing.Any]:
    """Define devices and where to serve them.

    A device definition for use in deviceserver config files.

    Args:
        cls: :class:`Device` class of device to serve or function that
            returns a map of `Device` instances to wanted Pyro ID.
            The device class will be constructed, or the function will
            be called, with the arguments in ``conf``.
        host: hostname or ip address serving the devices.
        port: port number used to serve the devices.  It is converted
            with ``int()`` so a numeric string is also accepted.
        conf: keyword arguments for ``cls``.  The device or function
            are effectively constructed or called with `cls(**conf)`.
            The mapping is copied, so the returned definition never
            shares state with the caller or with other definitions.
        uid: used to identify "floating" devices (see documentation
            for :class:`FloatingDeviceMixin`).  This must be specified
            if ``cls`` is a floating device.

    Returns:
        A dict with the keys ``cls``, ``host``, ``port``, ``uid`` and
        ``conf``.

    Raises:
        TypeError: if ``cls`` is not callable, if ``cls`` is a floating
            device class and ``uid`` is missing, or if ``uid`` is given
            for a class that is not a floating device.

    Example:

    .. code-block:: python

        def construct_devices() -> typing.Dict[str, Device]:
            camera = Camera(some, arguments)
            return {'RedCamera': camera}

        DEVICES = [
            device(construct_devices, '127.0.0.1', 8000),
            device(Camera, '127.0.0.1', 8001,
                   conf={'kwarg1': some, 'kwarg2': arguments}),
        ]

    """
    if not callable(cls):
        raise TypeError("cls must be a callable")

    # The uid rules can only be checked for classes; a factory function
    # gives no hint about the kind of devices it will construct.
    if isinstance(cls, type):
        is_floating = issubclass(cls, FloatingDeviceMixin)
        if is_floating and uid is None:
            raise TypeError("uid must be specified for floating devices")
        if not is_floating and uid is not None:
            raise TypeError("uid must not be given for non floating devices")

    # Copy ``conf``: the default argument is a single shared dict, and
    # handing it out directly would let a change made through one
    # definition leak into every other definition created without conf.
    return dict(cls=cls, host=host, port=int(port), uid=uid, conf=dict(conf))
def _create_log_formatter(name: str):
    """Create a logging.Formatter for the device server.

    Each device is served on its own process and each device has its
    own log file.  But the logs from all device servers also appear on
    stderr where it will be difficult to figure out from which device
    server a log message comes.  This creates a logging.Formatter
    which includes the device server name.

    Args:
        name: device name to be used on the log output.

    Returns:
        A ``logging.Formatter`` whose format embeds ``name`` next to
        the time, logger name, level, PID and message of each record.

    """
    # ``name`` ends up inside a %-style logging format, so a literal
    # percent sign in it must be doubled or it would be read as the
    # start of a format field when a record is formatted.
    escaped_name = name.replace("%", "%%")
    return logging.Formatter(
        "%%(asctime)s:%s (%%(name)s):%%(levelname)s"
        ":PID %%(process)s: %%(message)s" % escaped_name
    )
161 """Pass, aggregate or suppress consecutive repetitions of a log message."""
162 if self.
last == record.msg:
170 self.
last = record.msg
174 record.msg =
"Aggregating reps. of: %s" % (record.msg)
180 record.msg =
"%d times: %s" % (self.
repeat_at, record.msg)
183 record.msg =
"Suppressing reps. of: %s" % (record.msg)
189@dataclass(frozen=True)
191 """Class to define configuration for a device server.
193 The different fields map to the different ``device-server``
194 command line options.
202def _check_autoproxy_feature() -> None:
205 if not Pyro4.config.AUTOPROXY:
207 "serving of a ControllerDevice requires"
208 " Pyro4 AUTOPROXY option enabled"
214 if Pyro4.config.SERIALIZER ==
"marshal":
216 "Pyro's AUTOPROXY feature is required but the"
217 " 'marshal' serializer is currently selected"
219 if "marshal" in Pyro4.config.SERIALIZERS_ACCEPTED:
220 Pyro4.config.SERIALIZERS_ACCEPTED.remove(
"marshal")
221 _logger.info(
"marshal was removed from accepted serializers")
225def _register_device(pyro_daemon, device, obj_id=None) -> None:
226 pyro_daemon.register(device, obj_id)
229 _check_autoproxy_feature()
230 for sub_device
in device.devices.values():
231 _register_device(pyro_daemon, sub_device, obj_id=
None)
234 _check_autoproxy_feature()
235 for axis
in device.axes.values():
236 _register_device(pyro_daemon, axis, obj_id=
None)
242 """Initialise a device and serve at host/port according to its id.
245 device_def: definition of the device.
246 options: configuration for the device server.
247 id_to_host: host
or mapping of device identifiers to hostname.
248 id_to_port: map
or mapping of device identifiers to port
250 exit_event: a shared event to signal that the process should
258 options: DeviceServerOptions,
259 id_to_host: typing.Mapping[str, str],
260 id_to_port: typing.Mapping[str, int],
261 exit_event: typing.Optional[multiprocessing.Event] =
None,
276 """Create new instance with same settings.
278 This is useful to restart a device server.
291 cls_name = cls.__name__
299 root_logger = logging.getLogger()
302 for handler
in list(root_logger.handlers):
303 root_logger.removeHandler(handler)
305 root_logger.setLevel(self.
_options.logging_level)
311 stderr_handler = StreamHandler(sys.stderr)
312 stderr_handler.setFormatter(_create_log_formatter(cls_name))
313 root_logger.addHandler(stderr_handler)
314 root_logger.debug(
"Debugging messages on.")
316 root_logger.addFilter(
Filter())
320 cls_is_type = isinstance(cls, type)
328 except Exception
as e:
330 "Failed to start device. Retrying in 5s.", exc_info=e
337 if cls_is_type
and issubclass(cls, FloatingDeviceMixin):
338 uid = str(list(self.
_devices.values())[0].get_id())
341 "Host or port not found for device %s" % (uid,)
349 pyro_daemon = Pyro4.Daemon(port=port, host=host)
351 log_handler = RotatingFileHandler(
352 "%s_%s_%s.log" % (cls_name, host, port)
354 log_handler.setFormatter(_create_log_formatter(cls_name))
355 root_logger.addHandler(log_handler)
357 _logger.info(
"Device initialized; starting daemon.")
358 for obj_id, device
in self.
_devices.items():
359 _register_device(pyro_daemon, device, obj_id=obj_id)
363 pyro_thread = Thread(target=pyro_daemon.requestLoop)
364 pyro_thread.daemon =
True
366 for device
in self.
_devices.values():
367 _logger.info(
"Serving %s", pyro_daemon.uriFor(device))
368 if isinstance(device, FloatingDeviceMixin):
370 "Device UID on port %s is %s", port, device.get_id()
380 except (KeyboardInterrupt, IOError):
382 pyro_daemon.shutdown()
384 for device
in self.
_devices.values():
387 except Exception
as ex:
390 _logger.error(
"Failure to shutdown device %s", device, ex)
393def serve_devices(devices, options: DeviceServerOptions, exit_event=
None):
398 devices = copy.deepcopy(devices)
400 root_logger = logging.getLogger()
402 log_handler = RotatingFileHandler(
"__MAIN__.log")
403 log_handler.setFormatter(_create_log_formatter(
"device-server"))
404 root_logger.addHandler(log_handler)
409 if exit_event
is None:
410 exit_event = multiprocessing.Event()
421 parent = multiprocessing.current_process()
423 def term_func(sig, frame):
424 """Terminate subprocesses cleanly."""
425 if parent == multiprocessing.current_process():
426 _logger.debug(
"Shutting down all servers.")
430 keep_alive_thread.join()
431 for this_server
in servers:
435 if sys.platform !=
"win32":
436 signal.signal(signal.SIGTERM, term_func)
437 signal.signal(signal.SIGINT, term_func)
442 by_class[dev[
"cls"]] = by_class.get(dev[
"cls"], []) + [dev]
445 _logger.warning(
"No valid devices specified. Maybe an empty list?")
447 for cls, devs
in by_class.items():
458 if isinstance(cls, type)
and issubclass(cls, FloatingDeviceMixin):
464 uid_to_host[uid] = dev[
"host"]
465 uid_to_port[uid] = dev[
"port"]
467 dev[
"conf"][
"index"] = count
477 exit_event=exit_event,
486 """Keep DeviceServers alive."""
487 while not exit_event.is_set():
493 "DeviceServer Failure. Process %s is dead with"
494 " exitcode %s. Restarting...",
499 servers.append(s.clone())
504 _logger.error(
"... could not join PID %s.", s.pid)
510 "... DeviceServer with PID %s restarted"
518 _logger.info(
"No servers running. Exiting.")
523 except (KeyboardInterrupt, IOError):
526 keep_alive_thread = Thread(target=keep_alive)
527 keep_alive_thread.start()
529 _logger.info(
"Device Server started. Press Ctrl+C to exit.")
530 while not exit_event.is_set():
533 except (KeyboardInterrupt, IOError):
534 _logger.debug(
"KeyboardInterrupt or IOError")
537 _logger.debug(
"Shutting down servers ...")
544 _logger.info(
" ... No more servers running.")
545 _logger.debug(
"Joining threads ...")
546 keep_alive_thread.join()
547 _logger.debug(
"... Threads joined. Exiting.")
551def _parse_cmd_line_args(args: typing.Sequence[str]) -> DeviceServerOptions:
552 parser = argparse.ArgumentParser(prog=
"device-server")
558 choices=[
"debug",
"info",
"warning",
"error",
"critical"],
559 help=
"Set logging level",
565 metavar=
"CONFIG-FILEPATH",
566 help=
"Path to the configuration file",
568 parsed = parser.parse_args(args)
570 config_fpath=parsed.config_fpath,
571 logging_level=getattr(logging, parsed.logging_level.upper()),
def _load_source(filepath):
    """Load a Python source file as a module named ``config``.

    The file is executed as regular Python code, so it must come from
    a trusted source.

    Args:
        filepath: path to the Python source file to load.

    Returns:
        The module object obtained from executing the file.

    """
    loader = importlib.machinery.SourceFileLoader("config", filepath)
    spec = importlib.util.spec_from_loader("config", loader)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # Callers read attributes (such as ``DEVICES``) from the module.
    return module
def validate_devices(configfile):
    """Load a config file and return its ``DEVICES`` definitions.

    Args:
        configfile: path to the Python config file to load.

    Returns:
        The ``DEVICES`` object defined in the config file.

    Raises:
        Exception: if the config file does not define ``DEVICES`` or
            if ``DEVICES`` is not an iterable.

    """
    config = _load_source(configfile)
    try:
        devices = getattr(config, "DEVICES")
    except AttributeError:
        raise Exception("No 'DEVICES=...' in config file.")
    if not isinstance(devices, Iterable):
        raise Exception("Error in config: DEVICES should be an iterable.")
    return devices
def main(argv: typing.Sequence[str]) -> int:
    """Run the device server from command line arguments.

    Args:
        argv: the full argument vector; the first element (the program
            name) is skipped before parsing.

    Returns:
        The process exit status, which is ``0`` once serving has ended.

    """
    options = _parse_cmd_line_args(argv[1:])

    # Send log records of the main process to stderr, labelled as
    # "device-server", at the level requested on the command line.
    root_logger = logging.getLogger()
    root_logger.setLevel(options.logging_level)

    stderr_handler = StreamHandler(sys.stderr)
    stderr_handler.setFormatter(_create_log_formatter("device-server"))
    root_logger.addHandler(stderr_handler)

    root_logger.addFilter(Filter())

    devices = validate_devices(options.config_fpath)

    serve_devices(devices, options)

    # The result is handed to ``sys.exit`` by the callers in this file.
    return 0
def _setuptools_entry_point() -> int:
    """Call ``main`` with the arguments of the running process.

    Returns:
        Whatever ``main(sys.argv)`` returns.

    """
    return main(sys.argv)
def __main__() -> None:
    """Run ``_setuptools_entry_point`` and discard its return value."""
    _setuptools_entry_point()
# When executed as a script, serve the devices and use the value
# returned by ``main`` as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))