OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2014 the V8 project authors. All rights reserved. | 2 # Copyright 2014 the V8 project authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 from Queue import Empty | 6 from Queue import Empty |
7 from multiprocessing import Event, Process, Queue | 7 from multiprocessing import Event, Process, Queue |
| 8 import traceback |
| 9 |
8 | 10 |
9 class NormalResult(): | 11 class NormalResult(): |
10 def __init__(self, result): | 12 def __init__(self, result): |
11 self.result = result | 13 self.result = result |
12 self.exception = False | 14 self.exception = False |
13 self.break_now = False | 15 self.break_now = False |
14 | 16 |
15 | 17 |
16 class ExceptionResult(): | 18 class ExceptionResult(): |
17 def __init__(self): | 19 def __init__(self): |
(...skipping 14 matching lines...) Expand all Loading... |
32 | 34 |
33 @staticmethod | 35 @staticmethod |
34 def create_heartbeat(): | 36 def create_heartbeat(): |
35 return MaybeResult(True, None) | 37 return MaybeResult(True, None) |
36 | 38 |
37 @staticmethod | 39 @staticmethod |
38 def create_result(value): | 40 def create_result(value): |
39 return MaybeResult(False, value) | 41 return MaybeResult(False, value) |
40 | 42 |
41 | 43 |
42 def Worker(fn, work_queue, done_queue, done): | 44 def Worker(fn, work_queue, done_queue, done, |
| 45 process_context_fn=None, process_context_args=None): |
43 """Worker to be run in a child process. | 46 """Worker to be run in a child process. |
44 The worker stops on two conditions. 1. When the poison pill "STOP" is | 47 The worker stops on two conditions. 1. When the poison pill "STOP" is |
45 reached or 2. when the event "done" is set.""" | 48 reached or 2. when the event "done" is set.""" |
46 try: | 49 try: |
| 50 kwargs = {} |
| 51 if process_context_fn and process_context_args is not None: |
| 52 kwargs.update(process_context=process_context_fn(*process_context_args)) |
47 for args in iter(work_queue.get, "STOP"): | 53 for args in iter(work_queue.get, "STOP"): |
48 if done.is_set(): | 54 if done.is_set(): |
49 break | 55 break |
50 try: | 56 try: |
51 done_queue.put(NormalResult(fn(*args))) | 57 done_queue.put(NormalResult(fn(*args, **kwargs))) |
52 except Exception, e: | 58 except Exception, e: |
| 59 traceback.print_exc() |
53 print(">>> EXCEPTION: %s" % e) | 60 print(">>> EXCEPTION: %s" % e) |
54 done_queue.put(ExceptionResult()) | 61 done_queue.put(ExceptionResult()) |
55 except KeyboardInterrupt: | 62 except KeyboardInterrupt: |
56 done_queue.put(BreakResult()) | 63 done_queue.put(BreakResult()) |
57 | 64 |
58 | 65 |
59 class Pool(): | 66 class Pool(): |
60 """Distributes tasks to a number of worker processes. | 67 """Distributes tasks to a number of worker processes. |
61 New tasks can be added dynamically even after the workers have been started. | 68 New tasks can be added dynamically even after the workers have been started. |
62 Requirement: Tasks can only be added from the parent process, e.g. while | 69 Requirement: Tasks can only be added from the parent process, e.g. while |
(...skipping 14 matching lines...) Expand all Loading... |
77 # e.g. when all workers have finished, and when no results are processed. | 84 # e.g. when all workers have finished, and when no results are processed. |
78 # Count is only accessed by the parent process. Only the parent process is | 85 # Count is only accessed by the parent process. Only the parent process is |
79 # allowed to remove items from the done_queue and to add items to the | 86 # allowed to remove items from the done_queue and to add items to the |
80 # work_queue. | 87 # work_queue. |
81 self.count = 0 | 88 self.count = 0 |
82 self.work_queue = Queue() | 89 self.work_queue = Queue() |
83 self.done_queue = Queue() | 90 self.done_queue = Queue() |
84 self.done = Event() | 91 self.done = Event() |
85 self.heartbeat_timeout = heartbeat_timeout | 92 self.heartbeat_timeout = heartbeat_timeout |
86 | 93 |
87 def imap_unordered(self, fn, gen): | 94 def imap_unordered(self, fn, gen, |
| 95 process_context_fn=None, process_context_args=None): |
88 """Maps function "fn" to items in generator "gen" on the worker processes | 96 """Maps function "fn" to items in generator "gen" on the worker processes |
89 in an arbitrary order. The items are expected to be lists of arguments to | 97 in an arbitrary order. The items are expected to be lists of arguments to |
90 the function. Returns a results iterator. A result value of type | 98 the function. Returns a results iterator. A result value of type |
91 MaybeResult either indicates a heartbeat of the runner, i.e. indicating | 99 MaybeResult either indicates a heartbeat of the runner, i.e. indicating |
92 that the runner is still waiting for the result to be computed, or it wraps | 100 that the runner is still waiting for the result to be computed, or it wraps |
93 the real result.""" | 101 the real result. |
| 102 |
| 103 Args: |
| 104 process_context_fn: Function executed once by each worker. Expected to |
| 105 return a process-context object. If present, this object is passed |
| 106 as additional argument to each call to fn. |
| 107 process_context_args: List of arguments for the invocation of |
| 108 process_context_fn. All arguments will be pickled and sent beyond the |
| 109 process boundary. |
| 110 """ |
94 try: | 111 try: |
95 gen = iter(gen) | 112 gen = iter(gen) |
96 self.advance = self._advance_more | 113 self.advance = self._advance_more |
97 | 114 |
98 for w in xrange(self.num_workers): | 115 for w in xrange(self.num_workers): |
99 p = Process(target=Worker, args=(fn, | 116 p = Process(target=Worker, args=(fn, |
100 self.work_queue, | 117 self.work_queue, |
101 self.done_queue, | 118 self.done_queue, |
102 self.done)) | 119 self.done, |
| 120 process_context_fn, |
| 121 process_context_args)) |
103 self.processes.append(p) | 122 self.processes.append(p) |
104 p.start() | 123 p.start() |
105 | 124 |
106 self.advance(gen) | 125 self.advance(gen) |
107 while self.count > 0: | 126 while self.count > 0: |
108 while True: | 127 while True: |
109 try: | 128 try: |
110 result = self.done_queue.get(timeout=self.heartbeat_timeout) | 129 result = self.done_queue.get(timeout=self.heartbeat_timeout) |
111 break | 130 break |
112 except Empty: | 131 except Empty: |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
163 | 182 |
164 # Drain the queues to prevent failures when queues are garbage collected. | 183 # Drain the queues to prevent failures when queues are garbage collected. |
165 try: | 184 try: |
166 while True: self.work_queue.get(False) | 185 while True: self.work_queue.get(False) |
167 except: | 186 except: |
168 pass | 187 pass |
169 try: | 188 try: |
170 while True: self.done_queue.get(False) | 189 while True: self.done_queue.get(False) |
171 except: | 190 except: |
172 pass | 191 pass |
OLD | NEW |