| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2008 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 """ | |
| 6 Abstract file handle class | |
| 7 """ | |
| 8 | |
| 9 from twisted.internet import main, error, interfaces | |
| 10 from twisted.python import log, failure | |
| 11 from twisted.persisted import styles | |
| 12 | |
| 13 from zope.interface import implements | |
| 14 import errno | |
| 15 | |
| 16 from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF | |
| 17 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING | |
| 18 from twisted.internet.iocpreactor import iocpsupport as _iocp | |
| 19 | |
| 20 | |
| 21 | |
| 22 class FileHandle(log.Logger, styles.Ephemeral, object): | |
| 23 """ | |
| 24 File handle that can read and write asynchronously | |
| 25 """ | |
| 26 implements(interfaces.IProducer, interfaces.IConsumer, | |
| 27 interfaces.ITransport, interfaces.IHalfCloseableDescriptor) | |
| 28 # read stuff | |
| 29 maxReadBuffers = 16 | |
| 30 readBufferSize = 4096 | |
| 31 reading = False | |
| 32 dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs | |
| 33 _readNextBuffer = 0 | |
| 34 _readSize = 0 # how much data we have in the read buffer | |
| 35 _readScheduled = None | |
| 36 _readScheduledInOS = False | |
| 37 | |
| 38 | |
| 39 def startReading(self): | |
| 40 self.reactor.addActiveHandle(self) | |
| 41 if not self._readScheduled and not self.reading: | |
| 42 self.reading = True | |
| 43 self._readScheduled = self.reactor.callLater(0, | |
| 44 self._resumeReading) | |
| 45 | |
| 46 | |
| 47 def stopReading(self): | |
| 48 if self._readScheduled: | |
| 49 self._readScheduled.cancel() | |
| 50 self._readScheduled = None | |
| 51 self.reading = False | |
| 52 | |
| 53 | |
| 54 def _resumeReading(self): | |
| 55 self._readScheduled = None | |
| 56 if self._dispatchData() and not self._readScheduledInOS: | |
| 57 self.doRead() | |
| 58 | |
| 59 | |
| 60 def _dispatchData(self): | |
| 61 """ | |
| 62 Dispatch previously read data. Return True if self.reading and we don't | |
| 63 have any more data | |
| 64 """ | |
| 65 if not self._readSize: | |
| 66 return self.reading | |
| 67 size = self._readSize | |
| 68 full_buffers = size // self.readBufferSize | |
| 69 while self._readNextBuffer < full_buffers: | |
| 70 self.dataReceived(self._readBuffers[self._readNextBuffer]) | |
| 71 self._readNextBuffer += 1 | |
| 72 if not self.reading: | |
| 73 return False | |
| 74 remainder = size % self.readBufferSize | |
| 75 if remainder: | |
| 76 self.dataReceived(buffer(self._readBuffers[full_buffers], | |
| 77 0, remainder)) | |
| 78 if self.dynamicReadBuffers: | |
| 79 total_buffer_size = self.readBufferSize * len(self._readBuffers) | |
| 80 # we have one buffer too many | |
| 81 if size < total_buffer_size - self.readBufferSize: | |
| 82 del self._readBuffers[-1] | |
| 83 # we filled all buffers, so allocate one more | |
| 84 elif (size == total_buffer_size and | |
| 85 len(self._readBuffers) < self.maxReadBuffers): | |
| 86 self._readBuffers.append(_iocp.AllocateReadBuffer( | |
| 87 self.readBufferSize)) | |
| 88 self._readNextBuffer = 0 | |
| 89 self._readSize = 0 | |
| 90 return self.reading | |
| 91 | |
| 92 | |
| 93 def _cbRead(self, rc, bytes, evt): | |
| 94 self._readScheduledInOS = False | |
| 95 if self._handleRead(rc, bytes, evt): | |
| 96 self.doRead() | |
| 97 | |
| 98 | |
| 99 def _handleRead(self, rc, bytes, evt): | |
| 100 """ | |
| 101 Returns False if we should stop reading for now | |
| 102 """ | |
| 103 if self.disconnected: | |
| 104 return False | |
| 105 # graceful disconnection | |
| 106 if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF): | |
| 107 self.reactor.removeActiveHandle(self) | |
| 108 self.readConnectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 109 return False | |
| 110 # XXX: not handling WSAEWOULDBLOCK | |
| 111 # ("too many outstanding overlapped I/O requests") | |
| 112 elif rc: | |
| 113 self.connectionLost(failure.Failure( | |
| 114 error.ConnectionLost("read error -- %s (%s)" % | |
| 115 (errno.errorcode.get(rc, 'unknown'), rc)))) | |
| 116 return False | |
| 117 else: | |
| 118 assert self._readSize == 0 | |
| 119 assert self._readNextBuffer == 0 | |
| 120 self._readSize = bytes | |
| 121 return self._dispatchData() | |
| 122 | |
| 123 | |
| 124 def doRead(self): | |
| 125 numReads = 0 | |
| 126 while 1: | |
| 127 evt = _iocp.Event(self._cbRead, self) | |
| 128 | |
| 129 evt.buff = buff = self._readBuffers | |
| 130 rc, bytes = self.readFromHandle(buff, evt) | |
| 131 | |
| 132 if (rc == ERROR_IO_PENDING | |
| 133 or (not rc and numReads >= self.maxReads)): | |
| 134 self._readScheduledInOS = True | |
| 135 break | |
| 136 else: | |
| 137 evt.ignore = True | |
| 138 if not self._handleRead(rc, bytes, evt): | |
| 139 break | |
| 140 numReads += 1 | |
| 141 | |
| 142 | |
| 143 def readFromHandle(self, bufflist, evt): | |
| 144 raise NotImplementedError() # TODO: this should default to ReadFile | |
| 145 | |
| 146 | |
| 147 def dataReceived(self, data): | |
| 148 raise NotImplementedError | |
| 149 | |
| 150 | |
| 151 def readConnectionLost(self, reason): | |
| 152 self.connectionLost(reason) | |
| 153 | |
| 154 | |
| 155 # write stuff | |
| 156 dataBuffer = '' | |
| 157 offset = 0 | |
| 158 writing = False | |
| 159 _writeScheduled = None | |
| 160 _writeDisconnecting = False | |
| 161 _writeDisconnected = False | |
| 162 writeBufferSize = 2**2**2**2 | |
| 163 maxWrites = 5 | |
| 164 | |
| 165 | |
| 166 def loseWriteConnection(self): | |
| 167 self._writeDisconnecting = True | |
| 168 self.startWriting() | |
| 169 | |
| 170 | |
| 171 def _closeWriteConnection(self): | |
| 172 # override in subclasses | |
| 173 pass | |
| 174 | |
| 175 | |
| 176 def writeConnectionLost(self, reason): | |
| 177 # in current code should never be called | |
| 178 self.connectionLost(reason) | |
| 179 | |
| 180 | |
| 181 def startWriting(self): | |
| 182 self.reactor.addActiveHandle(self) | |
| 183 self.writing = True | |
| 184 if not self._writeScheduled: | |
| 185 self._writeScheduled = self.reactor.callLater(0, | |
| 186 self._resumeWriting) | |
| 187 | |
| 188 | |
| 189 def stopWriting(self): | |
| 190 if self._writeScheduled: | |
| 191 self._writeScheduled.cancel() | |
| 192 self._writeScheduled = None | |
| 193 self.writing = False | |
| 194 | |
| 195 | |
| 196 def _resumeWriting(self): | |
| 197 self._writeScheduled = None | |
| 198 self.doWrite() | |
| 199 | |
| 200 | |
| 201 def _cbWrite(self, rc, bytes, evt): | |
| 202 if self._handleWrite(rc, bytes, evt): | |
| 203 self.doWrite() | |
| 204 | |
| 205 | |
| 206 def _handleWrite(self, rc, bytes, evt): | |
| 207 """ | |
| 208 Returns false if we should stop writing for now | |
| 209 """ | |
| 210 if self.disconnected or self._writeDisconnected: | |
| 211 return False | |
| 212 # XXX: not handling WSAEWOULDBLOCK | |
| 213 # ("too many outstanding overlapped I/O requests") | |
| 214 if rc: | |
| 215 self.connectionLost(failure.Failure( | |
| 216 error.ConnectionLost("write error -- %s (%s)" % | |
| 217 (errno.errorcode.get(rc, 'unknown'), rc)))) | |
| 218 return False | |
| 219 else: | |
| 220 self.offset += bytes | |
| 221 # If there is nothing left to send, | |
| 222 if self.offset == len(self.dataBuffer) and not self._tempDataLen: | |
| 223 self.dataBuffer = "" | |
| 224 self.offset = 0 | |
| 225 # stop writing | |
| 226 self.stopWriting() | |
| 227 # If I've got a producer who is supposed to supply me with data | |
| 228 if self.producer is not None and ((not self.streamingProducer) | |
| 229 or self.producerPaused): | |
| 230 # tell them to supply some more. | |
| 231 self.producerPaused = True | |
| 232 self.producer.resumeProducing() | |
| 233 elif self.disconnecting: | |
| 234 # But if I was previously asked to let the connection die, | |
| 235 # do so. | |
| 236 self.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 237 elif self._writeDisconnecting: | |
| 238 # I was previously asked to to half-close the connection. | |
| 239 self._closeWriteConnection() | |
| 240 self._writeDisconnected = True | |
| 241 return False | |
| 242 else: | |
| 243 return True | |
| 244 | |
| 245 | |
| 246 def doWrite(self): | |
| 247 numWrites = 0 | |
| 248 while 1: | |
| 249 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: | |
| 250 # If there is currently less than SEND_LIMIT bytes left to send | |
| 251 # in the string, extend it with the array data. | |
| 252 self.dataBuffer = (buffer(self.dataBuffer, self.offset) + | |
| 253 "".join(self._tempDataBuffer)) | |
| 254 self.offset = 0 | |
| 255 self._tempDataBuffer = [] | |
| 256 self._tempDataLen = 0 | |
| 257 | |
| 258 evt = _iocp.Event(self._cbWrite, self) | |
| 259 | |
| 260 # Send as much data as you can. | |
| 261 if self.offset: | |
| 262 evt.buff = buff = buffer(self.dataBuffer, self.offset) | |
| 263 else: | |
| 264 evt.buff = buff = self.dataBuffer | |
| 265 rc, bytes = self.writeToHandle(buff, evt) | |
| 266 if (rc == ERROR_IO_PENDING | |
| 267 or (not rc and numWrites >= self.maxWrites)): | |
| 268 break | |
| 269 else: | |
| 270 evt.ignore = True | |
| 271 if not self._handleWrite(rc, bytes, evt): | |
| 272 break | |
| 273 numWrites += 1 | |
| 274 | |
| 275 | |
| 276 def writeToHandle(self, buff, evt): | |
| 277 raise NotImplementedError() # TODO: this should default to WriteFile | |
| 278 | |
| 279 | |
| 280 def write(self, data): | |
| 281 """Reliably write some data. | |
| 282 | |
| 283 The data is buffered until his file descriptor is ready for writing. | |
| 284 """ | |
| 285 if isinstance(data, unicode): # no, really, I mean it | |
| 286 raise TypeError("Data must not be unicode") | |
| 287 if not self.connected or self._writeDisconnected: | |
| 288 return | |
| 289 if data: | |
| 290 self._tempDataBuffer.append(data) | |
| 291 self._tempDataLen += len(data) | |
| 292 if self.producer is not None: | |
| 293 if (len(self.dataBuffer) + self._tempDataLen | |
| 294 > self.writeBufferSize): | |
| 295 self.producerPaused = True | |
| 296 self.producer.pauseProducing() | |
| 297 self.startWriting() | |
| 298 | |
| 299 | |
| 300 def writeSequence(self, iovec): | |
| 301 if not self.connected or not iovec or self._writeDisconnected: | |
| 302 return | |
| 303 self._tempDataBuffer.extend(iovec) | |
| 304 for i in iovec: | |
| 305 self._tempDataLen += len(i) | |
| 306 if self.producer is not None: | |
| 307 if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: | |
| 308 self.producerPaused = True | |
| 309 self.producer.pauseProducing() | |
| 310 self.startWriting() | |
| 311 | |
| 312 | |
| 313 # general stuff | |
| 314 connected = False | |
| 315 disconnected = False | |
| 316 disconnecting = False | |
| 317 logstr = "Uninitialized" | |
| 318 | |
| 319 SEND_LIMIT = 128*1024 | |
| 320 | |
| 321 maxReads = 5 | |
| 322 | |
| 323 | |
| 324 def __init__(self, reactor = None): | |
| 325 if not reactor: | |
| 326 from twisted.internet import reactor | |
| 327 self.reactor = reactor | |
| 328 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite | |
| 329 self._tempDataLen = 0 | |
| 330 self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)] | |
| 331 | |
| 332 | |
| 333 def connectionLost(self, reason): | |
| 334 """ | |
| 335 The connection was lost. | |
| 336 | |
| 337 This is called when the connection on a selectable object has been | |
| 338 lost. It will be called whether the connection was closed explicitly, | |
| 339 an exception occurred in an event handler, or the other end of the | |
| 340 connection closed it first. | |
| 341 | |
| 342 Clean up state here, but make sure to call back up to FileDescriptor. | |
| 343 """ | |
| 344 | |
| 345 self.disconnected = True | |
| 346 self.connected = False | |
| 347 if self.producer is not None: | |
| 348 self.producer.stopProducing() | |
| 349 self.producer = None | |
| 350 self.stopReading() | |
| 351 self.stopWriting() | |
| 352 self.reactor.removeActiveHandle(self) | |
| 353 | |
| 354 | |
| 355 def getFileHandle(self): | |
| 356 return -1 | |
| 357 | |
| 358 | |
| 359 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): | |
| 360 """ | |
| 361 Close the connection at the next available opportunity. | |
| 362 | |
| 363 Call this to cause this FileDescriptor to lose its connection. It will | |
| 364 first write any data that it has buffered. | |
| 365 | |
| 366 If there is data buffered yet to be written, this method will cause the | |
| 367 transport to lose its connection as soon as it's done flushing its | |
| 368 write buffer. If you have a producer registered, the connection won't | |
| 369 be closed until the producer is finished. Therefore, make sure you | |
| 370 unregister your producer when it's finished, or the connection will | |
| 371 never close. | |
| 372 """ | |
| 373 | |
| 374 if self.connected and not self.disconnecting: | |
| 375 if self._writeDisconnected: | |
| 376 # doWrite won't trigger the connection close anymore | |
| 377 self.stopReading() | |
| 378 self.stopWriting | |
| 379 self.connectionLost(_connDone) | |
| 380 else: | |
| 381 self.stopReading() | |
| 382 self.startWriting() | |
| 383 self.disconnecting = 1 | |
| 384 | |
| 385 | |
| 386 # Producer/consumer implementation | |
| 387 | |
| 388 producerPaused = False | |
| 389 streamingProducer = False | |
| 390 | |
| 391 # first, the consumer stuff. This requires no additional work, as | |
| 392 # any object you can write to can be a consumer, really. | |
| 393 | |
| 394 producer = None | |
| 395 | |
| 396 | |
| 397 def registerProducer(self, producer, streaming): | |
| 398 """ | |
| 399 Register to receive data from a producer. | |
| 400 | |
| 401 This sets this selectable to be a consumer for a producer. When this | |
| 402 selectable runs out of data on a write() call, it will ask the producer | |
| 403 to resumeProducing(). A producer should implement the IProducer | |
| 404 interface. | |
| 405 | |
| 406 FileDescriptor provides some infrastructure for producer methods. | |
| 407 """ | |
| 408 if self.producer is not None: | |
| 409 raise RuntimeError( | |
| 410 "Cannot register producer %s, because producer " | |
| 411 "%s was never unregistered." % (producer, self.producer)) | |
| 412 if self.disconnected: | |
| 413 producer.stopProducing() | |
| 414 else: | |
| 415 self.producer = producer | |
| 416 self.streamingProducer = streaming | |
| 417 if not streaming: | |
| 418 producer.resumeProducing() | |
| 419 | |
| 420 | |
| 421 def unregisterProducer(self): | |
| 422 """ | |
| 423 Stop consuming data from a producer, without disconnecting. | |
| 424 """ | |
| 425 self.producer = None | |
| 426 | |
| 427 | |
| 428 def stopConsuming(self): | |
| 429 """ | |
| 430 Stop consuming data. | |
| 431 | |
| 432 This is called when a producer has lost its connection, to tell the | |
| 433 consumer to go lose its connection (and break potential circular | |
| 434 references). | |
| 435 """ | |
| 436 self.unregisterProducer() | |
| 437 self.loseConnection() | |
| 438 | |
| 439 | |
| 440 # producer interface implementation | |
| 441 | |
| 442 def resumeProducing(self): | |
| 443 assert self.connected and not self.disconnecting | |
| 444 self.startReading() | |
| 445 | |
| 446 | |
| 447 def pauseProducing(self): | |
| 448 self.stopReading() | |
| 449 | |
| 450 | |
| 451 def stopProducing(self): | |
| 452 self.loseConnection() | |
| 453 | |
| 454 | |
| 455 __all__ = ['FileHandle'] | |
| 456 | |
| OLD | NEW |