OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_pcp -*- | |
2 # | |
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """Producer-Consumer Proxy.""" | |
8 | |
9 __version__ = '$Revision: 1.4 $'[11:-2] | |
10 | |
11 import operator | |
12 | |
13 from zope.interface import implements | |
14 | |
15 from twisted.internet import interfaces | |
16 | |
17 | |
18 class BasicProducerConsumerProxy: | |
19 """ I can act as a man in the middle between any Producer and Consumer. | |
20 | |
21 @ivar producer: the Producer I subscribe to. | |
22 @type producer: L{IProducer<interfaces.IProducer>} | |
23 @ivar consumer: the Consumer I publish to. | |
24 @type consumer: L{IConsumer<interfaces.IConsumer>} | |
25 @ivar paused: As a Producer, am I paused? | |
26 @type paused: bool | |
27 """ | |
28 implements(interfaces.IProducer, interfaces.IConsumer) | |
29 | |
30 consumer = None | |
31 producer = None | |
32 producerIsStreaming = None | |
33 iAmStreaming = True | |
34 outstandingPull = False | |
35 paused = False | |
36 stopped = False | |
37 | |
38 def __init__(self, consumer): | |
39 self._buffer = [] | |
40 if consumer is not None: | |
41 self.consumer = consumer | |
42 consumer.registerProducer(self, self.iAmStreaming) | |
43 | |
44 # Producer methods: | |
45 | |
46 def pauseProducing(self): | |
47 self.paused = True | |
48 if self.producer: | |
49 self.producer.pauseProducing() | |
50 | |
51 def resumeProducing(self): | |
52 self.paused = False | |
53 if self._buffer: | |
54 # TODO: Check to see if consumer supports writeSeq. | |
55 self.consumer.write(''.join(self._buffer)) | |
56 self._buffer[:] = [] | |
57 else: | |
58 if not self.iAmStreaming: | |
59 self.outstandingPull = True | |
60 | |
61 if self.producer is not None: | |
62 self.producer.resumeProducing() | |
63 | |
64 def stopProducing(self): | |
65 if self.producer is not None: | |
66 self.producer.stopProducing() | |
67 if self.consumer is not None: | |
68 del self.consumer | |
69 | |
70 # Consumer methods: | |
71 | |
72 def write(self, data): | |
73 if self.paused or (not self.iAmStreaming and not self.outstandingPull): | |
74 # We could use that fifo queue here. | |
75 self._buffer.append(data) | |
76 | |
77 elif self.consumer is not None: | |
78 self.consumer.write(data) | |
79 self.outstandingPull = False | |
80 | |
81 def finish(self): | |
82 if self.consumer is not None: | |
83 self.consumer.finish() | |
84 self.unregisterProducer() | |
85 | |
86 def registerProducer(self, producer, streaming): | |
87 self.producer = producer | |
88 self.producerIsStreaming = streaming | |
89 | |
90 def unregisterProducer(self): | |
91 if self.producer is not None: | |
92 del self.producer | |
93 del self.producerIsStreaming | |
94 if self.consumer: | |
95 self.consumer.unregisterProducer() | |
96 | |
97 def __repr__(self): | |
98 return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer) | |
99 | |
100 | |
101 class ProducerConsumerProxy(BasicProducerConsumerProxy): | |
102 """ProducerConsumerProxy with a finite buffer. | |
103 | |
104 When my buffer fills up, I have my parent Producer pause until my buffer | |
105 has room in it again. | |
106 """ | |
107 # Copies much from abstract.FileDescriptor | |
108 bufferSize = 2**2**2**2 | |
109 | |
110 producerPaused = False | |
111 unregistered = False | |
112 | |
113 def pauseProducing(self): | |
114 # Does *not* call up to ProducerConsumerProxy to relay the pause | |
115 # message through to my parent Producer. | |
116 self.paused = True | |
117 | |
118 def resumeProducing(self): | |
119 self.paused = False | |
120 if self._buffer: | |
121 data = ''.join(self._buffer) | |
122 bytesSent = self._writeSomeData(data) | |
123 if bytesSent < len(data): | |
124 unsent = data[bytesSent:] | |
125 assert not self.iAmStreaming, ( | |
126 "Streaming producer did not write all its data.") | |
127 self._buffer[:] = [unsent] | |
128 else: | |
129 self._buffer[:] = [] | |
130 else: | |
131 bytesSent = 0 | |
132 | |
133 if (self.unregistered and bytesSent and not self._buffer and | |
134 self.consumer is not None): | |
135 self.consumer.unregisterProducer() | |
136 | |
137 if not self.iAmStreaming: | |
138 self.outstandingPull = not bytesSent | |
139 | |
140 if self.producer is not None: | |
141 bytesBuffered = reduce(operator.add, | |
142 [len(s) for s in self._buffer], 0) | |
143 # TODO: You can see here the potential for high and low | |
144 # watermarks, where bufferSize would be the high mark when we | |
145 # ask the upstream producer to pause, and we wouldn't have | |
146 # it resume again until it hit the low mark. Or if producer | |
147 # is Pull, maybe we'd like to pull from it as much as necessary | |
148 # to keep our buffer full to the low mark, so we're never caught | |
149 # without something to send. | |
150 if self.producerPaused and (bytesBuffered < self.bufferSize): | |
151 # Now that our buffer is empty, | |
152 self.producerPaused = False | |
153 self.producer.resumeProducing() | |
154 elif self.outstandingPull: | |
155 # I did not have any data to write in response to a pull, | |
156 # so I'd better pull some myself. | |
157 self.producer.resumeProducing() | |
158 | |
159 def write(self, data): | |
160 if self.paused or (not self.iAmStreaming and not self.outstandingPull): | |
161 # We could use that fifo queue here. | |
162 self._buffer.append(data) | |
163 | |
164 elif self.consumer is not None: | |
165 assert not self._buffer, ( | |
166 "Writing fresh data to consumer before my buffer is empty!") | |
167 # I'm going to use _writeSomeData here so that there is only one | |
168 # path to self.consumer.write. But it doesn't actually make sense, | |
169 # if I am streaming, for some data to not be all data. But maybe I | |
170 # am not streaming, but I am writing here anyway, because there was | |
171 # an earlier request for data which was not answered. | |
172 bytesSent = self._writeSomeData(data) | |
173 self.outstandingPull = False | |
174 if not bytesSent == len(data): | |
175 assert not self.iAmStreaming, ( | |
176 "Streaming producer did not write all its data.") | |
177 self._buffer.append(data[bytesSent:]) | |
178 | |
179 if (self.producer is not None) and self.producerIsStreaming: | |
180 bytesBuffered = reduce(operator.add, | |
181 [len(s) for s in self._buffer], 0) | |
182 if bytesBuffered >= self.bufferSize: | |
183 | |
184 self.producer.pauseProducing() | |
185 self.producerPaused = True | |
186 | |
187 def registerProducer(self, producer, streaming): | |
188 self.unregistered = False | |
189 BasicProducerConsumerProxy.registerProducer(self, producer, streaming) | |
190 if not streaming: | |
191 producer.resumeProducing() | |
192 | |
193 def unregisterProducer(self): | |
194 if self.producer is not None: | |
195 del self.producer | |
196 del self.producerIsStreaming | |
197 self.unregistered = True | |
198 if self.consumer and not self._buffer: | |
199 self.consumer.unregisterProducer() | |
200 | |
201 def _writeSomeData(self, data): | |
202 """Write as much of this data as possible. | |
203 | |
204 @returns: The number of bytes written. | |
205 """ | |
206 if self.consumer is None: | |
207 return 0 | |
208 self.consumer.write(data) | |
209 return len(data) | |
OLD | NEW |