Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(861)

Side by Side Diff: au_test_harness/parallel_test_job.py

Issue 6825062: Fix bug where multiprocessing.Queue hangs on large outputs. (Closed) Base URL: http://git.chromium.org/git/crostestutils.git@master
Patch Set: Nits Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « au_test_harness/cros_au_test_harness.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Module containing methods/classes related to running parallel test jobs.""" 5 """Module containing methods/classes related to running parallel test jobs."""
6 6
7 import multiprocessing 7 import multiprocessing
8 import sys 8 import sys
9 import time 9 import time
10 10
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
82 if parallel_job.is_alive(): 82 if parallel_job.is_alive():
83 parallel_job.terminate() 83 parallel_job.terminate()
84 84
85 raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for ' 85 raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for '
86 'job completion.' % cls.MAX_TIMEOUT_SECONDS) 86 'job completion.' % cls.MAX_TIMEOUT_SECONDS)
87 87
88 def __str__(self): 88 def __str__(self):
89 return '%s(%s)' % (self._target, self._args) 89 return '%s(%s)' % (self._target, self._args)
90 90
91 91
92 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, 92 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args):
93 print_status):
94 """Runs set number of specified jobs in parallel. 93 """Runs set number of specified jobs in parallel.
95 94
95 Note that there is a bug in Python Queue implementation that doesn't
96 allow arbitrary sizes to be returned. Instead, the process will just
97 appear to have hung. Be careful when accepting output.
98
96 Args: 99 Args:
97 number_of_simultaneous_jobs: Max number of parallel_jobs to be run in 100 number_of_simultaneous_jobs: Max number of parallel_jobs to be run in
98 parallel. 101 parallel.
99 jobs: Array of methods to run. 102 jobs: Array of methods to run.
100 jobs_args: Array of args associated with method calls. 103 jobs_args: Array of args associated with method calls.
101 print_status: True if you'd like this to print out .'s as it runs jobs.
102 Returns: 104 Returns:
103 Returns an array of results corresponding to each parallel_job. 105 Returns an array of results corresponding to each parallel_job's output.
104 """ 106 """
105 def ProcessOutputWrapper(func, args, output): 107 def ProcessOutputWrapper(func, args, output_queue):
106 """Simple function wrapper that puts the output of a function in a queue.""" 108 """Simple function wrapper that puts the output of a function in a queue."""
107 try: 109 try:
108 output.put(func(*args)) 110 output_queue.put(func(*args))
109 except: 111 except:
110 output.put('') 112 output_queue.put(None)
111 raise 113 raise
112 finally: 114 finally:
113 output.close() 115 output_queue.close()
114 116
115 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' 117 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
116 # Cache sudo access. 118 # Cache sudo access.
117 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], 119 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
118 print_cmd=False, redirect_stdout=True, 120 print_cmd=False, redirect_stdout=True,
119 redirect_stderr=True) 121 redirect_stderr=True)
120 122
121 parallel_jobs = [] 123 parallel_jobs = []
122 output_array = [] 124 output_array = []
123 125
(...skipping 10 matching lines...) Expand all
134 output_array.append(output) 136 output_array.append(output)
135 137
136 # We use a semaphore to ensure we don't run more jobs than required. 138 # We use a semaphore to ensure we don't run more jobs than required.
137 # After each parallel_job finishes, it releases (increments semaphore). 139 # After each parallel_job finishes, it releases (increments semaphore).
138 for next_parallel_job in parallel_jobs: 140 for next_parallel_job in parallel_jobs:
139 job_start_semaphore.acquire(block=True) 141 job_start_semaphore.acquire(block=True)
140 next_parallel_job.start() 142 next_parallel_job.start()
141 143
142 ParallelJob.WaitUntilJobsComplete(parallel_jobs) 144 ParallelJob.WaitUntilJobsComplete(parallel_jobs)
143 return [output.get() for output in output_array] 145 return [output.get() for output in output_array]
OLDNEW
« no previous file with comments | « au_test_harness/cros_au_test_harness.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698