Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(89)

Unified Diff: gclient_utils.py

Issue 1640001: Add -j option to gclient to run parallel updates (Closed)
Patch Set: update to tot Created 10 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gclient_utils.py
diff --git a/gclient_utils.py b/gclient_utils.py
index 54910ce45a2cfd482955e1cad31c70ec6a846d18..6bb555c28c01c6ac76a22c1661fe5f3b838e6d4b 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -17,11 +17,15 @@
import errno
import logging
import os
+from third_party.repo.progress import Progress
+import Queue
import re
import stat
import subprocess
import sys
import time
+import threading
+import traceback
import xml.dom.minidom
import xml.parsers.expat
@@ -371,3 +375,139 @@ def GetGClientRootAndEntries(path=None):
execfile(config_path, env)
config_dir = os.path.dirname(config_path)
return config_dir, env['entries']
+
+
+class ThreadPool:
+ """A thread pool class that lets one schedule jobs on many worker threads."""
+
+ def __init__(self, threads=1):
+ self._threads = threads
+ self._queue = Queue.Queue()
+ self._jobs_left = 0
+ self._condition = threading.Condition()
+ self._workers = []
+
+ class Worker(threading.Thread):
+ """Internal worker class that executes jobs from the ThreadPool queue."""
+
+ def __init__(self, pool):
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+ self._pool = pool
+ self._done = False
+ self.exceptions = []
+
+ def Done(self):
+ """Terminates the worker threads."""
+ self._done = True
+
+ def run(self):
+ """Executes jobs from the pool's queue."""
+ while not self._done:
+ f = self._pool._queue.get()
+ try:
+ try:
+ f(self)
+ except Exception, e:
+ # Catch all exceptions, otherwise we can't join the thread. Print
+ # the backtrace now, but keep the exception so that we can raise it
+ # on the main thread.
+ type, value, tb = sys.exc_info()
+ traceback.print_exception(type, value, tb)
+ self.exceptions.append(e)
+ finally:
+ self._pool._JobDone()
+
+ def _AddJobToQueue(self, job):
+ self._condition.acquire()
+ self._queue.put(job)
+ self._jobs_left += 1
+ self._condition.release()
+
+ def _JobDone(self):
+ self._condition.acquire()
+ try:
+ assert self._jobs_left
+ self._jobs_left -= 1
+ if self._jobs_left == 0:
+ self._condition.notify()
+ finally:
+ self._condition.release()
+
+ def _JoinQueue(self):
+ self._condition.acquire()
+ try:
+ while self._jobs_left:
+ self._condition.wait(1)
+ finally:
+ self._condition.release()
+
+ def Start(self):
+ """Starts the thread pool. Spawns worker threads."""
+ if not self._threads:
+ return
+ assert not self._workers
+ for i in xrange(0, self._threads):
+ worker = self.Worker(self)
+ self._workers.append(worker)
+ worker.start()
+
+ def Stop(self):
+ """Stops the thread pool. Joins all worker threads."""
+ if not self._threads:
+ return
+ assert self._workers
+ for i in xrange(0, len(self._workers)):
+ wrapped = lambda thread: thread.Done()
+ self._AddJobToQueue(wrapped)
+ self._JoinQueue()
+ for worker in self._workers:
+ while True:
+ worker.join(1)
+ if not worker.isAlive():
+ break
+ try:
+ for worker in self._workers:
+ for e in worker.exceptions:
+ # If we collected exceptions, raise them now.
+ raise e
+ finally:
+ self._workers = []
+
+ def AddJob(self, function):
+ """Adds a job to the queue.
+
+ A job is a simple closure, that will get executed on one of the worker
+ threads."""
+ if self._threads:
+ wrapped = lambda worker: function()
+ self._AddJobToQueue(wrapped)
+ else:
+ function()
+
+ def WaitJobs(self):
+ """Waits for all jobs to be completed."""
+ if not self._threads:
+ return
+ assert self._workers
+ self._JoinQueue()
+
+
+class ThreadSafeProgress(Progress):
+ """A thread safe progress meter.
+
+ This class just wraps around a Progress class, that just forwards calls while
+ holding a lock."""
+ def __init__(self, title, total=0):
+ Progress.__init__(self, title, total)
+ self._lock = threading.Lock()
+
+ def update(self, inc=1):
+ self._lock.acquire()
+ Progress.update(self, inc)
+ self._lock.release()
+
+ def end(self):
+ self._lock.acquire()
+ Progress.end(self)
+ self._lock.release()
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698