Index: tools/telemetry/third_party/pyserial/serial/serialutil.py |
diff --git a/tools/telemetry/third_party/pyserial/serial/serialutil.py b/tools/telemetry/third_party/pyserial/serial/serialutil.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f28ece454fed94087478dd4fee02ec2ab2681ec6 |
--- /dev/null |
+++ b/tools/telemetry/third_party/pyserial/serial/serialutil.py |
@@ -0,0 +1,551 @@ |
+#! python |
+# Python Serial Port Extension for Win32, Linux, BSD, Jython |
+# see __init__.py |
+# |
+# (C) 2001-2010 Chris Liechti <cliechti@gmx.net> |
+# this is distributed under a free software license, see license.txt |
+ |
+# compatibility for older Python < 2.6 |
+try: |
+ bytes |
+ bytearray |
+except (NameError, AttributeError): |
+ # Python older than 2.6 do not have these types. Like for Python 2.6 they |
+ # should behave like str. For Python older than 3.0 we want to work with |
+ # strings anyway, only later versions have a true bytes type. |
+ bytes = str |
+ # bytearray is a mutable type that is easily turned into an instance of |
+ # bytes |
+ class bytearray(list): |
+ # for bytes(bytearray()) usage |
+ def __str__(self): return ''.join(self) |
+ def __repr__(self): return 'bytearray(%r)' % ''.join(self) |
+ # append automatically converts integers to characters |
+ def append(self, item): |
+ if isinstance(item, str): |
+ list.append(self, item) |
+ else: |
+ list.append(self, chr(item)) |
+ # += |
+ def __iadd__(self, other): |
+ for byte in other: |
+ self.append(byte) |
+ return self |
+ |
+ def __getslice__(self, i, j): |
+ return bytearray(list.__getslice__(self, i, j)) |
+ |
+ def __getitem__(self, item): |
+ if isinstance(item, slice): |
+ return bytearray(list.__getitem__(self, item)) |
+ else: |
+ return ord(list.__getitem__(self, item)) |
+ |
+ def __eq__(self, other): |
+ if isinstance(other, basestring): |
+ other = bytearray(other) |
+ return list.__eq__(self, other) |
+ |
+# ``memoryview`` was introduced in Python 2.7 and ``bytes(some_memoryview)`` |
+# isn't returning the contents (very unfortunate). Therefore we need special |
+# cases and test for it. Ensure that there is a ``memoryview`` object for older |
+# Python versions. This is easier than making every test dependent on its |
+# existence. |
+try: |
+ memoryview |
+except (NameError, AttributeError): |
+ # implementation does not matter as we do not realy use it. |
+ # it just must not inherit from something else we might care for. |
+ class memoryview: |
+ pass |
+ |
+ |
+# all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11' |
+# so a simple ``bytes(sequence)`` doesn't work for all versions |
+def to_bytes(seq): |
+ """convert a sequence to a bytes type""" |
+ if isinstance(seq, bytes): |
+ return seq |
+ elif isinstance(seq, bytearray): |
+ return bytes(seq) |
+ elif isinstance(seq, memoryview): |
+ return seq.tobytes() |
+ else: |
+ b = bytearray() |
+ for item in seq: |
+ b.append(item) # this one handles int and str for our emulation and ints for Python 3.x |
+ return bytes(b) |
+ |
+# create control bytes |
+XON = to_bytes([17]) |
+XOFF = to_bytes([19]) |
+ |
+CR = to_bytes([13]) |
+LF = to_bytes([10]) |
+ |
+ |
+PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S' |
+STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2) |
+FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8) |
+ |
+PARITY_NAMES = { |
+ PARITY_NONE: 'None', |
+ PARITY_EVEN: 'Even', |
+ PARITY_ODD: 'Odd', |
+ PARITY_MARK: 'Mark', |
+ PARITY_SPACE: 'Space', |
+} |
+ |
+ |
+class SerialException(IOError): |
+ """Base class for serial port related exceptions.""" |
+ |
+ |
+class SerialTimeoutException(SerialException): |
+ """Write timeouts give an exception""" |
+ |
+ |
+writeTimeoutError = SerialTimeoutException('Write timeout') |
+portNotOpenError = SerialException('Attempting to use a port that is not open') |
+ |
+ |
+class FileLike(object): |
+ """An abstract file like class. |
+ |
+ This class implements readline and readlines based on read and |
+ writelines based on write. |
+ This class is used to provide the above functions for to Serial |
+ port objects. |
+ |
+ Note that when the serial port was opened with _NO_ timeout that |
+ readline blocks until it sees a newline (or the specified size is |
+ reached) and that readlines would never return and therefore |
+ refuses to work (it raises an exception in this case)! |
+ """ |
+ |
+ def __init__(self): |
+ self.closed = True |
+ |
+ def close(self): |
+ self.closed = True |
+ |
+ # so that ports are closed when objects are discarded |
+ def __del__(self): |
+ """Destructor. Calls close().""" |
+ # The try/except block is in case this is called at program |
+ # exit time, when it's possible that globals have already been |
+ # deleted, and then the close() call might fail. Since |
+ # there's nothing we can do about such failures and they annoy |
+ # the end users, we suppress the traceback. |
+ try: |
+ self.close() |
+ except: |
+ pass |
+ |
+ def writelines(self, sequence): |
+ for line in sequence: |
+ self.write(line) |
+ |
+ def flush(self): |
+ """flush of file like objects""" |
+ pass |
+ |
+ # iterator for e.g. "for line in Serial(0): ..." usage |
+ def next(self): |
+ line = self.readline() |
+ if not line: raise StopIteration |
+ return line |
+ |
+ def __iter__(self): |
+ return self |
+ |
+ def readline(self, size=None, eol=LF): |
+ """read a line which is terminated with end-of-line (eol) character |
+ ('\n' by default) or until timeout.""" |
+ leneol = len(eol) |
+ line = bytearray() |
+ while True: |
+ c = self.read(1) |
+ if c: |
+ line += c |
+ if line[-leneol:] == eol: |
+ break |
+ if size is not None and len(line) >= size: |
+ break |
+ else: |
+ break |
+ return bytes(line) |
+ |
+ def readlines(self, sizehint=None, eol=LF): |
+ """read a list of lines, until timeout. |
+ sizehint is ignored.""" |
+ if self.timeout is None: |
+ raise ValueError("Serial port MUST have enabled timeout for this function!") |
+ leneol = len(eol) |
+ lines = [] |
+ while True: |
+ line = self.readline(eol=eol) |
+ if line: |
+ lines.append(line) |
+ if line[-leneol:] != eol: # was the line received with a timeout? |
+ break |
+ else: |
+ break |
+ return lines |
+ |
+ def xreadlines(self, sizehint=None): |
+ """Read lines, implemented as generator. It will raise StopIteration on |
+ timeout (empty read). sizehint is ignored.""" |
+ while True: |
+ line = self.readline() |
+ if not line: break |
+ yield line |
+ |
+ # other functions of file-likes - not used by pySerial |
+ |
+ #~ readinto(b) |
+ |
+ def seek(self, pos, whence=0): |
+ raise IOError("file is not seekable") |
+ |
+ def tell(self): |
+ raise IOError("file is not seekable") |
+ |
+ def truncate(self, n=None): |
+ raise IOError("file is not seekable") |
+ |
+ def isatty(self): |
+ return False |
+ |
+ |
+class SerialBase(object): |
+ """Serial port base class. Provides __init__ function and properties to |
+ get/set port settings.""" |
+ |
+ # default values, may be overridden in subclasses that do not support all values |
+ BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, |
+ 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, |
+ 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, |
+ 3000000, 3500000, 4000000) |
+ BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS) |
+ PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE) |
+ STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO) |
+ |
+ def __init__(self, |
+ port = None, # number of device, numbering starts at |
+ # zero. if everything fails, the user |
+ # can specify a device string, note |
+ # that this isn't portable anymore |
+ # port will be opened if one is specified |
+ baudrate=9600, # baud rate |
+ bytesize=EIGHTBITS, # number of data bits |
+ parity=PARITY_NONE, # enable parity checking |
+ stopbits=STOPBITS_ONE, # number of stop bits |
+ timeout=None, # set a timeout value, None to wait forever |
+ xonxoff=False, # enable software flow control |
+ rtscts=False, # enable RTS/CTS flow control |
+ writeTimeout=None, # set a timeout for writes |
+ dsrdtr=False, # None: use rtscts setting, dsrdtr override if True or False |
+ interCharTimeout=None # Inter-character timeout, None to disable |
+ ): |
+ """Initialize comm port object. If a port is given, then the port will be |
+ opened immediately. Otherwise a Serial port object in closed state |
+ is returned.""" |
+ |
+ self._isOpen = False |
+ self._port = None # correct value is assigned below through properties |
+ self._baudrate = None # correct value is assigned below through properties |
+ self._bytesize = None # correct value is assigned below through properties |
+ self._parity = None # correct value is assigned below through properties |
+ self._stopbits = None # correct value is assigned below through properties |
+ self._timeout = None # correct value is assigned below through properties |
+ self._writeTimeout = None # correct value is assigned below through properties |
+ self._xonxoff = None # correct value is assigned below through properties |
+ self._rtscts = None # correct value is assigned below through properties |
+ self._dsrdtr = None # correct value is assigned below through properties |
+ self._interCharTimeout = None # correct value is assigned below through properties |
+ |
+ # assign values using get/set methods using the properties feature |
+ self.port = port |
+ self.baudrate = baudrate |
+ self.bytesize = bytesize |
+ self.parity = parity |
+ self.stopbits = stopbits |
+ self.timeout = timeout |
+ self.writeTimeout = writeTimeout |
+ self.xonxoff = xonxoff |
+ self.rtscts = rtscts |
+ self.dsrdtr = dsrdtr |
+ self.interCharTimeout = interCharTimeout |
+ |
+ if port is not None: |
+ self.open() |
+ |
+ def isOpen(self): |
+ """Check if the port is opened.""" |
+ return self._isOpen |
+ |
+ # - - - - - - - - - - - - - - - - - - - - - - - - |
+ |
+ # TODO: these are not really needed as the is the BAUDRATES etc. attribute... |
+ # maybe i remove them before the final release... |
+ |
+ def getSupportedBaudrates(self): |
+ return [(str(b), b) for b in self.BAUDRATES] |
+ |
+ def getSupportedByteSizes(self): |
+ return [(str(b), b) for b in self.BYTESIZES] |
+ |
+ def getSupportedStopbits(self): |
+ return [(str(b), b) for b in self.STOPBITS] |
+ |
+ def getSupportedParities(self): |
+ return [(PARITY_NAMES[b], b) for b in self.PARITIES] |
+ |
+ # - - - - - - - - - - - - - - - - - - - - - - - - |
+ |
+ def setPort(self, port): |
+ """Change the port. The attribute portstr is set to a string that |
+ contains the name of the port.""" |
+ |
+ was_open = self._isOpen |
+ if was_open: self.close() |
+ if port is not None: |
+ if isinstance(port, basestring): |
+ self.portstr = port |
+ else: |
+ self.portstr = self.makeDeviceName(port) |
+ else: |
+ self.portstr = None |
+ self._port = port |
+ self.name = self.portstr |
+ if was_open: self.open() |
+ |
+ def getPort(self): |
+ """Get the current port setting. The value that was passed on init or using |
+ setPort() is passed back. See also the attribute portstr which contains |
+ the name of the port as a string.""" |
+ return self._port |
+ |
+ port = property(getPort, setPort, doc="Port setting") |
+ |
+ |
+ def setBaudrate(self, baudrate): |
+ """Change baud rate. It raises a ValueError if the port is open and the |
+ baud rate is not possible. If the port is closed, then the value is |
+ accepted and the exception is raised when the port is opened.""" |
+ try: |
+ b = int(baudrate) |
+ except TypeError: |
+ raise ValueError("Not a valid baudrate: %r" % (baudrate,)) |
+ else: |
+ if b <= 0: |
+ raise ValueError("Not a valid baudrate: %r" % (baudrate,)) |
+ self._baudrate = b |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getBaudrate(self): |
+ """Get the current baud rate setting.""" |
+ return self._baudrate |
+ |
+ baudrate = property(getBaudrate, setBaudrate, doc="Baud rate setting") |
+ |
+ |
+ def setByteSize(self, bytesize): |
+ """Change byte size.""" |
+ if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,)) |
+ self._bytesize = bytesize |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getByteSize(self): |
+ """Get the current byte size setting.""" |
+ return self._bytesize |
+ |
+ bytesize = property(getByteSize, setByteSize, doc="Byte size setting") |
+ |
+ |
+ def setParity(self, parity): |
+ """Change parity setting.""" |
+ if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,)) |
+ self._parity = parity |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getParity(self): |
+ """Get the current parity setting.""" |
+ return self._parity |
+ |
+ parity = property(getParity, setParity, doc="Parity setting") |
+ |
+ |
+ def setStopbits(self, stopbits): |
+ """Change stop bits size.""" |
+ if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,)) |
+ self._stopbits = stopbits |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getStopbits(self): |
+ """Get the current stop bits setting.""" |
+ return self._stopbits |
+ |
+ stopbits = property(getStopbits, setStopbits, doc="Stop bits setting") |
+ |
+ |
+ def setTimeout(self, timeout): |
+ """Change timeout setting.""" |
+ if timeout is not None: |
+ try: |
+ timeout + 1 # test if it's a number, will throw a TypeError if not... |
+ except TypeError: |
+ raise ValueError("Not a valid timeout: %r" % (timeout,)) |
+ if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,)) |
+ self._timeout = timeout |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getTimeout(self): |
+ """Get the current timeout setting.""" |
+ return self._timeout |
+ |
+ timeout = property(getTimeout, setTimeout, doc="Timeout setting for read()") |
+ |
+ |
+ def setWriteTimeout(self, timeout): |
+ """Change timeout setting.""" |
+ if timeout is not None: |
+ if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,)) |
+ try: |
+ timeout + 1 #test if it's a number, will throw a TypeError if not... |
+ except TypeError: |
+ raise ValueError("Not a valid timeout: %r" % timeout) |
+ |
+ self._writeTimeout = timeout |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getWriteTimeout(self): |
+ """Get the current timeout setting.""" |
+ return self._writeTimeout |
+ |
+ writeTimeout = property(getWriteTimeout, setWriteTimeout, doc="Timeout setting for write()") |
+ |
+ |
+ def setXonXoff(self, xonxoff): |
+ """Change XON/XOFF setting.""" |
+ self._xonxoff = xonxoff |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getXonXoff(self): |
+ """Get the current XON/XOFF setting.""" |
+ return self._xonxoff |
+ |
+ xonxoff = property(getXonXoff, setXonXoff, doc="XON/XOFF setting") |
+ |
+ def setRtsCts(self, rtscts): |
+ """Change RTS/CTS flow control setting.""" |
+ self._rtscts = rtscts |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getRtsCts(self): |
+ """Get the current RTS/CTS flow control setting.""" |
+ return self._rtscts |
+ |
+ rtscts = property(getRtsCts, setRtsCts, doc="RTS/CTS flow control setting") |
+ |
+ def setDsrDtr(self, dsrdtr=None): |
+ """Change DsrDtr flow control setting.""" |
+ if dsrdtr is None: |
+ # if not set, keep backwards compatibility and follow rtscts setting |
+ self._dsrdtr = self._rtscts |
+ else: |
+ # if defined independently, follow its value |
+ self._dsrdtr = dsrdtr |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getDsrDtr(self): |
+ """Get the current DSR/DTR flow control setting.""" |
+ return self._dsrdtr |
+ |
+ dsrdtr = property(getDsrDtr, setDsrDtr, "DSR/DTR flow control setting") |
+ |
+ def setInterCharTimeout(self, interCharTimeout): |
+ """Change inter-character timeout setting.""" |
+ if interCharTimeout is not None: |
+ if interCharTimeout < 0: raise ValueError("Not a valid timeout: %r" % interCharTimeout) |
+ try: |
+ interCharTimeout + 1 # test if it's a number, will throw a TypeError if not... |
+ except TypeError: |
+ raise ValueError("Not a valid timeout: %r" % interCharTimeout) |
+ |
+ self._interCharTimeout = interCharTimeout |
+ if self._isOpen: self._reconfigurePort() |
+ |
+ def getInterCharTimeout(self): |
+ """Get the current inter-character timeout setting.""" |
+ return self._interCharTimeout |
+ |
+ interCharTimeout = property(getInterCharTimeout, setInterCharTimeout, doc="Inter-character timeout setting for read()") |
+ |
+ # - - - - - - - - - - - - - - - - - - - - - - - - |
+ |
+ _SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff', |
+ 'dsrdtr', 'rtscts', 'timeout', 'writeTimeout', 'interCharTimeout') |
+ |
+ def getSettingsDict(self): |
+ """Get current port settings as a dictionary. For use with |
+ applySettingsDict""" |
+ return dict([(key, getattr(self, '_'+key)) for key in self._SETTINGS]) |
+ |
+ def applySettingsDict(self, d): |
+ """apply stored settings from a dictionary returned from |
+ getSettingsDict. it's allowed to delete keys from the dictionary. these |
+ values will simply left unchanged.""" |
+ for key in self._SETTINGS: |
+ if d[key] != getattr(self, '_'+key): # check against internal "_" value |
+ setattr(self, key, d[key]) # set non "_" value to use properties write function |
+ |
+ # - - - - - - - - - - - - - - - - - - - - - - - - |
+ |
+ def __repr__(self): |
+ """String representation of the current port settings and its state.""" |
+ return "%s<id=0x%x, open=%s>(port=%r, baudrate=%r, bytesize=%r, parity=%r, stopbits=%r, timeout=%r, xonxoff=%r, rtscts=%r, dsrdtr=%r)" % ( |
+ self.__class__.__name__, |
+ id(self), |
+ self._isOpen, |
+ self.portstr, |
+ self.baudrate, |
+ self.bytesize, |
+ self.parity, |
+ self.stopbits, |
+ self.timeout, |
+ self.xonxoff, |
+ self.rtscts, |
+ self.dsrdtr, |
+ ) |
+ |
+ |
+ # - - - - - - - - - - - - - - - - - - - - - - - - |
+ # compatibility with io library |
+ |
+ def readable(self): return True |
+ def writable(self): return True |
+ def seekable(self): return False |
+ def readinto(self, b): |
+ data = self.read(len(b)) |
+ n = len(data) |
+ try: |
+ b[:n] = data |
+ except TypeError, err: |
+ import array |
+ if not isinstance(b, array.array): |
+ raise err |
+ b[:n] = array.array('b', data) |
+ return n |
+ |
+ |
+if __name__ == '__main__': |
+ import sys |
+ s = SerialBase() |
+ sys.stdout.write('port name: %s\n' % s.portstr) |
+ sys.stdout.write('baud rates: %s\n' % s.getSupportedBaudrates()) |
+ sys.stdout.write('byte sizes: %s\n' % s.getSupportedByteSizes()) |
+ sys.stdout.write('parities: %s\n' % s.getSupportedParities()) |
+ sys.stdout.write('stop bits: %s\n' % s.getSupportedStopbits()) |
+ sys.stdout.write('%s\n' % s) |