Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(116)

Unified Diff: swarm_client/tests/threading_utils_test.py

Issue 69143004: Delete swarm_client. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « swarm_client/tests/swarming_test.py ('k') | swarm_client/tests/trace_inputs/child1.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: swarm_client/tests/threading_utils_test.py
===================================================================
--- swarm_client/tests/threading_utils_test.py (revision 235167)
+++ swarm_client/tests/threading_utils_test.py (working copy)
@@ -1,540 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2013 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-# Lambda may not be necessary.
-# pylint: disable=W0108
-
-import functools
-import logging
-import os
-import signal
-import sys
-import threading
-import time
-import unittest
-
-ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.insert(0, ROOT_DIR)
-
-from utils import threading_utils
-
-
-def timeout(max_running_time):
- """Test method decorator that fails the test if it executes longer
- than |max_running_time| seconds.
-
- It exists to terminate tests in case of deadlocks. There's a high chance that
- process is broken after such timeout (due to hanging deadlocked threads that
- can own some shared resources). But failing early (maybe not in a cleanest
- way) due to timeout is generally better than hanging indefinitely.
-
- |max_running_time| should be an order of magnitude (or even two orders) larger
- than the expected run time of the test to compensate for slow machine, high
- CPU utilization by some other processes, etc.
-
- Can not be nested.
-
- Noop on windows (since win32 doesn't support signal.setitimer).
- """
- if sys.platform == 'win32':
- return lambda method: method
-
- def decorator(method):
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- signal.signal(signal.SIGALRM, lambda *_args: self.fail('Timeout'))
- signal.setitimer(signal.ITIMER_REAL, max_running_time)
- try:
- return method(self, *args, **kwargs)
- finally:
- signal.signal(signal.SIGALRM, signal.SIG_DFL)
- signal.setitimer(signal.ITIMER_REAL, 0)
- return wrapper
-
- return decorator
-
-
-class ThreadPoolTest(unittest.TestCase):
- MIN_THREADS = 0
- MAX_THREADS = 32
-
- # Append custom assert messages to default ones (works with python >= 2.7).
- longMessage = True
-
- @staticmethod
- def sleep_task(duration=0.01):
- """Returns function that sleeps |duration| sec and returns its argument."""
- def task(arg):
- time.sleep(duration)
- return arg
- return task
-
- def retrying_sleep_task(self, duration=0.01):
- """Returns function that adds sleep_task to the thread pool."""
- def task(arg):
- self.thread_pool.add_task(0, self.sleep_task(duration), arg)
- return task
-
- @staticmethod
- def none_task():
- """Returns function that returns None."""
- return lambda _arg: None
-
- def setUp(self):
- super(ThreadPoolTest, self).setUp()
- self.thread_pool = threading_utils.ThreadPool(
- self.MIN_THREADS, self.MAX_THREADS, 0)
-
- @timeout(1)
- def tearDown(self):
- super(ThreadPoolTest, self).tearDown()
- self.thread_pool.close()
-
- def get_results_via_join(self, _expected):
- return self.thread_pool.join()
-
- def get_results_via_get_one_result(self, expected):
- return [self.thread_pool.get_one_result() for _ in expected]
-
- def get_results_via_iter_results(self, _expected):
- return list(self.thread_pool.iter_results())
-
- def run_results_test(self, task, results_getter, args=None, expected=None):
- """Template function for tests checking that pool returns all results.
-
- Will add multiple instances of |task| to the thread pool, then call
- |results_getter| to get back all results and compare them to expected ones.
- """
- args = range(0, 100) if args is None else args
- expected = args if expected is None else expected
- msg = 'Using \'%s\' to get results.' % (results_getter.__name__,)
-
- for i in args:
- self.thread_pool.add_task(0, task, i)
- results = results_getter(expected)
-
- # Check that got all results back (exact same set, no duplicates).
- self.assertEqual(set(expected), set(results), msg)
- self.assertEqual(len(expected), len(results), msg)
-
- # Queue is empty, result request should fail.
- with self.assertRaises(threading_utils.ThreadPoolEmpty):
- self.thread_pool.get_one_result()
-
- @timeout(1)
- def test_get_one_result_ok(self):
- self.thread_pool.add_task(0, lambda: 'OK')
- self.assertEqual(self.thread_pool.get_one_result(), 'OK')
-
- @timeout(1)
- def test_get_one_result_fail(self):
- # No tasks added -> get_one_result raises an exception.
- with self.assertRaises(threading_utils.ThreadPoolEmpty):
- self.thread_pool.get_one_result()
-
- @timeout(5)
- def test_join(self):
- self.run_results_test(self.sleep_task(),
- self.get_results_via_join)
-
- @timeout(5)
- def test_get_one_result(self):
- self.run_results_test(self.sleep_task(),
- self.get_results_via_get_one_result)
-
- @timeout(5)
- def test_iter_results(self):
- self.run_results_test(self.sleep_task(),
- self.get_results_via_iter_results)
-
- @timeout(5)
- def test_retry_and_join(self):
- self.run_results_test(self.retrying_sleep_task(),
- self.get_results_via_join)
-
- @timeout(5)
- def test_retry_and_get_one_result(self):
- self.run_results_test(self.retrying_sleep_task(),
- self.get_results_via_get_one_result)
-
- @timeout(5)
- def test_retry_and_iter_results(self):
- self.run_results_test(self.retrying_sleep_task(),
- self.get_results_via_iter_results)
-
- @timeout(5)
- def test_none_task_and_join(self):
- self.run_results_test(self.none_task(),
- self.get_results_via_join,
- expected=[])
-
- @timeout(5)
- def test_none_task_and_get_one_result(self):
- self.thread_pool.add_task(0, self.none_task(), 0)
- with self.assertRaises(threading_utils.ThreadPoolEmpty):
- self.thread_pool.get_one_result()
-
- @timeout(5)
- def test_none_task_and_and_iter_results(self):
- self.run_results_test(self.none_task(),
- self.get_results_via_iter_results,
- expected=[])
-
- @timeout(5)
- def test_generator_task(self):
- MULTIPLIER = 1000
- COUNT = 10
-
- # Generator that yields [i * MULTIPLIER, i * MULTIPLIER + COUNT).
- def generator_task(i):
- for j in xrange(COUNT):
- time.sleep(0.001)
- yield i * MULTIPLIER + j
-
- # Arguments for tasks and expected results.
- args = range(0, 10)
- expected = [i * MULTIPLIER + j for i in args for j in xrange(COUNT)]
-
- # Test all possible ways to pull results from the thread pool.
- getters = (self.get_results_via_join,
- self.get_results_via_iter_results,
- self.get_results_via_get_one_result,)
- for results_getter in getters:
- self.run_results_test(generator_task, results_getter, args, expected)
-
- @timeout(5)
- def test_concurrent_iter_results(self):
- def poller_proc(result):
- result.extend(self.thread_pool.iter_results())
-
- args = range(0, 100)
- for i in args:
- self.thread_pool.add_task(0, self.sleep_task(), i)
-
- # Start a bunch of threads, all calling iter_results in parallel.
- pollers = []
- for _ in xrange(0, 4):
- result = []
- poller = threading.Thread(target=poller_proc, args=(result,))
- poller.start()
- pollers.append((poller, result))
-
- # Collects results from all polling threads.
- all_results = []
- for poller, results in pollers:
- poller.join()
- all_results.extend(results)
-
- # Check that got all results back (exact same set, no duplicates).
- self.assertEqual(set(args), set(all_results))
- self.assertEqual(len(args), len(all_results))
-
- @timeout(1)
- def test_adding_tasks_after_close(self):
- pool = threading_utils.ThreadPool(1, 1, 0)
- pool.add_task(0, lambda: None)
- pool.close()
- with self.assertRaises(threading_utils.ThreadPoolClosed):
- pool.add_task(0, lambda: None)
-
- @timeout(1)
- def test_double_close(self):
- pool = threading_utils.ThreadPool(1, 1, 0)
- pool.close()
- with self.assertRaises(threading_utils.ThreadPoolClosed):
- pool.close()
-
- def test_priority(self):
- # Verifies that a lower priority is run first.
- with threading_utils.ThreadPool(1, 1, 0) as pool:
- lock = threading.Lock()
-
- def wait_and_return(x):
- with lock:
- return x
-
- def return_x(x):
- return x
-
- with lock:
- pool.add_task(0, wait_and_return, 'a')
- pool.add_task(2, return_x, 'b')
- pool.add_task(1, return_x, 'c')
-
- actual = pool.join()
- self.assertEqual(['a', 'c', 'b'], actual)
-
- @timeout(2)
- def test_abort(self):
- # Trigger a ridiculous amount of tasks, and abort the remaining.
- with threading_utils.ThreadPool(2, 2, 0) as pool:
- # Allow 10 tasks to run initially.
- sem = threading.Semaphore(10)
-
- def grab_and_return(x):
- sem.acquire()
- return x
-
- for i in range(100):
- pool.add_task(0, grab_and_return, i)
-
- # Running at 11 would hang.
- results = [pool.get_one_result() for _ in xrange(10)]
- # At that point, there's 10 completed tasks and 2 tasks hanging, 88
- # pending.
- self.assertEqual(88, pool.abort())
- # Calling .join() before these 2 .release() would hang.
- sem.release()
- sem.release()
- results.extend(pool.join())
- # The results *may* be out of order. Even if the calls are processed
- # strictly in FIFO mode, a thread may preempt another one when returning the
- # values.
- self.assertEqual(range(12), sorted(results))
-
-
-class AutoRetryThreadPoolTest(unittest.TestCase):
- def test_bad_class(self):
- exceptions = [AutoRetryThreadPoolTest]
- with self.assertRaises(AssertionError):
- threading_utils.AutoRetryThreadPool(exceptions, 1, 0, 1, 0)
-
- def test_no_exception(self):
- with self.assertRaises(AssertionError):
- threading_utils.AutoRetryThreadPool([], 1, 0, 1, 0)
-
- def test_bad_retry(self):
- exceptions = [IOError]
- with self.assertRaises(AssertionError):
- threading_utils.AutoRetryThreadPool(exceptions, 256, 0, 1, 0)
-
- def test_bad_priority(self):
- exceptions = [IOError]
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool:
- pool.add_task(0, lambda x: x, 0)
- pool.add_task(256, lambda x: x, 0)
- pool.add_task(512, lambda x: x, 0)
- with self.assertRaises(AssertionError):
- pool.add_task(1, lambda x: x, 0)
- with self.assertRaises(AssertionError):
- pool.add_task(255, lambda x: x, 0)
-
- def test_priority(self):
- # Verifies that a lower priority is run first.
- exceptions = [IOError]
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool:
- lock = threading.Lock()
-
- def wait_and_return(x):
- with lock:
- return x
-
- def return_x(x):
- return x
-
- with lock:
- pool.add_task(pool.HIGH, wait_and_return, 'a')
- pool.add_task(pool.LOW, return_x, 'b')
- pool.add_task(pool.MED, return_x, 'c')
-
- actual = pool.join()
- self.assertEqual(['a', 'c', 'b'], actual)
-
- def test_retry_inherited(self):
- # Exception class inheritance works.
- class CustomException(IOError):
- pass
- ran = []
- def throw(to_throw, x):
- ran.append(x)
- if to_throw:
- raise to_throw.pop(0)
- return x
- with threading_utils.AutoRetryThreadPool([IOError], 1, 1, 1, 0) as pool:
- pool.add_task(pool.MED, throw, [CustomException('a')], 'yay')
- actual = pool.join()
- self.assertEqual(['yay'], actual)
- self.assertEqual(['yay', 'yay'], ran)
-
- def test_retry_2_times(self):
- exceptions = [IOError, OSError]
- to_throw = [OSError('a'), IOError('b')]
- def throw(x):
- if to_throw:
- raise to_throw.pop(0)
- return x
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool:
- pool.add_task(pool.MED, throw, 'yay')
- actual = pool.join()
- self.assertEqual(['yay'], actual)
-
- def test_retry_too_many_times(self):
- exceptions = [IOError, OSError]
- to_throw = [OSError('a'), IOError('b')]
- def throw(x):
- if to_throw:
- raise to_throw.pop(0)
- return x
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool:
- pool.add_task(pool.MED, throw, 'yay')
- with self.assertRaises(IOError):
- pool.join()
-
- def test_retry_mutation_1(self):
- # This is to warn that mutable arguments WILL be mutated.
- def throw(to_throw, x):
- if to_throw:
- raise to_throw.pop(0)
- return x
- exceptions = [IOError, OSError]
- with threading_utils.AutoRetryThreadPool(exceptions, 1, 1, 1, 0) as pool:
- pool.add_task(pool.MED, throw, [OSError('a'), IOError('b')], 'yay')
- with self.assertRaises(IOError):
- pool.join()
-
- def test_retry_mutation_2(self):
- # This is to warn that mutable arguments WILL be mutated.
- def throw(to_throw, x):
- if to_throw:
- raise to_throw.pop(0)
- return x
- exceptions = [IOError, OSError]
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool:
- pool.add_task(pool.MED, throw, [OSError('a'), IOError('b')], 'yay')
- actual = pool.join()
- self.assertEqual(['yay'], actual)
-
- def test_retry_interleaved(self):
- # Verifies that retries are interleaved. This is important, we don't want a
- # retried task to take all the pool during retries.
- exceptions = [IOError, OSError]
- lock = threading.Lock()
- ran = []
- with threading_utils.AutoRetryThreadPool(exceptions, 2, 1, 1, 0) as pool:
- def lock_and_throw(to_throw, x):
- with lock:
- ran.append(x)
- if to_throw:
- raise to_throw.pop(0)
- return x
- with lock:
- pool.add_task(
- pool.MED, lock_and_throw, [OSError('a'), IOError('b')], 'A')
- pool.add_task(
- pool.MED, lock_and_throw, [OSError('a'), IOError('b')], 'B')
-
- actual = pool.join()
- self.assertEqual(['A', 'B'], actual)
- # Retries are properly interleaved:
- self.assertEqual(['A', 'B', 'A', 'B', 'A', 'B'], ran)
-
- def test_add_task_with_channel_success(self):
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool:
- channel = threading_utils.TaskChannel()
- pool.add_task_with_channel(channel, 0, lambda: 0)
- self.assertEqual(0, channel.pull())
-
- def test_add_task_with_channel_fatal_error(self):
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool:
- channel = threading_utils.TaskChannel()
- def throw(exc):
- raise exc
- pool.add_task_with_channel(channel, 0, throw, ValueError())
- with self.assertRaises(ValueError):
- channel.pull()
-
- def test_add_task_with_channel_retryable_error(self):
- with threading_utils.AutoRetryThreadPool([OSError], 2, 1, 1, 0) as pool:
- channel = threading_utils.TaskChannel()
- def throw(exc):
- raise exc
- pool.add_task_with_channel(channel, 0, throw, OSError())
- with self.assertRaises(OSError):
- channel.pull()
-
-
-class FakeProgress(object):
- @staticmethod
- def print_update():
- pass
-
-
-class WorkerPoolTest(unittest.TestCase):
- def test_normal(self):
- mapper = lambda value: -value
- progress = FakeProgress()
- with threading_utils.ThreadPoolWithProgress(progress, 8, 8, 0) as pool:
- for i in range(32):
- pool.add_task(0, mapper, i)
- results = pool.join()
- self.assertEqual(range(-31, 1), sorted(results))
-
- def test_exception(self):
- class FearsomeException(Exception):
- pass
- def mapper(value):
- raise FearsomeException(value)
- task_added = False
- try:
- progress = FakeProgress()
- with threading_utils.ThreadPoolWithProgress(progress, 8, 8, 0) as pool:
- pool.add_task(0, mapper, 0)
- task_added = True
- pool.join()
- self.fail()
- except FearsomeException:
- self.assertEqual(True, task_added)
-
-
-class TaskChannelTest(unittest.TestCase):
- def test_passes_simple_value(self):
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- tp.add_task(0, lambda: channel.send_result(0))
- self.assertEqual(0, channel.pull())
-
- def test_passes_exception_value(self):
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- tp.add_task(0, lambda: channel.send_result(Exception()))
- self.assertTrue(isinstance(channel.pull(), Exception))
-
- def test_wrap_task_passes_simple_value(self):
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- tp.add_task(0, channel.wrap_task(lambda: 0))
- self.assertEqual(0, channel.pull())
-
- def test_wrap_task_passes_exception_value(self):
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- tp.add_task(0, channel.wrap_task(lambda: Exception()))
- self.assertTrue(isinstance(channel.pull(), Exception))
-
- def test_send_exception_raises_exception(self):
- class CustomError(Exception):
- pass
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- tp.add_task(0, lambda: channel.send_exception(CustomError()))
- with self.assertRaises(CustomError):
- channel.pull()
-
- def test_wrap_task_raises_exception(self):
- class CustomError(Exception):
- pass
- with threading_utils.ThreadPool(1, 1, 0) as tp:
- channel = threading_utils.TaskChannel()
- def task_func():
- raise CustomError()
- tp.add_task(0, channel.wrap_task(task_func))
- with self.assertRaises(CustomError):
- channel.pull()
-
-
-if __name__ == '__main__':
- VERBOSE = '-v' in sys.argv
- logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
- unittest.main()
« no previous file with comments | « swarm_client/tests/swarming_test.py ('k') | swarm_client/tests/trace_inputs/child1.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698