Index: third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py |
diff --git a/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py b/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py |
deleted file mode 100644 |
index c1838a51b3df95a8eb99a67581e973cbbaae402a..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py |
+++ /dev/null |
@@ -1,456 +0,0 @@ |
-# Copyright (c) 2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Abstract file handle class |
-""" |
- |
-from twisted.internet import main, error, interfaces |
-from twisted.python import log, failure |
-from twisted.persisted import styles |
- |
-from zope.interface import implements |
-import errno |
- |
-from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF |
-from twisted.internet.iocpreactor.const import ERROR_IO_PENDING |
-from twisted.internet.iocpreactor import iocpsupport as _iocp |
- |
- |
- |
-class FileHandle(log.Logger, styles.Ephemeral, object): |
- """ |
- File handle that can read and write asynchronously |
- """ |
- implements(interfaces.IProducer, interfaces.IConsumer, |
- interfaces.ITransport, interfaces.IHalfCloseableDescriptor) |
- # read stuff |
- maxReadBuffers = 16 |
- readBufferSize = 4096 |
- reading = False |
- dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs |
- _readNextBuffer = 0 |
- _readSize = 0 # how much data we have in the read buffer |
- _readScheduled = None |
- _readScheduledInOS = False |
- |
- |
- def startReading(self): |
- self.reactor.addActiveHandle(self) |
- if not self._readScheduled and not self.reading: |
- self.reading = True |
- self._readScheduled = self.reactor.callLater(0, |
- self._resumeReading) |
- |
- |
- def stopReading(self): |
- if self._readScheduled: |
- self._readScheduled.cancel() |
- self._readScheduled = None |
- self.reading = False |
- |
- |
- def _resumeReading(self): |
- self._readScheduled = None |
- if self._dispatchData() and not self._readScheduledInOS: |
- self.doRead() |
- |
- |
- def _dispatchData(self): |
- """ |
- Dispatch previously read data. Return True if self.reading and we don't |
- have any more data |
- """ |
- if not self._readSize: |
- return self.reading |
- size = self._readSize |
- full_buffers = size // self.readBufferSize |
- while self._readNextBuffer < full_buffers: |
- self.dataReceived(self._readBuffers[self._readNextBuffer]) |
- self._readNextBuffer += 1 |
- if not self.reading: |
- return False |
- remainder = size % self.readBufferSize |
- if remainder: |
- self.dataReceived(buffer(self._readBuffers[full_buffers], |
- 0, remainder)) |
- if self.dynamicReadBuffers: |
- total_buffer_size = self.readBufferSize * len(self._readBuffers) |
- # we have one buffer too many |
- if size < total_buffer_size - self.readBufferSize: |
- del self._readBuffers[-1] |
- # we filled all buffers, so allocate one more |
- elif (size == total_buffer_size and |
- len(self._readBuffers) < self.maxReadBuffers): |
- self._readBuffers.append(_iocp.AllocateReadBuffer( |
- self.readBufferSize)) |
- self._readNextBuffer = 0 |
- self._readSize = 0 |
- return self.reading |
- |
- |
- def _cbRead(self, rc, bytes, evt): |
- self._readScheduledInOS = False |
- if self._handleRead(rc, bytes, evt): |
- self.doRead() |
- |
- |
- def _handleRead(self, rc, bytes, evt): |
- """ |
- Returns False if we should stop reading for now |
- """ |
- if self.disconnected: |
- return False |
- # graceful disconnection |
- if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF): |
- self.reactor.removeActiveHandle(self) |
- self.readConnectionLost(failure.Failure(main.CONNECTION_DONE)) |
- return False |
- # XXX: not handling WSAEWOULDBLOCK |
- # ("too many outstanding overlapped I/O requests") |
- elif rc: |
- self.connectionLost(failure.Failure( |
- error.ConnectionLost("read error -- %s (%s)" % |
- (errno.errorcode.get(rc, 'unknown'), rc)))) |
- return False |
- else: |
- assert self._readSize == 0 |
- assert self._readNextBuffer == 0 |
- self._readSize = bytes |
- return self._dispatchData() |
- |
- |
- def doRead(self): |
- numReads = 0 |
- while 1: |
- evt = _iocp.Event(self._cbRead, self) |
- |
- evt.buff = buff = self._readBuffers |
- rc, bytes = self.readFromHandle(buff, evt) |
- |
- if (rc == ERROR_IO_PENDING |
- or (not rc and numReads >= self.maxReads)): |
- self._readScheduledInOS = True |
- break |
- else: |
- evt.ignore = True |
- if not self._handleRead(rc, bytes, evt): |
- break |
- numReads += 1 |
- |
- |
- def readFromHandle(self, bufflist, evt): |
- raise NotImplementedError() # TODO: this should default to ReadFile |
- |
- |
- def dataReceived(self, data): |
- raise NotImplementedError |
- |
- |
- def readConnectionLost(self, reason): |
- self.connectionLost(reason) |
- |
- |
- # write stuff |
- dataBuffer = '' |
- offset = 0 |
- writing = False |
- _writeScheduled = None |
- _writeDisconnecting = False |
- _writeDisconnected = False |
- writeBufferSize = 2**2**2**2 |
- maxWrites = 5 |
- |
- |
- def loseWriteConnection(self): |
- self._writeDisconnecting = True |
- self.startWriting() |
- |
- |
- def _closeWriteConnection(self): |
- # override in subclasses |
- pass |
- |
- |
- def writeConnectionLost(self, reason): |
- # in current code should never be called |
- self.connectionLost(reason) |
- |
- |
- def startWriting(self): |
- self.reactor.addActiveHandle(self) |
- self.writing = True |
- if not self._writeScheduled: |
- self._writeScheduled = self.reactor.callLater(0, |
- self._resumeWriting) |
- |
- |
- def stopWriting(self): |
- if self._writeScheduled: |
- self._writeScheduled.cancel() |
- self._writeScheduled = None |
- self.writing = False |
- |
- |
- def _resumeWriting(self): |
- self._writeScheduled = None |
- self.doWrite() |
- |
- |
- def _cbWrite(self, rc, bytes, evt): |
- if self._handleWrite(rc, bytes, evt): |
- self.doWrite() |
- |
- |
- def _handleWrite(self, rc, bytes, evt): |
- """ |
- Returns false if we should stop writing for now |
- """ |
- if self.disconnected or self._writeDisconnected: |
- return False |
- # XXX: not handling WSAEWOULDBLOCK |
- # ("too many outstanding overlapped I/O requests") |
- if rc: |
- self.connectionLost(failure.Failure( |
- error.ConnectionLost("write error -- %s (%s)" % |
- (errno.errorcode.get(rc, 'unknown'), rc)))) |
- return False |
- else: |
- self.offset += bytes |
- # If there is nothing left to send, |
- if self.offset == len(self.dataBuffer) and not self._tempDataLen: |
- self.dataBuffer = "" |
- self.offset = 0 |
- # stop writing |
- self.stopWriting() |
- # If I've got a producer who is supposed to supply me with data |
- if self.producer is not None and ((not self.streamingProducer) |
- or self.producerPaused): |
- # tell them to supply some more. |
- self.producerPaused = True |
- self.producer.resumeProducing() |
- elif self.disconnecting: |
- # But if I was previously asked to let the connection die, |
- # do so. |
- self.connectionLost(failure.Failure(main.CONNECTION_DONE)) |
- elif self._writeDisconnecting: |
- # I was previously asked to to half-close the connection. |
- self._closeWriteConnection() |
- self._writeDisconnected = True |
- return False |
- else: |
- return True |
- |
- |
- def doWrite(self): |
- numWrites = 0 |
- while 1: |
- if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: |
- # If there is currently less than SEND_LIMIT bytes left to send |
- # in the string, extend it with the array data. |
- self.dataBuffer = (buffer(self.dataBuffer, self.offset) + |
- "".join(self._tempDataBuffer)) |
- self.offset = 0 |
- self._tempDataBuffer = [] |
- self._tempDataLen = 0 |
- |
- evt = _iocp.Event(self._cbWrite, self) |
- |
- # Send as much data as you can. |
- if self.offset: |
- evt.buff = buff = buffer(self.dataBuffer, self.offset) |
- else: |
- evt.buff = buff = self.dataBuffer |
- rc, bytes = self.writeToHandle(buff, evt) |
- if (rc == ERROR_IO_PENDING |
- or (not rc and numWrites >= self.maxWrites)): |
- break |
- else: |
- evt.ignore = True |
- if not self._handleWrite(rc, bytes, evt): |
- break |
- numWrites += 1 |
- |
- |
- def writeToHandle(self, buff, evt): |
- raise NotImplementedError() # TODO: this should default to WriteFile |
- |
- |
- def write(self, data): |
- """Reliably write some data. |
- |
- The data is buffered until his file descriptor is ready for writing. |
- """ |
- if isinstance(data, unicode): # no, really, I mean it |
- raise TypeError("Data must not be unicode") |
- if not self.connected or self._writeDisconnected: |
- return |
- if data: |
- self._tempDataBuffer.append(data) |
- self._tempDataLen += len(data) |
- if self.producer is not None: |
- if (len(self.dataBuffer) + self._tempDataLen |
- > self.writeBufferSize): |
- self.producerPaused = True |
- self.producer.pauseProducing() |
- self.startWriting() |
- |
- |
- def writeSequence(self, iovec): |
- if not self.connected or not iovec or self._writeDisconnected: |
- return |
- self._tempDataBuffer.extend(iovec) |
- for i in iovec: |
- self._tempDataLen += len(i) |
- if self.producer is not None: |
- if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: |
- self.producerPaused = True |
- self.producer.pauseProducing() |
- self.startWriting() |
- |
- |
- # general stuff |
- connected = False |
- disconnected = False |
- disconnecting = False |
- logstr = "Uninitialized" |
- |
- SEND_LIMIT = 128*1024 |
- |
- maxReads = 5 |
- |
- |
- def __init__(self, reactor = None): |
- if not reactor: |
- from twisted.internet import reactor |
- self.reactor = reactor |
- self._tempDataBuffer = [] # will be added to dataBuffer in doWrite |
- self._tempDataLen = 0 |
- self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)] |
- |
- |
- def connectionLost(self, reason): |
- """ |
- The connection was lost. |
- |
- This is called when the connection on a selectable object has been |
- lost. It will be called whether the connection was closed explicitly, |
- an exception occurred in an event handler, or the other end of the |
- connection closed it first. |
- |
- Clean up state here, but make sure to call back up to FileDescriptor. |
- """ |
- |
- self.disconnected = True |
- self.connected = False |
- if self.producer is not None: |
- self.producer.stopProducing() |
- self.producer = None |
- self.stopReading() |
- self.stopWriting() |
- self.reactor.removeActiveHandle(self) |
- |
- |
- def getFileHandle(self): |
- return -1 |
- |
- |
- def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): |
- """ |
- Close the connection at the next available opportunity. |
- |
- Call this to cause this FileDescriptor to lose its connection. It will |
- first write any data that it has buffered. |
- |
- If there is data buffered yet to be written, this method will cause the |
- transport to lose its connection as soon as it's done flushing its |
- write buffer. If you have a producer registered, the connection won't |
- be closed until the producer is finished. Therefore, make sure you |
- unregister your producer when it's finished, or the connection will |
- never close. |
- """ |
- |
- if self.connected and not self.disconnecting: |
- if self._writeDisconnected: |
- # doWrite won't trigger the connection close anymore |
- self.stopReading() |
- self.stopWriting |
- self.connectionLost(_connDone) |
- else: |
- self.stopReading() |
- self.startWriting() |
- self.disconnecting = 1 |
- |
- |
- # Producer/consumer implementation |
- |
- producerPaused = False |
- streamingProducer = False |
- |
- # first, the consumer stuff. This requires no additional work, as |
- # any object you can write to can be a consumer, really. |
- |
- producer = None |
- |
- |
- def registerProducer(self, producer, streaming): |
- """ |
- Register to receive data from a producer. |
- |
- This sets this selectable to be a consumer for a producer. When this |
- selectable runs out of data on a write() call, it will ask the producer |
- to resumeProducing(). A producer should implement the IProducer |
- interface. |
- |
- FileDescriptor provides some infrastructure for producer methods. |
- """ |
- if self.producer is not None: |
- raise RuntimeError( |
- "Cannot register producer %s, because producer " |
- "%s was never unregistered." % (producer, self.producer)) |
- if self.disconnected: |
- producer.stopProducing() |
- else: |
- self.producer = producer |
- self.streamingProducer = streaming |
- if not streaming: |
- producer.resumeProducing() |
- |
- |
- def unregisterProducer(self): |
- """ |
- Stop consuming data from a producer, without disconnecting. |
- """ |
- self.producer = None |
- |
- |
- def stopConsuming(self): |
- """ |
- Stop consuming data. |
- |
- This is called when a producer has lost its connection, to tell the |
- consumer to go lose its connection (and break potential circular |
- references). |
- """ |
- self.unregisterProducer() |
- self.loseConnection() |
- |
- |
- # producer interface implementation |
- |
- def resumeProducing(self): |
- assert self.connected and not self.disconnecting |
- self.startReading() |
- |
- |
- def pauseProducing(self): |
- self.stopReading() |
- |
- |
- def stopProducing(self): |
- self.loseConnection() |
- |
- |
-__all__ = ['FileHandle'] |
- |