Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(143)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_udp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_udp -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for implementations of L{IReactorUDP} and L{IReactorMulticast}.
7 """
8
9 from twisted.trial import unittest, util
10
11 from twisted.internet.defer import Deferred, gatherResults, maybeDeferred
12 from twisted.internet import protocol, reactor, error, defer, interfaces
13 from twisted.python import runtime
14
15
16 class Mixin:
17
18 started = 0
19 stopped = 0
20
21 startedDeferred = None
22
23 def __init__(self):
24 self.packets = []
25
26 def startProtocol(self):
27 self.started = 1
28 if self.startedDeferred is not None:
29 d, self.startedDeferred = self.startedDeferred, None
30 d.callback(None)
31
32 def stopProtocol(self):
33 self.stopped = 1
34
35
36 class Server(Mixin, protocol.DatagramProtocol):
37 packetReceived = None
38 refused = 0
39
40
41 def datagramReceived(self, data, addr):
42 self.packets.append((data, addr))
43 if self.packetReceived is not None:
44 d, self.packetReceived = self.packetReceived, None
45 d.callback(None)
46
47
48
49 class Client(Mixin, protocol.ConnectedDatagramProtocol):
50
51 packetReceived = None
52 refused = 0
53
54 def datagramReceived(self, data):
55 self.packets.append(data)
56 if self.packetReceived is not None:
57 d, self.packetReceived = self.packetReceived, None
58 d.callback(None)
59
60 def connectionFailed(self, failure):
61 if self.startedDeferred is not None:
62 d, self.startedDeferred = self.startedDeferred, None
63 d.errback(failure)
64 self.failure = failure
65
66 def connectionRefused(self):
67 if self.startedDeferred is not None:
68 d, self.startedDeferred = self.startedDeferred, None
69 d.errback(error.ConnectionRefusedError("yup"))
70 self.refused = 1
71
72
73 class GoodClient(Server):
74
75 def connectionRefused(self):
76 if self.startedDeferred is not None:
77 d, self.startedDeferred = self.startedDeferred, None
78 d.errback(error.ConnectionRefusedError("yup"))
79 self.refused = 1
80
81
82
83 class BadClientError(Exception):
84 """
85 Raised by BadClient at the end of every datagramReceived call to try and
86 screw stuff up.
87 """
88
89
90
91 class BadClient(protocol.DatagramProtocol):
92 """
93 A DatagramProtocol which always raises an exception from datagramReceived.
94 Used to test error handling behavior in the reactor for that method.
95 """
96 d = None
97
98 def setDeferred(self, d):
99 """
100 Set the Deferred which will be called back when datagramReceived is
101 called.
102 """
103 self.d = d
104
105
106 def datagramReceived(self, bytes, addr):
107 if self.d is not None:
108 d, self.d = self.d, None
109 d.callback(bytes)
110 raise BadClientError("Application code is very buggy!")
111
112
113
114 class OldConnectedUDPTestCase(unittest.TestCase):
115 def testStartStop(self):
116 client = Client()
117 d = client.startedDeferred = defer.Deferred()
118 port2 = reactor.connectUDP("127.0.0.1", 8888, client)
119
120 def assertName():
121 self.failUnless(repr(port2).find('test_udp.Client') >= 0)
122
123 def cbStarted(ignored):
124 self.assertEquals(client.started, 1)
125 self.assertEquals(client.stopped, 0)
126 assertName()
127 d = defer.maybeDeferred(port2.stopListening)
128 d.addCallback(lambda ign: assertName())
129 return d
130
131 return d.addCallback(cbStarted)
132 testStartStop.suppress = [
133 util.suppress(message='use listenUDP and then transport.connect',
134 category=DeprecationWarning)]
135
136
137 def testDNSFailure(self):
138 client = Client()
139 d = client.startedDeferred = defer.Deferred()
140 # if this domain exists, shoot your sysadmin
141 reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client)
142
143 def didNotConnect(ign):
144 self.assertEquals(client.stopped, 0)
145 self.assertEquals(client.started, 0)
146
147 d = self.assertFailure(d, error.DNSLookupError)
148 d.addCallback(didNotConnect)
149 return d
150 testDNSFailure.suppress = [
151 util.suppress(message='use listenUDP and then transport.connect',
152 category=DeprecationWarning)]
153
154
155 def testSendPackets(self):
156 server = Server()
157 serverStarted = server.startedDeferred = defer.Deferred()
158
159 client = Client()
160 clientStarted = client.startedDeferred = defer.Deferred()
161
162 port1 = reactor.listenUDP(0, server, interface="127.0.0.1")
163
164 def cbServerStarted(ignored):
165 self.port2 = reactor.connectUDP("127.0.0.1",
166 server.transport.getHost().port,
167 client)
168 return clientStarted
169
170 d = serverStarted.addCallback(cbServerStarted)
171
172 def cbClientStarted(ignored):
173 clientSend = server.packetReceived = defer.Deferred()
174 serverSend = client.packetReceived = defer.Deferred()
175
176 cAddr = client.transport.getHost()
177 server.transport.write("hello", (cAddr.host, cAddr.port))
178 client.transport.write("world")
179
180 # No one will ever call errback on either of these Deferreds,
181 # otherwise I would pass fireOnOneErrback=True here.
182 return defer.DeferredList([clientSend, serverSend])
183
184 d.addCallback(cbClientStarted)
185
186 def cbPackets(ignored):
187 self.assertEquals(client.packets, ["hello"])
188 self.assertEquals(server.packets,
189 [("world", ("127.0.0.1",
190 client.transport.getHost().port))])
191
192 return defer.DeferredList([
193 defer.maybeDeferred(port1.stopListening),
194 defer.maybeDeferred(self.port2.stopListening)],
195 fireOnOneErrback=True)
196
197 d.addCallback(cbPackets)
198 return d
199 testSendPackets.suppress = [
200 util.suppress(message='use listenUDP and then transport.connect',
201 category=DeprecationWarning)]
202
203
204 def test_connectionRefused(self):
205 """
206 Test that using the connected UDP API will deliver connection refused
207 notification when packets are sent to an address at which no one is
208 listening.
209 """
210 # XXX - assume no one listening on port 80 UDP
211 client = Client()
212 clientStarted = client.startedDeferred = Deferred()
213 server = Server()
214 serverStarted = server.startedDeferred = Deferred()
215 started = gatherResults([clientStarted, serverStarted])
216
217 clientPort = reactor.connectUDP("127.0.0.1", 80, client)
218 serverPort = reactor.listenUDP(0, server, interface="127.0.0.1")
219
220 def cbStarted(ignored):
221 clientRefused = client.startedDeferred = Deferred()
222
223 client.transport.write("a")
224 client.transport.write("b")
225 server.transport.write("c", ("127.0.0.1", 80))
226 server.transport.write("d", ("127.0.0.1", 80))
227 server.transport.write("e", ("127.0.0.1", 80))
228
229 c = clientPort.getHost()
230 s = serverPort.getHost()
231 server.transport.write("toserver", (s.host, s.port))
232 server.transport.write("toclient", (c.host, c.port))
233
234 return self.assertFailure(clientRefused, error.ConnectionRefusedErro r)
235 started.addCallback(cbStarted)
236
237 def cleanup(passthrough):
238 result = gatherResults([
239 maybeDeferred(clientPort.stopListening),
240 maybeDeferred(serverPort.stopListening)])
241 result.addCallback(lambda ign: passthrough)
242 return result
243
244 started.addBoth(cleanup)
245 return started
246 test_connectionRefused.suppress = [
247 util.suppress(message='use listenUDP and then transport.connect',
248 category=DeprecationWarning)]
249
250
251
252 class UDPTestCase(unittest.TestCase):
253
254 def testOldAddress(self):
255 server = Server()
256 d = server.startedDeferred = defer.Deferred()
257 p = reactor.listenUDP(0, server, interface="127.0.0.1")
258 def cbStarted(ignored):
259 addr = p.getHost()
260 self.assertEquals(addr, ('INET_UDP', addr.host, addr.port))
261 return p.stopListening()
262 return d.addCallback(cbStarted)
263 testOldAddress.suppress = [
264 util.suppress(message='IPv4Address.__getitem__',
265 category=DeprecationWarning)]
266
267
268 def testStartStop(self):
269 server = Server()
270 d = server.startedDeferred = defer.Deferred()
271 port1 = reactor.listenUDP(0, server, interface="127.0.0.1")
272 def cbStarted(ignored):
273 self.assertEquals(server.started, 1)
274 self.assertEquals(server.stopped, 0)
275 return port1.stopListening()
276 def cbStopped(ignored):
277 self.assertEquals(server.stopped, 1)
278 return d.addCallback(cbStarted).addCallback(cbStopped)
279
280 def testRebind(self):
281 # Ensure binding the same DatagramProtocol repeatedly invokes all
282 # the right callbacks.
283 server = Server()
284 d = server.startedDeferred = defer.Deferred()
285 p = reactor.listenUDP(0, server, interface="127.0.0.1")
286
287 def cbStarted(ignored, port):
288 return port.stopListening()
289
290 def cbStopped(ignored):
291 d = server.startedDeferred = defer.Deferred()
292 p = reactor.listenUDP(0, server, interface="127.0.0.1")
293 return d.addCallback(cbStarted, p)
294
295 return d.addCallback(cbStarted, p)
296
297
298 def testBindError(self):
299 server = Server()
300 d = server.startedDeferred = defer.Deferred()
301 port = reactor.listenUDP(0, server, interface='127.0.0.1')
302
303 def cbStarted(ignored):
304 self.assertEquals(port.getHost(), server.transport.getHost())
305
306 server2 = Server()
307 self.assertRaises(
308 error.CannotListenError,
309 reactor.listenUDP, port.getHost().port, server2,
310 interface='127.0.0.1')
311 d.addCallback(cbStarted)
312
313 def cbFinished(ignored):
314 return port.stopListening()
315 d.addCallback(cbFinished)
316 return d
317
318 def testSendPackets(self):
319 server = Server()
320 serverStarted = server.startedDeferred = defer.Deferred()
321 port1 = reactor.listenUDP(0, server, interface="127.0.0.1")
322
323 client = GoodClient()
324 clientStarted = client.startedDeferred = defer.Deferred()
325
326 def cbServerStarted(ignored):
327 self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1")
328 return clientStarted
329
330 d = serverStarted.addCallback(cbServerStarted)
331
332 def cbClientStarted(ignored):
333 client.transport.connect("127.0.0.1",
334 server.transport.getHost().port)
335 cAddr = client.transport.getHost()
336 sAddr = server.transport.getHost()
337
338 serverSend = client.packetReceived = defer.Deferred()
339 server.transport.write("hello", (cAddr.host, cAddr.port))
340
341 clientWrites = [
342 ("a",),
343 ("b", None),
344 ("c", (sAddr.host, sAddr.port))]
345
346 def cbClientSend(ignored):
347 if clientWrites:
348 nextClientWrite = server.packetReceived = defer.Deferred()
349 nextClientWrite.addCallback(cbClientSend)
350 client.transport.write(*clientWrites.pop(0))
351 return nextClientWrite
352
353 # No one will ever call .errback on either of these Deferreds,
354 # but there is a non-trivial amount of test code which might
355 # cause them to fail somehow. So fireOnOneErrback=True.
356 return defer.DeferredList([
357 cbClientSend(None),
358 serverSend],
359 fireOnOneErrback=True)
360
361 d.addCallback(cbClientStarted)
362
363 def cbSendsFinished(ignored):
364 cAddr = client.transport.getHost()
365 sAddr = server.transport.getHost()
366 self.assertEquals(
367 client.packets,
368 [("hello", (sAddr.host, sAddr.port))])
369 clientAddr = (cAddr.host, cAddr.port)
370 self.assertEquals(
371 server.packets,
372 [("a", clientAddr),
373 ("b", clientAddr),
374 ("c", clientAddr)])
375
376 d.addCallback(cbSendsFinished)
377
378 def cbFinished(ignored):
379 return defer.DeferredList([
380 defer.maybeDeferred(port1.stopListening),
381 defer.maybeDeferred(self.port2.stopListening)],
382 fireOnOneErrback=True)
383
384 d.addCallback(cbFinished)
385 return d
386
387
388 def testConnectionRefused(self):
389 # assume no one listening on port 80 UDP
390 client = GoodClient()
391 clientStarted = client.startedDeferred = defer.Deferred()
392 port = reactor.listenUDP(0, client, interface="127.0.0.1")
393
394 server = Server()
395 serverStarted = server.startedDeferred = defer.Deferred()
396 port2 = reactor.listenUDP(0, server, interface="127.0.0.1")
397
398 d = defer.DeferredList(
399 [clientStarted, serverStarted],
400 fireOnOneErrback=True)
401
402 def cbStarted(ignored):
403 connectionRefused = client.startedDeferred = defer.Deferred()
404 client.transport.connect("127.0.0.1", 80)
405
406 for i in range(10):
407 client.transport.write(str(i))
408 server.transport.write(str(i), ("127.0.0.1", 80))
409
410 return self.assertFailure(
411 connectionRefused,
412 error.ConnectionRefusedError)
413
414 d.addCallback(cbStarted)
415
416 def cbFinished(ignored):
417 return defer.DeferredList([
418 defer.maybeDeferred(port.stopListening),
419 defer.maybeDeferred(port2.stopListening)],
420 fireOnOneErrback=True)
421
422 d.addCallback(cbFinished)
423 return d
424
425 def testBadConnect(self):
426 client = GoodClient()
427 port = reactor.listenUDP(0, client, interface="127.0.0.1")
428 self.assertRaises(ValueError, client.transport.connect,
429 "localhost", 80)
430 client.transport.connect("127.0.0.1", 80)
431 self.assertRaises(RuntimeError, client.transport.connect,
432 "127.0.0.1", 80)
433 return port.stopListening()
434
435
436
437 def testDatagramReceivedError(self):
438 """
439 Test that when datagramReceived raises an exception it is logged but
440 the port is not disconnected.
441 """
442 finalDeferred = defer.Deferred()
443
444 def cbCompleted(ign):
445 """
446 Flush the exceptions which the reactor should have logged and make
447 sure they're actually there.
448 """
449 errs = self.flushLoggedErrors(BadClientError)
450 self.assertEquals(len(errs), 2, "Incorrectly found %d errors, expect ed 2" % (len(errs),))
451 finalDeferred.addCallback(cbCompleted)
452
453 client = BadClient()
454 port = reactor.listenUDP(0, client, interface='127.0.0.1')
455
456 def cbCleanup(result):
457 """
458 Disconnect the port we started and pass on whatever was given to us
459 in case it was a Failure.
460 """
461 return defer.maybeDeferred(port.stopListening).addBoth(lambda ign: r esult)
462 finalDeferred.addBoth(cbCleanup)
463
464 addr = port.getHost()
465
466 # UDP is not reliable. Try to send as many as 60 packets before giving
467 # up. Conceivably, all sixty could be lost, but they probably won't be
468 # unless all UDP traffic is being dropped, and then the rest of these
469 # UDP tests will likely fail as well. Ideally, this test (and probably
470 # others) wouldn't even use actual UDP traffic: instead, they would
471 # stub out the socket with a fake one which could be made to behave in
472 # whatever way the test desires. Unfortunately, this is hard because
473 # of differences in various reactor implementations.
474 attempts = range(60)
475 succeededAttempts = []
476
477 def makeAttempt():
478 """
479 Send one packet to the listening BadClient. Set up a 0.1 second
480 timeout to do re-transmits in case the packet is dropped. When two
481 packets have been received by the BadClient, stop sending and let
482 the finalDeferred's callbacks do some assertions.
483 """
484 if not attempts:
485 try:
486 self.fail("Not enough packets received")
487 except:
488 finalDeferred.errback()
489
490 self.failIfIdentical(client.transport, None, "UDP Protocol lost its transport")
491
492 packet = str(attempts.pop(0))
493 packetDeferred = defer.Deferred()
494 client.setDeferred(packetDeferred)
495 client.transport.write(packet, (addr.host, addr.port))
496
497 def cbPacketReceived(packet):
498 """
499 A packet arrived. Cancel the timeout for it, record it, and
500 maybe finish the test.
501 """
502 timeoutCall.cancel()
503 succeededAttempts.append(packet)
504 if len(succeededAttempts) == 2:
505 # The second error has not yet been logged, since the
506 # exception which causes it hasn't even been raised yet.
507 # Give the datagramReceived call a chance to finish, then
508 # let the test finish asserting things.
509 reactor.callLater(0, finalDeferred.callback, None)
510 else:
511 makeAttempt()
512
513 def ebPacketTimeout(err):
514 """
515 The packet wasn't received quickly enough. Try sending another
516 one. It doesn't matter if the packet for which this was the
517 timeout eventually arrives: makeAttempt throws away the
518 Deferred on which this function is the errback, so when
519 datagramReceived callbacks, so it won't be on this Deferred, so
520 it won't raise an AlreadyCalledError.
521 """
522 makeAttempt()
523
524 packetDeferred.addCallbacks(cbPacketReceived, ebPacketTimeout)
525 packetDeferred.addErrback(finalDeferred.errback)
526
527 timeoutCall = reactor.callLater(
528 0.1, packetDeferred.errback,
529 error.TimeoutError(
530 "Timed out in testDatagramReceivedError"))
531
532 makeAttempt()
533 return finalDeferred
534
535
536 def testPortRepr(self):
537 client = GoodClient()
538 p = reactor.listenUDP(0, client)
539 portNo = str(p.getHost().port)
540 self.failIf(repr(p).find(portNo) == -1)
541 def stoppedListening(ign):
542 self.failIf(repr(p).find(portNo) != -1)
543 d = defer.maybeDeferred(p.stopListening)
544 d.addCallback(stoppedListening)
545 return d
546
547
548 class ReactorShutdownInteraction(unittest.TestCase):
549 """Test reactor shutdown interaction"""
550
551 def setUp(self):
552 """Start a UDP port"""
553 self.server = Server()
554 self.port = reactor.listenUDP(0, self.server, interface='127.0.0.1')
555
556 def tearDown(self):
557 """Stop the UDP port"""
558 return self.port.stopListening()
559
560 def testShutdownFromDatagramReceived(self):
561 """Test reactor shutdown while in a recvfrom() loop"""
562
563 # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
564 # It is important this loop terminate under various conditions.
565 # Previously, if datagramReceived synchronously invoked
566 # reactor.stop(), under certain reactors, the Port's socket would
567 # synchronously disappear, causing an AttributeError inside that
568 # loop. This was mishandled, causing the loop to spin forever.
569 # This test is primarily to ensure that the loop never spins
570 # forever.
571
572 finished = defer.Deferred()
573 pr = self.server.packetReceived = defer.Deferred()
574
575 def pktRece(ignored):
576 # Simulate reactor.stop() behavior :(
577 self.server.transport.connectionLost()
578 # Then delay this Deferred chain until the protocol has been
579 # disconnected, as the reactor should do in an error condition
580 # such as we are inducing. This is very much a whitebox test.
581 reactor.callLater(0, finished.callback, None)
582 pr.addCallback(pktRece)
583
584 def flushErrors(ignored):
585 # We are breaking abstraction and calling private APIs, any
586 # number of horrible errors might occur. As long as the reactor
587 # doesn't hang, this test is satisfied. (There may be room for
588 # another, stricter test.)
589 self.flushLoggedErrors()
590 finished.addCallback(flushErrors)
591 self.server.transport.write('\0' * 64, ('127.0.0.1',
592 self.server.transport.getHost().port))
593 return finished
594
595
596
597 class MulticastTestCase(unittest.TestCase):
598
599 def setUp(self):
600 self.server = Server()
601 self.client = Client()
602 # multicast won't work if we listen over loopback, apparently
603 self.port1 = reactor.listenMulticast(0, self.server)
604 self.port2 = reactor.listenMulticast(0, self.client)
605 self.client.transport.connect(
606 "127.0.0.1", self.server.transport.getHost().port)
607
608
609 def tearDown(self):
610 return gatherResults([
611 maybeDeferred(self.port1.stopListening),
612 maybeDeferred(self.port2.stopListening)])
613
614
615 def testTTL(self):
616 for o in self.client, self.server:
617 self.assertEquals(o.transport.getTTL(), 1)
618 o.transport.setTTL(2)
619 self.assertEquals(o.transport.getTTL(), 2)
620
621
622 def test_loopback(self):
623 """
624 Test that after loopback mode has been set, multicast packets are
625 delivered to their sender.
626 """
627 self.assertEquals(self.server.transport.getLoopbackMode(), 1)
628 addr = self.server.transport.getHost()
629 joined = self.server.transport.joinGroup("225.0.0.250")
630
631 def cbJoined(ignored):
632 d = self.server.packetReceived = Deferred()
633 self.server.transport.write("hello", ("225.0.0.250", addr.port))
634 return d
635 joined.addCallback(cbJoined)
636
637 def cbPacket(ignored):
638 self.assertEqual(len(self.server.packets), 1)
639 self.server.transport.setLoopbackMode(0)
640 self.assertEquals(self.server.transport.getLoopbackMode(), 0)
641 self.server.transport.write("hello", ("225.0.0.250", addr.port))
642
643 # This is fairly lame.
644 d = Deferred()
645 reactor.callLater(0, d.callback, None)
646 return d
647 joined.addCallback(cbPacket)
648
649 def cbNoPacket(ignored):
650 self.assertEqual(len(self.server.packets), 1)
651 joined.addCallback(cbNoPacket)
652
653 return joined
654
655
656 def test_interface(self):
657 """
658 Test C{getOutgoingInterface} and C{setOutgoingInterface}.
659 """
660 self.assertEqual(
661 self.client.transport.getOutgoingInterface(), "0.0.0.0")
662 self.assertEqual(
663 self.server.transport.getOutgoingInterface(), "0.0.0.0")
664
665 d1 = self.client.transport.setOutgoingInterface("127.0.0.1")
666 d2 = self.server.transport.setOutgoingInterface("127.0.0.1")
667 result = gatherResults([d1, d2])
668
669 def cbInterfaces(ignored):
670 self.assertEqual(
671 self.client.transport.getOutgoingInterface(), "127.0.0.1")
672 self.assertEqual(
673 self.server.transport.getOutgoingInterface(), "127.0.0.1")
674 result.addCallback(cbInterfaces)
675 return result
676
677
678 def test_joinLeave(self):
679 """
680 Test that multicast a group can be joined and left.
681 """
682 d = self.client.transport.joinGroup("225.0.0.250")
683
684 def clientJoined(ignored):
685 return self.client.transport.leaveGroup("225.0.0.250")
686 d.addCallback(clientJoined)
687
688 def clientLeft(ignored):
689 return self.server.transport.joinGroup("225.0.0.250")
690 d.addCallback(clientLeft)
691
692 def serverJoined(ignored):
693 return self.server.transport.leaveGroup("225.0.0.250")
694 d.addCallback(serverJoined)
695
696 return d
697
698
699 def test_joinFailure(self):
700 """
701 Test that an attempt to join an address which is not a multicast
702 address fails with L{error.MulticastJoinError}.
703 """
704 # 127.0.0.1 is not a multicast address, so joining it should fail.
705 return self.assertFailure(
706 self.client.transport.joinGroup("127.0.0.1"),
707 error.MulticastJoinError)
708 if runtime.platform.isWindows():
709 test_joinFailure.todo = "Windows' multicast is wonky"
710
711
712 def test_multicast(self):
713 """
714 Test that a multicast group can be joined and messages sent to and
715 received from it.
716 """
717 c = Server()
718 p = reactor.listenMulticast(0, c)
719 addr = self.server.transport.getHost()
720
721 joined = self.server.transport.joinGroup("225.0.0.250")
722
723 def cbJoined(ignored):
724 d = self.server.packetReceived = Deferred()
725 c.transport.write("hello world", ("225.0.0.250", addr.port))
726 return d
727 joined.addCallback(cbJoined)
728
729 def cbPacket(ignored):
730 self.assertEquals(self.server.packets[0][0], "hello world")
731 joined.addCallback(cbPacket)
732
733 def cleanup(passthrough):
734 result = maybeDeferred(p.stopListening)
735 result.addCallback(lambda ign: passthrough)
736 return result
737 joined.addCallback(cleanup)
738
739 return joined
740
741
742 def test_multiListen(self):
743 """
744 Test that multiple sockets can listen on the same multicast port and
745 that they both receive multicast messages directed to that address.
746 """
747 firstClient = Server()
748 firstPort = reactor.listenMulticast(
749 0, firstClient, listenMultiple=True)
750
751 portno = firstPort.getHost().port
752
753 secondClient = Server()
754 secondPort = reactor.listenMulticast(
755 portno, secondClient, listenMultiple=True)
756
757 joined = self.server.transport.joinGroup("225.0.0.250")
758
759 def serverJoined(ignored):
760 d1 = firstClient.packetReceived = Deferred()
761 d2 = secondClient.packetReceived = Deferred()
762 firstClient.transport.write("hello world", ("225.0.0.250", portno))
763 return gatherResults([d1, d2])
764 joined.addCallback(serverJoined)
765
766 def gotPackets(ignored):
767 self.assertEquals(firstClient.packets[0][0], "hello world")
768 self.assertEquals(secondClient.packets[0][0], "hello world")
769 joined.addCallback(gotPackets)
770
771 def cleanup(passthrough):
772 result = gatherResults([
773 maybeDeferred(firstPort.stopListening),
774 maybeDeferred(secondPort.stopListening)])
775 result.addCallback(lambda ign: passthrough)
776 return result
777 joined.addBoth(cleanup)
778 return joined
779 if runtime.platform.isWindows():
780 test_multiListen.skip = ("on non-linux platforms it appears multiple "
781 "processes can listen, but not multiple sockets "
782 "in same process?")
783
784 if not interfaces.IReactorUDP(reactor, None):
785 UDPTestCase.skip = "This reactor does not support UDP"
786 ReactorShutdownInteraction.skip = "This reactor does not support UDP"
787 if not hasattr(reactor, "connectUDP"):
788 OldConnectedUDPTestCase.skip = "This reactor does not support connectUDP"
789 if not interfaces.IReactorMulticast(reactor, None):
790 MulticastTestCase.skip = "This reactor does not support multicast"
791
792 def checkForLinux22():
793 import os
794 if os.path.exists("/proc/version"):
795 s = open("/proc/version").read()
796 if s.startswith("Linux version"):
797 s = s.split()[2]
798 if s.split(".")[:2] == ["2", "2"]:
799 f = MulticastTestCase.testInterface.im_func
800 f.todo = "figure out why this fails in linux 2.2"
801 checkForLinux22()
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_twistd.py ('k') | third_party/twisted_8_1/twisted/test/test_unix.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698