Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: tools/testrunner/local/pool.py

Issue 1064043002: Make test runner more chatty to avoid it getting killed by buildbot. (Closed) Base URL: https://chromium.googlesource.com/v8/v8.git@master
Patch Set: api change Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 the V8 project authors. All rights reserved. 2 # Copyright 2014 the V8 project authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 from Queue import Empty
6 from multiprocessing import Event, Process, Queue 7 from multiprocessing import Event, Process, Queue
8 import sys
7 9
8 class NormalResult(): 10 class NormalResult():
9 def __init__(self, result): 11 def __init__(self, result):
10 self.result = result 12 self.result = result
11 self.exception = False 13 self.exception = False
12 self.break_now = False 14 self.break_now = False
13 15
14 16
15 class ExceptionResult(): 17 class ExceptionResult():
16 def __init__(self): 18 def __init__(self):
17 self.exception = True 19 self.exception = True
18 self.break_now = False 20 self.break_now = False
19 21
20 22
21 class BreakResult(): 23 class BreakResult():
22 def __init__(self): 24 def __init__(self):
23 self.exception = False 25 self.exception = False
24 self.break_now = True 26 self.break_now = True
25 27
26 28
29 class Heartbeat():
Jakob Kummerow 2015/04/08 09:00:34 I'm not too happy with this kind of polymorphism.
30 def __init__(self):
31 self.heartbeat = True
32
33
34 class PoolResult():
35 def __init__(self, value):
36 self.heartbeat = False
37 self.value = value
38
39
27 def Worker(fn, work_queue, done_queue, done): 40 def Worker(fn, work_queue, done_queue, done):
28 """Worker to be run in a child process. 41 """Worker to be run in a child process.
29 The worker stops on two conditions. 1. When the poison pill "STOP" is 42 The worker stops on two conditions. 1. When the poison pill "STOP" is
30 reached or 2. when the event "done" is set.""" 43 reached or 2. when the event "done" is set."""
31 try: 44 try:
32 for args in iter(work_queue.get, "STOP"): 45 for args in iter(work_queue.get, "STOP"):
33 if done.is_set(): 46 if done.is_set():
34 break 47 break
35 try: 48 try:
36 done_queue.put(NormalResult(fn(*args))) 49 done_queue.put(NormalResult(fn(*args)))
37 except Exception, e: 50 except Exception, e:
38 print(">>> EXCEPTION: %s" % e) 51 print(">>> EXCEPTION: %s" % e)
39 done_queue.put(ExceptionResult()) 52 done_queue.put(ExceptionResult())
40 except KeyboardInterrupt: 53 except KeyboardInterrupt:
41 done_queue.put(BreakResult()) 54 done_queue.put(BreakResult())
42 55
43 56
44 class Pool(): 57 class Pool():
45 """Distributes tasks to a number of worker processes. 58 """Distributes tasks to a number of worker processes.
46 New tasks can be added dynamically even after the workers have been started. 59 New tasks can be added dynamically even after the workers have been started.
47 Requirement: Tasks can only be added from the parent process, e.g. while 60 Requirement: Tasks can only be added from the parent process, e.g. while
48 consuming the results generator.""" 61 consuming the results generator."""
49 62
50 # Factor to calculate the maximum number of items in the work/done queue. 63 # Factor to calculate the maximum number of items in the work/done queue.
51 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. 64 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
52 BUFFER_FACTOR = 4 65 BUFFER_FACTOR = 4
53 66
54 def __init__(self, num_workers): 67 def __init__(self, num_workers, heartbeat_timeout=30):
55 self.num_workers = num_workers 68 self.num_workers = num_workers
56 self.processes = [] 69 self.processes = []
57 self.terminated = False 70 self.terminated = False
58 71
59 # Invariant: count >= #work_queue + #done_queue. It is greater when a 72 # Invariant: count >= #work_queue + #done_queue. It is greater when a
60 # worker takes an item from the work_queue and before the result is 73 # worker takes an item from the work_queue and before the result is
61 # submitted to the done_queue. It is equal when no worker is working, 74 # submitted to the done_queue. It is equal when no worker is working,
62 # e.g. when all workers have finished, and when no results are processed. 75 # e.g. when all workers have finished, and when no results are processed.
63 # Count is only accessed by the parent process. Only the parent process is 76 # Count is only accessed by the parent process. Only the parent process is
64 # allowed to remove items from the done_queue and to add items to the 77 # allowed to remove items from the done_queue and to add items to the
65 # work_queue. 78 # work_queue.
66 self.count = 0 79 self.count = 0
67 self.work_queue = Queue() 80 self.work_queue = Queue()
68 self.done_queue = Queue() 81 self.done_queue = Queue()
69 self.done = Event() 82 self.done = Event()
83 self.heartbeat_timeout = heartbeat_timeout
70 84
71 def imap_unordered(self, fn, gen): 85 def imap_unordered(self, fn, gen):
72 """Maps function "fn" to items in generator "gen" on the worker processes 86 """Maps function "fn" to items in generator "gen" on the worker processes
73 in an arbitrary order. The items are expected to be lists of arguments to 87 in an arbitrary order. The items are expected to be lists of arguments to
74 the function. Returns a results iterator.""" 88 the function. Returns a results iterator. A result value can either be a
89 hearbeat of the runner, i.e. indicating that the runner is still waiting
Jakob Kummerow 2015/04/08 09:00:34 nit: "heartbeat"
90 for the result to be computed, or it can be a result wrapper of type
91 PoolResult."""
75 try: 92 try:
76 gen = iter(gen) 93 gen = iter(gen)
77 self.advance = self._advance_more 94 self.advance = self._advance_more
78 95
79 for w in xrange(self.num_workers): 96 for w in xrange(self.num_workers):
80 p = Process(target=Worker, args=(fn, 97 p = Process(target=Worker, args=(fn,
81 self.work_queue, 98 self.work_queue,
82 self.done_queue, 99 self.done_queue,
83 self.done)) 100 self.done))
84 self.processes.append(p) 101 self.processes.append(p)
85 p.start() 102 p.start()
86 103
87 self.advance(gen) 104 self.advance(gen)
88 while self.count > 0: 105 while self.count > 0:
89 result = self.done_queue.get() 106 while True:
107 try:
108 result = self.done_queue.get(timeout=self.heartbeat_timeout)
109 break
110 except Empty:
111 # Indicate a heartbeat. The iterater will continue fetching the
Jakob Kummerow 2015/04/08 09:00:34 nit: "iterator"
112 # next result.
113 yield Heartbeat()
90 self.count -= 1 114 self.count -= 1
91 if result.exception: 115 if result.exception:
92 # Ignore items with unexpected exceptions. 116 # Ignore items with unexpected exceptions.
93 continue 117 continue
94 elif result.break_now: 118 elif result.break_now:
95 # A keyboard interrupt happened in one of the worker processes. 119 # A keyboard interrupt happened in one of the worker processes.
96 raise KeyboardInterrupt 120 raise KeyboardInterrupt
97 else: 121 else:
98 yield result.result 122 yield PoolResult(result.result)
99 self.advance(gen) 123 self.advance(gen)
100 finally: 124 finally:
101 self.terminate() 125 self.terminate()
102 126
103 def _advance_more(self, gen): 127 def _advance_more(self, gen):
104 while self.count < self.num_workers * self.BUFFER_FACTOR: 128 while self.count < self.num_workers * self.BUFFER_FACTOR:
105 try: 129 try:
106 self.work_queue.put(gen.next()) 130 self.work_queue.put(gen.next())
107 self.count += 1 131 self.count += 1
108 except StopIteration: 132 except StopIteration:
(...skipping 28 matching lines...) Expand all
137 161
138 # Drain the queues to prevent failures when queues are garbage collected. 162 # Drain the queues to prevent failures when queues are garbage collected.
139 try: 163 try:
140 while True: self.work_queue.get(False) 164 while True: self.work_queue.get(False)
141 except: 165 except:
142 pass 166 pass
143 try: 167 try:
144 while True: self.done_queue.get(False) 168 while True: self.done_queue.get(False)
145 except: 169 except:
146 pass 170 pass
OLDNEW
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698