OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2014 the V8 project authors. All rights reserved. | 2 # Copyright 2014 the V8 project authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 from Queue import Empty | 6 from Queue import Empty |
7 from multiprocessing import Event, Process, Queue | 7 from multiprocessing import Event, Process, Queue |
8 import traceback | 8 import traceback |
9 | 9 |
10 | 10 |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
102 | 102 |
103 Args: | 103 Args: |
104 process_context_fn: Function executed once by each worker. Expected to | 104 process_context_fn: Function executed once by each worker. Expected to |
105 return a process-context object. If present, this object is passed | 105 return a process-context object. If present, this object is passed |
106 as additional argument to each call to fn. | 106 as additional argument to each call to fn. |
107 process_context_args: List of arguments for the invocation of | 107 process_context_args: List of arguments for the invocation of |
108 process_context_fn. All arguments will be pickled and sent beyond the | 108 process_context_fn. All arguments will be pickled and sent beyond the |
109 process boundary. | 109 process boundary. |
110 """ | 110 """ |
111 try: | 111 try: |
| 112 internal_error = False |
112 gen = iter(gen) | 113 gen = iter(gen) |
113 self.advance = self._advance_more | 114 self.advance = self._advance_more |
114 | 115 |
115 for w in xrange(self.num_workers): | 116 for w in xrange(self.num_workers): |
116 p = Process(target=Worker, args=(fn, | 117 p = Process(target=Worker, args=(fn, |
117 self.work_queue, | 118 self.work_queue, |
118 self.done_queue, | 119 self.done_queue, |
119 self.done, | 120 self.done, |
120 process_context_fn, | 121 process_context_fn, |
121 process_context_args)) | 122 process_context_args)) |
122 self.processes.append(p) | 123 self.processes.append(p) |
123 p.start() | 124 p.start() |
124 | 125 |
125 self.advance(gen) | 126 self.advance(gen) |
126 while self.count > 0: | 127 while self.count > 0: |
127 while True: | 128 while True: |
128 try: | 129 try: |
129 result = self.done_queue.get(timeout=self.heartbeat_timeout) | 130 result = self.done_queue.get(timeout=self.heartbeat_timeout) |
130 break | 131 break |
131 except Empty: | 132 except Empty: |
132 # Indicate a heartbeat. The iterator will continue fetching the | 133 # Indicate a heartbeat. The iterator will continue fetching the |
133 # next result. | 134 # next result. |
134 yield MaybeResult.create_heartbeat() | 135 yield MaybeResult.create_heartbeat() |
135 self.count -= 1 | 136 self.count -= 1 |
136 if result.exception: | 137 if result.exception: |
137 # Ignore items with unexpected exceptions. | 138 # TODO(machenbach): Handle a few known types of internal errors |
| 139 # gracefully, e.g. missing test files. |
| 140 internal_error = True |
138 continue | 141 continue |
139 elif result.break_now: | 142 elif result.break_now: |
140 # A keyboard interrupt happened in one of the worker processes. | 143 # A keyboard interrupt happened in one of the worker processes. |
141 raise KeyboardInterrupt | 144 raise KeyboardInterrupt |
142 else: | 145 else: |
143 yield MaybeResult.create_result(result.result) | 146 yield MaybeResult.create_result(result.result) |
144 self.advance(gen) | 147 self.advance(gen) |
145 finally: | 148 finally: |
146 self.terminate() | 149 self.terminate() |
| 150 if internal_error: |
| 151 raise Exception("Internal error in a worker process.") |
147 | 152 |
148 def _advance_more(self, gen): | 153 def _advance_more(self, gen): |
149 while self.count < self.num_workers * self.BUFFER_FACTOR: | 154 while self.count < self.num_workers * self.BUFFER_FACTOR: |
150 try: | 155 try: |
151 self.work_queue.put(gen.next()) | 156 self.work_queue.put(gen.next()) |
152 self.count += 1 | 157 self.count += 1 |
153 except StopIteration: | 158 except StopIteration: |
154 self.advance = self._advance_empty | 159 self.advance = self._advance_empty |
155 break | 160 break |
156 | 161 |
(...skipping 25 matching lines...) Expand all Loading... |
182 | 187 |
183 # Drain the queues to prevent failures when queues are garbage collected. | 188 # Drain the queues to prevent failures when queues are garbage collected. |
184 try: | 189 try: |
185 while True: self.work_queue.get(False) | 190 while True: self.work_queue.get(False) |
186 except: | 191 except: |
187 pass | 192 pass |
188 try: | 193 try: |
189 while True: self.done_queue.get(False) | 194 while True: self.done_queue.get(False) |
190 except: | 195 except: |
191 pass | 196 pass |
OLD | NEW |