| Index: gclient_utils.py
|
| diff --git a/gclient_utils.py b/gclient_utils.py
|
| index d2ba580c11739d68e40543038102f18b6ca9e523..ecc5eac52feb3ff6f25a709d4a1342a2aab558b7 100644
|
| --- a/gclient_utils.py
|
| +++ b/gclient_utils.py
|
| @@ -14,9 +14,11 @@
|
|
|
| """Generic utils."""
|
|
|
| +import copy
|
| import errno
|
| import logging
|
| import os
|
| +import Queue
|
| import re
|
| import stat
|
| import subprocess
|
| @@ -400,7 +402,7 @@ def FindGclientRoot(from_dir, filename='.gclient'):
|
| return path
|
| path_to_check = os.path.dirname(path_to_check)
|
| return None
|
| -
|
| +
|
| logging.info('Found gclient root at ' + path)
|
| return path
|
|
|
| @@ -455,7 +457,9 @@ class WorkItem(object):
|
| # A unique string representing this work item.
|
| name = None
|
|
|
| - def run(self):
|
| + def run(self, work_queue, options):
|
| + """work_queue and options are passed as keyword arguments so they should be
|
| + the last parameters of the function when you override it."""
|
| pass
|
|
|
|
|
| @@ -483,10 +487,11 @@ class ExecutionQueue(object):
|
| # List of items currently running.
|
| self.running = []
|
| # Exceptions thrown if any.
|
| - self.exceptions = []
|
| + self.exceptions = Queue.Queue()
|
| + # Progress status
|
| self.progress = progress
|
| if self.progress:
|
| - self.progress.update()
|
| + self.progress.update(0)
|
|
|
| def enqueue(self, d):
|
| """Enqueue one Dependency to be executed later once its requirements are
|
| @@ -507,21 +512,22 @@ class ExecutionQueue(object):
|
|
|
| def flush(self, *args, **kwargs):
|
| """Runs all enqueued items until all are executed."""
|
| + kwargs['work_queue'] = self
|
| self.ready_cond.acquire()
|
| try:
|
| while True:
|
| # Check for task to run first, then wait.
|
| while True:
|
| - if self.exceptions:
|
| - # Systematically flush the queue when there is an exception logged
|
| - # in.
|
| + if not self.exceptions.empty():
|
| + # Systematically flush the queue when an exception logged.
|
| self.queued = []
|
| - # Flush threads that have terminated.
|
| - self.running = [t for t in self.running if t.isAlive()]
|
| - if not self.queued and not self.running:
|
| - break
|
| - if self.jobs == len(self.running):
|
| + self._flush_terminated_threads()
|
| + if (not self.queued and not self.running or
|
| + self.jobs == len(self.running)):
|
| + # No more worker threads or can't queue anything.
|
| break
|
| +
|
| + # Check for new tasks to start.
|
| for i in xrange(len(self.queued)):
|
| # Verify its requirements.
|
| for r in self.queued[i].requirements:
|
| @@ -530,64 +536,88 @@ class ExecutionQueue(object):
|
| break
|
| else:
|
| # Start one work item: all its requirements are satisfied.
|
| - d = self.queued.pop(i)
|
| - new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
|
| - if self.jobs > 1:
|
| - # Start the thread.
|
| - self.running.append(new_thread)
|
| - new_thread.start()
|
| - else:
|
| - # Run the 'thread' inside the main thread.
|
| - new_thread.run()
|
| + self._run_one_task(self.queued.pop(i), args, kwargs)
|
| break
|
| else:
|
| # Couldn't find an item that could run. Break out the outher loop.
|
| break
|
| +
|
| if not self.queued and not self.running:
|
| + # We're done.
|
| break
|
| # We need to poll here otherwise Ctrl-C isn't processed.
|
| self.ready_cond.wait(10)
|
| # Something happened: self.enqueue() or a thread terminated. Loop again.
|
| finally:
|
| self.ready_cond.release()
|
| +
|
| assert not self.running, 'Now guaranteed to be single-threaded'
|
| - if self.exceptions:
|
| + if not self.exceptions.empty():
|
| # To get back the stack location correctly, the raise a, b, c form must be
|
| # used, passing a tuple as the first argument doesn't work.
|
| - e = self.exceptions.pop(0)
|
| + e = self.exceptions.get()
|
| raise e[0], e[1], e[2]
|
| if self.progress:
|
| self.progress.end()
|
|
|
| + def _flush_terminated_threads(self):
|
| + """Flush threads that have terminated."""
|
| + running = self.running
|
| + self.running = []
|
| + for t in running:
|
| + if t.isAlive():
|
| + self.running.append(t)
|
| + else:
|
| + t.join()
|
| + t.kwargs['options'].stdout.flush()
|
| + if self.progress:
|
| + self.progress.update(1)
|
| + assert not t.name in self.ran
|
| + if not t.name in self.ran:
|
| + self.ran.append(t.name)
|
| +
|
| + def _run_one_task(self, task_item, args, kwargs):
|
| + if self.jobs > 1:
|
| + # Start the thread.
|
| + index = len(self.ran) + len(self.running) + 1
|
| + # Copy 'options' just to be safe.
|
| + task_kwargs = kwargs.copy()
|
| + task_kwargs['options'] = copy.copy(task_kwargs['options'])
|
| + new_thread = self._Worker(task_item, args, task_kwargs)
|
| + self.running.append(new_thread)
|
| + new_thread.start()
|
| + else:
|
| + # Run the 'thread' inside the main thread. Don't try to catch any
|
| + # exception.
|
| + task_item.run(*args, **kwargs)
|
| + self.ran.append(task_item.name)
|
| + if self.progress:
|
| + self.progress.update(1)
|
| +
|
| class _Worker(threading.Thread):
|
| """One thread to execute one WorkItem."""
|
| - def __init__(self, parent, item, args=(), kwargs=None):
|
| + def __init__(self, item, args, kwargs):
|
| threading.Thread.__init__(self, name=item.name or 'Worker')
|
| - self.args = args
|
| - self.kwargs = kwargs or {}
|
| + logging.info(item.name)
|
| self.item = item
|
| - self.parent = parent
|
| + self.args = args
|
| + self.kwargs = kwargs
|
|
|
| def run(self):
|
| """Runs in its own thread."""
|
| logging.debug('running(%s)' % self.item.name)
|
| - exception = None
|
| + work_queue = self.kwargs['work_queue']
|
| try:
|
| self.item.run(*self.args, **self.kwargs)
|
| except Exception:
|
| # Catch exception location.
|
| - exception = sys.exc_info()
|
| + logging.info('Caught exception in thread %s' % self.item.name)
|
| + logging.info(str(sys.exc_info()))
|
| + work_queue.exceptions.put(sys.exc_info())
|
| + logging.info('Task %s done' % self.item.name)
|
|
|
| - # This assumes the following code won't throw an exception. Bad.
|
| - self.parent.ready_cond.acquire()
|
| + work_queue.ready_cond.acquire()
|
| try:
|
| - if exception:
|
| - self.parent.exceptions.append(exception)
|
| - if self.parent.progress:
|
| - self.parent.progress.update(1)
|
| - assert not self.item.name in self.parent.ran
|
| - if not self.item.name in self.parent.ran:
|
| - self.parent.ran.append(self.item.name)
|
| + work_queue.ready_cond.notifyAll()
|
| finally:
|
| - self.parent.ready_cond.notifyAll()
|
| - self.parent.ready_cond.release()
|
| + work_queue.ready_cond.release()
|
|
|