OLD | NEW |
(Empty) | |
| 1 #! python |
| 2 # Python Serial Port Extension for Win32, Linux, BSD, Jython |
| 3 # serial driver for win32 |
| 4 # see __init__.py |
| 5 # |
| 6 # (C) 2001-2011 Chris Liechti <cliechti@gmx.net> |
| 7 # this is distributed under a free software license, see license.txt |
| 8 # |
| 9 # Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com> |
| 10 |
| 11 import ctypes |
| 12 from serial import win32 |
| 13 |
| 14 from serial.serialutil import * |
| 15 |
| 16 |
| 17 def device(portnum): |
| 18 """Turn a port number into a device name""" |
| 19 return 'COM%d' % (portnum+1) # numbers are transformed to a string |
| 20 |
| 21 |
| 22 class Win32Serial(SerialBase): |
| 23 """Serial port implementation for Win32 based on ctypes.""" |
| 24 |
| 25 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, |
| 26 9600, 19200, 38400, 57600, 115200) |
| 27 |
| 28 def __init__(self, *args, **kwargs): |
| 29 self.hComPort = None |
| 30 self._overlappedRead = None |
| 31 self._overlappedWrite = None |
| 32 self._rtsToggle = False |
| 33 |
| 34 self._rtsState = win32.RTS_CONTROL_ENABLE |
| 35 self._dtrState = win32.DTR_CONTROL_ENABLE |
| 36 |
| 37 |
| 38 SerialBase.__init__(self, *args, **kwargs) |
| 39 |
| 40 def open(self): |
| 41 """Open port with current settings. This may throw a SerialException |
| 42 if the port cannot be opened.""" |
| 43 if self._port is None: |
| 44 raise SerialException("Port must be configured before it can be used
.") |
| 45 if self._isOpen: |
| 46 raise SerialException("Port is already open.") |
| 47 # the "\\.\COMx" format is required for devices other than COM1-COM8 |
| 48 # not all versions of windows seem to support this properly |
| 49 # so that the first few ports are used with the DOS device name |
| 50 port = self.portstr |
| 51 try: |
| 52 if port.upper().startswith('COM') and int(port[3:]) > 8: |
| 53 port = '\\\\.\\' + port |
| 54 except ValueError: |
| 55 # for like COMnotanumber |
| 56 pass |
| 57 self.hComPort = win32.CreateFile(port, |
| 58 win32.GENERIC_READ | win32.GENERIC_WRITE, |
| 59 0, # exclusive access |
| 60 None, # no security |
| 61 win32.OPEN_EXISTING, |
| 62 win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, |
| 63 0) |
| 64 if self.hComPort == win32.INVALID_HANDLE_VALUE: |
| 65 self.hComPort = None # 'cause __del__ is called anyway |
| 66 raise SerialException("could not open port %r: %r" % (self.portstr,
ctypes.WinError())) |
| 67 |
| 68 try: |
| 69 self._overlappedRead = win32.OVERLAPPED() |
| 70 self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None) |
| 71 self._overlappedWrite = win32.OVERLAPPED() |
| 72 #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None
) |
| 73 self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None) |
| 74 |
| 75 # Setup a 4k buffer |
| 76 win32.SetupComm(self.hComPort, 4096, 4096) |
| 77 |
| 78 # Save original timeout values: |
| 79 self._orgTimeouts = win32.COMMTIMEOUTS() |
| 80 win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts)
) |
| 81 |
| 82 self._reconfigurePort() |
| 83 |
| 84 # Clear buffers: |
| 85 # Remove anything that was there |
| 86 win32.PurgeComm(self.hComPort, |
| 87 win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | |
| 88 win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) |
| 89 except: |
| 90 try: |
| 91 self._close() |
| 92 except: |
| 93 # ignore any exception when closing the port |
| 94 # also to keep original exception that happened when setting up |
| 95 pass |
| 96 self.hComPort = None |
| 97 raise |
| 98 else: |
| 99 self._isOpen = True |
| 100 |
| 101 |
| 102 def _reconfigurePort(self): |
| 103 """Set communication parameters on opened port.""" |
| 104 if not self.hComPort: |
| 105 raise SerialException("Can only operate on a valid port handle") |
| 106 |
| 107 # Set Windows timeout values |
| 108 # timeouts is a tuple with the following items: |
| 109 # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier, |
| 110 # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier, |
| 111 # WriteTotalTimeoutConstant) |
| 112 if self._timeout is None: |
| 113 timeouts = (0, 0, 0, 0, 0) |
| 114 elif self._timeout == 0: |
| 115 timeouts = (win32.MAXDWORD, 0, 0, 0, 0) |
| 116 else: |
| 117 timeouts = (0, 0, int(self._timeout*1000), 0, 0) |
| 118 if self._timeout != 0 and self._interCharTimeout is not None: |
| 119 timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:] |
| 120 |
| 121 if self._writeTimeout is None: |
| 122 pass |
| 123 elif self._writeTimeout == 0: |
| 124 timeouts = timeouts[:-2] + (0, win32.MAXDWORD) |
| 125 else: |
| 126 timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000)) |
| 127 win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*ti
meouts))) |
| 128 |
| 129 win32.SetCommMask(self.hComPort, win32.EV_ERR) |
| 130 |
| 131 # Setup the connection info. |
| 132 # Get state and modify it: |
| 133 comDCB = win32.DCB() |
| 134 win32.GetCommState(self.hComPort, ctypes.byref(comDCB)) |
| 135 comDCB.BaudRate = self._baudrate |
| 136 |
| 137 if self._bytesize == FIVEBITS: |
| 138 comDCB.ByteSize = 5 |
| 139 elif self._bytesize == SIXBITS: |
| 140 comDCB.ByteSize = 6 |
| 141 elif self._bytesize == SEVENBITS: |
| 142 comDCB.ByteSize = 7 |
| 143 elif self._bytesize == EIGHTBITS: |
| 144 comDCB.ByteSize = 8 |
| 145 else: |
| 146 raise ValueError("Unsupported number of data bits: %r" % self._bytes
ize) |
| 147 |
| 148 if self._parity == PARITY_NONE: |
| 149 comDCB.Parity = win32.NOPARITY |
| 150 comDCB.fParity = 0 # Disable Parity Check |
| 151 elif self._parity == PARITY_EVEN: |
| 152 comDCB.Parity = win32.EVENPARITY |
| 153 comDCB.fParity = 1 # Enable Parity Check |
| 154 elif self._parity == PARITY_ODD: |
| 155 comDCB.Parity = win32.ODDPARITY |
| 156 comDCB.fParity = 1 # Enable Parity Check |
| 157 elif self._parity == PARITY_MARK: |
| 158 comDCB.Parity = win32.MARKPARITY |
| 159 comDCB.fParity = 1 # Enable Parity Check |
| 160 elif self._parity == PARITY_SPACE: |
| 161 comDCB.Parity = win32.SPACEPARITY |
| 162 comDCB.fParity = 1 # Enable Parity Check |
| 163 else: |
| 164 raise ValueError("Unsupported parity mode: %r" % self._parity) |
| 165 |
| 166 if self._stopbits == STOPBITS_ONE: |
| 167 comDCB.StopBits = win32.ONESTOPBIT |
| 168 elif self._stopbits == STOPBITS_ONE_POINT_FIVE: |
| 169 comDCB.StopBits = win32.ONE5STOPBITS |
| 170 elif self._stopbits == STOPBITS_TWO: |
| 171 comDCB.StopBits = win32.TWOSTOPBITS |
| 172 else: |
| 173 raise ValueError("Unsupported number of stop bits: %r" % self._stopb
its) |
| 174 |
| 175 comDCB.fBinary = 1 # Enable Binary Transmission |
| 176 # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TR
UE) |
| 177 if self._rtscts: |
| 178 comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE |
| 179 elif self._rtsToggle: |
| 180 comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE |
| 181 else: |
| 182 comDCB.fRtsControl = self._rtsState |
| 183 if self._dsrdtr: |
| 184 comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE |
| 185 else: |
| 186 comDCB.fDtrControl = self._dtrState |
| 187 |
| 188 if self._rtsToggle: |
| 189 comDCB.fOutxCtsFlow = 0 |
| 190 else: |
| 191 comDCB.fOutxCtsFlow = self._rtscts |
| 192 comDCB.fOutxDsrFlow = self._dsrdtr |
| 193 comDCB.fOutX = self._xonxoff |
| 194 comDCB.fInX = self._xonxoff |
| 195 comDCB.fNull = 0 |
| 196 comDCB.fErrorChar = 0 |
| 197 comDCB.fAbortOnError = 0 |
| 198 comDCB.XonChar = XON |
| 199 comDCB.XoffChar = XOFF |
| 200 |
| 201 if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)): |
| 202 raise ValueError("Cannot configure port, some setting was wrong. Ori
ginal message: %r" % ctypes.WinError()) |
| 203 |
| 204 #~ def __del__(self): |
| 205 #~ self.close() |
| 206 |
| 207 |
| 208 def _close(self): |
| 209 """internal close port helper""" |
| 210 if self.hComPort: |
| 211 # Restore original timeout values: |
| 212 win32.SetCommTimeouts(self.hComPort, self._orgTimeouts) |
| 213 # Close COM-Port: |
| 214 win32.CloseHandle(self.hComPort) |
| 215 if self._overlappedRead is not None: |
| 216 win32.CloseHandle(self._overlappedRead.hEvent) |
| 217 self._overlappedRead = None |
| 218 if self._overlappedWrite is not None: |
| 219 win32.CloseHandle(self._overlappedWrite.hEvent) |
| 220 self._overlappedWrite = None |
| 221 self.hComPort = None |
| 222 |
| 223 def close(self): |
| 224 """Close port""" |
| 225 if self._isOpen: |
| 226 self._close() |
| 227 self._isOpen = False |
| 228 |
| 229 def makeDeviceName(self, port): |
| 230 return device(port) |
| 231 |
| 232 # - - - - - - - - - - - - - - - - - - - - - - - - |
| 233 |
| 234 def inWaiting(self): |
| 235 """Return the number of characters currently in the input buffer.""" |
| 236 flags = win32.DWORD() |
| 237 comstat = win32.COMSTAT() |
| 238 if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.b
yref(comstat)): |
| 239 raise SerialException('call to ClearCommError failed') |
| 240 return comstat.cbInQue |
| 241 |
| 242 def read(self, size=1): |
| 243 """Read size bytes from the serial port. If a timeout is set it may |
| 244 return less characters as requested. With no timeout it will block |
| 245 until the requested number of bytes is read.""" |
| 246 if not self.hComPort: raise portNotOpenError |
| 247 if size > 0: |
| 248 win32.ResetEvent(self._overlappedRead.hEvent) |
| 249 flags = win32.DWORD() |
| 250 comstat = win32.COMSTAT() |
| 251 if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctyp
es.byref(comstat)): |
| 252 raise SerialException('call to ClearCommError failed') |
| 253 if self.timeout == 0: |
| 254 n = min(comstat.cbInQue, size) |
| 255 if n > 0: |
| 256 buf = ctypes.create_string_buffer(n) |
| 257 rc = win32.DWORD() |
| 258 err = win32.ReadFile(self.hComPort, buf, n, ctypes.byref(rc)
, ctypes.byref(self._overlappedRead)) |
| 259 if not err and win32.GetLastError() != win32.ERROR_IO_PENDIN
G: |
| 260 raise SerialException("ReadFile failed (%r)" % ctypes.Wi
nError()) |
| 261 err = win32.WaitForSingleObject(self._overlappedRead.hEvent,
win32.INFINITE) |
| 262 read = buf.raw[:rc.value] |
| 263 else: |
| 264 read = bytes() |
| 265 else: |
| 266 buf = ctypes.create_string_buffer(size) |
| 267 rc = win32.DWORD() |
| 268 err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc),
ctypes.byref(self._overlappedRead)) |
| 269 if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: |
| 270 raise SerialException("ReadFile failed (%r)" % ctypes.WinErr
or()) |
| 271 err = win32.GetOverlappedResult(self.hComPort, ctypes.byref(self
._overlappedRead), ctypes.byref(rc), True) |
| 272 read = buf.raw[:rc.value] |
| 273 else: |
| 274 read = bytes() |
| 275 return bytes(read) |
| 276 |
| 277 def write(self, data): |
| 278 """Output the given string over the serial port.""" |
| 279 if not self.hComPort: raise portNotOpenError |
| 280 #~ if not isinstance(data, (bytes, bytearray)): |
| 281 #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type
(data))) |
| 282 # convert data (needed in case of memoryview instance: Py 3.1 io lib), c
types doesn't like memoryview |
| 283 data = to_bytes(data) |
| 284 if data: |
| 285 #~ win32event.ResetEvent(self._overlappedWrite.hEvent) |
| 286 n = win32.DWORD() |
| 287 err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n
), self._overlappedWrite) |
| 288 if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: |
| 289 raise SerialException("WriteFile failed (%r)" % ctypes.WinError(
)) |
| 290 if self._writeTimeout != 0: # if blocking (None) or w/ write timeout
(>0) |
| 291 # Wait for the write to complete. |
| 292 #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32
.INFINITE) |
| 293 err = win32.GetOverlappedResult(self.hComPort, self._overlappedW
rite, ctypes.byref(n), True) |
| 294 if n.value != len(data): |
| 295 raise writeTimeoutError |
| 296 return n.value |
| 297 else: |
| 298 return 0 |
| 299 |
| 300 def flush(self): |
| 301 """Flush of file like objects. In this case, wait until all data |
| 302 is written.""" |
| 303 while self.outWaiting(): |
| 304 time.sleep(0.05) |
| 305 # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would |
| 306 # require overlapped IO and its also only possible to set a single mask |
| 307 # on the port--- |
| 308 |
| 309 def flushInput(self): |
| 310 """Clear input buffer, discarding all that is in the buffer.""" |
| 311 if not self.hComPort: raise portNotOpenError |
| 312 win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT
) |
| 313 |
| 314 def flushOutput(self): |
| 315 """Clear output buffer, aborting the current output and |
| 316 discarding all that is in the buffer.""" |
| 317 if not self.hComPort: raise portNotOpenError |
| 318 win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT
) |
| 319 |
| 320 def sendBreak(self, duration=0.25): |
| 321 """Send break condition. Timed, returns to idle state after given durati
on.""" |
| 322 if not self.hComPort: raise portNotOpenError |
| 323 import time |
| 324 win32.SetCommBreak(self.hComPort) |
| 325 time.sleep(duration) |
| 326 win32.ClearCommBreak(self.hComPort) |
| 327 |
| 328 def setBreak(self, level=1): |
| 329 """Set break: Controls TXD. When active, to transmitting is possible.""" |
| 330 if not self.hComPort: raise portNotOpenError |
| 331 if level: |
| 332 win32.SetCommBreak(self.hComPort) |
| 333 else: |
| 334 win32.ClearCommBreak(self.hComPort) |
| 335 |
| 336 def setRTS(self, level=1): |
| 337 """Set terminal status line: Request To Send""" |
| 338 # remember level for reconfigure |
| 339 if level: |
| 340 self._rtsState = win32.RTS_CONTROL_ENABLE |
| 341 else: |
| 342 self._rtsState = win32.RTS_CONTROL_DISABLE |
| 343 # also apply now if port is open |
| 344 if self.hComPort: |
| 345 if level: |
| 346 win32.EscapeCommFunction(self.hComPort, win32.SETRTS) |
| 347 else: |
| 348 win32.EscapeCommFunction(self.hComPort, win32.CLRRTS) |
| 349 |
| 350 def setDTR(self, level=1): |
| 351 """Set terminal status line: Data Terminal Ready""" |
| 352 # remember level for reconfigure |
| 353 if level: |
| 354 self._dtrState = win32.DTR_CONTROL_ENABLE |
| 355 else: |
| 356 self._dtrState = win32.DTR_CONTROL_DISABLE |
| 357 # also apply now if port is open |
| 358 if self.hComPort: |
| 359 if level: |
| 360 win32.EscapeCommFunction(self.hComPort, win32.SETDTR) |
| 361 else: |
| 362 win32.EscapeCommFunction(self.hComPort, win32.CLRDTR) |
| 363 |
| 364 def _GetCommModemStatus(self): |
| 365 stat = win32.DWORD() |
| 366 win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat)) |
| 367 return stat.value |
| 368 |
| 369 def getCTS(self): |
| 370 """Read terminal status line: Clear To Send""" |
| 371 if not self.hComPort: raise portNotOpenError |
| 372 return win32.MS_CTS_ON & self._GetCommModemStatus() != 0 |
| 373 |
| 374 def getDSR(self): |
| 375 """Read terminal status line: Data Set Ready""" |
| 376 if not self.hComPort: raise portNotOpenError |
| 377 return win32.MS_DSR_ON & self._GetCommModemStatus() != 0 |
| 378 |
| 379 def getRI(self): |
| 380 """Read terminal status line: Ring Indicator""" |
| 381 if not self.hComPort: raise portNotOpenError |
| 382 return win32.MS_RING_ON & self._GetCommModemStatus() != 0 |
| 383 |
| 384 def getCD(self): |
| 385 """Read terminal status line: Carrier Detect""" |
| 386 if not self.hComPort: raise portNotOpenError |
| 387 return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0 |
| 388 |
| 389 # - - platform specific - - - - |
| 390 |
| 391 def setBufferSize(self, rx_size=4096, tx_size=None): |
| 392 """\ |
| 393 Recommend a buffer size to the driver (device driver can ignore this |
| 394 vlaue). Must be called before the port is opended. |
| 395 """ |
| 396 if tx_size is None: tx_size = rx_size |
| 397 win32.SetupComm(self.hComPort, rx_size, tx_size) |
| 398 |
| 399 def setXON(self, level=True): |
| 400 """\ |
| 401 Manually control flow - when software flow control is enabled. |
| 402 This will send XON (true) and XOFF (false) to the other device. |
| 403 WARNING: this function is not portable to different platforms! |
| 404 """ |
| 405 if not self.hComPort: raise portNotOpenError |
| 406 if level: |
| 407 win32.EscapeCommFunction(self.hComPort, win32.SETXON) |
| 408 else: |
| 409 win32.EscapeCommFunction(self.hComPort, win32.SETXOFF) |
| 410 |
| 411 def outWaiting(self): |
| 412 """return how many characters the in the outgoing buffer""" |
| 413 flags = win32.DWORD() |
| 414 comstat = win32.COMSTAT() |
| 415 if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.b
yref(comstat)): |
| 416 raise SerialException('call to ClearCommError failed') |
| 417 return comstat.cbOutQue |
| 418 |
| 419 # functions useful for RS-485 adapters |
| 420 def setRtsToggle(self, rtsToggle): |
| 421 """Change RTS toggle control setting.""" |
| 422 self._rtsToggle = rtsToggle |
| 423 if self._isOpen: self._reconfigurePort() |
| 424 |
| 425 def getRtsToggle(self): |
| 426 """Get the current RTS toggle control setting.""" |
| 427 return self._rtsToggle |
| 428 |
| 429 rtsToggle = property(getRtsToggle, setRtsToggle, doc="RTS toggle control set
ting") |
| 430 |
| 431 |
| 432 # assemble Serial class with the platform specific implementation and the base |
| 433 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O |
| 434 # library, derive from io.RawIOBase |
| 435 try: |
| 436 import io |
| 437 except ImportError: |
| 438 # classic version with our own file-like emulation |
| 439 class Serial(Win32Serial, FileLike): |
| 440 pass |
| 441 else: |
| 442 # io library present |
| 443 class Serial(Win32Serial, io.RawIOBase): |
| 444 pass |
| 445 |
| 446 |
| 447 # Nur Testfunktion!! |
| 448 if __name__ == '__main__': |
| 449 s = Serial(0) |
| 450 sys.stdout.write("%s\n" % s) |
| 451 |
| 452 s = Serial() |
| 453 sys.stdout.write("%s\n" % s) |
| 454 |
| 455 s.baudrate = 19200 |
| 456 s.databits = 7 |
| 457 s.close() |
| 458 s.port = 0 |
| 459 s.open() |
| 460 sys.stdout.write("%s\n" % s) |
| 461 |
OLD | NEW |