| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 """ | |
| 6 Test methods in twisted.internet.threads and reactor thread APIs. | |
| 7 """ | |
| 8 | |
| 9 import sys, os, time | |
| 10 | |
| 11 from twisted.trial import unittest | |
| 12 | |
| 13 from twisted.internet import reactor, defer, interfaces, threads, protocol, erro
r | |
| 14 from twisted.python import failure, threadable, log | |
| 15 | |
| 16 class ReactorThreadsTestCase(unittest.TestCase): | |
| 17 """ | |
| 18 Tests for the reactor threading API. | |
| 19 """ | |
| 20 | |
| 21 def test_suggestThreadPoolSize(self): | |
| 22 """ | |
| 23 Try to change maximum number of threads. | |
| 24 """ | |
| 25 reactor.suggestThreadPoolSize(34) | |
| 26 self.assertEquals(reactor.threadpool.max, 34) | |
| 27 reactor.suggestThreadPoolSize(4) | |
| 28 self.assertEquals(reactor.threadpool.max, 4) | |
| 29 | |
| 30 | |
| 31 def _waitForThread(self): | |
| 32 """ | |
| 33 The reactor's threadpool is only available when the reactor is running, | |
| 34 so to have a sane behavior during the tests we make a dummy | |
| 35 L{threads.deferToThread} call. | |
| 36 """ | |
| 37 return threads.deferToThread(time.sleep, 0) | |
| 38 | |
| 39 | |
| 40 def test_callInThread(self): | |
| 41 """ | |
| 42 Test callInThread functionality: set a C{threading.Event}, and check | |
| 43 that it's not in the main thread. | |
| 44 """ | |
| 45 def cb(ign): | |
| 46 waiter = threading.Event() | |
| 47 result = [] | |
| 48 def threadedFunc(): | |
| 49 result.append(threadable.isInIOThread()) | |
| 50 waiter.set() | |
| 51 | |
| 52 reactor.callInThread(threadedFunc) | |
| 53 waiter.wait(120) | |
| 54 if not waiter.isSet(): | |
| 55 self.fail("Timed out waiting for event.") | |
| 56 else: | |
| 57 self.assertEquals(result, [False]) | |
| 58 return self._waitForThread().addCallback(cb) | |
| 59 | |
| 60 | |
| 61 def test_callFromThread(self): | |
| 62 """ | |
| 63 Test callFromThread functionality: from the main thread, and from | |
| 64 another thread. | |
| 65 """ | |
| 66 def cb(ign): | |
| 67 firedByReactorThread = defer.Deferred() | |
| 68 firedByOtherThread = defer.Deferred() | |
| 69 | |
| 70 def threadedFunc(): | |
| 71 reactor.callFromThread(firedByOtherThread.callback, None) | |
| 72 | |
| 73 reactor.callInThread(threadedFunc) | |
| 74 reactor.callFromThread(firedByReactorThread.callback, None) | |
| 75 | |
| 76 return defer.DeferredList( | |
| 77 [firedByReactorThread, firedByOtherThread], | |
| 78 fireOnOneErrback=True) | |
| 79 return self._waitForThread().addCallback(cb) | |
| 80 | |
| 81 | |
| 82 def test_wakerOverflow(self): | |
| 83 """ | |
| 84 Try to make an overflow on the reactor waker using callFromThread. | |
| 85 """ | |
| 86 def cb(ign): | |
| 87 self.failure = None | |
| 88 waiter = threading.Event() | |
| 89 def threadedFunction(): | |
| 90 # Hopefully a hundred thousand queued calls is enough to | |
| 91 # trigger the error condition | |
| 92 for i in xrange(100000): | |
| 93 try: | |
| 94 reactor.callFromThread(lambda: None) | |
| 95 except: | |
| 96 self.failure = failure.Failure() | |
| 97 break | |
| 98 waiter.set() | |
| 99 reactor.callInThread(threadedFunction) | |
| 100 waiter.wait(120) | |
| 101 if not waiter.isSet(): | |
| 102 self.fail("Timed out waiting for event") | |
| 103 if self.failure is not None: | |
| 104 return defer.fail(self.failure) | |
| 105 return self._waitForThread().addCallback(cb) | |
| 106 | |
| 107 def _testBlockingCallFromThread(self, reactorFunc): | |
| 108 """ | |
| 109 Utility method to test L{threads.blockingCallFromThread}. | |
| 110 """ | |
| 111 waiter = threading.Event() | |
| 112 results = [] | |
| 113 errors = [] | |
| 114 def cb1(ign): | |
| 115 def threadedFunc(): | |
| 116 try: | |
| 117 r = threads.blockingCallFromThread(reactor, reactorFunc) | |
| 118 except Exception, e: | |
| 119 errors.append(e) | |
| 120 else: | |
| 121 results.append(r) | |
| 122 waiter.set() | |
| 123 | |
| 124 reactor.callInThread(threadedFunc) | |
| 125 return threads.deferToThread(waiter.wait, self.getTimeout()) | |
| 126 | |
| 127 def cb2(ign): | |
| 128 if not waiter.isSet(): | |
| 129 self.fail("Timed out waiting for event") | |
| 130 return results, errors | |
| 131 | |
| 132 return self._waitForThread().addCallback(cb1).addBoth(cb2) | |
| 133 | |
| 134 def test_blockingCallFromThread(self): | |
| 135 """ | |
| 136 Test blockingCallFromThread facility: create a thread, call a function | |
| 137 in the reactor using L{threads.blockingCallFromThread}, and verify the | |
| 138 result returned. | |
| 139 """ | |
| 140 def reactorFunc(): | |
| 141 return defer.succeed("foo") | |
| 142 def cb(res): | |
| 143 self.assertEquals(res[0][0], "foo") | |
| 144 | |
| 145 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
| 146 | |
| 147 def test_asyncBlockingCallFromThread(self): | |
| 148 """ | |
| 149 Test blockingCallFromThread as above, but be sure the resulting | |
| 150 Deferred is not already fired. | |
| 151 """ | |
| 152 def reactorFunc(): | |
| 153 d = defer.Deferred() | |
| 154 reactor.callLater(0.1, d.callback, "egg") | |
| 155 return d | |
| 156 def cb(res): | |
| 157 self.assertEquals(res[0][0], "egg") | |
| 158 | |
| 159 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
| 160 | |
| 161 def test_errorBlockingCallFromThread(self): | |
| 162 """ | |
| 163 Test error report for blockingCallFromThread. | |
| 164 """ | |
| 165 def reactorFunc(): | |
| 166 return defer.fail(RuntimeError("bar")) | |
| 167 def cb(res): | |
| 168 self.assert_(isinstance(res[1][0], RuntimeError)) | |
| 169 self.assertEquals(res[1][0].args[0], "bar") | |
| 170 | |
| 171 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
| 172 | |
| 173 def test_asyncErrorBlockingCallFromThread(self): | |
| 174 """ | |
| 175 Test error report for blockingCallFromThread as above, but be sure the | |
| 176 resulting Deferred is not already fired. | |
| 177 """ | |
| 178 def reactorFunc(): | |
| 179 d = defer.Deferred() | |
| 180 reactor.callLater(0.1, d.errback, RuntimeError("spam")) | |
| 181 return d | |
| 182 def cb(res): | |
| 183 self.assert_(isinstance(res[1][0], RuntimeError)) | |
| 184 self.assertEquals(res[1][0].args[0], "spam") | |
| 185 | |
| 186 return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) | |
| 187 | |
| 188 | |
| 189 class Counter: | |
| 190 index = 0 | |
| 191 problem = 0 | |
| 192 | |
| 193 def add(self): | |
| 194 """A non thread-safe method.""" | |
| 195 next = self.index + 1 | |
| 196 # another thread could jump in here and increment self.index on us | |
| 197 if next != self.index + 1: | |
| 198 self.problem = 1 | |
| 199 raise ValueError | |
| 200 # or here, same issue but we wouldn't catch it. We'd overwrite | |
| 201 # their results, and the index will have lost a count. If | |
| 202 # several threads get in here, we will actually make the count | |
| 203 # go backwards when we overwrite it. | |
| 204 self.index = next | |
| 205 | |
| 206 | |
| 207 | |
| 208 class DeferredResultTestCase(unittest.TestCase): | |
| 209 """ | |
| 210 Test twisted.internet.threads. | |
| 211 """ | |
| 212 | |
| 213 def setUp(self): | |
| 214 reactor.suggestThreadPoolSize(8) | |
| 215 | |
| 216 | |
| 217 def tearDown(self): | |
| 218 reactor.suggestThreadPoolSize(0) | |
| 219 | |
| 220 | |
| 221 def testCallMultiple(self): | |
| 222 L = [] | |
| 223 N = 10 | |
| 224 d = defer.Deferred() | |
| 225 | |
| 226 def finished(): | |
| 227 self.assertEquals(L, range(N)) | |
| 228 d.callback(None) | |
| 229 | |
| 230 threads.callMultipleInThread([ | |
| 231 (L.append, (i,), {}) for i in xrange(N) | |
| 232 ] + [(reactor.callFromThread, (finished,), {})]) | |
| 233 return d | |
| 234 | |
| 235 | |
| 236 def testDeferredResult(self): | |
| 237 d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4) | |
| 238 d.addCallback(self.assertEquals, 7) | |
| 239 return d | |
| 240 | |
| 241 | |
| 242 def testDeferredFailure(self): | |
| 243 class NewError(Exception): | |
| 244 pass | |
| 245 def raiseError(): | |
| 246 raise NewError | |
| 247 d = threads.deferToThread(raiseError) | |
| 248 return self.assertFailure(d, NewError) | |
| 249 | |
| 250 | |
| 251 def testDeferredFailure2(self): | |
| 252 # set up a condition that causes cReactor to hang. These conditions | |
| 253 # can also be set by other tests when the full test suite is run in | |
| 254 # alphabetical order (test_flow.FlowTest.testThreaded followed by | |
| 255 # test_internet.ReactorCoreTestCase.testStop, to be precise). By | |
| 256 # setting them up explicitly here, we can reproduce the hang in a | |
| 257 # single precise test case instead of depending upon side effects of | |
| 258 # other tests. | |
| 259 # | |
| 260 # alas, this test appears to flunk the default reactor too | |
| 261 | |
| 262 d = threads.deferToThread(lambda: None) | |
| 263 d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0)) | |
| 264 return self.assertFailure(d, ZeroDivisionError) | |
| 265 | |
| 266 | |
| 267 _callBeforeStartupProgram = """ | |
| 268 import time | |
| 269 import %(reactor)s | |
| 270 %(reactor)s.install() | |
| 271 | |
| 272 from twisted.internet import reactor | |
| 273 | |
| 274 def threadedCall(): | |
| 275 print 'threaded call' | |
| 276 | |
| 277 reactor.callInThread(threadedCall) | |
| 278 | |
| 279 # Spin very briefly to try to give the thread a chance to run, if it | |
| 280 # is going to. Is there a better way to achieve this behavior? | |
| 281 for i in xrange(100): | |
| 282 time.sleep(0.0) | |
| 283 """ | |
| 284 | |
| 285 | |
| 286 class ThreadStartupProcessProtocol(protocol.ProcessProtocol): | |
| 287 def __init__(self, finished): | |
| 288 self.finished = finished | |
| 289 self.out = [] | |
| 290 self.err = [] | |
| 291 | |
| 292 def outReceived(self, out): | |
| 293 self.out.append(out) | |
| 294 | |
| 295 def errReceived(self, err): | |
| 296 self.err.append(err) | |
| 297 | |
| 298 def processEnded(self, reason): | |
| 299 self.finished.callback((self.out, self.err, reason)) | |
| 300 | |
| 301 | |
| 302 | |
| 303 class StartupBehaviorTestCase(unittest.TestCase): | |
| 304 """ | |
| 305 Test cases for the behavior of the reactor threadpool near startup | |
| 306 boundary conditions. | |
| 307 | |
| 308 In particular, this asserts that no threaded calls are attempted | |
| 309 until the reactor starts up, that calls attempted before it starts | |
| 310 are in fact executed once it has started, and that in both cases, | |
| 311 the reactor properly cleans itself up (which is tested for | |
| 312 somewhat implicitly, by requiring a child process be able to exit, | |
| 313 something it cannot do unless the threadpool has been properly | |
| 314 torn down). | |
| 315 """ | |
| 316 | |
| 317 | |
| 318 def testCallBeforeStartupUnexecuted(self): | |
| 319 progname = self.mktemp() | |
| 320 progfile = file(progname, 'w') | |
| 321 progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module_
_}) | |
| 322 progfile.close() | |
| 323 | |
| 324 def programFinished((out, err, reason)): | |
| 325 if reason.check(error.ProcessTerminated): | |
| 326 self.fail("Process did not exit cleanly (out: %s err: %s)" % (ou
t, err)) | |
| 327 | |
| 328 if err: | |
| 329 log.msg("Unexpected output on standard error: %s" % (err,)) | |
| 330 self.failIf(out, "Expected no output, instead received:\n%s" % (out,
)) | |
| 331 | |
| 332 def programTimeout(err): | |
| 333 err.trap(error.TimeoutError) | |
| 334 proto.signalProcess('KILL') | |
| 335 return err | |
| 336 | |
| 337 env = os.environ.copy() | |
| 338 env['PYTHONPATH'] = os.pathsep.join(sys.path) | |
| 339 d = defer.Deferred().addCallbacks(programFinished, programTimeout) | |
| 340 proto = ThreadStartupProcessProtocol(d) | |
| 341 reactor.spawnProcess(proto, sys.executable, ('python', progname), env) | |
| 342 return d | |
| 343 | |
| 344 | |
| 345 | |
| 346 if interfaces.IReactorThreads(reactor, None) is None: | |
| 347 for cls in (ReactorThreadsTestCase, | |
| 348 DeferredResultTestCase, | |
| 349 StartupBehaviorTestCase): | |
| 350 cls.skip = "No thread support, nothing to test here." | |
| 351 else: | |
| 352 import threading | |
| 353 | |
| 354 if interfaces.IReactorProcess(reactor, None) is None: | |
| 355 for cls in (StartupBehaviorTestCase,): | |
| 356 cls.skip = "No process support, cannot run subprocess thread tests." | |
| OLD | NEW |