| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 import pickle, time, weakref, gc | |
| 6 | |
| 7 from twisted.trial import unittest, util | |
| 8 from twisted.python import threadable | |
| 9 from twisted.internet import reactor, interfaces | |
| 10 | |
| 11 # | |
| 12 # See the end of this module for the remainder of the imports. | |
| 13 # | |
| 14 | |
| 15 class Synchronization(object): | |
| 16 failures = 0 | |
| 17 | |
| 18 def __init__(self, N, waiting): | |
| 19 self.N = N | |
| 20 self.waiting = waiting | |
| 21 self.lock = threading.Lock() | |
| 22 self.runs = [] | |
| 23 | |
| 24 def run(self): | |
| 25 # This is the testy part: this is supposed to be invoked | |
| 26 # serially from multiple threads. If that is actually the | |
| 27 # case, we will never fail to acquire this lock. If it is | |
| 28 # *not* the case, we might get here while someone else is | |
| 29 # holding the lock. | |
| 30 if self.lock.acquire(False): | |
| 31 if not len(self.runs) % 5: | |
| 32 time.sleep(0.0002) # Constant selected based on | |
| 33 # empirical data to maximize the | |
| 34 # chance of a quick failure if this | |
| 35 # code is broken. | |
| 36 self.lock.release() | |
| 37 else: | |
| 38 self.failures += 1 | |
| 39 | |
| 40 # This is just the only way I can think of to wake up the test | |
| 41 # method. It doesn't actually have anything to do with the | |
| 42 # test. | |
| 43 self.lock.acquire() | |
| 44 self.runs.append(None) | |
| 45 if len(self.runs) == self.N: | |
| 46 self.waiting.release() | |
| 47 self.lock.release() | |
| 48 | |
| 49 synchronized = ["run"] | |
| 50 threadable.synchronize(Synchronization) | |
| 51 | |
| 52 | |
| 53 | |
| 54 class ThreadPoolTestCase(unittest.TestCase): | |
| 55 """ | |
| 56 Test threadpools. | |
| 57 """ | |
| 58 | |
| 59 def test_threadCreationArguments(self): | |
| 60 """ | |
| 61 Test that creating threads in the threadpool with application-level | |
| 62 objects as arguments doesn't results in those objects never being | |
| 63 freed, with the thread maintaining a reference to them as long as it | |
| 64 exists. | |
| 65 """ | |
| 66 try: | |
| 67 tp = threadpool.ThreadPool(0, 1) | |
| 68 tp.start() | |
| 69 | |
| 70 # Sanity check - no threads should have been started yet. | |
| 71 self.assertEqual(tp.threads, []) | |
| 72 | |
| 73 # Here's our function | |
| 74 def worker(arg): | |
| 75 pass | |
| 76 # weakref need an object subclass | |
| 77 class Dumb(object): | |
| 78 pass | |
| 79 # And here's the unique object | |
| 80 unique = Dumb() | |
| 81 | |
| 82 workerRef = weakref.ref(worker) | |
| 83 uniqueRef = weakref.ref(unique) | |
| 84 | |
| 85 # Put some work in | |
| 86 tp.callInThread(worker, unique) | |
| 87 | |
| 88 # Add an event to wait completion | |
| 89 event = threading.Event() | |
| 90 tp.callInThread(event.set) | |
| 91 event.wait(self.getTimeout()) | |
| 92 | |
| 93 del worker | |
| 94 del unique | |
| 95 gc.collect() | |
| 96 self.assertEquals(uniqueRef(), None) | |
| 97 self.assertEquals(workerRef(), None) | |
| 98 finally: | |
| 99 tp.stop() | |
| 100 | |
| 101 | |
| 102 def test_persistence(self): | |
| 103 """ | |
| 104 Threadpools can be pickled and unpickled, which should preserve the | |
| 105 number of threads and other parameters. | |
| 106 """ | |
| 107 tp = threadpool.ThreadPool(7, 20) | |
| 108 tp.start() | |
| 109 | |
| 110 # XXX Sigh - race condition: start should return a Deferred | |
| 111 # which fires when all the workers it started have fully | |
| 112 # started up. | |
| 113 time.sleep(0.1) | |
| 114 | |
| 115 self.assertEquals(len(tp.threads), 7) | |
| 116 self.assertEquals(tp.min, 7) | |
| 117 self.assertEquals(tp.max, 20) | |
| 118 | |
| 119 # check that unpickled threadpool has same number of threads | |
| 120 s = pickle.dumps(tp) | |
| 121 tp2 = pickle.loads(s) | |
| 122 tp2.start() | |
| 123 | |
| 124 # XXX As above | |
| 125 time.sleep(0.1) | |
| 126 | |
| 127 self.assertEquals(len(tp2.threads), 7) | |
| 128 self.assertEquals(tp2.min, 7) | |
| 129 self.assertEquals(tp2.max, 20) | |
| 130 | |
| 131 tp.stop() | |
| 132 tp2.stop() | |
| 133 | |
| 134 | |
| 135 def _waitForLock(self, lock): | |
| 136 for i in xrange(1000000): | |
| 137 if lock.acquire(False): | |
| 138 break | |
| 139 time.sleep(1e-5) | |
| 140 else: | |
| 141 self.fail("A long time passed without succeeding") | |
| 142 | |
| 143 | |
| 144 def _threadpoolTest(self, method): | |
| 145 """ | |
| 146 Test synchronization of calls made with C{method}, which should be | |
| 147 one of the mecanisms of the threadpool to execute work in threads. | |
| 148 """ | |
| 149 # This is a schizophrenic test: it seems to be trying to test | |
| 150 # both the callInThread()/dispatch() behavior of the ThreadPool as well | |
| 151 # as the serialization behavior of threadable.synchronize(). It | |
| 152 # would probably make more sense as two much simpler tests. | |
| 153 N = 10 | |
| 154 | |
| 155 tp = threadpool.ThreadPool() | |
| 156 tp.start() | |
| 157 try: | |
| 158 waiting = threading.Lock() | |
| 159 waiting.acquire() | |
| 160 actor = Synchronization(N, waiting) | |
| 161 | |
| 162 for i in xrange(N): | |
| 163 method(tp, actor) | |
| 164 | |
| 165 self._waitForLock(waiting) | |
| 166 | |
| 167 self.failIf(actor.failures, "run() re-entered %d times" % | |
| 168 (actor.failures,)) | |
| 169 finally: | |
| 170 tp.stop() | |
| 171 | |
| 172 | |
| 173 def test_dispatch(self): | |
| 174 """ | |
| 175 Call C{_threadpoolTest} with C{dispatch}. | |
| 176 """ | |
| 177 return self._threadpoolTest( | |
| 178 lambda tp, actor: tp.dispatch(actor, actor.run)) | |
| 179 | |
| 180 test_dispatch.suppress = [util.suppress( | |
| 181 message="dispatch\(\) is deprecated since Twisted 8.0, " | |
| 182 "use callInThread\(\) instead", | |
| 183 category=DeprecationWarning)] | |
| 184 | |
| 185 | |
| 186 def test_callInThread(self): | |
| 187 """ | |
| 188 Call C{_threadpoolTest} with C{callInThread}. | |
| 189 """ | |
| 190 return self._threadpoolTest( | |
| 191 lambda tp, actor: tp.callInThread(actor.run)) | |
| 192 | |
| 193 | |
| 194 def test_existingWork(self): | |
| 195 """ | |
| 196 Work added to the threadpool before its start should be executed once | |
| 197 the threadpool is started: this is ensured by trying to release a lock | |
| 198 previously acquired. | |
| 199 """ | |
| 200 waiter = threading.Lock() | |
| 201 waiter.acquire() | |
| 202 | |
| 203 tp = threadpool.ThreadPool(0, 1) | |
| 204 tp.callInThread(waiter.release) # before start() | |
| 205 tp.start() | |
| 206 | |
| 207 try: | |
| 208 self._waitForLock(waiter) | |
| 209 finally: | |
| 210 tp.stop() | |
| 211 | |
| 212 | |
| 213 def test_dispatchDeprecation(self): | |
| 214 """ | |
| 215 Test for the deprecation of the dispatch method. | |
| 216 """ | |
| 217 tp = threadpool.ThreadPool() | |
| 218 tp.start() | |
| 219 def cb(): | |
| 220 return tp.dispatch(None, lambda: None) | |
| 221 try: | |
| 222 self.assertWarns(DeprecationWarning, | |
| 223 "dispatch() is deprecated since Twisted 8.0, " | |
| 224 "use callInThread() instead", | |
| 225 __file__, cb) | |
| 226 finally: | |
| 227 tp.stop() | |
| 228 | |
| 229 | |
| 230 def test_dispatchWithCallbackDeprecation(self): | |
| 231 """ | |
| 232 Test for the deprecation of the dispatchWithCallback method. | |
| 233 """ | |
| 234 tp = threadpool.ThreadPool() | |
| 235 tp.start() | |
| 236 def cb(): | |
| 237 return tp.dispatchWithCallback( | |
| 238 None, | |
| 239 lambda x: None, | |
| 240 lambda x: None, | |
| 241 lambda: None) | |
| 242 try: | |
| 243 self.assertWarns(DeprecationWarning, | |
| 244 "dispatchWithCallback() is deprecated since Twisted 8.0, " | |
| 245 "use twisted.internet.threads.deferToThread() instead.", | |
| 246 __file__, cb) | |
| 247 finally: | |
| 248 tp.stop() | |
| 249 | |
| 250 | |
| 251 | |
| 252 class RaceConditionTestCase(unittest.TestCase): | |
| 253 def setUp(self): | |
| 254 self.event = threading.Event() | |
| 255 self.threadpool = threadpool.ThreadPool(0, 10) | |
| 256 self.threadpool.start() | |
| 257 | |
| 258 | |
| 259 def tearDown(self): | |
| 260 del self.event | |
| 261 self.threadpool.stop() | |
| 262 del self.threadpool | |
| 263 | |
| 264 | |
| 265 def test_synchronization(self): | |
| 266 """ | |
| 267 Test a race condition: ensure that actions run in the pool synchronize | |
| 268 with actions run in the main thread. | |
| 269 """ | |
| 270 timeout = self.getTimeout() | |
| 271 self.threadpool.callInThread(self.event.set) | |
| 272 self.event.wait(timeout) | |
| 273 self.event.clear() | |
| 274 for i in range(3): | |
| 275 self.threadpool.callInThread(self.event.wait) | |
| 276 self.threadpool.callInThread(self.event.set) | |
| 277 self.event.wait(timeout) | |
| 278 if not self.event.isSet(): | |
| 279 self.event.set() | |
| 280 self.fail("Actions not synchronized") | |
| 281 | |
| 282 | |
| 283 def test_singleThread(self): | |
| 284 """ | |
| 285 Test that the creation of new threads in the pool occurs only when | |
| 286 more jobs are added and all existing threads are occupied. | |
| 287 """ | |
| 288 # Ensure no threads running | |
| 289 self.assertEquals(self.threadpool.workers, 0) | |
| 290 timeout = self.getTimeout() | |
| 291 for i in range(10): | |
| 292 self.threadpool.callInThread(self.event.set) | |
| 293 self.event.wait(timeout) | |
| 294 self.event.clear() | |
| 295 | |
| 296 # Ensure there are very few threads running | |
| 297 self.failUnless(self.threadpool.workers <= 2) | |
| 298 | |
| 299 | |
| 300 | |
| 301 if interfaces.IReactorThreads(reactor, None) is None: | |
| 302 for cls in ThreadPoolTestCase, RaceConditionTestCase: | |
| 303 setattr(cls, 'skip', "No thread support, nothing to test here") | |
| 304 else: | |
| 305 import threading | |
| 306 from twisted.python import threadpool | |
| 307 | |
| OLD | NEW |