Index: tests/threading_utils_test.py |
diff --git a/tests/threading_utils_test.py b/tests/threading_utils_test.py |
index 723d4db4ac95e867994bdf2cae341f1e73d57359..e06dc65e2efb8237364e96c232792fb7f96a4570 100755 |
--- a/tests/threading_utils_test.py |
+++ b/tests/threading_utils_test.py |
@@ -266,6 +266,34 @@ class ThreadPoolTest(unittest.TestCase): |
actual = pool.join() |
self.assertEqual(['a', 'c', 'b'], actual) |
+ @timeout(2) |
+ def test_abort(self): |
+ # Trigger a ridiculous amount of tasks, and abort the remaining. |
+ with threading_utils.ThreadPool(2, 2, 0) as pool: |
+ # Allow 10 tasks to run initially. |
+ sem = threading.Semaphore(10) |
+ |
+ def grab_and_return(x): |
+ sem.acquire() |
+ return x |
+ |
+ for i in range(100): |
+ pool.add_task(0, grab_and_return, i) |
+ |
+ # Running at 11 would hang. |
+ results = [pool.get_one_result() for _ in xrange(10)] |
+ # At that point, there's 10 completed tasks and 2 tasks hanging, 88 |
+ # pending. |
+ self.assertEqual(88, pool.abort()) |
+ # Calling .join() before these 2 .release() would hang. |
+ sem.release() |
+ sem.release() |
+ results.extend(pool.join()) |
+ # The results *may* be out of order. Even if the calls are processed |
+ # strictly in FIFO mode, a thread may preempt another one when returning the |
+ # values. |
+ self.assertEqual(range(12), sorted(results)) |
+ |
class AutoRetryThreadPoolTest(unittest.TestCase): |
def test_bad_class(self): |