OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 """Module containing methods/classes related to running parallel test jobs.""" |
| 6 |
| 7 import sys |
| 8 import threading |
| 9 import time |
| 10 |
| 11 import cros_build_lib as cros_lib |
| 12 |
| 13 class ParallelJob(threading.Thread): |
| 14 """Small wrapper for threading. Thread that releases a semaphores on exit.""" |
| 15 |
| 16 def __init__(self, starting_semaphore, ending_semaphore, target, args): |
| 17 """Initializes an instance of a job. |
| 18 |
| 19 Args: |
| 20 starting_semaphore: Semaphore used by caller to wait on such that |
| 21 there isn't more than a certain number of threads running. Should |
| 22 be initialized to a value for the number of threads wanting to be run |
| 23 at a time. |
| 24 ending_semaphore: Semaphore is released every time a job ends. Should be |
| 25 initialized to 0 before starting first job. Should be acquired once for |
| 26 each job. Threading.Thread.join() has a bug where if the run function |
| 27 terminates too quickly join() will hang forever. |
| 28 target: The func to run. |
| 29 args: Args to pass to the fun. |
| 30 """ |
| 31 threading.Thread.__init__(self, target=target, args=args) |
| 32 self._target = target |
| 33 self._args = args |
| 34 self._starting_semaphore = starting_semaphore |
| 35 self._ending_semaphore = ending_semaphore |
| 36 self._output = None |
| 37 self._completed = False |
| 38 |
| 39 def run(self): |
| 40 """Thread override. Runs the method specified and sets output.""" |
| 41 try: |
| 42 self._output = self._target(*self._args) |
| 43 finally: |
| 44 # Our own clean up. |
| 45 self._Cleanup() |
| 46 self._completed = True |
| 47 # From threading.py to avoid a refcycle. |
| 48 del self._target, self._args |
| 49 |
| 50 def GetOutput(self): |
| 51 """Returns the output of the method run.""" |
| 52 assert self._completed, 'GetOutput called before thread was run.' |
| 53 return self._output |
| 54 |
| 55 def _Cleanup(self): |
| 56 """Releases semaphores for a waiting caller.""" |
| 57 cros_lib.Info('Completed job %s' % self) |
| 58 self._starting_semaphore.release() |
| 59 self._ending_semaphore.release() |
| 60 |
| 61 def __str__(self): |
| 62 return '%s(%s)' % (self._target, self._args) |
| 63 |
| 64 |
| 65 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, |
| 66 print_status): |
| 67 """Runs set number of specified jobs in parallel. |
| 68 |
| 69 Args: |
| 70 number_of_simultaneous_jobs: Max number of threads to be run in parallel. |
| 71 jobs: Array of methods to run. |
| 72 jobs_args: Array of args associated with method calls. |
| 73 print_status: True if you'd like this to print out .'s as it runs jobs. |
| 74 Returns: |
| 75 Returns an array of results corresponding to each thread. |
| 76 """ |
| 77 def _TwoTupleize(x, y): |
| 78 return (x, y) |
| 79 |
| 80 threads = [] |
| 81 job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs) |
| 82 join_semaphore = threading.Semaphore(0) |
| 83 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
| 84 |
| 85 # Create the parallel jobs. |
| 86 for job, args in map(_TwoTupleize, jobs, jobs_args): |
| 87 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, |
| 88 args=args) |
| 89 threads.append(thread) |
| 90 |
| 91 # Cache sudo access. |
| 92 cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'], |
| 93 print_cmd=False, redirect_stdout=True, |
| 94 redirect_stderr=True) |
| 95 |
| 96 # We use a semaphore to ensure we don't run more jobs than required. |
| 97 # After each thread finishes, it releases (increments semaphore). |
| 98 # Acquire blocks of num jobs reached and continues when a thread finishes. |
| 99 for next_thread in threads: |
| 100 job_start_semaphore.acquire(blocking=True) |
| 101 cros_lib.Info('Starting job %s' % next_thread) |
| 102 next_thread.start() |
| 103 |
| 104 # Wait on the rest of the threads to finish. |
| 105 cros_lib.Info('Waiting for threads to complete.') |
| 106 for thread in threads: |
| 107 while not join_semaphore.acquire(blocking=False): |
| 108 time.sleep(5) |
| 109 if print_status: |
| 110 print >> sys.stderr, '.', |
| 111 |
| 112 return [thread.GetOutput() for thread in threads] |
OLD | NEW |