Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: bin/au_test_harness/parallel_test_job.py

Issue 6597122: Refactor au_test_harness into modules and refactor to use worker design. (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: 80 char Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Module containing methods/classes related to running parallel test jobs."""
6
7 import sys
8 import threading
9 import time
10
11 import cros_build_lib as cros_lib
12
13 class ParallelJob(threading.Thread):
14 """Small wrapper for threading. Thread that releases a semaphores on exit."""
15
16 def __init__(self, starting_semaphore, ending_semaphore, target, args):
17 """Initializes an instance of a job.
18
19 Args:
20 starting_semaphore: Semaphore used by caller to wait on such that
21 there isn't more than a certain number of threads running. Should
22 be initialized to a value for the number of threads wanting to be run
23 at a time.
24 ending_semaphore: Semaphore is released every time a job ends. Should be
25 initialized to 0 before starting first job. Should be acquired once for
26 each job. Threading.Thread.join() has a bug where if the run function
27 terminates too quickly join() will hang forever.
28 target: The func to run.
29 args: Args to pass to the fun.
30 """
31 threading.Thread.__init__(self, target=target, args=args)
32 self._target = target
33 self._args = args
34 self._starting_semaphore = starting_semaphore
35 self._ending_semaphore = ending_semaphore
36 self._output = None
37 self._completed = False
38
39 def run(self):
40 """Thread override. Runs the method specified and sets output."""
41 try:
42 self._output = self._target(*self._args)
43 finally:
44 # Our own clean up.
45 self._Cleanup()
46 self._completed = True
47 # From threading.py to avoid a refcycle.
48 del self._target, self._args
49
50 def GetOutput(self):
51 """Returns the output of the method run."""
52 assert self._completed, 'GetOutput called before thread was run.'
53 return self._output
54
55 def _Cleanup(self):
56 """Releases semaphores for a waiting caller."""
57 cros_lib.Info('Completed job %s' % self)
58 self._starting_semaphore.release()
59 self._ending_semaphore.release()
60
61 def __str__(self):
62 return '%s(%s)' % (self._target, self._args)
63
64
65 def RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args,
dgarrett 2011/03/03 02:17:21 s/sumultaneous/simultaneous/
sosa 2011/03/03 03:11:41 Done.
66 print_status):
67
dgarrett 2011/03/03 02:17:21 Extra blank line.
sosa 2011/03/03 03:11:41 Done.
68 """Runs set number of specified jobs in parallel.
69
70 Args:
71 number_of_simultaneous_jobs: Max number of threads to be run in parallel.
72 jobs: Array of methods to run.
73 jobs_args: Array of args associated with method calls.
74 print_status: True if you'd like this to print out .'s as it runs jobs.
75 Returns:
76 Returns an array of results corresponding to each thread.
77 """
78 def _TwoTupleize(x, y):
79 return (x, y)
80
81 threads = []
82 job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
dgarrett 2011/03/03 02:17:21 s/sumultaneous/simultaneous/
sosa 2011/03/03 03:11:41 Done.
83 join_semaphore = threading.Semaphore(0)
84 assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
85
86 # Create the parallel jobs.
87 for job, args in map(_TwoTupleize, jobs, jobs_args):
88 thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
89 args=args)
90 threads.append(thread)
91
92 # Cache sudo access.
93 cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'],
94 print_cmd=False, redirect_stdout=True,
95 redirect_stderr=True)
96
97 # We use a semaphore to ensure we don't run more jobs that required.
dgarrett 2011/03/03 02:17:21 s/that/than/
sosa 2011/03/03 03:11:41 Done.
98 # After each thread finishes, it releases (increments semaphore).
99 # Acquire blocks of num jobs reached and continues when a thread finishes.
100 for next_thread in threads:
101 job_start_semaphore.acquire(blocking=True)
102 cros_lib.Info('Starting job %s' % next_thread)
103 next_thread.start()
104
105 # Wait on the rest of the threads to finish.
106 cros_lib.Info('Waiting for threads to complete.')
107 for thread in threads:
108 while not join_semaphore.acquire(blocking=False):
109 time.sleep(5)
110 if print_status:
111 print >> sys.stderr, '.',
112
113 return [thread.GetOutput() for thread in threads]
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698