OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_udp -*- | |
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Tests for implementations of L{IReactorUDP} and L{IReactorMulticast}. | |
7 """ | |
8 | |
9 from twisted.trial import unittest, util | |
10 | |
11 from twisted.internet.defer import Deferred, gatherResults, maybeDeferred | |
12 from twisted.internet import protocol, reactor, error, defer, interfaces | |
13 from twisted.python import runtime | |
14 | |
15 | |
16 class Mixin: | |
17 | |
18 started = 0 | |
19 stopped = 0 | |
20 | |
21 startedDeferred = None | |
22 | |
23 def __init__(self): | |
24 self.packets = [] | |
25 | |
26 def startProtocol(self): | |
27 self.started = 1 | |
28 if self.startedDeferred is not None: | |
29 d, self.startedDeferred = self.startedDeferred, None | |
30 d.callback(None) | |
31 | |
32 def stopProtocol(self): | |
33 self.stopped = 1 | |
34 | |
35 | |
36 class Server(Mixin, protocol.DatagramProtocol): | |
37 packetReceived = None | |
38 refused = 0 | |
39 | |
40 | |
41 def datagramReceived(self, data, addr): | |
42 self.packets.append((data, addr)) | |
43 if self.packetReceived is not None: | |
44 d, self.packetReceived = self.packetReceived, None | |
45 d.callback(None) | |
46 | |
47 | |
48 | |
49 class Client(Mixin, protocol.ConnectedDatagramProtocol): | |
50 | |
51 packetReceived = None | |
52 refused = 0 | |
53 | |
54 def datagramReceived(self, data): | |
55 self.packets.append(data) | |
56 if self.packetReceived is not None: | |
57 d, self.packetReceived = self.packetReceived, None | |
58 d.callback(None) | |
59 | |
60 def connectionFailed(self, failure): | |
61 if self.startedDeferred is not None: | |
62 d, self.startedDeferred = self.startedDeferred, None | |
63 d.errback(failure) | |
64 self.failure = failure | |
65 | |
66 def connectionRefused(self): | |
67 if self.startedDeferred is not None: | |
68 d, self.startedDeferred = self.startedDeferred, None | |
69 d.errback(error.ConnectionRefusedError("yup")) | |
70 self.refused = 1 | |
71 | |
72 | |
73 class GoodClient(Server): | |
74 | |
75 def connectionRefused(self): | |
76 if self.startedDeferred is not None: | |
77 d, self.startedDeferred = self.startedDeferred, None | |
78 d.errback(error.ConnectionRefusedError("yup")) | |
79 self.refused = 1 | |
80 | |
81 | |
82 | |
83 class BadClientError(Exception): | |
84 """ | |
85 Raised by BadClient at the end of every datagramReceived call to try and | |
86 screw stuff up. | |
87 """ | |
88 | |
89 | |
90 | |
91 class BadClient(protocol.DatagramProtocol): | |
92 """ | |
93 A DatagramProtocol which always raises an exception from datagramReceived. | |
94 Used to test error handling behavior in the reactor for that method. | |
95 """ | |
96 d = None | |
97 | |
98 def setDeferred(self, d): | |
99 """ | |
100 Set the Deferred which will be called back when datagramReceived is | |
101 called. | |
102 """ | |
103 self.d = d | |
104 | |
105 | |
106 def datagramReceived(self, bytes, addr): | |
107 if self.d is not None: | |
108 d, self.d = self.d, None | |
109 d.callback(bytes) | |
110 raise BadClientError("Application code is very buggy!") | |
111 | |
112 | |
113 | |
114 class OldConnectedUDPTestCase(unittest.TestCase): | |
115 def testStartStop(self): | |
116 client = Client() | |
117 d = client.startedDeferred = defer.Deferred() | |
118 port2 = reactor.connectUDP("127.0.0.1", 8888, client) | |
119 | |
120 def assertName(): | |
121 self.failUnless(repr(port2).find('test_udp.Client') >= 0) | |
122 | |
123 def cbStarted(ignored): | |
124 self.assertEquals(client.started, 1) | |
125 self.assertEquals(client.stopped, 0) | |
126 assertName() | |
127 d = defer.maybeDeferred(port2.stopListening) | |
128 d.addCallback(lambda ign: assertName()) | |
129 return d | |
130 | |
131 return d.addCallback(cbStarted) | |
132 testStartStop.suppress = [ | |
133 util.suppress(message='use listenUDP and then transport.connect', | |
134 category=DeprecationWarning)] | |
135 | |
136 | |
137 def testDNSFailure(self): | |
138 client = Client() | |
139 d = client.startedDeferred = defer.Deferred() | |
140 # if this domain exists, shoot your sysadmin | |
141 reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client) | |
142 | |
143 def didNotConnect(ign): | |
144 self.assertEquals(client.stopped, 0) | |
145 self.assertEquals(client.started, 0) | |
146 | |
147 d = self.assertFailure(d, error.DNSLookupError) | |
148 d.addCallback(didNotConnect) | |
149 return d | |
150 testDNSFailure.suppress = [ | |
151 util.suppress(message='use listenUDP and then transport.connect', | |
152 category=DeprecationWarning)] | |
153 | |
154 | |
155 def testSendPackets(self): | |
156 server = Server() | |
157 serverStarted = server.startedDeferred = defer.Deferred() | |
158 | |
159 client = Client() | |
160 clientStarted = client.startedDeferred = defer.Deferred() | |
161 | |
162 port1 = reactor.listenUDP(0, server, interface="127.0.0.1") | |
163 | |
164 def cbServerStarted(ignored): | |
165 self.port2 = reactor.connectUDP("127.0.0.1", | |
166 server.transport.getHost().port, | |
167 client) | |
168 return clientStarted | |
169 | |
170 d = serverStarted.addCallback(cbServerStarted) | |
171 | |
172 def cbClientStarted(ignored): | |
173 clientSend = server.packetReceived = defer.Deferred() | |
174 serverSend = client.packetReceived = defer.Deferred() | |
175 | |
176 cAddr = client.transport.getHost() | |
177 server.transport.write("hello", (cAddr.host, cAddr.port)) | |
178 client.transport.write("world") | |
179 | |
180 # No one will ever call errback on either of these Deferreds, | |
181 # otherwise I would pass fireOnOneErrback=True here. | |
182 return defer.DeferredList([clientSend, serverSend]) | |
183 | |
184 d.addCallback(cbClientStarted) | |
185 | |
186 def cbPackets(ignored): | |
187 self.assertEquals(client.packets, ["hello"]) | |
188 self.assertEquals(server.packets, | |
189 [("world", ("127.0.0.1", | |
190 client.transport.getHost().port))]) | |
191 | |
192 return defer.DeferredList([ | |
193 defer.maybeDeferred(port1.stopListening), | |
194 defer.maybeDeferred(self.port2.stopListening)], | |
195 fireOnOneErrback=True) | |
196 | |
197 d.addCallback(cbPackets) | |
198 return d | |
199 testSendPackets.suppress = [ | |
200 util.suppress(message='use listenUDP and then transport.connect', | |
201 category=DeprecationWarning)] | |
202 | |
203 | |
204 def test_connectionRefused(self): | |
205 """ | |
206 Test that using the connected UDP API will deliver connection refused | |
207 notification when packets are sent to an address at which no one is | |
208 listening. | |
209 """ | |
210 # XXX - assume no one listening on port 80 UDP | |
211 client = Client() | |
212 clientStarted = client.startedDeferred = Deferred() | |
213 server = Server() | |
214 serverStarted = server.startedDeferred = Deferred() | |
215 started = gatherResults([clientStarted, serverStarted]) | |
216 | |
217 clientPort = reactor.connectUDP("127.0.0.1", 80, client) | |
218 serverPort = reactor.listenUDP(0, server, interface="127.0.0.1") | |
219 | |
220 def cbStarted(ignored): | |
221 clientRefused = client.startedDeferred = Deferred() | |
222 | |
223 client.transport.write("a") | |
224 client.transport.write("b") | |
225 server.transport.write("c", ("127.0.0.1", 80)) | |
226 server.transport.write("d", ("127.0.0.1", 80)) | |
227 server.transport.write("e", ("127.0.0.1", 80)) | |
228 | |
229 c = clientPort.getHost() | |
230 s = serverPort.getHost() | |
231 server.transport.write("toserver", (s.host, s.port)) | |
232 server.transport.write("toclient", (c.host, c.port)) | |
233 | |
234 return self.assertFailure(clientRefused, error.ConnectionRefusedErro
r) | |
235 started.addCallback(cbStarted) | |
236 | |
237 def cleanup(passthrough): | |
238 result = gatherResults([ | |
239 maybeDeferred(clientPort.stopListening), | |
240 maybeDeferred(serverPort.stopListening)]) | |
241 result.addCallback(lambda ign: passthrough) | |
242 return result | |
243 | |
244 started.addBoth(cleanup) | |
245 return started | |
246 test_connectionRefused.suppress = [ | |
247 util.suppress(message='use listenUDP and then transport.connect', | |
248 category=DeprecationWarning)] | |
249 | |
250 | |
251 | |
252 class UDPTestCase(unittest.TestCase): | |
253 | |
254 def testOldAddress(self): | |
255 server = Server() | |
256 d = server.startedDeferred = defer.Deferred() | |
257 p = reactor.listenUDP(0, server, interface="127.0.0.1") | |
258 def cbStarted(ignored): | |
259 addr = p.getHost() | |
260 self.assertEquals(addr, ('INET_UDP', addr.host, addr.port)) | |
261 return p.stopListening() | |
262 return d.addCallback(cbStarted) | |
263 testOldAddress.suppress = [ | |
264 util.suppress(message='IPv4Address.__getitem__', | |
265 category=DeprecationWarning)] | |
266 | |
267 | |
268 def testStartStop(self): | |
269 server = Server() | |
270 d = server.startedDeferred = defer.Deferred() | |
271 port1 = reactor.listenUDP(0, server, interface="127.0.0.1") | |
272 def cbStarted(ignored): | |
273 self.assertEquals(server.started, 1) | |
274 self.assertEquals(server.stopped, 0) | |
275 return port1.stopListening() | |
276 def cbStopped(ignored): | |
277 self.assertEquals(server.stopped, 1) | |
278 return d.addCallback(cbStarted).addCallback(cbStopped) | |
279 | |
280 def testRebind(self): | |
281 # Ensure binding the same DatagramProtocol repeatedly invokes all | |
282 # the right callbacks. | |
283 server = Server() | |
284 d = server.startedDeferred = defer.Deferred() | |
285 p = reactor.listenUDP(0, server, interface="127.0.0.1") | |
286 | |
287 def cbStarted(ignored, port): | |
288 return port.stopListening() | |
289 | |
290 def cbStopped(ignored): | |
291 d = server.startedDeferred = defer.Deferred() | |
292 p = reactor.listenUDP(0, server, interface="127.0.0.1") | |
293 return d.addCallback(cbStarted, p) | |
294 | |
295 return d.addCallback(cbStarted, p) | |
296 | |
297 | |
298 def testBindError(self): | |
299 server = Server() | |
300 d = server.startedDeferred = defer.Deferred() | |
301 port = reactor.listenUDP(0, server, interface='127.0.0.1') | |
302 | |
303 def cbStarted(ignored): | |
304 self.assertEquals(port.getHost(), server.transport.getHost()) | |
305 | |
306 server2 = Server() | |
307 self.assertRaises( | |
308 error.CannotListenError, | |
309 reactor.listenUDP, port.getHost().port, server2, | |
310 interface='127.0.0.1') | |
311 d.addCallback(cbStarted) | |
312 | |
313 def cbFinished(ignored): | |
314 return port.stopListening() | |
315 d.addCallback(cbFinished) | |
316 return d | |
317 | |
318 def testSendPackets(self): | |
319 server = Server() | |
320 serverStarted = server.startedDeferred = defer.Deferred() | |
321 port1 = reactor.listenUDP(0, server, interface="127.0.0.1") | |
322 | |
323 client = GoodClient() | |
324 clientStarted = client.startedDeferred = defer.Deferred() | |
325 | |
326 def cbServerStarted(ignored): | |
327 self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1") | |
328 return clientStarted | |
329 | |
330 d = serverStarted.addCallback(cbServerStarted) | |
331 | |
332 def cbClientStarted(ignored): | |
333 client.transport.connect("127.0.0.1", | |
334 server.transport.getHost().port) | |
335 cAddr = client.transport.getHost() | |
336 sAddr = server.transport.getHost() | |
337 | |
338 serverSend = client.packetReceived = defer.Deferred() | |
339 server.transport.write("hello", (cAddr.host, cAddr.port)) | |
340 | |
341 clientWrites = [ | |
342 ("a",), | |
343 ("b", None), | |
344 ("c", (sAddr.host, sAddr.port))] | |
345 | |
346 def cbClientSend(ignored): | |
347 if clientWrites: | |
348 nextClientWrite = server.packetReceived = defer.Deferred() | |
349 nextClientWrite.addCallback(cbClientSend) | |
350 client.transport.write(*clientWrites.pop(0)) | |
351 return nextClientWrite | |
352 | |
353 # No one will ever call .errback on either of these Deferreds, | |
354 # but there is a non-trivial amount of test code which might | |
355 # cause them to fail somehow. So fireOnOneErrback=True. | |
356 return defer.DeferredList([ | |
357 cbClientSend(None), | |
358 serverSend], | |
359 fireOnOneErrback=True) | |
360 | |
361 d.addCallback(cbClientStarted) | |
362 | |
363 def cbSendsFinished(ignored): | |
364 cAddr = client.transport.getHost() | |
365 sAddr = server.transport.getHost() | |
366 self.assertEquals( | |
367 client.packets, | |
368 [("hello", (sAddr.host, sAddr.port))]) | |
369 clientAddr = (cAddr.host, cAddr.port) | |
370 self.assertEquals( | |
371 server.packets, | |
372 [("a", clientAddr), | |
373 ("b", clientAddr), | |
374 ("c", clientAddr)]) | |
375 | |
376 d.addCallback(cbSendsFinished) | |
377 | |
378 def cbFinished(ignored): | |
379 return defer.DeferredList([ | |
380 defer.maybeDeferred(port1.stopListening), | |
381 defer.maybeDeferred(self.port2.stopListening)], | |
382 fireOnOneErrback=True) | |
383 | |
384 d.addCallback(cbFinished) | |
385 return d | |
386 | |
387 | |
388 def testConnectionRefused(self): | |
389 # assume no one listening on port 80 UDP | |
390 client = GoodClient() | |
391 clientStarted = client.startedDeferred = defer.Deferred() | |
392 port = reactor.listenUDP(0, client, interface="127.0.0.1") | |
393 | |
394 server = Server() | |
395 serverStarted = server.startedDeferred = defer.Deferred() | |
396 port2 = reactor.listenUDP(0, server, interface="127.0.0.1") | |
397 | |
398 d = defer.DeferredList( | |
399 [clientStarted, serverStarted], | |
400 fireOnOneErrback=True) | |
401 | |
402 def cbStarted(ignored): | |
403 connectionRefused = client.startedDeferred = defer.Deferred() | |
404 client.transport.connect("127.0.0.1", 80) | |
405 | |
406 for i in range(10): | |
407 client.transport.write(str(i)) | |
408 server.transport.write(str(i), ("127.0.0.1", 80)) | |
409 | |
410 return self.assertFailure( | |
411 connectionRefused, | |
412 error.ConnectionRefusedError) | |
413 | |
414 d.addCallback(cbStarted) | |
415 | |
416 def cbFinished(ignored): | |
417 return defer.DeferredList([ | |
418 defer.maybeDeferred(port.stopListening), | |
419 defer.maybeDeferred(port2.stopListening)], | |
420 fireOnOneErrback=True) | |
421 | |
422 d.addCallback(cbFinished) | |
423 return d | |
424 | |
425 def testBadConnect(self): | |
426 client = GoodClient() | |
427 port = reactor.listenUDP(0, client, interface="127.0.0.1") | |
428 self.assertRaises(ValueError, client.transport.connect, | |
429 "localhost", 80) | |
430 client.transport.connect("127.0.0.1", 80) | |
431 self.assertRaises(RuntimeError, client.transport.connect, | |
432 "127.0.0.1", 80) | |
433 return port.stopListening() | |
434 | |
435 | |
436 | |
437 def testDatagramReceivedError(self): | |
438 """ | |
439 Test that when datagramReceived raises an exception it is logged but | |
440 the port is not disconnected. | |
441 """ | |
442 finalDeferred = defer.Deferred() | |
443 | |
444 def cbCompleted(ign): | |
445 """ | |
446 Flush the exceptions which the reactor should have logged and make | |
447 sure they're actually there. | |
448 """ | |
449 errs = self.flushLoggedErrors(BadClientError) | |
450 self.assertEquals(len(errs), 2, "Incorrectly found %d errors, expect
ed 2" % (len(errs),)) | |
451 finalDeferred.addCallback(cbCompleted) | |
452 | |
453 client = BadClient() | |
454 port = reactor.listenUDP(0, client, interface='127.0.0.1') | |
455 | |
456 def cbCleanup(result): | |
457 """ | |
458 Disconnect the port we started and pass on whatever was given to us | |
459 in case it was a Failure. | |
460 """ | |
461 return defer.maybeDeferred(port.stopListening).addBoth(lambda ign: r
esult) | |
462 finalDeferred.addBoth(cbCleanup) | |
463 | |
464 addr = port.getHost() | |
465 | |
466 # UDP is not reliable. Try to send as many as 60 packets before giving | |
467 # up. Conceivably, all sixty could be lost, but they probably won't be | |
468 # unless all UDP traffic is being dropped, and then the rest of these | |
469 # UDP tests will likely fail as well. Ideally, this test (and probably | |
470 # others) wouldn't even use actual UDP traffic: instead, they would | |
471 # stub out the socket with a fake one which could be made to behave in | |
472 # whatever way the test desires. Unfortunately, this is hard because | |
473 # of differences in various reactor implementations. | |
474 attempts = range(60) | |
475 succeededAttempts = [] | |
476 | |
477 def makeAttempt(): | |
478 """ | |
479 Send one packet to the listening BadClient. Set up a 0.1 second | |
480 timeout to do re-transmits in case the packet is dropped. When two | |
481 packets have been received by the BadClient, stop sending and let | |
482 the finalDeferred's callbacks do some assertions. | |
483 """ | |
484 if not attempts: | |
485 try: | |
486 self.fail("Not enough packets received") | |
487 except: | |
488 finalDeferred.errback() | |
489 | |
490 self.failIfIdentical(client.transport, None, "UDP Protocol lost its
transport") | |
491 | |
492 packet = str(attempts.pop(0)) | |
493 packetDeferred = defer.Deferred() | |
494 client.setDeferred(packetDeferred) | |
495 client.transport.write(packet, (addr.host, addr.port)) | |
496 | |
497 def cbPacketReceived(packet): | |
498 """ | |
499 A packet arrived. Cancel the timeout for it, record it, and | |
500 maybe finish the test. | |
501 """ | |
502 timeoutCall.cancel() | |
503 succeededAttempts.append(packet) | |
504 if len(succeededAttempts) == 2: | |
505 # The second error has not yet been logged, since the | |
506 # exception which causes it hasn't even been raised yet. | |
507 # Give the datagramReceived call a chance to finish, then | |
508 # let the test finish asserting things. | |
509 reactor.callLater(0, finalDeferred.callback, None) | |
510 else: | |
511 makeAttempt() | |
512 | |
513 def ebPacketTimeout(err): | |
514 """ | |
515 The packet wasn't received quickly enough. Try sending another | |
516 one. It doesn't matter if the packet for which this was the | |
517 timeout eventually arrives: makeAttempt throws away the | |
518 Deferred on which this function is the errback, so when | |
519 datagramReceived callbacks, so it won't be on this Deferred, so | |
520 it won't raise an AlreadyCalledError. | |
521 """ | |
522 makeAttempt() | |
523 | |
524 packetDeferred.addCallbacks(cbPacketReceived, ebPacketTimeout) | |
525 packetDeferred.addErrback(finalDeferred.errback) | |
526 | |
527 timeoutCall = reactor.callLater( | |
528 0.1, packetDeferred.errback, | |
529 error.TimeoutError( | |
530 "Timed out in testDatagramReceivedError")) | |
531 | |
532 makeAttempt() | |
533 return finalDeferred | |
534 | |
535 | |
536 def testPortRepr(self): | |
537 client = GoodClient() | |
538 p = reactor.listenUDP(0, client) | |
539 portNo = str(p.getHost().port) | |
540 self.failIf(repr(p).find(portNo) == -1) | |
541 def stoppedListening(ign): | |
542 self.failIf(repr(p).find(portNo) != -1) | |
543 d = defer.maybeDeferred(p.stopListening) | |
544 d.addCallback(stoppedListening) | |
545 return d | |
546 | |
547 | |
548 class ReactorShutdownInteraction(unittest.TestCase): | |
549 """Test reactor shutdown interaction""" | |
550 | |
551 def setUp(self): | |
552 """Start a UDP port""" | |
553 self.server = Server() | |
554 self.port = reactor.listenUDP(0, self.server, interface='127.0.0.1') | |
555 | |
556 def tearDown(self): | |
557 """Stop the UDP port""" | |
558 return self.port.stopListening() | |
559 | |
560 def testShutdownFromDatagramReceived(self): | |
561 """Test reactor shutdown while in a recvfrom() loop""" | |
562 | |
563 # udp.Port's doRead calls recvfrom() in a loop, as an optimization. | |
564 # It is important this loop terminate under various conditions. | |
565 # Previously, if datagramReceived synchronously invoked | |
566 # reactor.stop(), under certain reactors, the Port's socket would | |
567 # synchronously disappear, causing an AttributeError inside that | |
568 # loop. This was mishandled, causing the loop to spin forever. | |
569 # This test is primarily to ensure that the loop never spins | |
570 # forever. | |
571 | |
572 finished = defer.Deferred() | |
573 pr = self.server.packetReceived = defer.Deferred() | |
574 | |
575 def pktRece(ignored): | |
576 # Simulate reactor.stop() behavior :( | |
577 self.server.transport.connectionLost() | |
578 # Then delay this Deferred chain until the protocol has been | |
579 # disconnected, as the reactor should do in an error condition | |
580 # such as we are inducing. This is very much a whitebox test. | |
581 reactor.callLater(0, finished.callback, None) | |
582 pr.addCallback(pktRece) | |
583 | |
584 def flushErrors(ignored): | |
585 # We are breaking abstraction and calling private APIs, any | |
586 # number of horrible errors might occur. As long as the reactor | |
587 # doesn't hang, this test is satisfied. (There may be room for | |
588 # another, stricter test.) | |
589 self.flushLoggedErrors() | |
590 finished.addCallback(flushErrors) | |
591 self.server.transport.write('\0' * 64, ('127.0.0.1', | |
592 self.server.transport.getHost().port)) | |
593 return finished | |
594 | |
595 | |
596 | |
597 class MulticastTestCase(unittest.TestCase): | |
598 | |
599 def setUp(self): | |
600 self.server = Server() | |
601 self.client = Client() | |
602 # multicast won't work if we listen over loopback, apparently | |
603 self.port1 = reactor.listenMulticast(0, self.server) | |
604 self.port2 = reactor.listenMulticast(0, self.client) | |
605 self.client.transport.connect( | |
606 "127.0.0.1", self.server.transport.getHost().port) | |
607 | |
608 | |
609 def tearDown(self): | |
610 return gatherResults([ | |
611 maybeDeferred(self.port1.stopListening), | |
612 maybeDeferred(self.port2.stopListening)]) | |
613 | |
614 | |
615 def testTTL(self): | |
616 for o in self.client, self.server: | |
617 self.assertEquals(o.transport.getTTL(), 1) | |
618 o.transport.setTTL(2) | |
619 self.assertEquals(o.transport.getTTL(), 2) | |
620 | |
621 | |
622 def test_loopback(self): | |
623 """ | |
624 Test that after loopback mode has been set, multicast packets are | |
625 delivered to their sender. | |
626 """ | |
627 self.assertEquals(self.server.transport.getLoopbackMode(), 1) | |
628 addr = self.server.transport.getHost() | |
629 joined = self.server.transport.joinGroup("225.0.0.250") | |
630 | |
631 def cbJoined(ignored): | |
632 d = self.server.packetReceived = Deferred() | |
633 self.server.transport.write("hello", ("225.0.0.250", addr.port)) | |
634 return d | |
635 joined.addCallback(cbJoined) | |
636 | |
637 def cbPacket(ignored): | |
638 self.assertEqual(len(self.server.packets), 1) | |
639 self.server.transport.setLoopbackMode(0) | |
640 self.assertEquals(self.server.transport.getLoopbackMode(), 0) | |
641 self.server.transport.write("hello", ("225.0.0.250", addr.port)) | |
642 | |
643 # This is fairly lame. | |
644 d = Deferred() | |
645 reactor.callLater(0, d.callback, None) | |
646 return d | |
647 joined.addCallback(cbPacket) | |
648 | |
649 def cbNoPacket(ignored): | |
650 self.assertEqual(len(self.server.packets), 1) | |
651 joined.addCallback(cbNoPacket) | |
652 | |
653 return joined | |
654 | |
655 | |
656 def test_interface(self): | |
657 """ | |
658 Test C{getOutgoingInterface} and C{setOutgoingInterface}. | |
659 """ | |
660 self.assertEqual( | |
661 self.client.transport.getOutgoingInterface(), "0.0.0.0") | |
662 self.assertEqual( | |
663 self.server.transport.getOutgoingInterface(), "0.0.0.0") | |
664 | |
665 d1 = self.client.transport.setOutgoingInterface("127.0.0.1") | |
666 d2 = self.server.transport.setOutgoingInterface("127.0.0.1") | |
667 result = gatherResults([d1, d2]) | |
668 | |
669 def cbInterfaces(ignored): | |
670 self.assertEqual( | |
671 self.client.transport.getOutgoingInterface(), "127.0.0.1") | |
672 self.assertEqual( | |
673 self.server.transport.getOutgoingInterface(), "127.0.0.1") | |
674 result.addCallback(cbInterfaces) | |
675 return result | |
676 | |
677 | |
678 def test_joinLeave(self): | |
679 """ | |
680 Test that multicast a group can be joined and left. | |
681 """ | |
682 d = self.client.transport.joinGroup("225.0.0.250") | |
683 | |
684 def clientJoined(ignored): | |
685 return self.client.transport.leaveGroup("225.0.0.250") | |
686 d.addCallback(clientJoined) | |
687 | |
688 def clientLeft(ignored): | |
689 return self.server.transport.joinGroup("225.0.0.250") | |
690 d.addCallback(clientLeft) | |
691 | |
692 def serverJoined(ignored): | |
693 return self.server.transport.leaveGroup("225.0.0.250") | |
694 d.addCallback(serverJoined) | |
695 | |
696 return d | |
697 | |
698 | |
699 def test_joinFailure(self): | |
700 """ | |
701 Test that an attempt to join an address which is not a multicast | |
702 address fails with L{error.MulticastJoinError}. | |
703 """ | |
704 # 127.0.0.1 is not a multicast address, so joining it should fail. | |
705 return self.assertFailure( | |
706 self.client.transport.joinGroup("127.0.0.1"), | |
707 error.MulticastJoinError) | |
708 if runtime.platform.isWindows(): | |
709 test_joinFailure.todo = "Windows' multicast is wonky" | |
710 | |
711 | |
712 def test_multicast(self): | |
713 """ | |
714 Test that a multicast group can be joined and messages sent to and | |
715 received from it. | |
716 """ | |
717 c = Server() | |
718 p = reactor.listenMulticast(0, c) | |
719 addr = self.server.transport.getHost() | |
720 | |
721 joined = self.server.transport.joinGroup("225.0.0.250") | |
722 | |
723 def cbJoined(ignored): | |
724 d = self.server.packetReceived = Deferred() | |
725 c.transport.write("hello world", ("225.0.0.250", addr.port)) | |
726 return d | |
727 joined.addCallback(cbJoined) | |
728 | |
729 def cbPacket(ignored): | |
730 self.assertEquals(self.server.packets[0][0], "hello world") | |
731 joined.addCallback(cbPacket) | |
732 | |
733 def cleanup(passthrough): | |
734 result = maybeDeferred(p.stopListening) | |
735 result.addCallback(lambda ign: passthrough) | |
736 return result | |
737 joined.addCallback(cleanup) | |
738 | |
739 return joined | |
740 | |
741 | |
742 def test_multiListen(self): | |
743 """ | |
744 Test that multiple sockets can listen on the same multicast port and | |
745 that they both receive multicast messages directed to that address. | |
746 """ | |
747 firstClient = Server() | |
748 firstPort = reactor.listenMulticast( | |
749 0, firstClient, listenMultiple=True) | |
750 | |
751 portno = firstPort.getHost().port | |
752 | |
753 secondClient = Server() | |
754 secondPort = reactor.listenMulticast( | |
755 portno, secondClient, listenMultiple=True) | |
756 | |
757 joined = self.server.transport.joinGroup("225.0.0.250") | |
758 | |
759 def serverJoined(ignored): | |
760 d1 = firstClient.packetReceived = Deferred() | |
761 d2 = secondClient.packetReceived = Deferred() | |
762 firstClient.transport.write("hello world", ("225.0.0.250", portno)) | |
763 return gatherResults([d1, d2]) | |
764 joined.addCallback(serverJoined) | |
765 | |
766 def gotPackets(ignored): | |
767 self.assertEquals(firstClient.packets[0][0], "hello world") | |
768 self.assertEquals(secondClient.packets[0][0], "hello world") | |
769 joined.addCallback(gotPackets) | |
770 | |
771 def cleanup(passthrough): | |
772 result = gatherResults([ | |
773 maybeDeferred(firstPort.stopListening), | |
774 maybeDeferred(secondPort.stopListening)]) | |
775 result.addCallback(lambda ign: passthrough) | |
776 return result | |
777 joined.addBoth(cleanup) | |
778 return joined | |
779 if runtime.platform.isWindows(): | |
780 test_multiListen.skip = ("on non-linux platforms it appears multiple " | |
781 "processes can listen, but not multiple sockets
" | |
782 "in same process?") | |
783 | |
784 if not interfaces.IReactorUDP(reactor, None): | |
785 UDPTestCase.skip = "This reactor does not support UDP" | |
786 ReactorShutdownInteraction.skip = "This reactor does not support UDP" | |
787 if not hasattr(reactor, "connectUDP"): | |
788 OldConnectedUDPTestCase.skip = "This reactor does not support connectUDP" | |
789 if not interfaces.IReactorMulticast(reactor, None): | |
790 MulticastTestCase.skip = "This reactor does not support multicast" | |
791 | |
792 def checkForLinux22(): | |
793 import os | |
794 if os.path.exists("/proc/version"): | |
795 s = open("/proc/version").read() | |
796 if s.startswith("Linux version"): | |
797 s = s.split()[2] | |
798 if s.split(".")[:2] == ["2", "2"]: | |
799 f = MulticastTestCase.testInterface.im_func | |
800 f.todo = "figure out why this fails in linux 2.2" | |
801 checkForLinux22() | |
OLD | NEW |