OLD | NEW |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Module containing methods/classes related to running parallel test jobs.""" | 5 """Module containing methods/classes related to running parallel test jobs.""" |
6 | 6 |
7 import sys | 7 import sys |
8 import threading | 8 import threading |
9 import time | 9 import time |
10 | 10 |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
47 # From threading.py to avoid a refcycle. | 47 # From threading.py to avoid a refcycle. |
48 del self._target, self._args | 48 del self._target, self._args |
49 | 49 |
50 def GetOutput(self): | 50 def GetOutput(self): |
51 """Returns the output of the method run.""" | 51 """Returns the output of the method run.""" |
52 assert self._completed, 'GetOutput called before thread was run.' | 52 assert self._completed, 'GetOutput called before thread was run.' |
53 return self._output | 53 return self._output |
54 | 54 |
55 def _Cleanup(self): | 55 def _Cleanup(self): |
56 """Releases semaphores for a waiting caller.""" | 56 """Releases semaphores for a waiting caller.""" |
57 cros_lib.Info('Completed job %s' % self) | |
58 self._starting_semaphore.release() | 57 self._starting_semaphore.release() |
59 self._ending_semaphore.release() | 58 self._ending_semaphore.release() |
60 | 59 |
61 def __str__(self): | 60 def __str__(self): |
62 return '%s(%s)' % (self._target, self._args) | 61 return '%s(%s)' % (self._target, self._args) |
63 | 62 |
64 | 63 |
65 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, | 64 def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, |
66 print_status): | 65 print_status): |
67 """Runs set number of specified jobs in parallel. | 66 """Runs set number of specified jobs in parallel. |
(...skipping 14 matching lines...) Expand all Loading... |
82 join_semaphore = threading.Semaphore(0) | 81 join_semaphore = threading.Semaphore(0) |
83 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' | 82 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
84 | 83 |
85 # Create the parallel jobs. | 84 # Create the parallel jobs. |
86 for job, args in map(_TwoTupleize, jobs, jobs_args): | 85 for job, args in map(_TwoTupleize, jobs, jobs_args): |
87 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, | 86 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, |
88 args=args) | 87 args=args) |
89 threads.append(thread) | 88 threads.append(thread) |
90 | 89 |
91 # Cache sudo access. | 90 # Cache sudo access. |
92 cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'], | 91 cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], |
93 print_cmd=False, redirect_stdout=True, | 92 print_cmd=False, redirect_stdout=True, |
94 redirect_stderr=True) | 93 redirect_stderr=True) |
95 | 94 |
96 # We use a semaphore to ensure we don't run more jobs than required. | 95 # We use a semaphore to ensure we don't run more jobs than required. |
97 # After each thread finishes, it releases (increments semaphore). | 96 # After each thread finishes, it releases (increments semaphore). |
98 # Acquire blocks of num jobs reached and continues when a thread finishes. | 97 # Acquire blocks of num jobs reached and continues when a thread finishes. |
99 for next_thread in threads: | 98 for next_thread in threads: |
100 job_start_semaphore.acquire(blocking=True) | 99 job_start_semaphore.acquire(blocking=True) |
101 cros_lib.Info('Starting job %s' % next_thread) | |
102 next_thread.start() | 100 next_thread.start() |
103 | 101 |
104 # Wait on the rest of the threads to finish. | 102 # Wait on the rest of the threads to finish. |
105 cros_lib.Info('Waiting for threads to complete.') | |
106 for thread in threads: | 103 for thread in threads: |
107 while not join_semaphore.acquire(blocking=False): | 104 while not join_semaphore.acquire(blocking=False): |
108 time.sleep(5) | 105 time.sleep(5) |
109 if print_status: | 106 if print_status: |
110 print >> sys.stderr, '.', | 107 print >> sys.stderr, '.', |
111 | 108 |
112 return [thread.GetOutput() for thread in threads] | 109 return [thread.GetOutput() for thread in threads] |
OLD | NEW |