Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(130)

Unified Diff: third_party/twisted_8_1/twisted/test/test_threadpool.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_threadpool.py
diff --git a/third_party/twisted_8_1/twisted/test/test_threadpool.py b/third_party/twisted_8_1/twisted/test/test_threadpool.py
deleted file mode 100644
index f0b57b2eba6967e94e932289a8c01a751c0b160b..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_threadpool.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-import pickle, time, weakref, gc
-
-from twisted.trial import unittest, util
-from twisted.python import threadable
-from twisted.internet import reactor, interfaces
-
-#
-# See the end of this module for the remainder of the imports.
-#
-
-class Synchronization(object):
- failures = 0
-
- def __init__(self, N, waiting):
- self.N = N
- self.waiting = waiting
- self.lock = threading.Lock()
- self.runs = []
-
- def run(self):
- # This is the testy part: this is supposed to be invoked
- # serially from multiple threads. If that is actually the
- # case, we will never fail to acquire this lock. If it is
- # *not* the case, we might get here while someone else is
- # holding the lock.
- if self.lock.acquire(False):
- if not len(self.runs) % 5:
- time.sleep(0.0002) # Constant selected based on
- # empirical data to maximize the
- # chance of a quick failure if this
- # code is broken.
- self.lock.release()
- else:
- self.failures += 1
-
- # This is just the only way I can think of to wake up the test
- # method. It doesn't actually have anything to do with the
- # test.
- self.lock.acquire()
- self.runs.append(None)
- if len(self.runs) == self.N:
- self.waiting.release()
- self.lock.release()
-
- synchronized = ["run"]
-threadable.synchronize(Synchronization)
-
-
-
-class ThreadPoolTestCase(unittest.TestCase):
- """
- Test threadpools.
- """
-
- def test_threadCreationArguments(self):
- """
- Test that creating threads in the threadpool with application-level
- objects as arguments doesn't results in those objects never being
- freed, with the thread maintaining a reference to them as long as it
- exists.
- """
- try:
- tp = threadpool.ThreadPool(0, 1)
- tp.start()
-
- # Sanity check - no threads should have been started yet.
- self.assertEqual(tp.threads, [])
-
- # Here's our function
- def worker(arg):
- pass
- # weakref need an object subclass
- class Dumb(object):
- pass
- # And here's the unique object
- unique = Dumb()
-
- workerRef = weakref.ref(worker)
- uniqueRef = weakref.ref(unique)
-
- # Put some work in
- tp.callInThread(worker, unique)
-
- # Add an event to wait completion
- event = threading.Event()
- tp.callInThread(event.set)
- event.wait(self.getTimeout())
-
- del worker
- del unique
- gc.collect()
- self.assertEquals(uniqueRef(), None)
- self.assertEquals(workerRef(), None)
- finally:
- tp.stop()
-
-
- def test_persistence(self):
- """
- Threadpools can be pickled and unpickled, which should preserve the
- number of threads and other parameters.
- """
- tp = threadpool.ThreadPool(7, 20)
- tp.start()
-
- # XXX Sigh - race condition: start should return a Deferred
- # which fires when all the workers it started have fully
- # started up.
- time.sleep(0.1)
-
- self.assertEquals(len(tp.threads), 7)
- self.assertEquals(tp.min, 7)
- self.assertEquals(tp.max, 20)
-
- # check that unpickled threadpool has same number of threads
- s = pickle.dumps(tp)
- tp2 = pickle.loads(s)
- tp2.start()
-
- # XXX As above
- time.sleep(0.1)
-
- self.assertEquals(len(tp2.threads), 7)
- self.assertEquals(tp2.min, 7)
- self.assertEquals(tp2.max, 20)
-
- tp.stop()
- tp2.stop()
-
-
- def _waitForLock(self, lock):
- for i in xrange(1000000):
- if lock.acquire(False):
- break
- time.sleep(1e-5)
- else:
- self.fail("A long time passed without succeeding")
-
-
- def _threadpoolTest(self, method):
- """
- Test synchronization of calls made with C{method}, which should be
- one of the mecanisms of the threadpool to execute work in threads.
- """
- # This is a schizophrenic test: it seems to be trying to test
- # both the callInThread()/dispatch() behavior of the ThreadPool as well
- # as the serialization behavior of threadable.synchronize(). It
- # would probably make more sense as two much simpler tests.
- N = 10
-
- tp = threadpool.ThreadPool()
- tp.start()
- try:
- waiting = threading.Lock()
- waiting.acquire()
- actor = Synchronization(N, waiting)
-
- for i in xrange(N):
- method(tp, actor)
-
- self._waitForLock(waiting)
-
- self.failIf(actor.failures, "run() re-entered %d times" %
- (actor.failures,))
- finally:
- tp.stop()
-
-
- def test_dispatch(self):
- """
- Call C{_threadpoolTest} with C{dispatch}.
- """
- return self._threadpoolTest(
- lambda tp, actor: tp.dispatch(actor, actor.run))
-
- test_dispatch.suppress = [util.suppress(
- message="dispatch\(\) is deprecated since Twisted 8.0, "
- "use callInThread\(\) instead",
- category=DeprecationWarning)]
-
-
- def test_callInThread(self):
- """
- Call C{_threadpoolTest} with C{callInThread}.
- """
- return self._threadpoolTest(
- lambda tp, actor: tp.callInThread(actor.run))
-
-
- def test_existingWork(self):
- """
- Work added to the threadpool before its start should be executed once
- the threadpool is started: this is ensured by trying to release a lock
- previously acquired.
- """
- waiter = threading.Lock()
- waiter.acquire()
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThread(waiter.release) # before start()
- tp.start()
-
- try:
- self._waitForLock(waiter)
- finally:
- tp.stop()
-
-
- def test_dispatchDeprecation(self):
- """
- Test for the deprecation of the dispatch method.
- """
- tp = threadpool.ThreadPool()
- tp.start()
- def cb():
- return tp.dispatch(None, lambda: None)
- try:
- self.assertWarns(DeprecationWarning,
- "dispatch() is deprecated since Twisted 8.0, "
- "use callInThread() instead",
- __file__, cb)
- finally:
- tp.stop()
-
-
- def test_dispatchWithCallbackDeprecation(self):
- """
- Test for the deprecation of the dispatchWithCallback method.
- """
- tp = threadpool.ThreadPool()
- tp.start()
- def cb():
- return tp.dispatchWithCallback(
- None,
- lambda x: None,
- lambda x: None,
- lambda: None)
- try:
- self.assertWarns(DeprecationWarning,
- "dispatchWithCallback() is deprecated since Twisted 8.0, "
- "use twisted.internet.threads.deferToThread() instead.",
- __file__, cb)
- finally:
- tp.stop()
-
-
-
-class RaceConditionTestCase(unittest.TestCase):
- def setUp(self):
- self.event = threading.Event()
- self.threadpool = threadpool.ThreadPool(0, 10)
- self.threadpool.start()
-
-
- def tearDown(self):
- del self.event
- self.threadpool.stop()
- del self.threadpool
-
-
- def test_synchronization(self):
- """
- Test a race condition: ensure that actions run in the pool synchronize
- with actions run in the main thread.
- """
- timeout = self.getTimeout()
- self.threadpool.callInThread(self.event.set)
- self.event.wait(timeout)
- self.event.clear()
- for i in range(3):
- self.threadpool.callInThread(self.event.wait)
- self.threadpool.callInThread(self.event.set)
- self.event.wait(timeout)
- if not self.event.isSet():
- self.event.set()
- self.fail("Actions not synchronized")
-
-
- def test_singleThread(self):
- """
- Test that the creation of new threads in the pool occurs only when
- more jobs are added and all existing threads are occupied.
- """
- # Ensure no threads running
- self.assertEquals(self.threadpool.workers, 0)
- timeout = self.getTimeout()
- for i in range(10):
- self.threadpool.callInThread(self.event.set)
- self.event.wait(timeout)
- self.event.clear()
-
- # Ensure there are very few threads running
- self.failUnless(self.threadpool.workers <= 2)
-
-
-
-if interfaces.IReactorThreads(reactor, None) is None:
- for cls in ThreadPoolTestCase, RaceConditionTestCase:
- setattr(cls, 'skip', "No thread support, nothing to test here")
-else:
- import threading
- from twisted.python import threadpool
-
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_threadable.py ('k') | third_party/twisted_8_1/twisted/test/test_threads.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698