Chromium Code Reviews| Index: tools/testrunner/local/pool.py |
| diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py |
| index 602a2d4b3097b7af0bfd331d6c67b013178e1f01..f90a450261b6350bea1bd3ddc4e82c0c53b6c6f3 100644 |
| --- a/tools/testrunner/local/pool.py |
| +++ b/tools/testrunner/local/pool.py |
| @@ -3,7 +3,9 @@ |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| +from Queue import Empty |
| from multiprocessing import Event, Process, Queue |
| +import sys |
| class NormalResult(): |
| def __init__(self, result): |
| @@ -24,6 +26,17 @@ class BreakResult(): |
| self.break_now = True |
| +class Heartbeat(): |
|
Jakob Kummerow
2015/04/08 09:00:34
I'm not too happy with this kind of polymorphism.
|
| + def __init__(self): |
| + self.heartbeat = True |
| + |
| + |
| +class PoolResult(): |
| + def __init__(self, value): |
| + self.heartbeat = False |
| + self.value = value |
| + |
| + |
| def Worker(fn, work_queue, done_queue, done): |
| """Worker to be run in a child process. |
| The worker stops on two conditions. 1. When the poison pill "STOP" is |
| @@ -51,7 +64,7 @@ class Pool(): |
| # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. |
| BUFFER_FACTOR = 4 |
| - def __init__(self, num_workers): |
| + def __init__(self, num_workers, heartbeat_timeout=30): |
| self.num_workers = num_workers |
| self.processes = [] |
| self.terminated = False |
| @@ -67,11 +80,15 @@ class Pool(): |
| self.work_queue = Queue() |
| self.done_queue = Queue() |
| self.done = Event() |
| + self.heartbeat_timeout = heartbeat_timeout |
| def imap_unordered(self, fn, gen): |
| """Maps function "fn" to items in generator "gen" on the worker processes |
| in an arbitrary order. The items are expected to be lists of arguments to |
| - the function. Returns a results iterator.""" |
| + the function. Returns a results iterator. A result value can either be a |
| + hearbeat of the runner, i.e. indicating that the runner is still waiting |
|
Jakob Kummerow
2015/04/08 09:00:34
nit: "heartbeat"
|
| + for the result to be computed, or it can be a result wrapper of type |
| + PoolResult.""" |
| try: |
| gen = iter(gen) |
| self.advance = self._advance_more |
| @@ -86,7 +103,14 @@ class Pool(): |
| self.advance(gen) |
| while self.count > 0: |
| - result = self.done_queue.get() |
| + while True: |
| + try: |
| + result = self.done_queue.get(timeout=self.heartbeat_timeout) |
| + break |
| + except Empty: |
| + # Indicate a heartbeat. The iterater will continue fetching the |
|
Jakob Kummerow
2015/04/08 09:00:34
nit: "iterator"
|
| + # next result. |
| + yield Heartbeat() |
| self.count -= 1 |
| if result.exception: |
| # Ignore items with unexpected exceptions. |
| @@ -95,7 +119,7 @@ class Pool(): |
| # A keyboard interrupt happened in one of the worker processes. |
| raise KeyboardInterrupt |
| else: |
| - yield result.result |
| + yield PoolResult(result.result) |
| self.advance(gen) |
| finally: |
| self.terminate() |