Index: gclient_utils.py |
diff --git a/gclient_utils.py b/gclient_utils.py |
index aeaebc271d19ee6cb9d6c5a28f5e2fbfd8e9ef49..924f0a57df5060be7a3b6b720856755859086563 100644 |
--- a/gclient_utils.py |
+++ b/gclient_utils.py |
@@ -297,8 +297,10 @@ def CheckCallAndFilterAndHeader(args, always=False, **kwargs): |
def SoftClone(obj): |
"""Clones an object. copy.copy() doesn't work on 'file' objects.""" |
- class NewObject(object): pass |
- new_obj = NewObject() |
+ if obj.__class__.__name__ == 'SoftCloned': |
+ return obj |
+ class SoftCloned(object): pass |
+ new_obj = SoftCloned() |
for member in dir(obj): |
if member.startswith('_'): |
continue |
@@ -314,10 +316,11 @@ def MakeFileAutoFlush(fileobj, delay=10): |
return fileobj |
new_fileobj = SoftClone(fileobj) |
- new_fileobj.lock = threading.Lock() |
+ if not hasattr(new_fileobj, 'lock'): |
+ new_fileobj.lock = threading.Lock() |
new_fileobj.last_flushed_at = time.time() |
new_fileobj.delay = delay |
- new_fileobj.old_auto_flush_write = fileobj.write |
+ new_fileobj.old_auto_flush_write = new_fileobj.write |
# Silence pylint. |
new_fileobj.flush = fileobj.flush |
@@ -339,27 +342,68 @@ def MakeFileAutoFlush(fileobj, delay=10): |
return new_fileobj |
-class StdoutAnnotated(object): |
- """Prepends every line with a string.""" |
- def __init__(self, prepend, stdout): |
- self.prepend = prepend |
- self.buf = '' |
- self.stdout = stdout |
+def MakeFileAnnotated(fileobj): |
+  """Creates a file object clone to automatically prepend every line in worker |
+ threads with a NN> prefix.""" |
+ if hasattr(fileobj, 'output_buffers'): |
+ # Already patched. |
+ return fileobj |
- def write(self, out): |
- self.buf += out |
- while '\n' in self.buf: |
- line, self.buf = self.buf.split('\n', 1) |
- self.stdout.write(self.prepend + line + '\n') |
+ new_fileobj = SoftClone(fileobj) |
+ if not hasattr(new_fileobj, 'lock'): |
+ new_fileobj.lock = threading.Lock() |
+ new_fileobj.output_buffers = {} |
+ new_fileobj.old_annotated_write = new_fileobj.write |
+ |
+ def annotated_write(out): |
+ index = getattr(threading.currentThread(), 'index', None) |
+ if index is None: |
+      # Unindexed threads aren't buffered. |
+ new_fileobj.old_annotated_write(out) |
+ return |
- def flush(self): |
- pass |
+ new_fileobj.lock.acquire() |
+ try: |
+ # Use a dummy array to hold the string so the code can be lockless. |
+      # Strings are immutable, requiring a lock to be held for the whole dictionary |
+ # otherwise. Using an array is faster than using a dummy object. |
+ if not index in new_fileobj.output_buffers: |
+ obj = new_fileobj.output_buffers[index] = [''] |
+ else: |
+ obj = new_fileobj.output_buffers[index] |
+ finally: |
+ new_fileobj.lock.release() |
- def full_flush(self): |
- if self.buf: |
- self.stdout.write(self.prepend + self.buf) |
- self.stdout.flush() |
- self.buf = '' |
+ # Continue lockless. |
+ obj[0] += out |
+ while '\n' in obj[0]: |
+ line, remaining = obj[0].split('\n', 1) |
+ new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) |
+ obj[0] = remaining |
+ |
+ def full_flush(): |
+ """Flush buffered output.""" |
+ orphans = [] |
+ new_fileobj.lock.acquire() |
+ try: |
+ # Detect threads no longer existing. |
+ indexes = (getattr(t, 'index', None) for t in threading.enumerate()) |
+ indexed = filter(None, indexes) |
+ for index in new_fileobj.output_buffers: |
+        if not index in indexed: |
+ orphans.append((index, new_fileobj.output_buffers[index][0])) |
+ for orphan in orphans: |
+ del new_fileobj.output_buffers[orphan[0]] |
+ finally: |
+ new_fileobj.lock.release() |
+ |
+  # Don't keep the lock while writing. Will append \n when it shouldn't. |
+ for orphan in orphans: |
+ new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) |
+ |
+ new_fileobj.write = annotated_write |
+ new_fileobj.full_flush = full_flush |
+ return new_fileobj |
def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
@@ -628,12 +672,10 @@ class ExecutionQueue(object): |
if self.jobs > 1: |
# Start the thread. |
index = len(self.ran) + len(self.running) + 1 |
- # Copy 'options' and add annotated stdout. |
+ # Copy 'options'. |
task_kwargs = kwargs.copy() |
task_kwargs['options'] = copy.copy(task_kwargs['options']) |
- task_kwargs['options'].stdout = StdoutAnnotated( |
- '%d>' % index, task_kwargs['options'].stdout) |
- new_thread = self._Worker(task_item, args, task_kwargs) |
+ new_thread = self._Worker(task_item, index, args, task_kwargs) |
self.running.append(new_thread) |
new_thread.start() |
else: |
@@ -646,10 +688,11 @@ class ExecutionQueue(object): |
class _Worker(threading.Thread): |
"""One thread to execute one WorkItem.""" |
- def __init__(self, item, args, kwargs): |
+ def __init__(self, item, index, args, kwargs): |
threading.Thread.__init__(self, name=item.name or 'Worker') |
logging.info(item.name) |
self.item = item |
+ self.index = index |
self.args = args |
self.kwargs = kwargs |