45 """Test device for testing the device server keep alive."""
47 def _do_shutdown(self) -> None:
50 def get_pid(self) -> int:
55 """`DeviceServer` that queues an exception during `run`.
57 A `DeviceServer` instance runs on another process so if it fails
58 we can't easily check why. This subclass will put any exception
59 that happens during `run()` into the given queue so that the
60 parent process can check it.
64 def __init__(self, queue: multiprocessing.Queue, *args, **kwargs) ->
None:
65 super().__init__(*args, **kwargs)
71 except Exception
as ex:
75def _patch_out_device_server_logs(func):
76 """Decorator to run device server without noise from logs.
78 The device server redirects the logger to stderr *and* creates
79 files on the current directory. There are no options to control
80 this behaviour so this patches the loggers.
83 def null_logs(*args, **kwargs):
84 return logging.NullHandler()
86 no_file = unittest.mock.patch(
87 "microscope.device_server.RotatingFileHandler", null_logs
89 no_stream = unittest.mock.patch(
90 "microscope.device_server.StreamHandler", null_logs
92 return no_file(no_stream(func))
96 """Handles start and termination of deviceserver.
98 Subclasses may overload class properties defaults as needed.
102 TIMEOUT (number): time given for service to terminate after
103 receiving signal to terminate.
104 p (multiprocessing.Process): device server process.
110 @_patch_out_device_server_logs
114 logging_level=logging.INFO,
116 self.
p = multiprocessing.Process(
117 target=microscope.device_server.serve_devices,
127 self.
p.is_alive(),
"deviceserver not dead after SIGTERM"
132 """TestCase that starts DeviceServer on separate process.
134 Subclasses should define the class attribute `args`, which is used
135 to start the `DeviceServer` and implement `test_*` methods.
142 @_patch_out_device_server_logs
144 self.
queue = multiprocessing.Queue()
152 self.assertIsNotNone(
153 self.
process.exitcode,
"deviceserver not dead after SIGTERM"
160 TestCamera,
"127.0.0.1", 8001, {
"buffer_length": 0}
163 TestFilterWheel,
"127.0.0.1", 8003, {
"positions": 3}
168 """Simplest case, start and exit, given enough time to start all devices"""
169 self.assertTrue(self.
p.is_alive(),
"service dies at start")
172 """Check issues on SIGTERM before starting all devices"""
176class TestInputCheck(BaseTestServeDevices):
178 """Check behaviour if there are no devices."""
180 not self.
p.is_alive(),
"not dying for empty list of devices"
185 def __init__(self, port, **kwargs):
186 super().__init__(**kwargs)
193 def _do_shutdown(self):
197class TestClashingArguments(BaseTestServeDevices):
198 """Device server and device constructor arguments do not clash"""
202 DeviceWithPort,
"127.0.0.1", 8000, {
"port": 7000}
206 def test_port_conflict(self):
208 "PYRO:DeviceWithPort@127.0.0.1:8000"
210 self.assertEqual(client.port, 7000)
214 def _test_load_source(self, filename):
215 file_contents =
"DEVICES = [1,2,3]"
216 with tempfile.TemporaryDirectory()
as dirpath:
217 filepath = os.path.join(dirpath, filename)
218 with open(filepath,
"w")
as fh:
219 fh.write(file_contents)
222 self.assertEqual(module.DEVICES, [1, 2, 3])
225 """Reading of config file module-like works"""
229 """Reading of config file does not require .py file extension"""
236 """Reading of config file does not require file extension"""
243 TestFloatingDevice,
"127.0.0.1", 8001, {
"uid":
"foo"}, uid=
"foo"
246 TestFloatingDevice,
"127.0.0.1", 8002, {
"uid":
"bar"}, uid=
"bar"
250 def test_injection_of_index_kwarg(self):
251 floating_1 = Pyro4.Proxy(
"PYRO:TestFloatingDevice@127.0.0.1:8001")
252 floating_2 = Pyro4.Proxy(
"PYRO:TestFloatingDevice@127.0.0.1:8002")
253 self.assertEqual(floating_1.get_index(), 0)
254 self.assertEqual(floating_2.get_index(), 1)
271 {
"uid":
"foo",
"index": 0},
276 logging_level=logging.INFO,
278 {
"bar":
"127.0.0.1"},
280 multiprocessing.Event(),
283 def test_fail_with_wrong_uid(self):
284 """DeviceServer fails if it gets a FloatingDevice with another UID"""
287 "expected DeviceServer to have errored and be dead",
290 str(self.
queue.get_nowait()),
291 "Host or port not found for device foo",
301 "dm1": TestDeformableMirror(10),
302 "dm2": TestDeformableMirror(20),
309 logging_level=logging.INFO,
313 multiprocessing.Event(),
317 """Function that constructs multiple devices in device definition"""
318 self.assertTrue(self.
process.is_alive())
319 dm1 = Pyro4.Proxy(
"PYRO:dm1@127.0.0.1:8001")
320 dm2 = Pyro4.Proxy(
"PYRO:dm2@127.0.0.1:8001")
321 self.assertEqual(dm1.n_actuators, 10)
322 self.assertEqual(dm2.n_actuators, 20)
328 ExposePIDDevice,
"127.0.0.1", 8001, {}
332 @unittest.skipUnless(
333 hasattr(signal,
"SIGKILL"),
334 "can't test if we can't kill subprocess (windows)",
336 def test_keep_alive(self):
337 device = Pyro4.Proxy(
"PYRO:ExposePIDDevice@127.0.0.1:8001")
338 initial_pid = device.get_pid()
340 os.kill(initial_pid, signal.SIGKILL)
342 with self.assertRaises(Pyro4.errors.ConnectionClosedError):
349 device._pyroReconnect(tries=1)
350 new_pid = device.get_pid()
351 self.assertNotEqual(initial_pid, new_pid)
354if __name__ ==
"__main__":
def _test_load_source(self, filename)
def test_py_file_extension(self)
def test_cfg_file_extension(self)
def test_no_file_extension(self)
def test_function_in_device_definition(self)
def test_immediate_interrupt(self)
def device(typing.Callable cls, str host, int port, typing.Mapping[str, typing.Any] conf={}, typing.Optional[str] uid=None)
def _load_source(filepath)