| Index: commit-queue/threadpool.py
|
| ===================================================================
|
| --- commit-queue/threadpool.py (revision 249146)
|
| +++ commit-queue/threadpool.py (working copy)
|
| @@ -1,71 +0,0 @@
|
| -# coding=utf8
|
| -# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Thread pool with task queues to make work items asynchronous."""
|
| -
|
| -import logging
|
| -import Queue
|
| -import threading
|
| -import sys
|
| -
|
| -
|
| -class ThreadPool(object):
|
| - def __init__(self, num_threads):
|
| - self._tasks = Queue.Queue()
|
| - self._lock = threading.Lock()
|
| - self._outputs = []
|
| - self._exceptions = []
|
| - self._workers = [
|
| - threading.Thread(target=self._run, name='worker-%d' % i)
|
| - for i in range(num_threads)
|
| - ]
|
| - for w in self._workers:
|
| - w.daemon = True
|
| - w.start()
|
| -
|
| - def add_task(self, function, *args, **kwargs):
|
| - self._tasks.put((function, args, kwargs))
|
| -
|
| - def join(self):
|
| - """Extracts all the results from each threads unordered."""
|
| - self._tasks.join()
|
| - with self._lock:
|
| - # Look for exceptions.
|
| - if self._exceptions:
|
| - exception = self._exceptions.pop(0)
|
| - raise exception[0], exception[1], exception[2]
|
| - out = self._outputs
|
| - self._outputs = []
|
| - return out
|
| -
|
| - def close(self):
|
| - """Closes all the threads."""
|
| - for _ in range(len(self._workers)):
|
| - # Enqueueing None causes the worker to stop.
|
| - self._tasks.put(None)
|
| - for t in self._workers:
|
| - t.join()
|
| -
|
| - def _run(self):
|
| - """Runs until a None task is queued."""
|
| - while True:
|
| - task = self._tasks.get()
|
| - if task is None:
|
| - # We're done.
|
| - return
|
| - try:
|
| - # The first item is the index.
|
| - func, args, kwargs = task
|
| - self._outputs.append(func(*args, **kwargs))
|
| - except Exception, e:
|
| - task_str = "<unserializable>"
|
| - try:
|
| - task_str = repr(task)
|
| - except: # pylint: disable=W0702
|
| - pass
|
| - logging.error('Caught exception while running %s! %s' % (task_str, e))
|
| - self._exceptions.append(sys.exc_info())
|
| - finally:
|
| - self._tasks.task_done()
|
|
|