OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 | |
5 """ | |
6 Test methods in twisted.internet.threads and reactor thread APIs. | |
7 """ | |
8 | |
9 import sys, os, time | |
10 | |
11 from twisted.trial import unittest | |
12 | |
13 from twisted.internet import reactor, defer, interfaces, threads, protocol, erro
r | |
14 from twisted.python import failure, threadable, log | |
15 | |
16 class ReactorThreadsTestCase(unittest.TestCase): | |
17 """ | |
18 Tests for the reactor threading API. | |
19 """ | |
20 | |
21 def test_suggestThreadPoolSize(self): | |
22 """ | |
23 Try to change maximum number of threads. | |
24 """ | |
25 reactor.suggestThreadPoolSize(34) | |
26 self.assertEquals(reactor.threadpool.max, 34) | |
27 reactor.suggestThreadPoolSize(4) | |
28 self.assertEquals(reactor.threadpool.max, 4) | |
29 | |
30 | |
31 def _waitForThread(self): | |
32 """ | |
33 The reactor's threadpool is only available when the reactor is running, | |
34 so to have a sane behavior during the tests we make a dummy | |
35 L{threads.deferToThread} call. | |
36 """ | |
37 return threads.deferToThread(time.sleep, 0) | |
38 | |
39 | |
40 def test_callInThread(self): | |
41 """ | |
42 Test callInThread functionality: set a C{threading.Event}, and check | |
43 that it's not in the main thread. | |
44 """ | |
45 def cb(ign): | |
46 waiter = threading.Event() | |
47 result = [] | |
48 def threadedFunc(): | |
49 result.append(threadable.isInIOThread()) | |
50 waiter.set() | |
51 | |
52 reactor.callInThread(threadedFunc) | |
53 waiter.wait(120) | |
54 if not waiter.isSet(): | |
55 self.fail("Timed out waiting for event.") | |
56 else: | |
57 self.assertEquals(result, [False]) | |
58 return self._waitForThread().addCallback(cb) | |
59 | |
60 | |
61 def test_callFromThread(self): | |
62 """ | |
63 Test callFromThread functionality: from the main thread, and from | |
64 another thread. | |
65 """ | |
66 def cb(ign): | |
67 firedByReactorThread = defer.Deferred() | |
68 firedByOtherThread = defer.Deferred() | |
69 | |
70 def threadedFunc(): | |
71 reactor.callFromThread(firedByOtherThread.callback, None) | |
72 | |
73 reactor.callInThread(threadedFunc) | |
74 reactor.callFromThread(firedByReactorThread.callback, None) | |
75 | |
76 return defer.DeferredList( | |
77 [firedByReactorThread, firedByOtherThread], | |
78 fireOnOneErrback=True) | |
79 return self._waitForThread().addCallback(cb) | |
80 | |
81 | |
82 def test_wakerOverflow(self): | |
83 """ | |
84 Try to make an overflow on the reactor waker using callFromThread. | |
85 """ | |
86 def cb(ign): | |
87 self.failure = None | |
88 waiter = threading.Event() | |
89 def threadedFunction(): | |
90 # Hopefully a hundred thousand queued calls is enough to | |
91 # trigger the error condition | |
92 for i in xrange(100000): | |
93 try: | |
94 reactor.callFromThread(lambda: None) | |
95 except: | |
96 self.failure = failure.Failure() | |
97 break | |
98 waiter.set() | |
99 reactor.callInThread(threadedFunction) | |
100 waiter.wait(120) | |
101 if not waiter.isSet(): | |
102 self.fail("Timed out waiting for event") | |
103 if self.failure is not None: | |
104 return defer.fail(self.failure) | |
105 return self._waitForThread().addCallback(cb) | |
106 | |
107 def _testBlockingCallFromThread(self, reactorFunc): | |
108 """ | |
109 Utility method to test L{threads.blockingCallFromThread}. | |
110 """ | |
111 waiter = threading.Event() | |
112 results = [] | |
113 errors = [] | |
114 def cb1(ign): | |
115 def threadedFunc(): | |
116 try: | |
117 r = threads.blockingCallFromThread(reactor, reactorFunc) | |
118 except Exception, e: | |
119 errors.append(e) | |
120 else: | |
121 results.append(r) | |
122 waiter.set() | |
123 | |
124 reactor.callInThread(threadedFunc) | |
125 return threads.deferToThread(waiter.wait, self.getTimeout()) | |
126 | |
127 def cb2(ign): | |
128 if not waiter.isSet(): | |
129 self.fail("Timed out waiting for event") | |
130 return results, errors | |
131 | |
132 return self._waitForThread().addCallback(cb1).addBoth(cb2) | |
133 | |
134 def test_blockingCallFromThread(self): | |
135 """ | |
136 Test blockingCallFromThread facility: create a thread, call a function | |
137 in the reactor using L{threads.blockingCallFromThread}, and verify the | |
138 result returned. | |
139 """ | |
140 def reactorFunc(): | |
141 return defer.succeed("foo") | |
142 def cb(res): | |
143 self.assertEquals(res[0][0], "foo") | |
144 | |
145 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
146 | |
147 def test_asyncBlockingCallFromThread(self): | |
148 """ | |
149 Test blockingCallFromThread as above, but be sure the resulting | |
150 Deferred is not already fired. | |
151 """ | |
152 def reactorFunc(): | |
153 d = defer.Deferred() | |
154 reactor.callLater(0.1, d.callback, "egg") | |
155 return d | |
156 def cb(res): | |
157 self.assertEquals(res[0][0], "egg") | |
158 | |
159 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
160 | |
161 def test_errorBlockingCallFromThread(self): | |
162 """ | |
163 Test error report for blockingCallFromThread. | |
164 """ | |
165 def reactorFunc(): | |
166 return defer.fail(RuntimeError("bar")) | |
167 def cb(res): | |
168 self.assert_(isinstance(res[1][0], RuntimeError)) | |
169 self.assertEquals(res[1][0].args[0], "bar") | |
170 | |
171 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
172 | |
173 def test_asyncErrorBlockingCallFromThread(self): | |
174 """ | |
175 Test error report for blockingCallFromThread as above, but be sure the | |
176 resulting Deferred is not already fired. | |
177 """ | |
178 def reactorFunc(): | |
179 d = defer.Deferred() | |
180 reactor.callLater(0.1, d.errback, RuntimeError("spam")) | |
181 return d | |
182 def cb(res): | |
183 self.assert_(isinstance(res[1][0], RuntimeError)) | |
184 self.assertEquals(res[1][0].args[0], "spam") | |
185 | |
186 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
187 | |
188 | |
189 class Counter: | |
190 index = 0 | |
191 problem = 0 | |
192 | |
193 def add(self): | |
194 """A non thread-safe method.""" | |
195 next = self.index + 1 | |
196 # another thread could jump in here and increment self.index on us | |
197 if next != self.index + 1: | |
198 self.problem = 1 | |
199 raise ValueError | |
200 # or here, same issue but we wouldn't catch it. We'd overwrite | |
201 # their results, and the index will have lost a count. If | |
202 # several threads get in here, we will actually make the count | |
203 # go backwards when we overwrite it. | |
204 self.index = next | |
205 | |
206 | |
207 | |
208 class DeferredResultTestCase(unittest.TestCase): | |
209 """ | |
210 Test twisted.internet.threads. | |
211 """ | |
212 | |
213 def setUp(self): | |
214 reactor.suggestThreadPoolSize(8) | |
215 | |
216 | |
217 def tearDown(self): | |
218 reactor.suggestThreadPoolSize(0) | |
219 | |
220 | |
221 def testCallMultiple(self): | |
222 L = [] | |
223 N = 10 | |
224 d = defer.Deferred() | |
225 | |
226 def finished(): | |
227 self.assertEquals(L, range(N)) | |
228 d.callback(None) | |
229 | |
230 threads.callMultipleInThread([ | |
231 (L.append, (i,), {}) for i in xrange(N) | |
232 ] + [(reactor.callFromThread, (finished,), {})]) | |
233 return d | |
234 | |
235 | |
236 def testDeferredResult(self): | |
237 d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4) | |
238 d.addCallback(self.assertEquals, 7) | |
239 return d | |
240 | |
241 | |
242 def testDeferredFailure(self): | |
243 class NewError(Exception): | |
244 pass | |
245 def raiseError(): | |
246 raise NewError | |
247 d = threads.deferToThread(raiseError) | |
248 return self.assertFailure(d, NewError) | |
249 | |
250 | |
251 def testDeferredFailure2(self): | |
252 # set up a condition that causes cReactor to hang. These conditions | |
253 # can also be set by other tests when the full test suite is run in | |
254 # alphabetical order (test_flow.FlowTest.testThreaded followed by | |
255 # test_internet.ReactorCoreTestCase.testStop, to be precise). By | |
256 # setting them up explicitly here, we can reproduce the hang in a | |
257 # single precise test case instead of depending upon side effects of | |
258 # other tests. | |
259 # | |
260 # alas, this test appears to flunk the default reactor too | |
261 | |
262 d = threads.deferToThread(lambda: None) | |
263 d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0)) | |
264 return self.assertFailure(d, ZeroDivisionError) | |
265 | |
266 | |
267 _callBeforeStartupProgram = """ | |
268 import time | |
269 import %(reactor)s | |
270 %(reactor)s.install() | |
271 | |
272 from twisted.internet import reactor | |
273 | |
274 def threadedCall(): | |
275 print 'threaded call' | |
276 | |
277 reactor.callInThread(threadedCall) | |
278 | |
279 # Spin very briefly to try to give the thread a chance to run, if it | |
280 # is going to. Is there a better way to achieve this behavior? | |
281 for i in xrange(100): | |
282 time.sleep(0.0) | |
283 """ | |
284 | |
285 | |
286 class ThreadStartupProcessProtocol(protocol.ProcessProtocol): | |
287 def __init__(self, finished): | |
288 self.finished = finished | |
289 self.out = [] | |
290 self.err = [] | |
291 | |
292 def outReceived(self, out): | |
293 self.out.append(out) | |
294 | |
295 def errReceived(self, err): | |
296 self.err.append(err) | |
297 | |
298 def processEnded(self, reason): | |
299 self.finished.callback((self.out, self.err, reason)) | |
300 | |
301 | |
302 | |
303 class StartupBehaviorTestCase(unittest.TestCase): | |
304 """ | |
305 Test cases for the behavior of the reactor threadpool near startup | |
306 boundary conditions. | |
307 | |
308 In particular, this asserts that no threaded calls are attempted | |
309 until the reactor starts up, that calls attempted before it starts | |
310 are in fact executed once it has started, and that in both cases, | |
311 the reactor properly cleans itself up (which is tested for | |
312 somewhat implicitly, by requiring a child process be able to exit, | |
313 something it cannot do unless the threadpool has been properly | |
314 torn down). | |
315 """ | |
316 | |
317 | |
318 def testCallBeforeStartupUnexecuted(self): | |
319 progname = self.mktemp() | |
320 progfile = file(progname, 'w') | |
321 progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module_
_}) | |
322 progfile.close() | |
323 | |
324 def programFinished((out, err, reason)): | |
325 if reason.check(error.ProcessTerminated): | |
326 self.fail("Process did not exit cleanly (out: %s err: %s)" % (ou
t, err)) | |
327 | |
328 if err: | |
329 log.msg("Unexpected output on standard error: %s" % (err,)) | |
330 self.failIf(out, "Expected no output, instead received:\n%s" % (out,
)) | |
331 | |
332 def programTimeout(err): | |
333 err.trap(error.TimeoutError) | |
334 proto.signalProcess('KILL') | |
335 return err | |
336 | |
337 env = os.environ.copy() | |
338 env['PYTHONPATH'] = os.pathsep.join(sys.path) | |
339 d = defer.Deferred().addCallbacks(programFinished, programTimeout) | |
340 proto = ThreadStartupProcessProtocol(d) | |
341 reactor.spawnProcess(proto, sys.executable, ('python', progname), env) | |
342 return d | |
343 | |
344 | |
345 | |
346 if interfaces.IReactorThreads(reactor, None) is None: | |
347 for cls in (ReactorThreadsTestCase, | |
348 DeferredResultTestCase, | |
349 StartupBehaviorTestCase): | |
350 cls.skip = "No thread support, nothing to test here." | |
351 else: | |
352 import threading | |
353 | |
354 if interfaces.IReactorProcess(reactor, None) is None: | |
355 for cls in (StartupBehaviorTestCase,): | |
356 cls.skip = "No process support, cannot run subprocess thread tests." | |
OLD | NEW |