Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: tools/testrunner/local/pool.py

Issue 1469833002: [test-runner] Move test case processing beyond the multi-process boundary. (Closed) Base URL: https://chromium.googlesource.com/v8/v8.git@master
Patch Set: Review Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/progress.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 the V8 project authors. All rights reserved. 2 # Copyright 2014 the V8 project authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 from Queue import Empty 6 from Queue import Empty
7 from multiprocessing import Event, Process, Queue 7 from multiprocessing import Event, Process, Queue
8 import traceback
9
8 10
9 class NormalResult(): 11 class NormalResult():
10 def __init__(self, result): 12 def __init__(self, result):
11 self.result = result 13 self.result = result
12 self.exception = False 14 self.exception = False
13 self.break_now = False 15 self.break_now = False
14 16
15 17
16 class ExceptionResult(): 18 class ExceptionResult():
17 def __init__(self): 19 def __init__(self):
(...skipping 14 matching lines...) Expand all
32 34
33 @staticmethod 35 @staticmethod
34 def create_heartbeat(): 36 def create_heartbeat():
35 return MaybeResult(True, None) 37 return MaybeResult(True, None)
36 38
37 @staticmethod 39 @staticmethod
38 def create_result(value): 40 def create_result(value):
39 return MaybeResult(False, value) 41 return MaybeResult(False, value)
40 42
41 43
42 def Worker(fn, work_queue, done_queue, done): 44 def Worker(fn, work_queue, done_queue, done,
45 process_context_fn=None, process_context_args=None):
43 """Worker to be run in a child process. 46 """Worker to be run in a child process.
44 The worker stops on two conditions. 1. When the poison pill "STOP" is 47 The worker stops on two conditions. 1. When the poison pill "STOP" is
45 reached or 2. when the event "done" is set.""" 48 reached or 2. when the event "done" is set."""
46 try: 49 try:
50 kwargs = {}
51 if process_context_fn and process_context_args is not None:
52 kwargs.update(process_context=process_context_fn(*process_context_args))
47 for args in iter(work_queue.get, "STOP"): 53 for args in iter(work_queue.get, "STOP"):
48 if done.is_set(): 54 if done.is_set():
49 break 55 break
50 try: 56 try:
51 done_queue.put(NormalResult(fn(*args))) 57 done_queue.put(NormalResult(fn(*args, **kwargs)))
52 except Exception, e: 58 except Exception, e:
59 traceback.print_exc()
53 print(">>> EXCEPTION: %s" % e) 60 print(">>> EXCEPTION: %s" % e)
54 done_queue.put(ExceptionResult()) 61 done_queue.put(ExceptionResult())
55 except KeyboardInterrupt: 62 except KeyboardInterrupt:
56 done_queue.put(BreakResult()) 63 done_queue.put(BreakResult())
57 64
58 65
59 class Pool(): 66 class Pool():
60 """Distributes tasks to a number of worker processes. 67 """Distributes tasks to a number of worker processes.
61 New tasks can be added dynamically even after the workers have been started. 68 New tasks can be added dynamically even after the workers have been started.
62 Requirement: Tasks can only be added from the parent process, e.g. while 69 Requirement: Tasks can only be added from the parent process, e.g. while
(...skipping 14 matching lines...) Expand all
77 # e.g. when all workers have finished, and when no results are processed. 84 # e.g. when all workers have finished, and when no results are processed.
78 # Count is only accessed by the parent process. Only the parent process is 85 # Count is only accessed by the parent process. Only the parent process is
79 # allowed to remove items from the done_queue and to add items to the 86 # allowed to remove items from the done_queue and to add items to the
80 # work_queue. 87 # work_queue.
81 self.count = 0 88 self.count = 0
82 self.work_queue = Queue() 89 self.work_queue = Queue()
83 self.done_queue = Queue() 90 self.done_queue = Queue()
84 self.done = Event() 91 self.done = Event()
85 self.heartbeat_timeout = heartbeat_timeout 92 self.heartbeat_timeout = heartbeat_timeout
86 93
87 def imap_unordered(self, fn, gen): 94 def imap_unordered(self, fn, gen,
95 process_context_fn=None, process_context_args=None):
88 """Maps function "fn" to items in generator "gen" on the worker processes 96 """Maps function "fn" to items in generator "gen" on the worker processes
89 in an arbitrary order. The items are expected to be lists of arguments to 97 in an arbitrary order. The items are expected to be lists of arguments to
90 the function. Returns a results iterator. A result value of type 98 the function. Returns a results iterator. A result value of type
91 MaybeResult either indicates a heartbeat of the runner, i.e. indicating 99 MaybeResult either indicates a heartbeat of the runner, i.e. indicating
92 that the runner is still waiting for the result to be computed, or it wraps 100 that the runner is still waiting for the result to be computed, or it wraps
93 the real result.""" 101 the real result.
102
103 Args:
104 process_context_fn: Function executed once by each worker. Expected to
105 return a process-context object. If present, this object is passed
106 as additional argument to each call to fn.
107 process_context_args: List of arguments for the invocation of
108 process_context_fn. All arguments will be pickled and sent beyond the
109 process boundary.
110 """
94 try: 111 try:
95 gen = iter(gen) 112 gen = iter(gen)
96 self.advance = self._advance_more 113 self.advance = self._advance_more
97 114
98 for w in xrange(self.num_workers): 115 for w in xrange(self.num_workers):
99 p = Process(target=Worker, args=(fn, 116 p = Process(target=Worker, args=(fn,
100 self.work_queue, 117 self.work_queue,
101 self.done_queue, 118 self.done_queue,
102 self.done)) 119 self.done,
120 process_context_fn,
121 process_context_args))
103 self.processes.append(p) 122 self.processes.append(p)
104 p.start() 123 p.start()
105 124
106 self.advance(gen) 125 self.advance(gen)
107 while self.count > 0: 126 while self.count > 0:
108 while True: 127 while True:
109 try: 128 try:
110 result = self.done_queue.get(timeout=self.heartbeat_timeout) 129 result = self.done_queue.get(timeout=self.heartbeat_timeout)
111 break 130 break
112 except Empty: 131 except Empty:
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 182
164 # Drain the queues to prevent failures when queues are garbage collected. 183 # Drain the queues to prevent failures when queues are garbage collected.
165 try: 184 try:
166 while True: self.work_queue.get(False) 185 while True: self.work_queue.get(False)
167 except: 186 except:
168 pass 187 pass
169 try: 188 try:
170 while True: self.done_queue.get(False) 189 while True: self.done_queue.get(False)
171 except: 190 except:
172 pass 191 pass
OLDNEW
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/progress.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698