BioImager  3.9.1
A .NET microscopy imaging library. Supports various microscopes by using imported libraries & GUI automation. Supported libraries include Prior® & Zeiss® & all devices supported by Micromanager 2.0 and python-microscope.
Loading...
Searching...
No Matches
ludl.py
1#!/usr/bin/env python3
2
3## Copyright (C) 2020 David Miguel Susano Pinto <carandraug@gmail.com>
4## Copyright (C) 2022 Ian Dobbie <ian.dobbie@jhu.edu>
5##
6##
7## This file is part of Microscope.
8##
9## Microscope is free software: you can redistribute it and/or modify
10## it under the terms of the GNU General Public License as published by
11## the Free Software Foundation, either version 3 of the License, or
12## (at your option) any later version.
13##
14## Microscope is distributed in the hope that it will be useful,
15## but WITHOUT ANY WARRANTY; without even the implied warranty of
16## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17## GNU General Public License for more details.
18##
19## You should have received a copy of the GNU General Public License
20## along with Microscope. If not, see <http://www.gnu.org/licenses/>.
21
22"""Ludl controller.
23"""
24
25import contextlib
26import threading
27import typing
28import time
29import serial
30import re
31
32import microscope.abc
33
34
35
36# so far very basic support for stages
37# no support for filter, shutters, or slide loader as I dont have hardware
38
39# Note:
40# commands end in a '\r' but replies return ending in '\n'!
41
42# errors
43# -1 Unknown command
44# -2 Illegal point type or axis, or module not installed
45# -3 Not enough parameters (e.g. move r=)
46# -4 Parameter out of range
47# -21 Process aborted by HALT command
48
49# Slide Loader:
50# -4 (parameter out of range) used for cassette or slot range errors
51# -10 No slides selected
52# -11 End of list reached
53# -12 Slide error
54# -16 Motor move error (move not completed successfully due to stall,
55# end limit, etc….)
56# -17 Initialization error
57
58# On startup the stage moves to its extremes to find the limits and then
59# sets the negative limit on each axis to 0.
60
# Numeric error codes reported by the controller and their descriptions.
LUDL_ERRORS = dict(
    [
        (-1, 'Unknown command'),
        (-2, 'Illegal point type or axis, or module not installed'),
        (-3, 'Not enough parameters (e.g. move r=)'),
        (-4, 'Parameter out of range'),
        (-21, 'Process aborted by HALT command'),
        # Slide loader codes.  -4 (parameter out of range) is also used
        # for cassette or slot range errors.
        (-10, 'No slides selected'),
        (-11, 'End of list reached'),
        (-12, 'Slide error'),
        (-16, 'Motor move error (move not completed successfully due to stall, end limit, etc.'),
        (-17, 'Initialization error'),
    ]
)
74
# Axis number (1-based) to the axis letter used in controller commands.
AXIS_MAPPER = dict(enumerate(('X', 'Y', 'Z'), start=1))
79
81 """Connection to a Ludl Controller and wrapper to its commands.
82
83 Tested with MAC2000 controller and xy stage.
84
85
86 This class also implements the logic to parse and validate
87 commands so it can be shared between multiple devices.
88
89 This class has only been tested on a MAC2000 controller from the
90 1990's however newer controllers should be compatible.
91
92 """
93
def __init__(self, port: str, baudrate: int, timeout: float) -> None:
    """Open the serial port and record the devices the controller reports.

    After opening the port, the ``RCONFIG`` command is sent and its
    multi-line reply is parsed into ``self._devlist``, keyed by the
    first column of each device line.  If the configuration cannot
    be read a message is printed and ``self._devlist`` is left empty.

    Args:
        port: name of the serial port, passed to ``serial.Serial``.
        baudrate: baud rate, passed to ``serial.Serial``.
        timeout: read timeout in seconds, passed to ``serial.Serial``.
    """
    # 8 data bits, no parity, no handshake.
    # NOTE(review): two stop bits are configured here, although the
    # comment that used to sit above this call quoted one stop bit
    # from a datasheet -- confirm against the Ludl manual.
    self._serial = serial.Serial(
        port=port,
        baudrate=baudrate,
        timeout=timeout,
        bytesize=serial.EIGHTBITS,
        stopbits=serial.STOPBITS_TWO,
        parity=serial.PARITY_NONE,
        xonxoff=False,
        rtscts=False,
        dsrdtr=False,
    )
    self._lock = threading.RLock()

    # Defined up front so the attribute exists even when the
    # configuration query below fails.
    self._devlist = {}

    with self._lock:
        try:
            self.command(b'RCONFIG')
            answer = self.read_multiline()
        except Exception:
            # Best effort: report and carry on without a device list.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            print("Unable to read configuration. Is Ludl connected?")
            return

        # Parse the config response, which tells us what devices are
        # present on this controller.  Lines 4 to the second last one
        # describe the devices.
        for line in answer[4:-1]:
            # Columns: dev address, label, id, description, type.
            devinfo = re.split(r"\s{2,}", line.decode('ascii'))
            self._devlist[devinfo[0]] = devinfo[1:]
132
133# print(answer)
134
135
def is_busy(self):
    """Placeholder busy check; not implemented, so it always gives ``None``."""
    return None
138
def get_number_axes(self):
    """Return the number of stage axes, which is hard-coded to two."""
    axis_count = 2
    return axis_count
141
def command(self, command: bytes) -> None:
    """Write ``command`` to the serial port, terminated by a carriage return."""
    terminated = command + b"\r"
    with self._lock:
        self._serial.write(terminated)
146
def readline(self) -> bytes:
    """Return the next reply, read from the serial port up to a newline byte."""
    terminator = b"\n"
    with self._lock:
        reply = self._serial.read_until(terminator)
    return reply
151
def read_multiline(self):
    """Collect reply lines until the controller signals the end of a reply.

    Each line is obtained from ``self.readline()`` and stored with
    surrounding whitespace stripped.  Reading stops, and the collected
    list is returned, at the first line that is exactly ``b'N'`` or
    that starts with ``b':A'``, or at the first empty read.  The line
    that ended the reply is included in the returned list.

    Returns:
        List of stripped reply lines as ``bytes``.

    Raises:
        RuntimeError: if a line starts with ``b'N'`` and the text from
            its third byte onwards parses as an integer error code.
    """
    output = []
    while True:
        line = self.readline().strip()
        output.append(line)
        if not line:
            # Nothing more arrived (presumably the serial read timed
            # out), so there is nothing further to wait for.
            return output
        if line == b'N' or line.startswith(b':A'):
            # End-of-reply marker: return straight away instead of
            # waiting for an additional timeout.
            return output
        if line.startswith(b'N'):
            try:
                code = int(line[2:])
            except ValueError:
                # Ordinary text that merely begins with 'N'.
                continue
            raise RuntimeError(
                'Ludl controller error: %s,%s'
                % (code, LUDL_ERRORS.get(code, 'Unknown error code'))
            )
167
def read_until_timeout(self) -> None:
    """Flush pending input, then read lines until a read comes back empty.

    Used to clean the buffer when the connection is in an unknown state.
    """
    with self._lock:
        self._serial.flushInput()
        leftover = self._serial.readline()
        while leftover:
            leftover = self._serial.readline()
174
def wait_until_idle(self) -> None:
    """Poll the ``STATUS`` command until the controller stops reporting busy.

    ``STATUS`` is sent through ``self.get_command`` every 10 ms for as
    long as the stripped reply is ``b'B'``.  Any other reply ends the
    wait, so an empty or unexpected reply cannot cause an endless loop.
    """
    while self.get_command(b'STATUS').strip() == b'B':
        time.sleep(0.01)
179
def _command_and_validate(self, command: bytes, expected: bytes) -> bytes:
    """Send ``command`` and, if it was acknowledged, wait for ``expected``.

    When the reply to ``command`` is exactly ``b':A \\n'``, the
    ``STATUS`` command is polled every 10 ms until its reply equals
    ``expected``.  Any other reply is returned immediately.

    Args:
        command: command to send, without the terminator.
        expected: ``STATUS`` reply that ends the wait.

    Returns:
        The reply to ``command`` itself, not the last ``STATUS`` reply.
    """
    with self._lock:
        answer = self.get_command(command)
        if answer == b':A \n':
            # The command was accepted; wait for the move to stop.
            while self.get_command(b'STATUS') != expected:
                time.sleep(0.01)
        return answer
188
def get_command(self, command: bytes) -> bytes:
    """Send ``command`` and return the single line read back in reply."""
    with self._lock:
        self.command(command)
        reply = self.readline()
    return reply
194
def move_command(self, command: bytes) -> None:
    """Send a move command and read back, but do not check, its reply.

    Movement commands respond with ":A \\n" while the move is still
    being performed; the move is only finished once the "STATUS"
    command returns "N" rather than "B".  This method does not wait
    for that: it just issues the command and relies on other code to
    check the position.
    """
    _acknowledgement = self.get_command(command)
205
def move_by_relative_position(self, axis: bytes, delta: float) -> None:
    """Send a relative movement command (``MOVREL``) for the given axis.

    ``axis`` is looked up in ``AXIS_MAPPER`` to get the axis letter.
    """
    letter = AXIS_MAPPER[axis]
    request = 'MOVREL %s=%s' % (letter, str(delta))
    self.move_command(request.encode('ascii'))
211
def move_to_absolute_position(self, axis: bytes, pos: float) -> None:
    """Send an absolute movement command (``MOVE``) for the given axis.

    ``axis`` is looked up in ``AXIS_MAPPER`` to get the axis letter.
    """
    letter = AXIS_MAPPER[axis]
    request = 'MOVE %s=%s' % (letter, str(pos))
    self.move_command(request.encode('ascii'))
def move_to_limit(self, axis: bytes, speed: int):
    """Send a ``SPIN`` command with the given speed for the given axis.

    ``axis`` is looked up in ``AXIS_MAPPER`` to get the axis letter.
    The reply is read but not returned.
    """
    letter = AXIS_MAPPER[axis]
    request = 'SPIN {0}={1}'.format(letter, speed).encode('ascii')
    self.get_command(request)
220
def motor_moving(self, axis: bytes) -> int:
    """Return the lowest bit of the ``RDSTAT`` status value for an axis.

    The status value is parsed as an integer from the stripped reply,
    starting at its fourth byte.
    """
    letter = AXIS_MAPPER[axis]
    query = 'RDSTAT {0}'.format(letter).encode('ascii')
    stripped = self.get_command(query).strip()
    status = int(stripped[3:])
    return status & 1
226
def set_speed(self, axis: bytes, speed: int) -> None:
    """Send a ``SPEED`` command with the given value for the given axis.

    ``axis`` is looked up in ``AXIS_MAPPER`` to get the axis letter.
    """
    letter = AXIS_MAPPER[axis]
    request = 'SPEED {0}={1}'.format(letter, speed).encode('ascii')
    self.get_command(request)
230
231
def wait_for_motor_stop(self, axis: bytes):
    """Poll ``motor_moving`` every 0.1 s until it reports a falsy value."""
    while True:
        if not self.motor_moving(axis):
            return
        time.sleep(0.1)
235
236
def reset_position(self, axis: bytes):
    """Send ``HERE <axis>=0`` for the given axis.

    ``axis`` is looked up in ``AXIS_MAPPER`` to get the axis letter.
    """
    letter = AXIS_MAPPER[axis]
    request = 'HERE {0}=0'.format(letter).encode('ascii')
    self.get_command(request)
240
def get_absolute_position(self, axis: bytes) -> float:
    """Query the position of an axis with the ``WHERE`` command.

    Returns the stripped reply from its third byte onwards, converted
    to ``float``.  If the fourth byte of the reply is ``b'N'`` the
    reply is treated as an error: a message is printed and ``None``
    is returned instead.
    """
    letter = AXIS_MAPPER[axis]
    reply = self.get_command('WHERE {0}'.format(letter).encode('ascii'))
    if reply[3:4] != b'N':
        return float(reply.strip()[2:])
    description = LUDL_ERRORS[int(reply[4:6])]
    print("Error: {0} : {1}".format(reply, description))
    return None
249
def set_command(self, command: bytes) -> None:
    """Send a set command through ``_command_and_validate``.

    The expected status reply passed along is ``b"0\\r"``.
    NOTE(review): the original comment says property-type commands
    answer with zero even when their arguments are invalid.
    """
    expected_reply = b"0\r"
    self._command_and_validate(command, expected_reply)
256
def get_description(self, command: bytes) -> bytes:
    """Send ``command`` and return everything read up to ``b"\\rEND\\r"``."""
    end_marker = b"\rEND\r"
    with self._lock:
        self.command(command)
        description = self._serial.read_until(end_marker)
    return description
262
@contextlib.contextmanager
def changed_timeout(self, new_timeout: float):
    """Context manager that temporarily sets the serial read timeout.

    The timeout that was in effect on entry is restored on exit, even
    if the body raises.
    """
    saved_timeout = self._serial.timeout
    try:
        self._serial.timeout = new_timeout
        yield
    finally:
        self._serial.timeout = saved_timeout
271
272
def __init__(self, dev_conn: _LudlController, axis: str) -> None:
    """Bind the axis to a controller connection and set default values.

    The limits start at 0.0 and 100000.0, and the speed is set to
    100000 through ``set_speed``.
    """
    super().__init__()
    self._dev_conn, self._axis = dev_conn, axis
    # Not a good solution, as min/max are used to build the stage map
    # in mosaic etc.  Maybe we just need to know it!
    self.min_limit, self.max_limit = 0.0, 100000.0
    self.set_speed(100000)
283
def move_by(self, delta: float) -> None:
    """Move this axis by ``delta``, truncated to an integer."""
    whole_steps = int(delta)
    self._dev_conn.move_by_relative_position(self._axis, whole_steps)
286
def move_to(self, pos: float) -> None:
    """Move this axis to ``pos``, truncated to an integer."""
    target = int(pos)
    self._dev_conn.move_to_absolute_position(self._axis, target)
289
@property
def position(self) -> float:
    """Current position of this axis as reported by the controller.

    If the connection reports busy, a warning is logged and the call
    waits for the controller to become idle before querying.

    Returns:
        The value from ``get_absolute_position`` converted to ``float``.
    """
    # Imported here because the module defines no logger of its own.
    import logging

    if self._dev_conn.is_busy():
        logging.getLogger(__name__).warning(
            "querying stage axis position but device is busy"
        )
        self._dev_conn.wait_until_idle()
    return float(self._dev_conn.get_absolute_position(self._axis))
296
@property
def limits(self) -> microscope.AxisLimits:
    """Axis limits built from ``min_limit`` and ``max_limit``."""
    lower, upper = self.min_limit, self.max_limit
    return microscope.AxisLimits(lower=lower, upper=upper)
300
301 # def speed(self) -> int:
302 # return self.speed
303
304
def home(self) -> None:
    """Find the axis limits, then move to half of the upper limit."""
    self.find_limits()
    self.move_to(self.max_limit / 2)
308
def set_speed(self, speed: int) -> None:
    """Remember ``speed`` on this axis and send it to the controller."""
    self.speed = speed
    controller = self._dev_conn
    controller.set_speed(self._axis, speed)
312
def find_limits(self, speed=100000):
    """Drive the axis to both ends of travel and record its limits.

    The axis is driven towards the negative end with ``-speed``, the
    position there is reset to zero and stored as ``min_limit``.  The
    axis is then driven towards the positive end with ``speed`` and
    the position reached is stored as ``max_limit``.

    Args:
        speed: speed passed to the controller's ``move_to_limit``; the
            sign is flipped for the negative direction.

    Returns:
        The value of ``self.limits`` after both limits are recorded.
    """
    # Drive the axis to its minimum position.
    self._dev_conn.move_to_limit(self._axis, -speed)
    # Spin moves don't set the status info, so the motor status byte
    # has to be queried instead.
    self._dev_conn.wait_for_motor_stop(self._axis)
    print(self.position)
    # Reset the position to zero at the negative end.
    self._dev_conn.reset_position(self._axis)
    self.min_limit = 0.0
    self._dev_conn.homed = True
    # Move to the positive limit and record where the axis ends up.
    self._dev_conn.move_to_limit(self._axis, speed)
    self._dev_conn.wait_for_motor_stop(self._axis)
    self.max_limit = self.position
    return self.limits
329
330
331
def __init__(self, conn: _LudlController, **kwargs) -> None:
    """Create the stage with two axes, keyed ``"1"`` and ``"2"``.

    Remaining keyword arguments are forwarded to the parent class.
    The stage starts out as not homed.
    """
    super().__init__(**kwargs)
    self._dev_conn = conn
    # The axis numbers are fixed at 1 and 2 rather than taken from
    # the connection's get_number_axes().
    axis_map = {}
    for number in (1, 2):
        axis_map[str(number)] = _LudlStageAxis(self._dev_conn, number)
    self._axes = axis_map
    self.homed = False
343
def _do_shutdown(self) -> None:
    """Nothing to do on shutdown."""
    return None
346
def _do_enable(self) -> bool:
    """Home every axis the first time the stage is enabled.

    Before the device can be moved it needs a reference to the home
    position, so each axis is homed once and ``homed`` is then set.
    Always returns ``True``.
    """
    if self.homed:
        return True
    for name in self.axes:
        self.axes[name].home()
    self.homed = True
    return True
357
358
def may_move_on_enable(self) -> bool:
    """Return ``True`` while the stage has not been homed yet."""
    already_homed = self.homed
    return not already_homed
361
362
@property
def axes(self) -> typing.Mapping[str, microscope.abc.StageAxis]:
    """Mapping from axis name to the axis objects of this stage."""
    axis_map = self._axes
    return axis_map
366
def move_by(self, delta: typing.Mapping[str, float]) -> None:
    """Move each named axis by its distance, then wait for the controller.

    Axis names and distances are both converted to ``int`` before
    being passed to the connection.
    """
    controller = self._dev_conn
    for name, distance in delta.items():
        axis_number = int(name)
        controller.move_by_relative_position(axis_number, int(distance))
    controller.wait_until_idle()
374
def move_to(self, position: typing.Mapping[str, float]) -> None:
    """Move each named axis to its position, then wait for the controller.

    The requested mapping is printed first.  Axis names and positions
    are both converted to ``int`` before being passed to the
    connection.
    """
    print(position)
    controller = self._dev_conn
    for name, target in position.items():
        axis_number = int(name)
        controller.move_to_absolute_position(axis_number, int(target))
    controller.wait_until_idle()
383
384# def assert_filterwheel_number(self, number: int) -> None:
385# assert number > 0 and number < 4
386
387# def _has_thing(self, command: bytes, expected_start: bytes) -> bool:
388# # Use the commands that returns a description string to find
389# # whether a specific device is connected.
390# with self._lock:
391# description = self.get_description(command)
392# if not description.startswith(expected_start):
393# self.read_until_timeout()
394# raise RuntimeError(
395# "Failed to get description '%s' (got '%s')"
396# % (command.decode(), description.decode())
397# )
398# return not description.startswith(expected_start + b"NONE\r")
399
400 # def has_filterwheel(self, number: int) -> bool:
401 # self.assert_filterwheel_number(number)
402 # # We use the 'FILTER w' command to check if there's a filter
403 # # wheel instead of the '?' command. The reason is that the
404 # # third filter wheel, named "A AXIS" on the controller box and
405 # # "FOURTH" on the output of the '?' command, can be used for
406 # # non filter wheels. We hope that 'FILTER 3' will fail
407 # # properly if what is connected to "A AXIS" is not a filter
408 # # wheel.
409 # return self._has_thing(b"FILTER %d" % number, b"FILTER_%d = " % number)
410
411 # def get_n_filter_positions(self, number: int) -> int:
412 # self.assert_filterwheel_number(number)
413 # answer = self.get_command(b"FPW %d" % number)
414 # return int(answer)
415
416 # def get_filter_position(self, number: int) -> int:
417 # self.assert_filterwheel_number(number)
418 # answer = self.get_command(b"7 %d F" % number)
419 # return int(answer)
420
421 # def set_filter_position(self, number: int, pos: int) -> None:
422 # self.assert_filterwheel_number(number)
423 # self.move_command(b"7 %d %d" % (number, pos))
424
425
426#IMD 20220408
427#Not yet implemented filterwheel or shutters as I dont have any on my system
428
430 """Ludl MC 2000 controller.
431
432 .. note::
433
434 The Ludl MC5000 can control a stage, filter wheels and shutters.
435
436 """
437
438
439
def __init__(
    self, port: str, baudrate: int = 9600, timeout: float = 0.5, **kwargs
) -> None:
    """Open the controller connection and register its stage device.

    Remaining keyword arguments are forwarded to the parent class.
    The only device registered is the stage, under the key ``'stage'``.
    """
    super().__init__(**kwargs)
    connection = _LudlController(port, baudrate, timeout)
    self._conn = connection
    self._devices: typing.Mapping[str, microscope.abc.Device] = {}
    self._devices.update(stage=_LudlStage(connection))
447# # Can have up to three filter wheels, numbered 1 to 3.
448# for number in range(1, 4):
449# if self._conn.has_filterwheel(number):
450# key = "filter %d" % number
451# self._devices[key] = _ludlFilterWheel(self._conn, number)
452
453
454
@property
def devices(self) -> typing.Mapping[str, microscope.abc.Device]:
    """Mapping from device name to the devices on this controller."""
    registered = self._devices
    return registered
458
459# ludl controller can do filter wheels so leave this code for future adoption
460#
461# class _ludlFilterWheel(microscope.abc.FilterWheel):
462# def __init__(self, connection: _ludlConnection, number: int) -> None:
463# super().__init__(positions=connection.get_n_filter_positions(number))
464# self._conn = connection
465# self._number = number
466
467# def _do_get_position(self) -> int:
468# return self._conn.get_filter_position(self._number)
469
470# def _do_set_position(self, position: int) -> None:
471# self._conn.set_filter_position(self._number, position)
472
473# def _do_shutdown(self) -> None:
474# pass
None move_to(self, float pos)
Definition: abc.py:1370
microscope.AxisLimits limits(self)
Definition: abc.py:1382
float position(self)
Definition: abc.py:1376
None _command_and_validate(self, bytes command, bytes expected)
Definition: ludl.py:180
bytes get_description(self, bytes command)
Definition: ludl.py:257
int motor_moving(self, bytes axis)
Definition: ludl.py:221
None move_by_relative_position(self, bytes axis, float delta)
Definition: ludl.py:206
bytes get_command(self, bytes command)
Definition: ludl.py:189
None set_command(self, bytes command)
Definition: ludl.py:250
None move_command(self, bytes command)
Definition: ludl.py:195
None move_to_absolute_position(self, bytes axis, float pos)
Definition: ludl.py:212
None command(self, bytes command)
Definition: ludl.py:142
None set_speed(self, int speed)
Definition: ludl.py:309
None move_to(self, float pos)
Definition: ludl.py:287
None move_by(self, float delta)
Definition: ludl.py:284
microscope.AxisLimits limits(self)
Definition: ludl.py:298
def find_limits(self, speed=100000)
Definition: ludl.py:313
None move_to(self, typing.Mapping[str, float] position)
Definition: ludl.py:375
typing.Mapping[str, microscope.abc.StageAxis] axes(self)
Definition: ludl.py:364
None move_by(self, typing.Mapping[str, float] delta)
Definition: ludl.py:367