Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(131)

Side by Side Diff: au_test_harness/parallel_test_job.py

Issue 6815003: This CL updates the parallel job library in the au test harness to be more robust. (Closed) Base URL: http://git.chromium.org/git/crostestutils.git@master
Patch Set: Last fixes Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « au_test_harness/au_worker.py ('k') | au_test_harness/vm_au_worker.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Module containing methods/classes related to running parallel test jobs.""" 5 """Module containing methods/classes related to running parallel test jobs."""
6 6
7 import multiprocessing
7 import sys 8 import sys
8 import threading 9 import threading
9 import time 10 import time
10 11
11 import cros_build_lib as cros_lib 12 import cros_build_lib as cros_lib
12 13
class ParallelJobTimeoutError(Exception):
  """Raised when a job ran for longer than expected."""
16 def __init__(self, starting_semaphore, ending_semaphore, target, args): 18
class ParallelJob(multiprocessing.Process):
  """Small wrapper for Process that stores output of its target method."""

  # Maximum total time, in seconds, to wait for a set of jobs to complete.
  MAX_TIMEOUT = 1800
  # Time, in seconds, to block on a single join() before re-polling the pool.
  SLEEP_TIMEOUT = 180

  def __init__(self, starting_semaphore, target, args):
    """Initializes an instance of a job.

    Args:
      starting_semaphore: Semaphore used by caller to wait on such that
        there isn't more than a certain number of parallel_jobs running.  Should
        be initialized to a value for the number of parallel_jobs wanting to be
        run at a time.
      target: The func to run.
      args: Args to pass to the func.
    """
    super(ParallelJob, self).__init__(target=target, args=args)
    self._target = target
    self._args = args
    self._starting_semaphore = starting_semaphore

  def run(self):
    """Process override.  Runs the method specified in the child process."""
    try:
      self._target(*self._args)
    finally:
      # Always free up our slot in the pool, even if the target raised.
      self._starting_semaphore.release()

  @classmethod
  def WaitUntilJobsComplete(cls, parallel_jobs):
    """Waits until all parallel_jobs have completed before returning.

    Given an array of parallel_jobs, returns once all parallel_jobs have
    completed or a max timeout is reached.

    Args:
      parallel_jobs: Array of started ParallelJob instances.

    Raises:
      ParallelJobTimeoutError: if max timeout is reached.
    """
    def _GetActiveJobs():
      """Returns the list of jobs that are still running."""
      return [job for job in parallel_jobs if job.is_alive()]

    start_time = time.time()
    while (time.time() - start_time) < cls.MAX_TIMEOUT:
      active_jobs = _GetActiveJobs()
      if not active_jobs:
        return

      # sys.stderr.write works under both Python 2 and 3; the old
      # "print >>" statement does not.
      sys.stderr.write(
          'Process Pool Active: Waiting on %d/%d jobs to complete\n' %
          (len(active_jobs), len(parallel_jobs)))
      # Block on the first still-active job.  Every job's liveness is
      # re-checked on the next pass, so joining one at a time is sufficient.
      # (The previous code returned the loop variable rather than the first
      # active job, joining an already-finished process and raising a
      # NameError for an empty job list.)
      active_jobs[0].join(cls.SLEEP_TIMEOUT)

    # Timed out: reap anything still running before raising.
    for parallel_job in parallel_jobs:
      if parallel_job.is_alive():
        parallel_job.terminate()

    raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for '
                                  'job completion.' % cls.MAX_TIMEOUT)

  def __str__(self):
    return '%s(%s)' % (self._target, self._args)
62 90
63 91
def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
                    print_status):
  """Runs set number of specified jobs in parallel.

  Args:
    number_of_simultaneous_jobs: Max number of parallel_jobs to be run in
      parallel.
    jobs: Array of methods to run.
    jobs_args: Array of args associated with method calls.
    print_status: True if you'd like this to print out .'s as it runs jobs.
      Kept for interface compatibility; the process pool now reports its own
      progress to stderr.

  Returns:
    Returns an array of results corresponding to each parallel_job.
  """
  def ProcessOutputWrapper(func, args, output):
    """Simple function wrapper that puts the output of a function in a queue."""
    output.put(func(*args))

  assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'

  # Cache sudo access.
  cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
                      print_cmd=False, redirect_stdout=True,
                      redirect_stderr=True)

  parallel_jobs = []
  output_array = []

  # Semaphore used to create a Process Pool.  This must be a multiprocessing
  # semaphore, not a threading one: each worker releases it from a child
  # process, and releasing a forked copy of a threading.Semaphore would never
  # be seen by the parent, deadlocking the pool.
  job_start_semaphore = multiprocessing.Semaphore(number_of_simultaneous_jobs)

  # Create the parallel jobs.
  for job, args in zip(jobs, jobs_args):
    output = multiprocessing.Queue()
    parallel_job = ParallelJob(job_start_semaphore,
                               target=ProcessOutputWrapper,
                               args=(job, args, output))
    parallel_jobs.append(parallel_job)
    output_array.append(output)

  # We use a semaphore to ensure we don't run more jobs than required.
  # After each parallel_job finishes, it releases (increments semaphore).
  # Note: multiprocessing's acquire keyword is 'block', not 'blocking';
  # the default already blocks.
  for next_parallel_job in parallel_jobs:
    job_start_semaphore.acquire()
    next_parallel_job.start()

  ParallelJob.WaitUntilJobsComplete(parallel_jobs)
  return [output.get() for output in output_array]
OLDNEW
« no previous file with comments | « au_test_harness/au_worker.py ('k') | au_test_harness/vm_au_worker.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698