Index: gclient_utils.py |
diff --git a/gclient_utils.py b/gclient_utils.py |
index d2ba580c11739d68e40543038102f18b6ca9e523..ecc5eac52feb3ff6f25a709d4a1342a2aab558b7 100644 |
--- a/gclient_utils.py |
+++ b/gclient_utils.py |
@@ -14,9 +14,11 @@ |
"""Generic utils.""" |
+import copy |
import errno |
import logging |
import os |
+import Queue |
import re |
import stat |
import subprocess |
@@ -400,7 +402,7 @@ def FindGclientRoot(from_dir, filename='.gclient'): |
return path |
path_to_check = os.path.dirname(path_to_check) |
return None |
- |
+ |
logging.info('Found gclient root at ' + path) |
return path |
@@ -455,7 +457,9 @@ class WorkItem(object): |
# A unique string representing this work item. |
name = None |
- def run(self): |
+ def run(self, work_queue, options): |
+ """work_queue and options are passed as keyword arguments so they should be |
+ the last parameters of the function when you override it.""" |
pass |
@@ -483,10 +487,11 @@ class ExecutionQueue(object): |
# List of items currently running. |
self.running = [] |
# Exceptions thrown if any. |
- self.exceptions = [] |
+ self.exceptions = Queue.Queue() |
+ # Progress status |
self.progress = progress |
if self.progress: |
- self.progress.update() |
+ self.progress.update(0) |
def enqueue(self, d): |
"""Enqueue one Dependency to be executed later once its requirements are |
@@ -507,21 +512,22 @@ class ExecutionQueue(object): |
def flush(self, *args, **kwargs): |
"""Runs all enqueued items until all are executed.""" |
+ kwargs['work_queue'] = self |
self.ready_cond.acquire() |
try: |
while True: |
# Check for task to run first, then wait. |
while True: |
- if self.exceptions: |
- # Systematically flush the queue when there is an exception logged |
- # in. |
+ if not self.exceptions.empty(): |
+ # Systematically flush the queue when an exception logged. |
self.queued = [] |
- # Flush threads that have terminated. |
- self.running = [t for t in self.running if t.isAlive()] |
- if not self.queued and not self.running: |
- break |
- if self.jobs == len(self.running): |
+ self._flush_terminated_threads() |
+ if (not self.queued and not self.running or |
+ self.jobs == len(self.running)): |
+ # No more worker threads or can't queue anything. |
break |
+ |
+ # Check for new tasks to start. |
for i in xrange(len(self.queued)): |
# Verify its requirements. |
for r in self.queued[i].requirements: |
@@ -530,64 +536,88 @@ class ExecutionQueue(object): |
break |
else: |
# Start one work item: all its requirements are satisfied. |
- d = self.queued.pop(i) |
- new_thread = self._Worker(self, d, args=args, kwargs=kwargs) |
- if self.jobs > 1: |
- # Start the thread. |
- self.running.append(new_thread) |
- new_thread.start() |
- else: |
- # Run the 'thread' inside the main thread. |
- new_thread.run() |
+ self._run_one_task(self.queued.pop(i), args, kwargs) |
break |
else: |
# Couldn't find an item that could run. Break out the outher loop. |
break |
+ |
if not self.queued and not self.running: |
+ # We're done. |
break |
# We need to poll here otherwise Ctrl-C isn't processed. |
self.ready_cond.wait(10) |
# Something happened: self.enqueue() or a thread terminated. Loop again. |
finally: |
self.ready_cond.release() |
+ |
assert not self.running, 'Now guaranteed to be single-threaded' |
- if self.exceptions: |
+ if not self.exceptions.empty(): |
# To get back the stack location correctly, the raise a, b, c form must be |
# used, passing a tuple as the first argument doesn't work. |
- e = self.exceptions.pop(0) |
+ e = self.exceptions.get() |
raise e[0], e[1], e[2] |
if self.progress: |
self.progress.end() |
+ def _flush_terminated_threads(self): |
+ """Flush threads that have terminated.""" |
+ running = self.running |
+ self.running = [] |
+ for t in running: |
+ if t.isAlive(): |
+ self.running.append(t) |
+ else: |
+ t.join() |
+ t.kwargs['options'].stdout.flush() |
+ if self.progress: |
+ self.progress.update(1) |
+ assert not t.name in self.ran |
+ if not t.name in self.ran: |
+ self.ran.append(t.name) |
+ |
+ def _run_one_task(self, task_item, args, kwargs): |
+ if self.jobs > 1: |
+ # Start the thread. |
+ index = len(self.ran) + len(self.running) + 1 |
+ # Copy 'options' just to be safe. |
+ task_kwargs = kwargs.copy() |
+ task_kwargs['options'] = copy.copy(task_kwargs['options']) |
+ new_thread = self._Worker(task_item, args, task_kwargs) |
+ self.running.append(new_thread) |
+ new_thread.start() |
+ else: |
+ # Run the 'thread' inside the main thread. Don't try to catch any |
+ # exception. |
+ task_item.run(*args, **kwargs) |
+ self.ran.append(task_item.name) |
+ if self.progress: |
+ self.progress.update(1) |
+ |
class _Worker(threading.Thread): |
"""One thread to execute one WorkItem.""" |
- def __init__(self, parent, item, args=(), kwargs=None): |
+ def __init__(self, item, args, kwargs): |
threading.Thread.__init__(self, name=item.name or 'Worker') |
- self.args = args |
- self.kwargs = kwargs or {} |
+ logging.info(item.name) |
self.item = item |
- self.parent = parent |
+ self.args = args |
+ self.kwargs = kwargs |
def run(self): |
"""Runs in its own thread.""" |
logging.debug('running(%s)' % self.item.name) |
- exception = None |
+ work_queue = self.kwargs['work_queue'] |
try: |
self.item.run(*self.args, **self.kwargs) |
except Exception: |
# Catch exception location. |
- exception = sys.exc_info() |
+ logging.info('Caught exception in thread %s' % self.item.name) |
+ logging.info(str(sys.exc_info())) |
+ work_queue.exceptions.put(sys.exc_info()) |
+ logging.info('Task %s done' % self.item.name) |
- # This assumes the following code won't throw an exception. Bad. |
- self.parent.ready_cond.acquire() |
+ work_queue.ready_cond.acquire() |
try: |
- if exception: |
- self.parent.exceptions.append(exception) |
- if self.parent.progress: |
- self.parent.progress.update(1) |
- assert not self.item.name in self.parent.ran |
- if not self.item.name in self.parent.ran: |
- self.parent.ran.append(self.item.name) |
+ work_queue.ready_cond.notifyAll() |
finally: |
- self.parent.ready_cond.notifyAll() |
- self.parent.ready_cond.release() |
+ work_queue.ready_cond.release() |