Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/abstract.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_abstract -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Support for generic select()able objects.
8
9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
10 """
11
12 from zope.interface import implements
13
14 # Twisted Imports
15 from twisted.python import log, reflect, failure
16 from twisted.persisted import styles
17 from twisted.internet import interfaces, main
18
19
20 class FileDescriptor(log.Logger, styles.Ephemeral, object):
21 """An object which can be operated on by select().
22
23 This is an abstract superclass of all objects which may be notified when
24 they are readable or writable; e.g. they have a file-descriptor that is
25 valid to be passed to select(2).
26 """
27 connected = 0
28 producerPaused = 0
29 streamingProducer = 0
30 producer = None
31 disconnected = 0
32 disconnecting = 0
33 _writeDisconnecting = False
34 _writeDisconnected = False
35 dataBuffer = ""
36 offset = 0
37
38 SEND_LIMIT = 128*1024
39
40 implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
41 interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfClos eableDescriptor)
42
43 def __init__(self, reactor=None):
44 if not reactor:
45 from twisted.internet import reactor
46 self.reactor = reactor
47 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
48 self._tempDataLen = 0
49
50 def connectionLost(self, reason):
51 """The connection was lost.
52
53 This is called when the connection on a selectable object has been
54 lost. It will be called whether the connection was closed explicitly,
55 an exception occurred in an event handler, or the other end of the
56 connection closed it first.
57
58 Clean up state here, but make sure to call back up to FileDescriptor.
59 """
60
61 self.disconnected = 1
62 self.connected = 0
63 if self.producer is not None:
64 self.producer.stopProducing()
65 self.producer = None
66 self.stopReading()
67 self.stopWriting()
68
69 def writeSomeData(self, data):
70 """Write as much as possible of the given data, immediately.
71
72 This is called to invoke the lower-level writing functionality, such as
73 a socket's send() method, or a file's write(); this method returns an
74 integer. If positive, it is the number of bytes written; if negative,
75 it indicates the connection was lost.
76 """
77
78 raise NotImplementedError("%s does not implement writeSomeData" %
79 reflect.qual(self.__class__))
80
81 def doRead(self):
82 """Called when data is avaliable for reading.
83
84 Subclasses must override this method. The result will be interpreted
85 in the same way as a result of doWrite().
86 """
87 raise NotImplementedError("%s does not implement doRead" %
88 reflect.qual(self.__class__))
89
90 def doWrite(self):
91 """Called when data can be written.
92
93 A result that is true (which will be a negative number) implies the
94 connection was lost. A false result implies the connection is still
95 there; a result of 0 implies no write was done, and a result of None
96 indicates that a write was done.
97 """
98 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
99 # If there is currently less than SEND_LIMIT bytes left to send
100 # in the string, extend it with the array data.
101 self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(sel f._tempDataBuffer)
102 self.offset = 0
103 self._tempDataBuffer = []
104 self._tempDataLen = 0
105
106 # Send as much data as you can.
107 if self.offset:
108 l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
109 else:
110 l = self.writeSomeData(self.dataBuffer)
111 if l < 0 or isinstance(l, Exception):
112 return l
113 if l == 0 and self.dataBuffer:
114 result = 0
115 else:
116 result = None
117 self.offset += l
118 # If there is nothing left to send,
119 if self.offset == len(self.dataBuffer) and not self._tempDataLen:
120 self.dataBuffer = ""
121 self.offset = 0
122 # stop writing.
123 self.stopWriting()
124 # If I've got a producer who is supposed to supply me with data,
125 if self.producer is not None and ((not self.streamingProducer)
126 or self.producerPaused):
127 # tell them to supply some more.
128 self.producerPaused = 0
129 self.producer.resumeProducing()
130 elif self.disconnecting:
131 # But if I was previously asked to let the connection die, do
132 # so.
133 return self._postLoseConnection()
134 elif self._writeDisconnecting:
135 # I was previously asked to to half-close the connection.
136 result = self._closeWriteConnection()
137 self._writeDisconnected = True
138 return result
139 return result
140
141 def _postLoseConnection(self):
142 """Called after a loseConnection(), when all data has been written.
143
144 Whatever this returns is then returned by doWrite.
145 """
146 # default implementation, telling reactor we're finished
147 return main.CONNECTION_DONE
148
149 def _closeWriteConnection(self):
150 # override in subclasses
151 pass
152
153 def writeConnectionLost(self, reason):
154 # in current code should never be called
155 self.connectionLost(reason)
156
157 def readConnectionLost(self, reason):
158 # override in subclasses
159 self.connectionLost(reason)
160
161 def write(self, data):
162 """Reliably write some data.
163
164 The data is buffered until the underlying file descriptor is ready
165 for writing. If there is more than C{self.bufferSize} data in the
166 buffer and this descriptor has a registered streaming producer, its
167 C{pauseProducing()} method will be called.
168 """
169 if isinstance(data, unicode): # no, really, I mean it
170 raise TypeError("Data must not be unicode")
171 if not self.connected or self._writeDisconnected:
172 return
173 if data:
174 self._tempDataBuffer.append(data)
175 self._tempDataLen += len(data)
176 # If we are responsible for pausing our producer,
177 if self.producer is not None and self.streamingProducer:
178 # and our buffer is full,
179 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
180 # pause it.
181 self.producerPaused = 1
182 self.producer.pauseProducing()
183 self.startWriting()
184
185 def writeSequence(self, iovec):
186 """Reliably write a sequence of data.
187
188 Currently, this is a convenience method roughly equivalent to::
189
190 for chunk in iovec:
191 fd.write(chunk)
192
193 It may have a more efficient implementation at a later time or in a
194 different reactor.
195
196 As with the C{write()} method, if a buffer size limit is reached and a
197 streaming producer is registered, it will be paused until the buffered
198 data is written to the underlying file descriptor.
199 """
200 if not self.connected or not iovec or self._writeDisconnected:
201 return
202 self._tempDataBuffer.extend(iovec)
203 for i in iovec:
204 self._tempDataLen += len(i)
205 # If we are responsible for pausing our producer,
206 if self.producer is not None and self.streamingProducer:
207 # and our buffer is full,
208 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
209 # pause it.
210 self.producerPaused = 1
211 self.producer.pauseProducing()
212 self.startWriting()
213
214 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
215 """Close the connection at the next available opportunity.
216
217 Call this to cause this FileDescriptor to lose its connection. It will
218 first write any data that it has buffered.
219
220 If there is data buffered yet to be written, this method will cause the
221 transport to lose its connection as soon as it's done flushing its
222 write buffer. If you have a producer registered, the connection won't
223 be closed until the producer is finished. Therefore, make sure you
224 unregister your producer when it's finished, or the connection will
225 never close.
226 """
227
228 if self.connected and not self.disconnecting:
229 if self._writeDisconnected:
230 # doWrite won't trigger the connection close anymore
231 self.stopReading()
232 self.stopWriting()
233 self.connectionLost(_connDone)
234 else:
235 self.stopReading()
236 self.startWriting()
237 self.disconnecting = 1
238
239 def loseWriteConnection(self):
240 self._writeDisconnecting = True
241 self.startWriting()
242
243 def stopReading(self):
244 """Stop waiting for read availability.
245
246 Call this to remove this selectable from being notified when it is
247 ready for reading.
248 """
249 self.reactor.removeReader(self)
250
251 def stopWriting(self):
252 """Stop waiting for write availability.
253
254 Call this to remove this selectable from being notified when it is ready
255 for writing.
256 """
257 self.reactor.removeWriter(self)
258
259 def startReading(self):
260 """Start waiting for read availability.
261 """
262 self.reactor.addReader(self)
263
264 def startWriting(self):
265 """Start waiting for write availability.
266
267 Call this to have this FileDescriptor be notified whenever it is ready f or
268 writing.
269 """
270 self.reactor.addWriter(self)
271
272 # Producer/consumer implementation
273
274 # first, the consumer stuff. This requires no additional work, as
275 # any object you can write to can be a consumer, really.
276
277 producer = None
278 bufferSize = 2**2**2**2
279
280 def registerProducer(self, producer, streaming):
281 """Register to receive data from a producer.
282
283 This sets this selectable to be a consumer for a producer. When this
284 selectable runs out of data on a write() call, it will ask the producer
285 to resumeProducing(). When the FileDescriptor's internal data buffer is
286 filled, it will ask the producer to pauseProducing(). If the connection
287 is lost, FileDescriptor calls producer's stopProducing() method.
288
289 If streaming is true, the producer should provide the IPushProducer
290 interface. Otherwise, it is assumed that producer provides the
291 IPullProducer interface. In this case, the producer won't be asked
292 to pauseProducing(), but it has to be careful to write() data only
293 when its resumeProducing() method is called.
294 """
295 if self.producer is not None:
296 raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
297 if self.disconnected:
298 producer.stopProducing()
299 else:
300 self.producer = producer
301 self.streamingProducer = streaming
302 if not streaming:
303 producer.resumeProducing()
304
305 def unregisterProducer(self):
306 """Stop consuming data from a producer, without disconnecting.
307 """
308 self.producer = None
309
310 def stopConsuming(self):
311 """Stop consuming data.
312
313 This is called when a producer has lost its connection, to tell the
314 consumer to go lose its connection (and break potential circular
315 references).
316 """
317 self.unregisterProducer()
318 self.loseConnection()
319
320 # producer interface implementation
321
322 def resumeProducing(self):
323 assert self.connected and not self.disconnecting
324 self.startReading()
325
326 def pauseProducing(self):
327 self.stopReading()
328
329 def stopProducing(self):
330 self.loseConnection()
331
332
333 def fileno(self):
334 """File Descriptor number for select().
335
336 This method must be overridden or assigned in subclasses to
337 indicate a valid file descriptor for the operating system.
338 """
339 return -1
340
341
342 def isIPAddress(addr):
343 """
344 Determine whether the given string represents an IPv4 address.
345
346 @type addr: C{str}
347 @param addr: A string which may or may not be the decimal dotted
348 representation of an IPv4 address.
349
350 @rtype: C{bool}
351 @return: C{True} if C{addr} represents an IPv4 address, C{False}
352 otherwise.
353 """
354 dottedParts = addr.split('.')
355 if len(dottedParts) == 4:
356 for octet in dottedParts:
357 try:
358 value = int(octet)
359 except ValueError:
360 return False
361 else:
362 if value < 0 or value > 255:
363 return False
364 return True
365 return False
366
367
368 __all__ = ["FileDescriptor"]
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/_win32stdio.py ('k') | third_party/twisted_8_1/twisted/internet/address.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698