OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_abstract -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 """ | |
7 Support for generic select()able objects. | |
8 | |
9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
10 """ | |
11 | |
12 from zope.interface import implements | |
13 | |
14 # Twisted Imports | |
15 from twisted.python import log, reflect, failure | |
16 from twisted.persisted import styles | |
17 from twisted.internet import interfaces, main | |
18 | |
19 | |
20 class FileDescriptor(log.Logger, styles.Ephemeral, object): | |
21 """An object which can be operated on by select(). | |
22 | |
23 This is an abstract superclass of all objects which may be notified when | |
24 they are readable or writable; e.g. they have a file-descriptor that is | |
25 valid to be passed to select(2). | |
26 """ | |
27 connected = 0 | |
28 producerPaused = 0 | |
29 streamingProducer = 0 | |
30 producer = None | |
31 disconnected = 0 | |
32 disconnecting = 0 | |
33 _writeDisconnecting = False | |
34 _writeDisconnected = False | |
35 dataBuffer = "" | |
36 offset = 0 | |
37 | |
38 SEND_LIMIT = 128*1024 | |
39 | |
40 implements(interfaces.IProducer, interfaces.IReadWriteDescriptor, | |
41 interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfClos
eableDescriptor) | |
42 | |
43 def __init__(self, reactor=None): | |
44 if not reactor: | |
45 from twisted.internet import reactor | |
46 self.reactor = reactor | |
47 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite | |
48 self._tempDataLen = 0 | |
49 | |
50 def connectionLost(self, reason): | |
51 """The connection was lost. | |
52 | |
53 This is called when the connection on a selectable object has been | |
54 lost. It will be called whether the connection was closed explicitly, | |
55 an exception occurred in an event handler, or the other end of the | |
56 connection closed it first. | |
57 | |
58 Clean up state here, but make sure to call back up to FileDescriptor. | |
59 """ | |
60 | |
61 self.disconnected = 1 | |
62 self.connected = 0 | |
63 if self.producer is not None: | |
64 self.producer.stopProducing() | |
65 self.producer = None | |
66 self.stopReading() | |
67 self.stopWriting() | |
68 | |
69 def writeSomeData(self, data): | |
70 """Write as much as possible of the given data, immediately. | |
71 | |
72 This is called to invoke the lower-level writing functionality, such as | |
73 a socket's send() method, or a file's write(); this method returns an | |
74 integer. If positive, it is the number of bytes written; if negative, | |
75 it indicates the connection was lost. | |
76 """ | |
77 | |
78 raise NotImplementedError("%s does not implement writeSomeData" % | |
79 reflect.qual(self.__class__)) | |
80 | |
81 def doRead(self): | |
82 """Called when data is avaliable for reading. | |
83 | |
84 Subclasses must override this method. The result will be interpreted | |
85 in the same way as a result of doWrite(). | |
86 """ | |
87 raise NotImplementedError("%s does not implement doRead" % | |
88 reflect.qual(self.__class__)) | |
89 | |
90 def doWrite(self): | |
91 """Called when data can be written. | |
92 | |
93 A result that is true (which will be a negative number) implies the | |
94 connection was lost. A false result implies the connection is still | |
95 there; a result of 0 implies no write was done, and a result of None | |
96 indicates that a write was done. | |
97 """ | |
98 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: | |
99 # If there is currently less than SEND_LIMIT bytes left to send | |
100 # in the string, extend it with the array data. | |
101 self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(sel
f._tempDataBuffer) | |
102 self.offset = 0 | |
103 self._tempDataBuffer = [] | |
104 self._tempDataLen = 0 | |
105 | |
106 # Send as much data as you can. | |
107 if self.offset: | |
108 l = self.writeSomeData(buffer(self.dataBuffer, self.offset)) | |
109 else: | |
110 l = self.writeSomeData(self.dataBuffer) | |
111 if l < 0 or isinstance(l, Exception): | |
112 return l | |
113 if l == 0 and self.dataBuffer: | |
114 result = 0 | |
115 else: | |
116 result = None | |
117 self.offset += l | |
118 # If there is nothing left to send, | |
119 if self.offset == len(self.dataBuffer) and not self._tempDataLen: | |
120 self.dataBuffer = "" | |
121 self.offset = 0 | |
122 # stop writing. | |
123 self.stopWriting() | |
124 # If I've got a producer who is supposed to supply me with data, | |
125 if self.producer is not None and ((not self.streamingProducer) | |
126 or self.producerPaused): | |
127 # tell them to supply some more. | |
128 self.producerPaused = 0 | |
129 self.producer.resumeProducing() | |
130 elif self.disconnecting: | |
131 # But if I was previously asked to let the connection die, do | |
132 # so. | |
133 return self._postLoseConnection() | |
134 elif self._writeDisconnecting: | |
135 # I was previously asked to to half-close the connection. | |
136 result = self._closeWriteConnection() | |
137 self._writeDisconnected = True | |
138 return result | |
139 return result | |
140 | |
141 def _postLoseConnection(self): | |
142 """Called after a loseConnection(), when all data has been written. | |
143 | |
144 Whatever this returns is then returned by doWrite. | |
145 """ | |
146 # default implementation, telling reactor we're finished | |
147 return main.CONNECTION_DONE | |
148 | |
149 def _closeWriteConnection(self): | |
150 # override in subclasses | |
151 pass | |
152 | |
153 def writeConnectionLost(self, reason): | |
154 # in current code should never be called | |
155 self.connectionLost(reason) | |
156 | |
157 def readConnectionLost(self, reason): | |
158 # override in subclasses | |
159 self.connectionLost(reason) | |
160 | |
161 def write(self, data): | |
162 """Reliably write some data. | |
163 | |
164 The data is buffered until the underlying file descriptor is ready | |
165 for writing. If there is more than C{self.bufferSize} data in the | |
166 buffer and this descriptor has a registered streaming producer, its | |
167 C{pauseProducing()} method will be called. | |
168 """ | |
169 if isinstance(data, unicode): # no, really, I mean it | |
170 raise TypeError("Data must not be unicode") | |
171 if not self.connected or self._writeDisconnected: | |
172 return | |
173 if data: | |
174 self._tempDataBuffer.append(data) | |
175 self._tempDataLen += len(data) | |
176 # If we are responsible for pausing our producer, | |
177 if self.producer is not None and self.streamingProducer: | |
178 # and our buffer is full, | |
179 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize: | |
180 # pause it. | |
181 self.producerPaused = 1 | |
182 self.producer.pauseProducing() | |
183 self.startWriting() | |
184 | |
185 def writeSequence(self, iovec): | |
186 """Reliably write a sequence of data. | |
187 | |
188 Currently, this is a convenience method roughly equivalent to:: | |
189 | |
190 for chunk in iovec: | |
191 fd.write(chunk) | |
192 | |
193 It may have a more efficient implementation at a later time or in a | |
194 different reactor. | |
195 | |
196 As with the C{write()} method, if a buffer size limit is reached and a | |
197 streaming producer is registered, it will be paused until the buffered | |
198 data is written to the underlying file descriptor. | |
199 """ | |
200 if not self.connected or not iovec or self._writeDisconnected: | |
201 return | |
202 self._tempDataBuffer.extend(iovec) | |
203 for i in iovec: | |
204 self._tempDataLen += len(i) | |
205 # If we are responsible for pausing our producer, | |
206 if self.producer is not None and self.streamingProducer: | |
207 # and our buffer is full, | |
208 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize: | |
209 # pause it. | |
210 self.producerPaused = 1 | |
211 self.producer.pauseProducing() | |
212 self.startWriting() | |
213 | |
214 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): | |
215 """Close the connection at the next available opportunity. | |
216 | |
217 Call this to cause this FileDescriptor to lose its connection. It will | |
218 first write any data that it has buffered. | |
219 | |
220 If there is data buffered yet to be written, this method will cause the | |
221 transport to lose its connection as soon as it's done flushing its | |
222 write buffer. If you have a producer registered, the connection won't | |
223 be closed until the producer is finished. Therefore, make sure you | |
224 unregister your producer when it's finished, or the connection will | |
225 never close. | |
226 """ | |
227 | |
228 if self.connected and not self.disconnecting: | |
229 if self._writeDisconnected: | |
230 # doWrite won't trigger the connection close anymore | |
231 self.stopReading() | |
232 self.stopWriting() | |
233 self.connectionLost(_connDone) | |
234 else: | |
235 self.stopReading() | |
236 self.startWriting() | |
237 self.disconnecting = 1 | |
238 | |
239 def loseWriteConnection(self): | |
240 self._writeDisconnecting = True | |
241 self.startWriting() | |
242 | |
243 def stopReading(self): | |
244 """Stop waiting for read availability. | |
245 | |
246 Call this to remove this selectable from being notified when it is | |
247 ready for reading. | |
248 """ | |
249 self.reactor.removeReader(self) | |
250 | |
251 def stopWriting(self): | |
252 """Stop waiting for write availability. | |
253 | |
254 Call this to remove this selectable from being notified when it is ready | |
255 for writing. | |
256 """ | |
257 self.reactor.removeWriter(self) | |
258 | |
259 def startReading(self): | |
260 """Start waiting for read availability. | |
261 """ | |
262 self.reactor.addReader(self) | |
263 | |
264 def startWriting(self): | |
265 """Start waiting for write availability. | |
266 | |
267 Call this to have this FileDescriptor be notified whenever it is ready f
or | |
268 writing. | |
269 """ | |
270 self.reactor.addWriter(self) | |
271 | |
272 # Producer/consumer implementation | |
273 | |
274 # first, the consumer stuff. This requires no additional work, as | |
275 # any object you can write to can be a consumer, really. | |
276 | |
277 producer = None | |
278 bufferSize = 2**2**2**2 | |
279 | |
280 def registerProducer(self, producer, streaming): | |
281 """Register to receive data from a producer. | |
282 | |
283 This sets this selectable to be a consumer for a producer. When this | |
284 selectable runs out of data on a write() call, it will ask the producer | |
285 to resumeProducing(). When the FileDescriptor's internal data buffer is | |
286 filled, it will ask the producer to pauseProducing(). If the connection | |
287 is lost, FileDescriptor calls producer's stopProducing() method. | |
288 | |
289 If streaming is true, the producer should provide the IPushProducer | |
290 interface. Otherwise, it is assumed that producer provides the | |
291 IPullProducer interface. In this case, the producer won't be asked | |
292 to pauseProducing(), but it has to be careful to write() data only | |
293 when its resumeProducing() method is called. | |
294 """ | |
295 if self.producer is not None: | |
296 raise RuntimeError("Cannot register producer %s, because producer %s
was never unregistered." % (producer, self.producer)) | |
297 if self.disconnected: | |
298 producer.stopProducing() | |
299 else: | |
300 self.producer = producer | |
301 self.streamingProducer = streaming | |
302 if not streaming: | |
303 producer.resumeProducing() | |
304 | |
305 def unregisterProducer(self): | |
306 """Stop consuming data from a producer, without disconnecting. | |
307 """ | |
308 self.producer = None | |
309 | |
310 def stopConsuming(self): | |
311 """Stop consuming data. | |
312 | |
313 This is called when a producer has lost its connection, to tell the | |
314 consumer to go lose its connection (and break potential circular | |
315 references). | |
316 """ | |
317 self.unregisterProducer() | |
318 self.loseConnection() | |
319 | |
320 # producer interface implementation | |
321 | |
322 def resumeProducing(self): | |
323 assert self.connected and not self.disconnecting | |
324 self.startReading() | |
325 | |
326 def pauseProducing(self): | |
327 self.stopReading() | |
328 | |
329 def stopProducing(self): | |
330 self.loseConnection() | |
331 | |
332 | |
333 def fileno(self): | |
334 """File Descriptor number for select(). | |
335 | |
336 This method must be overridden or assigned in subclasses to | |
337 indicate a valid file descriptor for the operating system. | |
338 """ | |
339 return -1 | |
340 | |
341 | |
342 def isIPAddress(addr): | |
343 """ | |
344 Determine whether the given string represents an IPv4 address. | |
345 | |
346 @type addr: C{str} | |
347 @param addr: A string which may or may not be the decimal dotted | |
348 representation of an IPv4 address. | |
349 | |
350 @rtype: C{bool} | |
351 @return: C{True} if C{addr} represents an IPv4 address, C{False} | |
352 otherwise. | |
353 """ | |
354 dottedParts = addr.split('.') | |
355 if len(dottedParts) == 4: | |
356 for octet in dottedParts: | |
357 try: | |
358 value = int(octet) | |
359 except ValueError: | |
360 return False | |
361 else: | |
362 if value < 0 or value > 255: | |
363 return False | |
364 return True | |
365 return False | |
366 | |
367 | |
368 __all__ = ["FileDescriptor"] | |
OLD | NEW |