Index: tools/testrunner/local/pool.py |
diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py |
index 602a2d4b3097b7af0bfd331d6c67b013178e1f01..b933f735e55e9ea5231dd3c5dd0b620a0330e3ba 100644 |
--- a/tools/testrunner/local/pool.py |
+++ b/tools/testrunner/local/pool.py |
@@ -3,6 +3,7 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
+from Queue import Empty |
from multiprocessing import Event, Process, Queue |
class NormalResult(): |
@@ -24,6 +25,20 @@ class BreakResult(): |
self.break_now = True |
+class MaybeResult(): |
+ def __init__(self, heartbeat, value): |
+ self.heartbeat = heartbeat |
+ self.value = value |
+ |
+ @staticmethod |
+ def create_heartbeat(): |
+ return MaybeResult(True, None) |
+ |
+ @staticmethod |
+ def create_result(value): |
+ return MaybeResult(False, value) |
+ |
+ |
def Worker(fn, work_queue, done_queue, done): |
"""Worker to be run in a child process. |
The worker stops on two conditions. 1. When the poison pill "STOP" is |
@@ -51,7 +66,7 @@ class Pool(): |
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens. |
BUFFER_FACTOR = 4 |
- def __init__(self, num_workers): |
+ def __init__(self, num_workers, heartbeat_timeout=30): |
self.num_workers = num_workers |
self.processes = [] |
self.terminated = False |
@@ -67,11 +82,15 @@ class Pool(): |
self.work_queue = Queue() |
self.done_queue = Queue() |
self.done = Event() |
+ self.heartbeat_timeout = heartbeat_timeout |
def imap_unordered(self, fn, gen): |
"""Maps function "fn" to items in generator "gen" on the worker processes |
in an arbitrary order. The items are expected to be lists of arguments to |
- the function. Returns a results iterator.""" |
+ the function. Returns a results iterator. A result value of type |
+ MaybeResult either indicates a heartbeat of the runner, i.e. indicating |
+ that the runner is still waiting for the result to be computed, or it wraps |
+ the real result.""" |
try: |
gen = iter(gen) |
self.advance = self._advance_more |
@@ -86,7 +105,14 @@ class Pool(): |
self.advance(gen) |
while self.count > 0: |
- result = self.done_queue.get() |
+ while True: |
+ try: |
+ result = self.done_queue.get(timeout=self.heartbeat_timeout) |
+ break |
+ except Empty: |
+ # Indicate a heartbeat. The iterator will continue fetching the |
+ # next result. |
+ yield MaybeResult.create_heartbeat() |
self.count -= 1 |
if result.exception: |
# Ignore items with unexpected exceptions. |
@@ -95,7 +121,7 @@ class Pool(): |
# A keyboard interrupt happened in one of the worker processes. |
raise KeyboardInterrupt |
else: |
- yield result.result |
+ yield MaybeResult.create_result(result.result) |
self.advance(gen) |
finally: |
self.terminate() |