OLD | NEW |
---|---|
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Module containing methods/classes related to running parallel test jobs.""" | 5 """Module containing methods/classes related to running parallel test jobs.""" |
6 | 6 |
7 import multiprocessing | |
7 import sys | 8 import sys |
8 import threading | 9 import threading |
9 import time | 10 import time |
10 | 11 |
11 import cros_build_lib as cros_lib | 12 import cros_build_lib as cros_lib |
12 | 13 |
13 class ParallelJob(threading.Thread): | 14 class ParallelJobTimeoutError(Exception): |
14 """Small wrapper for threading. Thread that releases a semaphores on exit.""" | 15 """Thrown when a job ran for longer than expected.""" |
16 pass | |
15 | 17 |
16 def __init__(self, starting_semaphore, ending_semaphore, target, args): | 18 |
19 class ParallelJob(multiprocessing.Process): | |
20 """Small wrapper for Process that stores output of its target method.""" | |
dgarrett
2011/04/08 03:56:42
its -> it's
sosa
2011/04/08 18:59:04
it's == it is :)
On 2011/04/08 03:56:42, dgarrett
| |
21 | |
22 MAX_TIMEOUT = 1800 | |
dgarrett
2011/04/08 03:56:42
Units?
sosa
2011/04/08 18:59:04
Done.
| |
23 SLEEP_TIMEOUT = 180 | |
24 | |
25 def __init__(self, starting_semaphore, target, args): | |
17 """Initializes an instance of a job. | 26 """Initializes an instance of a job. |
18 | 27 |
19 Args: | 28 Args: |
20 starting_semaphore: Semaphore used by caller to wait on such that | 29 starting_semaphore: Semaphore used by caller to wait on such that |
21 there isn't more than a certain number of threads running. Should | 30 there isn't more than a certain number of parallel_jobs running. Should |
22 be initialized to a value for the number of threads wanting to be run | 31 be initialized to a value for the number of parallel_jobs wanting to be |
23 at a time. | 32 run at a time. |
24 ending_semaphore: Semaphore is released every time a job ends. Should be | |
25 initialized to 0 before starting first job. Should be acquired once for | |
26 each job. Threading.Thread.join() has a bug where if the run function | |
27 terminates too quickly join() will hang forever. | |
28 target: The func to run. | 33 target: The func to run. |
29 args: Args to pass to the func. | 34 args: Args to pass to the func. |
30 """ | 35 """ |
31 threading.Thread.__init__(self, target=target, args=args) | 36 super(ParallelJob, self).__init__(target=target, args=args) |
32 self._target = target | 37 self._target = target |
33 self._args = args | 38 self._args = args |
34 self._starting_semaphore = starting_semaphore | 39 self._starting_semaphore = starting_semaphore |
35 self._ending_semaphore = ending_semaphore | |
36 self._output = None | |
37 self._completed = False | |
38 | 40 |
39 def run(self): | 41 def run(self): |
40 """Thread override. Runs the method specified and sets output.""" | 42 """Process override. Runs the method specified and releases the semaphore.""" |
41 try: | 43 try: |
42 self._output = self._target(*self._args) | 44 self._target(*self._args) |
43 finally: | 45 finally: |
44 # Our own clean up. | 46 self._starting_semaphore.release() |
45 self._Cleanup() | |
46 self._completed = True | |
47 # From threading.py to avoid a refcycle. | |
48 del self._target, self._args | |
49 | 47 |
50 def GetOutput(self): | 48 @classmethod |
51 """Returns the output of the method run.""" | 49 def WaitUntilJobsComplete(cls, parallel_jobs): |
52 assert self._completed, 'GetOutput called before thread was run.' | 50 """Waits until all parallel_jobs have completed before returning. |
53 return self._output | |
54 | 51 |
55 def _Cleanup(self): | 52 Given an array of parallel_jobs, returns once all parallel_jobs have |
56 """Releases semaphores for a waiting caller.""" | 53 completed or a max timeout is reached. |
57 self._starting_semaphore.release() | 54 |
58 self._ending_semaphore.release() | 55 Raises: |
56 ParallelJobTimeoutError: if max timeout is reached. | |
57 """ | |
58 def GetCurrentActiveCount(): | |
59 """Returns the (number of active jobs, first active job).""" | |
60 active_count = 0 | |
61 active_job = None | |
62 for parallel_job in parallel_jobs: | |
63 if parallel_job.is_alive(): | |
64 active_count += 1 | |
65 if not active_job: | |
66 active_job = parallel_job | |
67 | |
68 return (active_count, active_job) | |
69 | |
70 start_time = time.time() | |
71 while (time.time() - start_time) < cls.MAX_TIMEOUT: | |
72 (active_count, active_job) = GetCurrentActiveCount() | |
73 if active_count == 0: | |
74 return | |
75 else: | |
76 print >> sys.stderr, ( | |
77 'Process Pool Active: Waiting on %d/%d jobs to complete' % | |
78 (active_count, len(parallel_jobs))) | |
79 active_job.join(cls.SLEEP_TIMEOUT) | |
80 | |
81 for parallel_job in parallel_jobs: | |
82 if parallel_job.is_alive(): | |
83 parallel_job.terminate() | |
84 | |
85 raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for ' | |
86 'job completion.' % cls.MAX_TIMEOUT) | |
59 | 87 |
60 def __str__(self): | 88 def __str__(self): |
61 return '%s(%s)' % (self._target, self._args) | 89 return '%s(%s)' % (self._target, self._args) |
62 | 90 |
63 | 91 |
64 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, | 92 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, |
65 print_status): | 93 print_status): |
66 """Runs set number of specified jobs in parallel. | 94 """Runs set number of specified jobs in parallel. |
67 | 95 |
68 Args: | 96 Args: |
69 number_of_simultaneous_jobs: Max number of threads to be run in parallel. | 97 number_of_simultaneous_jobs: Max number of parallel_jobs to be run in |
98 parallel. | |
70 jobs: Array of methods to run. | 99 jobs: Array of methods to run. |
71 jobs_args: Array of args associated with method calls. | 100 jobs_args: Array of args associated with method calls. |
72 print_status: True if you'd like this to print out .'s as it runs jobs. | 101 print_status: True if you'd like this to print out .'s as it runs jobs. |
73 Returns: | 102 Returns: |
74 Returns an array of results corresponding to each thread. | 103 Returns an array of results corresponding to each parallel_job. |
75 """ | 104 """ |
76 def _TwoTupleize(x, y): | 105 def _TwoTupleize(x, y): |
77 return (x, y) | 106 return (x, y) |
78 | 107 |
79 threads = [] | 108 def ProcessOutputWrapper(func, args, output): |
80 job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs) | 109 """Simple function wrapper that puts the output of a function in a queue.""" |
81 join_semaphore = threading.Semaphore(0) | 110 output.put(func(*args)) |
111 | |
82 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' | 112 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
83 | |
84 # Create the parallel jobs. | |
85 for job, args in map(_TwoTupleize, jobs, jobs_args): | |
86 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, | |
87 args=args) | |
88 threads.append(thread) | |
89 | |
90 # Cache sudo access. | 113 # Cache sudo access. |
91 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], | 114 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], |
92 print_cmd=False, redirect_stdout=True, | 115 print_cmd=False, redirect_stdout=True, |
93 redirect_stderr=True) | 116 redirect_stderr=True) |
94 | 117 |
118 parallel_jobs = [] | |
119 output_array = [] | |
120 | |
121 # Semaphore used to create a Process Pool. | |
122 job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs) | |
dgarrett
2011/04/08 03:56:42
Is this semaphore really safe across processes ins
sosa
2011/04/08 18:59:04
I think you're right, switching to multiprocessing
| |
123 | |
124 # Create the parallel jobs. | |
125 for job, args in map(_TwoTupleize, jobs, jobs_args): | |
dgarrett
2011/04/08 03:56:42
_TwoTupleize could be replaced in-line with "lambd
sosa
2011/04/08 18:59:04
Done.
| |
126 output = multiprocessing.Queue() | |
127 parallel_job = ParallelJob(job_start_semaphore, | |
128 target=ProcessOutputWrapper, | |
129 args=(job, args, output)) | |
130 parallel_jobs.append(parallel_job) | |
131 output_array.append(output) | |
132 | |
95 # We use a semaphore to ensure we don't run more jobs than required. | 133 # We use a semaphore to ensure we don't run more jobs than required. |
96 # After each thread finishes, it releases (increments semaphore). | 134 # After each parallel_job finishes, it releases (increments semaphore). |
97 # Acquire blocks of num jobs reached and continues when a thread finishes. | 135 for next_parallel_job in parallel_jobs: |
98 for next_thread in threads: | |
99 job_start_semaphore.acquire(blocking=True) | 136 job_start_semaphore.acquire(blocking=True) |
100 next_thread.start() | 137 next_parallel_job.start() |
101 | 138 |
102 # Wait on the rest of the threads to finish. | 139 ParallelJob.WaitUntilJobsComplete(parallel_jobs) |
103 for thread in threads: | 140 return [output.get() for output in output_array] |
104 while not join_semaphore.acquire(blocking=False): | |
105 time.sleep(5) | |
106 if print_status: | |
107 print >> sys.stderr, '.', | |
108 | |
109 return [thread.GetOutput() for thread in threads] | |
OLD | NEW |