| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_pcp -*- | |
| 2 # | |
| 3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """Producer-Consumer Proxy.""" | |
| 8 | |
| 9 __version__ = '$Revision: 1.4 $'[11:-2] | |
| 10 | |
| 11 import operator | |
| 12 | |
| 13 from zope.interface import implements | |
| 14 | |
| 15 from twisted.internet import interfaces | |
| 16 | |
| 17 | |
| 18 class BasicProducerConsumerProxy: | |
| 19 """ I can act as a man in the middle between any Producer and Consumer. | |
| 20 | |
| 21 @ivar producer: the Producer I subscribe to. | |
| 22 @type producer: L{IProducer<interfaces.IProducer>} | |
| 23 @ivar consumer: the Consumer I publish to. | |
| 24 @type consumer: L{IConsumer<interfaces.IConsumer>} | |
| 25 @ivar paused: As a Producer, am I paused? | |
| 26 @type paused: bool | |
| 27 """ | |
| 28 implements(interfaces.IProducer, interfaces.IConsumer) | |
| 29 | |
| 30 consumer = None | |
| 31 producer = None | |
| 32 producerIsStreaming = None | |
| 33 iAmStreaming = True | |
| 34 outstandingPull = False | |
| 35 paused = False | |
| 36 stopped = False | |
| 37 | |
| 38 def __init__(self, consumer): | |
| 39 self._buffer = [] | |
| 40 if consumer is not None: | |
| 41 self.consumer = consumer | |
| 42 consumer.registerProducer(self, self.iAmStreaming) | |
| 43 | |
| 44 # Producer methods: | |
| 45 | |
| 46 def pauseProducing(self): | |
| 47 self.paused = True | |
| 48 if self.producer: | |
| 49 self.producer.pauseProducing() | |
| 50 | |
| 51 def resumeProducing(self): | |
| 52 self.paused = False | |
| 53 if self._buffer: | |
| 54 # TODO: Check to see if consumer supports writeSeq. | |
| 55 self.consumer.write(''.join(self._buffer)) | |
| 56 self._buffer[:] = [] | |
| 57 else: | |
| 58 if not self.iAmStreaming: | |
| 59 self.outstandingPull = True | |
| 60 | |
| 61 if self.producer is not None: | |
| 62 self.producer.resumeProducing() | |
| 63 | |
| 64 def stopProducing(self): | |
| 65 if self.producer is not None: | |
| 66 self.producer.stopProducing() | |
| 67 if self.consumer is not None: | |
| 68 del self.consumer | |
| 69 | |
| 70 # Consumer methods: | |
| 71 | |
| 72 def write(self, data): | |
| 73 if self.paused or (not self.iAmStreaming and not self.outstandingPull): | |
| 74 # We could use that fifo queue here. | |
| 75 self._buffer.append(data) | |
| 76 | |
| 77 elif self.consumer is not None: | |
| 78 self.consumer.write(data) | |
| 79 self.outstandingPull = False | |
| 80 | |
| 81 def finish(self): | |
| 82 if self.consumer is not None: | |
| 83 self.consumer.finish() | |
| 84 self.unregisterProducer() | |
| 85 | |
| 86 def registerProducer(self, producer, streaming): | |
| 87 self.producer = producer | |
| 88 self.producerIsStreaming = streaming | |
| 89 | |
| 90 def unregisterProducer(self): | |
| 91 if self.producer is not None: | |
| 92 del self.producer | |
| 93 del self.producerIsStreaming | |
| 94 if self.consumer: | |
| 95 self.consumer.unregisterProducer() | |
| 96 | |
| 97 def __repr__(self): | |
| 98 return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer) | |
| 99 | |
| 100 | |
| 101 class ProducerConsumerProxy(BasicProducerConsumerProxy): | |
| 102 """ProducerConsumerProxy with a finite buffer. | |
| 103 | |
| 104 When my buffer fills up, I have my parent Producer pause until my buffer | |
| 105 has room in it again. | |
| 106 """ | |
| 107 # Copies much from abstract.FileDescriptor | |
| 108 bufferSize = 2**2**2**2 | |
| 109 | |
| 110 producerPaused = False | |
| 111 unregistered = False | |
| 112 | |
| 113 def pauseProducing(self): | |
| 114 # Does *not* call up to ProducerConsumerProxy to relay the pause | |
| 115 # message through to my parent Producer. | |
| 116 self.paused = True | |
| 117 | |
| 118 def resumeProducing(self): | |
| 119 self.paused = False | |
| 120 if self._buffer: | |
| 121 data = ''.join(self._buffer) | |
| 122 bytesSent = self._writeSomeData(data) | |
| 123 if bytesSent < len(data): | |
| 124 unsent = data[bytesSent:] | |
| 125 assert not self.iAmStreaming, ( | |
| 126 "Streaming producer did not write all its data.") | |
| 127 self._buffer[:] = [unsent] | |
| 128 else: | |
| 129 self._buffer[:] = [] | |
| 130 else: | |
| 131 bytesSent = 0 | |
| 132 | |
| 133 if (self.unregistered and bytesSent and not self._buffer and | |
| 134 self.consumer is not None): | |
| 135 self.consumer.unregisterProducer() | |
| 136 | |
| 137 if not self.iAmStreaming: | |
| 138 self.outstandingPull = not bytesSent | |
| 139 | |
| 140 if self.producer is not None: | |
| 141 bytesBuffered = reduce(operator.add, | |
| 142 [len(s) for s in self._buffer], 0) | |
| 143 # TODO: You can see here the potential for high and low | |
| 144 # watermarks, where bufferSize would be the high mark when we | |
| 145 # ask the upstream producer to pause, and we wouldn't have | |
| 146 # it resume again until it hit the low mark. Or if producer | |
| 147 # is Pull, maybe we'd like to pull from it as much as necessary | |
| 148 # to keep our buffer full to the low mark, so we're never caught | |
| 149 # without something to send. | |
| 150 if self.producerPaused and (bytesBuffered < self.bufferSize): | |
| 151 # Now that our buffer is empty, | |
| 152 self.producerPaused = False | |
| 153 self.producer.resumeProducing() | |
| 154 elif self.outstandingPull: | |
| 155 # I did not have any data to write in response to a pull, | |
| 156 # so I'd better pull some myself. | |
| 157 self.producer.resumeProducing() | |
| 158 | |
| 159 def write(self, data): | |
| 160 if self.paused or (not self.iAmStreaming and not self.outstandingPull): | |
| 161 # We could use that fifo queue here. | |
| 162 self._buffer.append(data) | |
| 163 | |
| 164 elif self.consumer is not None: | |
| 165 assert not self._buffer, ( | |
| 166 "Writing fresh data to consumer before my buffer is empty!") | |
| 167 # I'm going to use _writeSomeData here so that there is only one | |
| 168 # path to self.consumer.write. But it doesn't actually make sense, | |
| 169 # if I am streaming, for some data to not be all data. But maybe I | |
| 170 # am not streaming, but I am writing here anyway, because there was | |
| 171 # an earlier request for data which was not answered. | |
| 172 bytesSent = self._writeSomeData(data) | |
| 173 self.outstandingPull = False | |
| 174 if not bytesSent == len(data): | |
| 175 assert not self.iAmStreaming, ( | |
| 176 "Streaming producer did not write all its data.") | |
| 177 self._buffer.append(data[bytesSent:]) | |
| 178 | |
| 179 if (self.producer is not None) and self.producerIsStreaming: | |
| 180 bytesBuffered = reduce(operator.add, | |
| 181 [len(s) for s in self._buffer], 0) | |
| 182 if bytesBuffered >= self.bufferSize: | |
| 183 | |
| 184 self.producer.pauseProducing() | |
| 185 self.producerPaused = True | |
| 186 | |
| 187 def registerProducer(self, producer, streaming): | |
| 188 self.unregistered = False | |
| 189 BasicProducerConsumerProxy.registerProducer(self, producer, streaming) | |
| 190 if not streaming: | |
| 191 producer.resumeProducing() | |
| 192 | |
| 193 def unregisterProducer(self): | |
| 194 if self.producer is not None: | |
| 195 del self.producer | |
| 196 del self.producerIsStreaming | |
| 197 self.unregistered = True | |
| 198 if self.consumer and not self._buffer: | |
| 199 self.consumer.unregisterProducer() | |
| 200 | |
| 201 def _writeSomeData(self, data): | |
| 202 """Write as much of this data as possible. | |
| 203 | |
| 204 @returns: The number of bytes written. | |
| 205 """ | |
| 206 if self.consumer is None: | |
| 207 return 0 | |
| 208 self.consumer.write(data) | |
| 209 return len(data) | |
| OLD | NEW |