Index: gclient_utils.py |
diff --git a/gclient_utils.py b/gclient_utils.py |
index 54910ce45a2cfd482955e1cad31c70ec6a846d18..6bb555c28c01c6ac76a22c1661fe5f3b838e6d4b 100644 |
--- a/gclient_utils.py |
+++ b/gclient_utils.py |
@@ -17,11 +17,15 @@ |
import errno |
import logging |
import os |
+from third_party.repo.progress import Progress |
+import Queue |
import re |
import stat |
import subprocess |
import sys |
import time |
+import threading |
+import traceback |
import xml.dom.minidom |
import xml.parsers.expat |
@@ -371,3 +375,139 @@ def GetGClientRootAndEntries(path=None): |
execfile(config_path, env) |
config_dir = os.path.dirname(config_path) |
return config_dir, env['entries'] |
+ |
+ |
+class ThreadPool: |
+ """A thread pool class that lets one schedule jobs on many worker threads.""" |
+ |
+ def __init__(self, threads=1): |
+ self._threads = threads |
+ self._queue = Queue.Queue() |
+ self._jobs_left = 0 |
+ self._condition = threading.Condition() |
+ self._workers = [] |
+ |
+ class Worker(threading.Thread): |
+ """Internal worker class that executes jobs from the ThreadPool queue.""" |
+ |
+ def __init__(self, pool): |
+ threading.Thread.__init__(self) |
+ self.setDaemon(True) |
+ self._pool = pool |
+ self._done = False |
+ self.exceptions = [] |
+ |
+ def Done(self): |
+ """Terminates the worker threads.""" |
+ self._done = True |
+ |
+ def run(self): |
+ """Executes jobs from the pool's queue.""" |
+ while not self._done: |
+ f = self._pool._queue.get() |
+ try: |
+ try: |
+ f(self) |
+ except Exception, e: |
+ # Catch all exceptions, otherwise we can't join the thread. Print |
+ # the backtrace now, but keep the exception so that we can raise it |
+ # on the main thread. |
+ type, value, tb = sys.exc_info() |
+ traceback.print_exception(type, value, tb) |
+ self.exceptions.append(e) |
+ finally: |
+ self._pool._JobDone() |
+ |
+ def _AddJobToQueue(self, job): |
+ self._condition.acquire() |
+ self._queue.put(job) |
+ self._jobs_left += 1 |
+ self._condition.release() |
+ |
+ def _JobDone(self): |
+ self._condition.acquire() |
+ try: |
+ assert self._jobs_left |
+ self._jobs_left -= 1 |
+ if self._jobs_left == 0: |
+ self._condition.notify() |
+ finally: |
+ self._condition.release() |
+ |
+ def _JoinQueue(self): |
+ self._condition.acquire() |
+ try: |
+ while self._jobs_left: |
+ self._condition.wait(1) |
+ finally: |
+ self._condition.release() |
+ |
+ def Start(self): |
+ """Starts the thread pool. Spawns worker threads.""" |
+ if not self._threads: |
+ return |
+ assert not self._workers |
+ for i in xrange(0, self._threads): |
+ worker = self.Worker(self) |
+ self._workers.append(worker) |
+ worker.start() |
+ |
+ def Stop(self): |
+ """Stops the thread pool. Joins all worker threads.""" |
+ if not self._threads: |
+ return |
+ assert self._workers |
+ for i in xrange(0, len(self._workers)): |
+ wrapped = lambda thread: thread.Done() |
+ self._AddJobToQueue(wrapped) |
+ self._JoinQueue() |
+ for worker in self._workers: |
+ while True: |
+ worker.join(1) |
+ if not worker.isAlive(): |
+ break |
+ try: |
+ for worker in self._workers: |
+ for e in worker.exceptions: |
+ # If we collected exceptions, raise them now. |
+ raise e |
+ finally: |
+ self._workers = [] |
+ |
+ def AddJob(self, function): |
+ """Adds a job to the queue. |
+ |
+ A job is a simple closure, that will get executed on one of the worker |
+ threads.""" |
+ if self._threads: |
+ wrapped = lambda worker: function() |
+ self._AddJobToQueue(wrapped) |
+ else: |
+ function() |
+ |
+ def WaitJobs(self): |
+ """Waits for all jobs to be completed.""" |
+ if not self._threads: |
+ return |
+ assert self._workers |
+ self._JoinQueue() |
+ |
+ |
+class ThreadSafeProgress(Progress): |
+ """A thread safe progress meter. |
+ |
+ This class just wraps around a Progress class, that just forwards calls while |
+ holding a lock.""" |
+ def __init__(self, title, total=0): |
+ Progress.__init__(self, title, total) |
+ self._lock = threading.Lock() |
+ |
+ def update(self, inc=1): |
+ self._lock.acquire() |
+ Progress.update(self, inc) |
+ self._lock.release() |
+ |
+ def end(self): |
+ self._lock.acquire() |
+ Progress.end(self) |
+ self._lock.release() |