| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2014 the V8 project authors. All rights reserved. | 2 # Copyright 2014 the V8 project authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 from Queue import Empty | 6 from Queue import Empty |
| 7 from multiprocessing import Event, Process, Queue | 7 from multiprocessing import Event, Process, Queue |
| 8 import traceback | 8 import traceback |
| 9 | 9 |
| 10 | 10 |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 102 | 102 |
| 103 Args: | 103 Args: |
| 104 process_context_fn: Function executed once by each worker. Expected to | 104 process_context_fn: Function executed once by each worker. Expected to |
| 105 return a process-context object. If present, this object is passed | 105 return a process-context object. If present, this object is passed |
| 106 as additional argument to each call to fn. | 106 as additional argument to each call to fn. |
| 107 process_context_args: List of arguments for the invocation of | 107 process_context_args: List of arguments for the invocation of |
| 108 process_context_fn. All arguments will be pickled and sent beyond the | 108 process_context_fn. All arguments will be pickled and sent beyond the |
| 109 process boundary. | 109 process boundary. |
| 110 """ | 110 """ |
| 111 try: | 111 try: |
| 112 internal_error = False |
| 112 gen = iter(gen) | 113 gen = iter(gen) |
| 113 self.advance = self._advance_more | 114 self.advance = self._advance_more |
| 114 | 115 |
| 115 for w in xrange(self.num_workers): | 116 for w in xrange(self.num_workers): |
| 116 p = Process(target=Worker, args=(fn, | 117 p = Process(target=Worker, args=(fn, |
| 117 self.work_queue, | 118 self.work_queue, |
| 118 self.done_queue, | 119 self.done_queue, |
| 119 self.done, | 120 self.done, |
| 120 process_context_fn, | 121 process_context_fn, |
| 121 process_context_args)) | 122 process_context_args)) |
| 122 self.processes.append(p) | 123 self.processes.append(p) |
| 123 p.start() | 124 p.start() |
| 124 | 125 |
| 125 self.advance(gen) | 126 self.advance(gen) |
| 126 while self.count > 0: | 127 while self.count > 0: |
| 127 while True: | 128 while True: |
| 128 try: | 129 try: |
| 129 result = self.done_queue.get(timeout=self.heartbeat_timeout) | 130 result = self.done_queue.get(timeout=self.heartbeat_timeout) |
| 130 break | 131 break |
| 131 except Empty: | 132 except Empty: |
| 132 # Indicate a heartbeat. The iterator will continue fetching the | 133 # Indicate a heartbeat. The iterator will continue fetching the |
| 133 # next result. | 134 # next result. |
| 134 yield MaybeResult.create_heartbeat() | 135 yield MaybeResult.create_heartbeat() |
| 135 self.count -= 1 | 136 self.count -= 1 |
| 136 if result.exception: | 137 if result.exception: |
| 137 # Ignore items with unexpected exceptions. | 138 # TODO(machenbach): Handle a few known types of internal errors |
| 139 # gracefully, e.g. missing test files. |
| 140 internal_error = True |
| 138 continue | 141 continue |
| 139 elif result.break_now: | 142 elif result.break_now: |
| 140 # A keyboard interrupt happened in one of the worker processes. | 143 # A keyboard interrupt happened in one of the worker processes. |
| 141 raise KeyboardInterrupt | 144 raise KeyboardInterrupt |
| 142 else: | 145 else: |
| 143 yield MaybeResult.create_result(result.result) | 146 yield MaybeResult.create_result(result.result) |
| 144 self.advance(gen) | 147 self.advance(gen) |
| 145 finally: | 148 finally: |
| 146 self.terminate() | 149 self.terminate() |
| 150 if internal_error: |
| 151 raise Exception("Internal error in a worker process.") |
| 147 | 152 |
| 148 def _advance_more(self, gen): | 153 def _advance_more(self, gen): |
| 149 while self.count < self.num_workers * self.BUFFER_FACTOR: | 154 while self.count < self.num_workers * self.BUFFER_FACTOR: |
| 150 try: | 155 try: |
| 151 self.work_queue.put(gen.next()) | 156 self.work_queue.put(gen.next()) |
| 152 self.count += 1 | 157 self.count += 1 |
| 153 except StopIteration: | 158 except StopIteration: |
| 154 self.advance = self._advance_empty | 159 self.advance = self._advance_empty |
| 155 break | 160 break |
| 156 | 161 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 182 | 187 |
| 183 # Drain the queues to prevent failures when queues are garbage collected. | 188 # Drain the queues to prevent failures when queues are garbage collected. |
| 184 try: | 189 try: |
| 185 while True: self.work_queue.get(False) | 190 while True: self.work_queue.get(False) |
| 186 except: | 191 except: |
| 187 pass | 192 pass |
| 188 try: | 193 try: |
| 189 while True: self.done_queue.get(False) | 194 while True: self.done_queue.get(False) |
| 190 except: | 195 except: |
| 191 pass | 196 pass |
| OLD | NEW |