| Index: tools/testrunner/local/pool.py
|
| diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py
|
| index 602a2d4b3097b7af0bfd331d6c67b013178e1f01..b933f735e55e9ea5231dd3c5dd0b620a0330e3ba 100644
|
| --- a/tools/testrunner/local/pool.py
|
| +++ b/tools/testrunner/local/pool.py
|
| @@ -3,6 +3,7 @@
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
|
|
| +from Queue import Empty
|
| from multiprocessing import Event, Process, Queue
|
|
|
| class NormalResult():
|
| @@ -24,6 +25,20 @@ class BreakResult():
|
| self.break_now = True
|
|
|
|
|
| +class MaybeResult():
|
| + def __init__(self, heartbeat, value):
|
| + self.heartbeat = heartbeat
|
| + self.value = value
|
| +
|
| + @staticmethod
|
| + def create_heartbeat():
|
| + return MaybeResult(True, None)
|
| +
|
| + @staticmethod
|
| + def create_result(value):
|
| + return MaybeResult(False, value)
|
| +
|
| +
|
| def Worker(fn, work_queue, done_queue, done):
|
| """Worker to be run in a child process.
|
| The worker stops on two conditions. 1. When the poison pill "STOP" is
|
| @@ -51,7 +66,7 @@ class Pool():
|
| # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
|
| BUFFER_FACTOR = 4
|
|
|
| - def __init__(self, num_workers):
|
| + def __init__(self, num_workers, heartbeat_timeout=30):
|
| self.num_workers = num_workers
|
| self.processes = []
|
| self.terminated = False
|
| @@ -67,11 +82,15 @@ class Pool():
|
| self.work_queue = Queue()
|
| self.done_queue = Queue()
|
| self.done = Event()
|
| + self.heartbeat_timeout = heartbeat_timeout
|
|
|
| def imap_unordered(self, fn, gen):
|
| """Maps function "fn" to items in generator "gen" on the worker processes
|
| in an arbitrary order. The items are expected to be lists of arguments to
|
| - the function. Returns a results iterator."""
|
| + the function. Returns a results iterator. A result value of type
|
| + MaybeResult either indicates a heartbeat of the runner, i.e. indicating
|
| + that the runner is still waiting for the result to be computed, or it wraps
|
| + the real result."""
|
| try:
|
| gen = iter(gen)
|
| self.advance = self._advance_more
|
| @@ -86,7 +105,14 @@ class Pool():
|
|
|
| self.advance(gen)
|
| while self.count > 0:
|
| - result = self.done_queue.get()
|
| + while True:
|
| + try:
|
| + result = self.done_queue.get(timeout=self.heartbeat_timeout)
|
| + break
|
| + except Empty:
|
| + # Indicate a heartbeat. The iterator will continue fetching the
|
| + # next result.
|
| + yield MaybeResult.create_heartbeat()
|
| self.count -= 1
|
| if result.exception:
|
| # Ignore items with unexpected exceptions.
|
| @@ -95,7 +121,7 @@ class Pool():
|
| # A keyboard interrupt happened in one of the worker processes.
|
| raise KeyboardInterrupt
|
| else:
|
| - yield result.result
|
| + yield MaybeResult.create_result(result.result)
|
| self.advance(gen)
|
| finally:
|
| self.terminate()
|
|
|