Index: third_party/twisted_8_1/twisted/test/test_threadpool.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_threadpool.py b/third_party/twisted_8_1/twisted/test/test_threadpool.py |
deleted file mode 100644 |
index f0b57b2eba6967e94e932289a8c01a751c0b160b..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_threadpool.py |
+++ /dev/null |
@@ -1,307 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-import pickle, time, weakref, gc |
- |
-from twisted.trial import unittest, util |
-from twisted.python import threadable |
-from twisted.internet import reactor, interfaces |
- |
-# |
-# See the end of this module for the remainder of the imports. |
-# |
- |
-class Synchronization(object): |
- failures = 0 |
- |
- def __init__(self, N, waiting): |
- self.N = N |
- self.waiting = waiting |
- self.lock = threading.Lock() |
- self.runs = [] |
- |
- def run(self): |
- # This is the testy part: this is supposed to be invoked |
- # serially from multiple threads. If that is actually the |
- # case, we will never fail to acquire this lock. If it is |
- # *not* the case, we might get here while someone else is |
- # holding the lock. |
- if self.lock.acquire(False): |
- if not len(self.runs) % 5: |
- time.sleep(0.0002) # Constant selected based on |
- # empirical data to maximize the |
- # chance of a quick failure if this |
- # code is broken. |
- self.lock.release() |
- else: |
- self.failures += 1 |
- |
- # This is just the only way I can think of to wake up the test |
- # method. It doesn't actually have anything to do with the |
- # test. |
- self.lock.acquire() |
- self.runs.append(None) |
- if len(self.runs) == self.N: |
- self.waiting.release() |
- self.lock.release() |
- |
- synchronized = ["run"] |
-threadable.synchronize(Synchronization) |
- |
- |
- |
-class ThreadPoolTestCase(unittest.TestCase): |
- """ |
- Test threadpools. |
- """ |
- |
- def test_threadCreationArguments(self): |
- """ |
- Test that creating threads in the threadpool with application-level |
- objects as arguments doesn't results in those objects never being |
- freed, with the thread maintaining a reference to them as long as it |
- exists. |
- """ |
- try: |
- tp = threadpool.ThreadPool(0, 1) |
- tp.start() |
- |
- # Sanity check - no threads should have been started yet. |
- self.assertEqual(tp.threads, []) |
- |
- # Here's our function |
- def worker(arg): |
- pass |
- # weakref need an object subclass |
- class Dumb(object): |
- pass |
- # And here's the unique object |
- unique = Dumb() |
- |
- workerRef = weakref.ref(worker) |
- uniqueRef = weakref.ref(unique) |
- |
- # Put some work in |
- tp.callInThread(worker, unique) |
- |
- # Add an event to wait completion |
- event = threading.Event() |
- tp.callInThread(event.set) |
- event.wait(self.getTimeout()) |
- |
- del worker |
- del unique |
- gc.collect() |
- self.assertEquals(uniqueRef(), None) |
- self.assertEquals(workerRef(), None) |
- finally: |
- tp.stop() |
- |
- |
- def test_persistence(self): |
- """ |
- Threadpools can be pickled and unpickled, which should preserve the |
- number of threads and other parameters. |
- """ |
- tp = threadpool.ThreadPool(7, 20) |
- tp.start() |
- |
- # XXX Sigh - race condition: start should return a Deferred |
- # which fires when all the workers it started have fully |
- # started up. |
- time.sleep(0.1) |
- |
- self.assertEquals(len(tp.threads), 7) |
- self.assertEquals(tp.min, 7) |
- self.assertEquals(tp.max, 20) |
- |
- # check that unpickled threadpool has same number of threads |
- s = pickle.dumps(tp) |
- tp2 = pickle.loads(s) |
- tp2.start() |
- |
- # XXX As above |
- time.sleep(0.1) |
- |
- self.assertEquals(len(tp2.threads), 7) |
- self.assertEquals(tp2.min, 7) |
- self.assertEquals(tp2.max, 20) |
- |
- tp.stop() |
- tp2.stop() |
- |
- |
- def _waitForLock(self, lock): |
- for i in xrange(1000000): |
- if lock.acquire(False): |
- break |
- time.sleep(1e-5) |
- else: |
- self.fail("A long time passed without succeeding") |
- |
- |
- def _threadpoolTest(self, method): |
- """ |
- Test synchronization of calls made with C{method}, which should be |
- one of the mecanisms of the threadpool to execute work in threads. |
- """ |
- # This is a schizophrenic test: it seems to be trying to test |
- # both the callInThread()/dispatch() behavior of the ThreadPool as well |
- # as the serialization behavior of threadable.synchronize(). It |
- # would probably make more sense as two much simpler tests. |
- N = 10 |
- |
- tp = threadpool.ThreadPool() |
- tp.start() |
- try: |
- waiting = threading.Lock() |
- waiting.acquire() |
- actor = Synchronization(N, waiting) |
- |
- for i in xrange(N): |
- method(tp, actor) |
- |
- self._waitForLock(waiting) |
- |
- self.failIf(actor.failures, "run() re-entered %d times" % |
- (actor.failures,)) |
- finally: |
- tp.stop() |
- |
- |
- def test_dispatch(self): |
- """ |
- Call C{_threadpoolTest} with C{dispatch}. |
- """ |
- return self._threadpoolTest( |
- lambda tp, actor: tp.dispatch(actor, actor.run)) |
- |
- test_dispatch.suppress = [util.suppress( |
- message="dispatch\(\) is deprecated since Twisted 8.0, " |
- "use callInThread\(\) instead", |
- category=DeprecationWarning)] |
- |
- |
- def test_callInThread(self): |
- """ |
- Call C{_threadpoolTest} with C{callInThread}. |
- """ |
- return self._threadpoolTest( |
- lambda tp, actor: tp.callInThread(actor.run)) |
- |
- |
- def test_existingWork(self): |
- """ |
- Work added to the threadpool before its start should be executed once |
- the threadpool is started: this is ensured by trying to release a lock |
- previously acquired. |
- """ |
- waiter = threading.Lock() |
- waiter.acquire() |
- |
- tp = threadpool.ThreadPool(0, 1) |
- tp.callInThread(waiter.release) # before start() |
- tp.start() |
- |
- try: |
- self._waitForLock(waiter) |
- finally: |
- tp.stop() |
- |
- |
- def test_dispatchDeprecation(self): |
- """ |
- Test for the deprecation of the dispatch method. |
- """ |
- tp = threadpool.ThreadPool() |
- tp.start() |
- def cb(): |
- return tp.dispatch(None, lambda: None) |
- try: |
- self.assertWarns(DeprecationWarning, |
- "dispatch() is deprecated since Twisted 8.0, " |
- "use callInThread() instead", |
- __file__, cb) |
- finally: |
- tp.stop() |
- |
- |
- def test_dispatchWithCallbackDeprecation(self): |
- """ |
- Test for the deprecation of the dispatchWithCallback method. |
- """ |
- tp = threadpool.ThreadPool() |
- tp.start() |
- def cb(): |
- return tp.dispatchWithCallback( |
- None, |
- lambda x: None, |
- lambda x: None, |
- lambda: None) |
- try: |
- self.assertWarns(DeprecationWarning, |
- "dispatchWithCallback() is deprecated since Twisted 8.0, " |
- "use twisted.internet.threads.deferToThread() instead.", |
- __file__, cb) |
- finally: |
- tp.stop() |
- |
- |
- |
-class RaceConditionTestCase(unittest.TestCase): |
- def setUp(self): |
- self.event = threading.Event() |
- self.threadpool = threadpool.ThreadPool(0, 10) |
- self.threadpool.start() |
- |
- |
- def tearDown(self): |
- del self.event |
- self.threadpool.stop() |
- del self.threadpool |
- |
- |
- def test_synchronization(self): |
- """ |
- Test a race condition: ensure that actions run in the pool synchronize |
- with actions run in the main thread. |
- """ |
- timeout = self.getTimeout() |
- self.threadpool.callInThread(self.event.set) |
- self.event.wait(timeout) |
- self.event.clear() |
- for i in range(3): |
- self.threadpool.callInThread(self.event.wait) |
- self.threadpool.callInThread(self.event.set) |
- self.event.wait(timeout) |
- if not self.event.isSet(): |
- self.event.set() |
- self.fail("Actions not synchronized") |
- |
- |
- def test_singleThread(self): |
- """ |
- Test that the creation of new threads in the pool occurs only when |
- more jobs are added and all existing threads are occupied. |
- """ |
- # Ensure no threads running |
- self.assertEquals(self.threadpool.workers, 0) |
- timeout = self.getTimeout() |
- for i in range(10): |
- self.threadpool.callInThread(self.event.set) |
- self.event.wait(timeout) |
- self.event.clear() |
- |
- # Ensure there are very few threads running |
- self.failUnless(self.threadpool.workers <= 2) |
- |
- |
- |
-if interfaces.IReactorThreads(reactor, None) is None: |
- for cls in ThreadPoolTestCase, RaceConditionTestCase: |
- setattr(cls, 'skip', "No thread support, nothing to test here") |
-else: |
- import threading |
- from twisted.python import threadpool |
- |