Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: tools/testrunner/local/pool.py

Issue 1064043002: Make test runner more chatty to avoid it getting killed by buildbot. (Closed) Base URL: https://chromium.googlesource.com/v8/v8.git@master
Patch Set: ... Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2014 the V8 project authors. All rights reserved. 2 # Copyright 2014 the V8 project authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 from Queue import Empty
6 from multiprocessing import Event, Process, Queue 7 from multiprocessing import Event, Process, Queue
7 8
8 class NormalResult(): 9 class NormalResult():
9 def __init__(self, result): 10 def __init__(self, result):
10 self.result = result 11 self.result = result
11 self.exception = False 12 self.exception = False
12 self.break_now = False 13 self.break_now = False
13 14
14 15
15 class ExceptionResult(): 16 class ExceptionResult():
16 def __init__(self): 17 def __init__(self):
17 self.exception = True 18 self.exception = True
18 self.break_now = False 19 self.break_now = False
19 20
20 21
21 class BreakResult(): 22 class BreakResult():
22 def __init__(self): 23 def __init__(self):
23 self.exception = False 24 self.exception = False
24 self.break_now = True 25 self.break_now = True
25 26
26 27
28 class MaybeResult():
29 def __init__(self, heartbeat, value):
30 self.heartbeat = heartbeat
31 self.value = value
32
33 @staticmethod
34 def create_heartbeat():
35 return MaybeResult(True, None)
36
37 @staticmethod
38 def create_result(value):
39 return MaybeResult(False, value)
40
41
27 def Worker(fn, work_queue, done_queue, done): 42 def Worker(fn, work_queue, done_queue, done):
28 """Worker to be run in a child process. 43 """Worker to be run in a child process.
29 The worker stops on two conditions. 1. When the poison pill "STOP" is 44 The worker stops on two conditions. 1. When the poison pill "STOP" is
30 reached or 2. when the event "done" is set.""" 45 reached or 2. when the event "done" is set."""
31 try: 46 try:
32 for args in iter(work_queue.get, "STOP"): 47 for args in iter(work_queue.get, "STOP"):
33 if done.is_set(): 48 if done.is_set():
34 break 49 break
35 try: 50 try:
36 done_queue.put(NormalResult(fn(*args))) 51 done_queue.put(NormalResult(fn(*args)))
37 except Exception, e: 52 except Exception, e:
38 print(">>> EXCEPTION: %s" % e) 53 print(">>> EXCEPTION: %s" % e)
39 done_queue.put(ExceptionResult()) 54 done_queue.put(ExceptionResult())
40 except KeyboardInterrupt: 55 except KeyboardInterrupt:
41 done_queue.put(BreakResult()) 56 done_queue.put(BreakResult())
42 57
43 58
44 class Pool(): 59 class Pool():
45 """Distributes tasks to a number of worker processes. 60 """Distributes tasks to a number of worker processes.
46 New tasks can be added dynamically even after the workers have been started. 61 New tasks can be added dynamically even after the workers have been started.
47 Requirement: Tasks can only be added from the parent process, e.g. while 62 Requirement: Tasks can only be added from the parent process, e.g. while
48 consuming the results generator.""" 63 consuming the results generator."""
49 64
50 # Factor to calculate the maximum number of items in the work/done queue. 65 # Factor to calculate the maximum number of items in the work/done queue.
51 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. 66 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
52 BUFFER_FACTOR = 4 67 BUFFER_FACTOR = 4
53 68
54 def __init__(self, num_workers): 69 def __init__(self, num_workers, heartbeat_timeout=30):
55 self.num_workers = num_workers 70 self.num_workers = num_workers
56 self.processes = [] 71 self.processes = []
57 self.terminated = False 72 self.terminated = False
58 73
59 # Invariant: count >= #work_queue + #done_queue. It is greater when a 74 # Invariant: count >= #work_queue + #done_queue. It is greater when a
60 # worker takes an item from the work_queue and before the result is 75 # worker takes an item from the work_queue and before the result is
61 # submitted to the done_queue. It is equal when no worker is working, 76 # submitted to the done_queue. It is equal when no worker is working,
62 # e.g. when all workers have finished, and when no results are processed. 77 # e.g. when all workers have finished, and when no results are processed.
63 # Count is only accessed by the parent process. Only the parent process is 78 # Count is only accessed by the parent process. Only the parent process is
64 # allowed to remove items from the done_queue and to add items to the 79 # allowed to remove items from the done_queue and to add items to the
65 # work_queue. 80 # work_queue.
66 self.count = 0 81 self.count = 0
67 self.work_queue = Queue() 82 self.work_queue = Queue()
68 self.done_queue = Queue() 83 self.done_queue = Queue()
69 self.done = Event() 84 self.done = Event()
85 self.heartbeat_timeout = heartbeat_timeout
70 86
71 def imap_unordered(self, fn, gen): 87 def imap_unordered(self, fn, gen):
72 """Maps function "fn" to items in generator "gen" on the worker processes 88 """Maps function "fn" to items in generator "gen" on the worker processes
73 in an arbitrary order. The items are expected to be lists of arguments to 89 in an arbitrary order. The items are expected to be lists of arguments to
74 the function. Returns a results iterator.""" 90 the function. Returns a results iterator. A result value of type
91 MaybeResult either indicates a heartbeat of the runner, i.e. indicating
92 that the runner is still waiting for the result to be computed, or it wraps
93 the real result."""
75 try: 94 try:
76 gen = iter(gen) 95 gen = iter(gen)
77 self.advance = self._advance_more 96 self.advance = self._advance_more
78 97
79 for w in xrange(self.num_workers): 98 for w in xrange(self.num_workers):
80 p = Process(target=Worker, args=(fn, 99 p = Process(target=Worker, args=(fn,
81 self.work_queue, 100 self.work_queue,
82 self.done_queue, 101 self.done_queue,
83 self.done)) 102 self.done))
84 self.processes.append(p) 103 self.processes.append(p)
85 p.start() 104 p.start()
86 105
87 self.advance(gen) 106 self.advance(gen)
88 while self.count > 0: 107 while self.count > 0:
89 result = self.done_queue.get() 108 while True:
109 try:
110 result = self.done_queue.get(timeout=self.heartbeat_timeout)
111 break
112 except Empty:
113 # Indicate a heartbeat. The iterator will continue fetching the
114 # next result.
115 yield MaybeResult.create_heartbeat()
90 self.count -= 1 116 self.count -= 1
91 if result.exception: 117 if result.exception:
92 # Ignore items with unexpected exceptions. 118 # Ignore items with unexpected exceptions.
93 continue 119 continue
94 elif result.break_now: 120 elif result.break_now:
95 # A keyboard interrupt happened in one of the worker processes. 121 # A keyboard interrupt happened in one of the worker processes.
96 raise KeyboardInterrupt 122 raise KeyboardInterrupt
97 else: 123 else:
98 yield result.result 124 yield MaybeResult.create_result(result.result)
99 self.advance(gen) 125 self.advance(gen)
100 finally: 126 finally:
101 self.terminate() 127 self.terminate()
102 128
103 def _advance_more(self, gen): 129 def _advance_more(self, gen):
104 while self.count < self.num_workers * self.BUFFER_FACTOR: 130 while self.count < self.num_workers * self.BUFFER_FACTOR:
105 try: 131 try:
106 self.work_queue.put(gen.next()) 132 self.work_queue.put(gen.next())
107 self.count += 1 133 self.count += 1
108 except StopIteration: 134 except StopIteration:
(...skipping 28 matching lines...) Expand all
137 163
138 # Drain the queues to prevent failures when queues are garbage collected. 164 # Drain the queues to prevent failures when queues are garbage collected.
139 try: 165 try:
140 while True: self.work_queue.get(False) 166 while True: self.work_queue.get(False)
141 except: 167 except:
142 pass 168 pass
143 try: 169 try:
144 while True: self.done_queue.get(False) 170 while True: self.done_queue.get(False)
145 except: 171 except:
146 pass 172 pass
OLDNEW
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698