26There is support for A-Series and X-Series devices that have firmware
27version 6.06 or higher, as these are the ones who support the ASCII
32 Create non controller classes for the simpler case where there is
33 only one Zaber device, and modelling it as controller device is
38 Consider using `__slots__` on the `_ZaberReply` for performance.
55_logger = logging.getLogger(__name__)
58_SPACE_CODE = ord(b
" ")
62 """Wraps a Zaber reply to easily index its multiple fields."""
64 def __init__(self, data: bytes) ->
None:
68 or data[-2:] != b
"\r\n"
69 or any([data[i] != _SPACE_CODE
for i
in (3, 5, 8, 13, 16)])
71 raise ValueError(
"Not a valid reply from a Zaber device")
75 """The start of reply with device address and space."""
76 return self.
_data[1:3]
80 """The reply flag indicates if the message was accepted or rejected.
82 Can be `b"OK"` (accepted) or `b"RJ"` (rejected). If rejected,
83 the response property will be one word with the reason why.
85 return self.
_data[6:8]
89 """``b"BUSY"`` when the axis is moving and ``b"IDLE"`` otherwise.
91 If the reply message applies to the whole device, the status
92 is `b"BUSY"` if any axis is busy and `b"IDLE"` if all axes are
95 return self.
_data[9:13]
99 """The highest priority warning currently active.
101 This will be `b'--'` under normal conditions. Anything else
104 return self.
_data[14:16]
107 def response(self) -> bytes:
109 return self.
_data[17:-2]
113 """Wraps a serial connection with a reentrant lock.
115 This class is just the wrap to :class:`serial.Serial`. The class
116 exposing the Zaber commands interface is
117 :class:`_ZaberDeviceConnection`.
119 .. todo: replace with microscope._utils.SharedSerial
122 def __init__(self, port: str, baudrate: int, timeout: float) ->
None:
127 bytesize=serial.EIGHTBITS,
128 stopbits=serial.STOPBITS_ONE,
129 parity=serial.PARITY_NONE,
134 self.
_lock = threading.RLock()
140 lines = self.
_serial.readlines()
141 if not all([l.startswith(b
"@")
for l
in lines]):
143 "'%s' does not respond like a Zaber device" % port
147 def lock(self) -> threading.RLock:
150 def write(self, data: bytes) -> int:
152 return self.
_serial.write(data)
154 def readline(self, size: int = -1) -> bytes:
156 return self.
_serial.readline(size)
160 """A Zaber connection to control a single device.
162 This class provides a Python interface to the Zaber commands. It
163 also does the routing of commands to the correct device in the
167 conn: the :class:`_ZaberConnection` instance for this device.
168 device_address: the device address for the specific device.
169 This is the number used at the start of all Zaber
173 def __init__(self, conn: _ZaberConnection, device_address: int) ->
None:
177 def _validate_reply(self, reply: _ZaberReply) ->
None:
180 "received reply from a device with different"
181 " address (%s instead of %s)"
184 if reply.flag != b
"OK":
186 "command rejected because '%s'" % reply.response.decode()
189 def command(self, command: bytes, axis: int = 0) -> _ZaberReply:
190 """Send command and return reply.
193 command: a bytes array with the command and its
195 axis: the axis number to send the command. If zero, the
196 command is executed by all axis in the device.
200 with self.
_conn.lock:
204 data = self.
_conn.readline()
209 def is_busy(self) -> bool:
210 return self.
command(b
"").status == b
"BUSY"
213 """Wait, or error, until device is idle.
215 A device is busy if *any* of its axis is busy.
218 for _
in range(int(timeout / sleep_interval)):
221 time.sleep(sleep_interval)
224 "device still busy after %f seconds" % timeout
228 """Reports the number of axes in the device."""
229 return int(self.
command(b
"get system.axiscount").response)
232 """True if all axes, or selected axis, has been homed."""
233 reply = self.
command(b
"get limit.home.triggered", axis)
234 return all([int(x)
for x
in reply.response.split()])
236 def home(self, axis: int = 0) ->
None:
237 """Move the axis to the home position."""
241 """Number of microsteps needed to complete one full rotation.
243 This is only valid on controllers and rotary devices including
244 filter wheels and filter cube turrets.
246 return int(self.
command(b
"get limit.cycle.dist", axis).response)
249 """The distance between consecutive index positions."""
250 return int(self.
command(b
"get motion.index.dist", axis).response)
253 """The current index number or zero if between index positions."""
254 return int(self.
command(b
"get motion.index.num", axis).response)
256 def move_to_index(self, axis: int, index: int) ->
None:
257 self.
command(b
"move index %d" % index, axis)
259 def move_to_absolute_position(self, axis: int, position: int) ->
None:
260 self.
command(b
"move abs %d" % position, axis)
262 def move_by_relative_position(self, axis: int, position: int) ->
None:
263 self.
command(b
"move rel %d" % position, axis)
266 """Current absolute position of an axis, in microsteps."""
267 return int(self.
command(b
"get pos", axis).response)
270 """The maximum position the device can move to, in microsteps."""
271 return int(self.
command(b
"get limit.max", axis).response)
274 """The minimum position the device can move to, in microsteps."""
275 return int(self.
command(b
"get limit.min", axis).response)
277 def lamp_off(self, channel: int) ->
None:
278 self.
command(b
"lamp off", channel)
280 def lamp_on(self, channel: int) ->
None:
281 self.
command(b
"lamp on", channel)
283 def get_lamp_max_flux(self, channel: int) -> float:
284 return float(self.
command(b
"get lamp.flux.max", channel).response)
286 def get_lamp_flux(self, channel: int) -> float:
287 return float(self.
command(b
"get lamp.flux", channel).response)
289 def set_lamp_flux(self, channel: int, flux: float) ->
None:
290 self.
command(b
"set lamp.flux %.3f" % flux, channel)
292 def get_lamp_is_on(self, channel: int) -> bool:
293 return self.
command(b
"get lamp.status", channel).response == b
"2"
295 def get_lamp_temperature(self, channel: int) -> float:
296 return float(self.
command(b
"get lamp.temperature", channel).response)
300 def __init__(self, dev_conn: _ZaberDeviceConnection, axis: int) ->
None:
316 _logger.warning(
"querying stage axis position but device is busy")
321 def limits(self) -> microscope.AxisLimits:
329 self, conn: _ZaberConnection, device_address: int, **kwargs
331 super().__init__(**kwargs)
335 for i
in range(1, self.
_dev_conn.get_number_axes() + 1)
345 if not self._dev_conn.been_homed():
346 self._dev_conn.home()
353 def axes(self) -> typing.Mapping[str, microscope.abc.StageAxis]:
356 def move_by(self, delta: typing.Mapping[str, float]) ->
None:
357 """Move specified axes by the specified distance."""
358 for axis_name, axis_delta
in delta.items():
359 self.
_dev_conn.move_by_relative_position(
365 def move_to(self, position: typing.Mapping[str, float]) ->
None:
366 """Move specified axes by the specified distance."""
367 for axis_name, axis_position
in position.items():
368 self.
_dev_conn.move_to_absolute_position(
376 """Zaber filter wheels and filter cube turrets."""
379 self, conn: _ZaberConnection, device_address: int, **kwargs
383 if self.
_dev_conn.get_number_axes() != 1:
385 "Device with address %d is not a filter wheel" % device_address
388 rotation_length = self.
_dev_conn.get_rotation_length(1)
389 if rotation_length <= 0:
391 "Device with address %d is not a filter wheel" % device_address
393 positions = int(rotation_length / self.
_dev_conn.get_index_distance(1))
395 super().__init__(positions, **kwargs)
408 def _do_get_position(self) -> int:
409 if self._dev_conn.is_busy():
410 _logger.warning(
"querying filterwheel position but device is busy")
411 self._dev_conn.wait_until_idle()
413 return self._dev_conn.get_current_index(axis=1) - 1
415 def _do_set_position(self, position: int) ->
None:
417 self._dev_conn.move_to_index(axis=1, index=position + 1)
418 self._dev_conn.wait_until_idle()
425 """A single LED from a LED controller."""
427 def __init__(self, dev_conn: _ZaberDeviceConnection, channel: int) ->
None:
440 for our_name, their_name
in [
441 (
"wavelength peak",
"lamp.wavelength.peak"),
442 (
"wavelength fwhm",
"lamp.wavelength.fwhm"),
445 b
"get %s" % their_name.encode(), self.
_channel
447 value = float(reply.response)
480 """This effectively means a Zaber X-LCA4 LED controller.
482 The X-LCA4 series is so far the only LED controller Zaber has.
483 Its documentation is not included on the ASCII protocol manual,
484 see the `controller specific manual online
485 <https://www.zaber.com/protocol-manual?device=X-LCA4>`_
490 self, conn: _ZaberConnection, device_address: int, **kwargs
492 super().__init__(**kwargs)
494 self.
_leds: typing.Dict[str, _ZaberLED] = {}
496 all_lamps = self.
_dev_conn.command(b
"get lamp.status").response.split()
504 for i, lamp_state
in enumerate(all_lamps, start=1):
505 if lamp_state
in [b
"1", b
"2", b
"3"]:
510 _logger.info(
"no LED %d, status is %s", i, lamp_state.decode())
513 def devices(self) -> typing.Dict[str, _ZaberLED]:
518 """Enumerator for Zaber device types.
520 This enum is used to specify the type of device for each address
521 when constructing a :class:`ZaberDaisyChain`.
527 FILTER_WHEEL = _ZaberFilterWheel
528 LED_CONTROLLER = _ZaberLEDController
532 """A daisy chain of Zaber devices.
535 port: the port name to connect to. For example, `COM1`,
536 `/dev/ttyUSB0`, or `/dev/cuad1`.
537 address2type: a map of device addresses to the corresponding
538 :class:`ZaberDeviceType`.
540 Zaber devices can be daisy-chained, i.e., a set of Zaber devices
541 can be connected in a sequence so that each device is only wired
542 to the previous and next device in the sequence, and only the
543 first device in the sequence is connected to the computer. Even
544 if there is only Zaber device, this is modelled as a one element
545 daisy-chain. If there are multiple devices, all connected
546 directly to the computer, i.e., not chained, then each device is
547 its own one-element daisy-chain.
549 .. code-block:: python
551 from microscope.controllers.zaber import ZaberDaisyChain, ZaberDeviceType
552 zaber = ZaberDaisyChain("/dev/ttyUSB0",
553 {2: ZaberDeviceType.STAGE,
554 3: ZaberDeviceType.LED_CONTROLLER,
555 4: ZaberDeviceType.FILTER_WHEEL})
557 # Device names are strings, not int.
558 filterwheel = zaber.devices['4']
560 # LEDs are not devices of the zaber daisy chain, they are
561 # devices of the LED controller.
562 led_controller = zaber.devices['3']
563 led1 = led_controller.devices['LED1']
565 # Stage axis names are the string of the axis number.
566 xy_stage = zaber.devices['2']
567 motor1 = xy_stage.axes['1']
569 Each device on a chain is identified by a device address which is
570 an integer between 1 and 99. By default, the addresses start at 1
571 and are sorted by distance to the computer, but this can be
574 For an LED controller device, the names of its devices are "LED1",
575 "LED2", etc, the same as the labels on the LED controller itself.
577 Because there is no method to correctly guess a device type, a map
578 of device addresses to device types is required.
582 Zaber devices need to be homed before they can be moved. A
583 stage will be homed during `enable` but a filter wheel will be
584 homed during the object construction.
591 address2type: typing.Mapping[int, ZaberDeviceType],
594 super().__init__(**kwargs)
598 for address, device_type
in address2type.items():
599 if address < 1
or address > 99:
600 raise ValueError(
"address must be an integer between 1-99")
601 dev_cls = device_type.value
605 def devices(self) -> typing.Dict[str, microscope.abc.Device]:
None add_setting(self, name, dtype, get_func, set_func, values, typing.Optional[typing.Callable[[], bool]] readonly=None)
threading.RLock lock(self)
int get_number_axes(self)
int get_rotation_length(self, int axis)
int get_limit_max(self, int axis)
None home(self, int axis=0)
int get_limit_min(self, int axis)
bool been_homed(self, int axis=0)
None _validate_reply(self, _ZaberReply reply)
int get_current_index(self, int axis)
_ZaberReply command(self, bytes command, int axis=0)
None wait_until_idle(self, float timeout=10.0)
int get_index_distance(self, int axis)
int get_absolute_position(self, int axis)
typing.List[str] get_status(self)
None _do_set_power(self, float power)
float _do_get_power(self)
None move_to(self, float pos)
None move_by(self, float delta)
microscope.AxisLimits limits(self)
typing.Mapping[str, microscope.abc.StageAxis] axes(self)
bool may_move_on_enable(self)
None move_by(self, typing.Mapping[str, float] delta)
None move_to(self, typing.Mapping[str, float] position)