OLD | NEW |
---|---|
(Empty) | |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Module containing methods/classes related to running parallel test jobs.""" | |
6 | |
7 import sys | |
8 import threading | |
9 import time | |
10 | |
11 import cros_build_lib as cros_lib | |
12 | |
13 class ParallelJob(threading.Thread): | |
14 """Small wrapper for threading. Thread that releases a semaphores on exit.""" | |
15 | |
16 def __init__(self, starting_semaphore, ending_semaphore, target, args): | |
17 """Initializes an instance of a job. | |
18 | |
19 Args: | |
20 starting_semaphore: Semaphore used by caller to wait on such that | |
21 there isn't more than a certain number of threads running. Should | |
22 be initialized to a value for the number of threads wanting to be run | |
23 at a time. | |
24 ending_semaphore: Semaphore is released every time a job ends. Should be | |
25 initialized to 0 before starting first job. Should be acquired once for | |
26 each job. Threading.Thread.join() has a bug where if the run function | |
27 terminates too quickly join() will hang forever. | |
28 target: The func to run. | |
29 args: Args to pass to the fun. | |
30 """ | |
31 threading.Thread.__init__(self, target=target, args=args) | |
32 self._target = target | |
33 self._args = args | |
34 self._starting_semaphore = starting_semaphore | |
35 self._ending_semaphore = ending_semaphore | |
36 self._output = None | |
37 self._completed = False | |
38 | |
39 def run(self): | |
40 """Thread override. Runs the method specified and sets output.""" | |
41 try: | |
42 self._output = self._target(*self._args) | |
43 finally: | |
44 # Our own clean up. | |
45 self._Cleanup() | |
46 self._completed = True | |
47 # From threading.py to avoid a refcycle. | |
48 del self._target, self._args | |
49 | |
50 def GetOutput(self): | |
51 """Returns the output of the method run.""" | |
52 assert self._completed, 'GetOutput called before thread was run.' | |
53 return self._output | |
54 | |
55 def _Cleanup(self): | |
56 """Releases semaphores for a waiting caller.""" | |
57 cros_lib.Info('Completed job %s' % self) | |
58 self._starting_semaphore.release() | |
59 self._ending_semaphore.release() | |
60 | |
61 def __str__(self): | |
62 return '%s(%s)' % (self._target, self._args) | |
63 | |
64 | |
65 def RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args, | |
dgarrett
2011/03/03 02:17:21
s/sumultaneous/simultaneous/
sosa
2011/03/03 03:11:41
Done.
| |
66 print_status): | |
67 | |
dgarrett
2011/03/03 02:17:21
Extra blank line.
sosa
2011/03/03 03:11:41
Done.
| |
68 """Runs set number of specified jobs in parallel. | |
69 | |
70 Args: | |
71 number_of_simultaneous_jobs: Max number of threads to be run in parallel. | |
72 jobs: Array of methods to run. | |
73 jobs_args: Array of args associated with method calls. | |
74 print_status: True if you'd like this to print out .'s as it runs jobs. | |
75 Returns: | |
76 Returns an array of results corresponding to each thread. | |
77 """ | |
78 def _TwoTupleize(x, y): | |
79 return (x, y) | |
80 | |
81 threads = [] | |
82 job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) | |
dgarrett
2011/03/03 02:17:21
s/sumultaneous/simultaneous/
sosa
2011/03/03 03:11:41
Done.
| |
83 join_semaphore = threading.Semaphore(0) | |
84 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' | |
85 | |
86 # Create the parallel jobs. | |
87 for job, args in map(_TwoTupleize, jobs, jobs_args): | |
88 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, | |
89 args=args) | |
90 threads.append(thread) | |
91 | |
92 # Cache sudo access. | |
93 cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'], | |
94 print_cmd=False, redirect_stdout=True, | |
95 redirect_stderr=True) | |
96 | |
97 # We use a semaphore to ensure we don't run more jobs that required. | |
dgarrett
2011/03/03 02:17:21
s/that/than/
sosa
2011/03/03 03:11:41
Done.
| |
98 # After each thread finishes, it releases (increments semaphore). | |
99 # Acquire blocks of num jobs reached and continues when a thread finishes. | |
100 for next_thread in threads: | |
101 job_start_semaphore.acquire(blocking=True) | |
102 cros_lib.Info('Starting job %s' % next_thread) | |
103 next_thread.start() | |
104 | |
105 # Wait on the rest of the threads to finish. | |
106 cros_lib.Info('Waiting for threads to complete.') | |
107 for thread in threads: | |
108 while not join_semaphore.acquire(blocking=False): | |
109 time.sleep(5) | |
110 if print_status: | |
111 print >> sys.stderr, '.', | |
112 | |
113 return [thread.GetOutput() for thread in threads] | |
OLD | NEW |