Index: tools/testrunner/local/pool.py |
diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py |
index 602a2d4b3097b7af0bfd331d6c67b013178e1f01..f90a450261b6350bea1bd3ddc4e82c0c53b6c6f3 100644 |
--- a/tools/testrunner/local/pool.py |
+++ b/tools/testrunner/local/pool.py |
@@ -3,7 +3,9 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
+from Queue import Empty |
from multiprocessing import Event, Process, Queue |
+import sys |
class NormalResult(): |
def __init__(self, result): |
@@ -24,6 +26,17 @@ class BreakResult(): |
self.break_now = True |
+class Heartbeat(): |
Jakob Kummerow
2015/04/08 09:00:34
I'm not too happy with this kind of polymorphism.
|
+ def __init__(self): |
+ self.heartbeat = True |
+ |
+ |
+class PoolResult(): |
+ def __init__(self, value): |
+ self.heartbeat = False |
+ self.value = value |
+ |
+ |
def Worker(fn, work_queue, done_queue, done): |
"""Worker to be run in a child process. |
The worker stops on two conditions. 1. When the poison pill "STOP" is |
@@ -51,7 +64,7 @@ class Pool(): |
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens. |
BUFFER_FACTOR = 4 |
- def __init__(self, num_workers): |
+ def __init__(self, num_workers, heartbeat_timeout=30): |
self.num_workers = num_workers |
self.processes = [] |
self.terminated = False |
@@ -67,11 +80,15 @@ class Pool(): |
self.work_queue = Queue() |
self.done_queue = Queue() |
self.done = Event() |
+ self.heartbeat_timeout = heartbeat_timeout |
def imap_unordered(self, fn, gen): |
"""Maps function "fn" to items in generator "gen" on the worker processes |
in an arbitrary order. The items are expected to be lists of arguments to |
- the function. Returns a results iterator.""" |
+ the function. Returns a results iterator. A result value can either be a |
+ hearbeat of the runner, i.e. indicating that the runner is still waiting |
Jakob Kummerow
2015/04/08 09:00:34
nit: "heartbeat"
|
+ for the result to be computed, or it can be a result wrapper of type |
+ PoolResult.""" |
try: |
gen = iter(gen) |
self.advance = self._advance_more |
@@ -86,7 +103,14 @@ class Pool(): |
self.advance(gen) |
while self.count > 0: |
- result = self.done_queue.get() |
+ while True: |
+ try: |
+ result = self.done_queue.get(timeout=self.heartbeat_timeout) |
+ break |
+ except Empty: |
+ # Indicate a heartbeat. The iterater will continue fetching the |
Jakob Kummerow
2015/04/08 09:00:34
nit: "iterator"
|
+ # next result. |
+ yield Heartbeat() |
self.count -= 1 |
if result.exception: |
# Ignore items with unexpected exceptions. |
@@ -95,7 +119,7 @@ class Pool(): |
# A keyboard interrupt happened in one of the worker processes. |
raise KeyboardInterrupt |
else: |
- yield result.result |
+ yield PoolResult(result.result) |
self.advance(gen) |
finally: |
self.terminate() |