BioImager  3.9.1
A .NET microscopy imaging library. Supports various microscopes by using imported libraries & GUI automation. Supported libraries include PriorĀ® & ZeissĀ® & all devices supported by Micromanager 2.0 and python-microscope.
Loading...
Searching...
No Matches
mock_devices.py
1#!/usr/bin/env python3
2
3## Copyright (C) 2020 David Miguel Susano Pinto <carandraug@gmail.com>
4##
5## This file is part of Microscope.
6##
7## Microscope is free software: you can redistribute it and/or modify
8## it under the terms of the GNU General Public License as published by
9## the Free Software Foundation, either version 3 of the License, or
10## (at your option) any later version.
11##
12## Microscope is distributed in the hope that it will be useful,
13## but WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15## GNU General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with Microscope. If not, see <http://www.gnu.org/licenses/>.
19
20"""Mock devices to be used in test cases.
21
22These classes mock the different hardware as much as needed for our
23testing needs. Their behaviour is based first on the specifications
24we have, and second on what we actually experience. Our experience is
25that most hardware does not actually follow the specs.
26
27To fake a specific device type for interactive usage, use the dummy
28device classes instead. There's a concrete class for each device
29interface.
30"""
31
32import enum
33import io
34
35import serial.serialutil
36
37
38class SerialMock(serial.serialutil.SerialBase):
39 """Base class to mock devices controlled via serial.
40
41 It has two :class:`io.BytesIO` buffers. One :func:`write`s the
42 output buffer and the other :func:`read`s the input buffer. After
43 a write, the output buffer is analysed for a command. If there is
44 a command, stuff gets done. This usually means adding to the
45 input buffer and changing state of the device.
46
47 """
48
49 def __init__(self, *args, **kwargs):
50 super().__init__(*args, **kwargs)
51 self.in_buffer = io.BytesIO()
52 self.out_buffer = io.BytesIO()
53
54 # Number of bytes in out buffer pending 'interpretation'. A
55 # command is only interpreted and handled when EOL is seen.
56 self.out_pending_bytes = 0
57 self.out_parsed_bytes = 0
58
59 # Number of bytes in the input buffer that have been read
60 self.in_read_bytes = 0
61
62 def open(self):
63 pass
64
65 def close(self):
66 self.in_buffer.close()
67 self.out_buffer.close()
68
69 def handle(self, command):
70 raise NotImplementedError("sub classes need to implement handle()")
71
72 def write(self, data):
73 self.out_buffer.write(data)
74 self.out_pending_bytes += len(data)
75
76 if self.out_pending_bytes > len(data):
77 # we need to retrieve data from a previous write
78 self.out_buffer.seek(-self.out_pending_bytes, 2)
79 data = self.out_buffer.read(self.out_pending_bytes)
80
81 for msg in data.split(self.eol)[:-1]:
82 self.handle(msg)
83 self.out_pending_bytes -= len(msg) + len(self.eol)
84 return len(data)
85
86 def _readx_wrapper(self, reader, *args, **kwargs):
87 """Place pointer of input buffer before and after read methods"""
88 self.in_buffer.seek(self.in_read_bytes)
89 msg = reader(*args, **kwargs)
90 self.in_read_bytes += len(msg)
91 return msg
92
93 def read(self, size=1):
94 return self._readx_wrapper(self.in_buffer.read, size)
95
96 def readline(self, size=-1):
97 return self._readx_wrapper(self.in_buffer.readline, size)
98
99 def reset_input_buffer(self):
100 self.in_read_bytes = self.in_buffer.getbuffer().nbytes
101 self.in_buffer.seek(0, 2)
102
103 def reset_output_buffer(self):
104 pass
105
106
107class CoherentSapphireLaserMock(SerialMock):
108 """Modelled after a Coherent Sapphire LP 561nm laser.
109
110 This mocked device is constructed into the ready state. That is,
111 after the laser has been turned on enough time to warmup (~ 30
112 seconds), and then the key has been turned on for enough time to
113 actual get the laser ready (~10 seconds).
114
115 We don't mock the turning of the key, that's much trickier and we
116 don't need it yet. We'll do it if it ever becomes an issue, and
117 probably use a state machine library for that.
118
119 """
120
121 eol = b"\r\n"
122
123 # Communication parameters
124 baudrate = 19200
125 parity = serial.PARITY_NONE
126 bytesize = serial.EIGHTBITS
127 stopbits = serial.STOPBITS_ONE
128 rtscts = False
129 dsrdtr = False
130
131 # Laser is 200mW, range is 10 to 110%
132 default_power = 50.0
133 min_power = 20.0
134 max_power = 220.0
135
136 def __init__(self, *args, **kwargs):
137 super().__init__(*args, **kwargs)
138
139 self.key = "on"
140 self.status = "laser ready" # Laser ready, status code 5
141 self.light = True # Light Servo
142 self.tec = True # TEC (Thermo-Electric Cooler) Servo
143 self.echo = True
144 self.prompt = True
145 self.power = CoherentSapphireLaserMock.default_power
146
147 def write(self, data):
148 # Echo as soon as we get data, do not wait for an EOL. Also,
149 # echo before handling the command because if will echo even
150 # if the command is to turn the echo off.
151 if self.echo:
152 self.in_buffer.write(data)
153 else:
154 # If echo is off, we still echo EOLs
155 self.in_buffer.write(self.eol * data.count(self.eol))
156 return super().write(data)
157
158 def handle(self, command):
159 # Operator's manual mentions all commands in uppercase.
160 # Experimentation shows that they are case insensitive.
161 command = command.upper()
162
163 answer = None
164
165 # Prompt
166 if command == b">=0":
167 self.prompt = False
168 elif command == b">=1":
169 self.prompt = True
170
171 # Echo
172 elif command == b"E=0":
173 self.echo = False
174 elif command == b"E=1":
175 self.echo = True
176
177 # Head ID
178 elif command == b"?HID":
179 answer = b"505925.000"
180
181 # Head hours
182 elif command == b"?HH":
183 answer = b" 257:34"
184
185 # Key switch
186 elif command == b"?K":
187 if self.key == "standby":
188 answer = b"0"
189 elif self.key == "on":
190 answer = b"1"
191 else:
192 raise RuntimeError("unknown key state '%s'" % self.key)
193
194 # Light servo
195 elif command == b"L=0":
196 self.light = False
197 elif command == b"L=1":
198 if self.tec:
199 if self.key == "on":
200 self.light = True
201 # if key switch is not on, keep light off
202 else:
203 answer = b"TEC must be ON (T=1) to enable Light Output!"
204 elif command == b"?L":
205 answer = b"1" if self.light else b"0"
206
207 # TEC servo
208 elif command == b"T=0":
209 # turning this off, also turns light servo off
210 self.tec = False
211 self.light = False
212 elif command == b"T=1":
213 self.tec = True
214 elif command == b"?T":
215 answer = b"1" if self.tec else b"0"
216
217 # Laser power
218 elif command == b"?MINLP":
219 answer = b"20.000"
220 elif command == b"?MAXLP":
221 answer = b"220.000"
222 elif command == b"?P":
223 if not self.light:
224 answer = b"0.000"
225 else:
226 answer = b"%.3f" % (self.power)
227 elif command == b"?SP":
228 answer = b"%.3f" % (self.power)
229 elif command.startswith(b"P="):
230 new_power = float(command[2:])
231 if new_power < 19.999999 or new_power > 220.00001:
232 answer = b"value must be between 20.000 and 220.000"
233 else:
234 if not self.light:
235 answer = b"Note: Laser_Output is OFF (L=0)"
236 self.power = new_power
237
238 # Nominal output power
239 elif command == b"NOMP":
240 answer = b"200"
241
242 # Laser type and nominal power
243 elif command == b"LT":
244 answer = b"Sapphire 200mW"
245
246 # Laser head status
247 elif command == b"?STA":
248 status_codes = {
249 "start up": b"1",
250 "warmup": b"2",
251 "standby": b"3",
252 "laser on": b"4",
253 "laser ready": b"5",
254 "error": b"6",
255 }
256 answer = status_codes[self.status]
257
258 # Fault related commands. We don't model any faults yet.
259 elif command == b"?F":
260 answer = b"0"
261 elif command == b"?FF":
262 # Two bytes with possible faults:
263 # 0 - external interlock fault
264 # 1 - diode temperature fault (both TEC and light
265 # servo off)
266 # 2 - base plate temperature fault (both TEC and light
267 # servo off)
268 # 3 - OEM controller LP temperature (both TEC and
269 # light servo off)
270 # 4 - diode current fault
271 # 5 - analog interface fault
272 # 6 - base plate temperature fault (only light servo
273 # turned off)
274 # 7 - diode temperature fault (only light servo turned
275 # off)
276 # 8 - system warning/waiting for TEC servo to reach
277 # target temperature
278 # 9 - head EEPROM fault
279 # 10 - OEM controller LP EEPROM fault
280 # 11 - EEPOT1 fault
281 # 12 - EEPOT2 fault
282 # 13 - laser ready
283 # 14 - not implemented
284 # 15 - not implemented
285 if self.light:
286 # Has a bit of its own, but it's not really a fault.
287 answer = b"8192" # 00100000 00000000
288 else:
289 answer = b"0"
290 elif command == b"?FL":
291 # Show faults in text. This is a multiline reply, one
292 # per fault, plus the header line.
293 answer = b"Fault(s):\r\n\tNone"
294
295 # Software version
296 elif command in [b"sv", b"svps"]:
297 answer = b"8.005"
298
299 # Nominal laser wavelength
300 elif command == b"?WAVE":
301 answer = b"561"
302
303 else:
304 raise NotImplementedError(
305 "no handling for command '%s'" % command.decode("utf-8")
306 )
307
308 if answer is not None:
309 self.in_buffer.write(answer + self.eol)
310
311 if self.prompt:
312 self.in_buffer.write(b"Sapphire:0-> ")
313 return
314
315
317 """Modelled after a Cobolt Jive laser 561nm."""
318
319 eol = b"\r"
320
321 baudrate = 115200
322 parity = serial.PARITY_NONE
323 bytesize = serial.EIGHTBITS
324 stopbits = serial.STOPBITS_ONE
325 rtscts = False
326 dsrdtr = False
327
328 # Values in mW
329 default_power = 50.0
330 min_power = 0.0
331 max_power = 600.0
332
333 def __init__(self, *args, **kwargs):
334 super().__init__(*args, **kwargs)
335
336 self.power = CoboltLaserMock.default_power
337 self.light = False
338
339 self.interlock_open = False
340
341 self.auto_start = False
342 self.direct_control = False
343
344 self.on_after_interlock = False
345
346 self.fault = None
347
348 def handle(self, command):
349 # Leading and trailing whitespace is ignored.
350 command = command.strip()
351
352 # Acknowledgment string if command is not a query and there
353 # is no error.
354 answer = b"OK"
355
356 if command == b"sn?": # serial number
357 answer = b"7863"
358 elif command == b"gcn?":
359 answer = b"Macro-Gen5b-SHG-0501_4W-RevA"
360 elif command == b"ver?" or command == b"gfv?":
361 answer = b"50070"
362 elif command == b"gfvlas?":
363 answer = b"This laser head does not have firmware."
364 elif command == b"hrs?": # System operating hours
365 answer = b"828.98"
366
367 # TODO: This whole @cob0 and @cob1 need better testing on
368 # what it actually does. Documentation says that @cob1 is
369 # "Laser ON after interlock. Forces laser into
370 # autostart. without checking if autostart is enabled".
371 # @cob0 is undocumented.
372
373 # May be a bug but the commands @cob0 and @cob1 both have the
374 # effect of also turning off the laser.
375 elif command == b"@cob1":
376 self.on_after_interlock = True
377 self.light = False
378 elif command == b"@cob0":
379 self.on_after_interlock = False
380 self.light = False
381
382 elif command == b"@cobas?":
383 answer = b"1" if self.auto_start else b"0"
384 elif command == b"@cobas 0":
385 self.auto_start = False
386 elif command == b"@cobas 1":
387 self.auto_start = True
388
389 # Laser state
390 elif command == b"l?":
391 answer = b"1" if self.light else b"0"
392 elif command == b"l1":
393 if self.auto_start:
394 answer = b"Syntax error: not allowed in autostart mode."
395 else:
396 self.light = True
397 elif command == b"l0":
398 self.light = False
399
400 # Output power
401 elif command.startswith(b"p "):
402 # The p command takes values in W so convert to mW
403 new_power = float(command[2:]) * 1000.0
404 if new_power > self.max_power or new_power < self.min_power:
405 answer = b"Syntax error: Value is out of range."
406 else:
407 self.power = new_power
408 elif command == b"p?":
409 answer = b"%.4f" % (self.power / 1000.0)
410 elif command == b"pa?":
411 if self.light:
412 answer = b"%.4f" % (self.power / 1000.0)
413 else:
414 answer = b"0.0000"
415
416 # Undocumented. Seems to be the same as 'p ...'
417 elif command.startswith(b"@cobasp "):
418 return self.handlehandle(command[6:])
419
420 # Direct control
421 elif command == b"@cobasdr?":
422 answer = b"1" if self.direct_control else b"0"
423 elif command == b"@cobasdr 0":
424 self.direct_control = False
425 elif command == b"@cobasdr 1":
426 self.direct_control = False
427
428 # Undocumented. Seems to returns maximum laser power in mW.
429 elif command == b"gmlp?":
430 answer = b"600.000000"
431
432 # Are you there?
433 elif command == b"?":
434 answer = b"OK"
435
436 # Get operating fault
437 elif command == b"f?":
438 # The errors (which we don't model yet) are:
439 # 1 = temperature error
440 # 3 = interlock
441 # 4 = constant power fault
442 answer = b"0"
443
444 # Interlock state
445 elif command == b"ilk?":
446 answer = b"1" if self.interlock_open else b"0"
447
448 # Autostart program state
449 elif command == b"cobast?":
450 # This is completely undocumented. Manual
451 # experimentation seems to be:
452 # 0 = laser off with @cob0
453 # 1 = laser off with @cob1
454 # 2 = waiting for temperature
455 # 3 = warming up
456 # 4 = completed (laser on)
457 # 5 = fault (such as interlock)
458 # 6 = aborted
459 if self.light:
460 answer = b"4"
461 else:
462 answer = b"1" if self.on_after_interlock else b"0"
463
464 else:
465 raise NotImplementedError(
466 "no handling for command '%s'" % command.decode("utf-8")
467 )
468
469 # Sending a command is done with '\r' only. However,
470 # responses from the hardware end with '\r\n'.
471 self.in_buffer.write(answer + b"\r\n")
472
473
475 """Modelled after a TA Deepstar 488nm."""
476
477 eol = b"\r\n"
478
479 baudrate = 9600
480 parity = serial.PARITY_NONE
481 bytesize = serial.EIGHTBITS
482 stopbits = serial.STOPBITS_ONE
483 rtscts = False
484 dsrdtr = False
485
486 # Values in mW
487 default_power = 50.0
488 min_power = 0.0
489 max_power = 200.0
490
491 class State(enum.Enum):
492 S0 = 0 # Global error state or interlocked state
493 S1 = 1 # Standby state or Laser OFF state
494 S2 = 2 # Laser ON state
495
496 class Mode(enum.Enum):
497 blackout = 1
498 bias = 2
499 modulated = 3
500 deepstar = 4
501
502 command2mode = {
503 b"L0": Mode.blackout,
504 b"BLK": Mode.blackout,
505 b"LB": Mode.bias,
506 b"L1": Mode.modulated,
507 b"L2": Mode.deepstar,
508 }
509 mode2answer = {
510 Mode.blackout: b"L0", # always L0, even if BLK was used
511 Mode.bias: b"LB",
512 Mode.modulated: b"L1",
513 Mode.deepstar: b"L2",
514 }
515
516 def __init__(self, *args, **kwargs):
517 super().__init__(*args, **kwargs)
518
519 self.power = self.default_power
520
521 self.state = self.State.S1 # default dependent on 'ASx' command
522 self.mode = self.Mode.blackout
523
524 self.internal_peak_power = False
525 self.analog2digital = False
526 self.bias_modulation = False
527 self.digital_modulation = False
528
529 @property
530 def light(self):
531 if (
532 self.state != self.State.S2
533 or self.mode == self.mode.blackout
534 or not self.internal_peak_power
535 or (self.analog2digital and not self.digital_modulation)
536 ):
537 return False
538 return True
539
540 def write(self, data):
541 # This connection does not wait for an eol to parse the
542 # command. It only looks at 16 or 7 bit (depending on
543 # state). Sending a message one character at a time will not
544 # work so just send the whole data to be handled.
545 self.handle(data)
546 return len(data)
547
548 def handle(self, command):
549 if len(command) != 16 and (
550 len(command) != 7 and self.state != self.State.S2
551 ):
552 # Such a thing will make the laser go into S0 state which
553 # will also turns it off.. We don't model this because
554 # we don't know well how the reset (RST command) works.
555 raise RuntimeError("invalid Omicron Deepstar command")
556 elif command[-2:] != b"\r\n":
557 # Even if a command is correct, the last two characters
558 # need to be \r\n.
559 raise RuntimeError("command does not end in '\\r\\n'")
560
561 command = command[:-2].rstrip(b" ")
562 answer = None
563
564 if command == b"S?":
565 answer = self.state.name.encode()
566 elif command == b"STAT0":
567 # Model-code of the connected lasersystem:
568 answer = (
569 b"MC"
570 + b" 488" # wavelength
571 + b" " # empty for single diode system (D for double)
572 + b" %3d" % (self.max_power) # in mw
573 + b" TA "
574 ) # controller version / operating mode
575 elif command == b"STAT1":
576 answer = (
577 b"SL"
578 + b" 6AB" # actual bias (hexadecimal)
579 + b" 600" # modulated bias-level (hexadecimal)
580 + b" 868" # mod-level internal set for drive max. current
581 + b" T249" # diode temperature (in celcius * 10)
582 + b" V117"
583 ) # control voltage (in volts * 10)
584 elif command == b"STAT2":
585 answer = (
586 b"R111" # firmware release
587 + b" N02" # No used laserpen
588 + b" SNP131056" # S/No of laserhead
589 + b" SNC131056" # S/No of controller
590 + b" WH 04667" # working hours
591 + b" SLS B9C 500"
592 ) # start values for the diode parameters
593 elif command == b"STAT3":
594 # Stored option code flags.
595 answer = (
596 b"OC "
597 + b"AS1" # autostart option
598 + b"TH0" # TTL-logic-high
599 + b"AP0" # auto power correction
600 + b"FK0" # fiber coupling single mode
601 + b"AC0" # analog modulation for CW-lasers
602 + b"AM0" # analog modulation for modulated lasers
603 + b"SU0" # subtractive analog modulation for modulated lasers
604 + b"CO0" # collimator optic
605 + b"FO0" # focusing optic
606 + b"MO0" # highspeed monitoring
607 + b"US0" # USB interface
608 + b"LA1" # RS232 interface
609 + b"FA0"
610 ) # fiber coupling single mode
611
612 # Changing mode
613 elif command in self.command2mode.keys():
614 if self.state == self.State.S2:
615 self.mode = self.command2mode[command]
616 answer = b">"
617 else:
618 answer = b"UK"
619
620 # Current mode (undocumented)
621 elif command == b"L?":
622 if self.state == self.State.S2:
623 answer = self.mode2answer[self.mode]
624 else:
625 answer = b"UK"
626
627 # Laser on
628 elif command == b"LON":
629 if self.state == self.State.S1:
630 self.state = self.State.S2
631 answer = b"LONOK"
632 elif self.mode == self.Mode.S2:
633 answer = b"UK"
634 else: # in S0 state
635 # This is undocumented and it's probably a bug on
636 # their firmware. Should probably be returning UK.
637 answer = b"INT"
638
639 # Laser off
640 elif command == b"LF":
641 if self.state == self.State.S2:
642 self.state = self.State.S1
643 answer = b"LOFFOK"
644 else:
645 answer = b"UK"
646
647 # Peak Power
648 elif command.startswith(b"PP"):
649 # peak power values are a 3 byte char hexadecimal number,
650 # scale to the range of possible power:
651 # 000[hex] = 0[dec] = 0% = 0 mW
652 # FFF[hex] = 4095[dec] = 100% = 200 mW
653 if command == b"PP?":
654 level = self.power / self.max_power
655 answer = b"PP%03X" % round(float(0xFFF) * level)
656 elif len(command) == 5:
657 level = int(command[2:], 16) / float(0xFFF)
658 self.power = level * self.max_power
659 answer = command
660 else:
661 raise RuntimeError("invalid command '%'" % command)
662
663 # Power level
664 elif command == b"P?":
665 # TODO: get a laser that supports this command to test.
666 # This is only based on the documentation.
667 #
668 # Actual laser power is a 4 byte char heaxadecimal
669 # number. Range for actual laser power is:
670 # 0x0000 = 0 [dec] = 0% = 0 mW
671 # 0x0CCC = 3276 [dec] = 100% = 200 mW
672 # 0x0FFF = 4095 [dec] = 120% = 240 mW
673 level = self.power / self.max_power
674 answer = b"P%04X" % round(float(0xCCC) * level)
675
676 # Internal peak power
677 elif command == b"IPO":
678 self.internal_peak_power = True
679 answer = command
680 elif command == b"IPF":
681 self.internal_peak_power = False
682 answer = command
683 elif command == b"IP?":
684 answer = b"IPO" if self.internal_peak_power else b"IPF"
685
686 # Analogue modulation path or signal linked to the digital
687 # modulation path.
688 elif command == b"A2DO":
689 self.analog2digital = True
690 answer = b"A2D ON"
691 elif command == b"A2DF":
692 self.analog2digital = False
693 answer = b"A2D OFF"
694 elif command == b"A2D?":
695 answer = b"A2D ON" if self.analog2digital else b"A2D OFF"
696
697 # Bias and Digital modulation
698 elif command == b"MF":
699 self.bias_modulation = False
700 self.digital_modulation = False
701 answer = command
702 elif command == b"MO1":
703 self.bias_modulation = True
704 self.digital_modulation = False
705 answer = command
706 elif command == b"MO2":
707 self.bias_modulation = False
708 self.digital_modulation = True
709 answer = command
710 elif command == b"MO3":
711 self.bias_modulation = True
712 self.digital_modulation = True
713 answer = command
714
715 else:
716 raise NotImplementedError(
717 "no handling for command '%s'" % command.decode("utf-8")
718 )
719
720 self.in_buffer.write(answer + self.eol)