Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(216)

Unified Diff: tools/testrunner/local/pool.py

Issue 275093002: Introduce a dynamic process pool for the local test driver (Closed) Base URL: https://v8.googlecode.com/svn/branches/bleeding_edge
Patch Set: Review Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/testrunner/local/pool.py
diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py
new file mode 100644
index 0000000000000000000000000000000000000000..8f629f9e4a3862418665f5f8d4bce0e10a20f879
--- /dev/null
+++ b/tools/testrunner/local/pool.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# Copyright 2014 the V8 project authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from multiprocessing import Event, Process, Queue
+
+class NormalResult():
+ def __init__(self, result):
+ self.result = result
+ self.exception = False
+ self.break_now = False
+
+
+class ExceptionResult():
+ def __init__(self):
+ self.exception = True
+ self.break_now = False
+
+
+class BreakResult():
+ def __init__(self):
+ self.exception = False
+ self.break_now = True
+
+
+def Worker(fn, work_queue, done_queue, done):
+ """Worker to be run in a child process.
+ The worker stops on two conditions. 1. When the poison pill "STOP" is
+ reached or 2. when the event "done" is set."""
+ try:
+ for args in iter(work_queue.get, "STOP"):
+ if done.is_set():
+ break
+ try:
+ done_queue.put(NormalResult(fn(*args)))
+ except Exception, e:
+ print(">>> EXCEPTION: %s" % e)
+ done_queue.put(ExceptionResult())
+ except KeyboardInterrupt:
+ done_queue.put(BreakResult())
+
+
+class Pool():
+ """Distributes tasks to a number of worker processes.
+ New tasks can be added dynamically even after the workers have been started.
+ Requirement: Tasks can only be added from the parent process, e.g. while
+ consuming the results generator."""
+
+ # Factor to calculate the maximum number of items in the work/done queue.
+ # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
+ BUFFER_FACTOR = 4
+
+ def __init__(self, num_workers):
+ self.num_workers = num_workers
+ self.processes = []
+ self.terminated = False
+
+ # Invariant: count >= #work_queue + #done_queue. It is greater when a
+ # worker takes an item from the work_queue and before the result is
+ # submitted to the done_queue. It is equal when no worker is working,
+ # e.g. when all workers have finished, and when no results are processed.
+ # Count is only accessed by the parent process. Only the parent process is
+ # allowed to remove items from the done_queue and to add items to the
+ # work_queue.
+ self.count = 0
+ self.work_queue = Queue()
+ self.done_queue = Queue()
+ self.done = Event()
+
+ def imap_unordered(self, fn, gen):
+ """Maps function "fn" to items in generator "gen" on the worker processes
+ in an arbitrary order. The items are expected to be lists of arguments to
+ the function. Returns a results iterator."""
+ try:
+ gen = iter(gen)
+ self.advance = self._advance_more
+
+ for w in xrange(self.num_workers):
+ p = Process(target=Worker, args=(fn,
+ self.work_queue,
+ self.done_queue,
+ self.done))
+ self.processes.append(p)
+ p.start()
+
+ self.advance(gen)
+ while self.count > 0:
+ result = self.done_queue.get()
+ self.count -= 1
+ if result.exception:
+ # Ignore items with unexpected exceptions.
+ continue
+ elif result.break_now:
+ # A keyboard interrupt happened in one of the worker processes.
+ raise KeyboardInterrupt
+ else:
+ yield result.result
+ self.advance(gen)
+ finally:
+ self.terminate()
+
+ def _advance_more(self, gen):
+ while self.count < self.num_workers * self.BUFFER_FACTOR:
+ try:
+ self.work_queue.put(gen.next())
+ self.count += 1
+ except StopIteration:
+ self.advance = self._advance_empty
+ break
+
+ def _advance_empty(self, gen):
+ pass
+
+ def add(self, args):
+ """Adds an item to the work queue. Can be called dynamically while
+ processing the results from imap_unordered."""
+ self.work_queue.put(args)
+ self.count += 1
+
+ def terminate(self):
+ if self.terminated:
+ return
+ self.terminated = True
+
+ # For exceptional tear down set the "done" event to stop the workers before
+ # they empty the queue buffer.
+ self.done.set()
+
+ for p in self.processes:
+ # During normal tear down the workers block on get(). Feed a poison pill
+ # per worker to make them stop.
+ self.work_queue.put("STOP")
+
+ for p in self.processes:
+ p.join()
« no previous file with comments | « tools/testrunner/local/execution.py ('k') | tools/testrunner/local/pool_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698