OLD | NEW |
| (Empty) |
1 # Copyright (c) 2008 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 | |
5 """ | |
6 Abstract file handle class | |
7 """ | |
8 | |
9 from twisted.internet import main, error, interfaces | |
10 from twisted.python import log, failure | |
11 from twisted.persisted import styles | |
12 | |
13 from zope.interface import implements | |
14 import errno | |
15 | |
16 from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF | |
17 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING | |
18 from twisted.internet.iocpreactor import iocpsupport as _iocp | |
19 | |
20 | |
21 | |
22 class FileHandle(log.Logger, styles.Ephemeral, object): | |
23 """ | |
24 File handle that can read and write asynchronously | |
25 """ | |
26 implements(interfaces.IProducer, interfaces.IConsumer, | |
27 interfaces.ITransport, interfaces.IHalfCloseableDescriptor) | |
28 # read stuff | |
29 maxReadBuffers = 16 | |
30 readBufferSize = 4096 | |
31 reading = False | |
32 dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs | |
33 _readNextBuffer = 0 | |
34 _readSize = 0 # how much data we have in the read buffer | |
35 _readScheduled = None | |
36 _readScheduledInOS = False | |
37 | |
38 | |
39 def startReading(self): | |
40 self.reactor.addActiveHandle(self) | |
41 if not self._readScheduled and not self.reading: | |
42 self.reading = True | |
43 self._readScheduled = self.reactor.callLater(0, | |
44 self._resumeReading) | |
45 | |
46 | |
47 def stopReading(self): | |
48 if self._readScheduled: | |
49 self._readScheduled.cancel() | |
50 self._readScheduled = None | |
51 self.reading = False | |
52 | |
53 | |
54 def _resumeReading(self): | |
55 self._readScheduled = None | |
56 if self._dispatchData() and not self._readScheduledInOS: | |
57 self.doRead() | |
58 | |
59 | |
60 def _dispatchData(self): | |
61 """ | |
62 Dispatch previously read data. Return True if self.reading and we don't | |
63 have any more data | |
64 """ | |
65 if not self._readSize: | |
66 return self.reading | |
67 size = self._readSize | |
68 full_buffers = size // self.readBufferSize | |
69 while self._readNextBuffer < full_buffers: | |
70 self.dataReceived(self._readBuffers[self._readNextBuffer]) | |
71 self._readNextBuffer += 1 | |
72 if not self.reading: | |
73 return False | |
74 remainder = size % self.readBufferSize | |
75 if remainder: | |
76 self.dataReceived(buffer(self._readBuffers[full_buffers], | |
77 0, remainder)) | |
78 if self.dynamicReadBuffers: | |
79 total_buffer_size = self.readBufferSize * len(self._readBuffers) | |
80 # we have one buffer too many | |
81 if size < total_buffer_size - self.readBufferSize: | |
82 del self._readBuffers[-1] | |
83 # we filled all buffers, so allocate one more | |
84 elif (size == total_buffer_size and | |
85 len(self._readBuffers) < self.maxReadBuffers): | |
86 self._readBuffers.append(_iocp.AllocateReadBuffer( | |
87 self.readBufferSize)) | |
88 self._readNextBuffer = 0 | |
89 self._readSize = 0 | |
90 return self.reading | |
91 | |
92 | |
93 def _cbRead(self, rc, bytes, evt): | |
94 self._readScheduledInOS = False | |
95 if self._handleRead(rc, bytes, evt): | |
96 self.doRead() | |
97 | |
98 | |
99 def _handleRead(self, rc, bytes, evt): | |
100 """ | |
101 Returns False if we should stop reading for now | |
102 """ | |
103 if self.disconnected: | |
104 return False | |
105 # graceful disconnection | |
106 if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF): | |
107 self.reactor.removeActiveHandle(self) | |
108 self.readConnectionLost(failure.Failure(main.CONNECTION_DONE)) | |
109 return False | |
110 # XXX: not handling WSAEWOULDBLOCK | |
111 # ("too many outstanding overlapped I/O requests") | |
112 elif rc: | |
113 self.connectionLost(failure.Failure( | |
114 error.ConnectionLost("read error -- %s (%s)" % | |
115 (errno.errorcode.get(rc, 'unknown'), rc)))) | |
116 return False | |
117 else: | |
118 assert self._readSize == 0 | |
119 assert self._readNextBuffer == 0 | |
120 self._readSize = bytes | |
121 return self._dispatchData() | |
122 | |
123 | |
124 def doRead(self): | |
125 numReads = 0 | |
126 while 1: | |
127 evt = _iocp.Event(self._cbRead, self) | |
128 | |
129 evt.buff = buff = self._readBuffers | |
130 rc, bytes = self.readFromHandle(buff, evt) | |
131 | |
132 if (rc == ERROR_IO_PENDING | |
133 or (not rc and numReads >= self.maxReads)): | |
134 self._readScheduledInOS = True | |
135 break | |
136 else: | |
137 evt.ignore = True | |
138 if not self._handleRead(rc, bytes, evt): | |
139 break | |
140 numReads += 1 | |
141 | |
142 | |
143 def readFromHandle(self, bufflist, evt): | |
144 raise NotImplementedError() # TODO: this should default to ReadFile | |
145 | |
146 | |
147 def dataReceived(self, data): | |
148 raise NotImplementedError | |
149 | |
150 | |
151 def readConnectionLost(self, reason): | |
152 self.connectionLost(reason) | |
153 | |
154 | |
155 # write stuff | |
156 dataBuffer = '' | |
157 offset = 0 | |
158 writing = False | |
159 _writeScheduled = None | |
160 _writeDisconnecting = False | |
161 _writeDisconnected = False | |
162 writeBufferSize = 2**2**2**2 | |
163 maxWrites = 5 | |
164 | |
165 | |
166 def loseWriteConnection(self): | |
167 self._writeDisconnecting = True | |
168 self.startWriting() | |
169 | |
170 | |
171 def _closeWriteConnection(self): | |
172 # override in subclasses | |
173 pass | |
174 | |
175 | |
176 def writeConnectionLost(self, reason): | |
177 # in current code should never be called | |
178 self.connectionLost(reason) | |
179 | |
180 | |
181 def startWriting(self): | |
182 self.reactor.addActiveHandle(self) | |
183 self.writing = True | |
184 if not self._writeScheduled: | |
185 self._writeScheduled = self.reactor.callLater(0, | |
186 self._resumeWriting) | |
187 | |
188 | |
189 def stopWriting(self): | |
190 if self._writeScheduled: | |
191 self._writeScheduled.cancel() | |
192 self._writeScheduled = None | |
193 self.writing = False | |
194 | |
195 | |
196 def _resumeWriting(self): | |
197 self._writeScheduled = None | |
198 self.doWrite() | |
199 | |
200 | |
201 def _cbWrite(self, rc, bytes, evt): | |
202 if self._handleWrite(rc, bytes, evt): | |
203 self.doWrite() | |
204 | |
205 | |
206 def _handleWrite(self, rc, bytes, evt): | |
207 """ | |
208 Returns false if we should stop writing for now | |
209 """ | |
210 if self.disconnected or self._writeDisconnected: | |
211 return False | |
212 # XXX: not handling WSAEWOULDBLOCK | |
213 # ("too many outstanding overlapped I/O requests") | |
214 if rc: | |
215 self.connectionLost(failure.Failure( | |
216 error.ConnectionLost("write error -- %s (%s)" % | |
217 (errno.errorcode.get(rc, 'unknown'), rc)))) | |
218 return False | |
219 else: | |
220 self.offset += bytes | |
221 # If there is nothing left to send, | |
222 if self.offset == len(self.dataBuffer) and not self._tempDataLen: | |
223 self.dataBuffer = "" | |
224 self.offset = 0 | |
225 # stop writing | |
226 self.stopWriting() | |
227 # If I've got a producer who is supposed to supply me with data | |
228 if self.producer is not None and ((not self.streamingProducer) | |
229 or self.producerPaused): | |
230 # tell them to supply some more. | |
231 self.producerPaused = True | |
232 self.producer.resumeProducing() | |
233 elif self.disconnecting: | |
234 # But if I was previously asked to let the connection die, | |
235 # do so. | |
236 self.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
237 elif self._writeDisconnecting: | |
238 # I was previously asked to to half-close the connection. | |
239 self._closeWriteConnection() | |
240 self._writeDisconnected = True | |
241 return False | |
242 else: | |
243 return True | |
244 | |
245 | |
246 def doWrite(self): | |
247 numWrites = 0 | |
248 while 1: | |
249 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: | |
250 # If there is currently less than SEND_LIMIT bytes left to send | |
251 # in the string, extend it with the array data. | |
252 self.dataBuffer = (buffer(self.dataBuffer, self.offset) + | |
253 "".join(self._tempDataBuffer)) | |
254 self.offset = 0 | |
255 self._tempDataBuffer = [] | |
256 self._tempDataLen = 0 | |
257 | |
258 evt = _iocp.Event(self._cbWrite, self) | |
259 | |
260 # Send as much data as you can. | |
261 if self.offset: | |
262 evt.buff = buff = buffer(self.dataBuffer, self.offset) | |
263 else: | |
264 evt.buff = buff = self.dataBuffer | |
265 rc, bytes = self.writeToHandle(buff, evt) | |
266 if (rc == ERROR_IO_PENDING | |
267 or (not rc and numWrites >= self.maxWrites)): | |
268 break | |
269 else: | |
270 evt.ignore = True | |
271 if not self._handleWrite(rc, bytes, evt): | |
272 break | |
273 numWrites += 1 | |
274 | |
275 | |
276 def writeToHandle(self, buff, evt): | |
277 raise NotImplementedError() # TODO: this should default to WriteFile | |
278 | |
279 | |
280 def write(self, data): | |
281 """Reliably write some data. | |
282 | |
283 The data is buffered until his file descriptor is ready for writing. | |
284 """ | |
285 if isinstance(data, unicode): # no, really, I mean it | |
286 raise TypeError("Data must not be unicode") | |
287 if not self.connected or self._writeDisconnected: | |
288 return | |
289 if data: | |
290 self._tempDataBuffer.append(data) | |
291 self._tempDataLen += len(data) | |
292 if self.producer is not None: | |
293 if (len(self.dataBuffer) + self._tempDataLen | |
294 > self.writeBufferSize): | |
295 self.producerPaused = True | |
296 self.producer.pauseProducing() | |
297 self.startWriting() | |
298 | |
299 | |
300 def writeSequence(self, iovec): | |
301 if not self.connected or not iovec or self._writeDisconnected: | |
302 return | |
303 self._tempDataBuffer.extend(iovec) | |
304 for i in iovec: | |
305 self._tempDataLen += len(i) | |
306 if self.producer is not None: | |
307 if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: | |
308 self.producerPaused = True | |
309 self.producer.pauseProducing() | |
310 self.startWriting() | |
311 | |
312 | |
313 # general stuff | |
314 connected = False | |
315 disconnected = False | |
316 disconnecting = False | |
317 logstr = "Uninitialized" | |
318 | |
319 SEND_LIMIT = 128*1024 | |
320 | |
321 maxReads = 5 | |
322 | |
323 | |
324 def __init__(self, reactor = None): | |
325 if not reactor: | |
326 from twisted.internet import reactor | |
327 self.reactor = reactor | |
328 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite | |
329 self._tempDataLen = 0 | |
330 self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)] | |
331 | |
332 | |
333 def connectionLost(self, reason): | |
334 """ | |
335 The connection was lost. | |
336 | |
337 This is called when the connection on a selectable object has been | |
338 lost. It will be called whether the connection was closed explicitly, | |
339 an exception occurred in an event handler, or the other end of the | |
340 connection closed it first. | |
341 | |
342 Clean up state here, but make sure to call back up to FileDescriptor. | |
343 """ | |
344 | |
345 self.disconnected = True | |
346 self.connected = False | |
347 if self.producer is not None: | |
348 self.producer.stopProducing() | |
349 self.producer = None | |
350 self.stopReading() | |
351 self.stopWriting() | |
352 self.reactor.removeActiveHandle(self) | |
353 | |
354 | |
355 def getFileHandle(self): | |
356 return -1 | |
357 | |
358 | |
359 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): | |
360 """ | |
361 Close the connection at the next available opportunity. | |
362 | |
363 Call this to cause this FileDescriptor to lose its connection. It will | |
364 first write any data that it has buffered. | |
365 | |
366 If there is data buffered yet to be written, this method will cause the | |
367 transport to lose its connection as soon as it's done flushing its | |
368 write buffer. If you have a producer registered, the connection won't | |
369 be closed until the producer is finished. Therefore, make sure you | |
370 unregister your producer when it's finished, or the connection will | |
371 never close. | |
372 """ | |
373 | |
374 if self.connected and not self.disconnecting: | |
375 if self._writeDisconnected: | |
376 # doWrite won't trigger the connection close anymore | |
377 self.stopReading() | |
378 self.stopWriting | |
379 self.connectionLost(_connDone) | |
380 else: | |
381 self.stopReading() | |
382 self.startWriting() | |
383 self.disconnecting = 1 | |
384 | |
385 | |
386 # Producer/consumer implementation | |
387 | |
388 producerPaused = False | |
389 streamingProducer = False | |
390 | |
391 # first, the consumer stuff. This requires no additional work, as | |
392 # any object you can write to can be a consumer, really. | |
393 | |
394 producer = None | |
395 | |
396 | |
397 def registerProducer(self, producer, streaming): | |
398 """ | |
399 Register to receive data from a producer. | |
400 | |
401 This sets this selectable to be a consumer for a producer. When this | |
402 selectable runs out of data on a write() call, it will ask the producer | |
403 to resumeProducing(). A producer should implement the IProducer | |
404 interface. | |
405 | |
406 FileDescriptor provides some infrastructure for producer methods. | |
407 """ | |
408 if self.producer is not None: | |
409 raise RuntimeError( | |
410 "Cannot register producer %s, because producer " | |
411 "%s was never unregistered." % (producer, self.producer)) | |
412 if self.disconnected: | |
413 producer.stopProducing() | |
414 else: | |
415 self.producer = producer | |
416 self.streamingProducer = streaming | |
417 if not streaming: | |
418 producer.resumeProducing() | |
419 | |
420 | |
421 def unregisterProducer(self): | |
422 """ | |
423 Stop consuming data from a producer, without disconnecting. | |
424 """ | |
425 self.producer = None | |
426 | |
427 | |
428 def stopConsuming(self): | |
429 """ | |
430 Stop consuming data. | |
431 | |
432 This is called when a producer has lost its connection, to tell the | |
433 consumer to go lose its connection (and break potential circular | |
434 references). | |
435 """ | |
436 self.unregisterProducer() | |
437 self.loseConnection() | |
438 | |
439 | |
440 # producer interface implementation | |
441 | |
442 def resumeProducing(self): | |
443 assert self.connected and not self.disconnecting | |
444 self.startReading() | |
445 | |
446 | |
447 def pauseProducing(self): | |
448 self.stopReading() | |
449 | |
450 | |
451 def stopProducing(self): | |
452 self.loseConnection() | |
453 | |
454 | |
455 __all__ = ['FileHandle'] | |
456 | |
OLD | NEW |