| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2014 the V8 project authors. All rights reserved. | 2 # Copyright 2014 the V8 project authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 from Queue import Empty | 6 from Queue import Empty |
| 7 from multiprocessing import Event, Process, Queue | 7 from multiprocessing import Event, Process, Queue |
| 8 import traceback |
| 9 |
| 8 | 10 |
| 9 class NormalResult(): | 11 class NormalResult(): |
| 10 def __init__(self, result): | 12 def __init__(self, result): |
| 11 self.result = result | 13 self.result = result |
| 12 self.exception = False | 14 self.exception = False |
| 13 self.break_now = False | 15 self.break_now = False |
| 14 | 16 |
| 15 | 17 |
| 16 class ExceptionResult(): | 18 class ExceptionResult(): |
| 17 def __init__(self): | 19 def __init__(self): |
| (...skipping 14 matching lines...) Expand all Loading... |
| 32 | 34 |
| 33 @staticmethod | 35 @staticmethod |
| 34 def create_heartbeat(): | 36 def create_heartbeat(): |
| 35 return MaybeResult(True, None) | 37 return MaybeResult(True, None) |
| 36 | 38 |
| 37 @staticmethod | 39 @staticmethod |
| 38 def create_result(value): | 40 def create_result(value): |
| 39 return MaybeResult(False, value) | 41 return MaybeResult(False, value) |
| 40 | 42 |
| 41 | 43 |
| 42 def Worker(fn, work_queue, done_queue, done): | 44 def Worker(fn, work_queue, done_queue, done, |
| 45 process_context_fn=None, process_context_args=None): |
| 43 """Worker to be run in a child process. | 46 """Worker to be run in a child process. |
| 44 The worker stops on two conditions. 1. When the poison pill "STOP" is | 47 The worker stops on two conditions. 1. When the poison pill "STOP" is |
| 45 reached or 2. when the event "done" is set.""" | 48 reached or 2. when the event "done" is set.""" |
| 46 try: | 49 try: |
| 50 kwargs = {} |
| 51 if process_context_fn and process_context_args is not None: |
| 52 kwargs.update(process_context=process_context_fn(*process_context_args)) |
| 47 for args in iter(work_queue.get, "STOP"): | 53 for args in iter(work_queue.get, "STOP"): |
| 48 if done.is_set(): | 54 if done.is_set(): |
| 49 break | 55 break |
| 50 try: | 56 try: |
| 51 done_queue.put(NormalResult(fn(*args))) | 57 done_queue.put(NormalResult(fn(*args, **kwargs))) |
| 52 except Exception, e: | 58 except Exception, e: |
| 59 traceback.print_exc() |
| 53 print(">>> EXCEPTION: %s" % e) | 60 print(">>> EXCEPTION: %s" % e) |
| 54 done_queue.put(ExceptionResult()) | 61 done_queue.put(ExceptionResult()) |
| 55 except KeyboardInterrupt: | 62 except KeyboardInterrupt: |
| 56 done_queue.put(BreakResult()) | 63 done_queue.put(BreakResult()) |
| 57 | 64 |
| 58 | 65 |
| 59 class Pool(): | 66 class Pool(): |
| 60 """Distributes tasks to a number of worker processes. | 67 """Distributes tasks to a number of worker processes. |
| 61 New tasks can be added dynamically even after the workers have been started. | 68 New tasks can be added dynamically even after the workers have been started. |
| 62 Requirement: Tasks can only be added from the parent process, e.g. while | 69 Requirement: Tasks can only be added from the parent process, e.g. while |
| (...skipping 14 matching lines...) Expand all Loading... |
| 77 # e.g. when all workers have finished, and when no results are processed. | 84 # e.g. when all workers have finished, and when no results are processed. |
| 78 # Count is only accessed by the parent process. Only the parent process is | 85 # Count is only accessed by the parent process. Only the parent process is |
| 79 # allowed to remove items from the done_queue and to add items to the | 86 # allowed to remove items from the done_queue and to add items to the |
| 80 # work_queue. | 87 # work_queue. |
| 81 self.count = 0 | 88 self.count = 0 |
| 82 self.work_queue = Queue() | 89 self.work_queue = Queue() |
| 83 self.done_queue = Queue() | 90 self.done_queue = Queue() |
| 84 self.done = Event() | 91 self.done = Event() |
| 85 self.heartbeat_timeout = heartbeat_timeout | 92 self.heartbeat_timeout = heartbeat_timeout |
| 86 | 93 |
| 87 def imap_unordered(self, fn, gen): | 94 def imap_unordered(self, fn, gen, |
| 95 process_context_fn=None, process_context_args=None): |
| 88 """Maps function "fn" to items in generator "gen" on the worker processes | 96 """Maps function "fn" to items in generator "gen" on the worker processes |
| 89 in an arbitrary order. The items are expected to be lists of arguments to | 97 in an arbitrary order. The items are expected to be lists of arguments to |
| 90 the function. Returns a results iterator. A result value of type | 98 the function. Returns a results iterator. A result value of type |
| 91 MaybeResult either indicates a heartbeat of the runner, i.e. indicating | 99 MaybeResult either indicates a heartbeat of the runner, i.e. indicating |
| 92 that the runner is still waiting for the result to be computed, or it wraps | 100 that the runner is still waiting for the result to be computed, or it wraps |
| 93 the real result.""" | 101 the real result. |
| 102 |
| 103 Args: |
| 104 process_context_fn: Function executed once by each worker. Expected to |
| 105 return a process-context object. If present, this object is passed |
| 106 as additional argument to each call to fn. |
| 107 process_context_args: List of arguments for the invocation of |
| 108 process_context_fn. All arguments will be pickled and sent beyond the |
| 109 process boundary. |
| 110 """ |
| 94 try: | 111 try: |
| 95 gen = iter(gen) | 112 gen = iter(gen) |
| 96 self.advance = self._advance_more | 113 self.advance = self._advance_more |
| 97 | 114 |
| 98 for w in xrange(self.num_workers): | 115 for w in xrange(self.num_workers): |
| 99 p = Process(target=Worker, args=(fn, | 116 p = Process(target=Worker, args=(fn, |
| 100 self.work_queue, | 117 self.work_queue, |
| 101 self.done_queue, | 118 self.done_queue, |
| 102 self.done)) | 119 self.done, |
| 120 process_context_fn, |
| 121 process_context_args)) |
| 103 self.processes.append(p) | 122 self.processes.append(p) |
| 104 p.start() | 123 p.start() |
| 105 | 124 |
| 106 self.advance(gen) | 125 self.advance(gen) |
| 107 while self.count > 0: | 126 while self.count > 0: |
| 108 while True: | 127 while True: |
| 109 try: | 128 try: |
| 110 result = self.done_queue.get(timeout=self.heartbeat_timeout) | 129 result = self.done_queue.get(timeout=self.heartbeat_timeout) |
| 111 break | 130 break |
| 112 except Empty: | 131 except Empty: |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 163 | 182 |
| 164 # Drain the queues to prevent failures when queues are garbage collected. | 183 # Drain the queues to prevent failures when queues are garbage collected. |
| 165 try: | 184 try: |
| 166 while True: self.work_queue.get(False) | 185 while True: self.work_queue.get(False) |
| 167 except: | 186 except: |
| 168 pass | 187 pass |
| 169 try: | 188 try: |
| 170 while True: self.done_queue.get(False) | 189 while True: self.done_queue.get(False) |
| 171 except: | 190 except: |
| 172 pass | 191 pass |
| OLD | NEW |