Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Unified Diff: gclient_utils.py

Issue 3135014: Add --jobs support to gclient. --jobs=1 is still the default for now. (Closed)
Patch Set: As long as timeout is specified, it doesn't need to be a low value Created 10 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gclient.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gclient_utils.py
diff --git a/gclient_utils.py b/gclient_utils.py
index c530f4cb4cd79078635de9fe26a0912023e426a0..b2b009314036ea3e8fb5910e27c78d78966eeb7c 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -21,6 +21,7 @@ import re
import stat
import subprocess
import sys
+import threading
import time
import threading
import xml.dom.minidom
@@ -378,21 +379,30 @@ class WorkItem(object):
class ExecutionQueue(object):
- """Dependencies sometime needs to be run out of order due to From() keyword.
+ """Runs a set of WorkItem that have interdependencies and were WorkItem are
+ added as they are processed.
- This class manages that all the required dependencies are run before running
- each one.
+ In gclient's case, Dependencies sometime needs to be run out of order due to
+ From() keyword. This class manages that all the required dependencies are run
+ before running each one.
- Methods of this class are multithread safe.
+ Methods of this class are thread safe.
"""
- def __init__(self, progress):
- self.lock = threading.Lock()
- # List of WorkItem, Dependency inherits from WorkItem.
+ def __init__(self, jobs, progress):
+ """jobs specifies the number of concurrent tasks to allow. progress is a
+ Progress instance."""
+ # Set when a thread is done or a new item is enqueued.
+ self.ready_cond = threading.Condition()
+ # Maximum number of concurrent tasks.
+ self.jobs = jobs
+ # List of WorkItem, for gclient, these are Dependency instances.
self.queued = []
# List of strings representing each Dependency.name that was run.
self.ran = []
# List of items currently running.
self.running = []
+ # Exceptions thrown if any.
+ self.exceptions = []
self.progress = progress
if self.progress:
self.progress.update()
@@ -402,71 +412,99 @@ class ExecutionQueue(object):
satisfied.
"""
assert isinstance(d, WorkItem)
+ self.ready_cond.acquire()
try:
- self.lock.acquire()
self.queued.append(d)
total = len(self.queued) + len(self.ran) + len(self.running)
+ logging.debug('enqueued(%s)' % d.name)
+ if self.progress:
+ self.progress._total = total + 1
+ self.progress.update(0)
+ self.ready_cond.notifyAll()
finally:
- self.lock.release()
- if self.progress:
- self.progress._total = total + 1
- self.progress.update(0)
+ self.ready_cond.release()
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
- while self._run_one_item(*args, **kwargs):
- pass
- queued = []
- running = []
+ self.ready_cond.acquire()
try:
- self.lock.acquire()
- if self.queued:
- queued = self.queued
- self.queued = []
- if self.running:
- running = self.running
- self.running = []
+ while True:
+ # Check for task to run first, then wait.
+ while True:
+ if self.exceptions:
+ # Systematically flush the queue when there is an exception logged
+ # in.
+ self.queued = []
+ # Flush threads that have terminated.
+ self.running = [t for t in self.running if t.isAlive()]
+ if not self.queued and not self.running:
+ break
+ if self.jobs == len(self.running):
+ break
+ for i in xrange(len(self.queued)):
+ # Verify its requirements.
+ for r in self.queued[i].requirements:
+ if not r in self.ran:
+ # Requirement not met.
+ break
+ else:
+ # Start one work item: all its requirements are satisfied.
+ d = self.queued.pop(i)
+ new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
+ if self.jobs > 1:
+ # Start the thread.
+ self.running.append(new_thread)
+ new_thread.start()
+ else:
+ # Run the 'thread' inside the main thread.
+ new_thread.run()
+ break
+ else:
+ # Couldn't find an item that could run. Break out the outher loop.
+ break
+ if not self.queued and not self.running:
+ break
+ # We need to poll here otherwise Ctrl-C isn't processed.
+ self.ready_cond.wait(10)
+ # Something happened: self.enqueue() or a thread terminated. Loop again.
finally:
- self.lock.release()
+ self.ready_cond.release()
+ assert not self.running, 'Now guaranteed to be single-threaded'
+ if self.exceptions:
+ # TODO(maruel): Get back the original stack location.
+ raise self.exceptions.pop(0)
if self.progress:
self.progress.end()
- if queued:
- raise gclient_utils.Error('Entries still queued: %s' % str(queued))
- if running:
- raise gclient_utils.Error('Entries still queued: %s' % str(running))
- def _run_one_item(self, *args, **kwargs):
- """Removes one item from the queue that has all its requirements completed
- and execute it.
+ class _Worker(threading.Thread):
+ """One thread to execute one WorkItem."""
+ def __init__(self, parent, item, args=(), kwargs=None):
+ threading.Thread.__init__(self, name=item.name or 'Worker')
+ self.args = args
+ self.kwargs = kwargs or {}
+ self.item = item
+ self.parent = parent
+
+ def run(self):
+ """Runs in its own thread."""
+ logging.debug('running(%s)' % self.item.name)
+ exception = None
+ try:
+ self.item.run(*self.args, **self.kwargs)
+ except Exception, e:
+ # TODO(maruel): Catch exception location.
+ exception = e
- Returns False if no item could be run.
- """
- i = 0
- d = None
- try:
- self.lock.acquire()
- while i != len(self.queued) and not d:
- d = self.queued.pop(i)
- for r in d.requirements:
- if not r in self.ran:
- self.queued.insert(i, d)
- d = None
- break
- i += 1
- if not d:
- return False
- self.running.append(d)
- finally:
- self.lock.release()
- d.run(*args, **kwargs)
- try:
- self.lock.acquire()
- assert not d.name in self.ran
- if not d.name in self.ran:
- self.ran.append(d.name)
- self.running.remove(d)
- if self.progress:
- self.progress.update(1)
- finally:
- self.lock.release()
- return True
+ # This assumes the following code won't throw an exception. Bad.
+ self.parent.ready_cond.acquire()
+ try:
+ if exception:
+ self.parent.exceptions.append(exception)
+ if self.parent.progress:
+ self.parent.progress.update(1)
+ assert not self.item.name in self.parent.ran
+ if not self.item.name in self.parent.ran:
+ self.parent.ran.append(self.item.name)
+ finally:
+ self.parent.ready_cond.notifyAll()
+ self.parent.ready_cond.release()
« no previous file with comments | « gclient.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698