OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2013 The Chromium Authors. All rights reserved. | 2 # Copyright 2013 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 # Lambda may not be necessary. | 6 # Lambda may not be necessary. |
7 # pylint: disable=W0108 | 7 # pylint: disable=W0108 |
8 | 8 |
9 import functools | 9 import functools |
10 import logging | 10 import logging |
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
259 return x | 259 return x |
260 | 260 |
261 with lock: | 261 with lock: |
262 pool.add_task(0, wait_and_return, 'a') | 262 pool.add_task(0, wait_and_return, 'a') |
263 pool.add_task(2, return_x, 'b') | 263 pool.add_task(2, return_x, 'b') |
264 pool.add_task(1, return_x, 'c') | 264 pool.add_task(1, return_x, 'c') |
265 | 265 |
266 actual = pool.join() | 266 actual = pool.join() |
267 self.assertEqual(['a', 'c', 'b'], actual) | 267 self.assertEqual(['a', 'c', 'b'], actual) |
268 | 268 |
| 269 @timeout(2) |
| 270 def test_abort(self): |
| 271 # Trigger a ridiculous amount of tasks, and abort the remaining. |
| 272 with threading_utils.ThreadPool(2, 2, 0) as pool: |
| 273 # Allow 10 tasks to run initially. |
| 274 sem = threading.Semaphore(10) |
| 275 |
| 276 def grab_and_return(x): |
| 277 sem.acquire() |
| 278 return x |
| 279 |
| 280 for i in range(100): |
| 281 pool.add_task(0, grab_and_return, i) |
| 282 |
| 283 # Running at 11 would hang. |
| 284 results = [pool.get_one_result() for _ in xrange(10)] |
| 285 # At that point, there's 10 completed tasks and 2 tasks hanging, 88 |
| 286 # pending. |
| 287 self.assertEqual(88, pool.abort()) |
| 288 # Calling .join() before these 2 .release() would hang. |
| 289 sem.release() |
| 290 sem.release() |
| 291 results.extend(pool.join()) |
| 292 # The results *may* be out of order. Even if the calls are processed |
| 293 # strictly in FIFO mode, a thread may preempt another one when returning the |
| 294 # values. |
| 295 self.assertEqual(range(12), sorted(results)) |
| 296 |
269 | 297 |
270 class AutoRetryThreadPoolTest(unittest.TestCase): | 298 class AutoRetryThreadPoolTest(unittest.TestCase): |
271 def test_bad_class(self): | 299 def test_bad_class(self): |
272 exceptions = [AutoRetryThreadPoolTest] | 300 exceptions = [AutoRetryThreadPoolTest] |
273 with self.assertRaises(AssertionError): | 301 with self.assertRaises(AssertionError): |
274 threading_utils.AutoRetryThreadPool(exceptions, 1, 0, 1, 0) | 302 threading_utils.AutoRetryThreadPool(exceptions, 1, 0, 1, 0) |
275 | 303 |
276 def test_no_exception(self): | 304 def test_no_exception(self): |
277 with self.assertRaises(AssertionError): | 305 with self.assertRaises(AssertionError): |
278 threading_utils.AutoRetryThreadPool([], 1, 0, 1, 0) | 306 threading_utils.AutoRetryThreadPool([], 1, 0, 1, 0) |
(...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
503 raise CustomError() | 531 raise CustomError() |
504 tp.add_task(0, channel.wrap_task(task_func)) | 532 tp.add_task(0, channel.wrap_task(task_func)) |
505 with self.assertRaises(CustomError): | 533 with self.assertRaises(CustomError): |
506 channel.pull() | 534 channel.pull() |
507 | 535 |
508 | 536 |
509 if __name__ == '__main__': | 537 if __name__ == '__main__': |
510 VERBOSE = '-v' in sys.argv | 538 VERBOSE = '-v' in sys.argv |
511 logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) | 539 logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
512 unittest.main() | 540 unittest.main() |
OLD | NEW |