OLD | NEW |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Module containing methods/classes related to running parallel test jobs.""" | 5 """Module containing methods/classes related to running parallel test jobs.""" |
6 | 6 |
7 import multiprocessing | 7 import multiprocessing |
8 import sys | 8 import sys |
9 import time | 9 import time |
10 | 10 |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
82 if parallel_job.is_alive(): | 82 if parallel_job.is_alive(): |
83 parallel_job.terminate() | 83 parallel_job.terminate() |
84 | 84 |
85 raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for ' | 85 raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for ' |
86 'job completion.' % cls.MAX_TIMEOUT_SECONDS) | 86 'job completion.' % cls.MAX_TIMEOUT_SECONDS) |
87 | 87 |
88 def __str__(self): | 88 def __str__(self): |
89 return '%s(%s)' % (self._target, self._args) | 89 return '%s(%s)' % (self._target, self._args) |
90 | 90 |
91 | 91 |
92 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, | 92 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args): |
93 print_status): | |
94 """Runs set number of specified jobs in parallel. | 93 """Runs set number of specified jobs in parallel. |
95 | 94 |
| 95 Note that there is a bug in Python Queue implementation that doesn't |
| 96 allow arbitrary sizes to be returned. Instead, the process will just |
| 97 appear to have hung. Be careful when accepting output. |
| 98 |
96 Args: | 99 Args: |
97 number_of_simultaneous_jobs: Max number of parallel_jobs to be run in | 100 number_of_simultaneous_jobs: Max number of parallel_jobs to be run in |
98 parallel. | 101 parallel. |
99 jobs: Array of methods to run. | 102 jobs: Array of methods to run. |
100 jobs_args: Array of args associated with method calls. | 103 jobs_args: Array of args associated with method calls. |
101 print_status: True if you'd like this to print out .'s as it runs jobs. | |
102 Returns: | 104 Returns: |
103 Returns an array of results corresponding to each parallel_job. | 105 Returns an array of results corresponding to each parallel_job's output. |
104 """ | 106 """ |
105 def ProcessOutputWrapper(func, args, output): | 107 def ProcessOutputWrapper(func, args, output_queue): |
106 """Simple function wrapper that puts the output of a function in a queue.""" | 108 """Simple function wrapper that puts the output of a function in a queue.""" |
107 try: | 109 try: |
108 output.put(func(*args)) | 110 output_queue.put(func(*args)) |
109 except: | 111 except: |
110 output.put('') | 112 output_queue.put(None) |
111 raise | 113 raise |
112 finally: | 114 finally: |
113 output.close() | 115 output_queue.close() |
114 | 116 |
115 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' | 117 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
116 # Cache sudo access. | 118 # Cache sudo access. |
117 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], | 119 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], |
118 print_cmd=False, redirect_stdout=True, | 120 print_cmd=False, redirect_stdout=True, |
119 redirect_stderr=True) | 121 redirect_stderr=True) |
120 | 122 |
121 parallel_jobs = [] | 123 parallel_jobs = [] |
122 output_array = [] | 124 output_array = [] |
123 | 125 |
(...skipping 10 matching lines...) Expand all Loading... |
134 output_array.append(output) | 136 output_array.append(output) |
135 | 137 |
136 # We use a semaphore to ensure we don't run more jobs than required. | 138 # We use a semaphore to ensure we don't run more jobs than required. |
137 # After each parallel_job finishes, it releases (increments semaphore). | 139 # After each parallel_job finishes, it releases (increments semaphore). |
138 for next_parallel_job in parallel_jobs: | 140 for next_parallel_job in parallel_jobs: |
139 job_start_semaphore.acquire(block=True) | 141 job_start_semaphore.acquire(block=True) |
140 next_parallel_job.start() | 142 next_parallel_job.start() |
141 | 143 |
142 ParallelJob.WaitUntilJobsComplete(parallel_jobs) | 144 ParallelJob.WaitUntilJobsComplete(parallel_jobs) |
143 return [output.get() for output in output_array] | 145 return [output.get() for output in output_array] |
OLD | NEW |