33 """Connection to a Prior ProScanIII and wrapper to its commands.
35 Devices that are controlled by the same controller should share
36 the same connection instance to ensure correct synchronization of
37 communications from different threads. This ensures that commands
38 for different devices,
or replies
from different devices, don
't
41 This class also implements the logic to parse and validate
42 commands so it can be shared between multiple devices.
46 def __init__(self, port: str, baudrate: int, timeout: float) ->
None:
54 bytesize=serial.EIGHTBITS,
55 stopbits=serial.STOPBITS_ONE,
56 parity=serial.PARITY_NONE,
61 self.
_lock = threading.RLock()
69 if answer != b
"PROSCAN INFORMATION\r":
72 "Not a ProScanIII device: '?' returned '%s'"
76 line = self.
_serial.read_until(b
"\rEND\r")
77 if not line.endswith(b
"\rEND\r"):
78 raise RuntimeError(
"Failed to clear description")
80 def command(self, command: bytes) ->
None:
81 """Send command to device."""
83 self.
_serial.write(command + b
"\r")
86 """Read a line from the device connection."""
88 return self.
_serial.read_until(b
"\r")
91 """Read until timeout; used to clean buffer if in an unknown state."""
97 def _command_and_validate(self, command: bytes, expected: bytes) ->
None:
98 """Send command and raise exception if answer is unexpected"""
101 if answer != expected:
104 "command '%s' failed (got '%s')"
105 % (command.decode(), answer.decode())
109 """Send get command and return the answer."""
115 """Send a move command and check return value."""
125 """Send a set command and check return value."""
132 """Send a get description command and return it."""
135 return self.
_serial.read_until(b
"\rEND\r")
137 @contextlib.contextmanager
138 def changed_timeout(self, new_timeout: float):
139 previous = self.
_serial.timeout
141 self.
_serial.timeout = new_timeout
144 self.
_serial.timeout = previous
146 def assert_filterwheel_number(self, number: int) ->
None:
147 assert number > 0
and number < 4
149 def _has_thing(self, command: bytes, expected_start: bytes) -> bool:
154 if not description.startswith(expected_start):
157 "Failed to get description '%s' (got '%s')"
158 % (command.decode(), description.decode())
160 return not description.startswith(expected_start + b
"NONE\r")
162 def has_filterwheel(self, number: int) -> bool:
171 return self.
_has_thing(b
"FILTER %d" % number, b
"FILTER_%d = " % number)
173 def get_n_filter_positions(self, number: int) -> int:
178 def get_filter_position(self, number: int) -> int:
183 def set_filter_position(self, number: int, pos: int) ->
None:
189 """Prior ProScanIII controller.
191 The controlled devices have the following labels:
194 Filter wheel connected to connector labelled "FILTER 1".
196 Filter wheel connected to connector labelled
"FILTER 1".
198 Filter wheel connected to connector labelled
"A AXIS".
202 The Prior ProScanIII can control up to three filter wheels.
203 However, a filter position may have a different number
204 dependening on which connector it
is. For example, using an 8
205 position filter wheel, what
is position 1 on the
"filter 1" and
206 "filter 2" connectors,
is position 4 when on the
"A axis" (
or
207 "filter 3") connector.
212 self, port: str, baudrate: int = 9600, timeout: float = 0.5, **kwargs
214 super().__init__(**kwargs)
219 for number
in range(1, 4):
220 if self.
_conn.has_filterwheel(number):
221 key =
"filter %d" % number
225 def devices(self) -> typing.Mapping[str, microscope.abc.Device]:
230 def __init__(self, connection: _ProScanIIIConnection, number: int) ->
None:
231 super().__init__(positions=connection.get_n_filter_positions(number))
232 self.
_conn = connection
235 def _do_get_position(self) -> int:
238 def _do_set_position(self, position: int) ->
None:
241 def _do_shutdown(self) -> None:
bool _has_thing(self, bytes command, bytes expected_start)
bytes get_command(self, bytes command)
bytes get_description(self, bytes command)
None command(self, bytes command)
None read_until_timeout(self)
None assert_filterwheel_number(self, int number)
None move_command(self, bytes command)
None set_command(self, bytes command)
def changed_timeout(self, float new_timeout)
None _command_and_validate(self, bytes command, bytes expected)