Index: swarm_client/tests/threading_utils_test.py |
=================================================================== |
--- swarm_client/tests/threading_utils_test.py (revision 235167) |
+++ swarm_client/tests/threading_utils_test.py (working copy) |
@@ -1,540 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright 2013 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-# Lambda may not be necessary. |
-# pylint: disable=W0108 |
- |
-import functools |
-import logging |
-import os |
-import signal |
-import sys |
-import threading |
-import time |
-import unittest |
- |
-ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
-sys.path.insert(0, ROOT_DIR) |
- |
-from utils import threading_utils |
- |
- |
-def timeout(max_running_time): |
- """Test method decorator that fails the test if it executes longer |
- than |max_running_time| seconds. |
- |
- It exists to terminate tests in case of deadlocks. There's a high chance that |
- process is broken after such timeout (due to hanging deadlocked threads that |
- can own some shared resources). But failing early (maybe not in a cleanest |
- way) due to timeout is generally better than hanging indefinitely. |
- |
- |max_running_time| should be an order of magnitude (or even two orders) larger |
- than the expected run time of the test to compensate for slow machine, high |
- CPU utilization by some other processes, etc. |
- |
- Can not be nested. |
- |
- Noop on windows (since win32 doesn't support signal.setitimer). |
- """ |
- if sys.platform == 'win32': |
- return lambda method: method |
- |
- def decorator(method): |
- @functools.wraps(method) |
- def wrapper(self, *args, **kwargs): |
- signal.signal(signal.SIGALRM, lambda *_args: self.fail('Timeout')) |
- signal.setitimer(signal.ITIMER_REAL, max_running_time) |
- try: |
- return method(self, *args, **kwargs) |
- finally: |
- signal.signal(signal.SIGALRM, signal.SIG_DFL) |
- signal.setitimer(signal.ITIMER_REAL, 0) |
- return wrapper |
- |
- return decorator |
- |
- |
-class ThreadPoolTest(unittest.TestCase): |
- MIN_THREADS = 0 |
- MAX_THREADS = 32 |
- |
- # Append custom assert messages to default ones (works with python >= 2.7). |
- longMessage = True |
- |
- @staticmethod |
- def sleep_task(duration=0.01): |
- """Returns function that sleeps |duration| sec and returns its argument.""" |
- def task(arg): |
- time.sleep(duration) |
- return arg |
- return task |
- |
- def retrying_sleep_task(self, duration=0.01): |
- """Returns function that adds sleep_task to the thread pool.""" |
- def task(arg): |
- self.thread_pool.add_task(0, self.sleep_task(duration), arg) |
- return task |
- |
- @staticmethod |
- def none_task(): |
- """Returns function that returns None.""" |
- return lambda _arg: None |
- |
- def setUp(self): |
- super(ThreadPoolTest, self).setUp() |
- self.thread_pool = threading_utils.ThreadPool( |
- self.MIN_THREADS, self.MAX_THREADS, 0) |
- |
- @timeout(1) |
- def tearDown(self): |
- super(ThreadPoolTest, self).tearDown() |
- self.thread_pool.close() |
- |
- def get_results_via_join(self, _expected): |
- return self.thread_pool.join() |
- |
- def get_results_via_get_one_result(self, expected): |
- return [self.thread_pool.get_one_result() for _ in expected] |
- |
- def get_results_via_iter_results(self, _expected): |
- return list(self.thread_pool.iter_results()) |
- |
- def run_results_test(self, task, results_getter, args=None, expected=None): |
- """Template function for tests checking that pool returns all results. |
- |
- Will add multiple instances of |task| to the thread pool, then call |
- |results_getter| to get back all results and compare them to expected ones. |
- """ |
- args = range(0, 100) if args is None else args |
- expected = args if expected is None else expected |
- msg = 'Using \'%s\' to get results.' % (results_getter.__name__,) |
- |
- for i in args: |
- self.thread_pool.add_task(0, task, i) |
- results = results_getter(expected) |
- |
- # Check that got all results back (exact same set, no duplicates). |
- self.assertEqual(set(expected), set(results), msg) |
- self.assertEqual(len(expected), len(results), msg) |
- |
- # Queue is empty, result request should fail. |
- with self.assertRaises(threading_utils.ThreadPoolEmpty): |
- self.thread_pool.get_one_result() |
- |
- @timeout(1) |
- def test_get_one_result_ok(self): |
- self.thread_pool.add_task(0, lambda: 'OK') |
- self.assertEqual(self.thread_pool.get_one_result(), 'OK') |
- |
- @timeout(1) |
- def test_get_one_result_fail(self): |
- # No tasks added -> get_one_result raises an exception. |
- with self.assertRaises(threading_utils.ThreadPoolEmpty): |
- self.thread_pool.get_one_result() |
- |
- @timeout(5) |
- def test_join(self): |
- self.run_results_test(self.sleep_task(), |
- self.get_results_via_join) |
- |
- @timeout(5) |
- def test_get_one_result(self): |
- self.run_results_test(self.sleep_task(), |
- self.get_results_via_get_one_result) |
- |
- @timeout(5) |
- def test_iter_results(self): |
- self.run_results_test(self.sleep_task(), |
- self.get_results_via_iter_results) |
- |
- @timeout(5) |
- def test_retry_and_join(self): |
- self.run_results_test(self.retrying_sleep_task(), |
- self.get_results_via_join) |
- |
- @timeout(5) |
- def test_retry_and_get_one_result(self): |
- self.run_results_test(self.retrying_sleep_task(), |
- self.get_results_via_get_one_result) |
- |
- @timeout(5) |
- def test_retry_and_iter_results(self): |
- self.run_results_test(self.retrying_sleep_task(), |
- self.get_results_via_iter_results) |
- |
- @timeout(5) |
- def test_none_task_and_join(self): |
- self.run_results_test(self.none_task(), |
- self.get_results_via_join, |
- expected=[]) |
- |
- @timeout(5) |
- def test_none_task_and_get_one_result(self): |
- self.thread_pool.add_task(0, self.none_task(), 0) |
- with self.assertRaises(threading_utils.ThreadPoolEmpty): |
- self.thread_pool.get_one_result() |
- |
- @timeout(5) |
- def test_none_task_and_and_iter_results(self): |
- self.run_results_test(self.none_task(), |
- self.get_results_via_iter_results, |
- expected=[]) |
- |
- @timeout(5) |
- def test_generator_task(self): |
- MULTIPLIER = 1000 |
- COUNT = 10 |
- |
- # Generator that yields [i * MULTIPLIER, i * MULTIPLIER + COUNT). |
- def generator_task(i): |
- for j in xrange(COUNT): |
- time.sleep(0.001) |
- yield i * MULTIPLIER + j |
- |
- # Arguments for tasks and expected results. |
- args = range(0, 10) |
- expected = [i * MULTIPLIER + j for i in args for j in xrange(COUNT)] |
- |
- # Test all possible ways to pull results from the thread pool. |
- getters = (self.get_results_via_join, |
- self.get_results_via_iter_results, |
- self.get_results_via_get_one_result,) |
- for results_getter in getters: |
- self.run_results_test(generator_task, results_getter, args, expected) |
- |
- @timeout(5) |
- def test_concurrent_iter_results(self): |
- def poller_proc(result): |
- result.extend(self.thread_pool.iter_results()) |
- |
- args = range(0, 100) |
- for i in args: |
- self.thread_pool.add_task(0, self.sleep_task(), i) |
- |
- # Start a bunch of threads, all calling iter_results in parallel. |
- pollers = [] |
- for _ in xrange(0, 4): |
- result = [] |
- poller = threading.Thread(target=poller_proc, args=(result,)) |
- poller.start() |
- pollers.append((poller, result)) |
- |
- # Collects results from all polling threads. |
- all_results = [] |
- for poller, results in pollers: |
- poller.join() |
- all_results.extend(results) |
- |
- # Check that got all results back (exact same set, no duplicates). |
- self.assertEqual(set(args), set(all_results)) |
- self.assertEqual(len(args), len(all_results)) |
- |
- @timeout(1) |
- def test_adding_tasks_after_close(self): |
- pool = threading_utils.ThreadPool(1, 1, 0) |
- pool.add_task(0, lambda: None) |
- pool.close() |
- with self.assertRaises(threading_utils.ThreadPoolClosed): |
- pool.add_task(0, lambda: None) |
- |
- @timeout(1) |
- def test_double_close(self): |
- pool = threading_utils.ThreadPool(1, 1, 0) |
- pool.close() |
- with self.assertRaises(threading_utils.ThreadPoolClosed): |
- pool.close() |
- |
- def test_priority(self): |
- # Verifies that a lower priority is run first. |
- with threading_utils.ThreadPool(1, 1, 0) as pool: |
- lock = threading.Lock() |
- |
- def wait_and_return(x): |
- with lock: |
- return x |
- |
- def return_x(x): |
- return x |
- |
- with lock: |
- pool.add_task(0, wait_and_return, 'a') |
- pool.add_task(2, return_x, 'b') |
- pool.add_task(1, return_x, 'c') |
- |
- actual = pool.join() |
- self.assertEqual(['a', 'c', 'b'], actual) |
- |
- @timeout(2) |
- def test_abort(self): |
- # Trigger a ridiculous amount of tasks, and abort the remaining. |
- with threading_utils.ThreadPool(2, 2, 0) as pool: |
- # Allow 10 tasks to run initially. |
- sem = threading.Semaphore(10) |
- |
- def grab_and_return(x): |
- sem.acquire() |
- return x |
- |
- for i in range(100): |
- pool.add_task(0, grab_and_return, i) |
- |
- # Running at 11 would hang. |
- results = [pool.get_one_result() for _ in xrange(10)] |
- # At that point, there's 10 completed tasks and 2 tasks hanging, 88 |
- # pending. |
- self.assertEqual(88, pool.abort()) |
- # Calling .join() before these 2 .release() would hang. |
- sem.release() |
- sem.release() |
- results.extend(pool.join()) |
- # The results *may* be out of order. Even if the calls are processed |
- # strictly in FIFO mode, a thread may preempt another one when returning the |
- # values. |
- self.assertEqual(range(12), sorted(results)) |
- |
- |
-class AutoRetryThreadPoolTest(unittest.TestCase): |
- def test_bad_class(self): |
- exceptions = [AutoRetryThreadPoolTest] |
- with self.assertRaises(AssertionError): |
- threading_utils.AutoRetryThreadPool(exceptions, 1, 0, 1, 0) |
- |
- def test_no_exception(self): |
- with self.assertRaises(AssertionError): |
- threading_utils.AutoRetryThreadPool([], 1, 0, 1, 0) |
- |
- def test_bad_retry(self): |
- exceptions = [IOError] |
- with self.assertRaises(AssertionError): |
- threading_utils.AutoRetryThreadPool(exceptions, 256, 0, 1, 0) |
- |
- def test_bad_priority(self): |
- exceptions = [IOError] |
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool: |
- pool.add_task(0, lambda x: x, 0) |
- pool.add_task(256, lambda x: x, 0) |
- pool.add_task(512, lambda x: x, 0) |
- with self.assertRaises(AssertionError): |
- pool.add_task(1, lambda x: x, 0) |
- with self.assertRaises(AssertionError): |
- pool.add_task(255, lambda x: x, 0) |
- |
- def test_priority(self): |
- # Verifies that a lower priority is run first. |
- exceptions = [IOError] |
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool: |
- lock = threading.Lock() |
- |
- def wait_and_return(x): |
- with lock: |
- return x |
- |
- def return_x(x): |
- return x |
- |
- with lock: |
- pool.add_task(pool.HIGH, wait_and_return, 'a') |
- pool.add_task(pool.LOW, return_x, 'b') |
- pool.add_task(pool.MED, return_x, 'c') |
- |
- actual = pool.join() |
- self.assertEqual(['a', 'c', 'b'], actual) |
- |
- def test_retry_inherited(self): |
- # Exception class inheritance works. |
- class CustomException(IOError): |
- pass |
- ran = [] |
- def throw(to_throw, x): |
- ran.append(x) |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- with threading_utils.AutoRetryThreadPool([IOError], 1, 1, 1, 0) as pool: |
- pool.add_task(pool.MED, throw, [CustomException('a')], 'yay') |
- actual = pool.join() |
- self.assertEqual(['yay'], actual) |
- self.assertEqual(['yay', 'yay'], ran) |
- |
- def test_retry_2_times(self): |
- exceptions = [IOError, OSError] |
- to_throw = [OSError('a'), IOError('b')] |
- def throw(x): |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool: |
- pool.add_task(pool.MED, throw, 'yay') |
- actual = pool.join() |
- self.assertEqual(['yay'], actual) |
- |
- def test_retry_too_many_times(self): |
- exceptions = [IOError, OSError] |
- to_throw = [OSError('a'), IOError('b')] |
- def throw(x): |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool: |
- pool.add_task(pool.MED, throw, 'yay') |
- with self.assertRaises(IOError): |
- pool.join() |
- |
- def test_retry_mutation_1(self): |
- # This is to warn that mutable arguments WILL be mutated. |
- def throw(to_throw, x): |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- exceptions = [IOError, OSError] |
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool: |
- pool.add_task(pool.MED, throw, [OSError('a'), IOError('b')], 'yay') |
- with self.assertRaises(IOError): |
- pool.join() |
- |
- def test_retry_mutation_2(self): |
- # This is to warn that mutable arguments WILL be mutated. |
- def throw(to_throw, x): |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- exceptions = [IOError, OSError] |
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool: |
- pool.add_task(pool.MED, throw, [OSError('a'), IOError('b')], 'yay') |
- actual = pool.join() |
- self.assertEqual(['yay'], actual) |
- |
- def test_retry_interleaved(self): |
- # Verifies that retries are interleaved. This is important, we don't want a |
- # retried task to take all the pool during retries. |
- exceptions = [IOError, OSError] |
- lock = threading.Lock() |
- ran = [] |
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool: |
- def lock_and_throw(to_throw, x): |
- with lock: |
- ran.append(x) |
- if to_throw: |
- raise to_throw.pop(0) |
- return x |
- with lock: |
- pool.add_task( |
- pool.MED, lock_and_throw, [OSError('a'), IOError('b')], 'A') |
- pool.add_task( |
- pool.MED, lock_and_throw, [OSError('a'), IOError('b')], 'B') |
- |
- actual = pool.join() |
- self.assertEqual(['A', 'B'], actual) |
- # Retries are properly interleaved: |
- self.assertEqual(['A', 'B', 'A', 'B', 'A', 'B'], ran) |
- |
- def test_add_task_with_channel_success(self): |
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool: |
- channel = threading_utils.TaskChannel() |
- pool.add_task_with_channel(channel, 0, lambda: 0) |
- self.assertEqual(0, channel.pull()) |
- |
- def test_add_task_with_channel_fatal_error(self): |
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool: |
- channel = threading_utils.TaskChannel() |
- def throw(exc): |
- raise exc |
- pool.add_task_with_channel(channel, 0, throw, ValueError()) |
- with self.assertRaises(ValueError): |
- channel.pull() |
- |
- def test_add_task_with_channel_retryable_error(self): |
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool: |
- channel = threading_utils.TaskChannel() |
- def throw(exc): |
- raise exc |
- pool.add_task_with_channel(channel, 0, throw, OSError()) |
- with self.assertRaises(OSError): |
- channel.pull() |
- |
- |
-class FakeProgress(object): |
- @staticmethod |
- def print_update(): |
- pass |
- |
- |
-class WorkerPoolTest(unittest.TestCase): |
- def test_normal(self): |
- mapper = lambda value: -value |
- progress = FakeProgress() |
- with threading_utils.ThreadPoolWithProgress(progress, 8, 8, 0) as pool: |
- for i in range(32): |
- pool.add_task(0, mapper, i) |
- results = pool.join() |
- self.assertEqual(range(-31, 1), sorted(results)) |
- |
- def test_exception(self): |
- class FearsomeException(Exception): |
- pass |
- def mapper(value): |
- raise FearsomeException(value) |
- task_added = False |
- try: |
- progress = FakeProgress() |
- with threading_utils.ThreadPoolWithProgress(progress, 8, 8, 0) as pool: |
- pool.add_task(0, mapper, 0) |
- task_added = True |
- pool.join() |
- self.fail() |
- except FearsomeException: |
- self.assertEqual(True, task_added) |
- |
- |
-class TaskChannelTest(unittest.TestCase): |
- def test_passes_simple_value(self): |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- tp.add_task(0, lambda: channel.send_result(0)) |
- self.assertEqual(0, channel.pull()) |
- |
- def test_passes_exception_value(self): |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- tp.add_task(0, lambda: channel.send_result(Exception())) |
- self.assertTrue(isinstance(channel.pull(), Exception)) |
- |
- def test_wrap_task_passes_simple_value(self): |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- tp.add_task(0, channel.wrap_task(lambda: 0)) |
- self.assertEqual(0, channel.pull()) |
- |
- def test_wrap_task_passes_exception_value(self): |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- tp.add_task(0, channel.wrap_task(lambda: Exception())) |
- self.assertTrue(isinstance(channel.pull(), Exception)) |
- |
- def test_send_exception_raises_exception(self): |
- class CustomError(Exception): |
- pass |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- tp.add_task(0, lambda: channel.send_exception(CustomError())) |
- with self.assertRaises(CustomError): |
- channel.pull() |
- |
- def test_wrap_task_raises_exception(self): |
- class CustomError(Exception): |
- pass |
- with threading_utils.ThreadPool(1, 1, 0) as tp: |
- channel = threading_utils.TaskChannel() |
- def task_func(): |
- raise CustomError() |
- tp.add_task(0, channel.wrap_task(task_func)) |
- with self.assertRaises(CustomError): |
- channel.pull() |
- |
- |
-if __name__ == '__main__': |
- VERBOSE = '-v' in sys.argv |
- logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
- unittest.main() |