Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_threads.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 Test methods in twisted.internet.threads and reactor thread APIs.
7 """
8
9 import sys, os, time
10
11 from twisted.trial import unittest
12
13 from twisted.internet import reactor, defer, interfaces, threads, protocol, erro r
14 from twisted.python import failure, threadable, log
15
16 class ReactorThreadsTestCase(unittest.TestCase):
17 """
18 Tests for the reactor threading API.
19 """
20
21 def test_suggestThreadPoolSize(self):
22 """
23 Try to change maximum number of threads.
24 """
25 reactor.suggestThreadPoolSize(34)
26 self.assertEquals(reactor.threadpool.max, 34)
27 reactor.suggestThreadPoolSize(4)
28 self.assertEquals(reactor.threadpool.max, 4)
29
30
31 def _waitForThread(self):
32 """
33 The reactor's threadpool is only available when the reactor is running,
34 so to have a sane behavior during the tests we make a dummy
35 L{threads.deferToThread} call.
36 """
37 return threads.deferToThread(time.sleep, 0)
38
39
40 def test_callInThread(self):
41 """
42 Test callInThread functionality: set a C{threading.Event}, and check
43 that it's not in the main thread.
44 """
45 def cb(ign):
46 waiter = threading.Event()
47 result = []
48 def threadedFunc():
49 result.append(threadable.isInIOThread())
50 waiter.set()
51
52 reactor.callInThread(threadedFunc)
53 waiter.wait(120)
54 if not waiter.isSet():
55 self.fail("Timed out waiting for event.")
56 else:
57 self.assertEquals(result, [False])
58 return self._waitForThread().addCallback(cb)
59
60
61 def test_callFromThread(self):
62 """
63 Test callFromThread functionality: from the main thread, and from
64 another thread.
65 """
66 def cb(ign):
67 firedByReactorThread = defer.Deferred()
68 firedByOtherThread = defer.Deferred()
69
70 def threadedFunc():
71 reactor.callFromThread(firedByOtherThread.callback, None)
72
73 reactor.callInThread(threadedFunc)
74 reactor.callFromThread(firedByReactorThread.callback, None)
75
76 return defer.DeferredList(
77 [firedByReactorThread, firedByOtherThread],
78 fireOnOneErrback=True)
79 return self._waitForThread().addCallback(cb)
80
81
82 def test_wakerOverflow(self):
83 """
84 Try to make an overflow on the reactor waker using callFromThread.
85 """
86 def cb(ign):
87 self.failure = None
88 waiter = threading.Event()
89 def threadedFunction():
90 # Hopefully a hundred thousand queued calls is enough to
91 # trigger the error condition
92 for i in xrange(100000):
93 try:
94 reactor.callFromThread(lambda: None)
95 except:
96 self.failure = failure.Failure()
97 break
98 waiter.set()
99 reactor.callInThread(threadedFunction)
100 waiter.wait(120)
101 if not waiter.isSet():
102 self.fail("Timed out waiting for event")
103 if self.failure is not None:
104 return defer.fail(self.failure)
105 return self._waitForThread().addCallback(cb)
106
107 def _testBlockingCallFromThread(self, reactorFunc):
108 """
109 Utility method to test L{threads.blockingCallFromThread}.
110 """
111 waiter = threading.Event()
112 results = []
113 errors = []
114 def cb1(ign):
115 def threadedFunc():
116 try:
117 r = threads.blockingCallFromThread(reactor, reactorFunc)
118 except Exception, e:
119 errors.append(e)
120 else:
121 results.append(r)
122 waiter.set()
123
124 reactor.callInThread(threadedFunc)
125 return threads.deferToThread(waiter.wait, self.getTimeout())
126
127 def cb2(ign):
128 if not waiter.isSet():
129 self.fail("Timed out waiting for event")
130 return results, errors
131
132 return self._waitForThread().addCallback(cb1).addBoth(cb2)
133
134 def test_blockingCallFromThread(self):
135 """
136 Test blockingCallFromThread facility: create a thread, call a function
137 in the reactor using L{threads.blockingCallFromThread}, and verify the
138 result returned.
139 """
140 def reactorFunc():
141 return defer.succeed("foo")
142 def cb(res):
143 self.assertEquals(res[0][0], "foo")
144
145 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
146
147 def test_asyncBlockingCallFromThread(self):
148 """
149 Test blockingCallFromThread as above, but be sure the resulting
150 Deferred is not already fired.
151 """
152 def reactorFunc():
153 d = defer.Deferred()
154 reactor.callLater(0.1, d.callback, "egg")
155 return d
156 def cb(res):
157 self.assertEquals(res[0][0], "egg")
158
159 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
160
161 def test_errorBlockingCallFromThread(self):
162 """
163 Test error report for blockingCallFromThread.
164 """
165 def reactorFunc():
166 return defer.fail(RuntimeError("bar"))
167 def cb(res):
168 self.assert_(isinstance(res[1][0], RuntimeError))
169 self.assertEquals(res[1][0].args[0], "bar")
170
171 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
172
173 def test_asyncErrorBlockingCallFromThread(self):
174 """
175 Test error report for blockingCallFromThread as above, but be sure the
176 resulting Deferred is not already fired.
177 """
178 def reactorFunc():
179 d = defer.Deferred()
180 reactor.callLater(0.1, d.errback, RuntimeError("spam"))
181 return d
182 def cb(res):
183 self.assert_(isinstance(res[1][0], RuntimeError))
184 self.assertEquals(res[1][0].args[0], "spam")
185
186 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
187
188
189 class Counter:
190 index = 0
191 problem = 0
192
193 def add(self):
194 """A non thread-safe method."""
195 next = self.index + 1
196 # another thread could jump in here and increment self.index on us
197 if next != self.index + 1:
198 self.problem = 1
199 raise ValueError
200 # or here, same issue but we wouldn't catch it. We'd overwrite
201 # their results, and the index will have lost a count. If
202 # several threads get in here, we will actually make the count
203 # go backwards when we overwrite it.
204 self.index = next
205
206
207
208 class DeferredResultTestCase(unittest.TestCase):
209 """
210 Test twisted.internet.threads.
211 """
212
213 def setUp(self):
214 reactor.suggestThreadPoolSize(8)
215
216
217 def tearDown(self):
218 reactor.suggestThreadPoolSize(0)
219
220
221 def testCallMultiple(self):
222 L = []
223 N = 10
224 d = defer.Deferred()
225
226 def finished():
227 self.assertEquals(L, range(N))
228 d.callback(None)
229
230 threads.callMultipleInThread([
231 (L.append, (i,), {}) for i in xrange(N)
232 ] + [(reactor.callFromThread, (finished,), {})])
233 return d
234
235
236 def testDeferredResult(self):
237 d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4)
238 d.addCallback(self.assertEquals, 7)
239 return d
240
241
242 def testDeferredFailure(self):
243 class NewError(Exception):
244 pass
245 def raiseError():
246 raise NewError
247 d = threads.deferToThread(raiseError)
248 return self.assertFailure(d, NewError)
249
250
251 def testDeferredFailure2(self):
252 # set up a condition that causes cReactor to hang. These conditions
253 # can also be set by other tests when the full test suite is run in
254 # alphabetical order (test_flow.FlowTest.testThreaded followed by
255 # test_internet.ReactorCoreTestCase.testStop, to be precise). By
256 # setting them up explicitly here, we can reproduce the hang in a
257 # single precise test case instead of depending upon side effects of
258 # other tests.
259 #
260 # alas, this test appears to flunk the default reactor too
261
262 d = threads.deferToThread(lambda: None)
263 d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0))
264 return self.assertFailure(d, ZeroDivisionError)
265
266
267 _callBeforeStartupProgram = """
268 import time
269 import %(reactor)s
270 %(reactor)s.install()
271
272 from twisted.internet import reactor
273
274 def threadedCall():
275 print 'threaded call'
276
277 reactor.callInThread(threadedCall)
278
279 # Spin very briefly to try to give the thread a chance to run, if it
280 # is going to. Is there a better way to achieve this behavior?
281 for i in xrange(100):
282 time.sleep(0.0)
283 """
284
285
286 class ThreadStartupProcessProtocol(protocol.ProcessProtocol):
287 def __init__(self, finished):
288 self.finished = finished
289 self.out = []
290 self.err = []
291
292 def outReceived(self, out):
293 self.out.append(out)
294
295 def errReceived(self, err):
296 self.err.append(err)
297
298 def processEnded(self, reason):
299 self.finished.callback((self.out, self.err, reason))
300
301
302
303 class StartupBehaviorTestCase(unittest.TestCase):
304 """
305 Test cases for the behavior of the reactor threadpool near startup
306 boundary conditions.
307
308 In particular, this asserts that no threaded calls are attempted
309 until the reactor starts up, that calls attempted before it starts
310 are in fact executed once it has started, and that in both cases,
311 the reactor properly cleans itself up (which is tested for
312 somewhat implicitly, by requiring a child process be able to exit,
313 something it cannot do unless the threadpool has been properly
314 torn down).
315 """
316
317
318 def testCallBeforeStartupUnexecuted(self):
319 progname = self.mktemp()
320 progfile = file(progname, 'w')
321 progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module_ _})
322 progfile.close()
323
324 def programFinished((out, err, reason)):
325 if reason.check(error.ProcessTerminated):
326 self.fail("Process did not exit cleanly (out: %s err: %s)" % (ou t, err))
327
328 if err:
329 log.msg("Unexpected output on standard error: %s" % (err,))
330 self.failIf(out, "Expected no output, instead received:\n%s" % (out, ))
331
332 def programTimeout(err):
333 err.trap(error.TimeoutError)
334 proto.signalProcess('KILL')
335 return err
336
337 env = os.environ.copy()
338 env['PYTHONPATH'] = os.pathsep.join(sys.path)
339 d = defer.Deferred().addCallbacks(programFinished, programTimeout)
340 proto = ThreadStartupProcessProtocol(d)
341 reactor.spawnProcess(proto, sys.executable, ('python', progname), env)
342 return d
343
344
345
346 if interfaces.IReactorThreads(reactor, None) is None:
347 for cls in (ReactorThreadsTestCase,
348 DeferredResultTestCase,
349 StartupBehaviorTestCase):
350 cls.skip = "No thread support, nothing to test here."
351 else:
352 import threading
353
354 if interfaces.IReactorProcess(reactor, None) is None:
355 for cls in (StartupBehaviorTestCase,):
356 cls.skip = "No process support, cannot run subprocess thread tests."
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_threadpool.py ('k') | third_party/twisted_8_1/twisted/test/test_timeoutqueue.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698