Index: tools/testrunner/local/pool.py |
diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py |
index b933f735e55e9ea5231dd3c5dd0b620a0330e3ba..6d123fd4e5e7fbd9e79688dc151dc81aac973ddd 100644 |
--- a/tools/testrunner/local/pool.py |
+++ b/tools/testrunner/local/pool.py |
@@ -5,6 +5,8 @@ |
from Queue import Empty |
from multiprocessing import Event, Process, Queue |
+import traceback |
+ |
class NormalResult(): |
def __init__(self, result): |
@@ -39,17 +41,22 @@ class MaybeResult(): |
return MaybeResult(False, value) |
-def Worker(fn, work_queue, done_queue, done): |
+def Worker(fn, work_queue, done_queue, done, |
+ process_context_fn=None, process_context_args=None): |
"""Worker to be run in a child process. |
The worker stops on two conditions. 1. When the poison pill "STOP" is |
reached or 2. when the event "done" is set.""" |
try: |
+ kwargs = {} |
+ if process_context_fn and process_context_args is not None: |
+ kwargs.update(process_context=process_context_fn(*process_context_args)) |
for args in iter(work_queue.get, "STOP"): |
if done.is_set(): |
break |
try: |
- done_queue.put(NormalResult(fn(*args))) |
+ done_queue.put(NormalResult(fn(*args, **kwargs))) |
except Exception, e: |
+ traceback.print_exc() |
print(">>> EXCEPTION: %s" % e) |
done_queue.put(ExceptionResult()) |
except KeyboardInterrupt: |
@@ -84,13 +91,23 @@ class Pool(): |
self.done = Event() |
self.heartbeat_timeout = heartbeat_timeout |
- def imap_unordered(self, fn, gen): |
+ def imap_unordered(self, fn, gen, |
+ process_context_fn=None, process_context_args=None): |
"""Maps function "fn" to items in generator "gen" on the worker processes |
in an arbitrary order. The items are expected to be lists of arguments to |
the function. Returns a results iterator. A result value of type |
MaybeResult either indicates a heartbeat of the runner, i.e. indicating |
that the runner is still waiting for the result to be computed, or it wraps |
- the real result.""" |
+ the real result. |
+ |
+ Args: |
+ process_context_fn: Function executed once by each worker. Expected to |
+ return a process-context object. If present, this object is passed |
+ as additional argument to each call to fn. |
+ process_context_args: List of arguments for the invocation of |
+ process_context_fn. All arguments will be pickled and sent beyond the |
+ process boundary. |
+ """ |
try: |
gen = iter(gen) |
self.advance = self._advance_more |
@@ -99,7 +116,9 @@ class Pool(): |
p = Process(target=Worker, args=(fn, |
self.work_queue, |
self.done_queue, |
- self.done)) |
+ self.done, |
+ process_context_fn, |
+ process_context_args)) |
self.processes.append(p) |
p.start() |