Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: third_party/twisted_8_1/twisted/protocols/pcp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_pcp -*-
2 #
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """Producer-Consumer Proxy."""
8
9 __version__ = '$Revision: 1.4 $'[11:-2]
10
11 import operator
12
13 from zope.interface import implements
14
15 from twisted.internet import interfaces
16
17
18 class BasicProducerConsumerProxy:
19 """ I can act as a man in the middle between any Producer and Consumer.
20
21 @ivar producer: the Producer I subscribe to.
22 @type producer: L{IProducer<interfaces.IProducer>}
23 @ivar consumer: the Consumer I publish to.
24 @type consumer: L{IConsumer<interfaces.IConsumer>}
25 @ivar paused: As a Producer, am I paused?
26 @type paused: bool
27 """
28 implements(interfaces.IProducer, interfaces.IConsumer)
29
30 consumer = None
31 producer = None
32 producerIsStreaming = None
33 iAmStreaming = True
34 outstandingPull = False
35 paused = False
36 stopped = False
37
38 def __init__(self, consumer):
39 self._buffer = []
40 if consumer is not None:
41 self.consumer = consumer
42 consumer.registerProducer(self, self.iAmStreaming)
43
44 # Producer methods:
45
46 def pauseProducing(self):
47 self.paused = True
48 if self.producer:
49 self.producer.pauseProducing()
50
51 def resumeProducing(self):
52 self.paused = False
53 if self._buffer:
54 # TODO: Check to see if consumer supports writeSeq.
55 self.consumer.write(''.join(self._buffer))
56 self._buffer[:] = []
57 else:
58 if not self.iAmStreaming:
59 self.outstandingPull = True
60
61 if self.producer is not None:
62 self.producer.resumeProducing()
63
64 def stopProducing(self):
65 if self.producer is not None:
66 self.producer.stopProducing()
67 if self.consumer is not None:
68 del self.consumer
69
70 # Consumer methods:
71
72 def write(self, data):
73 if self.paused or (not self.iAmStreaming and not self.outstandingPull):
74 # We could use that fifo queue here.
75 self._buffer.append(data)
76
77 elif self.consumer is not None:
78 self.consumer.write(data)
79 self.outstandingPull = False
80
81 def finish(self):
82 if self.consumer is not None:
83 self.consumer.finish()
84 self.unregisterProducer()
85
86 def registerProducer(self, producer, streaming):
87 self.producer = producer
88 self.producerIsStreaming = streaming
89
90 def unregisterProducer(self):
91 if self.producer is not None:
92 del self.producer
93 del self.producerIsStreaming
94 if self.consumer:
95 self.consumer.unregisterProducer()
96
97 def __repr__(self):
98 return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
99
100
101 class ProducerConsumerProxy(BasicProducerConsumerProxy):
102 """ProducerConsumerProxy with a finite buffer.
103
104 When my buffer fills up, I have my parent Producer pause until my buffer
105 has room in it again.
106 """
107 # Copies much from abstract.FileDescriptor
108 bufferSize = 2**2**2**2
109
110 producerPaused = False
111 unregistered = False
112
113 def pauseProducing(self):
114 # Does *not* call up to ProducerConsumerProxy to relay the pause
115 # message through to my parent Producer.
116 self.paused = True
117
118 def resumeProducing(self):
119 self.paused = False
120 if self._buffer:
121 data = ''.join(self._buffer)
122 bytesSent = self._writeSomeData(data)
123 if bytesSent < len(data):
124 unsent = data[bytesSent:]
125 assert not self.iAmStreaming, (
126 "Streaming producer did not write all its data.")
127 self._buffer[:] = [unsent]
128 else:
129 self._buffer[:] = []
130 else:
131 bytesSent = 0
132
133 if (self.unregistered and bytesSent and not self._buffer and
134 self.consumer is not None):
135 self.consumer.unregisterProducer()
136
137 if not self.iAmStreaming:
138 self.outstandingPull = not bytesSent
139
140 if self.producer is not None:
141 bytesBuffered = reduce(operator.add,
142 [len(s) for s in self._buffer], 0)
143 # TODO: You can see here the potential for high and low
144 # watermarks, where bufferSize would be the high mark when we
145 # ask the upstream producer to pause, and we wouldn't have
146 # it resume again until it hit the low mark. Or if producer
147 # is Pull, maybe we'd like to pull from it as much as necessary
148 # to keep our buffer full to the low mark, so we're never caught
149 # without something to send.
150 if self.producerPaused and (bytesBuffered < self.bufferSize):
151 # Now that our buffer is empty,
152 self.producerPaused = False
153 self.producer.resumeProducing()
154 elif self.outstandingPull:
155 # I did not have any data to write in response to a pull,
156 # so I'd better pull some myself.
157 self.producer.resumeProducing()
158
159 def write(self, data):
160 if self.paused or (not self.iAmStreaming and not self.outstandingPull):
161 # We could use that fifo queue here.
162 self._buffer.append(data)
163
164 elif self.consumer is not None:
165 assert not self._buffer, (
166 "Writing fresh data to consumer before my buffer is empty!")
167 # I'm going to use _writeSomeData here so that there is only one
168 # path to self.consumer.write. But it doesn't actually make sense,
169 # if I am streaming, for some data to not be all data. But maybe I
170 # am not streaming, but I am writing here anyway, because there was
171 # an earlier request for data which was not answered.
172 bytesSent = self._writeSomeData(data)
173 self.outstandingPull = False
174 if not bytesSent == len(data):
175 assert not self.iAmStreaming, (
176 "Streaming producer did not write all its data.")
177 self._buffer.append(data[bytesSent:])
178
179 if (self.producer is not None) and self.producerIsStreaming:
180 bytesBuffered = reduce(operator.add,
181 [len(s) for s in self._buffer], 0)
182 if bytesBuffered >= self.bufferSize:
183
184 self.producer.pauseProducing()
185 self.producerPaused = True
186
187 def registerProducer(self, producer, streaming):
188 self.unregistered = False
189 BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
190 if not streaming:
191 producer.resumeProducing()
192
193 def unregisterProducer(self):
194 if self.producer is not None:
195 del self.producer
196 del self.producerIsStreaming
197 self.unregistered = True
198 if self.consumer and not self._buffer:
199 self.consumer.unregisterProducer()
200
201 def _writeSomeData(self, data):
202 """Write as much of this data as possible.
203
204 @returns: The number of bytes written.
205 """
206 if self.consumer is None:
207 return 0
208 self.consumer.write(data)
209 return len(data)
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/protocols/oscar.py ('k') | third_party/twisted_8_1/twisted/protocols/policies.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698