Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Unified Diff: third_party/twisted_8_1/twisted/test/test_factories.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_factories.py
diff --git a/third_party/twisted_8_1/twisted/test/test_factories.py b/third_party/twisted_8_1/twisted/test/test_factories.py
deleted file mode 100644
index 63cf210669c281388784405cc8229313fca7a1c5..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_factories.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Test code for basic Factory classes.
-"""
-
-import pickle
-
-from twisted.trial import unittest
-
-from twisted.internet import reactor, defer
-from twisted.internet.protocol import Factory, ReconnectingClientFactory
-from twisted.protocols.basic import Int16StringReceiver
-
-class In(Int16StringReceiver):
- def __init__(self):
- self.msgs = {}
-
- def connectionMade(self):
- self.factory.connections += 1
-
- def stringReceived(self, msg):
- n, msg = pickle.loads(msg)
- self.msgs[n] = msg
- self.sendString(pickle.dumps(n))
-
- def connectionLost(self, reason):
- self.factory.allMessages.append(self.msgs)
- if len(self.factory.allMessages) >= self.factory.goal:
- self.factory.d.callback(None)
-
-class Out(Int16StringReceiver):
- msgs = dict([(x, 'X' * x) for x in range(10)])
-
- def __init__(self):
- self.msgs = Out.msgs.copy()
-
- def connectionMade(self):
- for i in self.msgs.keys():
- self.sendString(pickle.dumps( (i, self.msgs[i])))
-
- def stringReceived(self, msg):
- n = pickle.loads(msg)
- del self.msgs[n]
- if not self.msgs:
- self.transport.loseConnection()
- self.factory.howManyTimes -= 1
- if self.factory.howManyTimes <= 0:
- self.factory.stopTrying()
-
-
-
-class ReconnectingFactoryTestCase(unittest.TestCase):
- """
- Tests for L{ReconnectingClientFactory}.
- """
- def testStopTrying(self):
- f = Factory()
- f.protocol = In
- f.connections = 0
- f.allMessages = []
- f.goal = 2
- f.d = defer.Deferred()
-
- c = ReconnectingClientFactory()
- c.initialDelay = c.delay = 0.2
- c.protocol = Out
- c.howManyTimes = 2
-
- port = reactor.listenTCP(0, f)
- self.addCleanup(port.stopListening)
- PORT = port.getHost().port
- reactor.connectTCP('127.0.0.1', PORT, c)
-
- f.d.addCallback(self._testStopTrying_1, f, c)
- return f.d
- testStopTrying.timeout = 10
-
-
- def _testStopTrying_1(self, res, f, c):
- self.assertEquals(len(f.allMessages), 2,
- "not enough messages -- %s" % f.allMessages)
- self.assertEquals(f.connections, 2,
- "Number of successful connections incorrect %d" %
- f.connections)
- self.assertEquals(f.allMessages, [Out.msgs] * 2)
- self.failIf(c.continueTrying, "stopTrying never called or ineffective")
-
-
- def test_serializeUnused(self):
- """
- A L{ReconnectingClientFactory} which hasn't been used for anything
- can be pickled and unpickled and end up with the same state.
- """
- original = ReconnectingClientFactory()
- reconstituted = pickle.loads(pickle.dumps(original))
- self.assertEqual(original.__dict__, reconstituted.__dict__)
-
-
- def test_deserializationResetsParameters(self):
- """
- A L{ReconnectingClientFactory} which is unpickled does not have an
- L{IConnector} and has its reconnectioning timing parameters reset to
- their initial values.
- """
- class FakeConnector(object):
- def stopConnecting(self):
- pass
-
- factory = ReconnectingClientFactory()
- factory.clientConnectionFailed(FakeConnector(), None)
- try:
- serialized = pickle.dumps(factory)
- unserialized = pickle.loads(serialized)
- self.assertEqual(unserialized.connector, None)
- self.assertEqual(unserialized._callID, None)
- self.assertEqual(unserialized.retries, 0)
- self.assertEqual(unserialized.delay, factory.initialDelay)
- self.assertEqual(unserialized.continueTrying, True)
- finally:
- factory.stopTrying()
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_extensions.py ('k') | third_party/twisted_8_1/twisted/test/test_failure.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698