Index: gclient_utils.py |
diff --git a/gclient_utils.py b/gclient_utils.py |
index c530f4cb4cd79078635de9fe26a0912023e426a0..b2b009314036ea3e8fb5910e27c78d78966eeb7c 100644 |
--- a/gclient_utils.py |
+++ b/gclient_utils.py |
@@ -21,6 +21,7 @@ import re |
import stat |
import subprocess |
import sys |
+import threading |
import time |
import threading |
import xml.dom.minidom |
@@ -378,21 +379,30 @@ class WorkItem(object): |
class ExecutionQueue(object): |
- """Dependencies sometime needs to be run out of order due to From() keyword. |
+ """Runs a set of WorkItem that have interdependencies and were WorkItem are |
+ added as they are processed. |
- This class manages that all the required dependencies are run before running |
- each one. |
+ In gclient's case, Dependencies sometime needs to be run out of order due to |
+ From() keyword. This class manages that all the required dependencies are run |
+ before running each one. |
- Methods of this class are multithread safe. |
+ Methods of this class are thread safe. |
""" |
- def __init__(self, progress): |
- self.lock = threading.Lock() |
- # List of WorkItem, Dependency inherits from WorkItem. |
+ def __init__(self, jobs, progress): |
+ """jobs specifies the number of concurrent tasks to allow. progress is a |
+ Progress instance.""" |
+ # Set when a thread is done or a new item is enqueued. |
+ self.ready_cond = threading.Condition() |
+ # Maximum number of concurrent tasks. |
+ self.jobs = jobs |
+ # List of WorkItem, for gclient, these are Dependency instances. |
self.queued = [] |
# List of strings representing each Dependency.name that was run. |
self.ran = [] |
# List of items currently running. |
self.running = [] |
+ # Exceptions thrown if any. |
+ self.exceptions = [] |
self.progress = progress |
if self.progress: |
self.progress.update() |
@@ -402,71 +412,99 @@ class ExecutionQueue(object): |
satisfied. |
""" |
assert isinstance(d, WorkItem) |
+ self.ready_cond.acquire() |
try: |
- self.lock.acquire() |
self.queued.append(d) |
total = len(self.queued) + len(self.ran) + len(self.running) |
+ logging.debug('enqueued(%s)' % d.name) |
+ if self.progress: |
+ self.progress._total = total + 1 |
+ self.progress.update(0) |
+ self.ready_cond.notifyAll() |
finally: |
- self.lock.release() |
- if self.progress: |
- self.progress._total = total + 1 |
- self.progress.update(0) |
+ self.ready_cond.release() |
def flush(self, *args, **kwargs): |
"""Runs all enqueued items until all are executed.""" |
- while self._run_one_item(*args, **kwargs): |
- pass |
- queued = [] |
- running = [] |
+ self.ready_cond.acquire() |
try: |
- self.lock.acquire() |
- if self.queued: |
- queued = self.queued |
- self.queued = [] |
- if self.running: |
- running = self.running |
- self.running = [] |
+ while True: |
+ # Check for task to run first, then wait. |
+ while True: |
+ if self.exceptions: |
+ # Systematically flush the queue when there is an exception logged |
+ # in. |
+ self.queued = [] |
+ # Flush threads that have terminated. |
+ self.running = [t for t in self.running if t.isAlive()] |
+ if not self.queued and not self.running: |
+ break |
+ if self.jobs == len(self.running): |
+ break |
+ for i in xrange(len(self.queued)): |
+ # Verify its requirements. |
+ for r in self.queued[i].requirements: |
+ if not r in self.ran: |
+ # Requirement not met. |
+ break |
+ else: |
+ # Start one work item: all its requirements are satisfied. |
+ d = self.queued.pop(i) |
+ new_thread = self._Worker(self, d, args=args, kwargs=kwargs) |
+ if self.jobs > 1: |
+ # Start the thread. |
+ self.running.append(new_thread) |
+ new_thread.start() |
+ else: |
+ # Run the 'thread' inside the main thread. |
+ new_thread.run() |
+ break |
+ else: |
+ # Couldn't find an item that could run. Break out the outher loop. |
+ break |
+ if not self.queued and not self.running: |
+ break |
+ # We need to poll here otherwise Ctrl-C isn't processed. |
+ self.ready_cond.wait(10) |
+ # Something happened: self.enqueue() or a thread terminated. Loop again. |
finally: |
- self.lock.release() |
+ self.ready_cond.release() |
+ assert not self.running, 'Now guaranteed to be single-threaded' |
+ if self.exceptions: |
+ # TODO(maruel): Get back the original stack location. |
+ raise self.exceptions.pop(0) |
if self.progress: |
self.progress.end() |
- if queued: |
- raise gclient_utils.Error('Entries still queued: %s' % str(queued)) |
- if running: |
- raise gclient_utils.Error('Entries still queued: %s' % str(running)) |
- def _run_one_item(self, *args, **kwargs): |
- """Removes one item from the queue that has all its requirements completed |
- and execute it. |
+ class _Worker(threading.Thread): |
+ """One thread to execute one WorkItem.""" |
+ def __init__(self, parent, item, args=(), kwargs=None): |
+ threading.Thread.__init__(self, name=item.name or 'Worker') |
+ self.args = args |
+ self.kwargs = kwargs or {} |
+ self.item = item |
+ self.parent = parent |
+ |
+ def run(self): |
+ """Runs in its own thread.""" |
+ logging.debug('running(%s)' % self.item.name) |
+ exception = None |
+ try: |
+ self.item.run(*self.args, **self.kwargs) |
+ except Exception, e: |
+ # TODO(maruel): Catch exception location. |
+ exception = e |
- Returns False if no item could be run. |
- """ |
- i = 0 |
- d = None |
- try: |
- self.lock.acquire() |
- while i != len(self.queued) and not d: |
- d = self.queued.pop(i) |
- for r in d.requirements: |
- if not r in self.ran: |
- self.queued.insert(i, d) |
- d = None |
- break |
- i += 1 |
- if not d: |
- return False |
- self.running.append(d) |
- finally: |
- self.lock.release() |
- d.run(*args, **kwargs) |
- try: |
- self.lock.acquire() |
- assert not d.name in self.ran |
- if not d.name in self.ran: |
- self.ran.append(d.name) |
- self.running.remove(d) |
- if self.progress: |
- self.progress.update(1) |
- finally: |
- self.lock.release() |
- return True |
+ # This assumes the following code won't throw an exception. Bad. |
+ self.parent.ready_cond.acquire() |
+ try: |
+ if exception: |
+ self.parent.exceptions.append(exception) |
+ if self.parent.progress: |
+ self.parent.progress.update(1) |
+ assert not self.item.name in self.parent.ran |
+ if not self.item.name in self.parent.ran: |
+ self.parent.ran.append(self.item.name) |
+ finally: |
+ self.parent.ready_cond.notifyAll() |
+ self.parent.ready_cond.release() |