Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(224)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_factories.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Test code for basic Factory classes.
6 """
7
8 import pickle
9
10 from twisted.trial import unittest
11
12 from twisted.internet import reactor, defer
13 from twisted.internet.protocol import Factory, ReconnectingClientFactory
14 from twisted.protocols.basic import Int16StringReceiver
15
16 class In(Int16StringReceiver):
17 def __init__(self):
18 self.msgs = {}
19
20 def connectionMade(self):
21 self.factory.connections += 1
22
23 def stringReceived(self, msg):
24 n, msg = pickle.loads(msg)
25 self.msgs[n] = msg
26 self.sendString(pickle.dumps(n))
27
28 def connectionLost(self, reason):
29 self.factory.allMessages.append(self.msgs)
30 if len(self.factory.allMessages) >= self.factory.goal:
31 self.factory.d.callback(None)
32
33 class Out(Int16StringReceiver):
34 msgs = dict([(x, 'X' * x) for x in range(10)])
35
36 def __init__(self):
37 self.msgs = Out.msgs.copy()
38
39 def connectionMade(self):
40 for i in self.msgs.keys():
41 self.sendString(pickle.dumps( (i, self.msgs[i])))
42
43 def stringReceived(self, msg):
44 n = pickle.loads(msg)
45 del self.msgs[n]
46 if not self.msgs:
47 self.transport.loseConnection()
48 self.factory.howManyTimes -= 1
49 if self.factory.howManyTimes <= 0:
50 self.factory.stopTrying()
51
52
53
54 class ReconnectingFactoryTestCase(unittest.TestCase):
55 """
56 Tests for L{ReconnectingClientFactory}.
57 """
58 def testStopTrying(self):
59 f = Factory()
60 f.protocol = In
61 f.connections = 0
62 f.allMessages = []
63 f.goal = 2
64 f.d = defer.Deferred()
65
66 c = ReconnectingClientFactory()
67 c.initialDelay = c.delay = 0.2
68 c.protocol = Out
69 c.howManyTimes = 2
70
71 port = reactor.listenTCP(0, f)
72 self.addCleanup(port.stopListening)
73 PORT = port.getHost().port
74 reactor.connectTCP('127.0.0.1', PORT, c)
75
76 f.d.addCallback(self._testStopTrying_1, f, c)
77 return f.d
78 testStopTrying.timeout = 10
79
80
81 def _testStopTrying_1(self, res, f, c):
82 self.assertEquals(len(f.allMessages), 2,
83 "not enough messages -- %s" % f.allMessages)
84 self.assertEquals(f.connections, 2,
85 "Number of successful connections incorrect %d" %
86 f.connections)
87 self.assertEquals(f.allMessages, [Out.msgs] * 2)
88 self.failIf(c.continueTrying, "stopTrying never called or ineffective")
89
90
91 def test_serializeUnused(self):
92 """
93 A L{ReconnectingClientFactory} which hasn't been used for anything
94 can be pickled and unpickled and end up with the same state.
95 """
96 original = ReconnectingClientFactory()
97 reconstituted = pickle.loads(pickle.dumps(original))
98 self.assertEqual(original.__dict__, reconstituted.__dict__)
99
100
101 def test_deserializationResetsParameters(self):
102 """
103 A L{ReconnectingClientFactory} which is unpickled does not have an
104 L{IConnector} and has its reconnectioning timing parameters reset to
105 their initial values.
106 """
107 class FakeConnector(object):
108 def stopConnecting(self):
109 pass
110
111 factory = ReconnectingClientFactory()
112 factory.clientConnectionFailed(FakeConnector(), None)
113 try:
114 serialized = pickle.dumps(factory)
115 unserialized = pickle.loads(serialized)
116 self.assertEqual(unserialized.connector, None)
117 self.assertEqual(unserialized._callID, None)
118 self.assertEqual(unserialized.retries, 0)
119 self.assertEqual(unserialized.delay, factory.initialDelay)
120 self.assertEqual(unserialized.continueTrying, True)
121 finally:
122 factory.stopTrying()
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_extensions.py ('k') | third_party/twisted_8_1/twisted/test/test_failure.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698