Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: tools/testrunner/local/pool.py

Issue 275093002: Introduce a dynamic process pool for the local test driver (Closed) Base URL: https://v8.googlecode.com/svn/branches/bleeding_edge
Patch Set: Review Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2014 the V8 project authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 from multiprocessing import Event, Process, Queue
7
8 class NormalResult():
9 def __init__(self, result):
10 self.result = result
11 self.exception = False
12 self.break_now = False
13
14
15 class ExceptionResult():
16 def __init__(self):
17 self.exception = True
18 self.break_now = False
19
20
21 class BreakResult():
22 def __init__(self):
23 self.exception = False
24 self.break_now = True
25
26
27 def Worker(fn, work_queue, done_queue, done):
28 """Worker to be run in a child process.
29 The worker stops on two conditions. 1. When the poison pill "STOP" is
30 reached or 2. when the event "done" is set."""
31 try:
32 for args in iter(work_queue.get, "STOP"):
33 if done.is_set():
34 break
35 try:
36 done_queue.put(NormalResult(fn(*args)))
37 except Exception, e:
38 print(">>> EXCEPTION: %s" % e)
39 done_queue.put(ExceptionResult())
40 except KeyboardInterrupt:
41 done_queue.put(BreakResult())
42
43
44 class Pool():
45 """Distributes tasks to a number of worker processes.
46 New tasks can be added dynamically even after the workers have been started.
47 Requirement: Tasks can only be added from the parent process, e.g. while
48 consuming the results generator."""
49
50 # Factor to calculate the maximum number of items in the work/done queue.
51 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
52 BUFFER_FACTOR = 4
53
54 def __init__(self, num_workers):
55 self.num_workers = num_workers
56 self.processes = []
57 self.terminated = False
58
59 # Invariant: count >= #work_queue + #done_queue. It is greater when a
60 # worker takes an item from the work_queue and before the result is
61 # submitted to the done_queue. It is equal when no worker is working,
62 # e.g. when all workers have finished, and when no results are processed.
63 # Count is only accessed by the parent process. Only the parent process is
64 # allowed to remove items from the done_queue and to add items to the
65 # work_queue.
66 self.count = 0
67 self.work_queue = Queue()
68 self.done_queue = Queue()
69 self.done = Event()
70
71 def imap_unordered(self, fn, gen):
72 """Maps function "fn" to items in generator "gen" on the worker processes
73 in an arbitrary order. The items are expected to be lists of arguments to
74 the function. Returns a results iterator."""
75 try:
76 gen = iter(gen)
77 self.advance = self._advance_more
78
79 for w in xrange(self.num_workers):
80 p = Process(target=Worker, args=(fn,
81 self.work_queue,
82 self.done_queue,
83 self.done))
84 self.processes.append(p)
85 p.start()
86
87 self.advance(gen)
88 while self.count > 0:
89 result = self.done_queue.get()
90 self.count -= 1
91 if result.exception:
92 # Ignore items with unexpected exceptions.
93 continue
94 elif result.break_now:
95 # A keyboard interrupt happened in one of the worker processes.
96 raise KeyboardInterrupt
97 else:
98 yield result.result
99 self.advance(gen)
100 finally:
101 self.terminate()
102
103 def _advance_more(self, gen):
104 while self.count < self.num_workers * self.BUFFER_FACTOR:
105 try:
106 self.work_queue.put(gen.next())
107 self.count += 1
108 except StopIteration:
109 self.advance = self._advance_empty
110 break
111
112 def _advance_empty(self, gen):
113 pass
114
115 def add(self, args):
116 """Adds an item to the work queue. Can be called dynamically while
117 processing the results from imap_unordered."""
118 self.work_queue.put(args)
119 self.count += 1
120
121 def terminate(self):
122 if self.terminated:
123 return
124 self.terminated = True
125
126 # For exceptional tear down set the "done" event to stop the workers before
127 # they empty the queue buffer.
128 self.done.set()
129
130 for p in self.processes:
131 # During normal tear down the workers block on get(). Feed a poison pill
132 # per worker to make them stop.
133 self.work_queue.put("STOP")
134
135 for p in self.processes:
136 p.join()
OLDNEW
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698