Chromium Code Reviews

Unified Diff: third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed)
Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Index: third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py
diff --git a/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py b/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py
deleted file mode 100644
index c1838a51b3df95a8eb99a67581e973cbbaae402a..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py
+++ /dev/null
@@ -1,456 +0,0 @@
-# Copyright (c) 2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Abstract file handle class
-"""
-
-from twisted.internet import main, error, interfaces
-from twisted.python import log, failure
-from twisted.persisted import styles
-
-from zope.interface import implements
-import errno
-
-from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
-from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
-from twisted.internet.iocpreactor import iocpsupport as _iocp
-
-
-
-class FileHandle(log.Logger, styles.Ephemeral, object):
-    """
-    File handle that can read and write asynchronously
-    """
-    implements(interfaces.IProducer, interfaces.IConsumer,
-               interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
-    # read stuff
-    maxReadBuffers = 16
-    readBufferSize = 4096
-    reading = False
-    dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
-    _readNextBuffer = 0
-    _readSize = 0 # how much data we have in the read buffer
-    _readScheduled = None
-    _readScheduledInOS = False
-
-
-    def startReading(self):
-        self.reactor.addActiveHandle(self)
-        if not self._readScheduled and not self.reading:
-            self.reading = True
-            self._readScheduled = self.reactor.callLater(0,
-                                                         self._resumeReading)
-
-
-    def stopReading(self):
-        if self._readScheduled:
-            self._readScheduled.cancel()
-            self._readScheduled = None
-        self.reading = False
-
-
-    def _resumeReading(self):
-        self._readScheduled = None
-        if self._dispatchData() and not self._readScheduledInOS:
-            self.doRead()
-
-
-    def _dispatchData(self):
-        """
-        Dispatch previously read data. Return True if self.reading and we don't
-        have any more data
-        """
-        if not self._readSize:
-            return self.reading
-        size = self._readSize
-        full_buffers = size // self.readBufferSize
-        while self._readNextBuffer < full_buffers:
-            self.dataReceived(self._readBuffers[self._readNextBuffer])
-            self._readNextBuffer += 1
-            if not self.reading:
-                return False
-        remainder = size % self.readBufferSize
-        if remainder:
-            self.dataReceived(buffer(self._readBuffers[full_buffers],
-                                     0, remainder))
-        if self.dynamicReadBuffers:
-            total_buffer_size = self.readBufferSize * len(self._readBuffers)
-            # we have one buffer too many
-            if size < total_buffer_size - self.readBufferSize:
-                del self._readBuffers[-1]
-            # we filled all buffers, so allocate one more
-            elif (size == total_buffer_size and
-                  len(self._readBuffers) < self.maxReadBuffers):
-                self._readBuffers.append(_iocp.AllocateReadBuffer(
-                                         self.readBufferSize))
-        self._readNextBuffer = 0
-        self._readSize = 0
-        return self.reading
-
-
-    def _cbRead(self, rc, bytes, evt):
-        self._readScheduledInOS = False
-        if self._handleRead(rc, bytes, evt):
-            self.doRead()
-
-
-    def _handleRead(self, rc, bytes, evt):
-        """
-        Returns False if we should stop reading for now
-        """
-        if self.disconnected:
-            return False
-        # graceful disconnection
-        if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
-            self.reactor.removeActiveHandle(self)
-            self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
-            return False
-        # XXX: not handling WSAEWOULDBLOCK
-        # ("too many outstanding overlapped I/O requests")
-        elif rc:
-            self.connectionLost(failure.Failure(
-                error.ConnectionLost("read error -- %s (%s)" %
-                                     (errno.errorcode.get(rc, 'unknown'), rc))))
-            return False
-        else:
-            assert self._readSize == 0
-            assert self._readNextBuffer == 0
-            self._readSize = bytes
-            return self._dispatchData()
-
-
-    def doRead(self):
-        numReads = 0
-        while 1:
-            evt = _iocp.Event(self._cbRead, self)
-
-            evt.buff = buff = self._readBuffers
-            rc, bytes = self.readFromHandle(buff, evt)
-
-            if (rc == ERROR_IO_PENDING
-                or (not rc and numReads >= self.maxReads)):
-                self._readScheduledInOS = True
-                break
-            else:
-                evt.ignore = True
-                if not self._handleRead(rc, bytes, evt):
-                    break
-                numReads += 1
-
-
-    def readFromHandle(self, bufflist, evt):
-        raise NotImplementedError() # TODO: this should default to ReadFile
-
-
-    def dataReceived(self, data):
-        raise NotImplementedError
-
-
-    def readConnectionLost(self, reason):
-        self.connectionLost(reason)
-
-
-    # write stuff
-    dataBuffer = ''
-    offset = 0
-    writing = False
-    _writeScheduled = None
-    _writeDisconnecting = False
-    _writeDisconnected = False
-    writeBufferSize = 2**2**2**2
-    maxWrites = 5
-
-
-    def loseWriteConnection(self):
-        self._writeDisconnecting = True
-        self.startWriting()
-
-
-    def _closeWriteConnection(self):
-        # override in subclasses
-        pass
-
-
-    def writeConnectionLost(self, reason):
-        # in current code should never be called
-        self.connectionLost(reason)
-
-
-    def startWriting(self):
-        self.reactor.addActiveHandle(self)
-        self.writing = True
-        if not self._writeScheduled:
-            self._writeScheduled = self.reactor.callLater(0,
-                                                          self._resumeWriting)
-
-
-    def stopWriting(self):
-        if self._writeScheduled:
-            self._writeScheduled.cancel()
-            self._writeScheduled = None
-        self.writing = False
-
-
-    def _resumeWriting(self):
-        self._writeScheduled = None
-        self.doWrite()
-
-
-    def _cbWrite(self, rc, bytes, evt):
-        if self._handleWrite(rc, bytes, evt):
-            self.doWrite()
-
-
-    def _handleWrite(self, rc, bytes, evt):
-        """
-        Returns false if we should stop writing for now
-        """
-        if self.disconnected or self._writeDisconnected:
-            return False
-        # XXX: not handling WSAEWOULDBLOCK
-        # ("too many outstanding overlapped I/O requests")
-        if rc:
-            self.connectionLost(failure.Failure(
-                error.ConnectionLost("write error -- %s (%s)" %
-                                     (errno.errorcode.get(rc, 'unknown'), rc))))
-            return False
-        else:
-            self.offset += bytes
-            # If there is nothing left to send,
-            if self.offset == len(self.dataBuffer) and not self._tempDataLen:
-                self.dataBuffer = ""
-                self.offset = 0
-                # stop writing
-                self.stopWriting()
-                # If I've got a producer who is supposed to supply me with data
-                if self.producer is not None and ((not self.streamingProducer)
-                                                  or self.producerPaused):
-                    # tell them to supply some more.
-                    self.producerPaused = True
-                    self.producer.resumeProducing()
-                elif self.disconnecting:
-                    # But if I was previously asked to let the connection die,
-                    # do so.
-                    self.connectionLost(failure.Failure(main.CONNECTION_DONE))
-                elif self._writeDisconnecting:
-                    # I was previously asked to to half-close the connection.
-                    self._closeWriteConnection()
-                    self._writeDisconnected = True
-                return False
-            else:
-                return True
-
-
-    def doWrite(self):
-        numWrites = 0
-        while 1:
-            if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
-                # If there is currently less than SEND_LIMIT bytes left to send
-                # in the string, extend it with the array data.
-                self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
-                                   "".join(self._tempDataBuffer))
-                self.offset = 0
-                self._tempDataBuffer = []
-                self._tempDataLen = 0
-
-            evt = _iocp.Event(self._cbWrite, self)
-
-            # Send as much data as you can.
-            if self.offset:
-                evt.buff = buff = buffer(self.dataBuffer, self.offset)
-            else:
-                evt.buff = buff = self.dataBuffer
-            rc, bytes = self.writeToHandle(buff, evt)
-            if (rc == ERROR_IO_PENDING
-                or (not rc and numWrites >= self.maxWrites)):
-                break
-            else:
-                evt.ignore = True
-                if not self._handleWrite(rc, bytes, evt):
-                    break
-                numWrites += 1
-
-
-    def writeToHandle(self, buff, evt):
-        raise NotImplementedError() # TODO: this should default to WriteFile
-
-
-    def write(self, data):
-        """Reliably write some data.
-
-        The data is buffered until his file descriptor is ready for writing.
-        """
-        if isinstance(data, unicode): # no, really, I mean it
-            raise TypeError("Data must not be unicode")
-        if not self.connected or self._writeDisconnected:
-            return
-        if data:
-            self._tempDataBuffer.append(data)
-            self._tempDataLen += len(data)
-            if self.producer is not None:
-                if (len(self.dataBuffer) + self._tempDataLen
-                    > self.writeBufferSize):
-                    self.producerPaused = True
-                    self.producer.pauseProducing()
-            self.startWriting()
-
-
-    def writeSequence(self, iovec):
-        if not self.connected or not iovec or self._writeDisconnected:
-            return
-        self._tempDataBuffer.extend(iovec)
-        for i in iovec:
-            self._tempDataLen += len(i)
-        if self.producer is not None:
-            if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
-                self.producerPaused = True
-                self.producer.pauseProducing()
-        self.startWriting()
-
-
-    # general stuff
-    connected = False
-    disconnected = False
-    disconnecting = False
-    logstr = "Uninitialized"
-
-    SEND_LIMIT = 128*1024
-
-    maxReads = 5
-
-
-    def __init__(self, reactor = None):
-        if not reactor:
-            from twisted.internet import reactor
-        self.reactor = reactor
-        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
-        self._tempDataLen = 0
-        self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]
-
-
-    def connectionLost(self, reason):
-        """
-        The connection was lost.
-
-        This is called when the connection on a selectable object has been
-        lost. It will be called whether the connection was closed explicitly,
-        an exception occurred in an event handler, or the other end of the
-        connection closed it first.
-
-        Clean up state here, but make sure to call back up to FileDescriptor.
-        """
-
-        self.disconnected = True
-        self.connected = False
-        if self.producer is not None:
-            self.producer.stopProducing()
-            self.producer = None
-        self.stopReading()
-        self.stopWriting()
-        self.reactor.removeActiveHandle(self)
-
-
-    def getFileHandle(self):
-        return -1
-
-
-    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
-        """
-        Close the connection at the next available opportunity.
-
-        Call this to cause this FileDescriptor to lose its connection. It will
-        first write any data that it has buffered.
-
-        If there is data buffered yet to be written, this method will cause the
-        transport to lose its connection as soon as it's done flushing its
-        write buffer. If you have a producer registered, the connection won't
-        be closed until the producer is finished. Therefore, make sure you
-        unregister your producer when it's finished, or the connection will
-        never close.
-        """
-
-        if self.connected and not self.disconnecting:
-            if self._writeDisconnected:
-                # doWrite won't trigger the connection close anymore
-                self.stopReading()
-                self.stopWriting
-                self.connectionLost(_connDone)
-            else:
-                self.stopReading()
-                self.startWriting()
-                self.disconnecting = 1
-
-
-    # Producer/consumer implementation
-
-    producerPaused = False
-    streamingProducer = False
-
-    # first, the consumer stuff. This requires no additional work, as
-    # any object you can write to can be a consumer, really.
-
-    producer = None
-
-
-    def registerProducer(self, producer, streaming):
-        """
-        Register to receive data from a producer.
-
-        This sets this selectable to be a consumer for a producer. When this
-        selectable runs out of data on a write() call, it will ask the producer
-        to resumeProducing(). A producer should implement the IProducer
-        interface.
-
-        FileDescriptor provides some infrastructure for producer methods.
-        """
-        if self.producer is not None:
-            raise RuntimeError(
-                "Cannot register producer %s, because producer "
-                "%s was never unregistered." % (producer, self.producer))
-        if self.disconnected:
-            producer.stopProducing()
-        else:
-            self.producer = producer
-            self.streamingProducer = streaming
-            if not streaming:
-                producer.resumeProducing()
-
-
-    def unregisterProducer(self):
-        """
-        Stop consuming data from a producer, without disconnecting.
-        """
-        self.producer = None
-
-
-    def stopConsuming(self):
-        """
-        Stop consuming data.
-
-        This is called when a producer has lost its connection, to tell the
-        consumer to go lose its connection (and break potential circular
-        references).
-        """
-        self.unregisterProducer()
-        self.loseConnection()
-
-
-    # producer interface implementation
-
-    def resumeProducing(self):
-        assert self.connected and not self.disconnecting
-        self.startReading()
-
-
-    def pauseProducing(self):
-        self.stopReading()
-
-
-    def stopProducing(self):
-        self.loseConnection()
-
-
-__all__ = ['FileHandle']
-
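Note on the two abstract hooks: readFromHandle() and writeToHandle() raise NotImplementedError in the file above, and concrete transports in this tree supplied the actual overlapped I/O calls. The following is a hypothetical minimal sketch (Python 2, to match the deleted module) of such a subclass; the _iocp.recv/_iocp.send names and signatures are assumptions for illustration, not a copy of the removed tcp.py.

# Hypothetical sketch only; not part of the removed tree. The _iocp.recv and
# _iocp.send signatures below are assumed for illustration.
from twisted.internet.iocpreactor import abstract
from twisted.internet.iocpreactor import iocpsupport as _iocp


class SocketFileHandle(abstract.FileHandle):
    """Minimal concrete transport wiring a socket into FileHandle."""

    def __init__(self, sock, reactor=None):
        abstract.FileHandle.__init__(self, reactor)
        self.socket = sock
        self.connected = True

    def getFileHandle(self):
        # The OS-level handle that gets associated with the completion port.
        return self.socket.fileno()

    def readFromHandle(self, bufflist, evt):
        # Start an overlapped read into the preallocated buffer list and
        # return (rc, bytes) the way _cbRead()/_handleRead() expect.
        return _iocp.recv(self.getFileHandle(), bufflist, evt)

    def writeToHandle(self, buff, evt):
        # Start an overlapped write of the coalesced dataBuffer slice.
        return _iocp.send(self.getFileHandle(), buff, evt)

    def dataReceived(self, data):
        # Application hook: _dispatchData() delivers completed reads here.
        print "received %d bytes" % len(data)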

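The write path above also implements the consumer half of the producer/consumer contract: write() accumulates data in _tempDataBuffer, pauses a registered producer once the pending bytes exceed writeBufferSize, and _handleWrite() asks for more data once the buffer drains. A rough usage sketch with a non-streaming (pull) producer follows; FileProducer and the 16384-byte chunk size are illustrative choices, not part of the removed code.

# Hypothetical pull producer feeding a FileHandle-style consumer.
class FileProducer(object):
    """Reads a file object in chunks and writes a chunk each time it is asked."""

    def __init__(self, fobj, consumer):
        self.fobj = fobj
        self.consumer = consumer
        # streaming=False: registerProducer() calls resumeProducing() at once,
        # and _handleWrite() calls it again each time the buffer empties.
        consumer.registerProducer(self, streaming=False)

    def resumeProducing(self):
        chunk = self.fobj.read(16384)
        if chunk:
            self.consumer.write(chunk)
        else:
            self.consumer.unregisterProducer()
            self.consumer.loseConnection()

    def pauseProducing(self):
        # Only relevant for streaming producers; write() flips producerPaused
        # when dataBuffer plus _tempDataLen exceeds writeBufferSize.
        pass

    def stopProducing(self):
        self.fobj.close()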