OLD | NEW |
| (Empty) |
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """A bare-bones and non-compliant XMPP server. | |
6 | |
7 Just enough of the protocol is implemented to get it to work with | |
8 Chrome's sync notification system. | |
9 """ | |
10 | |
11 import asynchat | |
12 import asyncore | |
13 import base64 | |
14 import re | |
15 import socket | |
16 from xml.dom import minidom | |
17 | |
18 # pychecker complains about the use of fileno(), which is implemented | |
19 # by asyncore by forwarding to an internal object via __getattr__. | |
20 __pychecker__ = 'no-classattr' | |
21 | |
22 | |
23 class Error(Exception): | |
24 """Error class for this module.""" | |
25 pass | |
26 | |
27 | |
28 class UnexpectedXml(Error): | |
29 """Raised when an unexpected XML element has been encountered.""" | |
30 | |
31 def __init__(self, xml_element): | |
32 xml_text = xml_element.toxml() | |
33 Error.__init__(self, 'Unexpected XML element', xml_text) | |
34 | |
35 | |
36 def ParseXml(xml_string): | |
37 """Parses the given string as XML and returns a minidom element | |
38 object. | |
39 """ | |
40 dom = minidom.parseString(xml_string) | |
41 | |
42 # minidom handles xmlns specially, but there's a bug where it sets | |
43 # the attribute value to None, which causes toxml() or toprettyxml() | |
44 # to break. | |
45 def FixMinidomXmlnsBug(xml_element): | |
46 if xml_element.getAttribute('xmlns') is None: | |
47 xml_element.setAttribute('xmlns', '') | |
48 | |
49 def ApplyToAllDescendantElements(xml_element, fn): | |
50 fn(xml_element) | |
51 for node in xml_element.childNodes: | |
52 if node.nodeType == node.ELEMENT_NODE: | |
53 ApplyToAllDescendantElements(node, fn) | |
54 | |
55 root = dom.documentElement | |
56 ApplyToAllDescendantElements(root, FixMinidomXmlnsBug) | |
57 return root | |
58 | |
59 | |
60 def CloneXml(xml): | |
61 """Returns a deep copy of the given XML element. | |
62 | |
63 Args: | |
64 xml: The XML element, which should be something returned from | |
65 ParseXml() (i.e., a root element). | |
66 """ | |
67 return xml.ownerDocument.cloneNode(True).documentElement | |
68 | |
69 | |
70 class StanzaParser(object): | |
71 """A hacky incremental XML parser. | |
72 | |
73 StanzaParser consumes data incrementally via FeedString() and feeds | |
74 its delegate complete parsed stanzas (i.e., XML documents) via | |
75 FeedStanza(). Any stanzas passed to FeedStanza() are unlinked after | |
76 the callback is done. | |
77 | |
78 Use like so: | |
79 | |
80 class MyClass(object): | |
81 ... | |
82 def __init__(self, ...): | |
83 ... | |
84 self._parser = StanzaParser(self) | |
85 ... | |
86 | |
87 def SomeFunction(self, ...): | |
88 ... | |
89 self._parser.FeedString(some_data) | |
90 ... | |
91 | |
92 def FeedStanza(self, stanza): | |
93 ... | |
94 print stanza.toprettyxml() | |
95 ... | |
96 """ | |
97 | |
98 # NOTE(akalin): The following regexps are naive, but necessary since | |
99 # none of the existing Python 2.4/2.5 XML libraries support | |
100 # incremental parsing. This works well enough for our purposes. | |
101 # | |
102 # The regexps below assume that any present XML element starts at | |
103 # the beginning of the string, but there may be trailing whitespace. | |
104 | |
105 # Matches an opening stream tag (e.g., '<stream:stream foo="bar">') | |
106 # (assumes that the stream XML namespace is defined in the tag). | |
107 _stream_re = re.compile(r'^(<stream:stream [^>]*>)\s*') | |
108 | |
109 # Matches an empty element tag (e.g., '<foo bar="baz"/>'). | |
110 _empty_element_re = re.compile(r'^(<[^>]*/>)\s*') | |
111 | |
112 # Matches a non-empty element (e.g., '<foo bar="baz">quux</foo>'). | |
113 # Does *not* handle nested elements. | |
114 _non_empty_element_re = re.compile(r'^(<([^ >]*)[^>]*>.*?</\2>)\s*') | |
115 | |
116 # The closing tag for a stream tag. We have to insert this | |
117 # ourselves since all XML stanzas are children of the stream tag, | |
118 # which is never closed until the connection is closed. | |
119 _stream_suffix = '</stream:stream>' | |
120 | |
121 def __init__(self, delegate): | |
122 self._buffer = '' | |
123 self._delegate = delegate | |
124 | |
125 def FeedString(self, data): | |
126 """Consumes the given string data, possibly feeding one or more | |
127 stanzas to the delegate. | |
128 """ | |
129 self._buffer += data | |
130 while (self._ProcessBuffer(self._stream_re, self._stream_suffix) or | |
131 self._ProcessBuffer(self._empty_element_re) or | |
132 self._ProcessBuffer(self._non_empty_element_re)): | |
133 pass | |
134 | |
135 def _ProcessBuffer(self, regexp, xml_suffix=''): | |
136 """If the buffer matches the given regexp, removes the match from | |
137 the buffer, appends the given suffix, parses it, and feeds it to | |
138 the delegate. | |
139 | |
140 Returns: | |
141 Whether or not the buffer matched the given regexp. | |
142 """ | |
143 results = regexp.match(self._buffer) | |
144 if not results: | |
145 return False | |
146 xml_text = self._buffer[:results.end()] + xml_suffix | |
147 self._buffer = self._buffer[results.end():] | |
148 stanza = ParseXml(xml_text) | |
149 self._delegate.FeedStanza(stanza) | |
150 # Needed because stanza may have cycles. | |
151 stanza.unlink() | |
152 return True | |
153 | |
154 | |
155 class Jid(object): | |
156 """Simple struct for an XMPP jid (essentially an e-mail address with | |
157 an optional resource string). | |
158 """ | |
159 | |
160 def __init__(self, username, domain, resource=''): | |
161 self.username = username | |
162 self.domain = domain | |
163 self.resource = resource | |
164 | |
165 def __str__(self): | |
166 jid_str = "%s@%s" % (self.username, self.domain) | |
167 if self.resource: | |
168 jid_str += '/' + self.resource | |
169 return jid_str | |
170 | |
171 def GetBareJid(self): | |
172 return Jid(self.username, self.domain) | |
173 | |
174 | |
175 class IdGenerator(object): | |
176 """Simple class to generate unique IDs for XMPP messages.""" | |
177 | |
178 def __init__(self, prefix): | |
179 self._prefix = prefix | |
180 self._id = 0 | |
181 | |
182 def GetNextId(self): | |
183 next_id = "%s.%s" % (self._prefix, self._id) | |
184 self._id += 1 | |
185 return next_id | |
186 | |
187 | |
188 class HandshakeTask(object): | |
189 """Class to handle the initial handshake with a connected XMPP | |
190 client. | |
191 """ | |
192 | |
193 # The handshake states in order. | |
194 (_INITIAL_STREAM_NEEDED, | |
195 _AUTH_NEEDED, | |
196 _AUTH_STREAM_NEEDED, | |
197 _BIND_NEEDED, | |
198 _SESSION_NEEDED, | |
199 _FINISHED) = range(6) | |
200 | |
201 # Used when in the _INITIAL_STREAM_NEEDED and _AUTH_STREAM_NEEDED | |
202 # states. Not an XML object as it's only the opening tag. | |
203 # | |
204 # The from and id attributes are filled in later. | |
205 _STREAM_DATA = ( | |
206 '<stream:stream from="%s" id="%s" ' | |
207 'version="1.0" xmlns:stream="http://etherx.jabber.org/streams" ' | |
208 'xmlns="jabber:client">') | |
209 | |
210 # Used when in the _INITIAL_STREAM_NEEDED state. | |
211 _AUTH_STANZA = ParseXml( | |
212 '<stream:features xmlns:stream="http://etherx.jabber.org/streams">' | |
213 ' <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">' | |
214 ' <mechanism>PLAIN</mechanism>' | |
215 ' <mechanism>X-GOOGLE-TOKEN</mechanism>' | |
216 ' </mechanisms>' | |
217 '</stream:features>') | |
218 | |
219 # Used when in the _AUTH_NEEDED state. | |
220 _AUTH_SUCCESS_STANZA = ParseXml( | |
221 '<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>') | |
222 | |
223 # Used when in the _AUTH_NEEDED state. | |
224 _AUTH_FAILURE_STANZA = ParseXml( | |
225 '<failure xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>') | |
226 | |
227 # Used when in the _AUTH_STREAM_NEEDED state. | |
228 _BIND_STANZA = ParseXml( | |
229 '<stream:features xmlns:stream="http://etherx.jabber.org/streams">' | |
230 ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>' | |
231 ' <session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>' | |
232 '</stream:features>') | |
233 | |
234 # Used when in the _BIND_NEEDED state. | |
235 # | |
236 # The id and jid attributes are filled in later. | |
237 _BIND_RESULT_STANZA = ParseXml( | |
238 '<iq id="" type="result">' | |
239 ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">' | |
240 ' <jid/>' | |
241 ' </bind>' | |
242 '</iq>') | |
243 | |
244 # Used when in the _SESSION_NEEDED state. | |
245 # | |
246 # The id attribute is filled in later. | |
247 _IQ_RESPONSE_STANZA = ParseXml('<iq id="" type="result"/>') | |
248 | |
249 def __init__(self, connection, resource_prefix, authenticated): | |
250 self._connection = connection | |
251 self._id_generator = IdGenerator(resource_prefix) | |
252 self._username = '' | |
253 self._domain = '' | |
254 self._jid = None | |
255 self._authenticated = authenticated | |
256 self._resource_prefix = resource_prefix | |
257 self._state = self._INITIAL_STREAM_NEEDED | |
258 | |
259 def FeedStanza(self, stanza): | |
260 """Inspects the given stanza and changes the handshake state if needed. | |
261 | |
262 Called when a stanza is received from the client. Inspects the | |
263 stanza to make sure it has the expected attributes given the | |
264 current state, advances the state if needed, and sends a reply to | |
265 the client if needed. | |
266 """ | |
267 def ExpectStanza(stanza, name): | |
268 if stanza.tagName != name: | |
269 raise UnexpectedXml(stanza) | |
270 | |
271 def ExpectIq(stanza, type, name): | |
272 ExpectStanza(stanza, 'iq') | |
273 if (stanza.getAttribute('type') != type or | |
274 stanza.firstChild.tagName != name): | |
275 raise UnexpectedXml(stanza) | |
276 | |
277 def GetStanzaId(stanza): | |
278 return stanza.getAttribute('id') | |
279 | |
280 def HandleStream(stanza): | |
281 ExpectStanza(stanza, 'stream:stream') | |
282 domain = stanza.getAttribute('to') | |
283 if domain: | |
284 self._domain = domain | |
285 SendStreamData() | |
286 | |
287 def SendStreamData(): | |
288 next_id = self._id_generator.GetNextId() | |
289 stream_data = self._STREAM_DATA % (self._domain, next_id) | |
290 self._connection.SendData(stream_data) | |
291 | |
292 def GetUserDomain(stanza): | |
293 encoded_username_password = stanza.firstChild.data | |
294 username_password = base64.b64decode(encoded_username_password) | |
295 (_, username_domain, _) = username_password.split('\0') | |
296 # The domain may be omitted. | |
297 # | |
298 # If we were using python 2.5, we'd be able to do: | |
299 # | |
300 # username, _, domain = username_domain.partition('@') | |
301 # if not domain: | |
302 # domain = self._domain | |
303 at_pos = username_domain.find('@') | |
304 if at_pos != -1: | |
305 username = username_domain[:at_pos] | |
306 domain = username_domain[at_pos+1:] | |
307 else: | |
308 username = username_domain | |
309 domain = self._domain | |
310 return (username, domain) | |
311 | |
312 def Finish(): | |
313 self._state = self._FINISHED | |
314 self._connection.HandshakeDone(self._jid) | |
315 | |
316 if self._state == self._INITIAL_STREAM_NEEDED: | |
317 HandleStream(stanza) | |
318 self._connection.SendStanza(self._AUTH_STANZA, False) | |
319 self._state = self._AUTH_NEEDED | |
320 | |
321 elif self._state == self._AUTH_NEEDED: | |
322 ExpectStanza(stanza, 'auth') | |
323 (self._username, self._domain) = GetUserDomain(stanza) | |
324 if self._authenticated: | |
325 self._connection.SendStanza(self._AUTH_SUCCESS_STANZA, False) | |
326 self._state = self._AUTH_STREAM_NEEDED | |
327 else: | |
328 self._connection.SendStanza(self._AUTH_FAILURE_STANZA, False) | |
329 Finish() | |
330 | |
331 elif self._state == self._AUTH_STREAM_NEEDED: | |
332 HandleStream(stanza) | |
333 self._connection.SendStanza(self._BIND_STANZA, False) | |
334 self._state = self._BIND_NEEDED | |
335 | |
336 elif self._state == self._BIND_NEEDED: | |
337 ExpectIq(stanza, 'set', 'bind') | |
338 stanza_id = GetStanzaId(stanza) | |
339 resource_element = stanza.getElementsByTagName('resource')[0] | |
340 resource = resource_element.firstChild.data | |
341 full_resource = '%s.%s' % (self._resource_prefix, resource) | |
342 response = CloneXml(self._BIND_RESULT_STANZA) | |
343 response.setAttribute('id', stanza_id) | |
344 self._jid = Jid(self._username, self._domain, full_resource) | |
345 jid_text = response.parentNode.createTextNode(str(self._jid)) | |
346 response.getElementsByTagName('jid')[0].appendChild(jid_text) | |
347 self._connection.SendStanza(response) | |
348 self._state = self._SESSION_NEEDED | |
349 | |
350 elif self._state == self._SESSION_NEEDED: | |
351 ExpectIq(stanza, 'set', 'session') | |
352 stanza_id = GetStanzaId(stanza) | |
353 xml = CloneXml(self._IQ_RESPONSE_STANZA) | |
354 xml.setAttribute('id', stanza_id) | |
355 self._connection.SendStanza(xml) | |
356 Finish() | |
357 | |
358 | |
359 def AddrString(addr): | |
360 return '%s:%d' % addr | |
361 | |
362 | |
363 class XmppConnection(asynchat.async_chat): | |
364 """A single XMPP client connection. | |
365 | |
366 This class handles the connection to a single XMPP client (via a | |
367 socket). It does the XMPP handshake and also implements the (old) | |
368 Google notification protocol. | |
369 """ | |
370 | |
371 # Used for acknowledgements to the client. | |
372 # | |
373 # The from and id attributes are filled in later. | |
374 _IQ_RESPONSE_STANZA = ParseXml('<iq from="" id="" type="result"/>') | |
375 | |
376 def __init__(self, sock, socket_map, delegate, addr, authenticated): | |
377 """Starts up the xmpp connection. | |
378 | |
379 Args: | |
380 sock: The socket to the client. | |
381 socket_map: A map from sockets to their owning objects. | |
382 delegate: The delegate, which is notified when the XMPP | |
383 handshake is successful, when the connection is closed, and | |
384 when a notification has to be broadcast. | |
385 addr: The host/port of the client. | |
386 """ | |
387 # We do this because in versions of python < 2.6, | |
388 # async_chat.__init__ doesn't take a map argument nor pass it to | |
389 # dispatcher.__init__. We rely on the fact that | |
390 # async_chat.__init__ calls dispatcher.__init__ as the last thing | |
391 # it does, and that calling dispatcher.__init__ with socket=None | |
392 # and map=None is essentially a no-op. | |
393 asynchat.async_chat.__init__(self) | |
394 asyncore.dispatcher.__init__(self, sock, socket_map) | |
395 | |
396 self.set_terminator(None) | |
397 | |
398 self._delegate = delegate | |
399 self._parser = StanzaParser(self) | |
400 self._jid = None | |
401 | |
402 self._addr = addr | |
403 addr_str = AddrString(self._addr) | |
404 self._handshake_task = HandshakeTask(self, addr_str, authenticated) | |
405 print 'Starting connection to %s' % self | |
406 | |
407 def __str__(self): | |
408 if self._jid: | |
409 return str(self._jid) | |
410 else: | |
411 return AddrString(self._addr) | |
412 | |
413 # async_chat implementation. | |
414 | |
415 def collect_incoming_data(self, data): | |
416 self._parser.FeedString(data) | |
417 | |
418 # This is only here to make pychecker happy. | |
419 def found_terminator(self): | |
420 asynchat.async_chat.found_terminator(self) | |
421 | |
422 def close(self): | |
423 print "Closing connection to %s" % self | |
424 self._delegate.OnXmppConnectionClosed(self) | |
425 asynchat.async_chat.close(self) | |
426 | |
427 # Called by self._parser.FeedString(). | |
428 def FeedStanza(self, stanza): | |
429 if self._handshake_task: | |
430 self._handshake_task.FeedStanza(stanza) | |
431 elif stanza.tagName == 'iq' and stanza.getAttribute('type') == 'result': | |
432 # Ignore all client acks. | |
433 pass | |
434 elif (stanza.firstChild and | |
435 stanza.firstChild.namespaceURI == 'google:push'): | |
436 self._HandlePushCommand(stanza) | |
437 else: | |
438 raise UnexpectedXml(stanza) | |
439 | |
440 # Called by self._handshake_task. | |
441 def HandshakeDone(self, jid): | |
442 if jid: | |
443 self._jid = jid | |
444 self._handshake_task = None | |
445 self._delegate.OnXmppHandshakeDone(self) | |
446 print "Handshake done for %s" % self | |
447 else: | |
448 print "Handshake failed for %s" % self | |
449 self.close() | |
450 | |
451 def _HandlePushCommand(self, stanza): | |
452 if stanza.tagName == 'iq' and stanza.firstChild.tagName == 'subscribe': | |
453 # Subscription request. | |
454 self._SendIqResponseStanza(stanza) | |
455 elif stanza.tagName == 'message' and stanza.firstChild.tagName == 'push': | |
456 # Send notification request. | |
457 self._delegate.ForwardNotification(self, stanza) | |
458 else: | |
459 raise UnexpectedXml(command_xml) | |
460 | |
461 def _SendIqResponseStanza(self, iq): | |
462 stanza = CloneXml(self._IQ_RESPONSE_STANZA) | |
463 stanza.setAttribute('from', str(self._jid.GetBareJid())) | |
464 stanza.setAttribute('id', iq.getAttribute('id')) | |
465 self.SendStanza(stanza) | |
466 | |
467 def SendStanza(self, stanza, unlink=True): | |
468 """Sends a stanza to the client. | |
469 | |
470 Args: | |
471 stanza: The stanza to send. | |
472 unlink: Whether to unlink stanza after sending it. (Pass in | |
473 False if stanza is a constant.) | |
474 """ | |
475 self.SendData(stanza.toxml()) | |
476 if unlink: | |
477 stanza.unlink() | |
478 | |
479 def SendData(self, data): | |
480 """Sends raw data to the client. | |
481 """ | |
482 # We explicitly encode to ascii as that is what the client expects | |
483 # (some minidom library functions return unicode strings). | |
484 self.push(data.encode('ascii')) | |
485 | |
486 def ForwardNotification(self, notification_stanza): | |
487 """Forwards a notification to the client.""" | |
488 notification_stanza.setAttribute('from', str(self._jid.GetBareJid())) | |
489 notification_stanza.setAttribute('to', str(self._jid)) | |
490 self.SendStanza(notification_stanza, False) | |
491 | |
492 | |
493 class XmppServer(asyncore.dispatcher): | |
494 """The main XMPP server class. | |
495 | |
496 The XMPP server starts accepting connections on the given address | |
497 and spawns off XmppConnection objects for each one. | |
498 | |
499 Use like so: | |
500 | |
501 socket_map = {} | |
502 xmpp_server = xmppserver.XmppServer(socket_map, ('127.0.0.1', 5222)) | |
503 asyncore.loop(30.0, False, socket_map) | |
504 """ | |
505 | |
506 # Used when sending a notification. | |
507 _NOTIFICATION_STANZA = ParseXml( | |
508 '<message>' | |
509 ' <push xmlns="google:push">' | |
510 ' <data/>' | |
511 ' </push>' | |
512 '</message>') | |
513 | |
514 def __init__(self, socket_map, addr): | |
515 asyncore.dispatcher.__init__(self, None, socket_map) | |
516 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | |
517 self.set_reuse_addr() | |
518 self.bind(addr) | |
519 self.listen(5) | |
520 self._socket_map = socket_map | |
521 self._connections = set() | |
522 self._handshake_done_connections = set() | |
523 self._notifications_enabled = True | |
524 self._authenticated = True | |
525 | |
526 def handle_accept(self): | |
527 (sock, addr) = self.accept() | |
528 xmpp_connection = XmppConnection( | |
529 sock, self._socket_map, self, addr, self._authenticated) | |
530 self._connections.add(xmpp_connection) | |
531 # Return the new XmppConnection for testing. | |
532 return xmpp_connection | |
533 | |
534 def close(self): | |
535 # A copy is necessary since calling close on each connection | |
536 # removes it from self._connections. | |
537 for connection in self._connections.copy(): | |
538 connection.close() | |
539 asyncore.dispatcher.close(self) | |
540 | |
541 def EnableNotifications(self): | |
542 self._notifications_enabled = True | |
543 | |
544 def DisableNotifications(self): | |
545 self._notifications_enabled = False | |
546 | |
547 def MakeNotification(self, channel, data): | |
548 """Makes a notification from the given channel and encoded data. | |
549 | |
550 Args: | |
551 channel: The channel on which to send the notification. | |
552 data: The notification payload. | |
553 """ | |
554 notification_stanza = CloneXml(self._NOTIFICATION_STANZA) | |
555 push_element = notification_stanza.getElementsByTagName('push')[0] | |
556 push_element.setAttribute('channel', channel) | |
557 data_element = push_element.getElementsByTagName('data')[0] | |
558 encoded_data = base64.b64encode(data) | |
559 data_text = notification_stanza.parentNode.createTextNode(encoded_data) | |
560 data_element.appendChild(data_text) | |
561 return notification_stanza | |
562 | |
563 def SendNotification(self, channel, data): | |
564 """Sends a notification to all connections. | |
565 | |
566 Args: | |
567 channel: The channel on which to send the notification. | |
568 data: The notification payload. | |
569 """ | |
570 notification_stanza = self.MakeNotification(channel, data) | |
571 self.ForwardNotification(None, notification_stanza) | |
572 notification_stanza.unlink() | |
573 | |
574 def SetAuthenticated(self, auth_valid): | |
575 self._authenticated = auth_valid | |
576 | |
577 def GetAuthenticated(self): | |
578 return self._authenticated | |
579 | |
580 # XmppConnection delegate methods. | |
581 def OnXmppHandshakeDone(self, xmpp_connection): | |
582 self._handshake_done_connections.add(xmpp_connection) | |
583 | |
584 def OnXmppConnectionClosed(self, xmpp_connection): | |
585 self._connections.discard(xmpp_connection) | |
586 self._handshake_done_connections.discard(xmpp_connection) | |
587 | |
588 def ForwardNotification(self, unused_xmpp_connection, notification_stanza): | |
589 if self._notifications_enabled: | |
590 for connection in self._handshake_done_connections: | |
591 print 'Sending notification to %s' % connection | |
592 connection.ForwardNotification(notification_stanza) | |
593 else: | |
594 print 'Notifications disabled; dropping notification' | |
OLD | NEW |