Index: third_party/twisted_8_1/twisted/test/test_udp.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_udp.py b/third_party/twisted_8_1/twisted/test/test_udp.py |
deleted file mode 100644 |
index 182efa8a54a5cb028158e3d2e16fdde4be007aab..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_udp.py |
+++ /dev/null |
@@ -1,801 +0,0 @@ |
-# -*- test-case-name: twisted.test.test_udp -*- |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Tests for implementations of L{IReactorUDP} and L{IReactorMulticast}. |
-""" |
- |
-from twisted.trial import unittest, util |
- |
-from twisted.internet.defer import Deferred, gatherResults, maybeDeferred |
-from twisted.internet import protocol, reactor, error, defer, interfaces |
-from twisted.python import runtime |
- |
- |
-class Mixin: |
- |
- started = 0 |
- stopped = 0 |
- |
- startedDeferred = None |
- |
- def __init__(self): |
- self.packets = [] |
- |
- def startProtocol(self): |
- self.started = 1 |
- if self.startedDeferred is not None: |
- d, self.startedDeferred = self.startedDeferred, None |
- d.callback(None) |
- |
- def stopProtocol(self): |
- self.stopped = 1 |
- |
- |
-class Server(Mixin, protocol.DatagramProtocol): |
- packetReceived = None |
- refused = 0 |
- |
- |
- def datagramReceived(self, data, addr): |
- self.packets.append((data, addr)) |
- if self.packetReceived is not None: |
- d, self.packetReceived = self.packetReceived, None |
- d.callback(None) |
- |
- |
- |
-class Client(Mixin, protocol.ConnectedDatagramProtocol): |
- |
- packetReceived = None |
- refused = 0 |
- |
- def datagramReceived(self, data): |
- self.packets.append(data) |
- if self.packetReceived is not None: |
- d, self.packetReceived = self.packetReceived, None |
- d.callback(None) |
- |
- def connectionFailed(self, failure): |
- if self.startedDeferred is not None: |
- d, self.startedDeferred = self.startedDeferred, None |
- d.errback(failure) |
- self.failure = failure |
- |
- def connectionRefused(self): |
- if self.startedDeferred is not None: |
- d, self.startedDeferred = self.startedDeferred, None |
- d.errback(error.ConnectionRefusedError("yup")) |
- self.refused = 1 |
- |
- |
-class GoodClient(Server): |
- |
- def connectionRefused(self): |
- if self.startedDeferred is not None: |
- d, self.startedDeferred = self.startedDeferred, None |
- d.errback(error.ConnectionRefusedError("yup")) |
- self.refused = 1 |
- |
- |
- |
-class BadClientError(Exception): |
- """ |
- Raised by BadClient at the end of every datagramReceived call to try and |
- screw stuff up. |
- """ |
- |
- |
- |
-class BadClient(protocol.DatagramProtocol): |
- """ |
- A DatagramProtocol which always raises an exception from datagramReceived. |
- Used to test error handling behavior in the reactor for that method. |
- """ |
- d = None |
- |
- def setDeferred(self, d): |
- """ |
- Set the Deferred which will be called back when datagramReceived is |
- called. |
- """ |
- self.d = d |
- |
- |
- def datagramReceived(self, bytes, addr): |
- if self.d is not None: |
- d, self.d = self.d, None |
- d.callback(bytes) |
- raise BadClientError("Application code is very buggy!") |
- |
- |
- |
-class OldConnectedUDPTestCase(unittest.TestCase): |
- def testStartStop(self): |
- client = Client() |
- d = client.startedDeferred = defer.Deferred() |
- port2 = reactor.connectUDP("127.0.0.1", 8888, client) |
- |
- def assertName(): |
- self.failUnless(repr(port2).find('test_udp.Client') >= 0) |
- |
- def cbStarted(ignored): |
- self.assertEquals(client.started, 1) |
- self.assertEquals(client.stopped, 0) |
- assertName() |
- d = defer.maybeDeferred(port2.stopListening) |
- d.addCallback(lambda ign: assertName()) |
- return d |
- |
- return d.addCallback(cbStarted) |
- testStartStop.suppress = [ |
- util.suppress(message='use listenUDP and then transport.connect', |
- category=DeprecationWarning)] |
- |
- |
- def testDNSFailure(self): |
- client = Client() |
- d = client.startedDeferred = defer.Deferred() |
- # if this domain exists, shoot your sysadmin |
- reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client) |
- |
- def didNotConnect(ign): |
- self.assertEquals(client.stopped, 0) |
- self.assertEquals(client.started, 0) |
- |
- d = self.assertFailure(d, error.DNSLookupError) |
- d.addCallback(didNotConnect) |
- return d |
- testDNSFailure.suppress = [ |
- util.suppress(message='use listenUDP and then transport.connect', |
- category=DeprecationWarning)] |
- |
- |
- def testSendPackets(self): |
- server = Server() |
- serverStarted = server.startedDeferred = defer.Deferred() |
- |
- client = Client() |
- clientStarted = client.startedDeferred = defer.Deferred() |
- |
- port1 = reactor.listenUDP(0, server, interface="127.0.0.1") |
- |
- def cbServerStarted(ignored): |
- self.port2 = reactor.connectUDP("127.0.0.1", |
- server.transport.getHost().port, |
- client) |
- return clientStarted |
- |
- d = serverStarted.addCallback(cbServerStarted) |
- |
- def cbClientStarted(ignored): |
- clientSend = server.packetReceived = defer.Deferred() |
- serverSend = client.packetReceived = defer.Deferred() |
- |
- cAddr = client.transport.getHost() |
- server.transport.write("hello", (cAddr.host, cAddr.port)) |
- client.transport.write("world") |
- |
- # No one will ever call errback on either of these Deferreds, |
- # otherwise I would pass fireOnOneErrback=True here. |
- return defer.DeferredList([clientSend, serverSend]) |
- |
- d.addCallback(cbClientStarted) |
- |
- def cbPackets(ignored): |
- self.assertEquals(client.packets, ["hello"]) |
- self.assertEquals(server.packets, |
- [("world", ("127.0.0.1", |
- client.transport.getHost().port))]) |
- |
- return defer.DeferredList([ |
- defer.maybeDeferred(port1.stopListening), |
- defer.maybeDeferred(self.port2.stopListening)], |
- fireOnOneErrback=True) |
- |
- d.addCallback(cbPackets) |
- return d |
- testSendPackets.suppress = [ |
- util.suppress(message='use listenUDP and then transport.connect', |
- category=DeprecationWarning)] |
- |
- |
- def test_connectionRefused(self): |
- """ |
- Test that using the connected UDP API will deliver connection refused |
- notification when packets are sent to an address at which no one is |
- listening. |
- """ |
- # XXX - assume no one listening on port 80 UDP |
- client = Client() |
- clientStarted = client.startedDeferred = Deferred() |
- server = Server() |
- serverStarted = server.startedDeferred = Deferred() |
- started = gatherResults([clientStarted, serverStarted]) |
- |
- clientPort = reactor.connectUDP("127.0.0.1", 80, client) |
- serverPort = reactor.listenUDP(0, server, interface="127.0.0.1") |
- |
- def cbStarted(ignored): |
- clientRefused = client.startedDeferred = Deferred() |
- |
- client.transport.write("a") |
- client.transport.write("b") |
- server.transport.write("c", ("127.0.0.1", 80)) |
- server.transport.write("d", ("127.0.0.1", 80)) |
- server.transport.write("e", ("127.0.0.1", 80)) |
- |
- c = clientPort.getHost() |
- s = serverPort.getHost() |
- server.transport.write("toserver", (s.host, s.port)) |
- server.transport.write("toclient", (c.host, c.port)) |
- |
- return self.assertFailure(clientRefused, error.ConnectionRefusedError) |
- started.addCallback(cbStarted) |
- |
- def cleanup(passthrough): |
- result = gatherResults([ |
- maybeDeferred(clientPort.stopListening), |
- maybeDeferred(serverPort.stopListening)]) |
- result.addCallback(lambda ign: passthrough) |
- return result |
- |
- started.addBoth(cleanup) |
- return started |
- test_connectionRefused.suppress = [ |
- util.suppress(message='use listenUDP and then transport.connect', |
- category=DeprecationWarning)] |
- |
- |
- |
-class UDPTestCase(unittest.TestCase): |
- |
- def testOldAddress(self): |
- server = Server() |
- d = server.startedDeferred = defer.Deferred() |
- p = reactor.listenUDP(0, server, interface="127.0.0.1") |
- def cbStarted(ignored): |
- addr = p.getHost() |
- self.assertEquals(addr, ('INET_UDP', addr.host, addr.port)) |
- return p.stopListening() |
- return d.addCallback(cbStarted) |
- testOldAddress.suppress = [ |
- util.suppress(message='IPv4Address.__getitem__', |
- category=DeprecationWarning)] |
- |
- |
- def testStartStop(self): |
- server = Server() |
- d = server.startedDeferred = defer.Deferred() |
- port1 = reactor.listenUDP(0, server, interface="127.0.0.1") |
- def cbStarted(ignored): |
- self.assertEquals(server.started, 1) |
- self.assertEquals(server.stopped, 0) |
- return port1.stopListening() |
- def cbStopped(ignored): |
- self.assertEquals(server.stopped, 1) |
- return d.addCallback(cbStarted).addCallback(cbStopped) |
- |
- def testRebind(self): |
- # Ensure binding the same DatagramProtocol repeatedly invokes all |
- # the right callbacks. |
- server = Server() |
- d = server.startedDeferred = defer.Deferred() |
- p = reactor.listenUDP(0, server, interface="127.0.0.1") |
- |
- def cbStarted(ignored, port): |
- return port.stopListening() |
- |
- def cbStopped(ignored): |
- d = server.startedDeferred = defer.Deferred() |
- p = reactor.listenUDP(0, server, interface="127.0.0.1") |
- return d.addCallback(cbStarted, p) |
- |
- return d.addCallback(cbStarted, p) |
- |
- |
- def testBindError(self): |
- server = Server() |
- d = server.startedDeferred = defer.Deferred() |
- port = reactor.listenUDP(0, server, interface='127.0.0.1') |
- |
- def cbStarted(ignored): |
- self.assertEquals(port.getHost(), server.transport.getHost()) |
- |
- server2 = Server() |
- self.assertRaises( |
- error.CannotListenError, |
- reactor.listenUDP, port.getHost().port, server2, |
- interface='127.0.0.1') |
- d.addCallback(cbStarted) |
- |
- def cbFinished(ignored): |
- return port.stopListening() |
- d.addCallback(cbFinished) |
- return d |
- |
- def testSendPackets(self): |
- server = Server() |
- serverStarted = server.startedDeferred = defer.Deferred() |
- port1 = reactor.listenUDP(0, server, interface="127.0.0.1") |
- |
- client = GoodClient() |
- clientStarted = client.startedDeferred = defer.Deferred() |
- |
- def cbServerStarted(ignored): |
- self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1") |
- return clientStarted |
- |
- d = serverStarted.addCallback(cbServerStarted) |
- |
- def cbClientStarted(ignored): |
- client.transport.connect("127.0.0.1", |
- server.transport.getHost().port) |
- cAddr = client.transport.getHost() |
- sAddr = server.transport.getHost() |
- |
- serverSend = client.packetReceived = defer.Deferred() |
- server.transport.write("hello", (cAddr.host, cAddr.port)) |
- |
- clientWrites = [ |
- ("a",), |
- ("b", None), |
- ("c", (sAddr.host, sAddr.port))] |
- |
- def cbClientSend(ignored): |
- if clientWrites: |
- nextClientWrite = server.packetReceived = defer.Deferred() |
- nextClientWrite.addCallback(cbClientSend) |
- client.transport.write(*clientWrites.pop(0)) |
- return nextClientWrite |
- |
- # No one will ever call .errback on either of these Deferreds, |
- # but there is a non-trivial amount of test code which might |
- # cause them to fail somehow. So fireOnOneErrback=True. |
- return defer.DeferredList([ |
- cbClientSend(None), |
- serverSend], |
- fireOnOneErrback=True) |
- |
- d.addCallback(cbClientStarted) |
- |
- def cbSendsFinished(ignored): |
- cAddr = client.transport.getHost() |
- sAddr = server.transport.getHost() |
- self.assertEquals( |
- client.packets, |
- [("hello", (sAddr.host, sAddr.port))]) |
- clientAddr = (cAddr.host, cAddr.port) |
- self.assertEquals( |
- server.packets, |
- [("a", clientAddr), |
- ("b", clientAddr), |
- ("c", clientAddr)]) |
- |
- d.addCallback(cbSendsFinished) |
- |
- def cbFinished(ignored): |
- return defer.DeferredList([ |
- defer.maybeDeferred(port1.stopListening), |
- defer.maybeDeferred(self.port2.stopListening)], |
- fireOnOneErrback=True) |
- |
- d.addCallback(cbFinished) |
- return d |
- |
- |
- def testConnectionRefused(self): |
- # assume no one listening on port 80 UDP |
- client = GoodClient() |
- clientStarted = client.startedDeferred = defer.Deferred() |
- port = reactor.listenUDP(0, client, interface="127.0.0.1") |
- |
- server = Server() |
- serverStarted = server.startedDeferred = defer.Deferred() |
- port2 = reactor.listenUDP(0, server, interface="127.0.0.1") |
- |
- d = defer.DeferredList( |
- [clientStarted, serverStarted], |
- fireOnOneErrback=True) |
- |
- def cbStarted(ignored): |
- connectionRefused = client.startedDeferred = defer.Deferred() |
- client.transport.connect("127.0.0.1", 80) |
- |
- for i in range(10): |
- client.transport.write(str(i)) |
- server.transport.write(str(i), ("127.0.0.1", 80)) |
- |
- return self.assertFailure( |
- connectionRefused, |
- error.ConnectionRefusedError) |
- |
- d.addCallback(cbStarted) |
- |
- def cbFinished(ignored): |
- return defer.DeferredList([ |
- defer.maybeDeferred(port.stopListening), |
- defer.maybeDeferred(port2.stopListening)], |
- fireOnOneErrback=True) |
- |
- d.addCallback(cbFinished) |
- return d |
- |
- def testBadConnect(self): |
- client = GoodClient() |
- port = reactor.listenUDP(0, client, interface="127.0.0.1") |
- self.assertRaises(ValueError, client.transport.connect, |
- "localhost", 80) |
- client.transport.connect("127.0.0.1", 80) |
- self.assertRaises(RuntimeError, client.transport.connect, |
- "127.0.0.1", 80) |
- return port.stopListening() |
- |
- |
- |
- def testDatagramReceivedError(self): |
- """ |
- Test that when datagramReceived raises an exception it is logged but |
- the port is not disconnected. |
- """ |
- finalDeferred = defer.Deferred() |
- |
- def cbCompleted(ign): |
- """ |
- Flush the exceptions which the reactor should have logged and make |
- sure they're actually there. |
- """ |
- errs = self.flushLoggedErrors(BadClientError) |
- self.assertEquals(len(errs), 2, "Incorrectly found %d errors, expected 2" % (len(errs),)) |
- finalDeferred.addCallback(cbCompleted) |
- |
- client = BadClient() |
- port = reactor.listenUDP(0, client, interface='127.0.0.1') |
- |
- def cbCleanup(result): |
- """ |
- Disconnect the port we started and pass on whatever was given to us |
- in case it was a Failure. |
- """ |
- return defer.maybeDeferred(port.stopListening).addBoth(lambda ign: result) |
- finalDeferred.addBoth(cbCleanup) |
- |
- addr = port.getHost() |
- |
- # UDP is not reliable. Try to send as many as 60 packets before giving |
- # up. Conceivably, all sixty could be lost, but they probably won't be |
- # unless all UDP traffic is being dropped, and then the rest of these |
- # UDP tests will likely fail as well. Ideally, this test (and probably |
- # others) wouldn't even use actual UDP traffic: instead, they would |
- # stub out the socket with a fake one which could be made to behave in |
- # whatever way the test desires. Unfortunately, this is hard because |
- # of differences in various reactor implementations. |
- attempts = range(60) |
- succeededAttempts = [] |
- |
- def makeAttempt(): |
- """ |
- Send one packet to the listening BadClient. Set up a 0.1 second |
- timeout to do re-transmits in case the packet is dropped. When two |
- packets have been received by the BadClient, stop sending and let |
- the finalDeferred's callbacks do some assertions. |
- """ |
- if not attempts: |
- try: |
- self.fail("Not enough packets received") |
- except: |
- finalDeferred.errback() |
- |
- self.failIfIdentical(client.transport, None, "UDP Protocol lost its transport") |
- |
- packet = str(attempts.pop(0)) |
- packetDeferred = defer.Deferred() |
- client.setDeferred(packetDeferred) |
- client.transport.write(packet, (addr.host, addr.port)) |
- |
- def cbPacketReceived(packet): |
- """ |
- A packet arrived. Cancel the timeout for it, record it, and |
- maybe finish the test. |
- """ |
- timeoutCall.cancel() |
- succeededAttempts.append(packet) |
- if len(succeededAttempts) == 2: |
- # The second error has not yet been logged, since the |
- # exception which causes it hasn't even been raised yet. |
- # Give the datagramReceived call a chance to finish, then |
- # let the test finish asserting things. |
- reactor.callLater(0, finalDeferred.callback, None) |
- else: |
- makeAttempt() |
- |
- def ebPacketTimeout(err): |
- """ |
- The packet wasn't received quickly enough. Try sending another |
- one. It doesn't matter if the packet for which this was the |
- timeout eventually arrives: makeAttempt throws away the |
- Deferred on which this function is the errback, so when |
- datagramReceived callbacks, so it won't be on this Deferred, so |
- it won't raise an AlreadyCalledError. |
- """ |
- makeAttempt() |
- |
- packetDeferred.addCallbacks(cbPacketReceived, ebPacketTimeout) |
- packetDeferred.addErrback(finalDeferred.errback) |
- |
- timeoutCall = reactor.callLater( |
- 0.1, packetDeferred.errback, |
- error.TimeoutError( |
- "Timed out in testDatagramReceivedError")) |
- |
- makeAttempt() |
- return finalDeferred |
- |
- |
- def testPortRepr(self): |
- client = GoodClient() |
- p = reactor.listenUDP(0, client) |
- portNo = str(p.getHost().port) |
- self.failIf(repr(p).find(portNo) == -1) |
- def stoppedListening(ign): |
- self.failIf(repr(p).find(portNo) != -1) |
- d = defer.maybeDeferred(p.stopListening) |
- d.addCallback(stoppedListening) |
- return d |
- |
- |
-class ReactorShutdownInteraction(unittest.TestCase): |
- """Test reactor shutdown interaction""" |
- |
- def setUp(self): |
- """Start a UDP port""" |
- self.server = Server() |
- self.port = reactor.listenUDP(0, self.server, interface='127.0.0.1') |
- |
- def tearDown(self): |
- """Stop the UDP port""" |
- return self.port.stopListening() |
- |
- def testShutdownFromDatagramReceived(self): |
- """Test reactor shutdown while in a recvfrom() loop""" |
- |
- # udp.Port's doRead calls recvfrom() in a loop, as an optimization. |
- # It is important this loop terminate under various conditions. |
- # Previously, if datagramReceived synchronously invoked |
- # reactor.stop(), under certain reactors, the Port's socket would |
- # synchronously disappear, causing an AttributeError inside that |
- # loop. This was mishandled, causing the loop to spin forever. |
- # This test is primarily to ensure that the loop never spins |
- # forever. |
- |
- finished = defer.Deferred() |
- pr = self.server.packetReceived = defer.Deferred() |
- |
- def pktRece(ignored): |
- # Simulate reactor.stop() behavior :( |
- self.server.transport.connectionLost() |
- # Then delay this Deferred chain until the protocol has been |
- # disconnected, as the reactor should do in an error condition |
- # such as we are inducing. This is very much a whitebox test. |
- reactor.callLater(0, finished.callback, None) |
- pr.addCallback(pktRece) |
- |
- def flushErrors(ignored): |
- # We are breaking abstraction and calling private APIs, any |
- # number of horrible errors might occur. As long as the reactor |
- # doesn't hang, this test is satisfied. (There may be room for |
- # another, stricter test.) |
- self.flushLoggedErrors() |
- finished.addCallback(flushErrors) |
- self.server.transport.write('\0' * 64, ('127.0.0.1', |
- self.server.transport.getHost().port)) |
- return finished |
- |
- |
- |
-class MulticastTestCase(unittest.TestCase): |
- |
- def setUp(self): |
- self.server = Server() |
- self.client = Client() |
- # multicast won't work if we listen over loopback, apparently |
- self.port1 = reactor.listenMulticast(0, self.server) |
- self.port2 = reactor.listenMulticast(0, self.client) |
- self.client.transport.connect( |
- "127.0.0.1", self.server.transport.getHost().port) |
- |
- |
- def tearDown(self): |
- return gatherResults([ |
- maybeDeferred(self.port1.stopListening), |
- maybeDeferred(self.port2.stopListening)]) |
- |
- |
- def testTTL(self): |
- for o in self.client, self.server: |
- self.assertEquals(o.transport.getTTL(), 1) |
- o.transport.setTTL(2) |
- self.assertEquals(o.transport.getTTL(), 2) |
- |
- |
- def test_loopback(self): |
- """ |
- Test that after loopback mode has been set, multicast packets are |
- delivered to their sender. |
- """ |
- self.assertEquals(self.server.transport.getLoopbackMode(), 1) |
- addr = self.server.transport.getHost() |
- joined = self.server.transport.joinGroup("225.0.0.250") |
- |
- def cbJoined(ignored): |
- d = self.server.packetReceived = Deferred() |
- self.server.transport.write("hello", ("225.0.0.250", addr.port)) |
- return d |
- joined.addCallback(cbJoined) |
- |
- def cbPacket(ignored): |
- self.assertEqual(len(self.server.packets), 1) |
- self.server.transport.setLoopbackMode(0) |
- self.assertEquals(self.server.transport.getLoopbackMode(), 0) |
- self.server.transport.write("hello", ("225.0.0.250", addr.port)) |
- |
- # This is fairly lame. |
- d = Deferred() |
- reactor.callLater(0, d.callback, None) |
- return d |
- joined.addCallback(cbPacket) |
- |
- def cbNoPacket(ignored): |
- self.assertEqual(len(self.server.packets), 1) |
- joined.addCallback(cbNoPacket) |
- |
- return joined |
- |
- |
- def test_interface(self): |
- """ |
- Test C{getOutgoingInterface} and C{setOutgoingInterface}. |
- """ |
- self.assertEqual( |
- self.client.transport.getOutgoingInterface(), "0.0.0.0") |
- self.assertEqual( |
- self.server.transport.getOutgoingInterface(), "0.0.0.0") |
- |
- d1 = self.client.transport.setOutgoingInterface("127.0.0.1") |
- d2 = self.server.transport.setOutgoingInterface("127.0.0.1") |
- result = gatherResults([d1, d2]) |
- |
- def cbInterfaces(ignored): |
- self.assertEqual( |
- self.client.transport.getOutgoingInterface(), "127.0.0.1") |
- self.assertEqual( |
- self.server.transport.getOutgoingInterface(), "127.0.0.1") |
- result.addCallback(cbInterfaces) |
- return result |
- |
- |
- def test_joinLeave(self): |
- """ |
- Test that multicast a group can be joined and left. |
- """ |
- d = self.client.transport.joinGroup("225.0.0.250") |
- |
- def clientJoined(ignored): |
- return self.client.transport.leaveGroup("225.0.0.250") |
- d.addCallback(clientJoined) |
- |
- def clientLeft(ignored): |
- return self.server.transport.joinGroup("225.0.0.250") |
- d.addCallback(clientLeft) |
- |
- def serverJoined(ignored): |
- return self.server.transport.leaveGroup("225.0.0.250") |
- d.addCallback(serverJoined) |
- |
- return d |
- |
- |
- def test_joinFailure(self): |
- """ |
- Test that an attempt to join an address which is not a multicast |
- address fails with L{error.MulticastJoinError}. |
- """ |
- # 127.0.0.1 is not a multicast address, so joining it should fail. |
- return self.assertFailure( |
- self.client.transport.joinGroup("127.0.0.1"), |
- error.MulticastJoinError) |
- if runtime.platform.isWindows(): |
- test_joinFailure.todo = "Windows' multicast is wonky" |
- |
- |
- def test_multicast(self): |
- """ |
- Test that a multicast group can be joined and messages sent to and |
- received from it. |
- """ |
- c = Server() |
- p = reactor.listenMulticast(0, c) |
- addr = self.server.transport.getHost() |
- |
- joined = self.server.transport.joinGroup("225.0.0.250") |
- |
- def cbJoined(ignored): |
- d = self.server.packetReceived = Deferred() |
- c.transport.write("hello world", ("225.0.0.250", addr.port)) |
- return d |
- joined.addCallback(cbJoined) |
- |
- def cbPacket(ignored): |
- self.assertEquals(self.server.packets[0][0], "hello world") |
- joined.addCallback(cbPacket) |
- |
- def cleanup(passthrough): |
- result = maybeDeferred(p.stopListening) |
- result.addCallback(lambda ign: passthrough) |
- return result |
- joined.addCallback(cleanup) |
- |
- return joined |
- |
- |
- def test_multiListen(self): |
- """ |
- Test that multiple sockets can listen on the same multicast port and |
- that they both receive multicast messages directed to that address. |
- """ |
- firstClient = Server() |
- firstPort = reactor.listenMulticast( |
- 0, firstClient, listenMultiple=True) |
- |
- portno = firstPort.getHost().port |
- |
- secondClient = Server() |
- secondPort = reactor.listenMulticast( |
- portno, secondClient, listenMultiple=True) |
- |
- joined = self.server.transport.joinGroup("225.0.0.250") |
- |
- def serverJoined(ignored): |
- d1 = firstClient.packetReceived = Deferred() |
- d2 = secondClient.packetReceived = Deferred() |
- firstClient.transport.write("hello world", ("225.0.0.250", portno)) |
- return gatherResults([d1, d2]) |
- joined.addCallback(serverJoined) |
- |
- def gotPackets(ignored): |
- self.assertEquals(firstClient.packets[0][0], "hello world") |
- self.assertEquals(secondClient.packets[0][0], "hello world") |
- joined.addCallback(gotPackets) |
- |
- def cleanup(passthrough): |
- result = gatherResults([ |
- maybeDeferred(firstPort.stopListening), |
- maybeDeferred(secondPort.stopListening)]) |
- result.addCallback(lambda ign: passthrough) |
- return result |
- joined.addBoth(cleanup) |
- return joined |
- if runtime.platform.isWindows(): |
- test_multiListen.skip = ("on non-linux platforms it appears multiple " |
- "processes can listen, but not multiple sockets " |
- "in same process?") |
- |
-if not interfaces.IReactorUDP(reactor, None): |
- UDPTestCase.skip = "This reactor does not support UDP" |
- ReactorShutdownInteraction.skip = "This reactor does not support UDP" |
-if not hasattr(reactor, "connectUDP"): |
- OldConnectedUDPTestCase.skip = "This reactor does not support connectUDP" |
-if not interfaces.IReactorMulticast(reactor, None): |
- MulticastTestCase.skip = "This reactor does not support multicast" |
- |
-def checkForLinux22(): |
- import os |
- if os.path.exists("/proc/version"): |
- s = open("/proc/version").read() |
- if s.startswith("Linux version"): |
- s = s.split()[2] |
- if s.split(".")[:2] == ["2", "2"]: |
- f = MulticastTestCase.testInterface.im_func |
- f.todo = "figure out why this fails in linux 2.2" |
-checkForLinux22() |