BioImager  4.9.0
A .NET microscopy imaging application based on Bio library. Supports various microscopes by using imported libraries & GUI automation. Supports XInput game controllers to move stage, take images, run ImageJ macros on images or Bio C# scripts.
Loading...
Searching...
No Matches
zaber.py
1#!/usr/bin/env python3
2
3
20
21"""Zaber devices.
22
23Devices supported
24-----------------
25
26There is support for A-Series and X-Series devices that have firmware
27version 6.06 or higher, as these are the ones who support the ASCII
28protocol.
29
30.. todo::
31
32 Create non controller classes for the simpler case where there is
33 only one Zaber device, and modelling it as controller device is
34 non obvious.
35
36.. todo::
37
38 Consider using `__slots__` on the `_ZaberReply` for performance.
39
40"""
41
42import enum
43import logging
44import threading
45import time
46import typing
47
48import serial
49
50import microscope
52import microscope.abc
53
54
55_logger = logging.getLogger(__name__)
56
57_AT_CODE = ord(b"@")
58_SPACE_CODE = ord(b" ")
59
60
62 """Wraps a Zaber reply to easily index its multiple fields."""
63
64 def __init__(self, data: bytes) -> None:
65 self._data = data
66 if (
67 data[0] != _AT_CODE
68 or data[-2:] != b"\r\n"
69 or any([data[i] != _SPACE_CODE for i in (3, 5, 8, 13, 16)])
70 ):
71 raise ValueError("Not a valid reply from a Zaber device")
72
73 @property
74 def address(self) -> bytes:
75 """The start of reply with device address and space."""
76 return self._data[1:3]
77
78 @property
79 def flag(self) -> bytes:
80 """The reply flag indicates if the message was accepted or rejected.
81
82 Can be `b"OK"` (accepted) or `b"RJ"` (rejected). If rejected,
83 the response property will be one word with the reason why.
84 """
85 return self._data[6:8]
86
87 @property
88 def status(self) -> bytes:
89 """``b"BUSY"`` when the axis is moving and ``b"IDLE"`` otherwise.
90
91 If the reply message applies to the whole device, the status
92 is `b"BUSY"` if any axis is busy and `b"IDLE"` if all axes are
93 idle.
94 """
95 return self._data[9:13]
96
97 @property
98 def warning(self) -> bytes:
99 """The highest priority warning currently active.
100
101 This will be `b'--'` under normal conditions. Anything else
102 is a warning.
103 """
104 return self._data[14:16]
105
106 @property
107 def response(self) -> bytes:
108 # Assumes no checksum
109 return self._data[17:-2]
110
111
113 """Wraps a serial connection with a reentrant lock.
114
115 This class is just the wrap to :class:`serial.Serial`. The class
116 exposing the Zaber commands interface is
117 :class:`_ZaberDeviceConnection`.
118
119 .. todo: replace with microscope._utils.SharedSerial
120 """
121
122 def __init__(self, port: str, baudrate: int, timeout: float) -> None:
123 self._serial = serial.Serial(
124 port=port,
125 baudrate=baudrate,
126 timeout=timeout,
127 bytesize=serial.EIGHTBITS,
128 stopbits=serial.STOPBITS_ONE,
129 parity=serial.PARITY_NONE,
130 xonxoff=False,
131 rtscts=False,
132 dsrdtr=False,
133 )
134 self._lock = threading.RLock()
135 with self._lock:
136 # The command / does nothing other than getting a response
137 # from all devices in the chain. This seems to be the
138 # most innocent command we can use.
139 self._serial.write(b"/\n")
140 lines = self._serial.readlines()
141 if not all([l.startswith(b"@") for l in lines]):
142 raise RuntimeError(
143 "'%s' does not respond like a Zaber device" % port
144 )
145
146 @property
147 def lock(self) -> threading.RLock:
148 return self._lock
149
150 def write(self, data: bytes) -> int:
151 with self.locklock:
152 return self._serial.write(data)
153
154 def readline(self, size: int = -1) -> bytes:
155 with self.locklock:
156 return self._serial.readline(size)
157
158
160 """A Zaber connection to control a single device.
161
162 This class provides a Python interface to the Zaber commands. It
163 also does the routing of commands to the correct device in the
164 chain.
165
166 Args:
167 conn: the :class:`_ZaberConnection` instance for this device.
168 device_address: the device address for the specific device.
169 This is the number used at the start of all Zaber
170 commands.
171 """
172
173 def __init__(self, conn: _ZaberConnection, device_address: int) -> None:
174 self._conn = conn
175 self._address_bytes = b"%02d" % device_address
176
177 def _validate_reply(self, reply: _ZaberReply) -> None:
178 if reply.address != self._address_bytes:
179 raise RuntimeError(
180 "received reply from a device with different"
181 " address (%s instead of %s)"
182 % (reply.address.decode(), self._address_bytes.decode())
183 )
184 if reply.flag != b"OK":
185 raise RuntimeError(
186 "command rejected because '%s'" % reply.response.decode()
187 )
188
189 def command(self, command: bytes, axis: int = 0) -> _ZaberReply:
190 """Send command and return reply.
191
192 Args:
193 command: a bytes array with the command and its
194 parameters.
195 axis: the axis number to send the command. If zero, the
196 command is executed by all axis in the device.
197 """
198 # We do not need to check whether axis number is valid because
199 # the device will reject the command with BADAXIS if so.
200 with self._conn.lock:
201 self._conn.write(
202 b"/%s %1d %s\n" % (self._address_bytes, axis, command)
203 )
204 data = self._conn.readline()
205 reply = _ZaberReply(data)
206 self._validate_reply(reply)
207 return reply
208
209 def is_busy(self) -> bool:
210 return self.command(b"").status == b"BUSY"
211
212 def wait_until_idle(self, timeout: float = 10.0) -> None:
213 """Wait, or error, until device is idle.
214
215 A device is busy if *any* of its axis is busy.
216 """
217 sleep_interval = 0.1
218 for _ in range(int(timeout / sleep_interval)):
219 if not self.is_busy():
220 break
221 time.sleep(sleep_interval)
222 else:
224 "device still busy after %f seconds" % timeout
225 )
226
227 def get_number_axes(self) -> int:
228 """Reports the number of axes in the device."""
229 return int(self.command(b"get system.axiscount").response)
230
231 def been_homed(self, axis: int = 0) -> bool:
232 """True if all axes, or selected axis, has been homed."""
233 reply = self.command(b"get limit.home.triggered", axis)
234 return all([int(x) for x in reply.response.split()])
235
236 def home(self, axis: int = 0) -> None:
237 """Move the axis to the home position."""
238 self.command(b"home", axis)
239
240 def get_rotation_length(self, axis: int) -> int:
241 """Number of microsteps needed to complete one full rotation.
242
243 This is only valid on controllers and rotary devices including
244 filter wheels and filter cube turrets.
245 """
246 return int(self.command(b"get limit.cycle.dist", axis).response)
247
248 def get_index_distance(self, axis: int) -> int:
249 """The distance between consecutive index positions."""
250 return int(self.command(b"get motion.index.dist", axis).response)
251
252 def get_current_index(self, axis: int) -> int:
253 """The current index number or zero if between index positions."""
254 return int(self.command(b"get motion.index.num", axis).response)
255
256 def move_to_index(self, axis: int, index: int) -> None:
257 self.command(b"move index %d" % index, axis)
258
259 def move_to_absolute_position(self, axis: int, position: int) -> None:
260 self.command(b"move abs %d" % position, axis)
261
262 def move_by_relative_position(self, axis: int, position: int) -> None:
263 self.command(b"move rel %d" % position, axis)
264
265 def get_absolute_position(self, axis: int) -> int:
266 """Current absolute position of an axis, in microsteps."""
267 return int(self.command(b"get pos", axis).response)
268
269 def get_limit_max(self, axis: int) -> int:
270 """The maximum position the device can move to, in microsteps."""
271 return int(self.command(b"get limit.max", axis).response)
272
273 def get_limit_min(self, axis: int) -> int:
274 """The minimum position the device can move to, in microsteps."""
275 return int(self.command(b"get limit.min", axis).response)
276
277 def lamp_off(self, channel: int) -> None:
278 self.command(b"lamp off", channel)
279
280 def lamp_on(self, channel: int) -> None:
281 self.command(b"lamp on", channel)
282
283 def get_lamp_max_flux(self, channel: int) -> float:
284 return float(self.command(b"get lamp.flux.max", channel).response)
285
286 def get_lamp_flux(self, channel: int) -> float:
287 return float(self.command(b"get lamp.flux", channel).response)
288
289 def set_lamp_flux(self, channel: int, flux: float) -> None:
290 self.command(b"set lamp.flux %.3f" % flux, channel)
291
292 def get_lamp_is_on(self, channel: int) -> bool:
293 return self.command(b"get lamp.status", channel).response == b"2"
294
295 def get_lamp_temperature(self, channel: int) -> float:
296 return float(self.command(b"get lamp.temperature", channel).response)
297
298
300 def __init__(self, dev_conn: _ZaberDeviceConnection, axis: int) -> None:
301 super().__init__()
302 self._dev_conn = dev_conn
303 self._axis = axis
304
305 def move_by(self, delta: float) -> None:
306 self._dev_conn.move_by_relative_position(self._axis, int(delta))
307 self._dev_conn.wait_until_idle()
308
309 def move_to(self, pos: float) -> None:
310 self._dev_conn.move_to_absolute_position(self._axis, int(pos))
311 self._dev_conn.wait_until_idle()
312
313 @property
314 def position(self) -> float:
315 if self._dev_conn.is_busy():
316 _logger.warning("querying stage axis position but device is busy")
317 self._dev_conn.wait_until_idle()
318 return float(self._dev_conn.get_absolute_position(self._axis))
319
320 @property
321 def limits(self) -> microscope.AxisLimits:
322 min_limit = self._dev_conn.get_limit_min(self._axis)
323 max_limit = self._dev_conn.get_limit_max(self._axis)
324 return microscope.AxisLimits(lower=min_limit, upper=max_limit)
325
326
328 def __init__(
329 self, conn: _ZaberConnection, device_address: int, **kwargs
330 ) -> None:
331 super().__init__(**kwargs)
332 self._dev_conn = _ZaberDeviceConnection(conn, device_address)
333 self._axes = {
334 str(i): _ZaberStageAxis(self._dev_conn, i)
335 for i in range(1, self._dev_conn.get_number_axes() + 1)
336 }
337
338 def _do_shutdown(self) -> None:
339 pass
340
341 def _do_enable(self) -> bool:
342 # Before a device can moved, it first needs to establish a
343 # reference to the home position. We won't be able to move
344 # unless we home it first.
345 if not self._dev_conn.been_homed():
346 self._dev_conn.home()
347 return True
348
349 def may_move_on_enable(self) -> bool:
350 return not self._dev_conn.been_homed()
351
352 @property
353 def axes(self) -> typing.Mapping[str, microscope.abc.StageAxis]:
354 return self._axes
355
356 def move_by(self, delta: typing.Mapping[str, float]) -> None:
357 """Move specified axes by the specified distance."""
358 for axis_name, axis_delta in delta.items():
359 self._dev_conn.move_by_relative_position(
360 int(axis_name),
361 int(axis_delta),
362 )
363 self._dev_conn.wait_until_idle()
364
365 def move_to(self, position: typing.Mapping[str, float]) -> None:
366 """Move specified axes by the specified distance."""
367 for axis_name, axis_position in position.items():
368 self._dev_conn.move_to_absolute_position(
369 int(axis_name),
370 int(axis_position),
371 )
372 self._dev_conn.wait_until_idle()
373
374
376 """Zaber filter wheels and filter cube turrets."""
377
378 def __init__(
379 self, conn: _ZaberConnection, device_address: int, **kwargs
380 ) -> None:
381 self._dev_conn = _ZaberDeviceConnection(conn, device_address)
382
383 if self._dev_conn.get_number_axes() != 1:
385 "Device with address %d is not a filter wheel" % device_address
386 )
387
388 rotation_length = self._dev_conn.get_rotation_length(1)
389 if rotation_length <= 0:
391 "Device with address %d is not a filter wheel" % device_address
392 )
393 positions = int(rotation_length / self._dev_conn.get_index_distance(1))
394
395 super().__init__(positions, **kwargs)
396
397 # Before a device can moved, it first needs to establish a
398 # reference to the home position. We won't be able to move
399 # unless we home it first. On a stage this happens during
400 # enable because the stage movemenet can be dangerous but on a
401 # filter wheel this is fine.
402 if not self._dev_conn.been_homed():
403 self._dev_conn.home()
404
405 def _do_shutdown(self) -> None:
406 pass
407
408 def _do_get_position(self) -> int:
409 if self._dev_conn.is_busy():
410 _logger.warning("querying filterwheel position but device is busy")
411 self._dev_conn.wait_until_idle()
412 # Zaber positions start at one, hence -1.
413 return self._dev_conn.get_current_index(axis=1) - 1
414
415 def _do_set_position(self, position: int) -> None:
416 # Zaber positions start at one, hence +1.
417 self._dev_conn.move_to_index(axis=1, index=position + 1)
418 self._dev_conn.wait_until_idle()
419
420
424):
425 """A single LED from a LED controller."""
426
427 def __init__(self, dev_conn: _ZaberDeviceConnection, channel: int) -> None:
428 super().__init__()
429 self._dev_conn = dev_conn
430 self._channel = channel
431 self._max_flux = self._dev_conn.get_lamp_max_flux(self._channel)
432 self.add_setting(
433 "temperature",
434 "float",
435 lambda: self._dev_conn.get_lamp_temperature(self._channel),
436 None,
437 values=tuple(),
438 )
439
440 for our_name, their_name in [
441 ("wavelength peak", "lamp.wavelength.peak"),
442 ("wavelength fwhm", "lamp.wavelength.fwhm"),
443 ]:
444 reply = self._dev_conn.command(
445 b"get %s" % their_name.encode(), self._channel
446 )
447 value = float(reply.response)
448 self.add_setting(
449 our_name,
450 "float",
451 lambda x=value: x,
452 None,
453 values=tuple(),
454 )
455
456 def _do_shutdown(self) -> None:
457 pass
458
459 def get_status(self) -> typing.List[str]:
460 return super().get_status()
461
462 def _do_enable(self) -> bool:
463 self._dev_conn.lamp_on(self._channel)
464 return True
465
466 def _do_disable(self) -> None:
467 self._dev_conn.lamp_off(self._channel)
468
469 def _do_get_power(self) -> float:
470 return self._dev_conn.get_lamp_flux(self._channel) / self._max_flux
471
472 def _do_set_power(self, power: float) -> None:
473 self._dev_conn.set_lamp_flux(self._channel, power * self._max_flux)
474
475 def get_is_on(self) -> bool:
476 return self._dev_conn.get_lamp_is_on(self._channel)
477
478
480 """This effectively means a Zaber X-LCA4 LED controller.
481
482 The X-LCA4 series is so far the only LED controller Zaber has.
483 Its documentation is not included on the ASCII protocol manual,
484 see the `controller specific manual online
485 <https://www.zaber.com/protocol-manual?device=X-LCA4>`_
486
487 """
488
489 def __init__(
490 self, conn: _ZaberConnection, device_address: int, **kwargs
491 ) -> None:
492 super().__init__(**kwargs)
493 self._dev_conn = _ZaberDeviceConnection(conn, device_address)
494 self._leds: typing.Dict[str, _ZaberLED] = {}
495
496 all_lamps = self._dev_conn.command(b"get lamp.status").response.split()
497 # We get one status per peripheral connection. Documentation
498 # states that a value of "0" means unplugged but with X-LCA4,
499 # firmware version 7.13 and build 8074, we get NA for
500 # unplugged. Also not sure how 0 (unplugged) would differ
501 # from 5 (peripheral not connected). So we count as valid
502 # those with status 1 (turned off), 2 (turned on), and 3
503 # (fault).
504 for i, lamp_state in enumerate(all_lamps, start=1):
505 if lamp_state in [b"1", b"2", b"3"]:
506 # The labels on the controller are LED1, LED2, etc, so
507 # use the same for key.
508 self._leds["LED%d" % i] = _ZaberLED(self._dev_conn, i)
509 else:
510 _logger.info("no LED %d, status is %s", i, lamp_state.decode())
511
512 @property
513 def devices(self) -> typing.Dict[str, _ZaberLED]:
514 return self._leds
515
516
517class ZaberDeviceType(enum.Enum):
518 """Enumerator for Zaber device types.
519
520 This enum is used to specify the type of device for each address
521 when constructing a :class:`ZaberDaisyChain`.
522 """
523
524 # We require the use of an enum instead of directly specifying the
525 # class to keep the individual device classes private.
526 STAGE = _ZaberStage
527 FILTER_WHEEL = _ZaberFilterWheel
528 LED_CONTROLLER = _ZaberLEDController
529
530
532 """A daisy chain of Zaber devices.
533
534 Args:
535 port: the port name to connect to. For example, `COM1`,
536 `/dev/ttyUSB0`, or `/dev/cuad1`.
537 address2type: a map of device addresses to the corresponding
538 :class:`ZaberDeviceType`.
539
540 Zaber devices can be daisy-chained, i.e., a set of Zaber devices
541 can be connected in a sequence so that each device is only wired
542 to the previous and next device in the sequence, and only the
543 first device in the sequence is connected to the computer. Even
544 if there is only Zaber device, this is modelled as a one element
545 daisy-chain. If there are multiple devices, all connected
546 directly to the computer, i.e., not chained, then each device is
547 its own one-element daisy-chain.
548
549 .. code-block:: python
550
551 from microscope.controllers.zaber import ZaberDaisyChain, ZaberDeviceType
552 zaber = ZaberDaisyChain("/dev/ttyUSB0",
553 {2: ZaberDeviceType.STAGE,
554 3: ZaberDeviceType.LED_CONTROLLER,
555 4: ZaberDeviceType.FILTER_WHEEL})
556
557 # Device names are strings, not int.
558 filterwheel = zaber.devices['4']
559
560 # LEDs are not devices of the zaber daisy chain, they are
561 # devices of the LED controller.
562 led_controller = zaber.devices['3']
563 led1 = led_controller.devices['LED1']
564
565 # Stage axis names are the string of the axis number.
566 xy_stage = zaber.devices['2']
567 motor1 = xy_stage.axes['1']
568
569 Each device on a chain is identified by a device address which is
570 an integer between 1 and 99. By default, the addresses start at 1
571 and are sorted by distance to the computer, but this can be
572 changed.
573
574 For an LED controller device, the names of its devices are "LED1",
575 "LED2", etc, the same as the labels on the LED controller itself.
576
577 Because there is no method to correctly guess a device type, a map
578 of device addresses to device types is required.
579
580 .. note::
581
582 Zaber devices need to be homed before they can be moved. A
583 stage will be homed during `enable` but a filter wheel will be
584 homed during the object construction.
585
586 """
587
588 def __init__(
589 self,
590 port: str,
591 address2type: typing.Mapping[int, ZaberDeviceType],
592 **kwargs,
593 ) -> None:
594 super().__init__(**kwargs)
595 self._conn = _ZaberConnection(port, baudrate=115200, timeout=0.5)
596 self._devices: typing.Dict[str, microscope.abc.Device] = {}
597
598 for address, device_type in address2type.items():
599 if address < 1 or address > 99:
600 raise ValueError("address must be an integer between 1-99")
601 dev_cls = device_type.value
602 self._devices[str(address)] = dev_cls(self._conn, address)
603
604 @property
605 def devices(self) -> typing.Dict[str, microscope.abc.Device]:
606 return self._devices
None add_setting(self, name, dtype, get_func, set_func, values, typing.Optional[typing.Callable[[], bool]] readonly=None)
Definition abc.py:407
None _validate_reply(self, _ZaberReply reply)
Definition zaber.py:177
_ZaberReply command(self, bytes command, int axis=0)
Definition zaber.py:189
None wait_until_idle(self, float timeout=10.0)
Definition zaber.py:212
typing.List[str] get_status(self)
Definition zaber.py:459
None _do_set_power(self, float power)
Definition zaber.py:472
microscope.AxisLimits limits(self)
Definition zaber.py:321
typing.Mapping[str, microscope.abc.StageAxis] axes(self)
Definition zaber.py:353
None move_by(self, typing.Mapping[str, float] delta)
Definition zaber.py:356
None move_to(self, typing.Mapping[str, float] position)
Definition zaber.py:365