20"""Mock devices to be used in test cases.
22These classes mock the different hardware as much as needed for our
23testing needs. Their behaviour is based first on the specifications
24we have,
and second on what we actually experience. Our experience
is
25that most hardware does
not actually follow the specs.
27To fake a specific device type
for interactive usage, use the dummy
28device classes instead. There
's a concrete class for each device
35import serial.serialutil
38class SerialMock(serial.serialutil.SerialBase):
39 """Base class to mock devices controlled via serial.
41 It has two :class:`io.BytesIO` buffers. One :func:`write`s the
42 output buffer
and the other :func:`read`s the input buffer. After
43 a write, the output buffer
is analysed
for a command. If there
is
44 a command, stuff gets done. This usually means adding to the
45 input buffer
and changing state of the device.
49 def __init__(self, *args, **kwargs):
50 super().__init__(*args, **kwargs)
66 self.in_buffer.close()
67 self.out_buffer.close()
69 def handle(self, command):
70 raise NotImplementedError(
"sub classes need to implement handle()")
72 def write(self, data):
73 self.out_buffer.write(data)
74 self.out_pending_bytes += len(data)
76 if self.out_pending_bytes > len(data):
78 self.out_buffer.seek(-self.out_pending_bytes, 2)
79 data = self.out_buffer.read(self.out_pending_bytes)
81 for msg
in data.split(self.eol)[:-1]:
83 self.out_pending_bytes -= len(msg) + len(self.eol)
86 def _readx_wrapper(self, reader, *args, **kwargs):
87 """Place pointer of input buffer before and after read methods"""
88 self.in_buffer.seek(self.in_read_bytes)
89 msg = reader(*args, **kwargs)
90 self.in_read_bytes += len(msg)
93 def read(self, size=1):
94 return self._readx_wrapper(self.in_buffer.read, size)
96 def readline(self, size=-1):
97 return self._readx_wrapper(self.in_buffer.readline, size)
99 def reset_input_buffer(self):
100 self.in_read_bytes = self.in_buffer.getbuffer().nbytes
101 self.in_buffer.seek(0, 2)
103 def reset_output_buffer(self):
107class CoherentSapphireLaserMock(SerialMock):
108 """Modelled after a Coherent Sapphire LP 561nm laser.
110 This mocked device is constructed into the ready state. That
is,
111 after the laser has been turned on enough time to warmup (~ 30
112 seconds),
and then the key has been turned on
for enough time to
113 actual get the laser ready (~10 seconds).
115 We don
't mock the turning of the key, that's much trickier
and we
116 don
't need it yet. We'll do it
if it ever becomes an issue,
and
117 probably use a state machine library
for that.
125 parity = serial.PARITY_NONE
126 bytesize = serial.EIGHTBITS
127 stopbits = serial.STOPBITS_ONE
136 def __init__(self, *args, **kwargs):
137 super().__init__(*args, **kwargs)
140 self.status =
"laser ready"
145 self.power = CoherentSapphireLaserMock.default_power
147 def write(self, data):
152 self.in_buffer.write(data)
155 self.in_buffer.write(self.eol * data.count(self.eol))
156 return super().write(data)
158 def handle(self, command):
161 command = command.upper()
166 if command == b
">=0":
168 elif command == b
">=1":
172 elif command == b
"E=0":
174 elif command == b
"E=1":
178 elif command == b
"?HID":
179 answer = b
"505925.000"
182 elif command == b
"?HH":
186 elif command == b
"?K":
187 if self.key ==
"standby":
189 elif self.key ==
"on":
192 raise RuntimeError(
"unknown key state '%s'" % self.key)
195 elif command == b
"L=0":
197 elif command == b
"L=1":
203 answer = b
"TEC must be ON (T=1) to enable Light Output!"
204 elif command == b
"?L":
205 answer = b
"1" if self.light
else b
"0"
208 elif command == b
"T=0":
212 elif command == b
"T=1":
214 elif command == b
"?T":
215 answer = b
"1" if self.tec
else b
"0"
218 elif command == b
"?MINLP":
220 elif command == b
"?MAXLP":
222 elif command == b
"?P":
226 answer = b
"%.3f" % (self.power)
227 elif command == b
"?SP":
228 answer = b
"%.3f" % (self.power)
229 elif command.startswith(b
"P="):
230 new_power = float(command[2:])
231 if new_power < 19.999999
or new_power > 220.00001:
232 answer = b
"value must be between 20.000 and 220.000"
235 answer = b
"Note: Laser_Output is OFF (L=0)"
236 self.power = new_power
239 elif command == b
"NOMP":
243 elif command == b
"LT":
244 answer = b
"Sapphire 200mW"
247 elif command == b
"?STA":
256 answer = status_codes[self.status]
259 elif command == b
"?F":
261 elif command == b
"?FF":
290 elif command == b
"?FL":
293 answer = b
"Fault(s):\r\n\tNone"
296 elif command
in [b
"sv", b
"svps"]:
300 elif command == b
"?WAVE":
304 raise NotImplementedError(
305 "no handling for command '%s'" % command.decode(
"utf-8")
308 if answer
is not None:
309 self.in_buffer.write(answer + self.eol)
312 self.in_buffer.write(b
"Sapphire:0-> ")
317 """Modelled after a Cobolt Jive laser 561nm."""
322 parity = serial.PARITY_NONE
323 bytesize = serial.EIGHTBITS
324 stopbits = serial.STOPBITS_ONE
333 def __init__(self, *args, **kwargs):
334 super().__init__(*args, **kwargs)
336 self.
power = CoboltLaserMock.default_power
348 def handle(self, command):
350 command = command.strip()
356 if command == b
"sn?":
358 elif command == b
"gcn?":
359 answer = b
"Macro-Gen5b-SHG-0501_4W-RevA"
360 elif command == b
"ver?" or command == b
"gfv?":
362 elif command == b
"gfvlas?":
363 answer = b
"This laser head does not have firmware."
364 elif command == b
"hrs?":
375 elif command == b
"@cob1":
378 elif command == b
"@cob0":
382 elif command == b
"@cobas?":
384 elif command == b
"@cobas 0":
386 elif command == b
"@cobas 1":
390 elif command == b
"l?":
391 answer = b
"1" if self.
light else b
"0"
392 elif command == b
"l1":
394 answer = b
"Syntax error: not allowed in autostart mode."
397 elif command == b
"l0":
401 elif command.startswith(b
"p "):
403 new_power = float(command[2:]) * 1000.0
405 answer = b
"Syntax error: Value is out of range."
407 self.
power = new_power
408 elif command == b
"p?":
409 answer = b
"%.4f" % (self.
power / 1000.0)
410 elif command == b
"pa?":
412 answer = b
"%.4f" % (self.
power / 1000.0)
417 elif command.startswith(b
"@cobasp "):
421 elif command == b
"@cobasdr?":
423 elif command == b
"@cobasdr 0":
425 elif command == b
"@cobasdr 1":
429 elif command == b
"gmlp?":
430 answer = b
"600.000000"
433 elif command == b
"?":
437 elif command == b
"f?":
445 elif command == b
"ilk?":
449 elif command == b
"cobast?":
465 raise NotImplementedError(
466 "no handling for command '%s'" % command.decode(
"utf-8")
475 """Modelled after a TA Deepstar 488nm."""
480 parity = serial.PARITY_NONE
481 bytesize = serial.EIGHTBITS
482 stopbits = serial.STOPBITS_ONE
503 b
"L0": Mode.blackout,
504 b
"BLK": Mode.blackout,
506 b
"L1": Mode.modulated,
507 b
"L2": Mode.deepstar,
510 Mode.blackout: b
"L0",
512 Mode.modulated: b
"L1",
513 Mode.deepstar: b
"L2",
516 def __init__(self, *args, **kwargs):
517 super().__init__(*args, **kwargs)
519 self.power = self.default_power
521 self.state = self.State.S1
522 self.mode = self.Mode.blackout
524 self.internal_peak_power =
False
525 self.analog2digital =
False
526 self.bias_modulation =
False
527 self.digital_modulation =
False
532 self.state != self.State.S2
533 or self.mode == self.mode.blackout
534 or not self.internal_peak_power
535 or (self.analog2digital
and not self.digital_modulation)
540 def write(self, data):
548 def handle(self, command):
549 if len(command) != 16
and (
550 len(command) != 7
and self.state != self.State.S2
555 raise RuntimeError(
"invalid Omicron Deepstar command")
556 elif command[-2:] != b
"\r\n":
559 raise RuntimeError(
"command does not end in '\\r\\n'")
561 command = command[:-2].rstrip(b
" ")
565 answer = self.state.name.encode()
566 elif command == b
"STAT0":
572 + b
" %3d" % (self.max_power)
575 elif command == b
"STAT1":
584 elif command == b
"STAT2":
593 elif command == b
"STAT3":
613 elif command
in self.command2mode.keys():
614 if self.state == self.State.S2:
615 self.mode = self.command2mode[command]
621 elif command == b
"L?":
622 if self.state == self.State.S2:
623 answer = self.mode2answer[self.mode]
628 elif command == b
"LON":
629 if self.state == self.State.S1:
630 self.state = self.State.S2
632 elif self.mode == self.Mode.S2:
640 elif command == b
"LF":
641 if self.state == self.State.S2:
642 self.state = self.State.S1
648 elif command.startswith(b
"PP"):
653 if command == b
"PP?":
654 level = self.power / self.max_power
655 answer = b
"PP%03X" % round(float(0xFFF) * level)
656 elif len(command) == 5:
657 level = int(command[2:], 16) / float(0xFFF)
658 self.power = level * self.max_power
661 raise RuntimeError(
"invalid command '%'" % command)
664 elif command == b
"P?":
673 level = self.power / self.max_power
674 answer = b
"P%04X" % round(float(0xCCC) * level)
677 elif command == b
"IPO":
678 self.internal_peak_power =
True
680 elif command == b
"IPF":
681 self.internal_peak_power =
False
683 elif command == b
"IP?":
684 answer = b
"IPO" if self.internal_peak_power
else b
"IPF"
688 elif command == b
"A2DO":
689 self.analog2digital =
True
691 elif command == b
"A2DF":
692 self.analog2digital =
False
694 elif command == b
"A2D?":
695 answer = b
"A2D ON" if self.analog2digital
else b
"A2D OFF"
698 elif command == b
"MF":
699 self.bias_modulation =
False
700 self.digital_modulation =
False
702 elif command == b
"MO1":
703 self.bias_modulation =
True
704 self.digital_modulation =
False
706 elif command == b
"MO2":
707 self.bias_modulation =
False
708 self.digital_modulation =
True
710 elif command == b
"MO3":
711 self.bias_modulation =
True
712 self.digital_modulation =
True
716 raise NotImplementedError(
717 "no handling for command '%s'" % command.decode(
"utf-8")
720 self.in_buffer.write(answer + self.eol)
def handle(self, command)
def handle(self, command)