BioImager  3.9.1
A .NET microscopy imaging library. Supports various microscopes by using imported libraries & GUI automation. Supported libraries include Prior® & Zeiss® & all devices supported by Micromanager 2.0 and python-microscope.
Loading...
Searching...
No Matches
prior.py
1#!/usr/bin/env python3
2
3## Copyright (C) 2020 David Miguel Susano Pinto <carandraug@gmail.com>
4##
5## This file is part of Microscope.
6##
7## Microscope is free software: you can redistribute it and/or modify
8## it under the terms of the GNU General Public License as published by
9## the Free Software Foundation, either version 3 of the License, or
10## (at your option) any later version.
11##
12## Microscope is distributed in the hope that it will be useful,
13## but WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15## GNU General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with Microscope. If not, see <http://www.gnu.org/licenses/>.
19
20"""Prior controller.
21"""
22
23import contextlib
24import threading
25import typing
26
27import serial
28
29import microscope.abc
30
31
33 """Connection to a Prior ProScanIII and wrapper to its commands.
34
35 Devices that are controlled by the same controller should share
36 the same connection instance to ensure correct synchronization of
37 communications from different threads. This ensures that commands
38 for different devices, or replies from different devices, don't
39 get entangled.
40
41 This class also implements the logic to parse and validate
42 commands so it can be shared between multiple devices.
43
44 """
45
def __init__(self, port: str, baudrate: int, timeout: float) -> None:
    """Open the serial port and check that a ProScan controller answers.

    Args:
        port: name of the serial port, handed to ``serial.Serial``.
        baudrate: baud rate, handed to ``serial.Serial``.
        timeout: read timeout, handed to ``serial.Serial``.

    Raises:
        RuntimeError: if the reply to ``?`` does not start with the
            ProScan banner, or if the rest of the description could
            not be read up to its ``END`` line.  The serial port is
            closed before the exception propagates.
    """
    # From the technical datasheet: 8 bit word 1 stop bit, no
    # parity no handshake, baudrate options of 9600, 19200, 38400,
    # 57600 and 115200.
    self._serial = serial.Serial(
        port=port,
        baudrate=baudrate,
        timeout=timeout,
        bytesize=serial.EIGHTBITS,
        stopbits=serial.STOPBITS_ONE,
        parity=serial.PARITY_NONE,
        xonxoff=False,
        rtscts=False,
        dsrdtr=False,
    )
    self._lock = threading.RLock()

    try:
        with self._lock:
            # We do not use the general get_description() here because
            # if this is not a ProScan device it would never reach the
            # '\rEND\r' that signals the end of the description.
            self.command(b"?")
            answer = self.readline()
            if answer != b"PROSCAN INFORMATION\r":
                # Whatever is on the other end may reply with bytes
                # that are not valid text, so decode leniently to
                # make sure the RuntimeError is the one raised.
                raise RuntimeError(
                    "Not a ProScanIII device: '?' returned '%s'"
                    % answer.decode(errors="replace")
                )
            # A description ends with END on its own line.
            line = self._serial.read_until(b"\rEND\r")
            if not line.endswith(b"\rEND\r"):
                raise RuntimeError("Failed to clear description")
    except BaseException:
        # The instance is unusable if the handshake failed; do not
        # leave the port open behind it.
        self._serial.close()
        raise
79
def command(self, command: bytes) -> None:
    """Write ``command`` to the device, followed by a carriage return."""
    terminated = command + b"\r"
    with self._lock:
        self._serial.write(terminated)
84
def readline(self) -> bytes:
    """Return what the serial port reads up to a carriage return.

    The read is delegated to the port's ``read_until`` with ``\\r`` as
    the terminator, while holding the connection lock.
    """
    with self._lock:
        line = self._serial.read_until(b"\r")
    return line
89
def read_until_timeout(self) -> None:
    """Read until timeout; used to clean buffer if in an unknown state.

    The input buffer is discarded first, then the port is read
    repeatedly until a read comes back empty.
    """
    with self._lock:
        # reset_input_buffer() is the current pySerial name for the
        # deprecated flushInput().
        self._serial.reset_input_buffer()
        while self._serial.readline():
            pass
96
def _command_and_validate(self, command: bytes, expected: bytes) -> None:
    """Send command and raise exception if answer is unexpected.

    Args:
        command: command to send (without the trailing ``\\r``).
        expected: the exact reply, terminator included, that signals
            success.

    Raises:
        RuntimeError: if the reply differs from ``expected``.  The
            input is drained with ``read_until_timeout`` first.
    """
    with self._lock:
        answer = self.get_command(command)
        if answer != expected:
            self.read_until_timeout()
            # An unexpected answer may well not be valid text; decode
            # leniently so that building the message cannot raise
            # UnicodeDecodeError in place of the RuntimeError.
            raise RuntimeError(
                "command '%s' failed (got '%s')"
                % (
                    command.decode(errors="replace"),
                    answer.decode(errors="replace"),
                )
            )
107
def get_command(self, command: bytes) -> bytes:
    """Send ``command`` and return the line read back in reply.

    Both steps happen under the connection lock so the reply read is
    not interleaved with another thread's command.
    """
    with self._lock:
        self.command(command)
        reply = self.readline()
    return reply
113
def move_command(self, command: bytes) -> None:
    """Send a move command and require the end-of-move ``R`` reply.

    The serial timeout is raised to ten times its current value while
    waiting for the reply.
    """
    # Movement commands respond with an R at the end of move.
    # Once a movement command is issued the application should
    # wait until the end of move R response is received before
    # sending any further commands.
    # TODO: this times 10 for timeout is a bit arbitrary.
    extended_timeout = 10 * self._serial.timeout
    end_of_move = b"R\r"
    with self.changed_timeout(extended_timeout):
        self._command_and_validate(command, end_of_move)
123
def set_command(self, command: bytes) -> None:
    """Send a set command and require a ``0`` reply."""
    # Property type commands that set certain status respond with
    # zero.  They respond with a zero even if there are invalid
    # arguments in the command.
    acknowledgement = b"0\r"
    self._command_and_validate(command, acknowledgement)
130
def get_description(self, command: bytes) -> bytes:
    """Send a description command and return what is read back.

    The reply is read with ``read_until`` using ``\\rEND\\r`` as the
    terminator, since a description ends with END on its own line.
    """
    end_marker = b"\rEND\r"
    with self._lock:
        self.command(command)
        description = self._serial.read_until(end_marker)
    return description
136
@contextlib.contextmanager
def changed_timeout(self, new_timeout: float):
    """Context manager that swaps the serial timeout for its body.

    The timeout found on entry is put back on exit, whether the body
    finishes normally or raises.
    """
    original_timeout = self._serial.timeout
    try:
        self._serial.timeout = new_timeout
        yield
    finally:
        self._serial.timeout = original_timeout
145
def assert_filterwheel_number(self, number: int) -> None:
    """Check that ``number`` is a valid filter wheel number (1, 2 or 3).

    Raises:
        ValueError: if ``number`` is not greater than 0 and less than 4.
    """
    # An explicit check instead of an ``assert`` statement, because
    # asserts are removed when Python runs with -O and the number
    # would then reach the controller unchecked.
    if not 0 < number < 4:
        raise ValueError(
            "filter wheel number must be 1, 2, or 3 (got %r)" % (number,)
        )
148
def _has_thing(self, command: bytes, expected_start: bytes) -> bool:
    """Tell whether a device is connected, from its description reply.

    Args:
        command: a command that returns a description string.
        expected_start: the prefix a well-formed description starts
            with.

    Returns:
        False when the description starts with ``expected_start``
        immediately followed by ``NONE\\r``, True otherwise.

    Raises:
        RuntimeError: if the description does not start with
            ``expected_start``.  The input is drained with
            ``read_until_timeout`` first.
    """
    # Use the commands that returns a description string to find
    # whether a specific device is connected.
    with self._lock:
        description = self.get_description(command)
        if not description.startswith(expected_start):
            self.read_until_timeout()
            # A malformed description may not be valid text; decode
            # leniently so that building the message cannot raise
            # UnicodeDecodeError in place of the RuntimeError.
            raise RuntimeError(
                "Failed to get description '%s' (got '%s')"
                % (
                    command.decode(errors="replace"),
                    description.decode(errors="replace"),
                )
            )
    return not description.startswith(expected_start + b"NONE\r")
161
def has_filterwheel(self, number: int) -> bool:
    """Report whether filter wheel ``number`` is connected."""
    self.assert_filterwheel_number(number)
    # We use the 'FILTER w' command to check if there's a filter
    # wheel instead of the '?' command.  The reason is that the
    # third filter wheel, named "A AXIS" on the controller box and
    # "FOURTH" on the output of the '?' command, can be used for
    # non filter wheels.  We hope that 'FILTER 3' will fail
    # properly if what is connected to "A AXIS" is not a filter
    # wheel.
    query = b"FILTER %d" % number
    reply_prefix = b"FILTER_%d = " % number
    return self._has_thing(query, reply_prefix)
172
def get_n_filter_positions(self, number: int) -> int:
    """Return the reply to ``FPW <number>`` converted to an int."""
    self.assert_filterwheel_number(number)
    query = b"FPW %d" % number
    return int(self.get_command(query))
177
def get_filter_position(self, number: int) -> int:
    """Return the reply to ``7 <number> F`` converted to an int."""
    self.assert_filterwheel_number(number)
    query = b"7 %d F" % number
    return int(self.get_command(query))
182
def set_filter_position(self, number: int, pos: int) -> None:
    """Move filter wheel ``number`` to ``pos`` with a ``7 <number> <pos>`` move command."""
    self.assert_filterwheel_number(number)
    move = b"7 %d %d" % (number, pos)
    self.move_command(move)
186
187
189 """Prior ProScanIII controller.
190
191 The controlled devices have the following labels:
192
193 `filter 1`
194 Filter wheel connected to connector labelled "FILTER 1".
195 `filter 2`
196 Filter wheel connected to connector labelled "FILTER 2".
197 `filter 3`
198 Filter wheel connected to connector labelled "A AXIS".
199
200 .. note::
201
202 The Prior ProScanIII can control up to three filter wheels.
203 However, a filter position may have a different number
204 depending on which connector it is. For example, using an 8
205 position filter wheel, what is position 1 on the "filter 1" and
206 "filter 2" connectors, is position 4 when on the "A axis" (or
207 "filter 3") connector.
208
209 """
210
def __init__(
    self, port: str, baudrate: int = 9600, timeout: float = 0.5, **kwargs
) -> None:
    """Connect to the controller and register its filter wheels.

    Args:
        port: serial port name, passed to the connection.
        baudrate: baud rate, passed to the connection.
        timeout: read timeout, passed to the connection.
        **kwargs: forwarded to the parent class constructor.
    """
    super().__init__(**kwargs)
    self._conn = _ProScanIIIConnection(port, baudrate, timeout)
    # Annotated as Dict, not Mapping, because it is filled in by item
    # assignment below; the read-only Mapping view is what the
    # `devices` property advertises to callers.
    self._devices: typing.Dict[str, microscope.abc.Device] = {}

    # Can have up to three filter wheels, numbered 1 to 3.
    for number in range(1, 4):
        if self._conn.has_filterwheel(number):
            key = "filter %d" % number
            self._devices[key] = _ProScanIIIFilterWheel(self._conn, number)
223
@property
def devices(self) -> typing.Mapping[str, microscope.abc.Device]:
    """Mapping of device label to the device object registered under it."""
    registered = self._devices
    return registered
227
228
def __init__(self, connection: _ProScanIIIConnection, number: int) -> None:
    """Set up filter wheel ``number`` on the shared ``connection``.

    The number of positions given to the parent class is queried from
    the connection.
    """
    n_positions = connection.get_n_filter_positions(number)
    super().__init__(positions=n_positions)
    self._conn = connection
    self._number = number
234
def _do_get_position(self) -> int:
    """Return this wheel's position as reported by the connection."""
    position = self._conn.get_filter_position(self._number)
    return position
237
def _do_set_position(self, position: int) -> None:
    """Ask the connection to move this wheel to ``position``."""
    wheel = self._number
    self._conn.set_filter_position(wheel, position)
240
def _do_shutdown(self) -> None:
    """Do nothing; this filter wheel has no shutdown step of its own."""
    return None
bool _has_thing(self, bytes command, bytes expected_start)
Definition: prior.py:149
bytes get_command(self, bytes command)
Definition: prior.py:108
bytes get_description(self, bytes command)
Definition: prior.py:131
None assert_filterwheel_number(self, int number)
Definition: prior.py:146
None move_command(self, bytes command)
Definition: prior.py:114
def changed_timeout(self, float new_timeout)
Definition: prior.py:138
None _command_and_validate(self, bytes command, bytes expected)
Definition: prior.py:97