OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_factories -*- | |
2 # | |
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """ | |
8 Standard implementations of Twisted protocol-related interfaces. | |
9 | |
10 Start here if you are looking to write a new protocol implementation for | |
11 Twisted. The Protocol class contains some introductory material. | |
12 | |
13 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
14 """ | |
15 | |
16 import random | |
17 from zope.interface import implements | |
18 | |
19 # Twisted Imports | |
20 from twisted.python import log, failure, components | |
21 from twisted.internet import interfaces, error, defer | |
22 | |
23 | |
24 class Factory: | |
25 """This is a factory which produces protocols. | |
26 | |
27 By default, buildProtocol will create a protocol of the class given in | |
28 self.protocol. | |
29 """ | |
30 | |
31 implements(interfaces.IProtocolFactory) | |
32 | |
33 # put a subclass of Protocol here: | |
34 protocol = None | |
35 | |
36 numPorts = 0 | |
37 noisy = True | |
38 | |
39 def doStart(self): | |
40 """Make sure startFactory is called. | |
41 | |
42 Users should not call this function themselves! | |
43 """ | |
44 if not self.numPorts: | |
45 if self.noisy: | |
46 log.msg("Starting factory %r" % self) | |
47 self.startFactory() | |
48 self.numPorts = self.numPorts + 1 | |
49 | |
50 def doStop(self): | |
51 """Make sure stopFactory is called. | |
52 | |
53 Users should not call this function themselves! | |
54 """ | |
55 if self.numPorts == 0: | |
56 # this shouldn't happen, but does sometimes and this is better | |
57 # than blowing up in assert as we did previously. | |
58 return | |
59 self.numPorts = self.numPorts - 1 | |
60 if not self.numPorts: | |
61 if self.noisy: | |
62 log.msg("Stopping factory %r" % self) | |
63 self.stopFactory() | |
64 | |
65 def startFactory(self): | |
66 """This will be called before I begin listening on a Port or Connector. | |
67 | |
68 It will only be called once, even if the factory is connected | |
69 to multiple ports. | |
70 | |
71 This can be used to perform 'unserialization' tasks that | |
72 are best put off until things are actually running, such | |
73 as connecting to a database, opening files, etcetera. | |
74 """ | |
75 | |
76 def stopFactory(self): | |
77 """This will be called before I stop listening on all Ports/Connectors. | |
78 | |
79 This can be overridden to perform 'shutdown' tasks such as disconnecting | |
80 database connections, closing files, etc. | |
81 | |
82 It will be called, for example, before an application shuts down, | |
83 if it was connected to a port. User code should not call this function | |
84 directly. | |
85 """ | |
86 | |
87 def buildProtocol(self, addr): | |
88 """Create an instance of a subclass of Protocol. | |
89 | |
90 The returned instance will handle input on an incoming server | |
91 connection, and an attribute \"factory\" pointing to the creating | |
92 factory. | |
93 | |
94 Override this method to alter how Protocol instances get created. | |
95 | |
96 @param addr: an object implementing L{twisted.internet.interfaces.IAddre
ss} | |
97 """ | |
98 p = self.protocol() | |
99 p.factory = self | |
100 return p | |
101 | |
102 | |
103 class ClientFactory(Factory): | |
104 """A Protocol factory for clients. | |
105 | |
106 This can be used together with the various connectXXX methods in | |
107 reactors. | |
108 """ | |
109 | |
110 def startedConnecting(self, connector): | |
111 """Called when a connection has been started. | |
112 | |
113 You can call connector.stopConnecting() to stop the connection attempt. | |
114 | |
115 @param connector: a Connector object. | |
116 """ | |
117 | |
118 def clientConnectionFailed(self, connector, reason): | |
119 """Called when a connection has failed to connect. | |
120 | |
121 It may be useful to call connector.connect() - this will reconnect. | |
122 | |
123 @type reason: L{twisted.python.failure.Failure} | |
124 """ | |
125 | |
126 def clientConnectionLost(self, connector, reason): | |
127 """Called when an established connection is lost. | |
128 | |
129 It may be useful to call connector.connect() - this will reconnect. | |
130 | |
131 @type reason: L{twisted.python.failure.Failure} | |
132 """ | |
133 | |
134 | |
135 class _InstanceFactory(ClientFactory): | |
136 """Factory used by ClientCreator.""" | |
137 | |
138 noisy = False | |
139 | |
140 def __init__(self, reactor, instance, deferred): | |
141 self.reactor = reactor | |
142 self.instance = instance | |
143 self.deferred = deferred | |
144 | |
145 def __repr__(self): | |
146 return "<ClientCreator factory: %r>" % (self.instance, ) | |
147 | |
148 def buildProtocol(self, addr): | |
149 self.reactor.callLater(0, self.deferred.callback, self.instance) | |
150 del self.deferred | |
151 return self.instance | |
152 | |
153 def clientConnectionFailed(self, connector, reason): | |
154 self.reactor.callLater(0, self.deferred.errback, reason) | |
155 del self.deferred | |
156 | |
157 | |
158 class ClientCreator: | |
159 """Client connections that do not require a factory. | |
160 | |
161 The various connect* methods create a protocol instance using the given | |
162 protocol class and arguments, and connect it, returning a Deferred of the | |
163 resulting protocol instance. | |
164 | |
165 Useful for cases when we don't really need a factory. Mainly this | |
166 is when there is no shared state between protocol instances, and no need | |
167 to reconnect. | |
168 """ | |
169 | |
170 def __init__(self, reactor, protocolClass, *args, **kwargs): | |
171 self.reactor = reactor | |
172 self.protocolClass = protocolClass | |
173 self.args = args | |
174 self.kwargs = kwargs | |
175 | |
176 def connectTCP(self, host, port, timeout=30, bindAddress=None): | |
177 """Connect to remote host, return Deferred of resulting protocol instanc
e.""" | |
178 d = defer.Deferred() | |
179 f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self
.kwargs), d) | |
180 self.reactor.connectTCP(host, port, f, timeout=timeout, bindAddress=bind
Address) | |
181 return d | |
182 | |
183 def connectUNIX(self, address, timeout = 30, checkPID=0): | |
184 """Connect to Unix socket, return Deferred of resulting protocol instanc
e.""" | |
185 d = defer.Deferred() | |
186 f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self
.kwargs), d) | |
187 self.reactor.connectUNIX(address, f, timeout = timeout, checkPID=checkPI
D) | |
188 return d | |
189 | |
190 def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=Non
e): | |
191 """Connect to SSL server, return Deferred of resulting protocol instance
.""" | |
192 d = defer.Deferred() | |
193 f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self
.kwargs), d) | |
194 self.reactor.connectSSL(host, port, f, contextFactory, timeout=timeout,
bindAddress=bindAddress) | |
195 return d | |
196 | |
197 | |
198 class ReconnectingClientFactory(ClientFactory): | |
199 """My clients auto-reconnect with an exponential back-off. | |
200 | |
201 Note that clients should call my resetDelay method after they have | |
202 connected successfully. | |
203 | |
204 @ivar maxDelay: Maximum number of seconds between connection attempts. | |
205 @ivar initialDelay: Delay for the first reconnection attempt. | |
206 @ivar factor: a multiplicitive factor by which the delay grows | |
207 @ivar jitter: percentage of randomness to introduce into the delay length | |
208 to prevent stampeding. | |
209 """ | |
210 maxDelay = 3600 | |
211 initialDelay = 1.0 | |
212 # Note: These highly sensitive factors have been precisely measured by | |
213 # the National Institute of Science and Technology. Take extreme care | |
214 # in altering them, or you may damage your Internet! | |
215 factor = 2.7182818284590451 # (math.e) | |
216 # Phi = 1.6180339887498948 # (Phi is acceptable for use as a | |
217 # factor if e is too large for your application.) | |
218 jitter = 0.11962656492 # molar Planck constant times c, Jule meter/mole | |
219 | |
220 delay = initialDelay | |
221 retries = 0 | |
222 maxRetries = None | |
223 _callID = None | |
224 connector = None | |
225 | |
226 continueTrying = 1 | |
227 | |
228 def clientConnectionFailed(self, connector, reason): | |
229 if self.continueTrying: | |
230 self.connector = connector | |
231 self.retry() | |
232 | |
233 def clientConnectionLost(self, connector, unused_reason): | |
234 if self.continueTrying: | |
235 self.connector = connector | |
236 self.retry() | |
237 | |
238 def retry(self, connector=None): | |
239 """Have this connector connect again, after a suitable delay. | |
240 """ | |
241 if not self.continueTrying: | |
242 if self.noisy: | |
243 log.msg("Abandoning %s on explicit request" % (connector,)) | |
244 return | |
245 | |
246 if connector is None: | |
247 if self.connector is None: | |
248 raise ValueError("no connector to retry") | |
249 else: | |
250 connector = self.connector | |
251 | |
252 self.retries += 1 | |
253 if self.maxRetries is not None and (self.retries > self.maxRetries): | |
254 if self.noisy: | |
255 log.msg("Abandoning %s after %d retries." % | |
256 (connector, self.retries)) | |
257 return | |
258 | |
259 self.delay = min(self.delay * self.factor, self.maxDelay) | |
260 if self.jitter: | |
261 self.delay = random.normalvariate(self.delay, | |
262 self.delay * self.jitter) | |
263 | |
264 if self.noisy: | |
265 log.msg("%s will retry in %d seconds" % (connector, self.delay,)) | |
266 from twisted.internet import reactor | |
267 | |
268 def reconnector(): | |
269 self._callID = None | |
270 connector.connect() | |
271 self._callID = reactor.callLater(self.delay, reconnector) | |
272 | |
273 def stopTrying(self): | |
274 """I put a stop to any attempt to reconnect in progress. | |
275 """ | |
276 # ??? Is this function really stopFactory? | |
277 if self._callID: | |
278 self._callID.cancel() | |
279 self._callID = None | |
280 if self.connector: | |
281 # Hopefully this doesn't just make clientConnectionFailed | |
282 # retry again. | |
283 try: | |
284 self.connector.stopConnecting() | |
285 except error.NotConnectingError: | |
286 pass | |
287 self.continueTrying = 0 | |
288 | |
289 def resetDelay(self): | |
290 """Call me after a successful connection to reset. | |
291 | |
292 I reset the delay and the retry counter. | |
293 """ | |
294 self.delay = self.initialDelay | |
295 self.retries = 0 | |
296 self._callID = None | |
297 self.continueTrying = 1 | |
298 | |
299 | |
300 def __getstate__(self): | |
301 """ | |
302 Remove all of the state which is mutated by connection attempts and | |
303 failures, returning just the state which describes how reconnections | |
304 should be attempted. This will make the unserialized instance | |
305 behave just as this one did when it was first instantiated. | |
306 """ | |
307 state = self.__dict__.copy() | |
308 for key in ['connector', 'retries', 'delay', | |
309 'continueTrying', '_callID']: | |
310 if key in state: | |
311 del state[key] | |
312 return state | |
313 | |
314 | |
315 | |
316 class ServerFactory(Factory): | |
317 """Subclass this to indicate that your protocol.Factory is only usable for s
ervers. | |
318 """ | |
319 | |
320 | |
321 class BaseProtocol: | |
322 """This is the abstract superclass of all protocols. | |
323 | |
324 If you are going to write a new protocol for Twisted, start here. The | |
325 docstrings of this class explain how you can get started. Any protocol | |
326 implementation, either client or server, should be a subclass of me. | |
327 | |
328 My API is quite simple. Implement dataReceived(data) to handle both | |
329 event-based and synchronous input; output can be sent through the | |
330 'transport' attribute, which is to be an instance that implements | |
331 L{twisted.internet.interfaces.ITransport}. | |
332 | |
333 Some subclasses exist already to help you write common types of protocols: | |
334 see the L{twisted.protocols.basic} module for a few of them. | |
335 """ | |
336 | |
337 connected = 0 | |
338 transport = None | |
339 | |
340 def makeConnection(self, transport): | |
341 """Make a connection to a transport and a server. | |
342 | |
343 This sets the 'transport' attribute of this Protocol, and calls the | |
344 connectionMade() callback. | |
345 """ | |
346 self.connected = 1 | |
347 self.transport = transport | |
348 self.connectionMade() | |
349 | |
350 def connectionMade(self): | |
351 """Called when a connection is made. | |
352 | |
353 This may be considered the initializer of the protocol, because | |
354 it is called when the connection is completed. For clients, | |
355 this is called once the connection to the server has been | |
356 established; for servers, this is called after an accept() call | |
357 stops blocking and a socket has been received. If you need to | |
358 send any greeting or initial message, do it here. | |
359 """ | |
360 | |
361 connectionDone=failure.Failure(error.ConnectionDone()) | |
362 connectionDone.cleanFailure() | |
363 | |
364 | |
365 class Protocol(BaseProtocol): | |
366 | |
367 implements(interfaces.IProtocol) | |
368 | |
369 def dataReceived(self, data): | |
370 """Called whenever data is received. | |
371 | |
372 Use this method to translate to a higher-level message. Usually, some | |
373 callback will be made upon the receipt of each complete protocol | |
374 message. | |
375 | |
376 @param data: a string of indeterminate length. Please keep in mind | |
377 that you will probably need to buffer some data, as partial | |
378 (or multiple) protocol messages may be received! I recommend | |
379 that unit tests for protocols call through to this method with | |
380 differing chunk sizes, down to one byte at a time. | |
381 """ | |
382 | |
383 def connectionLost(self, reason=connectionDone): | |
384 """Called when the connection is shut down. | |
385 | |
386 Clear any circular references here, and any external references | |
387 to this Protocol. The connection has been closed. | |
388 | |
389 @type reason: L{twisted.python.failure.Failure} | |
390 """ | |
391 | |
392 | |
393 class ProtocolToConsumerAdapter(components.Adapter): | |
394 implements(interfaces.IConsumer) | |
395 | |
396 def write(self, data): | |
397 self.original.dataReceived(data) | |
398 | |
399 def registerProducer(self, producer, streaming): | |
400 pass | |
401 | |
402 def unregisterProducer(self): | |
403 pass | |
404 | |
405 components.registerAdapter(ProtocolToConsumerAdapter, interfaces.IProtocol, | |
406 interfaces.IConsumer) | |
407 | |
408 class ConsumerToProtocolAdapter(components.Adapter): | |
409 implements(interfaces.IProtocol) | |
410 | |
411 def dataReceived(self, data): | |
412 self.original.write(data) | |
413 | |
414 def connectionLost(self, reason): | |
415 pass | |
416 | |
417 def makeConnection(self, transport): | |
418 pass | |
419 | |
420 def connectionMade(self): | |
421 pass | |
422 | |
423 components.registerAdapter(ConsumerToProtocolAdapter, interfaces.IConsumer, | |
424 interfaces.IProtocol) | |
425 | |
426 class ProcessProtocol(BaseProtocol): | |
427 """ | |
428 Base process protocol implementation which does simple dispatching for | |
429 stdin, stdout, and stderr file descriptors. | |
430 """ | |
431 implements(interfaces.IProcessProtocol) | |
432 | |
433 def childDataReceived(self, childFD, data): | |
434 if childFD == 1: | |
435 self.outReceived(data) | |
436 elif childFD == 2: | |
437 self.errReceived(data) | |
438 | |
439 | |
440 def outReceived(self, data): | |
441 """ | |
442 Some data was received from stdout. | |
443 """ | |
444 | |
445 | |
446 def errReceived(self, data): | |
447 """ | |
448 Some data was received from stderr. | |
449 """ | |
450 | |
451 | |
452 def childConnectionLost(self, childFD): | |
453 if childFD == 0: | |
454 self.inConnectionLost() | |
455 elif childFD == 1: | |
456 self.outConnectionLost() | |
457 elif childFD == 2: | |
458 self.errConnectionLost() | |
459 | |
460 | |
461 def inConnectionLost(self): | |
462 """ | |
463 This will be called when stdin is closed. | |
464 """ | |
465 | |
466 | |
467 def outConnectionLost(self): | |
468 """ | |
469 This will be called when stdout is closed. | |
470 """ | |
471 | |
472 | |
473 def errConnectionLost(self): | |
474 """ | |
475 This will be called when stderr is closed. | |
476 """ | |
477 | |
478 | |
479 def processEnded(self, reason): | |
480 """ | |
481 This will be called when the subprocess is finished. | |
482 | |
483 @type reason: L{twisted.python.failure.Failure} | |
484 """ | |
485 | |
486 | |
487 class AbstractDatagramProtocol: | |
488 """ | |
489 Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP. | |
490 """ | |
491 | |
492 transport = None | |
493 numPorts = 0 | |
494 noisy = True | |
495 | |
496 def __getstate__(self): | |
497 d = self.__dict__.copy() | |
498 d['transport'] = None | |
499 return d | |
500 | |
501 def doStart(self): | |
502 """Make sure startProtocol is called. | |
503 | |
504 This will be called by makeConnection(), users should not call it. | |
505 """ | |
506 if not self.numPorts: | |
507 if self.noisy: | |
508 log.msg("Starting protocol %s" % self) | |
509 self.startProtocol() | |
510 self.numPorts = self.numPorts + 1 | |
511 | |
512 def doStop(self): | |
513 """Make sure stopProtocol is called. | |
514 | |
515 This will be called by the port, users should not call it. | |
516 """ | |
517 assert self.numPorts > 0 | |
518 self.numPorts = self.numPorts - 1 | |
519 self.transport = None | |
520 if not self.numPorts: | |
521 if self.noisy: | |
522 log.msg("Stopping protocol %s" % self) | |
523 self.stopProtocol() | |
524 | |
525 def startProtocol(self): | |
526 """Called when a transport is connected to this protocol. | |
527 | |
528 Will only be called once, even if multiple ports are connected. | |
529 """ | |
530 | |
531 def stopProtocol(self): | |
532 """Called when the transport is disconnected. | |
533 | |
534 Will only be called once, after all ports are disconnected. | |
535 """ | |
536 | |
537 def makeConnection(self, transport): | |
538 """Make a connection to a transport and a server. | |
539 | |
540 This sets the 'transport' attribute of this DatagramProtocol, and calls
the | |
541 doStart() callback. | |
542 """ | |
543 assert self.transport == None | |
544 self.transport = transport | |
545 self.doStart() | |
546 | |
547 def datagramReceived(self, datagram, addr): | |
548 """Called when a datagram is received. | |
549 | |
550 @param datagram: the string received from the transport. | |
551 @param addr: tuple of source of datagram. | |
552 """ | |
553 | |
554 | |
555 class DatagramProtocol(AbstractDatagramProtocol): | |
556 """ | |
557 Protocol for datagram-oriented transport, e.g. UDP. | |
558 | |
559 @type transport: C{NoneType} or | |
560 L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider | |
561 @ivar transport: The transport with which this protocol is associated, | |
562 if it is associated with one. | |
563 """ | |
564 | |
565 def connectionRefused(self): | |
566 """Called due to error from write in connected mode. | |
567 | |
568 Note this is a result of ICMP message generated by *previous* | |
569 write. | |
570 """ | |
571 | |
572 | |
573 class ConnectedDatagramProtocol(DatagramProtocol): | |
574 """Protocol for connected datagram-oriented transport. | |
575 | |
576 No longer necessary for UDP. | |
577 """ | |
578 | |
579 def datagramReceived(self, datagram): | |
580 """Called when a datagram is received. | |
581 | |
582 @param datagram: the string received from the transport. | |
583 """ | |
584 | |
585 def connectionFailed(self, failure): | |
586 """Called if connecting failed. | |
587 | |
588 Usually this will be due to a DNS lookup failure. | |
589 """ | |
590 | |
591 | |
592 | |
593 class FileWrapper: | |
594 """A wrapper around a file-like object to make it behave as a Transport. | |
595 | |
596 This doesn't actually stream the file to the attached protocol, | |
597 and is thus useful mainly as a utility for debugging protocols. | |
598 """ | |
599 | |
600 implements(interfaces.ITransport) | |
601 | |
602 closed = 0 | |
603 disconnecting = 0 | |
604 producer = None | |
605 streamingProducer = 0 | |
606 | |
607 def __init__(self, file): | |
608 self.file = file | |
609 | |
610 def write(self, data): | |
611 try: | |
612 self.file.write(data) | |
613 except: | |
614 self.handleException() | |
615 # self._checkProducer() | |
616 | |
617 def _checkProducer(self): | |
618 # Cheating; this is called at "idle" times to allow producers to be | |
619 # found and dealt with | |
620 if self.producer: | |
621 self.producer.resumeProducing() | |
622 | |
623 def registerProducer(self, producer, streaming): | |
624 """From abstract.FileDescriptor | |
625 """ | |
626 self.producer = producer | |
627 self.streamingProducer = streaming | |
628 if not streaming: | |
629 producer.resumeProducing() | |
630 | |
631 def unregisterProducer(self): | |
632 self.producer = None | |
633 | |
634 def stopConsuming(self): | |
635 self.unregisterProducer() | |
636 self.loseConnection() | |
637 | |
638 def writeSequence(self, iovec): | |
639 self.write("".join(iovec)) | |
640 | |
641 def loseConnection(self): | |
642 self.closed = 1 | |
643 try: | |
644 self.file.close() | |
645 except (IOError, OSError): | |
646 self.handleException() | |
647 | |
648 def getPeer(self): | |
649 # XXX: According to ITransport, this should return an IAddress! | |
650 return 'file', 'file' | |
651 | |
652 def getHost(self): | |
653 # XXX: According to ITransport, this should return an IAddress! | |
654 return 'file' | |
655 | |
656 def handleException(self): | |
657 pass | |
658 | |
659 def resumeProducing(self): | |
660 # Never sends data anyways | |
661 pass | |
662 | |
663 def pauseProducing(self): | |
664 # Never sends data anyways | |
665 pass | |
666 | |
667 def stopProducing(self): | |
668 self.loseConnection() | |
669 | |
670 | |
671 __all__ = ["Factory", "ClientFactory", "ReconnectingClientFactory", "connectionD
one", | |
672 "Protocol", "ProcessProtocol", "FileWrapper", "ServerFactory", | |
673 "AbstractDatagramProtocol", "DatagramProtocol", "ConnectedDatagramPro
tocol", | |
674 "ClientCreator"] | |
OLD | NEW |