| Index: gclient_utils.py
|
| diff --git a/gclient_utils.py b/gclient_utils.py
|
| index aeaebc271d19ee6cb9d6c5a28f5e2fbfd8e9ef49..924f0a57df5060be7a3b6b720856755859086563 100644
|
| --- a/gclient_utils.py
|
| +++ b/gclient_utils.py
|
| @@ -297,8 +297,10 @@ def CheckCallAndFilterAndHeader(args, always=False, **kwargs):
|
|
|
| def SoftClone(obj):
|
| """Clones an object. copy.copy() doesn't work on 'file' objects."""
|
| - class NewObject(object): pass
|
| - new_obj = NewObject()
|
| + if obj.__class__.__name__ == 'SoftCloned':
|
| + return obj
|
| + class SoftCloned(object): pass
|
| + new_obj = SoftCloned()
|
| for member in dir(obj):
|
| if member.startswith('_'):
|
| continue
|
| @@ -314,10 +316,11 @@ def MakeFileAutoFlush(fileobj, delay=10):
|
| return fileobj
|
|
|
| new_fileobj = SoftClone(fileobj)
|
| - new_fileobj.lock = threading.Lock()
|
| + if not hasattr(new_fileobj, 'lock'):
|
| + new_fileobj.lock = threading.Lock()
|
| new_fileobj.last_flushed_at = time.time()
|
| new_fileobj.delay = delay
|
| - new_fileobj.old_auto_flush_write = fileobj.write
|
| + new_fileobj.old_auto_flush_write = new_fileobj.write
|
| # Silence pylint.
|
| new_fileobj.flush = fileobj.flush
|
|
|
| @@ -339,27 +342,68 @@ def MakeFileAutoFlush(fileobj, delay=10):
|
| return new_fileobj
|
|
|
|
|
| -class StdoutAnnotated(object):
|
| - """Prepends every line with a string."""
|
| - def __init__(self, prepend, stdout):
|
| - self.prepend = prepend
|
| - self.buf = ''
|
| - self.stdout = stdout
|
| +def MakeFileAnnotated(fileobj):
|
| + """Creates a file object clone to automatically prepends every line in worker
|
| + threads with a NN> prefix."""
|
| + if hasattr(fileobj, 'output_buffers'):
|
| + # Already patched.
|
| + return fileobj
|
|
|
| - def write(self, out):
|
| - self.buf += out
|
| - while '\n' in self.buf:
|
| - line, self.buf = self.buf.split('\n', 1)
|
| - self.stdout.write(self.prepend + line + '\n')
|
| + new_fileobj = SoftClone(fileobj)
|
| + if not hasattr(new_fileobj, 'lock'):
|
| + new_fileobj.lock = threading.Lock()
|
| + new_fileobj.output_buffers = {}
|
| + new_fileobj.old_annotated_write = new_fileobj.write
|
| +
|
| + def annotated_write(out):
|
| + index = getattr(threading.currentThread(), 'index', None)
|
| + if index is None:
|
| + # Undexed threads aren't buffered.
|
| + new_fileobj.old_annotated_write(out)
|
| + return
|
|
|
| - def flush(self):
|
| - pass
|
| + new_fileobj.lock.acquire()
|
| + try:
|
| + # Use a dummy array to hold the string so the code can be lockless.
|
| + # Strings are immutable, requiring to keep a lock for the whole dictionary
|
| + # otherwise. Using an array is faster than using a dummy object.
|
| + if not index in new_fileobj.output_buffers:
|
| + obj = new_fileobj.output_buffers[index] = ['']
|
| + else:
|
| + obj = new_fileobj.output_buffers[index]
|
| + finally:
|
| + new_fileobj.lock.release()
|
|
|
| - def full_flush(self):
|
| - if self.buf:
|
| - self.stdout.write(self.prepend + self.buf)
|
| - self.stdout.flush()
|
| - self.buf = ''
|
| + # Continue lockless.
|
| + obj[0] += out
|
| + while '\n' in obj[0]:
|
| + line, remaining = obj[0].split('\n', 1)
|
| + new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
|
| + obj[0] = remaining
|
| +
|
| + def full_flush():
|
| + """Flush buffered output."""
|
| + orphans = []
|
| + new_fileobj.lock.acquire()
|
| + try:
|
| + # Detect threads no longer existing.
|
| + indexes = (getattr(t, 'index', None) for t in threading.enumerate())
|
| + indexed = filter(None, indexes)
|
| + for index in new_fileobj.output_buffers:
|
| + if not index in indexes:
|
| + orphans.append((index, new_fileobj.output_buffers[index][0]))
|
| + for orphan in orphans:
|
| + del new_fileobj.output_buffers[orphan[0]]
|
| + finally:
|
| + new_fileobj.lock.release()
|
| +
|
| + # Don't keep the lock while writting. Will append \n when it shouldn't.
|
| + for orphan in orphans:
|
| + new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))
|
| +
|
| + new_fileobj.write = annotated_write
|
| + new_fileobj.full_flush = full_flush
|
| + return new_fileobj
|
|
|
|
|
| def CheckCallAndFilter(args, stdout=None, filter_fn=None,
|
| @@ -628,12 +672,10 @@ class ExecutionQueue(object):
|
| if self.jobs > 1:
|
| # Start the thread.
|
| index = len(self.ran) + len(self.running) + 1
|
| - # Copy 'options' and add annotated stdout.
|
| + # Copy 'options'.
|
| task_kwargs = kwargs.copy()
|
| task_kwargs['options'] = copy.copy(task_kwargs['options'])
|
| - task_kwargs['options'].stdout = StdoutAnnotated(
|
| - '%d>' % index, task_kwargs['options'].stdout)
|
| - new_thread = self._Worker(task_item, args, task_kwargs)
|
| + new_thread = self._Worker(task_item, index, args, task_kwargs)
|
| self.running.append(new_thread)
|
| new_thread.start()
|
| else:
|
| @@ -646,10 +688,11 @@ class ExecutionQueue(object):
|
|
|
| class _Worker(threading.Thread):
|
| """One thread to execute one WorkItem."""
|
| - def __init__(self, item, args, kwargs):
|
| + def __init__(self, item, index, args, kwargs):
|
| threading.Thread.__init__(self, name=item.name or 'Worker')
|
| logging.info(item.name)
|
| self.item = item
|
| + self.index = index
|
| self.args = args
|
| self.kwargs = kwargs
|
|
|
|
|