| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_abstract -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 | |
| 6 """ | |
| 7 Support for generic select()able objects. | |
| 8 | |
| 9 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 10 """ | |
| 11 | |
| 12 from zope.interface import implements | |
| 13 | |
| 14 # Twisted Imports | |
| 15 from twisted.python import log, reflect, failure | |
| 16 from twisted.persisted import styles | |
| 17 from twisted.internet import interfaces, main | |
| 18 | |
| 19 | |
| 20 class FileDescriptor(log.Logger, styles.Ephemeral, object): | |
| 21 """An object which can be operated on by select(). | |
| 22 | |
| 23 This is an abstract superclass of all objects which may be notified when | |
| 24 they are readable or writable; e.g. they have a file-descriptor that is | |
| 25 valid to be passed to select(2). | |
| 26 """ | |
| 27 connected = 0 | |
| 28 producerPaused = 0 | |
| 29 streamingProducer = 0 | |
| 30 producer = None | |
| 31 disconnected = 0 | |
| 32 disconnecting = 0 | |
| 33 _writeDisconnecting = False | |
| 34 _writeDisconnected = False | |
| 35 dataBuffer = "" | |
| 36 offset = 0 | |
| 37 | |
| 38 SEND_LIMIT = 128*1024 | |
| 39 | |
| 40 implements(interfaces.IProducer, interfaces.IReadWriteDescriptor, | |
| 41 interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfClos
eableDescriptor) | |
| 42 | |
| 43 def __init__(self, reactor=None): | |
| 44 if not reactor: | |
| 45 from twisted.internet import reactor | |
| 46 self.reactor = reactor | |
| 47 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite | |
| 48 self._tempDataLen = 0 | |
| 49 | |
| 50 def connectionLost(self, reason): | |
| 51 """The connection was lost. | |
| 52 | |
| 53 This is called when the connection on a selectable object has been | |
| 54 lost. It will be called whether the connection was closed explicitly, | |
| 55 an exception occurred in an event handler, or the other end of the | |
| 56 connection closed it first. | |
| 57 | |
| 58 Clean up state here, but make sure to call back up to FileDescriptor. | |
| 59 """ | |
| 60 | |
| 61 self.disconnected = 1 | |
| 62 self.connected = 0 | |
| 63 if self.producer is not None: | |
| 64 self.producer.stopProducing() | |
| 65 self.producer = None | |
| 66 self.stopReading() | |
| 67 self.stopWriting() | |
| 68 | |
| 69 def writeSomeData(self, data): | |
| 70 """Write as much as possible of the given data, immediately. | |
| 71 | |
| 72 This is called to invoke the lower-level writing functionality, such as | |
| 73 a socket's send() method, or a file's write(); this method returns an | |
| 74 integer. If positive, it is the number of bytes written; if negative, | |
| 75 it indicates the connection was lost. | |
| 76 """ | |
| 77 | |
| 78 raise NotImplementedError("%s does not implement writeSomeData" % | |
| 79 reflect.qual(self.__class__)) | |
| 80 | |
| 81 def doRead(self): | |
| 82 """Called when data is avaliable for reading. | |
| 83 | |
| 84 Subclasses must override this method. The result will be interpreted | |
| 85 in the same way as a result of doWrite(). | |
| 86 """ | |
| 87 raise NotImplementedError("%s does not implement doRead" % | |
| 88 reflect.qual(self.__class__)) | |
| 89 | |
| 90 def doWrite(self): | |
| 91 """Called when data can be written. | |
| 92 | |
| 93 A result that is true (which will be a negative number) implies the | |
| 94 connection was lost. A false result implies the connection is still | |
| 95 there; a result of 0 implies no write was done, and a result of None | |
| 96 indicates that a write was done. | |
| 97 """ | |
| 98 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: | |
| 99 # If there is currently less than SEND_LIMIT bytes left to send | |
| 100 # in the string, extend it with the array data. | |
| 101 self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(sel
f._tempDataBuffer) | |
| 102 self.offset = 0 | |
| 103 self._tempDataBuffer = [] | |
| 104 self._tempDataLen = 0 | |
| 105 | |
| 106 # Send as much data as you can. | |
| 107 if self.offset: | |
| 108 l = self.writeSomeData(buffer(self.dataBuffer, self.offset)) | |
| 109 else: | |
| 110 l = self.writeSomeData(self.dataBuffer) | |
| 111 if l < 0 or isinstance(l, Exception): | |
| 112 return l | |
| 113 if l == 0 and self.dataBuffer: | |
| 114 result = 0 | |
| 115 else: | |
| 116 result = None | |
| 117 self.offset += l | |
| 118 # If there is nothing left to send, | |
| 119 if self.offset == len(self.dataBuffer) and not self._tempDataLen: | |
| 120 self.dataBuffer = "" | |
| 121 self.offset = 0 | |
| 122 # stop writing. | |
| 123 self.stopWriting() | |
| 124 # If I've got a producer who is supposed to supply me with data, | |
| 125 if self.producer is not None and ((not self.streamingProducer) | |
| 126 or self.producerPaused): | |
| 127 # tell them to supply some more. | |
| 128 self.producerPaused = 0 | |
| 129 self.producer.resumeProducing() | |
| 130 elif self.disconnecting: | |
| 131 # But if I was previously asked to let the connection die, do | |
| 132 # so. | |
| 133 return self._postLoseConnection() | |
| 134 elif self._writeDisconnecting: | |
| 135 # I was previously asked to to half-close the connection. | |
| 136 result = self._closeWriteConnection() | |
| 137 self._writeDisconnected = True | |
| 138 return result | |
| 139 return result | |
| 140 | |
| 141 def _postLoseConnection(self): | |
| 142 """Called after a loseConnection(), when all data has been written. | |
| 143 | |
| 144 Whatever this returns is then returned by doWrite. | |
| 145 """ | |
| 146 # default implementation, telling reactor we're finished | |
| 147 return main.CONNECTION_DONE | |
| 148 | |
| 149 def _closeWriteConnection(self): | |
| 150 # override in subclasses | |
| 151 pass | |
| 152 | |
| 153 def writeConnectionLost(self, reason): | |
| 154 # in current code should never be called | |
| 155 self.connectionLost(reason) | |
| 156 | |
| 157 def readConnectionLost(self, reason): | |
| 158 # override in subclasses | |
| 159 self.connectionLost(reason) | |
| 160 | |
| 161 def write(self, data): | |
| 162 """Reliably write some data. | |
| 163 | |
| 164 The data is buffered until the underlying file descriptor is ready | |
| 165 for writing. If there is more than C{self.bufferSize} data in the | |
| 166 buffer and this descriptor has a registered streaming producer, its | |
| 167 C{pauseProducing()} method will be called. | |
| 168 """ | |
| 169 if isinstance(data, unicode): # no, really, I mean it | |
| 170 raise TypeError("Data must not be unicode") | |
| 171 if not self.connected or self._writeDisconnected: | |
| 172 return | |
| 173 if data: | |
| 174 self._tempDataBuffer.append(data) | |
| 175 self._tempDataLen += len(data) | |
| 176 # If we are responsible for pausing our producer, | |
| 177 if self.producer is not None and self.streamingProducer: | |
| 178 # and our buffer is full, | |
| 179 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize: | |
| 180 # pause it. | |
| 181 self.producerPaused = 1 | |
| 182 self.producer.pauseProducing() | |
| 183 self.startWriting() | |
| 184 | |
| 185 def writeSequence(self, iovec): | |
| 186 """Reliably write a sequence of data. | |
| 187 | |
| 188 Currently, this is a convenience method roughly equivalent to:: | |
| 189 | |
| 190 for chunk in iovec: | |
| 191 fd.write(chunk) | |
| 192 | |
| 193 It may have a more efficient implementation at a later time or in a | |
| 194 different reactor. | |
| 195 | |
| 196 As with the C{write()} method, if a buffer size limit is reached and a | |
| 197 streaming producer is registered, it will be paused until the buffered | |
| 198 data is written to the underlying file descriptor. | |
| 199 """ | |
| 200 if not self.connected or not iovec or self._writeDisconnected: | |
| 201 return | |
| 202 self._tempDataBuffer.extend(iovec) | |
| 203 for i in iovec: | |
| 204 self._tempDataLen += len(i) | |
| 205 # If we are responsible for pausing our producer, | |
| 206 if self.producer is not None and self.streamingProducer: | |
| 207 # and our buffer is full, | |
| 208 if len(self.dataBuffer) + self._tempDataLen > self.bufferSize: | |
| 209 # pause it. | |
| 210 self.producerPaused = 1 | |
| 211 self.producer.pauseProducing() | |
| 212 self.startWriting() | |
| 213 | |
| 214 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): | |
| 215 """Close the connection at the next available opportunity. | |
| 216 | |
| 217 Call this to cause this FileDescriptor to lose its connection. It will | |
| 218 first write any data that it has buffered. | |
| 219 | |
| 220 If there is data buffered yet to be written, this method will cause the | |
| 221 transport to lose its connection as soon as it's done flushing its | |
| 222 write buffer. If you have a producer registered, the connection won't | |
| 223 be closed until the producer is finished. Therefore, make sure you | |
| 224 unregister your producer when it's finished, or the connection will | |
| 225 never close. | |
| 226 """ | |
| 227 | |
| 228 if self.connected and not self.disconnecting: | |
| 229 if self._writeDisconnected: | |
| 230 # doWrite won't trigger the connection close anymore | |
| 231 self.stopReading() | |
| 232 self.stopWriting() | |
| 233 self.connectionLost(_connDone) | |
| 234 else: | |
| 235 self.stopReading() | |
| 236 self.startWriting() | |
| 237 self.disconnecting = 1 | |
| 238 | |
| 239 def loseWriteConnection(self): | |
| 240 self._writeDisconnecting = True | |
| 241 self.startWriting() | |
| 242 | |
| 243 def stopReading(self): | |
| 244 """Stop waiting for read availability. | |
| 245 | |
| 246 Call this to remove this selectable from being notified when it is | |
| 247 ready for reading. | |
| 248 """ | |
| 249 self.reactor.removeReader(self) | |
| 250 | |
| 251 def stopWriting(self): | |
| 252 """Stop waiting for write availability. | |
| 253 | |
| 254 Call this to remove this selectable from being notified when it is ready | |
| 255 for writing. | |
| 256 """ | |
| 257 self.reactor.removeWriter(self) | |
| 258 | |
| 259 def startReading(self): | |
| 260 """Start waiting for read availability. | |
| 261 """ | |
| 262 self.reactor.addReader(self) | |
| 263 | |
| 264 def startWriting(self): | |
| 265 """Start waiting for write availability. | |
| 266 | |
| 267 Call this to have this FileDescriptor be notified whenever it is ready f
or | |
| 268 writing. | |
| 269 """ | |
| 270 self.reactor.addWriter(self) | |
| 271 | |
| 272 # Producer/consumer implementation | |
| 273 | |
| 274 # first, the consumer stuff. This requires no additional work, as | |
| 275 # any object you can write to can be a consumer, really. | |
| 276 | |
| 277 producer = None | |
| 278 bufferSize = 2**2**2**2 | |
| 279 | |
| 280 def registerProducer(self, producer, streaming): | |
| 281 """Register to receive data from a producer. | |
| 282 | |
| 283 This sets this selectable to be a consumer for a producer. When this | |
| 284 selectable runs out of data on a write() call, it will ask the producer | |
| 285 to resumeProducing(). When the FileDescriptor's internal data buffer is | |
| 286 filled, it will ask the producer to pauseProducing(). If the connection | |
| 287 is lost, FileDescriptor calls producer's stopProducing() method. | |
| 288 | |
| 289 If streaming is true, the producer should provide the IPushProducer | |
| 290 interface. Otherwise, it is assumed that producer provides the | |
| 291 IPullProducer interface. In this case, the producer won't be asked | |
| 292 to pauseProducing(), but it has to be careful to write() data only | |
| 293 when its resumeProducing() method is called. | |
| 294 """ | |
| 295 if self.producer is not None: | |
| 296 raise RuntimeError("Cannot register producer %s, because producer %s
was never unregistered." % (producer, self.producer)) | |
| 297 if self.disconnected: | |
| 298 producer.stopProducing() | |
| 299 else: | |
| 300 self.producer = producer | |
| 301 self.streamingProducer = streaming | |
| 302 if not streaming: | |
| 303 producer.resumeProducing() | |
| 304 | |
| 305 def unregisterProducer(self): | |
| 306 """Stop consuming data from a producer, without disconnecting. | |
| 307 """ | |
| 308 self.producer = None | |
| 309 | |
| 310 def stopConsuming(self): | |
| 311 """Stop consuming data. | |
| 312 | |
| 313 This is called when a producer has lost its connection, to tell the | |
| 314 consumer to go lose its connection (and break potential circular | |
| 315 references). | |
| 316 """ | |
| 317 self.unregisterProducer() | |
| 318 self.loseConnection() | |
| 319 | |
| 320 # producer interface implementation | |
| 321 | |
| 322 def resumeProducing(self): | |
| 323 assert self.connected and not self.disconnecting | |
| 324 self.startReading() | |
| 325 | |
| 326 def pauseProducing(self): | |
| 327 self.stopReading() | |
| 328 | |
| 329 def stopProducing(self): | |
| 330 self.loseConnection() | |
| 331 | |
| 332 | |
| 333 def fileno(self): | |
| 334 """File Descriptor number for select(). | |
| 335 | |
| 336 This method must be overridden or assigned in subclasses to | |
| 337 indicate a valid file descriptor for the operating system. | |
| 338 """ | |
| 339 return -1 | |
| 340 | |
| 341 | |
| 342 def isIPAddress(addr): | |
| 343 """ | |
| 344 Determine whether the given string represents an IPv4 address. | |
| 345 | |
| 346 @type addr: C{str} | |
| 347 @param addr: A string which may or may not be the decimal dotted | |
| 348 representation of an IPv4 address. | |
| 349 | |
| 350 @rtype: C{bool} | |
| 351 @return: C{True} if C{addr} represents an IPv4 address, C{False} | |
| 352 otherwise. | |
| 353 """ | |
| 354 dottedParts = addr.split('.') | |
| 355 if len(dottedParts) == 4: | |
| 356 for octet in dottedParts: | |
| 357 try: | |
| 358 value = int(octet) | |
| 359 except ValueError: | |
| 360 return False | |
| 361 else: | |
| 362 if value < 0 or value > 255: | |
| 363 return False | |
| 364 return True | |
| 365 return False | |
| 366 | |
| 367 | |
| 368 __all__ = ["FileDescriptor"] | |
| OLD | NEW |