Chromium Code Reviews

Side by Side Diff: third_party/twisted_8_1/twisted/internet/iocpreactor/abstract.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
1 # Copyright (c) 2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 Abstract file handle class
7 """
8
9 from twisted.internet import main, error, interfaces
10 from twisted.python import log, failure
11 from twisted.persisted import styles
12
13 from zope.interface import implements
14 import errno
15
16 from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
17 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
18 from twisted.internet.iocpreactor import iocpsupport as _iocp
19
20
21
22 class FileHandle(log.Logger, styles.Ephemeral, object):
23 """
24 File handle that can read and write asynchronously
25 """
26 implements(interfaces.IProducer, interfaces.IConsumer,
27 interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
28 # read stuff
29 maxReadBuffers = 16
30 readBufferSize = 4096
31 reading = False
32 dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
33 _readNextBuffer = 0
34 _readSize = 0 # how much data we have in the read buffer
35 _readScheduled = None
36 _readScheduledInOS = False
37
38
39 def startReading(self):
40 self.reactor.addActiveHandle(self)
41 if not self._readScheduled and not self.reading:
42 self.reading = True
43 self._readScheduled = self.reactor.callLater(0,
44 self._resumeReading)
45
46
47 def stopReading(self):
48 if self._readScheduled:
49 self._readScheduled.cancel()
50 self._readScheduled = None
51 self.reading = False
52
53
54 def _resumeReading(self):
55 self._readScheduled = None
56 if self._dispatchData() and not self._readScheduledInOS:
57 self.doRead()
58
59
60 def _dispatchData(self):
61 """
62 Dispatch previously read data. Return True if self.reading and we don't
63 have any more data
64 """
65 if not self._readSize:
66 return self.reading
67 size = self._readSize
68 full_buffers = size // self.readBufferSize
69 while self._readNextBuffer < full_buffers:
70 self.dataReceived(self._readBuffers[self._readNextBuffer])
71 self._readNextBuffer += 1
72 if not self.reading:
73 return False
74 remainder = size % self.readBufferSize
75 if remainder:
76 self.dataReceived(buffer(self._readBuffers[full_buffers],
77 0, remainder))
78 if self.dynamicReadBuffers:
79 total_buffer_size = self.readBufferSize * len(self._readBuffers)
80 # we have one buffer too many
81 if size < total_buffer_size - self.readBufferSize:
82 del self._readBuffers[-1]
83 # we filled all buffers, so allocate one more
84 elif (size == total_buffer_size and
85 len(self._readBuffers) < self.maxReadBuffers):
86 self._readBuffers.append(_iocp.AllocateReadBuffer(
87 self.readBufferSize))
88 self._readNextBuffer = 0
89 self._readSize = 0
90 return self.reading
91
92
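A worked example of the split performed by _dispatchData() above, using invented numbers (not taken from this CL): a completed read of `size` bytes is handed to dataReceived() as whole readBufferSize chunks first, then the tail as a slice of the next buffer.

    readBufferSize = 4096                   # matches the class default above
    size = 10000                            # bytes reported by a completed read

    full_buffers = size // readBufferSize   # 2 whole 4096-byte buffers
    remainder = size % readBufferSize       # 1808 bytes, delivered as a slice

    assert (full_buffers, remainder) == (2, 1808)
    # With dynamicReadBuffers, the buffer list then shrinks by one if the read
    # left more than a whole buffer unused, or grows (up to maxReadBuffers) if
    # every buffer was filled.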
93 def _cbRead(self, rc, bytes, evt):
94 self._readScheduledInOS = False
95 if self._handleRead(rc, bytes, evt):
96 self.doRead()
97
98
99 def _handleRead(self, rc, bytes, evt):
100 """
101 Returns False if we should stop reading for now
102 """
103 if self.disconnected:
104 return False
105 # graceful disconnection
106 if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
107 self.reactor.removeActiveHandle(self)
108 self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
109 return False
110 # XXX: not handling WSAEWOULDBLOCK
111 # ("too many outstanding overlapped I/O requests")
112 elif rc:
113 self.connectionLost(failure.Failure(
114 error.ConnectionLost("read error -- %s (%s)" %
115 (errno.errorcode.get(rc, 'unknown'), rc))))
116 return False
117 else:
118 assert self._readSize == 0
119 assert self._readNextBuffer == 0
120 self._readSize = bytes
121 return self._dispatchData()
122
123
124 def doRead(self):
125 numReads = 0
126 while 1:
127 evt = _iocp.Event(self._cbRead, self)
128
129 evt.buff = buff = self._readBuffers
130 rc, bytes = self.readFromHandle(buff, evt)
131
132 if (rc == ERROR_IO_PENDING
133 or (not rc and numReads >= self.maxReads)):
134 self._readScheduledInOS = True
135 break
136 else:
137 evt.ignore = True
138 if not self._handleRead(rc, bytes, evt):
139 break
140 numReads += 1
141
142
143 def readFromHandle(self, bufflist, evt):
144 raise NotImplementedError() # TODO: this should default to ReadFile
145
146
147 def dataReceived(self, data):
148 raise NotImplementedError
149
150
151 def readConnectionLost(self, reason):
152 self.connectionLost(reason)
153
154
155 # write stuff
156 dataBuffer = ''
157 offset = 0
158 writing = False
159 _writeScheduled = None
160 _writeDisconnecting = False
161 _writeDisconnected = False
162 writeBufferSize = 2**2**2**2 # right-associative: 2**16 == 65536
163 maxWrites = 5
164
165
166 def loseWriteConnection(self):
167 self._writeDisconnecting = True
168 self.startWriting()
169
170
171 def _closeWriteConnection(self):
172 # override in subclasses
173 pass
174
175
176 def writeConnectionLost(self, reason):
177 # in current code should never be called
178 self.connectionLost(reason)
179
180
181 def startWriting(self):
182 self.reactor.addActiveHandle(self)
183 self.writing = True
184 if not self._writeScheduled:
185 self._writeScheduled = self.reactor.callLater(0,
186 self._resumeWriting)
187
188
189 def stopWriting(self):
190 if self._writeScheduled:
191 self._writeScheduled.cancel()
192 self._writeScheduled = None
193 self.writing = False
194
195
196 def _resumeWriting(self):
197 self._writeScheduled = None
198 self.doWrite()
199
200
201 def _cbWrite(self, rc, bytes, evt):
202 if self._handleWrite(rc, bytes, evt):
203 self.doWrite()
204
205
206 def _handleWrite(self, rc, bytes, evt):
207 """
208 Returns false if we should stop writing for now
209 """
210 if self.disconnected or self._writeDisconnected:
211 return False
212 # XXX: not handling WSAEWOULDBLOCK
213 # ("too many outstanding overlapped I/O requests")
214 if rc:
215 self.connectionLost(failure.Failure(
216 error.ConnectionLost("write error -- %s (%s)" %
217 (errno.errorcode.get(rc, 'unknown'), rc))))
218 return False
219 else:
220 self.offset += bytes
221 # If there is nothing left to send,
222 if self.offset == len(self.dataBuffer) and not self._tempDataLen:
223 self.dataBuffer = ""
224 self.offset = 0
225 # stop writing
226 self.stopWriting()
227 # If I've got a producer who is supposed to supply me with data
228 if self.producer is not None and ((not self.streamingProducer)
229 or self.producerPaused):
230 # tell them to supply some more.
231 self.producerPaused = False
232 self.producer.resumeProducing()
233 elif self.disconnecting:
234 # But if I was previously asked to let the connection die,
235 # do so.
236 self.connectionLost(failure.Failure(main.CONNECTION_DONE))
237 elif self._writeDisconnecting:
238 # I was previously asked to half-close the connection.
239 self._closeWriteConnection()
240 self._writeDisconnected = True
241 return False
242 else:
243 return True
244
245
246 def doWrite(self):
247 numWrites = 0
248 while 1:
249 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
250 # If there is currently less than SEND_LIMIT bytes left to send
251 # in the string, extend it with the array data.
252 self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
253 "".join(self._tempDataBuffer))
254 self.offset = 0
255 self._tempDataBuffer = []
256 self._tempDataLen = 0
257
258 evt = _iocp.Event(self._cbWrite, self)
259
260 # Send as much data as you can.
261 if self.offset:
262 evt.buff = buff = buffer(self.dataBuffer, self.offset)
263 else:
264 evt.buff = buff = self.dataBuffer
265 rc, bytes = self.writeToHandle(buff, evt)
266 if (rc == ERROR_IO_PENDING
267 or (not rc and numWrites >= self.maxWrites)):
268 break
269 else:
270 evt.ignore = True
271 if not self._handleWrite(rc, bytes, evt):
272 break
273 numWrites += 1
274
275
276 def writeToHandle(self, buff, evt):
277 raise NotImplementedError() # TODO: this should default to WriteFile
278
279
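The readFromHandle()/writeToHandle() stubs above fix the contract a concrete handle has to satisfy: both return an (rc, bytes) pair, where rc == 0 means the call completed synchronously, ERROR_IO_PENDING means the overlapped operation was queued and the event callback fires later, and other codes are interpreted by _handleRead()/_handleWrite(). A minimal hypothetical subclass, for illustration only (NullHandle is not part of this tree and does no real I/O):

    from twisted.python import log
    from twisted.internet.iocpreactor.abstract import FileHandle
    from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF

    class NullHandle(FileHandle):
        """
        Does no real I/O; only demonstrates the (rc, bytes) return contract.
        """

        def readFromHandle(self, bufflist, evt):
            # (ERROR_HANDLE_EOF, 0) makes _handleRead() treat this as a
            # graceful end-of-file and call readConnectionLost().
            return ERROR_HANDLE_EOF, 0

        def writeToHandle(self, buff, evt):
            # Pretend the whole buffer was accepted synchronously; doWrite()
            # advances self.offset by the byte count reported here.
            return 0, len(buff)

        def dataReceived(self, data):
            log.msg("received %d bytes" % len(data))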
280 def write(self, data):
281 """Reliably write some data.
282
283 The data is buffered until this file descriptor is ready for writing.
284 """
285 if isinstance(data, unicode): # no, really, I mean it
286 raise TypeError("Data must not be unicode")
287 if not self.connected or self._writeDisconnected:
288 return
289 if data:
290 self._tempDataBuffer.append(data)
291 self._tempDataLen += len(data)
292 if self.producer is not None:
293 if (len(self.dataBuffer) + self._tempDataLen
294 > self.writeBufferSize):
295 self.producerPaused = True
296 self.producer.pauseProducing()
297 self.startWriting()
298
299
300 def writeSequence(self, iovec):
301 if not self.connected or not iovec or self._writeDisconnected:
302 return
303 self._tempDataBuffer.extend(iovec)
304 for i in iovec:
305 self._tempDataLen += len(i)
306 if self.producer is not None:
307 if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
308 self.producerPaused = True
309 self.producer.pauseProducing()
310 self.startWriting()
311
312
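For scale, the backpressure check in write()/writeSequence() above compares the buffered byte count against writeBufferSize, which the class sets to 2**2**2**2 (right-associative, i.e. 2**16 == 65536). A toy calculation with invented numbers:

    writeBufferSize = 2 ** 2 ** 2 ** 2   # right-associative: 2**16 == 65536
    assert writeBufferSize == 65536

    buffered = 60000     # len(self.dataBuffer) + self._tempDataLen so far
    incoming = 8192      # len(data) passed to write()
    if buffered + incoming > writeBufferSize:
        # a registered producer gets pauseProducing() here, and is resumed
        # again once doWrite() drains the buffer
        print("producer paused")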
313 # general stuff
314 connected = False
315 disconnected = False
316 disconnecting = False
317 logstr = "Uninitialized"
318
319 SEND_LIMIT = 128*1024
320
321 maxReads = 5
322
323
324 def __init__(self, reactor = None):
325 if not reactor:
326 from twisted.internet import reactor
327 self.reactor = reactor
328 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
329 self._tempDataLen = 0
330 self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]
331
332
333 def connectionLost(self, reason):
334 """
335 The connection was lost.
336
337 This is called when the connection on a selectable object has been
338 lost. It will be called whether the connection was closed explicitly,
339 an exception occurred in an event handler, or the other end of the
340 connection closed it first.
341
342 Clean up state here, but make sure to call back up to FileHandle.
343 """
344
345 self.disconnected = True
346 self.connected = False
347 if self.producer is not None:
348 self.producer.stopProducing()
349 self.producer = None
350 self.stopReading()
351 self.stopWriting()
352 self.reactor.removeActiveHandle(self)
353
354
355 def getFileHandle(self):
356 return -1
357
358
359 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
360 """
361 Close the connection at the next available opportunity.
362
363 Call this to cause this FileHandle to lose its connection. It will
364 first write any data that it has buffered.
365
366 If there is data buffered yet to be written, this method will cause the
367 transport to lose its connection as soon as it's done flushing its
368 write buffer. If you have a producer registered, the connection won't
369 be closed until the producer is finished. Therefore, make sure you
370 unregister your producer when it's finished, or the connection will
371 never close.
372 """
373
374 if self.connected and not self.disconnecting:
375 if self._writeDisconnected:
376 # doWrite won't trigger the connection close anymore
377 self.stopReading()
378 self.stopWriting()
379 self.connectionLost(_connDone)
380 else:
381 self.stopReading()
382 self.startWriting()
383 self.disconnecting = 1
384
385
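As the loseConnection() docstring above says, the close is deferred until the write buffer drains, and a still-registered producer keeps the connection open. A hypothetical shutdown sequence, with `handle` standing in for a connected subclass instance:

    handle.write("final chunk")    # still flushed by doWrite() before closing
    handle.unregisterProducer()    # a registered producer would block the close
    handle.loseConnection()        # disconnects once the buffer is empty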
386 # Producer/consumer implementation
387
388 producerPaused = False
389 streamingProducer = False
390
391 # first, the consumer stuff. This requires no additional work, as
392 # any object you can write to can be a consumer, really.
393
394 producer = None
395
396
397 def registerProducer(self, producer, streaming):
398 """
399 Register to receive data from a producer.
400
401 This sets this selectable to be a consumer for a producer. When this
402 selectable runs out of data on a write() call, it will ask the producer
403 to resumeProducing(). A producer should implement the IProducer
404 interface.
405
406 FileHandle provides some infrastructure for producer methods.
407 """
408 if self.producer is not None:
409 raise RuntimeError(
410 "Cannot register producer %s, because producer "
411 "%s was never unregistered." % (producer, self.producer))
412 if self.disconnected:
413 producer.stopProducing()
414 else:
415 self.producer = producer
416 self.streamingProducer = streaming
417 if not streaming:
418 producer.resumeProducing()
419
420
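A toy pull producer wired through registerProducer() above, for illustration only (LinePullProducer and handle are invented names): with streaming=False the transport calls resumeProducing() immediately, and again each time _handleWrite() finds nothing left to send.

    class LinePullProducer(object):
        """Pull producer: writes one line each time the transport asks."""

        def __init__(self, transport, lines):
            self.transport = transport
            self.lines = list(lines)

        def resumeProducing(self):
            if self.lines:
                self.transport.write(self.lines.pop(0) + "\r\n")
            else:
                self.transport.unregisterProducer()
                self.transport.loseConnection()

        def stopProducing(self):
            self.lines = []

    # handle.registerProducer(LinePullProducer(handle, ["a", "b"]), streaming=False)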
421 def unregisterProducer(self):
422 """
423 Stop consuming data from a producer, without disconnecting.
424 """
425 self.producer = None
426
427
428 def stopConsuming(self):
429 """
430 Stop consuming data.
431
432 This is called when a producer has lost its connection, to tell the
433 consumer to go lose its connection (and break potential circular
434 references).
435 """
436 self.unregisterProducer()
437 self.loseConnection()
438
439
440 # producer interface implementation
441
442 def resumeProducing(self):
443 assert self.connected and not self.disconnecting
444 self.startReading()
445
446
447 def pauseProducing(self):
448 self.stopReading()
449
450
451 def stopProducing(self):
452 self.loseConnection()
453
454
455 __all__ = ['FileHandle']
456