| Index: gclient_utils.py
 | 
| diff --git a/gclient_utils.py b/gclient_utils.py
 | 
| index d2ba580c11739d68e40543038102f18b6ca9e523..ecc5eac52feb3ff6f25a709d4a1342a2aab558b7 100644
 | 
| --- a/gclient_utils.py
 | 
| +++ b/gclient_utils.py
 | 
| @@ -14,9 +14,11 @@
 | 
|  
 | 
|  """Generic utils."""
 | 
|  
 | 
| +import copy
 | 
|  import errno
 | 
|  import logging
 | 
|  import os
 | 
| +import Queue
 | 
|  import re
 | 
|  import stat
 | 
|  import subprocess
 | 
| @@ -400,7 +402,7 @@ def FindGclientRoot(from_dir, filename='.gclient'):
 | 
|          return path
 | 
|        path_to_check = os.path.dirname(path_to_check)
 | 
|      return None
 | 
| -    
 | 
| +
 | 
|    logging.info('Found gclient root at ' + path)
 | 
|    return path
 | 
|  
 | 
| @@ -455,7 +457,9 @@ class WorkItem(object):
 | 
|    # A unique string representing this work item.
 | 
|    name = None
 | 
|  
 | 
| -  def run(self):
 | 
| +  def run(self, work_queue, options):
 | 
| +    """work_queue and options are passed as keyword arguments so they should be
 | 
| +    the last parameters of the function when you override it."""
 | 
|      pass
 | 
|  
 | 
|  
 | 
| @@ -483,10 +487,11 @@ class ExecutionQueue(object):
 | 
|      # List of items currently running.
 | 
|      self.running = []
 | 
|      # Exceptions thrown if any.
 | 
| -    self.exceptions = []
 | 
| +    self.exceptions = Queue.Queue()
 | 
| +    # Progress status
 | 
|      self.progress = progress
 | 
|      if self.progress:
 | 
| -      self.progress.update()
 | 
| +      self.progress.update(0)
 | 
|  
 | 
|    def enqueue(self, d):
 | 
|      """Enqueue one Dependency to be executed later once its requirements are
 | 
| @@ -507,21 +512,22 @@ class ExecutionQueue(object):
 | 
|  
 | 
|    def flush(self, *args, **kwargs):
 | 
|      """Runs all enqueued items until all are executed."""
 | 
| +    kwargs['work_queue'] = self
 | 
|      self.ready_cond.acquire()
 | 
|      try:
 | 
|        while True:
 | 
|          # Check for task to run first, then wait.
 | 
|          while True:
 | 
| -          if self.exceptions:
 | 
| -            # Systematically flush the queue when there is an exception logged
 | 
| -            # in.
 | 
| +          if not self.exceptions.empty():
 | 
| +            # Systematically flush the queue when an exception logged.
 | 
|              self.queued = []
 | 
| -          # Flush threads that have terminated.
 | 
| -          self.running = [t for t in self.running if t.isAlive()]
 | 
| -          if not self.queued and not self.running:
 | 
| -            break
 | 
| -          if self.jobs == len(self.running):
 | 
| +          self._flush_terminated_threads()
 | 
| +          if (not self.queued and not self.running or
 | 
| +              self.jobs == len(self.running)):
 | 
| +            # No more worker threads or can't queue anything.
 | 
|              break
 | 
| +
 | 
| +          # Check for new tasks to start.
 | 
|            for i in xrange(len(self.queued)):
 | 
|              # Verify its requirements.
 | 
|              for r in self.queued[i].requirements:
 | 
| @@ -530,64 +536,88 @@ class ExecutionQueue(object):
 | 
|                  break
 | 
|              else:
 | 
|                # Start one work item: all its requirements are satisfied.
 | 
| -              d = self.queued.pop(i)
 | 
| -              new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
 | 
| -              if self.jobs > 1:
 | 
| -                # Start the thread.
 | 
| -                self.running.append(new_thread)
 | 
| -                new_thread.start()
 | 
| -              else:
 | 
| -                # Run the 'thread' inside the main thread.
 | 
| -                new_thread.run()
 | 
| +              self._run_one_task(self.queued.pop(i), args, kwargs)
 | 
|                break
 | 
|            else:
 | 
|              # Couldn't find an item that could run. Break out the outher loop.
 | 
|              break
 | 
| +
 | 
|          if not self.queued and not self.running:
 | 
| +          # We're done.
 | 
|            break
 | 
|          # We need to poll here otherwise Ctrl-C isn't processed.
 | 
|          self.ready_cond.wait(10)
 | 
|          # Something happened: self.enqueue() or a thread terminated. Loop again.
 | 
|      finally:
 | 
|        self.ready_cond.release()
 | 
| +
 | 
|      assert not self.running, 'Now guaranteed to be single-threaded'
 | 
| -    if self.exceptions:
 | 
| +    if not self.exceptions.empty():
 | 
|        # To get back the stack location correctly, the raise a, b, c form must be
 | 
|        # used, passing a tuple as the first argument doesn't work.
 | 
| -      e = self.exceptions.pop(0)
 | 
| +      e = self.exceptions.get()
 | 
|        raise e[0], e[1], e[2]
 | 
|      if self.progress:
 | 
|        self.progress.end()
 | 
|  
 | 
| +  def _flush_terminated_threads(self):
 | 
| +    """Flush threads that have terminated."""
 | 
| +    running = self.running
 | 
| +    self.running = []
 | 
| +    for t in running:
 | 
| +      if t.isAlive():
 | 
| +        self.running.append(t)
 | 
| +      else:
 | 
| +        t.join()
 | 
| +        t.kwargs['options'].stdout.flush()
 | 
| +        if self.progress:
 | 
| +          self.progress.update(1)
 | 
| +        assert not t.name in self.ran
 | 
| +        if not t.name in self.ran:
 | 
| +          self.ran.append(t.name)
 | 
| +
 | 
| +  def _run_one_task(self, task_item, args, kwargs):
 | 
| +    if self.jobs > 1:
 | 
| +      # Start the thread.
 | 
| +      index = len(self.ran) + len(self.running) + 1
 | 
| +      # Copy 'options' just to be safe.
 | 
| +      task_kwargs = kwargs.copy()
 | 
| +      task_kwargs['options'] = copy.copy(task_kwargs['options'])
 | 
| +      new_thread = self._Worker(task_item, args, task_kwargs)
 | 
| +      self.running.append(new_thread)
 | 
| +      new_thread.start()
 | 
| +    else:
 | 
| +      # Run the 'thread' inside the main thread. Don't try to catch any
 | 
| +      # exception.
 | 
| +      task_item.run(*args, **kwargs)
 | 
| +      self.ran.append(task_item.name)
 | 
| +      if self.progress:
 | 
| +        self.progress.update(1)
 | 
| +
 | 
|    class _Worker(threading.Thread):
 | 
|      """One thread to execute one WorkItem."""
 | 
| -    def __init__(self, parent, item, args=(), kwargs=None):
 | 
| +    def __init__(self, item, args, kwargs):
 | 
|        threading.Thread.__init__(self, name=item.name or 'Worker')
 | 
| -      self.args = args
 | 
| -      self.kwargs = kwargs or {}
 | 
| +      logging.info(item.name)
 | 
|        self.item = item
 | 
| -      self.parent = parent
 | 
| +      self.args = args
 | 
| +      self.kwargs = kwargs
 | 
|  
 | 
|      def run(self):
 | 
|        """Runs in its own thread."""
 | 
|        logging.debug('running(%s)' % self.item.name)
 | 
| -      exception = None
 | 
| +      work_queue = self.kwargs['work_queue']
 | 
|        try:
 | 
|          self.item.run(*self.args, **self.kwargs)
 | 
|        except Exception:
 | 
|          # Catch exception location.
 | 
| -        exception = sys.exc_info()
 | 
| +        logging.info('Caught exception in thread %s' % self.item.name)
 | 
| +        logging.info(str(sys.exc_info()))
 | 
| +        work_queue.exceptions.put(sys.exc_info())
 | 
| +      logging.info('Task %s done' % self.item.name)
 | 
|  
 | 
| -      # This assumes the following code won't throw an exception. Bad.
 | 
| -      self.parent.ready_cond.acquire()
 | 
| +      work_queue.ready_cond.acquire()
 | 
|        try:
 | 
| -        if exception:
 | 
| -          self.parent.exceptions.append(exception)
 | 
| -        if self.parent.progress:
 | 
| -          self.parent.progress.update(1)
 | 
| -        assert not self.item.name in self.parent.ran
 | 
| -        if not self.item.name in self.parent.ran:
 | 
| -          self.parent.ran.append(self.item.name)
 | 
| +        work_queue.ready_cond.notifyAll()
 | 
|        finally:
 | 
| -        self.parent.ready_cond.notifyAll()
 | 
| -        self.parent.ready_cond.release()
 | 
| +        work_queue.ready_cond.release()
 | 
| 
 |