Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1793)

Unified Diff: bin/au_test_harness/parallel_test_job.py

Issue 6597122: Refactor au_test_harness into modules and refactor to use worker design. (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: 80 char Created 9 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: bin/au_test_harness/parallel_test_job.py
diff --git a/bin/au_test_harness/parallel_test_job.py b/bin/au_test_harness/parallel_test_job.py
new file mode 100644
index 0000000000000000000000000000000000000000..331eca148350732ce68ceda3a3fc7ed1c7f41741
--- /dev/null
+++ b/bin/au_test_harness/parallel_test_job.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Module containing methods/classes related to running parallel test jobs."""
+
+import sys
+import threading
+import time
+
+import cros_build_lib as cros_lib
+
+class ParallelJob(threading.Thread):
+ """Small wrapper for threading. Thread that releases a semaphores on exit."""
+
+ def __init__(self, starting_semaphore, ending_semaphore, target, args):
+ """Initializes an instance of a job.
+
+ Args:
+ starting_semaphore: Semaphore used by caller to wait on such that
+ there isn't more than a certain number of threads running. Should
+ be initialized to a value for the number of threads wanting to be run
+ at a time.
+ ending_semaphore: Semaphore is released every time a job ends. Should be
+ initialized to 0 before starting first job. Should be acquired once for
+ each job. Threading.Thread.join() has a bug where if the run function
+ terminates too quickly join() will hang forever.
+ target: The func to run.
+ args: Args to pass to the fun.
+ """
+ threading.Thread.__init__(self, target=target, args=args)
+ self._target = target
+ self._args = args
+ self._starting_semaphore = starting_semaphore
+ self._ending_semaphore = ending_semaphore
+ self._output = None
+ self._completed = False
+
+ def run(self):
+ """Thread override. Runs the method specified and sets output."""
+ try:
+ self._output = self._target(*self._args)
+ finally:
+ # Our own clean up.
+ self._Cleanup()
+ self._completed = True
+ # From threading.py to avoid a refcycle.
+ del self._target, self._args
+
+ def GetOutput(self):
+ """Returns the output of the method run."""
+ assert self._completed, 'GetOutput called before thread was run.'
+ return self._output
+
+ def _Cleanup(self):
+ """Releases semaphores for a waiting caller."""
+ cros_lib.Info('Completed job %s' % self)
+ self._starting_semaphore.release()
+ self._ending_semaphore.release()
+
+ def __str__(self):
+ return '%s(%s)' % (self._target, self._args)
+
+
+def RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args,
dgarrett 2011/03/03 02:17:21 s/sumultaneous/simultaneous/
sosa 2011/03/03 03:11:41 Done.
+ print_status):
+
dgarrett 2011/03/03 02:17:21 Extra blank line.
sosa 2011/03/03 03:11:41 Done.
+ """Runs set number of specified jobs in parallel.
+
+ Args:
+ number_of_simultaneous_jobs: Max number of threads to be run in parallel.
+ jobs: Array of methods to run.
+ jobs_args: Array of args associated with method calls.
+ print_status: True if you'd like this to print out .'s as it runs jobs.
+ Returns:
+ Returns an array of results corresponding to each thread.
+ """
+ def _TwoTupleize(x, y):
+ return (x, y)
+
+ threads = []
+ job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
dgarrett 2011/03/03 02:17:21 s/sumultaneous/simultaneous/
sosa 2011/03/03 03:11:41 Done.
+ join_semaphore = threading.Semaphore(0)
+ assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
+
+ # Create the parallel jobs.
+ for job, args in map(_TwoTupleize, jobs, jobs_args):
+ thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
+ args=args)
+ threads.append(thread)
+
+ # Cache sudo access.
+ cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'],
+ print_cmd=False, redirect_stdout=True,
+ redirect_stderr=True)
+
+ # We use a semaphore to ensure we don't run more jobs that required.
dgarrett 2011/03/03 02:17:21 s/that/than/
sosa 2011/03/03 03:11:41 Done.
+ # After each thread finishes, it releases (increments semaphore).
+ # Acquire blocks of num jobs reached and continues when a thread finishes.
+ for next_thread in threads:
+ job_start_semaphore.acquire(blocking=True)
+ cros_lib.Info('Starting job %s' % next_thread)
+ next_thread.start()
+
+ # Wait on the rest of the threads to finish.
+ cros_lib.Info('Waiting for threads to complete.')
+ for thread in threads:
+ while not join_semaphore.acquire(blocking=False):
+ time.sleep(5)
+ if print_status:
+ print >> sys.stderr, '.',
+
+ return [thread.GetOutput() for thread in threads]

Powered by Google App Engine
This is Rietveld 408576698