Chromium Code Reviews| Index: bin/au_test_harness/parallel_test_job.py |
| diff --git a/bin/au_test_harness/parallel_test_job.py b/bin/au_test_harness/parallel_test_job.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..331eca148350732ce68ceda3a3fc7ed1c7f41741 |
| --- /dev/null |
| +++ b/bin/au_test_harness/parallel_test_job.py |
| @@ -0,0 +1,113 @@ |
| +# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Module containing methods/classes related to running parallel test jobs.""" |
| + |
| +import sys |
| +import threading |
| +import time |
| + |
| +import cros_build_lib as cros_lib |
| + |
| +class ParallelJob(threading.Thread): |
| + """Small wrapper for threading. Thread that releases a semaphores on exit.""" |
| + |
| + def __init__(self, starting_semaphore, ending_semaphore, target, args): |
| + """Initializes an instance of a job. |
| + |
| + Args: |
| + starting_semaphore: Semaphore used by caller to wait on such that |
| + there isn't more than a certain number of threads running. Should |
| + be initialized to a value for the number of threads wanting to be run |
| + at a time. |
| + ending_semaphore: Semaphore is released every time a job ends. Should be |
| + initialized to 0 before starting first job. Should be acquired once for |
| + each job. Threading.Thread.join() has a bug where if the run function |
| + terminates too quickly join() will hang forever. |
| + target: The func to run. |
| + args: Args to pass to the fun. |
| + """ |
| + threading.Thread.__init__(self, target=target, args=args) |
| + self._target = target |
| + self._args = args |
| + self._starting_semaphore = starting_semaphore |
| + self._ending_semaphore = ending_semaphore |
| + self._output = None |
| + self._completed = False |
| + |
| + def run(self): |
| + """Thread override. Runs the method specified and sets output.""" |
| + try: |
| + self._output = self._target(*self._args) |
| + finally: |
| + # Our own clean up. |
| + self._Cleanup() |
| + self._completed = True |
| + # From threading.py to avoid a refcycle. |
| + del self._target, self._args |
| + |
| + def GetOutput(self): |
| + """Returns the output of the method run.""" |
| + assert self._completed, 'GetOutput called before thread was run.' |
| + return self._output |
| + |
| + def _Cleanup(self): |
| + """Releases semaphores for a waiting caller.""" |
| + cros_lib.Info('Completed job %s' % self) |
| + self._starting_semaphore.release() |
| + self._ending_semaphore.release() |
| + |
| + def __str__(self): |
| + return '%s(%s)' % (self._target, self._args) |
| + |
| + |
| +def RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args, |
|
dgarrett
2011/03/03 02:17:21
s/sumultaneous/simultaneous/
sosa
2011/03/03 03:11:41
Done.
|
| + print_status): |
| + |
|
dgarrett
2011/03/03 02:17:21
Extra blank line.
sosa
2011/03/03 03:11:41
Done.
|
| + """Runs set number of specified jobs in parallel. |
| + |
| + Args: |
| + number_of_simultaneous_jobs: Max number of threads to be run in parallel. |
| + jobs: Array of methods to run. |
| + jobs_args: Array of args associated with method calls. |
| + print_status: True if you'd like this to print out .'s as it runs jobs. |
| + Returns: |
| + Returns an array of results corresponding to each thread. |
| + """ |
| + def _TwoTupleize(x, y): |
| + return (x, y) |
| + |
| + threads = [] |
| + job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) |
|
dgarrett
2011/03/03 02:17:21
s/sumultaneous/simultaneous/
sosa
2011/03/03 03:11:41
Done.
|
| + join_semaphore = threading.Semaphore(0) |
| + assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
| + |
| + # Create the parallel jobs. |
| + for job, args in map(_TwoTupleize, jobs, jobs_args): |
| + thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, |
| + args=args) |
| + threads.append(thread) |
| + |
| + # Cache sudo access. |
| + cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'], |
| + print_cmd=False, redirect_stdout=True, |
| + redirect_stderr=True) |
| + |
| + # We use a semaphore to ensure we don't run more jobs that required. |
|
dgarrett
2011/03/03 02:17:21
s/that/than/
sosa
2011/03/03 03:11:41
Done.
|
| + # After each thread finishes, it releases (increments semaphore). |
| + # Acquire blocks of num jobs reached and continues when a thread finishes. |
| + for next_thread in threads: |
| + job_start_semaphore.acquire(blocking=True) |
| + cros_lib.Info('Starting job %s' % next_thread) |
| + next_thread.start() |
| + |
| + # Wait on the rest of the threads to finish. |
| + cros_lib.Info('Waiting for threads to complete.') |
| + for thread in threads: |
| + while not join_semaphore.acquire(blocking=False): |
| + time.sleep(5) |
| + if print_status: |
| + print >> sys.stderr, '.', |
| + |
| + return [thread.GetOutput() for thread in threads] |