| Index: bin/au_test_harness/parallel_test_job.py
|
| diff --git a/bin/au_test_harness/parallel_test_job.py b/bin/au_test_harness/parallel_test_job.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..c84ef4401186de41c719fe7e627a06aff6f5f209
|
| --- /dev/null
|
| +++ b/bin/au_test_harness/parallel_test_job.py
|
| @@ -0,0 +1,112 @@
|
| +# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +"""Module containing methods/classes related to running parallel test jobs."""
|
| +
|
| +import sys
|
| +import threading
|
| +import time
|
| +
|
| +import cros_build_lib as cros_lib
|
| +
|
| +class ParallelJob(threading.Thread):
|
| + """Small wrapper for threading. Thread that releases a semaphores on exit."""
|
| +
|
| + def __init__(self, starting_semaphore, ending_semaphore, target, args):
|
| + """Initializes an instance of a job.
|
| +
|
| + Args:
|
| + starting_semaphore: Semaphore used by caller to wait on such that
|
| + there isn't more than a certain number of threads running. Should
|
| + be initialized to a value for the number of threads wanting to be run
|
| + at a time.
|
| + ending_semaphore: Semaphore is released every time a job ends. Should be
|
| + initialized to 0 before starting first job. Should be acquired once for
|
| + each job. Threading.Thread.join() has a bug where if the run function
|
| + terminates too quickly join() will hang forever.
|
| + target: The func to run.
|
| + args: Args to pass to the fun.
|
| + """
|
| + threading.Thread.__init__(self, target=target, args=args)
|
| + self._target = target
|
| + self._args = args
|
| + self._starting_semaphore = starting_semaphore
|
| + self._ending_semaphore = ending_semaphore
|
| + self._output = None
|
| + self._completed = False
|
| +
|
| + def run(self):
|
| + """Thread override. Runs the method specified and sets output."""
|
| + try:
|
| + self._output = self._target(*self._args)
|
| + finally:
|
| + # Our own clean up.
|
| + self._Cleanup()
|
| + self._completed = True
|
| + # From threading.py to avoid a refcycle.
|
| + del self._target, self._args
|
| +
|
| + def GetOutput(self):
|
| + """Returns the output of the method run."""
|
| + assert self._completed, 'GetOutput called before thread was run.'
|
| + return self._output
|
| +
|
| + def _Cleanup(self):
|
| + """Releases semaphores for a waiting caller."""
|
| + cros_lib.Info('Completed job %s' % self)
|
| + self._starting_semaphore.release()
|
| + self._ending_semaphore.release()
|
| +
|
| + def __str__(self):
|
| + return '%s(%s)' % (self._target, self._args)
|
| +
|
| +
|
| +def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
|
| + print_status):
|
| + """Runs set number of specified jobs in parallel.
|
| +
|
| + Args:
|
| + number_of_simultaneous_jobs: Max number of threads to be run in parallel.
|
| + jobs: Array of methods to run.
|
| + jobs_args: Array of args associated with method calls.
|
| + print_status: True if you'd like this to print out .'s as it runs jobs.
|
| + Returns:
|
| + Returns an array of results corresponding to each thread.
|
| + """
|
| + def _TwoTupleize(x, y):
|
| + return (x, y)
|
| +
|
| + threads = []
|
| + job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs)
|
| + join_semaphore = threading.Semaphore(0)
|
| + assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
|
| +
|
| + # Create the parallel jobs.
|
| + for job, args in map(_TwoTupleize, jobs, jobs_args):
|
| + thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
|
| + args=args)
|
| + threads.append(thread)
|
| +
|
| + # Cache sudo access.
|
| + cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'],
|
| + print_cmd=False, redirect_stdout=True,
|
| + redirect_stderr=True)
|
| +
|
| + # We use a semaphore to ensure we don't run more jobs than required.
|
| + # After each thread finishes, it releases (increments semaphore).
|
| + # Acquire blocks of num jobs reached and continues when a thread finishes.
|
| + for next_thread in threads:
|
| + job_start_semaphore.acquire(blocking=True)
|
| + cros_lib.Info('Starting job %s' % next_thread)
|
| + next_thread.start()
|
| +
|
| + # Wait on the rest of the threads to finish.
|
| + cros_lib.Info('Waiting for threads to complete.')
|
| + for thread in threads:
|
| + while not join_semaphore.acquire(blocking=False):
|
| + time.sleep(5)
|
| + if print_status:
|
| + print >> sys.stderr, '.',
|
| +
|
| + return [thread.GetOutput() for thread in threads]
|
|
|