OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 import sys, pickle | |
5 | |
6 try: | |
7 import threading | |
8 except ImportError: | |
9 threading = None | |
10 | |
11 from twisted.trial import unittest | |
12 from twisted.python import threadable | |
13 from twisted.internet import defer, reactor | |
14 | |
15 class TestObject: | |
16 synchronized = ['aMethod'] | |
17 | |
18 x = -1 | |
19 y = 1 | |
20 | |
21 def aMethod(self): | |
22 for i in xrange(10): | |
23 self.x, self.y = self.y, self.x | |
24 self.z = self.x + self.y | |
25 assert self.z == 0, "z == %d, not 0 as expected" % (self.z,) | |
26 | |
27 threadable.synchronize(TestObject) | |
28 | |
29 class SynchronizationTestCase(unittest.TestCase): | |
30 if hasattr(sys, 'getcheckinterval'): | |
31 def setUpClass(self): | |
32 self.checkInterval = sys.getcheckinterval() | |
33 sys.setcheckinterval(7) | |
34 | |
35 def tearDownClass(self): | |
36 sys.setcheckinterval(self.checkInterval) | |
37 | |
38 | |
39 def setUp(self): | |
40 # XXX This is a trial hack. We need to make sure the reactor | |
41 # actually *starts* for isInIOThread() to have a meaningful result. | |
42 # Returning a Deferred here should force that to happen, if it has | |
43 # not happened already. In the future, this should not be | |
44 # necessary. | |
45 d = defer.Deferred() | |
46 reactor.callLater(0, d.callback, None) | |
47 return d | |
48 | |
49 | |
50 def testIsInIOThread(self): | |
51 foreignResult = [] | |
52 t = threading.Thread(target=lambda: foreignResult.append(threadable.isIn
IOThread())) | |
53 t.start() | |
54 t.join() | |
55 self.failIf(foreignResult[0], "Non-IO thread reported as IO thread") | |
56 self.failUnless(threadable.isInIOThread(), "IO thread reported as not IO
thread") | |
57 | |
58 | |
59 def testThreadedSynchronization(self): | |
60 o = TestObject() | |
61 | |
62 errors = [] | |
63 | |
64 def callMethodLots(): | |
65 try: | |
66 for i in xrange(1000): | |
67 o.aMethod() | |
68 except AssertionError, e: | |
69 errors.append(str(e)) | |
70 | |
71 threads = [] | |
72 for x in range(5): | |
73 t = threading.Thread(target=callMethodLots) | |
74 threads.append(t) | |
75 t.start() | |
76 | |
77 for t in threads: | |
78 t.join() | |
79 | |
80 if errors: | |
81 raise unittest.FailTest(errors) | |
82 | |
83 def testUnthreadedSynchronization(self): | |
84 o = TestObject() | |
85 for i in xrange(1000): | |
86 o.aMethod() | |
87 | |
88 class SerializationTestCase(unittest.TestCase): | |
89 def testPickling(self): | |
90 lock = threadable.XLock() | |
91 lockType = type(lock) | |
92 lockPickle = pickle.dumps(lock) | |
93 newLock = pickle.loads(lockPickle) | |
94 self.failUnless(isinstance(newLock, lockType)) | |
95 | |
96 def testUnpickling(self): | |
97 lockPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n
.' | |
98 lock = pickle.loads(lockPickle) | |
99 newPickle = pickle.dumps(lock, 2) | |
100 newLock = pickle.loads(newPickle) | |
101 | |
102 if threading is None: | |
103 SynchronizationTestCase.testThreadedSynchronization.skip = "Platform lacks t
hread support" | |
104 SerializationTestCase.testPickling.skip = "Platform lacks thread support" | |
OLD | NEW |