Index: third_party/lit/lit/run.py |
diff --git a/third_party/lit/lit/run.py b/third_party/lit/lit/run.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4817dd6d1d790e4f6de9802e7d226ba0888c87f4 |
--- /dev/null |
+++ b/third_party/lit/lit/run.py |
@@ -0,0 +1,299 @@ |
+import os |
+import threading |
+import time |
+import traceback |
+try: |
+ import Queue as queue |
+except ImportError: |
+ import queue |
+ |
+try: |
+ import win32api |
+except ImportError: |
+ win32api = None |
+ |
+try: |
+ import multiprocessing |
+except ImportError: |
+ multiprocessing = None |
+ |
+import lit.Test |
+ |
+### |
+# Test Execution Implementation |
+ |
+class LockedValue(object): |
+ def __init__(self, value): |
+ self.lock = threading.Lock() |
+ self._value = value |
+ |
+ def _get_value(self): |
+ self.lock.acquire() |
+ try: |
+ return self._value |
+ finally: |
+ self.lock.release() |
+ |
+ def _set_value(self, value): |
+ self.lock.acquire() |
+ try: |
+ self._value = value |
+ finally: |
+ self.lock.release() |
+ |
+ value = property(_get_value, _set_value) |
+ |
+class TestProvider(object): |
+ def __init__(self, tests, num_jobs, queue_impl, canceled_flag): |
+ self.canceled_flag = canceled_flag |
+ |
+ # Create a shared queue to provide the test indices. |
+ self.queue = queue_impl() |
+ for i in range(len(tests)): |
+ self.queue.put(i) |
+ for i in range(num_jobs): |
+ self.queue.put(None) |
+ |
+ def cancel(self): |
+ self.canceled_flag.value = 1 |
+ |
+ def get(self): |
+ # Check if we are canceled. |
+ if self.canceled_flag.value: |
+ return None |
+ |
+ # Otherwise take the next test. |
+ return self.queue.get() |
+ |
+class Tester(object): |
+ def __init__(self, run_instance, provider, consumer): |
+ self.run_instance = run_instance |
+ self.provider = provider |
+ self.consumer = consumer |
+ |
+ def run(self): |
+ while True: |
+ item = self.provider.get() |
+ if item is None: |
+ break |
+ self.run_test(item) |
+ self.consumer.task_finished() |
+ |
+ def run_test(self, test_index): |
+ test = self.run_instance.tests[test_index] |
+ try: |
+ self.run_instance.execute_test(test) |
+ except KeyboardInterrupt: |
+ # This is a sad hack. Unfortunately subprocess goes |
+ # bonkers with ctrl-c and we start forking merrily. |
+ print('\nCtrl-C detected, goodbye.') |
+ os.kill(0,9) |
+ self.consumer.update(test_index, test) |
+ |
+class ThreadResultsConsumer(object): |
+ def __init__(self, display): |
+ self.display = display |
+ self.lock = threading.Lock() |
+ |
+ def update(self, test_index, test): |
+ self.lock.acquire() |
+ try: |
+ self.display.update(test) |
+ finally: |
+ self.lock.release() |
+ |
+ def task_finished(self): |
+ pass |
+ |
+ def handle_results(self): |
+ pass |
+ |
+class MultiprocessResultsConsumer(object): |
+ def __init__(self, run, display, num_jobs): |
+ self.run = run |
+ self.display = display |
+ self.num_jobs = num_jobs |
+ self.queue = multiprocessing.Queue() |
+ |
+ def update(self, test_index, test): |
+ # This method is called in the child processes, and communicates the |
+ # results to the actual display implementation via an output queue. |
+ self.queue.put((test_index, test.result)) |
+ |
+ def task_finished(self): |
+ # This method is called in the child processes, and communicates that |
+ # individual tasks are complete. |
+ self.queue.put(None) |
+ |
+ def handle_results(self): |
+ # This method is called in the parent, and consumes the results from the |
+ # output queue and dispatches to the actual display. The method will |
+ # complete after each of num_jobs tasks has signalled completion. |
+ completed = 0 |
+ while completed != self.num_jobs: |
+ # Wait for a result item. |
+ item = self.queue.get() |
+ if item is None: |
+ completed += 1 |
+ continue |
+ |
+ # Update the test result in the parent process. |
+ index,result = item |
+ test = self.run.tests[index] |
+ test.result = result |
+ |
+ self.display.update(test) |
+ |
+def run_one_tester(run, provider, display): |
+ tester = Tester(run, provider, display) |
+ tester.run() |
+ |
+### |
+ |
+class Run(object): |
+ """ |
+ This class represents a concrete, configured testing run. |
+ """ |
+ |
+ def __init__(self, lit_config, tests): |
+ self.lit_config = lit_config |
+ self.tests = tests |
+ |
+ def execute_test(self, test): |
+ result = None |
+ start_time = time.time() |
+ try: |
+ result = test.config.test_format.execute(test, self.lit_config) |
+ |
+ # Support deprecated result from execute() which returned the result |
+ # code and additional output as a tuple. |
+ if isinstance(result, tuple): |
+ code, output = result |
+ result = lit.Test.Result(code, output) |
+ elif not isinstance(result, lit.Test.Result): |
+ raise ValueError("unexpected result from test execution") |
+ except KeyboardInterrupt: |
+ raise |
+ except: |
+ if self.lit_config.debug: |
+ raise |
+ output = 'Exception during script execution:\n' |
+ output += traceback.format_exc() |
+ output += '\n' |
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output) |
+ result.elapsed = time.time() - start_time |
+ |
+ test.setResult(result) |
+ |
+ def execute_tests(self, display, jobs, max_time=None, |
+ use_processes=False): |
+ """ |
+ execute_tests(display, jobs, [max_time]) |
+ |
+ Execute each of the tests in the run, using up to jobs number of |
+ parallel tasks, and inform the display of each individual result. The |
+ provided tests should be a subset of the tests available in this run |
+ object. |
+ |
+ If max_time is non-None, it should be a time in seconds after which to |
+ stop executing tests. |
+ |
+ The display object will have its update method called with each test as |
+ it is completed. The calls are guaranteed to be locked with respect to |
+ one another, but are *not* guaranteed to be called on the same thread as |
+ this method was invoked on. |
+ |
+ Upon completion, each test in the run will have its result |
+ computed. Tests which were not actually executed (for any reason) will |
+ be given an UNRESOLVED result. |
+ """ |
+ |
+ # Choose the appropriate parallel execution implementation. |
+ consumer = None |
+ if jobs != 1 and use_processes and multiprocessing: |
+ try: |
+ task_impl = multiprocessing.Process |
+ queue_impl = multiprocessing.Queue |
+ canceled_flag = multiprocessing.Value('i', 0) |
+ consumer = MultiprocessResultsConsumer(self, display, jobs) |
+ except: |
+ # multiprocessing fails to initialize with certain OpenBSD and |
+ # FreeBSD Python versions: http://bugs.python.org/issue3770 |
+ # Unfortunately the error raised also varies by platform. |
+ self.lit_config.note('failed to initialize multiprocessing') |
+ consumer = None |
+ if not consumer: |
+ task_impl = threading.Thread |
+ queue_impl = queue.Queue |
+ canceled_flag = LockedValue(0) |
+ consumer = ThreadResultsConsumer(display) |
+ |
+ # Because some tests use threads internally, and at least on Linux each |
+ # of these threads counts toward the current process limit, try to |
+ # raise the (soft) process limit so that tests don't fail due to |
+ # resource exhaustion. |
+ try: |
+ cpus = lit.util.detectCPUs() |
+ desired_limit = jobs * cpus * 2 # the 2 is a safety factor |
+ |
+ # Import the resource module here inside this try block because it |
+ # will likely fail on Windows. |
+ import resource |
+ |
+ max_procs_soft, max_procs_hard = resource.getrlimit(resource.RLIMIT_NPROC) |
+ desired_limit = min(desired_limit, max_procs_hard) |
+ |
+ if max_procs_soft < desired_limit: |
+ resource.setrlimit(resource.RLIMIT_NPROC, (desired_limit, max_procs_hard)) |
+ self.lit_config.note('raised the process limit from %d to %d' % \ |
+ (max_procs_soft, desired_limit)) |
+ except: |
+ pass |
+ |
+ # Create the test provider. |
+ provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag) |
+ |
+ # Install a console-control signal handler on Windows. |
+ if win32api is not None: |
+ def console_ctrl_handler(type): |
+ provider.cancel() |
+ return True |
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) |
+ |
+ # Install a timeout handler, if requested. |
+ if max_time is not None: |
+ def timeout_handler(): |
+ provider.cancel() |
+ timeout_timer = threading.Timer(max_time, timeout_handler) |
+ timeout_timer.start() |
+ |
+ # If not using multiple tasks, just run the tests directly. |
+ if jobs == 1: |
+ run_one_tester(self, provider, consumer) |
+ else: |
+ # Otherwise, execute the tests in parallel |
+ self._execute_tests_in_parallel(task_impl, provider, consumer, jobs) |
+ |
+ # Cancel the timeout handler. |
+ if max_time is not None: |
+ timeout_timer.cancel() |
+ |
+ # Update results for any tests which weren't run. |
+ for test in self.tests: |
+ if test.result is None: |
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) |
+ |
+ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): |
+ # Start all of the tasks. |
+ tasks = [task_impl(target=run_one_tester, |
+ args=(self, provider, consumer)) |
+ for i in range(jobs)] |
+ for t in tasks: |
+ t.start() |
+ |
+ # Allow the consumer to handle results, if necessary. |
+ consumer.handle_results() |
+ |
+ # Wait for all the tasks to complete. |
+ for t in tasks: |
+ t.join() |