Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(166)

Unified Diff: third_party/twisted_8_1/twisted/protocols/pcp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/protocols/pcp.py
diff --git a/third_party/twisted_8_1/twisted/protocols/pcp.py b/third_party/twisted_8_1/twisted/protocols/pcp.py
deleted file mode 100644
index 00a1342520ec8e0e9c49404c87b590e83c92c79d..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/protocols/pcp.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# -*- test-case-name: twisted.test.test_pcp -*-
-#
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""Producer-Consumer Proxy."""
-
-__version__ = '$Revision: 1.4 $'[11:-2]
-
-import operator
-
-from zope.interface import implements
-
-from twisted.internet import interfaces
-
-
-class BasicProducerConsumerProxy:
- """ I can act as a man in the middle between any Producer and Consumer.
-
- @ivar producer: the Producer I subscribe to.
- @type producer: L{IProducer<interfaces.IProducer>}
- @ivar consumer: the Consumer I publish to.
- @type consumer: L{IConsumer<interfaces.IConsumer>}
- @ivar paused: As a Producer, am I paused?
- @type paused: bool
- """
- implements(interfaces.IProducer, interfaces.IConsumer)
-
- consumer = None
- producer = None
- producerIsStreaming = None
- iAmStreaming = True
- outstandingPull = False
- paused = False
- stopped = False
-
- def __init__(self, consumer):
- self._buffer = []
- if consumer is not None:
- self.consumer = consumer
- consumer.registerProducer(self, self.iAmStreaming)
-
- # Producer methods:
-
- def pauseProducing(self):
- self.paused = True
- if self.producer:
- self.producer.pauseProducing()
-
- def resumeProducing(self):
- self.paused = False
- if self._buffer:
- # TODO: Check to see if consumer supports writeSeq.
- self.consumer.write(''.join(self._buffer))
- self._buffer[:] = []
- else:
- if not self.iAmStreaming:
- self.outstandingPull = True
-
- if self.producer is not None:
- self.producer.resumeProducing()
-
- def stopProducing(self):
- if self.producer is not None:
- self.producer.stopProducing()
- if self.consumer is not None:
- del self.consumer
-
- # Consumer methods:
-
- def write(self, data):
- if self.paused or (not self.iAmStreaming and not self.outstandingPull):
- # We could use that fifo queue here.
- self._buffer.append(data)
-
- elif self.consumer is not None:
- self.consumer.write(data)
- self.outstandingPull = False
-
- def finish(self):
- if self.consumer is not None:
- self.consumer.finish()
- self.unregisterProducer()
-
- def registerProducer(self, producer, streaming):
- self.producer = producer
- self.producerIsStreaming = streaming
-
- def unregisterProducer(self):
- if self.producer is not None:
- del self.producer
- del self.producerIsStreaming
- if self.consumer:
- self.consumer.unregisterProducer()
-
- def __repr__(self):
- return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
-
-
-class ProducerConsumerProxy(BasicProducerConsumerProxy):
- """ProducerConsumerProxy with a finite buffer.
-
- When my buffer fills up, I have my parent Producer pause until my buffer
- has room in it again.
- """
- # Copies much from abstract.FileDescriptor
- bufferSize = 2**2**2**2
-
- producerPaused = False
- unregistered = False
-
- def pauseProducing(self):
- # Does *not* call up to ProducerConsumerProxy to relay the pause
- # message through to my parent Producer.
- self.paused = True
-
- def resumeProducing(self):
- self.paused = False
- if self._buffer:
- data = ''.join(self._buffer)
- bytesSent = self._writeSomeData(data)
- if bytesSent < len(data):
- unsent = data[bytesSent:]
- assert not self.iAmStreaming, (
- "Streaming producer did not write all its data.")
- self._buffer[:] = [unsent]
- else:
- self._buffer[:] = []
- else:
- bytesSent = 0
-
- if (self.unregistered and bytesSent and not self._buffer and
- self.consumer is not None):
- self.consumer.unregisterProducer()
-
- if not self.iAmStreaming:
- self.outstandingPull = not bytesSent
-
- if self.producer is not None:
- bytesBuffered = reduce(operator.add,
- [len(s) for s in self._buffer], 0)
- # TODO: You can see here the potential for high and low
- # watermarks, where bufferSize would be the high mark when we
- # ask the upstream producer to pause, and we wouldn't have
- # it resume again until it hit the low mark. Or if producer
- # is Pull, maybe we'd like to pull from it as much as necessary
- # to keep our buffer full to the low mark, so we're never caught
- # without something to send.
- if self.producerPaused and (bytesBuffered < self.bufferSize):
- # Now that our buffer is empty,
- self.producerPaused = False
- self.producer.resumeProducing()
- elif self.outstandingPull:
- # I did not have any data to write in response to a pull,
- # so I'd better pull some myself.
- self.producer.resumeProducing()
-
- def write(self, data):
- if self.paused or (not self.iAmStreaming and not self.outstandingPull):
- # We could use that fifo queue here.
- self._buffer.append(data)
-
- elif self.consumer is not None:
- assert not self._buffer, (
- "Writing fresh data to consumer before my buffer is empty!")
- # I'm going to use _writeSomeData here so that there is only one
- # path to self.consumer.write. But it doesn't actually make sense,
- # if I am streaming, for some data to not be all data. But maybe I
- # am not streaming, but I am writing here anyway, because there was
- # an earlier request for data which was not answered.
- bytesSent = self._writeSomeData(data)
- self.outstandingPull = False
- if not bytesSent == len(data):
- assert not self.iAmStreaming, (
- "Streaming producer did not write all its data.")
- self._buffer.append(data[bytesSent:])
-
- if (self.producer is not None) and self.producerIsStreaming:
- bytesBuffered = reduce(operator.add,
- [len(s) for s in self._buffer], 0)
- if bytesBuffered >= self.bufferSize:
-
- self.producer.pauseProducing()
- self.producerPaused = True
-
- def registerProducer(self, producer, streaming):
- self.unregistered = False
- BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
- if not streaming:
- producer.resumeProducing()
-
- def unregisterProducer(self):
- if self.producer is not None:
- del self.producer
- del self.producerIsStreaming
- self.unregistered = True
- if self.consumer and not self._buffer:
- self.consumer.unregisterProducer()
-
- def _writeSomeData(self, data):
- """Write as much of this data as possible.
-
- @returns: The number of bytes written.
- """
- if self.consumer is None:
- return 0
- self.consumer.write(data)
- return len(data)
« no previous file with comments | « third_party/twisted_8_1/twisted/protocols/oscar.py ('k') | third_party/twisted_8_1/twisted/protocols/policies.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698