Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Unified Diff: third_party/twisted_8_1/twisted/test/test_threads.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_threads.py
diff --git a/third_party/twisted_8_1/twisted/test/test_threads.py b/third_party/twisted_8_1/twisted/test/test_threads.py
deleted file mode 100644
index 33fd42aec8c4d73292ba13b22b864267636219fb..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_threads.py
+++ /dev/null
@@ -1,356 +0,0 @@
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Test methods in twisted.internet.threads and reactor thread APIs.
-"""
-
-import sys, os, time
-
-from twisted.trial import unittest
-
-from twisted.internet import reactor, defer, interfaces, threads, protocol, error
-from twisted.python import failure, threadable, log
-
-class ReactorThreadsTestCase(unittest.TestCase):
- """
- Tests for the reactor threading API.
- """
-
- def test_suggestThreadPoolSize(self):
- """
- Try to change maximum number of threads.
- """
- reactor.suggestThreadPoolSize(34)
- self.assertEquals(reactor.threadpool.max, 34)
- reactor.suggestThreadPoolSize(4)
- self.assertEquals(reactor.threadpool.max, 4)
-
-
- def _waitForThread(self):
- """
- The reactor's threadpool is only available when the reactor is running,
- so to have a sane behavior during the tests we make a dummy
- L{threads.deferToThread} call.
- """
- return threads.deferToThread(time.sleep, 0)
-
-
- def test_callInThread(self):
- """
- Test callInThread functionality: set a C{threading.Event}, and check
- that it's not in the main thread.
- """
- def cb(ign):
- waiter = threading.Event()
- result = []
- def threadedFunc():
- result.append(threadable.isInIOThread())
- waiter.set()
-
- reactor.callInThread(threadedFunc)
- waiter.wait(120)
- if not waiter.isSet():
- self.fail("Timed out waiting for event.")
- else:
- self.assertEquals(result, [False])
- return self._waitForThread().addCallback(cb)
-
-
- def test_callFromThread(self):
- """
- Test callFromThread functionality: from the main thread, and from
- another thread.
- """
- def cb(ign):
- firedByReactorThread = defer.Deferred()
- firedByOtherThread = defer.Deferred()
-
- def threadedFunc():
- reactor.callFromThread(firedByOtherThread.callback, None)
-
- reactor.callInThread(threadedFunc)
- reactor.callFromThread(firedByReactorThread.callback, None)
-
- return defer.DeferredList(
- [firedByReactorThread, firedByOtherThread],
- fireOnOneErrback=True)
- return self._waitForThread().addCallback(cb)
-
-
- def test_wakerOverflow(self):
- """
- Try to make an overflow on the reactor waker using callFromThread.
- """
- def cb(ign):
- self.failure = None
- waiter = threading.Event()
- def threadedFunction():
- # Hopefully a hundred thousand queued calls is enough to
- # trigger the error condition
- for i in xrange(100000):
- try:
- reactor.callFromThread(lambda: None)
- except:
- self.failure = failure.Failure()
- break
- waiter.set()
- reactor.callInThread(threadedFunction)
- waiter.wait(120)
- if not waiter.isSet():
- self.fail("Timed out waiting for event")
- if self.failure is not None:
- return defer.fail(self.failure)
- return self._waitForThread().addCallback(cb)
-
- def _testBlockingCallFromThread(self, reactorFunc):
- """
- Utility method to test L{threads.blockingCallFromThread}.
- """
- waiter = threading.Event()
- results = []
- errors = []
- def cb1(ign):
- def threadedFunc():
- try:
- r = threads.blockingCallFromThread(reactor, reactorFunc)
- except Exception, e:
- errors.append(e)
- else:
- results.append(r)
- waiter.set()
-
- reactor.callInThread(threadedFunc)
- return threads.deferToThread(waiter.wait, self.getTimeout())
-
- def cb2(ign):
- if not waiter.isSet():
- self.fail("Timed out waiting for event")
- return results, errors
-
- return self._waitForThread().addCallback(cb1).addBoth(cb2)
-
- def test_blockingCallFromThread(self):
- """
- Test blockingCallFromThread facility: create a thread, call a function
- in the reactor using L{threads.blockingCallFromThread}, and verify the
- result returned.
- """
- def reactorFunc():
- return defer.succeed("foo")
- def cb(res):
- self.assertEquals(res[0][0], "foo")
-
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
-
- def test_asyncBlockingCallFromThread(self):
- """
- Test blockingCallFromThread as above, but be sure the resulting
- Deferred is not already fired.
- """
- def reactorFunc():
- d = defer.Deferred()
- reactor.callLater(0.1, d.callback, "egg")
- return d
- def cb(res):
- self.assertEquals(res[0][0], "egg")
-
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
-
- def test_errorBlockingCallFromThread(self):
- """
- Test error report for blockingCallFromThread.
- """
- def reactorFunc():
- return defer.fail(RuntimeError("bar"))
- def cb(res):
- self.assert_(isinstance(res[1][0], RuntimeError))
- self.assertEquals(res[1][0].args[0], "bar")
-
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
-
- def test_asyncErrorBlockingCallFromThread(self):
- """
- Test error report for blockingCallFromThread as above, but be sure the
- resulting Deferred is not already fired.
- """
- def reactorFunc():
- d = defer.Deferred()
- reactor.callLater(0.1, d.errback, RuntimeError("spam"))
- return d
- def cb(res):
- self.assert_(isinstance(res[1][0], RuntimeError))
- self.assertEquals(res[1][0].args[0], "spam")
-
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
-
-
-class Counter:
- index = 0
- problem = 0
-
- def add(self):
- """A non thread-safe method."""
- next = self.index + 1
- # another thread could jump in here and increment self.index on us
- if next != self.index + 1:
- self.problem = 1
- raise ValueError
- # or here, same issue but we wouldn't catch it. We'd overwrite
- # their results, and the index will have lost a count. If
- # several threads get in here, we will actually make the count
- # go backwards when we overwrite it.
- self.index = next
-
-
-
-class DeferredResultTestCase(unittest.TestCase):
- """
- Test twisted.internet.threads.
- """
-
- def setUp(self):
- reactor.suggestThreadPoolSize(8)
-
-
- def tearDown(self):
- reactor.suggestThreadPoolSize(0)
-
-
- def testCallMultiple(self):
- L = []
- N = 10
- d = defer.Deferred()
-
- def finished():
- self.assertEquals(L, range(N))
- d.callback(None)
-
- threads.callMultipleInThread([
- (L.append, (i,), {}) for i in xrange(N)
- ] + [(reactor.callFromThread, (finished,), {})])
- return d
-
-
- def testDeferredResult(self):
- d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4)
- d.addCallback(self.assertEquals, 7)
- return d
-
-
- def testDeferredFailure(self):
- class NewError(Exception):
- pass
- def raiseError():
- raise NewError
- d = threads.deferToThread(raiseError)
- return self.assertFailure(d, NewError)
-
-
- def testDeferredFailure2(self):
- # set up a condition that causes cReactor to hang. These conditions
- # can also be set by other tests when the full test suite is run in
- # alphabetical order (test_flow.FlowTest.testThreaded followed by
- # test_internet.ReactorCoreTestCase.testStop, to be precise). By
- # setting them up explicitly here, we can reproduce the hang in a
- # single precise test case instead of depending upon side effects of
- # other tests.
- #
- # alas, this test appears to flunk the default reactor too
-
- d = threads.deferToThread(lambda: None)
- d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0))
- return self.assertFailure(d, ZeroDivisionError)
-
-
-_callBeforeStartupProgram = """
-import time
-import %(reactor)s
-%(reactor)s.install()
-
-from twisted.internet import reactor
-
-def threadedCall():
- print 'threaded call'
-
-reactor.callInThread(threadedCall)
-
-# Spin very briefly to try to give the thread a chance to run, if it
-# is going to. Is there a better way to achieve this behavior?
-for i in xrange(100):
- time.sleep(0.0)
-"""
-
-
-class ThreadStartupProcessProtocol(protocol.ProcessProtocol):
- def __init__(self, finished):
- self.finished = finished
- self.out = []
- self.err = []
-
- def outReceived(self, out):
- self.out.append(out)
-
- def errReceived(self, err):
- self.err.append(err)
-
- def processEnded(self, reason):
- self.finished.callback((self.out, self.err, reason))
-
-
-
-class StartupBehaviorTestCase(unittest.TestCase):
- """
- Test cases for the behavior of the reactor threadpool near startup
- boundary conditions.
-
- In particular, this asserts that no threaded calls are attempted
- until the reactor starts up, that calls attempted before it starts
- are in fact executed once it has started, and that in both cases,
- the reactor properly cleans itself up (which is tested for
- somewhat implicitly, by requiring a child process be able to exit,
- something it cannot do unless the threadpool has been properly
- torn down).
- """
-
-
- def testCallBeforeStartupUnexecuted(self):
- progname = self.mktemp()
- progfile = file(progname, 'w')
- progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module__})
- progfile.close()
-
- def programFinished((out, err, reason)):
- if reason.check(error.ProcessTerminated):
- self.fail("Process did not exit cleanly (out: %s err: %s)" % (out, err))
-
- if err:
- log.msg("Unexpected output on standard error: %s" % (err,))
- self.failIf(out, "Expected no output, instead received:\n%s" % (out,))
-
- def programTimeout(err):
- err.trap(error.TimeoutError)
- proto.signalProcess('KILL')
- return err
-
- env = os.environ.copy()
- env['PYTHONPATH'] = os.pathsep.join(sys.path)
- d = defer.Deferred().addCallbacks(programFinished, programTimeout)
- proto = ThreadStartupProcessProtocol(d)
- reactor.spawnProcess(proto, sys.executable, ('python', progname), env)
- return d
-
-
-
-if interfaces.IReactorThreads(reactor, None) is None:
- for cls in (ReactorThreadsTestCase,
- DeferredResultTestCase,
- StartupBehaviorTestCase):
- cls.skip = "No thread support, nothing to test here."
-else:
- import threading
-
-if interfaces.IReactorProcess(reactor, None) is None:
- for cls in (StartupBehaviorTestCase,):
- cls.skip = "No process support, cannot run subprocess thread tests."
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_threadpool.py ('k') | third_party/twisted_8_1/twisted/test/test_timeoutqueue.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698