| Index: gclient_utils.py
|
| diff --git a/gclient_utils.py b/gclient_utils.py
|
| index c530f4cb4cd79078635de9fe26a0912023e426a0..b2b009314036ea3e8fb5910e27c78d78966eeb7c 100644
|
| --- a/gclient_utils.py
|
| +++ b/gclient_utils.py
|
| @@ -21,6 +21,7 @@ import re
|
| import stat
|
| import subprocess
|
| import sys
|
| +import threading
|
| import time
|
| import threading
|
| import xml.dom.minidom
|
| @@ -378,21 +379,30 @@ class WorkItem(object):
|
|
|
|
|
| class ExecutionQueue(object):
|
| - """Dependencies sometime needs to be run out of order due to From() keyword.
|
| + """Runs a set of WorkItem that have interdependencies and were WorkItem are
|
| + added as they are processed.
|
|
|
| - This class manages that all the required dependencies are run before running
|
| - each one.
|
| + In gclient's case, Dependencies sometime needs to be run out of order due to
|
| + From() keyword. This class manages that all the required dependencies are run
|
| + before running each one.
|
|
|
| - Methods of this class are multithread safe.
|
| + Methods of this class are thread safe.
|
| """
|
| - def __init__(self, progress):
|
| - self.lock = threading.Lock()
|
| - # List of WorkItem, Dependency inherits from WorkItem.
|
| + def __init__(self, jobs, progress):
|
| + """jobs specifies the number of concurrent tasks to allow. progress is a
|
| + Progress instance."""
|
| + # Set when a thread is done or a new item is enqueued.
|
| + self.ready_cond = threading.Condition()
|
| + # Maximum number of concurrent tasks.
|
| + self.jobs = jobs
|
| + # List of WorkItem, for gclient, these are Dependency instances.
|
| self.queued = []
|
| # List of strings representing each Dependency.name that was run.
|
| self.ran = []
|
| # List of items currently running.
|
| self.running = []
|
| + # Exceptions thrown if any.
|
| + self.exceptions = []
|
| self.progress = progress
|
| if self.progress:
|
| self.progress.update()
|
| @@ -402,71 +412,99 @@ class ExecutionQueue(object):
|
| satisfied.
|
| """
|
| assert isinstance(d, WorkItem)
|
| + self.ready_cond.acquire()
|
| try:
|
| - self.lock.acquire()
|
| self.queued.append(d)
|
| total = len(self.queued) + len(self.ran) + len(self.running)
|
| + logging.debug('enqueued(%s)' % d.name)
|
| + if self.progress:
|
| + self.progress._total = total + 1
|
| + self.progress.update(0)
|
| + self.ready_cond.notifyAll()
|
| finally:
|
| - self.lock.release()
|
| - if self.progress:
|
| - self.progress._total = total + 1
|
| - self.progress.update(0)
|
| + self.ready_cond.release()
|
|
|
| def flush(self, *args, **kwargs):
|
| """Runs all enqueued items until all are executed."""
|
| - while self._run_one_item(*args, **kwargs):
|
| - pass
|
| - queued = []
|
| - running = []
|
| + self.ready_cond.acquire()
|
| try:
|
| - self.lock.acquire()
|
| - if self.queued:
|
| - queued = self.queued
|
| - self.queued = []
|
| - if self.running:
|
| - running = self.running
|
| - self.running = []
|
| + while True:
|
| + # Check for task to run first, then wait.
|
| + while True:
|
| + if self.exceptions:
|
| + # Systematically flush the queue when there is an exception logged
|
| + # in.
|
| + self.queued = []
|
| + # Flush threads that have terminated.
|
| + self.running = [t for t in self.running if t.isAlive()]
|
| + if not self.queued and not self.running:
|
| + break
|
| + if self.jobs == len(self.running):
|
| + break
|
| + for i in xrange(len(self.queued)):
|
| + # Verify its requirements.
|
| + for r in self.queued[i].requirements:
|
| + if not r in self.ran:
|
| + # Requirement not met.
|
| + break
|
| + else:
|
| + # Start one work item: all its requirements are satisfied.
|
| + d = self.queued.pop(i)
|
| + new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
|
| + if self.jobs > 1:
|
| + # Start the thread.
|
| + self.running.append(new_thread)
|
| + new_thread.start()
|
| + else:
|
| + # Run the 'thread' inside the main thread.
|
| + new_thread.run()
|
| + break
|
| + else:
|
| + # Couldn't find an item that could run. Break out the outher loop.
|
| + break
|
| + if not self.queued and not self.running:
|
| + break
|
| + # We need to poll here otherwise Ctrl-C isn't processed.
|
| + self.ready_cond.wait(10)
|
| + # Something happened: self.enqueue() or a thread terminated. Loop again.
|
| finally:
|
| - self.lock.release()
|
| + self.ready_cond.release()
|
| + assert not self.running, 'Now guaranteed to be single-threaded'
|
| + if self.exceptions:
|
| + # TODO(maruel): Get back the original stack location.
|
| + raise self.exceptions.pop(0)
|
| if self.progress:
|
| self.progress.end()
|
| - if queued:
|
| - raise gclient_utils.Error('Entries still queued: %s' % str(queued))
|
| - if running:
|
| - raise gclient_utils.Error('Entries still queued: %s' % str(running))
|
|
|
| - def _run_one_item(self, *args, **kwargs):
|
| - """Removes one item from the queue that has all its requirements completed
|
| - and execute it.
|
| + class _Worker(threading.Thread):
|
| + """One thread to execute one WorkItem."""
|
| + def __init__(self, parent, item, args=(), kwargs=None):
|
| + threading.Thread.__init__(self, name=item.name or 'Worker')
|
| + self.args = args
|
| + self.kwargs = kwargs or {}
|
| + self.item = item
|
| + self.parent = parent
|
| +
|
| + def run(self):
|
| + """Runs in its own thread."""
|
| + logging.debug('running(%s)' % self.item.name)
|
| + exception = None
|
| + try:
|
| + self.item.run(*self.args, **self.kwargs)
|
| + except Exception, e:
|
| + # TODO(maruel): Catch exception location.
|
| + exception = e
|
|
|
| - Returns False if no item could be run.
|
| - """
|
| - i = 0
|
| - d = None
|
| - try:
|
| - self.lock.acquire()
|
| - while i != len(self.queued) and not d:
|
| - d = self.queued.pop(i)
|
| - for r in d.requirements:
|
| - if not r in self.ran:
|
| - self.queued.insert(i, d)
|
| - d = None
|
| - break
|
| - i += 1
|
| - if not d:
|
| - return False
|
| - self.running.append(d)
|
| - finally:
|
| - self.lock.release()
|
| - d.run(*args, **kwargs)
|
| - try:
|
| - self.lock.acquire()
|
| - assert not d.name in self.ran
|
| - if not d.name in self.ran:
|
| - self.ran.append(d.name)
|
| - self.running.remove(d)
|
| - if self.progress:
|
| - self.progress.update(1)
|
| - finally:
|
| - self.lock.release()
|
| - return True
|
| + # This assumes the following code won't throw an exception. Bad.
|
| + self.parent.ready_cond.acquire()
|
| + try:
|
| + if exception:
|
| + self.parent.exceptions.append(exception)
|
| + if self.parent.progress:
|
| + self.parent.progress.update(1)
|
| + assert not self.item.name in self.parent.ran
|
| + if not self.item.name in self.parent.ran:
|
| + self.parent.ran.append(self.item.name)
|
| + finally:
|
| + self.parent.ready_cond.notifyAll()
|
| + self.parent.ready_cond.release()
|
|
|