Index: third_party/twisted_8_1/twisted/test/test_threads.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_threads.py b/third_party/twisted_8_1/twisted/test/test_threads.py |
deleted file mode 100644 |
index 33fd42aec8c4d73292ba13b22b864267636219fb..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_threads.py |
+++ /dev/null |
@@ -1,356 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Test methods in twisted.internet.threads and reactor thread APIs. |
-""" |
- |
-import sys, os, time |
- |
-from twisted.trial import unittest |
- |
-from twisted.internet import reactor, defer, interfaces, threads, protocol, error |
-from twisted.python import failure, threadable, log |
- |
-class ReactorThreadsTestCase(unittest.TestCase): |
- """ |
- Tests for the reactor threading API. |
- """ |
- |
- def test_suggestThreadPoolSize(self): |
- """ |
- Try to change maximum number of threads. |
- """ |
- reactor.suggestThreadPoolSize(34) |
- self.assertEquals(reactor.threadpool.max, 34) |
- reactor.suggestThreadPoolSize(4) |
- self.assertEquals(reactor.threadpool.max, 4) |
- |
- |
- def _waitForThread(self): |
- """ |
- The reactor's threadpool is only available when the reactor is running, |
- so to have a sane behavior during the tests we make a dummy |
- L{threads.deferToThread} call. |
- """ |
- return threads.deferToThread(time.sleep, 0) |
- |
- |
- def test_callInThread(self): |
- """ |
- Test callInThread functionality: set a C{threading.Event}, and check |
- that it's not in the main thread. |
- """ |
- def cb(ign): |
- waiter = threading.Event() |
- result = [] |
- def threadedFunc(): |
- result.append(threadable.isInIOThread()) |
- waiter.set() |
- |
- reactor.callInThread(threadedFunc) |
- waiter.wait(120) |
- if not waiter.isSet(): |
- self.fail("Timed out waiting for event.") |
- else: |
- self.assertEquals(result, [False]) |
- return self._waitForThread().addCallback(cb) |
- |
- |
- def test_callFromThread(self): |
- """ |
- Test callFromThread functionality: from the main thread, and from |
- another thread. |
- """ |
- def cb(ign): |
- firedByReactorThread = defer.Deferred() |
- firedByOtherThread = defer.Deferred() |
- |
- def threadedFunc(): |
- reactor.callFromThread(firedByOtherThread.callback, None) |
- |
- reactor.callInThread(threadedFunc) |
- reactor.callFromThread(firedByReactorThread.callback, None) |
- |
- return defer.DeferredList( |
- [firedByReactorThread, firedByOtherThread], |
- fireOnOneErrback=True) |
- return self._waitForThread().addCallback(cb) |
- |
- |
- def test_wakerOverflow(self): |
- """ |
- Try to make an overflow on the reactor waker using callFromThread. |
- """ |
- def cb(ign): |
- self.failure = None |
- waiter = threading.Event() |
- def threadedFunction(): |
- # Hopefully a hundred thousand queued calls is enough to |
- # trigger the error condition |
- for i in xrange(100000): |
- try: |
- reactor.callFromThread(lambda: None) |
- except: |
- self.failure = failure.Failure() |
- break |
- waiter.set() |
- reactor.callInThread(threadedFunction) |
- waiter.wait(120) |
- if not waiter.isSet(): |
- self.fail("Timed out waiting for event") |
- if self.failure is not None: |
- return defer.fail(self.failure) |
- return self._waitForThread().addCallback(cb) |
- |
- def _testBlockingCallFromThread(self, reactorFunc): |
- """ |
- Utility method to test L{threads.blockingCallFromThread}. |
- """ |
- waiter = threading.Event() |
- results = [] |
- errors = [] |
- def cb1(ign): |
- def threadedFunc(): |
- try: |
- r = threads.blockingCallFromThread(reactor, reactorFunc) |
- except Exception, e: |
- errors.append(e) |
- else: |
- results.append(r) |
- waiter.set() |
- |
- reactor.callInThread(threadedFunc) |
- return threads.deferToThread(waiter.wait, self.getTimeout()) |
- |
- def cb2(ign): |
- if not waiter.isSet(): |
- self.fail("Timed out waiting for event") |
- return results, errors |
- |
- return self._waitForThread().addCallback(cb1).addBoth(cb2) |
- |
- def test_blockingCallFromThread(self): |
- """ |
- Test blockingCallFromThread facility: create a thread, call a function |
- in the reactor using L{threads.blockingCallFromThread}, and verify the |
- result returned. |
- """ |
- def reactorFunc(): |
- return defer.succeed("foo") |
- def cb(res): |
- self.assertEquals(res[0][0], "foo") |
- |
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) |
- |
- def test_asyncBlockingCallFromThread(self): |
- """ |
- Test blockingCallFromThread as above, but be sure the resulting |
- Deferred is not already fired. |
- """ |
- def reactorFunc(): |
- d = defer.Deferred() |
- reactor.callLater(0.1, d.callback, "egg") |
- return d |
- def cb(res): |
- self.assertEquals(res[0][0], "egg") |
- |
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) |
- |
- def test_errorBlockingCallFromThread(self): |
- """ |
- Test error report for blockingCallFromThread. |
- """ |
- def reactorFunc(): |
- return defer.fail(RuntimeError("bar")) |
- def cb(res): |
- self.assert_(isinstance(res[1][0], RuntimeError)) |
- self.assertEquals(res[1][0].args[0], "bar") |
- |
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) |
- |
- def test_asyncErrorBlockingCallFromThread(self): |
- """ |
- Test error report for blockingCallFromThread as above, but be sure the |
- resulting Deferred is not already fired. |
- """ |
- def reactorFunc(): |
- d = defer.Deferred() |
- reactor.callLater(0.1, d.errback, RuntimeError("spam")) |
- return d |
- def cb(res): |
- self.assert_(isinstance(res[1][0], RuntimeError)) |
- self.assertEquals(res[1][0].args[0], "spam") |
- |
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb) |
- |
- |
-class Counter: |
- index = 0 |
- problem = 0 |
- |
- def add(self): |
- """A non thread-safe method.""" |
- next = self.index + 1 |
- # another thread could jump in here and increment self.index on us |
- if next != self.index + 1: |
- self.problem = 1 |
- raise ValueError |
- # or here, same issue but we wouldn't catch it. We'd overwrite |
- # their results, and the index will have lost a count. If |
- # several threads get in here, we will actually make the count |
- # go backwards when we overwrite it. |
- self.index = next |
- |
- |
- |
-class DeferredResultTestCase(unittest.TestCase): |
- """ |
- Test twisted.internet.threads. |
- """ |
- |
- def setUp(self): |
- reactor.suggestThreadPoolSize(8) |
- |
- |
- def tearDown(self): |
- reactor.suggestThreadPoolSize(0) |
- |
- |
- def testCallMultiple(self): |
- L = [] |
- N = 10 |
- d = defer.Deferred() |
- |
- def finished(): |
- self.assertEquals(L, range(N)) |
- d.callback(None) |
- |
- threads.callMultipleInThread([ |
- (L.append, (i,), {}) for i in xrange(N) |
- ] + [(reactor.callFromThread, (finished,), {})]) |
- return d |
- |
- |
- def testDeferredResult(self): |
- d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4) |
- d.addCallback(self.assertEquals, 7) |
- return d |
- |
- |
- def testDeferredFailure(self): |
- class NewError(Exception): |
- pass |
- def raiseError(): |
- raise NewError |
- d = threads.deferToThread(raiseError) |
- return self.assertFailure(d, NewError) |
- |
- |
- def testDeferredFailure2(self): |
- # set up a condition that causes cReactor to hang. These conditions |
- # can also be set by other tests when the full test suite is run in |
- # alphabetical order (test_flow.FlowTest.testThreaded followed by |
- # test_internet.ReactorCoreTestCase.testStop, to be precise). By |
- # setting them up explicitly here, we can reproduce the hang in a |
- # single precise test case instead of depending upon side effects of |
- # other tests. |
- # |
- # alas, this test appears to flunk the default reactor too |
- |
- d = threads.deferToThread(lambda: None) |
- d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0)) |
- return self.assertFailure(d, ZeroDivisionError) |
- |
- |
-_callBeforeStartupProgram = """ |
-import time |
-import %(reactor)s |
-%(reactor)s.install() |
- |
-from twisted.internet import reactor |
- |
-def threadedCall(): |
- print 'threaded call' |
- |
-reactor.callInThread(threadedCall) |
- |
-# Spin very briefly to try to give the thread a chance to run, if it |
-# is going to. Is there a better way to achieve this behavior? |
-for i in xrange(100): |
- time.sleep(0.0) |
-""" |
- |
- |
-class ThreadStartupProcessProtocol(protocol.ProcessProtocol): |
- def __init__(self, finished): |
- self.finished = finished |
- self.out = [] |
- self.err = [] |
- |
- def outReceived(self, out): |
- self.out.append(out) |
- |
- def errReceived(self, err): |
- self.err.append(err) |
- |
- def processEnded(self, reason): |
- self.finished.callback((self.out, self.err, reason)) |
- |
- |
- |
-class StartupBehaviorTestCase(unittest.TestCase): |
- """ |
- Test cases for the behavior of the reactor threadpool near startup |
- boundary conditions. |
- |
- In particular, this asserts that no threaded calls are attempted |
- until the reactor starts up, that calls attempted before it starts |
- are in fact executed once it has started, and that in both cases, |
- the reactor properly cleans itself up (which is tested for |
- somewhat implicitly, by requiring a child process be able to exit, |
- something it cannot do unless the threadpool has been properly |
- torn down). |
- """ |
- |
- |
- def testCallBeforeStartupUnexecuted(self): |
- progname = self.mktemp() |
- progfile = file(progname, 'w') |
- progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module__}) |
- progfile.close() |
- |
- def programFinished((out, err, reason)): |
- if reason.check(error.ProcessTerminated): |
- self.fail("Process did not exit cleanly (out: %s err: %s)" % (out, err)) |
- |
- if err: |
- log.msg("Unexpected output on standard error: %s" % (err,)) |
- self.failIf(out, "Expected no output, instead received:\n%s" % (out,)) |
- |
- def programTimeout(err): |
- err.trap(error.TimeoutError) |
- proto.signalProcess('KILL') |
- return err |
- |
- env = os.environ.copy() |
- env['PYTHONPATH'] = os.pathsep.join(sys.path) |
- d = defer.Deferred().addCallbacks(programFinished, programTimeout) |
- proto = ThreadStartupProcessProtocol(d) |
- reactor.spawnProcess(proto, sys.executable, ('python', progname), env) |
- return d |
- |
- |
- |
-if interfaces.IReactorThreads(reactor, None) is None: |
- for cls in (ReactorThreadsTestCase, |
- DeferredResultTestCase, |
- StartupBehaviorTestCase): |
- cls.skip = "No thread support, nothing to test here." |
-else: |
- import threading |
- |
-if interfaces.IReactorProcess(reactor, None) is None: |
- for cls in (StartupBehaviorTestCase,): |
- cls.skip = "No process support, cannot run subprocess thread tests." |