| Index: commit-queue/threadpool.py
 | 
| ===================================================================
 | 
| --- commit-queue/threadpool.py	(revision 249146)
 | 
| +++ commit-queue/threadpool.py	(working copy)
 | 
| @@ -1,71 +0,0 @@
 | 
| -# coding=utf8
 | 
| -# Copyright (c) 2012 The Chromium Authors. All rights reserved.
 | 
| -# Use of this source code is governed by a BSD-style license that can be
 | 
| -# found in the LICENSE file.
 | 
| -
 | 
| -"""Thread pool with task queues to make work items asynchronous."""
 | 
| -
 | 
| -import logging
 | 
| -import Queue
 | 
| -import threading
 | 
| -import sys
 | 
| -
 | 
| -
 | 
| -class ThreadPool(object):
 | 
| -  def __init__(self, num_threads):
 | 
| -    self._tasks = Queue.Queue()
 | 
| -    self._lock = threading.Lock()
 | 
| -    self._outputs = []
 | 
| -    self._exceptions = []
 | 
| -    self._workers = [
 | 
| -      threading.Thread(target=self._run, name='worker-%d' % i)
 | 
| -      for i in range(num_threads)
 | 
| -    ]
 | 
| -    for w in self._workers:
 | 
| -      w.daemon = True
 | 
| -      w.start()
 | 
| -
 | 
| -  def add_task(self, function, *args, **kwargs):
 | 
| -    self._tasks.put((function, args, kwargs))
 | 
| -
 | 
| -  def join(self):
 | 
| -    """Extracts all the results from each threads unordered."""
 | 
| -    self._tasks.join()
 | 
| -    with self._lock:
 | 
| -      # Look for exceptions.
 | 
| -      if self._exceptions:
 | 
| -        exception = self._exceptions.pop(0)
 | 
| -        raise exception[0], exception[1], exception[2]
 | 
| -      out = self._outputs
 | 
| -      self._outputs = []
 | 
| -    return out
 | 
| -
 | 
| -  def close(self):
 | 
| -    """Closes all the threads."""
 | 
| -    for _ in range(len(self._workers)):
 | 
| -      # Enqueueing None causes the worker to stop.
 | 
| -      self._tasks.put(None)
 | 
| -    for t in self._workers:
 | 
| -      t.join()
 | 
| -
 | 
| -  def _run(self):
 | 
| -    """Runs until a None task is queued."""
 | 
| -    while True:
 | 
| -      task = self._tasks.get()
 | 
| -      if task is None:
 | 
| -        # We're done.
 | 
| -        return
 | 
| -      try:
 | 
| -        # The first item is the index.
 | 
| -        func, args, kwargs = task
 | 
| -        self._outputs.append(func(*args, **kwargs))
 | 
| -      except Exception, e:
 | 
| -        task_str = "<unserializable>"
 | 
| -        try:
 | 
| -          task_str = repr(task)
 | 
| -        except:  # pylint: disable=W0702
 | 
| -          pass
 | 
| -        logging.error('Caught exception while running %s! %s' % (task_str, e))
 | 
| -        self._exceptions.append(sys.exc_info())
 | 
| -      finally:
 | 
| -        self._tasks.task_done()
 | 
| 
 |