OLD | NEW |
(Empty) | |
| 1 #! python |
| 2 # |
| 3 # Python Serial Port Extension for Win32, Linux, BSD, Jython |
| 4 # see __init__.py |
| 5 # |
| 6 # This module implements a loop back connection receiving itself what it sent. |
| 7 # |
| 8 # The purpose of this module is.. well... You can run the unit tests with it. |
| 9 # and it was so easy to implement ;-) |
| 10 # |
| 11 # (C) 2001-2011 Chris Liechti <cliechti@gmx.net> |
| 12 # this is distributed under a free software license, see license.txt |
| 13 # |
| 14 # URL format: loop://[option[/option...]] |
| 15 # options: |
| 16 # - "debug" print diagnostic messages |
| 17 |
| 18 from serial.serialutil import * |
| 19 import threading |
| 20 import time |
| 21 import logging |
| 22 |
| 23 # map log level names to constants. used in fromURL() |
| 24 LOGGER_LEVELS = { |
| 25 'debug': logging.DEBUG, |
| 26 'info': logging.INFO, |
| 27 'warning': logging.WARNING, |
| 28 'error': logging.ERROR, |
| 29 } |
| 30 |
| 31 |
| 32 class LoopbackSerial(SerialBase): |
| 33 """Serial port implementation that simulates a loop back connection in plain
software.""" |
| 34 |
| 35 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, |
| 36 9600, 19200, 38400, 57600, 115200) |
| 37 |
| 38 def open(self): |
| 39 """Open port with current settings. This may throw a SerialException |
| 40 if the port cannot be opened.""" |
| 41 if self._isOpen: |
| 42 raise SerialException("Port is already open.") |
| 43 self.logger = None |
| 44 self.buffer_lock = threading.Lock() |
| 45 self.loop_buffer = bytearray() |
| 46 self.cts = False |
| 47 self.dsr = False |
| 48 |
| 49 if self._port is None: |
| 50 raise SerialException("Port must be configured before it can be used
.") |
| 51 # not that there is anything to open, but the function applies the |
| 52 # options found in the URL |
| 53 self.fromURL(self.port) |
| 54 |
| 55 # not that there anything to configure... |
| 56 self._reconfigurePort() |
| 57 # all things set up get, now a clean start |
| 58 self._isOpen = True |
| 59 if not self._rtscts: |
| 60 self.setRTS(True) |
| 61 self.setDTR(True) |
| 62 self.flushInput() |
| 63 self.flushOutput() |
| 64 |
| 65 def _reconfigurePort(self): |
| 66 """Set communication parameters on opened port. for the loop:// |
| 67 protocol all settings are ignored!""" |
| 68 # not that's it of any real use, but it helps in the unit tests |
| 69 if not isinstance(self._baudrate, (int, long)) or not 0 < self._baudrate
< 2**32: |
| 70 raise ValueError("invalid baudrate: %r" % (self._baudrate)) |
| 71 if self.logger: |
| 72 self.logger.info('_reconfigurePort()') |
| 73 |
| 74 def close(self): |
| 75 """Close port""" |
| 76 if self._isOpen: |
| 77 self._isOpen = False |
| 78 # in case of quick reconnects, give the server some time |
| 79 time.sleep(0.3) |
| 80 |
| 81 def makeDeviceName(self, port): |
| 82 raise SerialException("there is no sensible way to turn numbers into URL
s") |
| 83 |
| 84 def fromURL(self, url): |
| 85 """extract host and port from an URL string""" |
| 86 if url.lower().startswith("loop://"): url = url[7:] |
| 87 try: |
| 88 # process options now, directly altering self |
| 89 for option in url.split('/'): |
| 90 if '=' in option: |
| 91 option, value = option.split('=', 1) |
| 92 else: |
| 93 value = None |
| 94 if not option: |
| 95 pass |
| 96 elif option == 'logging': |
| 97 logging.basicConfig() # XXX is that good to call it here? |
| 98 self.logger = logging.getLogger('pySerial.loop') |
| 99 self.logger.setLevel(LOGGER_LEVELS[value]) |
| 100 self.logger.debug('enabled logging') |
| 101 else: |
| 102 raise ValueError('unknown option: %r' % (option,)) |
| 103 except ValueError, e: |
| 104 raise SerialException('expected a string in the form "[loop://][opti
on[/option...]]": %s' % e) |
| 105 |
| 106 # - - - - - - - - - - - - - - - - - - - - - - - - |
| 107 |
| 108 def inWaiting(self): |
| 109 """Return the number of characters currently in the input buffer.""" |
| 110 if not self._isOpen: raise portNotOpenError |
| 111 if self.logger: |
| 112 # attention the logged value can differ from return value in |
| 113 # threaded environments... |
| 114 self.logger.debug('inWaiting() -> %d' % (len(self.loop_buffer),)) |
| 115 return len(self.loop_buffer) |
| 116 |
| 117 def read(self, size=1): |
| 118 """Read size bytes from the serial port. If a timeout is set it may |
| 119 return less characters as requested. With no timeout it will block |
| 120 until the requested number of bytes is read.""" |
| 121 if not self._isOpen: raise portNotOpenError |
| 122 if self._timeout is not None: |
| 123 timeout = time.time() + self._timeout |
| 124 else: |
| 125 timeout = None |
| 126 data = bytearray() |
| 127 while size > 0: |
| 128 self.buffer_lock.acquire() |
| 129 try: |
| 130 block = to_bytes(self.loop_buffer[:size]) |
| 131 del self.loop_buffer[:size] |
| 132 finally: |
| 133 self.buffer_lock.release() |
| 134 data += block |
| 135 size -= len(block) |
| 136 # check for timeout now, after data has been read. |
| 137 # useful for timeout = 0 (non blocking) read |
| 138 if timeout and time.time() > timeout: |
| 139 break |
| 140 return bytes(data) |
| 141 |
| 142 def write(self, data): |
| 143 """Output the given string over the serial port. Can block if the |
| 144 connection is blocked. May raise SerialException if the connection is |
| 145 closed.""" |
| 146 if not self._isOpen: raise portNotOpenError |
| 147 # ensure we're working with bytes |
| 148 data = to_bytes(data) |
| 149 # calculate aprox time that would be used to send the data |
| 150 time_used_to_send = 10.0*len(data) / self._baudrate |
| 151 # when a write timeout is configured check if we would be successful |
| 152 # (not sending anything, not even the part that would have time) |
| 153 if self._writeTimeout is not None and time_used_to_send > self._writeTim
eout: |
| 154 time.sleep(self._writeTimeout) # must wait so that unit test succeed
s |
| 155 raise writeTimeoutError |
| 156 self.buffer_lock.acquire() |
| 157 try: |
| 158 self.loop_buffer += data |
| 159 finally: |
| 160 self.buffer_lock.release() |
| 161 return len(data) |
| 162 |
| 163 def flushInput(self): |
| 164 """Clear input buffer, discarding all that is in the buffer.""" |
| 165 if not self._isOpen: raise portNotOpenError |
| 166 if self.logger: |
| 167 self.logger.info('flushInput()') |
| 168 self.buffer_lock.acquire() |
| 169 try: |
| 170 del self.loop_buffer[:] |
| 171 finally: |
| 172 self.buffer_lock.release() |
| 173 |
| 174 def flushOutput(self): |
| 175 """Clear output buffer, aborting the current output and |
| 176 discarding all that is in the buffer.""" |
| 177 if not self._isOpen: raise portNotOpenError |
| 178 if self.logger: |
| 179 self.logger.info('flushOutput()') |
| 180 |
| 181 def sendBreak(self, duration=0.25): |
| 182 """Send break condition. Timed, returns to idle state after given |
| 183 duration.""" |
| 184 if not self._isOpen: raise portNotOpenError |
| 185 |
| 186 def setBreak(self, level=True): |
| 187 """Set break: Controls TXD. When active, to transmitting is |
| 188 possible.""" |
| 189 if not self._isOpen: raise portNotOpenError |
| 190 if self.logger: |
| 191 self.logger.info('setBreak(%r)' % (level,)) |
| 192 |
| 193 def setRTS(self, level=True): |
| 194 """Set terminal status line: Request To Send""" |
| 195 if not self._isOpen: raise portNotOpenError |
| 196 if self.logger: |
| 197 self.logger.info('setRTS(%r) -> state of CTS' % (level,)) |
| 198 self.cts = level |
| 199 |
| 200 def setDTR(self, level=True): |
| 201 """Set terminal status line: Data Terminal Ready""" |
| 202 if not self._isOpen: raise portNotOpenError |
| 203 if self.logger: |
| 204 self.logger.info('setDTR(%r) -> state of DSR' % (level,)) |
| 205 self.dsr = level |
| 206 |
| 207 def getCTS(self): |
| 208 """Read terminal status line: Clear To Send""" |
| 209 if not self._isOpen: raise portNotOpenError |
| 210 if self.logger: |
| 211 self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,)) |
| 212 return self.cts |
| 213 |
| 214 def getDSR(self): |
| 215 """Read terminal status line: Data Set Ready""" |
| 216 if not self._isOpen: raise portNotOpenError |
| 217 if self.logger: |
| 218 self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,)) |
| 219 return self.dsr |
| 220 |
| 221 def getRI(self): |
| 222 """Read terminal status line: Ring Indicator""" |
| 223 if not self._isOpen: raise portNotOpenError |
| 224 if self.logger: |
| 225 self.logger.info('returning dummy for getRI()') |
| 226 return False |
| 227 |
| 228 def getCD(self): |
| 229 """Read terminal status line: Carrier Detect""" |
| 230 if not self._isOpen: raise portNotOpenError |
| 231 if self.logger: |
| 232 self.logger.info('returning dummy for getCD()') |
| 233 return True |
| 234 |
| 235 # - - - platform specific - - - |
| 236 # None so far |
| 237 |
| 238 |
| 239 # assemble Serial class with the platform specific implementation and the base |
| 240 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O |
| 241 # library, derive from io.RawIOBase |
| 242 try: |
| 243 import io |
| 244 except ImportError: |
| 245 # classic version with our own file-like emulation |
| 246 class Serial(LoopbackSerial, FileLike): |
| 247 pass |
| 248 else: |
| 249 # io library present |
| 250 class Serial(LoopbackSerial, io.RawIOBase): |
| 251 pass |
| 252 |
| 253 |
| 254 # simple client test |
| 255 if __name__ == '__main__': |
| 256 import sys |
| 257 s = Serial('loop://') |
| 258 sys.stdout.write('%s\n' % s) |
| 259 |
| 260 sys.stdout.write("write...\n") |
| 261 s.write("hello\n") |
| 262 s.flush() |
| 263 sys.stdout.write("read: %s\n" % s.read(5)) |
| 264 |
| 265 s.close() |
OLD | NEW |