Index: third_party/twisted_8_1/twisted/protocols/pcp.py |
diff --git a/third_party/twisted_8_1/twisted/protocols/pcp.py b/third_party/twisted_8_1/twisted/protocols/pcp.py |
deleted file mode 100644 |
index 00a1342520ec8e0e9c49404c87b590e83c92c79d..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/protocols/pcp.py |
+++ /dev/null |
@@ -1,209 +0,0 @@ |
-# -*- test-case-name: twisted.test.test_pcp -*- |
-# |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-"""Producer-Consumer Proxy.""" |
- |
-__version__ = '$Revision: 1.4 $'[11:-2] |
- |
-import operator |
- |
-from zope.interface import implements |
- |
-from twisted.internet import interfaces |
- |
- |
-class BasicProducerConsumerProxy: |
- """ I can act as a man in the middle between any Producer and Consumer. |
- |
- @ivar producer: the Producer I subscribe to. |
- @type producer: L{IProducer<interfaces.IProducer>} |
- @ivar consumer: the Consumer I publish to. |
- @type consumer: L{IConsumer<interfaces.IConsumer>} |
- @ivar paused: As a Producer, am I paused? |
- @type paused: bool |
- """ |
- implements(interfaces.IProducer, interfaces.IConsumer) |
- |
- consumer = None |
- producer = None |
- producerIsStreaming = None |
- iAmStreaming = True |
- outstandingPull = False |
- paused = False |
- stopped = False |
- |
- def __init__(self, consumer): |
- self._buffer = [] |
- if consumer is not None: |
- self.consumer = consumer |
- consumer.registerProducer(self, self.iAmStreaming) |
- |
- # Producer methods: |
- |
- def pauseProducing(self): |
- self.paused = True |
- if self.producer: |
- self.producer.pauseProducing() |
- |
- def resumeProducing(self): |
- self.paused = False |
- if self._buffer: |
- # TODO: Check to see if consumer supports writeSeq. |
- self.consumer.write(''.join(self._buffer)) |
- self._buffer[:] = [] |
- else: |
- if not self.iAmStreaming: |
- self.outstandingPull = True |
- |
- if self.producer is not None: |
- self.producer.resumeProducing() |
- |
- def stopProducing(self): |
- if self.producer is not None: |
- self.producer.stopProducing() |
- if self.consumer is not None: |
- del self.consumer |
- |
- # Consumer methods: |
- |
- def write(self, data): |
- if self.paused or (not self.iAmStreaming and not self.outstandingPull): |
- # We could use that fifo queue here. |
- self._buffer.append(data) |
- |
- elif self.consumer is not None: |
- self.consumer.write(data) |
- self.outstandingPull = False |
- |
- def finish(self): |
- if self.consumer is not None: |
- self.consumer.finish() |
- self.unregisterProducer() |
- |
- def registerProducer(self, producer, streaming): |
- self.producer = producer |
- self.producerIsStreaming = streaming |
- |
- def unregisterProducer(self): |
- if self.producer is not None: |
- del self.producer |
- del self.producerIsStreaming |
- if self.consumer: |
- self.consumer.unregisterProducer() |
- |
- def __repr__(self): |
- return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer) |
- |
- |
-class ProducerConsumerProxy(BasicProducerConsumerProxy): |
- """ProducerConsumerProxy with a finite buffer. |
- |
- When my buffer fills up, I have my parent Producer pause until my buffer |
- has room in it again. |
- """ |
- # Copies much from abstract.FileDescriptor |
- bufferSize = 2**2**2**2 |
- |
- producerPaused = False |
- unregistered = False |
- |
- def pauseProducing(self): |
- # Does *not* call up to ProducerConsumerProxy to relay the pause |
- # message through to my parent Producer. |
- self.paused = True |
- |
- def resumeProducing(self): |
- self.paused = False |
- if self._buffer: |
- data = ''.join(self._buffer) |
- bytesSent = self._writeSomeData(data) |
- if bytesSent < len(data): |
- unsent = data[bytesSent:] |
- assert not self.iAmStreaming, ( |
- "Streaming producer did not write all its data.") |
- self._buffer[:] = [unsent] |
- else: |
- self._buffer[:] = [] |
- else: |
- bytesSent = 0 |
- |
- if (self.unregistered and bytesSent and not self._buffer and |
- self.consumer is not None): |
- self.consumer.unregisterProducer() |
- |
- if not self.iAmStreaming: |
- self.outstandingPull = not bytesSent |
- |
- if self.producer is not None: |
- bytesBuffered = reduce(operator.add, |
- [len(s) for s in self._buffer], 0) |
- # TODO: You can see here the potential for high and low |
- # watermarks, where bufferSize would be the high mark when we |
- # ask the upstream producer to pause, and we wouldn't have |
- # it resume again until it hit the low mark. Or if producer |
- # is Pull, maybe we'd like to pull from it as much as necessary |
- # to keep our buffer full to the low mark, so we're never caught |
- # without something to send. |
- if self.producerPaused and (bytesBuffered < self.bufferSize): |
- # Now that our buffer is empty, |
- self.producerPaused = False |
- self.producer.resumeProducing() |
- elif self.outstandingPull: |
- # I did not have any data to write in response to a pull, |
- # so I'd better pull some myself. |
- self.producer.resumeProducing() |
- |
- def write(self, data): |
- if self.paused or (not self.iAmStreaming and not self.outstandingPull): |
- # We could use that fifo queue here. |
- self._buffer.append(data) |
- |
- elif self.consumer is not None: |
- assert not self._buffer, ( |
- "Writing fresh data to consumer before my buffer is empty!") |
- # I'm going to use _writeSomeData here so that there is only one |
- # path to self.consumer.write. But it doesn't actually make sense, |
- # if I am streaming, for some data to not be all data. But maybe I |
- # am not streaming, but I am writing here anyway, because there was |
- # an earlier request for data which was not answered. |
- bytesSent = self._writeSomeData(data) |
- self.outstandingPull = False |
- if not bytesSent == len(data): |
- assert not self.iAmStreaming, ( |
- "Streaming producer did not write all its data.") |
- self._buffer.append(data[bytesSent:]) |
- |
- if (self.producer is not None) and self.producerIsStreaming: |
- bytesBuffered = reduce(operator.add, |
- [len(s) for s in self._buffer], 0) |
- if bytesBuffered >= self.bufferSize: |
- |
- self.producer.pauseProducing() |
- self.producerPaused = True |
- |
- def registerProducer(self, producer, streaming): |
- self.unregistered = False |
- BasicProducerConsumerProxy.registerProducer(self, producer, streaming) |
- if not streaming: |
- producer.resumeProducing() |
- |
- def unregisterProducer(self): |
- if self.producer is not None: |
- del self.producer |
- del self.producerIsStreaming |
- self.unregistered = True |
- if self.consumer and not self._buffer: |
- self.consumer.unregisterProducer() |
- |
- def _writeSomeData(self, data): |
- """Write as much of this data as possible. |
- |
- @returns: The number of bytes written. |
- """ |
- if self.consumer is None: |
- return 0 |
- self.consumer.write(data) |
- return len(data) |