Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Unified Diff: gclient_utils.py

Issue 3336015: Add the infrastructure necessary to support annotated stdout. (Closed)
Patch Set: add progress.update() Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gclient_utils.py
diff --git a/gclient_utils.py b/gclient_utils.py
index d2ba580c11739d68e40543038102f18b6ca9e523..ecc5eac52feb3ff6f25a709d4a1342a2aab558b7 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -14,9 +14,11 @@
"""Generic utils."""
+import copy
import errno
import logging
import os
+import Queue
import re
import stat
import subprocess
@@ -400,7 +402,7 @@ def FindGclientRoot(from_dir, filename='.gclient'):
return path
path_to_check = os.path.dirname(path_to_check)
return None
-
+
logging.info('Found gclient root at ' + path)
return path
@@ -455,7 +457,9 @@ class WorkItem(object):
# A unique string representing this work item.
name = None
- def run(self):
+ def run(self, work_queue, options):
+ """work_queue and options are passed as keyword arguments so they should be
+ the last parameters of the function when you override it."""
pass
@@ -483,10 +487,11 @@ class ExecutionQueue(object):
# List of items currently running.
self.running = []
# Exceptions thrown if any.
- self.exceptions = []
+ self.exceptions = Queue.Queue()
+ # Progress status
self.progress = progress
if self.progress:
- self.progress.update()
+ self.progress.update(0)
def enqueue(self, d):
"""Enqueue one Dependency to be executed later once its requirements are
@@ -507,21 +512,22 @@ class ExecutionQueue(object):
def flush(self, *args, **kwargs):
"""Runs all enqueued items until all are executed."""
+ kwargs['work_queue'] = self
self.ready_cond.acquire()
try:
while True:
# Check for task to run first, then wait.
while True:
- if self.exceptions:
- # Systematically flush the queue when there is an exception logged
- # in.
+ if not self.exceptions.empty():
+ # Systematically flush the queue when an exception logged.
self.queued = []
- # Flush threads that have terminated.
- self.running = [t for t in self.running if t.isAlive()]
- if not self.queued and not self.running:
- break
- if self.jobs == len(self.running):
+ self._flush_terminated_threads()
+ if (not self.queued and not self.running or
+ self.jobs == len(self.running)):
+ # No more worker threads or can't queue anything.
break
+
+ # Check for new tasks to start.
for i in xrange(len(self.queued)):
# Verify its requirements.
for r in self.queued[i].requirements:
@@ -530,64 +536,88 @@ class ExecutionQueue(object):
break
else:
# Start one work item: all its requirements are satisfied.
- d = self.queued.pop(i)
- new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
- if self.jobs > 1:
- # Start the thread.
- self.running.append(new_thread)
- new_thread.start()
- else:
- # Run the 'thread' inside the main thread.
- new_thread.run()
+ self._run_one_task(self.queued.pop(i), args, kwargs)
break
else:
# Couldn't find an item that could run. Break out the outher loop.
break
+
if not self.queued and not self.running:
+ # We're done.
break
# We need to poll here otherwise Ctrl-C isn't processed.
self.ready_cond.wait(10)
# Something happened: self.enqueue() or a thread terminated. Loop again.
finally:
self.ready_cond.release()
+
assert not self.running, 'Now guaranteed to be single-threaded'
- if self.exceptions:
+ if not self.exceptions.empty():
# To get back the stack location correctly, the raise a, b, c form must be
# used, passing a tuple as the first argument doesn't work.
- e = self.exceptions.pop(0)
+ e = self.exceptions.get()
raise e[0], e[1], e[2]
if self.progress:
self.progress.end()
+ def _flush_terminated_threads(self):
+ """Flush threads that have terminated."""
+ running = self.running
+ self.running = []
+ for t in running:
+ if t.isAlive():
+ self.running.append(t)
+ else:
+ t.join()
+ t.kwargs['options'].stdout.flush()
+ if self.progress:
+ self.progress.update(1)
+ assert not t.name in self.ran
+ if not t.name in self.ran:
+ self.ran.append(t.name)
+
+ def _run_one_task(self, task_item, args, kwargs):
+ if self.jobs > 1:
+ # Start the thread.
+ index = len(self.ran) + len(self.running) + 1
+ # Copy 'options' just to be safe.
+ task_kwargs = kwargs.copy()
+ task_kwargs['options'] = copy.copy(task_kwargs['options'])
+ new_thread = self._Worker(task_item, args, task_kwargs)
+ self.running.append(new_thread)
+ new_thread.start()
+ else:
+ # Run the 'thread' inside the main thread. Don't try to catch any
+ # exception.
+ task_item.run(*args, **kwargs)
+ self.ran.append(task_item.name)
+ if self.progress:
+ self.progress.update(1)
+
class _Worker(threading.Thread):
"""One thread to execute one WorkItem."""
- def __init__(self, parent, item, args=(), kwargs=None):
+ def __init__(self, item, args, kwargs):
threading.Thread.__init__(self, name=item.name or 'Worker')
- self.args = args
- self.kwargs = kwargs or {}
+ logging.info(item.name)
self.item = item
- self.parent = parent
+ self.args = args
+ self.kwargs = kwargs
def run(self):
"""Runs in its own thread."""
logging.debug('running(%s)' % self.item.name)
- exception = None
+ work_queue = self.kwargs['work_queue']
try:
self.item.run(*self.args, **self.kwargs)
except Exception:
# Catch exception location.
- exception = sys.exc_info()
+ logging.info('Caught exception in thread %s' % self.item.name)
+ logging.info(str(sys.exc_info()))
+ work_queue.exceptions.put(sys.exc_info())
+ logging.info('Task %s done' % self.item.name)
- # This assumes the following code won't throw an exception. Bad.
- self.parent.ready_cond.acquire()
+ work_queue.ready_cond.acquire()
try:
- if exception:
- self.parent.exceptions.append(exception)
- if self.parent.progress:
- self.parent.progress.update(1)
- assert not self.item.name in self.parent.ran
- if not self.item.name in self.parent.ran:
- self.parent.ran.append(self.item.name)
+ work_queue.ready_cond.notifyAll()
finally:
- self.parent.ready_cond.notifyAll()
- self.parent.ready_cond.release()
+ work_queue.ready_cond.release()
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698