Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Side by Side Diff: gclient_utils.py

Issue 3398008: Directly hook sys.stdout for thread annotated output. (Closed)
Patch Set: Fix small errors, smoke tests rock. Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2009 Google Inc. All Rights Reserved. 1 # Copyright 2009 Google Inc. All Rights Reserved.
2 # 2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); 3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License. 4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at 5 # You may obtain a copy of the License at
6 # 6 #
7 # http://www.apache.org/licenses/LICENSE-2.0 7 # http://www.apache.org/licenses/LICENSE-2.0
8 # 8 #
9 # Unless required by applicable law or agreed to in writing, software 9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, 10 # distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after
290 filter_fn(line) 290 filter_fn(line)
291 kwargs['filter_fn'] = filter_msg 291 kwargs['filter_fn'] = filter_msg
292 kwargs['call_filter_on_first_line'] = True 292 kwargs['call_filter_on_first_line'] = True
293 # Obviously. 293 # Obviously.
294 kwargs['print_stdout'] = True 294 kwargs['print_stdout'] = True
295 return CheckCallAndFilter(args, **kwargs) 295 return CheckCallAndFilter(args, **kwargs)
296 296
297 297
def SoftClone(obj):
  """Returns a shallow, attribute-level copy of obj.

  copy.copy() refuses 'file' objects, so instead every public attribute is
  copied onto a fresh dummy 'SoftCloned' instance.  Cloning something that is
  already a clone is a no-op and returns it unchanged.
  """
  if obj.__class__.__name__ == 'SoftCloned':
    return obj
  class SoftCloned(object): pass
  clone = SoftCloned()
  # Skip private/dunder members; everything else is carried over as-is.
  public_members = (m for m in dir(obj) if not m.startswith('_'))
  for name in public_members:
    setattr(clone, name, getattr(obj, name))
  return clone
307 309
308 310
def MakeFileAutoFlush(fileobj, delay=10):
  """Returns a clone of fileobj whose write() flushes at most every *delay* s.

  If fileobj was already wrapped (detected via its 'last_flushed_at'
  attribute), only the delay is updated and the same object is returned.
  """
  if hasattr(fileobj, 'last_flushed_at'):
    # Already patched. Just update delay.
    fileobj.delay = delay
    return fileobj

  new_fileobj = SoftClone(fileobj)
  if not hasattr(new_fileobj, 'lock'):
    new_fileobj.lock = threading.Lock()
  new_fileobj.last_flushed_at = time.time()
  new_fileobj.delay = delay
  # Keep a handle on the pre-wrap write so the wrapper can delegate to it.
  new_fileobj.old_auto_flush_write = new_fileobj.write
  # Silence pylint.
  new_fileobj.flush = fileobj.flush

  def auto_flush_write(out):
    # Always forward the data first; flushing is decided afterwards.
    new_fileobj.old_auto_flush_write(out)
    needs_flush = False
    new_fileobj.lock.acquire()
    try:
      elapsed = time.time() - new_fileobj.last_flushed_at
      if new_fileobj.delay and elapsed > new_fileobj.delay:
        needs_flush = True
        new_fileobj.last_flushed_at = time.time()
    finally:
      new_fileobj.lock.release()
    # Flush outside the lock to avoid holding it during slow I/O.
    if needs_flush:
      new_fileobj.flush()

  new_fileobj.write = auto_flush_write
  return new_fileobj
340 343
341 344
def MakeFileAnnotated(fileobj):
  """Creates a file object clone that prepends every line written from a
  worker thread with a 'NN>' prefix, where NN is the thread's 'index'.

  Writes from threads without an 'index' attribute pass through unbuffered.
  Partial lines are buffered per-thread until a newline arrives;
  full_flush() writes out the buffers of threads that no longer exist.
  """
  if hasattr(fileobj, 'output_buffers'):
    # Already patched.
    return fileobj

  new_fileobj = SoftClone(fileobj)
  if not hasattr(new_fileobj, 'lock'):
    new_fileobj.lock = threading.Lock()
  # Maps thread index -> single-element list holding that thread's pending
  # (not yet newline-terminated) output.
  new_fileobj.output_buffers = {}
  new_fileobj.old_annotated_write = new_fileobj.write

  def annotated_write(out):
    index = getattr(threading.currentThread(), 'index', None)
    if index is None:
      # Unindexed threads aren't buffered; write through directly.
      new_fileobj.old_annotated_write(out)
      return

    new_fileobj.lock.acquire()
    try:
      # Use a dummy array to hold the string so the code can be lockless.
      # Strings are immutable, requiring to keep a lock for the whole
      # dictionary otherwise. Using an array is faster than using a dummy
      # object.
      if not index in new_fileobj.output_buffers:
        obj = new_fileobj.output_buffers[index] = ['']
      else:
        obj = new_fileobj.output_buffers[index]
    finally:
      new_fileobj.lock.release()

    # Continue lockless.
    obj[0] += out
    while '\n' in obj[0]:
      line, remaining = obj[0].split('\n', 1)
      new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
      obj[0] = remaining

  def full_flush():
    """Flushes the buffered output of threads that no longer exist."""
    orphans = []
    new_fileobj.lock.acquire()
    try:
      # Detect threads no longer existing.
      indexes = (getattr(t, 'index', None) for t in threading.enumerate())
      # BUG FIX: membership must be tested against the materialized
      # collection, not the single-use generator 'indexes', which filter()
      # has already exhausted -- the old code therefore treated EVERY
      # buffer as orphaned, flushing and dropping buffers of live threads.
      live_indexes = set(filter(None, indexes))
      for index in new_fileobj.output_buffers:
        if not index in live_indexes:
          orphans.append((index, new_fileobj.output_buffers[index][0]))
      for orphan in orphans:
        del new_fileobj.output_buffers[orphan[0]]
    finally:
      new_fileobj.lock.release()

    # Don't keep the lock while writing. Will append \n when it shouldn't.
    for orphan in orphans:
      new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))

  new_fileobj.write = annotated_write
  new_fileobj.full_flush = full_flush
  return new_fileobj
363 407
364 408
365 def CheckCallAndFilter(args, stdout=None, filter_fn=None, 409 def CheckCallAndFilter(args, stdout=None, filter_fn=None,
366 print_stdout=None, call_filter_on_first_line=False, 410 print_stdout=None, call_filter_on_first_line=False,
367 **kwargs): 411 **kwargs):
368 """Runs a command and calls back a filter function if needed. 412 """Runs a command and calls back a filter function if needed.
369 413
370 Accepts all subprocess.Popen() parameters plus: 414 Accepts all subprocess.Popen() parameters plus:
371 print_stdout: If True, the command's stdout is forwarded to stdout. 415 print_stdout: If True, the command's stdout is forwarded to stdout.
372 filter_fn: A function taking a single string argument called with each line 416 filter_fn: A function taking a single string argument called with each line
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
621 if self.progress: 665 if self.progress:
622 self.progress.update(1) 666 self.progress.update(1)
623 assert not t.item.name in self.ran 667 assert not t.item.name in self.ran
624 if not t.item.name in self.ran: 668 if not t.item.name in self.ran:
625 self.ran.append(t.item.name) 669 self.ran.append(t.item.name)
626 670
627 def _run_one_task(self, task_item, args, kwargs): 671 def _run_one_task(self, task_item, args, kwargs):
628 if self.jobs > 1: 672 if self.jobs > 1:
629 # Start the thread. 673 # Start the thread.
630 index = len(self.ran) + len(self.running) + 1 674 index = len(self.ran) + len(self.running) + 1
631 # Copy 'options' and add annotated stdout. 675 # Copy 'options'.
632 task_kwargs = kwargs.copy() 676 task_kwargs = kwargs.copy()
633 task_kwargs['options'] = copy.copy(task_kwargs['options']) 677 task_kwargs['options'] = copy.copy(task_kwargs['options'])
634 task_kwargs['options'].stdout = StdoutAnnotated( 678 new_thread = self._Worker(task_item, index, args, task_kwargs)
635 '%d>' % index, task_kwargs['options'].stdout)
636 new_thread = self._Worker(task_item, args, task_kwargs)
637 self.running.append(new_thread) 679 self.running.append(new_thread)
638 new_thread.start() 680 new_thread.start()
639 else: 681 else:
640 # Run the 'thread' inside the main thread. Don't try to catch any 682 # Run the 'thread' inside the main thread. Don't try to catch any
641 # exception. 683 # exception.
642 task_item.run(*args, **kwargs) 684 task_item.run(*args, **kwargs)
643 self.ran.append(task_item.name) 685 self.ran.append(task_item.name)
644 if self.progress: 686 if self.progress:
645 self.progress.update(1) 687 self.progress.update(1)
646 688
  class _Worker(threading.Thread):
    """One thread to execute one WorkItem."""
    def __init__(self, item, index, args, kwargs):
      # Name the thread after the work item so its log lines are traceable.
      threading.Thread.__init__(self, name=item.name or 'Worker')
      logging.info(item.name)
      # The WorkItem whose run() this thread will execute.
      self.item = item
      # 'index' is read back off the current thread by MakeFileAnnotated's
      # write() wrapper to prefix this thread's output with 'NN>'.
      self.index = index
      self.args = args
      self.kwargs = kwargs

    def run(self):
      """Runs in its own thread."""
      logging.debug('running(%s)' % self.item.name)
      # The shared work queue is passed in through kwargs by the scheduler.
      work_queue = self.kwargs['work_queue']
      try:
        self.item.run(*self.args, **self.kwargs)
      except Exception:
        # Catch exception location.
        logging.info('Caught exception in thread %s' % self.item.name)
        logging.info(str(sys.exc_info()))
        # Hand the exception info to the shared queue instead of letting it
        # die with this thread; presumably the main loop re-raises it --
        # the consumer is not visible in this chunk, confirm there.
        work_queue.exceptions.put(sys.exc_info())
      logging.info('Task %s done' % self.item.name)

      # Wake up whoever is waiting on the queue so the next task can be
      # scheduled, whether this task succeeded or failed.
      work_queue.ready_cond.acquire()
      try:
        work_queue.ready_cond.notifyAll()
      finally:
        work_queue.ready_cond.release()
OLDNEW
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698