OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 | |
5 import pickle, time, weakref, gc | |
6 | |
7 from twisted.trial import unittest, util | |
8 from twisted.python import threadable | |
9 from twisted.internet import reactor, interfaces | |
10 | |
11 # | |
12 # See the end of this module for the remainder of the imports. | |
13 # | |
14 | |
15 class Synchronization(object): | |
16 failures = 0 | |
17 | |
18 def __init__(self, N, waiting): | |
19 self.N = N | |
20 self.waiting = waiting | |
21 self.lock = threading.Lock() | |
22 self.runs = [] | |
23 | |
24 def run(self): | |
25 # This is the testy part: this is supposed to be invoked | |
26 # serially from multiple threads. If that is actually the | |
27 # case, we will never fail to acquire this lock. If it is | |
28 # *not* the case, we might get here while someone else is | |
29 # holding the lock. | |
30 if self.lock.acquire(False): | |
31 if not len(self.runs) % 5: | |
32 time.sleep(0.0002) # Constant selected based on | |
33 # empirical data to maximize the | |
34 # chance of a quick failure if this | |
35 # code is broken. | |
36 self.lock.release() | |
37 else: | |
38 self.failures += 1 | |
39 | |
40 # This is just the only way I can think of to wake up the test | |
41 # method. It doesn't actually have anything to do with the | |
42 # test. | |
43 self.lock.acquire() | |
44 self.runs.append(None) | |
45 if len(self.runs) == self.N: | |
46 self.waiting.release() | |
47 self.lock.release() | |
48 | |
49 synchronized = ["run"] | |
50 threadable.synchronize(Synchronization) | |
51 | |
52 | |
53 | |
54 class ThreadPoolTestCase(unittest.TestCase): | |
55 """ | |
56 Test threadpools. | |
57 """ | |
58 | |
59 def test_threadCreationArguments(self): | |
60 """ | |
61 Test that creating threads in the threadpool with application-level | |
62 objects as arguments doesn't results in those objects never being | |
63 freed, with the thread maintaining a reference to them as long as it | |
64 exists. | |
65 """ | |
66 try: | |
67 tp = threadpool.ThreadPool(0, 1) | |
68 tp.start() | |
69 | |
70 # Sanity check - no threads should have been started yet. | |
71 self.assertEqual(tp.threads, []) | |
72 | |
73 # Here's our function | |
74 def worker(arg): | |
75 pass | |
76 # weakref need an object subclass | |
77 class Dumb(object): | |
78 pass | |
79 # And here's the unique object | |
80 unique = Dumb() | |
81 | |
82 workerRef = weakref.ref(worker) | |
83 uniqueRef = weakref.ref(unique) | |
84 | |
85 # Put some work in | |
86 tp.callInThread(worker, unique) | |
87 | |
88 # Add an event to wait completion | |
89 event = threading.Event() | |
90 tp.callInThread(event.set) | |
91 event.wait(self.getTimeout()) | |
92 | |
93 del worker | |
94 del unique | |
95 gc.collect() | |
96 self.assertEquals(uniqueRef(), None) | |
97 self.assertEquals(workerRef(), None) | |
98 finally: | |
99 tp.stop() | |
100 | |
101 | |
102 def test_persistence(self): | |
103 """ | |
104 Threadpools can be pickled and unpickled, which should preserve the | |
105 number of threads and other parameters. | |
106 """ | |
107 tp = threadpool.ThreadPool(7, 20) | |
108 tp.start() | |
109 | |
110 # XXX Sigh - race condition: start should return a Deferred | |
111 # which fires when all the workers it started have fully | |
112 # started up. | |
113 time.sleep(0.1) | |
114 | |
115 self.assertEquals(len(tp.threads), 7) | |
116 self.assertEquals(tp.min, 7) | |
117 self.assertEquals(tp.max, 20) | |
118 | |
119 # check that unpickled threadpool has same number of threads | |
120 s = pickle.dumps(tp) | |
121 tp2 = pickle.loads(s) | |
122 tp2.start() | |
123 | |
124 # XXX As above | |
125 time.sleep(0.1) | |
126 | |
127 self.assertEquals(len(tp2.threads), 7) | |
128 self.assertEquals(tp2.min, 7) | |
129 self.assertEquals(tp2.max, 20) | |
130 | |
131 tp.stop() | |
132 tp2.stop() | |
133 | |
134 | |
135 def _waitForLock(self, lock): | |
136 for i in xrange(1000000): | |
137 if lock.acquire(False): | |
138 break | |
139 time.sleep(1e-5) | |
140 else: | |
141 self.fail("A long time passed without succeeding") | |
142 | |
143 | |
144 def _threadpoolTest(self, method): | |
145 """ | |
146 Test synchronization of calls made with C{method}, which should be | |
147 one of the mecanisms of the threadpool to execute work in threads. | |
148 """ | |
149 # This is a schizophrenic test: it seems to be trying to test | |
150 # both the callInThread()/dispatch() behavior of the ThreadPool as well | |
151 # as the serialization behavior of threadable.synchronize(). It | |
152 # would probably make more sense as two much simpler tests. | |
153 N = 10 | |
154 | |
155 tp = threadpool.ThreadPool() | |
156 tp.start() | |
157 try: | |
158 waiting = threading.Lock() | |
159 waiting.acquire() | |
160 actor = Synchronization(N, waiting) | |
161 | |
162 for i in xrange(N): | |
163 method(tp, actor) | |
164 | |
165 self._waitForLock(waiting) | |
166 | |
167 self.failIf(actor.failures, "run() re-entered %d times" % | |
168 (actor.failures,)) | |
169 finally: | |
170 tp.stop() | |
171 | |
172 | |
173 def test_dispatch(self): | |
174 """ | |
175 Call C{_threadpoolTest} with C{dispatch}. | |
176 """ | |
177 return self._threadpoolTest( | |
178 lambda tp, actor: tp.dispatch(actor, actor.run)) | |
179 | |
180 test_dispatch.suppress = [util.suppress( | |
181 message="dispatch\(\) is deprecated since Twisted 8.0, " | |
182 "use callInThread\(\) instead", | |
183 category=DeprecationWarning)] | |
184 | |
185 | |
186 def test_callInThread(self): | |
187 """ | |
188 Call C{_threadpoolTest} with C{callInThread}. | |
189 """ | |
190 return self._threadpoolTest( | |
191 lambda tp, actor: tp.callInThread(actor.run)) | |
192 | |
193 | |
194 def test_existingWork(self): | |
195 """ | |
196 Work added to the threadpool before its start should be executed once | |
197 the threadpool is started: this is ensured by trying to release a lock | |
198 previously acquired. | |
199 """ | |
200 waiter = threading.Lock() | |
201 waiter.acquire() | |
202 | |
203 tp = threadpool.ThreadPool(0, 1) | |
204 tp.callInThread(waiter.release) # before start() | |
205 tp.start() | |
206 | |
207 try: | |
208 self._waitForLock(waiter) | |
209 finally: | |
210 tp.stop() | |
211 | |
212 | |
213 def test_dispatchDeprecation(self): | |
214 """ | |
215 Test for the deprecation of the dispatch method. | |
216 """ | |
217 tp = threadpool.ThreadPool() | |
218 tp.start() | |
219 def cb(): | |
220 return tp.dispatch(None, lambda: None) | |
221 try: | |
222 self.assertWarns(DeprecationWarning, | |
223 "dispatch() is deprecated since Twisted 8.0, " | |
224 "use callInThread() instead", | |
225 __file__, cb) | |
226 finally: | |
227 tp.stop() | |
228 | |
229 | |
230 def test_dispatchWithCallbackDeprecation(self): | |
231 """ | |
232 Test for the deprecation of the dispatchWithCallback method. | |
233 """ | |
234 tp = threadpool.ThreadPool() | |
235 tp.start() | |
236 def cb(): | |
237 return tp.dispatchWithCallback( | |
238 None, | |
239 lambda x: None, | |
240 lambda x: None, | |
241 lambda: None) | |
242 try: | |
243 self.assertWarns(DeprecationWarning, | |
244 "dispatchWithCallback() is deprecated since Twisted 8.0, " | |
245 "use twisted.internet.threads.deferToThread() instead.", | |
246 __file__, cb) | |
247 finally: | |
248 tp.stop() | |
249 | |
250 | |
251 | |
252 class RaceConditionTestCase(unittest.TestCase): | |
253 def setUp(self): | |
254 self.event = threading.Event() | |
255 self.threadpool = threadpool.ThreadPool(0, 10) | |
256 self.threadpool.start() | |
257 | |
258 | |
259 def tearDown(self): | |
260 del self.event | |
261 self.threadpool.stop() | |
262 del self.threadpool | |
263 | |
264 | |
265 def test_synchronization(self): | |
266 """ | |
267 Test a race condition: ensure that actions run in the pool synchronize | |
268 with actions run in the main thread. | |
269 """ | |
270 timeout = self.getTimeout() | |
271 self.threadpool.callInThread(self.event.set) | |
272 self.event.wait(timeout) | |
273 self.event.clear() | |
274 for i in range(3): | |
275 self.threadpool.callInThread(self.event.wait) | |
276 self.threadpool.callInThread(self.event.set) | |
277 self.event.wait(timeout) | |
278 if not self.event.isSet(): | |
279 self.event.set() | |
280 self.fail("Actions not synchronized") | |
281 | |
282 | |
283 def test_singleThread(self): | |
284 """ | |
285 Test that the creation of new threads in the pool occurs only when | |
286 more jobs are added and all existing threads are occupied. | |
287 """ | |
288 # Ensure no threads running | |
289 self.assertEquals(self.threadpool.workers, 0) | |
290 timeout = self.getTimeout() | |
291 for i in range(10): | |
292 self.threadpool.callInThread(self.event.set) | |
293 self.event.wait(timeout) | |
294 self.event.clear() | |
295 | |
296 # Ensure there are very few threads running | |
297 self.failUnless(self.threadpool.workers <= 2) | |
298 | |
299 | |
300 | |
301 if interfaces.IReactorThreads(reactor, None) is None: | |
302 for cls in ThreadPoolTestCase, RaceConditionTestCase: | |
303 setattr(cls, 'skip', "No thread support, nothing to test here") | |
304 else: | |
305 import threading | |
306 from twisted.python import threadpool | |
307 | |
OLD | NEW |