Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Unified Diff: gclient_utils.py

Issue 3398008: Directly hook sys.stdout for thread annotated output. (Closed)
Patch Set: Fix small errors, smoke tests rock. Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gclient_utils.py
diff --git a/gclient_utils.py b/gclient_utils.py
index aeaebc271d19ee6cb9d6c5a28f5e2fbfd8e9ef49..924f0a57df5060be7a3b6b720856755859086563 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -297,8 +297,10 @@ def CheckCallAndFilterAndHeader(args, always=False, **kwargs):
def SoftClone(obj):
"""Clones an object. copy.copy() doesn't work on 'file' objects."""
- class NewObject(object): pass
- new_obj = NewObject()
+ if obj.__class__.__name__ == 'SoftCloned':
+ return obj
+ class SoftCloned(object): pass
+ new_obj = SoftCloned()
for member in dir(obj):
if member.startswith('_'):
continue
@@ -314,10 +316,11 @@ def MakeFileAutoFlush(fileobj, delay=10):
return fileobj
new_fileobj = SoftClone(fileobj)
- new_fileobj.lock = threading.Lock()
+ if not hasattr(new_fileobj, 'lock'):
+ new_fileobj.lock = threading.Lock()
new_fileobj.last_flushed_at = time.time()
new_fileobj.delay = delay
- new_fileobj.old_auto_flush_write = fileobj.write
+ new_fileobj.old_auto_flush_write = new_fileobj.write
# Silence pylint.
new_fileobj.flush = fileobj.flush
@@ -339,27 +342,68 @@ def MakeFileAutoFlush(fileobj, delay=10):
return new_fileobj
-class StdoutAnnotated(object):
- """Prepends every line with a string."""
- def __init__(self, prepend, stdout):
- self.prepend = prepend
- self.buf = ''
- self.stdout = stdout
+def MakeFileAnnotated(fileobj):
+ """Creates a file object clone to automatically prepends every line in worker
+ threads with a NN> prefix."""
+ if hasattr(fileobj, 'output_buffers'):
+ # Already patched.
+ return fileobj
- def write(self, out):
- self.buf += out
- while '\n' in self.buf:
- line, self.buf = self.buf.split('\n', 1)
- self.stdout.write(self.prepend + line + '\n')
+ new_fileobj = SoftClone(fileobj)
+ if not hasattr(new_fileobj, 'lock'):
+ new_fileobj.lock = threading.Lock()
+ new_fileobj.output_buffers = {}
+ new_fileobj.old_annotated_write = new_fileobj.write
+
+ def annotated_write(out):
+ index = getattr(threading.currentThread(), 'index', None)
+ if index is None:
+ # Undexed threads aren't buffered.
+ new_fileobj.old_annotated_write(out)
+ return
- def flush(self):
- pass
+ new_fileobj.lock.acquire()
+ try:
+ # Use a dummy array to hold the string so the code can be lockless.
+ # Strings are immutable, requiring to keep a lock for the whole dictionary
+ # otherwise. Using an array is faster than using a dummy object.
+ if not index in new_fileobj.output_buffers:
+ obj = new_fileobj.output_buffers[index] = ['']
+ else:
+ obj = new_fileobj.output_buffers[index]
+ finally:
+ new_fileobj.lock.release()
- def full_flush(self):
- if self.buf:
- self.stdout.write(self.prepend + self.buf)
- self.stdout.flush()
- self.buf = ''
+ # Continue lockless.
+ obj[0] += out
+ while '\n' in obj[0]:
+ line, remaining = obj[0].split('\n', 1)
+ new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
+ obj[0] = remaining
+
+ def full_flush():
+ """Flush buffered output."""
+ orphans = []
+ new_fileobj.lock.acquire()
+ try:
+ # Detect threads no longer existing.
+ indexes = (getattr(t, 'index', None) for t in threading.enumerate())
+ indexed = filter(None, indexes)
+ for index in new_fileobj.output_buffers:
+ if not index in indexes:
+ orphans.append((index, new_fileobj.output_buffers[index][0]))
+ for orphan in orphans:
+ del new_fileobj.output_buffers[orphan[0]]
+ finally:
+ new_fileobj.lock.release()
+
+ # Don't keep the lock while writting. Will append \n when it shouldn't.
+ for orphan in orphans:
+ new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))
+
+ new_fileobj.write = annotated_write
+ new_fileobj.full_flush = full_flush
+ return new_fileobj
def CheckCallAndFilter(args, stdout=None, filter_fn=None,
@@ -628,12 +672,10 @@ class ExecutionQueue(object):
if self.jobs > 1:
# Start the thread.
index = len(self.ran) + len(self.running) + 1
- # Copy 'options' and add annotated stdout.
+ # Copy 'options'.
task_kwargs = kwargs.copy()
task_kwargs['options'] = copy.copy(task_kwargs['options'])
- task_kwargs['options'].stdout = StdoutAnnotated(
- '%d>' % index, task_kwargs['options'].stdout)
- new_thread = self._Worker(task_item, args, task_kwargs)
+ new_thread = self._Worker(task_item, index, args, task_kwargs)
self.running.append(new_thread)
new_thread.start()
else:
@@ -646,10 +688,11 @@ class ExecutionQueue(object):
class _Worker(threading.Thread):
"""One thread to execute one WorkItem."""
- def __init__(self, item, args, kwargs):
+ def __init__(self, item, index, args, kwargs):
threading.Thread.__init__(self, name=item.name or 'Worker')
logging.info(item.name)
self.item = item
+ self.index = index
self.args = args
self.kwargs = kwargs
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698