OLD | NEW |
| (Empty) |
1 #! python | |
2 # | |
3 # Python Serial Port Extension for Win32, Linux, BSD, Jython | |
4 # see __init__.py | |
5 # | |
6 # This module implements a loop back connection receiving itself what it sent. | |
7 # | |
8 # The purpose of this module is.. well... You can run the unit tests with it. | |
9 # and it was so easy to implement ;-) | |
10 # | |
11 # (C) 2001-2011 Chris Liechti <cliechti@gmx.net> | |
12 # this is distributed under a free software license, see license.txt | |
13 # | |
14 # URL format: loop://[option[/option...]] | |
15 # options: | |
16 # - "debug" print diagnostic messages | |
17 | |
18 from serial.serialutil import * | |
19 import threading | |
20 import time | |
21 import logging | |
22 | |
23 # map log level names to constants. used in fromURL() | |
24 LOGGER_LEVELS = { | |
25 'debug': logging.DEBUG, | |
26 'info': logging.INFO, | |
27 'warning': logging.WARNING, | |
28 'error': logging.ERROR, | |
29 } | |
30 | |
31 | |
32 class LoopbackSerial(SerialBase): | |
33 """Serial port implementation that simulates a loop back connection in plain
software.""" | |
34 | |
35 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, | |
36 9600, 19200, 38400, 57600, 115200) | |
37 | |
38 def open(self): | |
39 """Open port with current settings. This may throw a SerialException | |
40 if the port cannot be opened.""" | |
41 if self._isOpen: | |
42 raise SerialException("Port is already open.") | |
43 self.logger = None | |
44 self.buffer_lock = threading.Lock() | |
45 self.loop_buffer = bytearray() | |
46 self.cts = False | |
47 self.dsr = False | |
48 | |
49 if self._port is None: | |
50 raise SerialException("Port must be configured before it can be used
.") | |
51 # not that there is anything to open, but the function applies the | |
52 # options found in the URL | |
53 self.fromURL(self.port) | |
54 | |
55 # not that there anything to configure... | |
56 self._reconfigurePort() | |
57 # all things set up get, now a clean start | |
58 self._isOpen = True | |
59 if not self._rtscts: | |
60 self.setRTS(True) | |
61 self.setDTR(True) | |
62 self.flushInput() | |
63 self.flushOutput() | |
64 | |
65 def _reconfigurePort(self): | |
66 """Set communication parameters on opened port. for the loop:// | |
67 protocol all settings are ignored!""" | |
68 # not that's it of any real use, but it helps in the unit tests | |
69 if not isinstance(self._baudrate, (int, long)) or not 0 < self._baudrate
< 2**32: | |
70 raise ValueError("invalid baudrate: %r" % (self._baudrate)) | |
71 if self.logger: | |
72 self.logger.info('_reconfigurePort()') | |
73 | |
74 def close(self): | |
75 """Close port""" | |
76 if self._isOpen: | |
77 self._isOpen = False | |
78 # in case of quick reconnects, give the server some time | |
79 time.sleep(0.3) | |
80 | |
81 def makeDeviceName(self, port): | |
82 raise SerialException("there is no sensible way to turn numbers into URL
s") | |
83 | |
84 def fromURL(self, url): | |
85 """extract host and port from an URL string""" | |
86 if url.lower().startswith("loop://"): url = url[7:] | |
87 try: | |
88 # process options now, directly altering self | |
89 for option in url.split('/'): | |
90 if '=' in option: | |
91 option, value = option.split('=', 1) | |
92 else: | |
93 value = None | |
94 if not option: | |
95 pass | |
96 elif option == 'logging': | |
97 logging.basicConfig() # XXX is that good to call it here? | |
98 self.logger = logging.getLogger('pySerial.loop') | |
99 self.logger.setLevel(LOGGER_LEVELS[value]) | |
100 self.logger.debug('enabled logging') | |
101 else: | |
102 raise ValueError('unknown option: %r' % (option,)) | |
103 except ValueError, e: | |
104 raise SerialException('expected a string in the form "[loop://][opti
on[/option...]]": %s' % e) | |
105 | |
106 # - - - - - - - - - - - - - - - - - - - - - - - - | |
107 | |
108 def inWaiting(self): | |
109 """Return the number of characters currently in the input buffer.""" | |
110 if not self._isOpen: raise portNotOpenError | |
111 if self.logger: | |
112 # attention the logged value can differ from return value in | |
113 # threaded environments... | |
114 self.logger.debug('inWaiting() -> %d' % (len(self.loop_buffer),)) | |
115 return len(self.loop_buffer) | |
116 | |
117 def read(self, size=1): | |
118 """Read size bytes from the serial port. If a timeout is set it may | |
119 return less characters as requested. With no timeout it will block | |
120 until the requested number of bytes is read.""" | |
121 if not self._isOpen: raise portNotOpenError | |
122 if self._timeout is not None: | |
123 timeout = time.time() + self._timeout | |
124 else: | |
125 timeout = None | |
126 data = bytearray() | |
127 while size > 0: | |
128 self.buffer_lock.acquire() | |
129 try: | |
130 block = to_bytes(self.loop_buffer[:size]) | |
131 del self.loop_buffer[:size] | |
132 finally: | |
133 self.buffer_lock.release() | |
134 data += block | |
135 size -= len(block) | |
136 # check for timeout now, after data has been read. | |
137 # useful for timeout = 0 (non blocking) read | |
138 if timeout and time.time() > timeout: | |
139 break | |
140 return bytes(data) | |
141 | |
142 def write(self, data): | |
143 """Output the given string over the serial port. Can block if the | |
144 connection is blocked. May raise SerialException if the connection is | |
145 closed.""" | |
146 if not self._isOpen: raise portNotOpenError | |
147 # ensure we're working with bytes | |
148 data = to_bytes(data) | |
149 # calculate aprox time that would be used to send the data | |
150 time_used_to_send = 10.0*len(data) / self._baudrate | |
151 # when a write timeout is configured check if we would be successful | |
152 # (not sending anything, not even the part that would have time) | |
153 if self._writeTimeout is not None and time_used_to_send > self._writeTim
eout: | |
154 time.sleep(self._writeTimeout) # must wait so that unit test succeed
s | |
155 raise writeTimeoutError | |
156 self.buffer_lock.acquire() | |
157 try: | |
158 self.loop_buffer += data | |
159 finally: | |
160 self.buffer_lock.release() | |
161 return len(data) | |
162 | |
163 def flushInput(self): | |
164 """Clear input buffer, discarding all that is in the buffer.""" | |
165 if not self._isOpen: raise portNotOpenError | |
166 if self.logger: | |
167 self.logger.info('flushInput()') | |
168 self.buffer_lock.acquire() | |
169 try: | |
170 del self.loop_buffer[:] | |
171 finally: | |
172 self.buffer_lock.release() | |
173 | |
174 def flushOutput(self): | |
175 """Clear output buffer, aborting the current output and | |
176 discarding all that is in the buffer.""" | |
177 if not self._isOpen: raise portNotOpenError | |
178 if self.logger: | |
179 self.logger.info('flushOutput()') | |
180 | |
181 def sendBreak(self, duration=0.25): | |
182 """Send break condition. Timed, returns to idle state after given | |
183 duration.""" | |
184 if not self._isOpen: raise portNotOpenError | |
185 | |
186 def setBreak(self, level=True): | |
187 """Set break: Controls TXD. When active, to transmitting is | |
188 possible.""" | |
189 if not self._isOpen: raise portNotOpenError | |
190 if self.logger: | |
191 self.logger.info('setBreak(%r)' % (level,)) | |
192 | |
193 def setRTS(self, level=True): | |
194 """Set terminal status line: Request To Send""" | |
195 if not self._isOpen: raise portNotOpenError | |
196 if self.logger: | |
197 self.logger.info('setRTS(%r) -> state of CTS' % (level,)) | |
198 self.cts = level | |
199 | |
200 def setDTR(self, level=True): | |
201 """Set terminal status line: Data Terminal Ready""" | |
202 if not self._isOpen: raise portNotOpenError | |
203 if self.logger: | |
204 self.logger.info('setDTR(%r) -> state of DSR' % (level,)) | |
205 self.dsr = level | |
206 | |
207 def getCTS(self): | |
208 """Read terminal status line: Clear To Send""" | |
209 if not self._isOpen: raise portNotOpenError | |
210 if self.logger: | |
211 self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,)) | |
212 return self.cts | |
213 | |
214 def getDSR(self): | |
215 """Read terminal status line: Data Set Ready""" | |
216 if not self._isOpen: raise portNotOpenError | |
217 if self.logger: | |
218 self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,)) | |
219 return self.dsr | |
220 | |
221 def getRI(self): | |
222 """Read terminal status line: Ring Indicator""" | |
223 if not self._isOpen: raise portNotOpenError | |
224 if self.logger: | |
225 self.logger.info('returning dummy for getRI()') | |
226 return False | |
227 | |
228 def getCD(self): | |
229 """Read terminal status line: Carrier Detect""" | |
230 if not self._isOpen: raise portNotOpenError | |
231 if self.logger: | |
232 self.logger.info('returning dummy for getCD()') | |
233 return True | |
234 | |
235 # - - - platform specific - - - | |
236 # None so far | |
237 | |
238 | |
239 # assemble Serial class with the platform specific implementation and the base | |
240 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O | |
241 # library, derive from io.RawIOBase | |
242 try: | |
243 import io | |
244 except ImportError: | |
245 # classic version with our own file-like emulation | |
246 class Serial(LoopbackSerial, FileLike): | |
247 pass | |
248 else: | |
249 # io library present | |
250 class Serial(LoopbackSerial, io.RawIOBase): | |
251 pass | |
252 | |
253 | |
254 # simple client test | |
255 if __name__ == '__main__': | |
256 import sys | |
257 s = Serial('loop://') | |
258 sys.stdout.write('%s\n' % s) | |
259 | |
260 sys.stdout.write("write...\n") | |
261 s.write("hello\n") | |
262 s.flush() | |
263 sys.stdout.write("read: %s\n" % s.read(5)) | |
264 | |
265 s.close() | |
OLD | NEW |