Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(431)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_threadpool.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 import pickle, time, weakref, gc
6
7 from twisted.trial import unittest, util
8 from twisted.python import threadable
9 from twisted.internet import reactor, interfaces
10
11 #
12 # See the end of this module for the remainder of the imports.
13 #
14
15 class Synchronization(object):
16 failures = 0
17
18 def __init__(self, N, waiting):
19 self.N = N
20 self.waiting = waiting
21 self.lock = threading.Lock()
22 self.runs = []
23
24 def run(self):
25 # This is the testy part: this is supposed to be invoked
26 # serially from multiple threads. If that is actually the
27 # case, we will never fail to acquire this lock. If it is
28 # *not* the case, we might get here while someone else is
29 # holding the lock.
30 if self.lock.acquire(False):
31 if not len(self.runs) % 5:
32 time.sleep(0.0002) # Constant selected based on
33 # empirical data to maximize the
34 # chance of a quick failure if this
35 # code is broken.
36 self.lock.release()
37 else:
38 self.failures += 1
39
40 # This is just the only way I can think of to wake up the test
41 # method. It doesn't actually have anything to do with the
42 # test.
43 self.lock.acquire()
44 self.runs.append(None)
45 if len(self.runs) == self.N:
46 self.waiting.release()
47 self.lock.release()
48
49 synchronized = ["run"]
50 threadable.synchronize(Synchronization)
51
52
53
54 class ThreadPoolTestCase(unittest.TestCase):
55 """
56 Test threadpools.
57 """
58
59 def test_threadCreationArguments(self):
60 """
61 Test that creating threads in the threadpool with application-level
62 objects as arguments doesn't results in those objects never being
63 freed, with the thread maintaining a reference to them as long as it
64 exists.
65 """
66 try:
67 tp = threadpool.ThreadPool(0, 1)
68 tp.start()
69
70 # Sanity check - no threads should have been started yet.
71 self.assertEqual(tp.threads, [])
72
73 # Here's our function
74 def worker(arg):
75 pass
76 # weakref need an object subclass
77 class Dumb(object):
78 pass
79 # And here's the unique object
80 unique = Dumb()
81
82 workerRef = weakref.ref(worker)
83 uniqueRef = weakref.ref(unique)
84
85 # Put some work in
86 tp.callInThread(worker, unique)
87
88 # Add an event to wait completion
89 event = threading.Event()
90 tp.callInThread(event.set)
91 event.wait(self.getTimeout())
92
93 del worker
94 del unique
95 gc.collect()
96 self.assertEquals(uniqueRef(), None)
97 self.assertEquals(workerRef(), None)
98 finally:
99 tp.stop()
100
101
102 def test_persistence(self):
103 """
104 Threadpools can be pickled and unpickled, which should preserve the
105 number of threads and other parameters.
106 """
107 tp = threadpool.ThreadPool(7, 20)
108 tp.start()
109
110 # XXX Sigh - race condition: start should return a Deferred
111 # which fires when all the workers it started have fully
112 # started up.
113 time.sleep(0.1)
114
115 self.assertEquals(len(tp.threads), 7)
116 self.assertEquals(tp.min, 7)
117 self.assertEquals(tp.max, 20)
118
119 # check that unpickled threadpool has same number of threads
120 s = pickle.dumps(tp)
121 tp2 = pickle.loads(s)
122 tp2.start()
123
124 # XXX As above
125 time.sleep(0.1)
126
127 self.assertEquals(len(tp2.threads), 7)
128 self.assertEquals(tp2.min, 7)
129 self.assertEquals(tp2.max, 20)
130
131 tp.stop()
132 tp2.stop()
133
134
135 def _waitForLock(self, lock):
136 for i in xrange(1000000):
137 if lock.acquire(False):
138 break
139 time.sleep(1e-5)
140 else:
141 self.fail("A long time passed without succeeding")
142
143
144 def _threadpoolTest(self, method):
145 """
146 Test synchronization of calls made with C{method}, which should be
147 one of the mecanisms of the threadpool to execute work in threads.
148 """
149 # This is a schizophrenic test: it seems to be trying to test
150 # both the callInThread()/dispatch() behavior of the ThreadPool as well
151 # as the serialization behavior of threadable.synchronize(). It
152 # would probably make more sense as two much simpler tests.
153 N = 10
154
155 tp = threadpool.ThreadPool()
156 tp.start()
157 try:
158 waiting = threading.Lock()
159 waiting.acquire()
160 actor = Synchronization(N, waiting)
161
162 for i in xrange(N):
163 method(tp, actor)
164
165 self._waitForLock(waiting)
166
167 self.failIf(actor.failures, "run() re-entered %d times" %
168 (actor.failures,))
169 finally:
170 tp.stop()
171
172
173 def test_dispatch(self):
174 """
175 Call C{_threadpoolTest} with C{dispatch}.
176 """
177 return self._threadpoolTest(
178 lambda tp, actor: tp.dispatch(actor, actor.run))
179
180 test_dispatch.suppress = [util.suppress(
181 message="dispatch\(\) is deprecated since Twisted 8.0, "
182 "use callInThread\(\) instead",
183 category=DeprecationWarning)]
184
185
186 def test_callInThread(self):
187 """
188 Call C{_threadpoolTest} with C{callInThread}.
189 """
190 return self._threadpoolTest(
191 lambda tp, actor: tp.callInThread(actor.run))
192
193
194 def test_existingWork(self):
195 """
196 Work added to the threadpool before its start should be executed once
197 the threadpool is started: this is ensured by trying to release a lock
198 previously acquired.
199 """
200 waiter = threading.Lock()
201 waiter.acquire()
202
203 tp = threadpool.ThreadPool(0, 1)
204 tp.callInThread(waiter.release) # before start()
205 tp.start()
206
207 try:
208 self._waitForLock(waiter)
209 finally:
210 tp.stop()
211
212
213 def test_dispatchDeprecation(self):
214 """
215 Test for the deprecation of the dispatch method.
216 """
217 tp = threadpool.ThreadPool()
218 tp.start()
219 def cb():
220 return tp.dispatch(None, lambda: None)
221 try:
222 self.assertWarns(DeprecationWarning,
223 "dispatch() is deprecated since Twisted 8.0, "
224 "use callInThread() instead",
225 __file__, cb)
226 finally:
227 tp.stop()
228
229
230 def test_dispatchWithCallbackDeprecation(self):
231 """
232 Test for the deprecation of the dispatchWithCallback method.
233 """
234 tp = threadpool.ThreadPool()
235 tp.start()
236 def cb():
237 return tp.dispatchWithCallback(
238 None,
239 lambda x: None,
240 lambda x: None,
241 lambda: None)
242 try:
243 self.assertWarns(DeprecationWarning,
244 "dispatchWithCallback() is deprecated since Twisted 8.0, "
245 "use twisted.internet.threads.deferToThread() instead.",
246 __file__, cb)
247 finally:
248 tp.stop()
249
250
251
252 class RaceConditionTestCase(unittest.TestCase):
253 def setUp(self):
254 self.event = threading.Event()
255 self.threadpool = threadpool.ThreadPool(0, 10)
256 self.threadpool.start()
257
258
259 def tearDown(self):
260 del self.event
261 self.threadpool.stop()
262 del self.threadpool
263
264
265 def test_synchronization(self):
266 """
267 Test a race condition: ensure that actions run in the pool synchronize
268 with actions run in the main thread.
269 """
270 timeout = self.getTimeout()
271 self.threadpool.callInThread(self.event.set)
272 self.event.wait(timeout)
273 self.event.clear()
274 for i in range(3):
275 self.threadpool.callInThread(self.event.wait)
276 self.threadpool.callInThread(self.event.set)
277 self.event.wait(timeout)
278 if not self.event.isSet():
279 self.event.set()
280 self.fail("Actions not synchronized")
281
282
283 def test_singleThread(self):
284 """
285 Test that the creation of new threads in the pool occurs only when
286 more jobs are added and all existing threads are occupied.
287 """
288 # Ensure no threads running
289 self.assertEquals(self.threadpool.workers, 0)
290 timeout = self.getTimeout()
291 for i in range(10):
292 self.threadpool.callInThread(self.event.set)
293 self.event.wait(timeout)
294 self.event.clear()
295
296 # Ensure there are very few threads running
297 self.failUnless(self.threadpool.workers <= 2)
298
299
300
301 if interfaces.IReactorThreads(reactor, None) is None:
302 for cls in ThreadPoolTestCase, RaceConditionTestCase:
303 setattr(cls, 'skip', "No thread support, nothing to test here")
304 else:
305 import threading
306 from twisted.python import threadpool
307
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_threadable.py ('k') | third_party/twisted_8_1/twisted/test/test_threads.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698