Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(271)

Side by Side Diff: third_party/twisted_8_1/twisted/words/protocols/jabber/xmlstream.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
2 #
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 XMPP XML Streams
8
9 Building blocks for setting up XML Streams, including helping classes for
10 doing authentication on either client or server side, and working with XML
11 Stanzas.
12 """
13
14 from zope.interface import directlyProvides, implements
15
16 from twisted.internet import defer
17 from twisted.internet.error import ConnectionLost
18 from twisted.python import failure, log
19 from twisted.words.protocols.jabber import error, ijabber, jid
20 from twisted.words.xish import domish, xmlstream
21 from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
22 from twisted.words.xish.xmlstream import STREAM_START_EVENT
23 from twisted.words.xish.xmlstream import STREAM_END_EVENT
24 from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
25
26 try:
27 from twisted.internet import ssl
28 except ImportError:
29 ssl = None
30 if ssl and not ssl.supported:
31 ssl = None
32
33 STREAM_AUTHD_EVENT = intern("//event/stream/authd")
34 INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")
35
36 NS_STREAMS = 'http://etherx.jabber.org/streams'
37 NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
38
39 Reset = object()
40
41 def hashPassword(sid, password):
42 """
43 Create a SHA1-digest string of a session identifier and password.
44 """
45 import sha
46 return sha.new("%s%s" % (sid, password)).hexdigest()
47
48
49
50 class Authenticator:
51 """
52 Base class for business logic of initializing an XmlStream
53
54 Subclass this object to enable an XmlStream to initialize and authenticate
55 to different types of stream hosts (such as clients, components, etc.).
56
57 Rules:
58 1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
59 stream has been completely initialized.
60 2. The Authenticator SHOULD reset all state information when
61 L{associateWithStream} is called.
62 3. The Authenticator SHOULD override L{streamStarted}, and start
63 initialization there.
64
65 @type xmlstream: L{XmlStream}
66 @ivar xmlstream: The XmlStream that needs authentication
67
68 @note: the term authenticator is historical. Authenticators perform
69 all steps required to prepare the stream for the exchange
70 of XML stanzas.
71 """
72
73 def __init__(self):
74 self.xmlstream = None
75
76
77 def connectionMade(self):
78 """
79 Called by the XmlStream when the underlying socket connection is
80 in place.
81
82 This allows the Authenticator to send an initial root element, if it's
83 connecting, or wait for an inbound root from the peer if it's accepting
84 the connection.
85
86 Subclasses can use self.xmlstream.send() to send any initial data to
87 the peer.
88 """
89
90
91 def streamStarted(self, rootElement):
92 """
93 Called by the XmlStream when the stream has started.
94
95 A stream is considered to have started when the start tag of the root
96 element has been received.
97
98 This examines L{rootElement} to see if there is a version attribute.
99 If absent, C{0.0} is assumed per RFC 3920. Subsequently, the
100 minimum of the version from the received stream header and the
101 value stored in L{xmlstream} is taken and put back in {xmlstream}.
102
103 Extensions of this method can extract more information from the
104 stream header and perform checks on them, optionally sending
105 stream errors and closing the stream.
106 """
107 if rootElement.hasAttribute("version"):
108 version = rootElement["version"].split(".")
109 try:
110 version = (int(version[0]), int(version[1]))
111 except (IndexError, ValueError):
112 version = (0, 0)
113 else:
114 version = (0, 0)
115
116 self.xmlstream.version = min(self.xmlstream.version, version)
117
118
119 def associateWithStream(self, xmlstream):
120 """
121 Called by the XmlStreamFactory when a connection has been made
122 to the requested peer, and an XmlStream object has been
123 instantiated.
124
125 The default implementation just saves a handle to the new
126 XmlStream.
127
128 @type xmlstream: L{XmlStream}
129 @param xmlstream: The XmlStream that will be passing events to this
130 Authenticator.
131
132 """
133 self.xmlstream = xmlstream
134
135
136
137 class ConnectAuthenticator(Authenticator):
138 """
139 Authenticator for initiating entities.
140 """
141
142 namespace = None
143
144 def __init__(self, otherHost):
145 self.otherHost = otherHost
146
147
148 def connectionMade(self):
149 self.xmlstream.namespace = self.namespace
150 self.xmlstream.otherEntity = jid.internJID(self.otherHost)
151 self.xmlstream.sendHeader()
152
153
154 def initializeStream(self):
155 """
156 Perform stream initialization procedures.
157
158 An L{XmlStream} holds a list of initializer objects in its
159 C{initializers} attribute. This method calls these initializers in
160 order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
161 been successfully processed. Otherwise it dispatches the
162 C{INIT_FAILED_EVENT} event with the failure.
163
164 Initializers may return the special L{Reset} object to halt the
165 initialization processing. It signals that the current initializer was
166 successfully processed, but that the XML Stream has been reset. An
167 example is the TLSInitiatingInitializer.
168 """
169
170 def remove_first(result):
171 self.xmlstream.initializers.pop(0)
172
173 return result
174
175 def do_next(result):
176 """
177 Take the first initializer and process it.
178
179 On success, the initializer is removed from the list and
180 then next initializer will be tried.
181 """
182
183 if result is Reset:
184 return None
185
186 try:
187 init = self.xmlstream.initializers[0]
188 except IndexError:
189 self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
190 return None
191 else:
192 d = defer.maybeDeferred(init.initialize)
193 d.addCallback(remove_first)
194 d.addCallback(do_next)
195 return d
196
197 d = defer.succeed(None)
198 d.addCallback(do_next)
199 d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)
200
201
202 def streamStarted(self, rootElement):
203 """
204 Called by the XmlStream when the stream has started.
205
206 This extends L{Authenticator.streamStarted} to extract further stream
207 headers from L{rootElement}, optionally wait for stream features being
208 received and then call C{initializeStream}.
209 """
210
211 Authenticator.streamStarted(self, rootElement)
212
213 self.xmlstream.sid = rootElement.getAttribute("id")
214
215 if rootElement.hasAttribute("from"):
216 self.xmlstream.otherEntity = jid.internJID(rootElement["from"])
217
218 # Setup observer for stream features, if applicable
219 if self.xmlstream.version >= (1, 0):
220 def onFeatures(element):
221 features = {}
222 for feature in element.elements():
223 features[(feature.uri, feature.name)] = feature
224
225 self.xmlstream.features = features
226 self.initializeStream()
227
228 self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
229 NS_STREAMS,
230 onFeatures)
231 else:
232 self.initializeStream()
233
234
235
236 class ListenAuthenticator(Authenticator):
237 """
238 Authenticator for receiving entities.
239 """
240
241 namespace = None
242
243 def associateWithStream(self, xmlstream):
244 """
245 Called by the XmlStreamFactory when a connection has been made.
246
247 Extend L{Authenticator.associateWithStream} to set the L{XmlStream}
248 to be non-initiating.
249 """
250 Authenticator.associateWithStream(self, xmlstream)
251 self.xmlstream.initiating = False
252
253
254 def streamStarted(self, rootElement):
255 """
256 Called by the XmlStream when the stream has started.
257
258 This extends L{Authenticator.streamStarted} to extract further
259 information from the stream headers from L{rootElement}.
260 """
261 Authenticator.streamStarted(self, rootElement)
262
263 self.xmlstream.namespace = rootElement.defaultUri
264
265 if rootElement.hasAttribute("to"):
266 self.xmlstream.thisEntity = jid.internJID(rootElement["to"])
267
268 self.xmlstream.prefixes = {}
269 for prefix, uri in rootElement.localPrefixes.iteritems():
270 self.xmlstream.prefixes[uri] = prefix
271
272
273
274 class FeatureNotAdvertized(Exception):
275 """
276 Exception indicating a stream feature was not advertized, while required by
277 the initiating entity.
278 """
279
280
281
282 class BaseFeatureInitiatingInitializer(object):
283 """
284 Base class for initializers with a stream feature.
285
286 This assumes the associated XmlStream represents the initiating entity
287 of the connection.
288
289 @cvar feature: tuple of (uri, name) of the stream feature root element.
290 @type feature: tuple of (L{str}, L{str})
291 @ivar required: whether the stream feature is required to be advertized
292 by the receiving entity.
293 @type required: L{bool}
294 """
295
296 implements(ijabber.IInitiatingInitializer)
297
298 feature = None
299 required = False
300
301 def __init__(self, xs):
302 self.xmlstream = xs
303
304
305 def initialize(self):
306 """
307 Initiate the initialization.
308
309 Checks if the receiving entity advertizes the stream feature. If it
310 does, the initialization is started. If it is not advertized, and the
311 C{required} instance variable is L{True}, it raises
312 L{FeatureNotAdvertized}. Otherwise, the initialization silently
313 succeeds.
314 """
315
316 if self.feature in self.xmlstream.features:
317 return self.start()
318 elif self.required:
319 raise FeatureNotAdvertized
320 else:
321 return None
322
323
324 def start(self):
325 """
326 Start the actual initialization.
327
328 May return a deferred for asynchronous initialization.
329 """
330
331
332
333 class TLSError(Exception):
334 """
335 TLS base exception.
336 """
337
338
339
340 class TLSFailed(TLSError):
341 """
342 Exception indicating failed TLS negotiation
343 """
344
345
346
347 class TLSRequired(TLSError):
348 """
349 Exception indicating required TLS negotiation.
350
351 This exception is raised when the receiving entity requires TLS
352 negotiation and the initiating does not desire to negotiate TLS.
353 """
354
355
356
357 class TLSNotSupported(TLSError):
358 """
359 Exception indicating missing TLS support.
360
361 This exception is raised when the initiating entity wants and requires to
362 negotiate TLS when the OpenSSL library is not available.
363 """
364
365
366
367 class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
368 """
369 TLS stream initializer for the initiating entity.
370
371 It is strongly required to include this initializer in the list of
372 initializers for an XMPP stream. By default it will try to negotiate TLS.
373 An XMPP server may indicate that TLS is required. If TLS is not desired,
374 set the C{wanted} attribute to False instead of removing it from the list
375 of initializers, so a proper exception L{TLSRequired} can be raised.
376
377 @cvar wanted: indicates if TLS negotiation is wanted.
378 @type wanted: L{bool}
379 """
380
381 feature = (NS_XMPP_TLS, 'starttls')
382 wanted = True
383 _deferred = None
384
385 def onProceed(self, obj):
386 """
387 Proceed with TLS negotiation and reset the XML stream.
388 """
389
390 self.xmlstream.removeObserver('/failure', self.onFailure)
391 ctx = ssl.CertificateOptions()
392 self.xmlstream.transport.startTLS(ctx)
393 self.xmlstream.reset()
394 self.xmlstream.sendHeader()
395 self._deferred.callback(Reset)
396
397
398 def onFailure(self, obj):
399 self.xmlstream.removeObserver('/proceed', self.onProceed)
400 self._deferred.errback(TLSFailed())
401
402
403 def start(self):
404 """
405 Start TLS negotiation.
406
407 This checks if the receiving entity requires TLS, the SSL library is
408 available and uses the C{required} and C{wanted} instance variables to
409 determine what to do in the various different cases.
410
411 For example, if the SSL library is not available, and wanted and
412 required by the user, it raises an exception. However if it is not
413 required by both parties, initialization silently succeeds, moving
414 on to the next step.
415 """
416 if self.wanted:
417 if ssl is None:
418 if self.required:
419 return defer.fail(TLSNotSupported())
420 else:
421 return defer.succeed(None)
422 else:
423 pass
424 elif self.xmlstream.features[self.feature].required:
425 return defer.fail(TLSRequired())
426 else:
427 return defer.succeed(None)
428
429 self._deferred = defer.Deferred()
430 self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
431 self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
432 self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
433 return self._deferred
434
435
436
437 class XmlStream(xmlstream.XmlStream):
438 """
439 XMPP XML Stream protocol handler.
440
441 @ivar version: XML stream version as a tuple (major, minor). Initially,
442 this is set to the minimally supported version. Upon
443 receiving the stream header of the peer, it is set to the
444 minimum of that value and the version on the received
445 header.
446 @type version: (L{int}, L{int})
447 @ivar namespace: default namespace URI for stream
448 @type namespace: L{str}
449 @ivar thisEntity: JID of this entity
450 @type thisEntity: L{JID}
451 @ivar otherEntity: JID of the peer entity
452 @type otherEntity: L{JID}
453 @ivar sid: session identifier
454 @type sid: L{str}
455 @ivar initiating: True if this is the initiating stream
456 @type initiating: L{bool}
457 @ivar features: map of (uri, name) to stream features element received from
458 the receiving entity.
459 @type features: L{dict} of (L{str}, L{str}) to L{domish.Element}.
460 @ivar prefixes: map of URI to prefixes that are to appear on stream
461 header.
462 @type prefixes: L{dict} of L{str} to L{str}
463 @ivar initializers: list of stream initializer objects
464 @type initializers: L{list} of objects that provide L{IInitializer}
465 @ivar authenticator: associated authenticator that uses C{initializers} to
466 initialize the XML stream.
467 """
468
469 version = (1, 0)
470 namespace = 'invalid'
471 thisEntity = None
472 otherEntity = None
473 sid = None
474 initiating = True
475
476 _headerSent = False # True if the stream header has been sent
477
478 def __init__(self, authenticator):
479 xmlstream.XmlStream.__init__(self)
480
481 self.prefixes = {NS_STREAMS: 'stream'}
482 self.authenticator = authenticator
483 self.initializers = []
484 self.features = {}
485
486 # Reset the authenticator
487 authenticator.associateWithStream(self)
488
489
490 def _callLater(self, *args, **kwargs):
491 from twisted.internet import reactor
492 return reactor.callLater(*args, **kwargs)
493
494
495 def reset(self):
496 """
497 Reset XML Stream.
498
499 Resets the XML Parser for incoming data. This is to be used after
500 successfully negotiating a new layer, e.g. TLS and SASL. Note that
501 registered event observers will continue to be in place.
502 """
503 self._headerSent = False
504 self._initializeStream()
505
506
507 def onStreamError(self, errelem):
508 """
509 Called when a stream:error element has been received.
510
511 Dispatches a L{STREAM_ERROR_EVENT} event with the error element to
512 allow for cleanup actions and drops the connection.
513
514 @param errelem: The received error element.
515 @type errelem: L{domish.Element}
516 """
517 self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)),
518 STREAM_ERROR_EVENT)
519 self.transport.loseConnection()
520
521
522 def sendHeader(self):
523 """
524 Send stream header.
525 """
526 # set up optional extra namespaces
527 localPrefixes = {}
528 for uri, prefix in self.prefixes.iteritems():
529 if uri != NS_STREAMS:
530 localPrefixes[prefix] = uri
531
532 rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
533 localPrefixes=localPrefixes)
534
535 if self.otherEntity:
536 rootElement['to'] = self.otherEntity.userhost()
537
538 if self.thisEntity:
539 rootElement['from'] = self.thisEntity.userhost()
540
541 if not self.initiating and self.sid:
542 rootElement['id'] = self.sid
543
544 if self.version >= (1, 0):
545 rootElement['version'] = "%d.%d" % self.version
546
547 self.send(rootElement.toXml(prefixes=self.prefixes, closeElement=0))
548 self._headerSent = True
549
550
551 def sendFooter(self):
552 """
553 Send stream footer.
554 """
555 self.send('</stream:stream>')
556
557
558 def sendStreamError(self, streamError):
559 """
560 Send stream level error.
561
562 If we are the receiving entity, and haven't sent the header yet,
563 we sent one first.
564
565 After sending the stream error, the stream is closed and the transport
566 connection dropped.
567
568 @param streamError: stream error instance
569 @type streamError: L{error.StreamError}
570 """
571 if not self._headerSent and not self.initiating:
572 self.sendHeader()
573
574 if self._headerSent:
575 self.send(streamError.getElement())
576 self.sendFooter()
577
578 self.transport.loseConnection()
579
580
581 def send(self, obj):
582 """
583 Send data over the stream.
584
585 This overrides L{xmlstream.Xmlstream.send} to use the default namespace
586 of the stream header when serializing L{domish.IElement}s. It is
587 assumed that if you pass an object that provides L{domish.IElement},
588 it represents a direct child of the stream's root element.
589 """
590 if domish.IElement.providedBy(obj):
591 obj = obj.toXml(prefixes=self.prefixes,
592 defaultUri=self.namespace,
593 prefixesInScope=self.prefixes.values())
594
595 xmlstream.XmlStream.send(self, obj)
596
597
598 def connectionMade(self):
599 """
600 Called when a connection is made.
601
602 Notifies the authenticator when a connection has been made.
603 """
604 xmlstream.XmlStream.connectionMade(self)
605 self.authenticator.connectionMade()
606
607
608 def onDocumentStart(self, rootElement):
609 """
610 Called when the stream header has been received.
611
612 Extracts the header's C{id} and C{version} attributes from the root
613 element. The C{id} attribute is stored in our C{sid} attribute and the
614 C{version} attribute is parsed and the minimum of the version we sent
615 and the parsed C{version} attribute is stored as a tuple (major, minor)
616 in this class' C{version} attribute. If no C{version} attribute was
617 present, we assume version 0.0.
618
619 If appropriate (we are the initiating stream and the minimum of our and
620 the other party's version is at least 1.0), a one-time observer is
621 registered for getting the stream features. The registered function is
622 C{onFeatures}.
623
624 Ultimately, the authenticator's C{streamStarted} method will be called.
625
626 @param rootElement: The root element.
627 @type rootElement: L{domish.Element}
628 """
629 xmlstream.XmlStream.onDocumentStart(self, rootElement)
630
631 # Setup observer for stream errors
632 self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS,
633 self.onStreamError)
634
635 self.authenticator.streamStarted(rootElement)
636
637
638
639 class XmlStreamFactory(xmlstream.XmlStreamFactory):
640 """
641 Factory for Jabber XmlStream objects as a reconnecting client.
642
643 Note that this differs from L{xmlstream.XmlStreamFactory} in that
644 it generates Jabber specific L{XmlStream} instances that have
645 authenticators.
646 """
647
648 protocol = XmlStream
649
650 def __init__(self, authenticator):
651 xmlstream.XmlStreamFactory.__init__(self, authenticator)
652 self.authenticator = authenticator
653
654
655
656 class TimeoutError(Exception):
657 """
658 Exception raised when no IQ response has been received before the
659 configured timeout.
660 """
661
662
663
664 def upgradeWithIQResponseTracker(xs):
665 """
666 Enhances an XmlStream for iq response tracking.
667
668 This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
669 response is an error iq stanza, the deferred has its errback invoked with a
670 failure that holds a L{StanzaException<error.StanzaException>} that is
671 easier to examine.
672 """
673 def callback(iq):
674 """
675 Handle iq response by firing associated deferred.
676 """
677 if getattr(iq, 'handled', False):
678 return
679
680 try:
681 d = xs.iqDeferreds[iq["id"]]
682 except KeyError:
683 pass
684 else:
685 del xs.iqDeferreds[iq["id"]]
686 iq.handled = True
687 if iq['type'] == 'error':
688 d.errback(error.exceptionFromStanza(iq))
689 else:
690 d.callback(iq)
691
692
693 def disconnected(_):
694 """
695 Make sure deferreds do not linger on after disconnect.
696
697 This errbacks all deferreds of iq's for which no response has been
698 received with a L{ConnectionLost} failure. Otherwise, the deferreds
699 will never be fired.
700 """
701 iqDeferreds = xs.iqDeferreds
702 xs.iqDeferreds = {}
703 for d in iqDeferreds.itervalues():
704 d.errback(ConnectionLost())
705
706 xs.iqDeferreds = {}
707 xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
708 xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
709 xs.addObserver('/iq[@type="result"]', callback)
710 xs.addObserver('/iq[@type="error"]', callback)
711 directlyProvides(xs, ijabber.IIQResponseTracker)
712
713
714
715 class IQ(domish.Element):
716 """
717 Wrapper for an iq stanza.
718
719 Iq stanzas are used for communications with a request-response behaviour.
720 Each iq request is associated with an XML stream and has its own unique id
721 to be able to track the response.
722
723 @ivar timeout: if set, a timeout period after which the deferred returned
724 by C{send} will have its errback called with a
725 L{TimeoutError} failure.
726 @type timeout: C{float}
727 """
728
729 timeout = None
730
731 def __init__(self, xmlstream, stanzaType="set"):
732 """
733 @type xmlstream: L{xmlstream.XmlStream}
734 @param xmlstream: XmlStream to use for transmission of this IQ
735
736 @type stanzaType: L{str}
737 @param stanzaType: IQ type identifier ('get' or 'set')
738 """
739 domish.Element.__init__(self, (None, "iq"))
740 self.addUniqueId()
741 self["type"] = stanzaType
742 self._xmlstream = xmlstream
743
744
745 def send(self, to=None):
746 """
747 Send out this iq.
748
749 Returns a deferred that is fired when an iq response with the same id
750 is received. Result responses will be passed to the deferred callback.
751 Error responses will be transformed into a
752 L{StanzaError<error.StanzaError>} and result in the errback of the
753 deferred being invoked.
754
755 @rtype: L{defer.Deferred}
756 """
757 if to is not None:
758 self["to"] = to
759
760 if not ijabber.IIQResponseTracker.providedBy(self._xmlstream):
761 upgradeWithIQResponseTracker(self._xmlstream)
762
763 d = defer.Deferred()
764 self._xmlstream.iqDeferreds[self['id']] = d
765
766 timeout = self.timeout or self._xmlstream.iqDefaultTimeout
767 if timeout is not None:
768 def onTimeout():
769 del self._xmlstream.iqDeferreds[self['id']]
770 d.errback(TimeoutError("IQ timed out"))
771
772 call = self._xmlstream._callLater(timeout, onTimeout)
773
774 def cancelTimeout(result):
775 if call.active():
776 call.cancel()
777
778 return result
779
780 d.addBoth(cancelTimeout)
781
782 self._xmlstream.send(self)
783 return d
784
785
786
787 def toResponse(stanza, stanzaType=None):
788 """
789 Create a response stanza from another stanza.
790
791 This takes the addressing and id attributes from a stanza to create a (new,
792 empty) response stanza. The addressing attributes are swapped and the id
793 copied. Optionally, the stanza type of the response can be specified.
794
795 @param stanza: the original stanza
796 @type stanza: L{domish.Element}
797 @param stanzaType: optional response stanza type
798 @type stanzaType: C{str}
799 @return: the response stanza.
800 @rtype: L{domish.Element}
801 """
802
803 toAddr = stanza.getAttribute('from')
804 fromAddr = stanza.getAttribute('to')
805 stanzaID = stanza.getAttribute('id')
806
807 response = domish.Element((None, stanza.name))
808 if toAddr:
809 response['to'] = toAddr
810 if fromAddr:
811 response['from'] = fromAddr
812 if stanzaID:
813 response['id'] = stanzaID
814 if type:
815 response['type'] = stanzaType
816
817 return response
818
819
820
821 class XMPPHandler(object):
822 """
823 XMPP protocol handler.
824
825 Classes derived from this class implement (part of) one or more XMPP
826 extension protocols, and are referred to as a subprotocol implementation.
827 """
828
829 implements(ijabber.IXMPPHandler)
830
831 def __init__(self):
832 self.parent = None
833 self.xmlstream = None
834
835
836 def setHandlerParent(self, parent):
837 self.parent = parent
838 self.parent.addHandler(self)
839
840
841 def disownHandlerParent(self, parent):
842 self.parent.removeHandler(self)
843 self.parent = None
844
845
846 def makeConnection(self, xs):
847 self.xmlstream = xs
848 self.connectionMade()
849
850
851 def connectionMade(self):
852 """
853 Called after a connection has been established.
854
855 Can be overridden to perform work before stream initialization.
856 """
857
858
859 def connectionInitialized(self):
860 """
861 The XML stream has been initialized.
862
863 Can be overridden to perform work after stream initialization, e.g. to
864 set up observers and start exchanging XML stanzas.
865 """
866
867
868 def connectionLost(self, reason):
869 """
870 The XML stream has been closed.
871
872 This method can be extended to inspect the C{reason} argument and
873 act on it.
874 """
875 self.xmlstream = None
876
877
878 def send(self, obj):
879 """
880 Send data over the managed XML stream.
881
882 @note: The stream manager maintains a queue for data sent using this
883 method when there is no current initialized XML stream. This
884 data is then sent as soon as a new stream has been established
885 and initialized. Subsequently, L{connectionInitialized} will be
886 called again. If this queueing is not desired, use C{send} on
887 C{self.xmlstream}.
888
889 @param obj: data to be sent over the XML stream. This is usually an
890 object providing L{domish.IElement}, or serialized XML. See
891 L{xmlstream.XmlStream} for details.
892 """
893 self.parent.send(obj)
894
895
896
897 class XMPPHandlerCollection(object):
898 """
899 Collection of XMPP subprotocol handlers.
900
901 This allows for grouping of subprotocol handlers, but is not an
902 L{XMPPHandler} itself, so this is not recursive.
903
904 @ivar handlers: List of protocol handlers.
905 @type handlers: L{list} of objects providing
906 L{IXMPPHandler}
907 """
908
909 implements(ijabber.IXMPPHandlerCollection)
910
911 def __init__(self):
912 self.handlers = []
913
914
915 def __iter__(self):
916 """
917 Act as a container for handlers.
918 """
919 return iter(self.handlers)
920
921
922 def addHandler(self, handler):
923 """
924 Add protocol handler.
925
926 Protocol handlers are expected to provide L{ijabber.IXMPPHandler}.
927 """
928 self.handlers.append(handler)
929
930
931 def removeHandler(self, handler):
932 """
933 Remove protocol handler.
934 """
935 self.handlers.remove(handler)
936
937
938
939 class StreamManager(XMPPHandlerCollection):
940 """
941 Business logic representing a managed XMPP connection.
942
943 This maintains a single XMPP connection and provides facilities for packet
944 routing and transmission. Business logic modules are objects providing
945 L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
946 using L{addHandler}.
947
948 @ivar xmlstream: currently managed XML stream
949 @type xmlstream: L{XmlStream}
950 @ivar logTraffic: if true, log all traffic.
951 @type logTraffic: L{bool}
952 @ivar _initialized: Whether the stream represented by L{xmlstream} has
953 been initialized. This is used when caching outgoing
954 stanzas.
955 @type _initialized: C{bool}
956 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
957 @type _packetQueue: L{list}
958 """
959
960 logTraffic = False
961
962 def __init__(self, factory):
963 XMPPHandlerCollection.__init__(self)
964 self.xmlstream = None
965 self._packetQueue = []
966 self._initialized = False
967
968 factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
969 factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
970 factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
971 factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
972 self.factory = factory
973
974
975 def addHandler(self, handler):
976 """
977 Add protocol handler.
978
979 When an XML stream has already been established, the handler's
980 C{connectionInitialized} will be called to get it up to speed.
981 """
982 XMPPHandlerCollection.addHandler(self, handler)
983
984 # get protocol handler up to speed when a connection has already
985 # been established
986 if self.xmlstream and self._initialized:
987 handler.makeConnection(self.xmlstream)
988 handler.connectionInitialized()
989
990
991 def _connected(self, xs):
992 """
993 Called when the transport connection has been established.
994
995 Here we optionally set up traffic logging (depending on L{logTraffic})
996 and call each handler's C{makeConnection} method with the L{XmlStream}
997 instance.
998 """
999 def logDataIn(buf):
1000 log.msg("RECV: %r" % buf)
1001
1002 def logDataOut(buf):
1003 log.msg("SEND: %r" % buf)
1004
1005 if self.logTraffic:
1006 xs.rawDataInFn = logDataIn
1007 xs.rawDataOutFn = logDataOut
1008
1009 self.xmlstream = xs
1010
1011 for e in self:
1012 e.makeConnection(xs)
1013
1014
1015 def _authd(self, xs):
1016 """
1017 Called when the stream has been initialized.
1018
1019 Send out cached stanzas and call each handler's
1020 C{connectionInitialized} method.
1021 """
1022 # Flush all pending packets
1023 for p in self._packetQueue:
1024 xs.send(p)
1025 self._packetQueue = []
1026 self._initialized = True
1027
1028 # Notify all child services which implement
1029 # the IService interface
1030 for e in self:
1031 e.connectionInitialized()
1032
1033
1034 def initializationFailed(self, reason):
1035 """
1036 Called when stream initialization has failed.
1037
1038 Stream initialization has halted, with the reason indicated by
1039 C{reason}. It may be retried by calling the authenticator's
1040 C{initializeStream}. See the respective authenticators for details.
1041
1042 @param reason: A failure instance indicating why stream initialization
1043 failed.
1044 @type reason: L{failure.Failure}
1045 """
1046
1047
1048 def _disconnected(self, _):
1049 """
1050 Called when the stream has been closed.
1051
1052 From this point on, the manager doesn't interact with the
1053 L{XmlStream} anymore and notifies each handler that the connection
1054 was lost by calling its C{connectionLost} method.
1055 """
1056 self.xmlstream = None
1057 self._initialized = False
1058
1059 # Notify all child services which implement
1060 # the IService interface
1061 for e in self:
1062 e.connectionLost(None)
1063
1064
1065 def send(self, obj):
1066 """
1067 Send data over the XML stream.
1068
1069 When there is no established XML stream, the data is queued and sent
1070 out when a new XML stream has been established and initialized.
1071
1072 @param obj: data to be sent over the XML stream. See
1073 L{xmlstream.XmlStream.send} for details.
1074 """
1075 if self._initialized:
1076 self.xmlstream.send(obj)
1077 else:
1078 self._packetQueue.append(obj)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698