| Index: gclient_utils.py
|
| diff --git a/gclient_utils.py b/gclient_utils.py
|
| index 54910ce45a2cfd482955e1cad31c70ec6a846d18..6bb555c28c01c6ac76a22c1661fe5f3b838e6d4b 100644
|
| --- a/gclient_utils.py
|
| +++ b/gclient_utils.py
|
| @@ -17,11 +17,15 @@
|
| import errno
|
| import logging
|
| import os
|
| +from third_party.repo.progress import Progress
|
| +import Queue
|
| import re
|
| import stat
|
| import subprocess
|
| import sys
|
| import time
|
| +import threading
|
| +import traceback
|
| import xml.dom.minidom
|
| import xml.parsers.expat
|
|
|
| @@ -371,3 +375,139 @@ def GetGClientRootAndEntries(path=None):
|
| execfile(config_path, env)
|
| config_dir = os.path.dirname(config_path)
|
| return config_dir, env['entries']
|
| +
|
| +
|
| +class ThreadPool:
|
| + """A thread pool class that lets one schedule jobs on many worker threads."""
|
| +
|
| + def __init__(self, threads=1):
|
| + self._threads = threads
|
| + self._queue = Queue.Queue()
|
| + self._jobs_left = 0
|
| + self._condition = threading.Condition()
|
| + self._workers = []
|
| +
|
| + class Worker(threading.Thread):
|
| + """Internal worker class that executes jobs from the ThreadPool queue."""
|
| +
|
| + def __init__(self, pool):
|
| + threading.Thread.__init__(self)
|
| + self.setDaemon(True)
|
| + self._pool = pool
|
| + self._done = False
|
| + self.exceptions = []
|
| +
|
| + def Done(self):
|
| + """Terminates the worker threads."""
|
| + self._done = True
|
| +
|
| + def run(self):
|
| + """Executes jobs from the pool's queue."""
|
| + while not self._done:
|
| + f = self._pool._queue.get()
|
| + try:
|
| + try:
|
| + f(self)
|
| + except Exception, e:
|
| + # Catch all exceptions, otherwise we can't join the thread. Print
|
| + # the backtrace now, but keep the exception so that we can raise it
|
| + # on the main thread.
|
| + type, value, tb = sys.exc_info()
|
| + traceback.print_exception(type, value, tb)
|
| + self.exceptions.append(e)
|
| + finally:
|
| + self._pool._JobDone()
|
| +
|
| + def _AddJobToQueue(self, job):
|
| + self._condition.acquire()
|
| + self._queue.put(job)
|
| + self._jobs_left += 1
|
| + self._condition.release()
|
| +
|
| + def _JobDone(self):
|
| + self._condition.acquire()
|
| + try:
|
| + assert self._jobs_left
|
| + self._jobs_left -= 1
|
| + if self._jobs_left == 0:
|
| + self._condition.notify()
|
| + finally:
|
| + self._condition.release()
|
| +
|
| + def _JoinQueue(self):
|
| + self._condition.acquire()
|
| + try:
|
| + while self._jobs_left:
|
| + self._condition.wait(1)
|
| + finally:
|
| + self._condition.release()
|
| +
|
| + def Start(self):
|
| + """Starts the thread pool. Spawns worker threads."""
|
| + if not self._threads:
|
| + return
|
| + assert not self._workers
|
| + for i in xrange(0, self._threads):
|
| + worker = self.Worker(self)
|
| + self._workers.append(worker)
|
| + worker.start()
|
| +
|
| + def Stop(self):
|
| + """Stops the thread pool. Joins all worker threads."""
|
| + if not self._threads:
|
| + return
|
| + assert self._workers
|
| + for i in xrange(0, len(self._workers)):
|
| + wrapped = lambda thread: thread.Done()
|
| + self._AddJobToQueue(wrapped)
|
| + self._JoinQueue()
|
| + for worker in self._workers:
|
| + while True:
|
| + worker.join(1)
|
| + if not worker.isAlive():
|
| + break
|
| + try:
|
| + for worker in self._workers:
|
| + for e in worker.exceptions:
|
| + # If we collected exceptions, raise them now.
|
| + raise e
|
| + finally:
|
| + self._workers = []
|
| +
|
| + def AddJob(self, function):
|
| + """Adds a job to the queue.
|
| +
|
| + A job is a simple closure, that will get executed on one of the worker
|
| + threads."""
|
| + if self._threads:
|
| + wrapped = lambda worker: function()
|
| + self._AddJobToQueue(wrapped)
|
| + else:
|
| + function()
|
| +
|
| + def WaitJobs(self):
|
| + """Waits for all jobs to be completed."""
|
| + if not self._threads:
|
| + return
|
| + assert self._workers
|
| + self._JoinQueue()
|
| +
|
| +
|
| +class ThreadSafeProgress(Progress):
|
| + """A thread safe progress meter.
|
| +
|
| + This class just wraps around a Progress class, that just forwards calls while
|
| + holding a lock."""
|
| + def __init__(self, title, total=0):
|
| + Progress.__init__(self, title, total)
|
| + self._lock = threading.Lock()
|
| +
|
| + def update(self, inc=1):
|
| + self._lock.acquire()
|
| + Progress.update(self, inc)
|
| + self._lock.release()
|
| +
|
| + def end(self):
|
| + self._lock.acquire()
|
| + Progress.end(self)
|
| + self._lock.release()
|
|
|