| OLD | NEW |
| 1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
| 2 # | 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
| 6 # | 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # | 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 290 filter_fn(line) | 290 filter_fn(line) |
| 291 kwargs['filter_fn'] = filter_msg | 291 kwargs['filter_fn'] = filter_msg |
| 292 kwargs['call_filter_on_first_line'] = True | 292 kwargs['call_filter_on_first_line'] = True |
| 293 # Obviously. | 293 # Obviously. |
| 294 kwargs['print_stdout'] = True | 294 kwargs['print_stdout'] = True |
| 295 return CheckCallAndFilter(args, **kwargs) | 295 return CheckCallAndFilter(args, **kwargs) |
| 296 | 296 |
| 297 | 297 |
| 298 def SoftClone(obj): | 298 def SoftClone(obj): |
| 299 """Clones an object. copy.copy() doesn't work on 'file' objects.""" | 299 """Clones an object. copy.copy() doesn't work on 'file' objects.""" |
| 300 class NewObject(object): pass | 300 if obj.__class__.__name__ == 'SoftCloned': |
| 301 new_obj = NewObject() | 301 return obj |
| 302 class SoftCloned(object): pass |
| 303 new_obj = SoftCloned() |
| 302 for member in dir(obj): | 304 for member in dir(obj): |
| 303 if member.startswith('_'): | 305 if member.startswith('_'): |
| 304 continue | 306 continue |
| 305 setattr(new_obj, member, getattr(obj, member)) | 307 setattr(new_obj, member, getattr(obj, member)) |
| 306 return new_obj | 308 return new_obj |
| 307 | 309 |
| 308 | 310 |
| 309 def MakeFileAutoFlush(fileobj, delay=10): | 311 def MakeFileAutoFlush(fileobj, delay=10): |
| 310 """Creates a file object clone to automatically flush after N seconds.""" | 312 """Creates a file object clone to automatically flush after N seconds.""" |
| 311 if hasattr(fileobj, 'last_flushed_at'): | 313 if hasattr(fileobj, 'last_flushed_at'): |
| 312 # Already patched. Just update delay. | 314 # Already patched. Just update delay. |
| 313 fileobj.delay = delay | 315 fileobj.delay = delay |
| 314 return fileobj | 316 return fileobj |
| 315 | 317 |
| 316 new_fileobj = SoftClone(fileobj) | 318 new_fileobj = SoftClone(fileobj) |
| 317 new_fileobj.lock = threading.Lock() | 319 if not hasattr(new_fileobj, 'lock'): |
| 320 new_fileobj.lock = threading.Lock() |
| 318 new_fileobj.last_flushed_at = time.time() | 321 new_fileobj.last_flushed_at = time.time() |
| 319 new_fileobj.delay = delay | 322 new_fileobj.delay = delay |
| 320 new_fileobj.old_auto_flush_write = fileobj.write | 323 new_fileobj.old_auto_flush_write = new_fileobj.write |
| 321 # Silence pylint. | 324 # Silence pylint. |
| 322 new_fileobj.flush = fileobj.flush | 325 new_fileobj.flush = fileobj.flush |
| 323 | 326 |
| 324 def auto_flush_write(out): | 327 def auto_flush_write(out): |
| 325 new_fileobj.old_auto_flush_write(out) | 328 new_fileobj.old_auto_flush_write(out) |
| 326 should_flush = False | 329 should_flush = False |
| 327 new_fileobj.lock.acquire() | 330 new_fileobj.lock.acquire() |
| 328 try: | 331 try: |
| 329 if (new_fileobj.delay and | 332 if (new_fileobj.delay and |
| 330 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): | 333 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): |
| 331 should_flush = True | 334 should_flush = True |
| 332 new_fileobj.last_flushed_at = time.time() | 335 new_fileobj.last_flushed_at = time.time() |
| 333 finally: | 336 finally: |
| 334 new_fileobj.lock.release() | 337 new_fileobj.lock.release() |
| 335 if should_flush: | 338 if should_flush: |
| 336 new_fileobj.flush() | 339 new_fileobj.flush() |
| 337 | 340 |
| 338 new_fileobj.write = auto_flush_write | 341 new_fileobj.write = auto_flush_write |
| 339 return new_fileobj | 342 return new_fileobj |
| 340 | 343 |
| 341 | 344 |
| 342 class StdoutAnnotated(object): | 345 def MakeFileAnnotated(fileobj): |
| 343 """Prepends every line with a string.""" | 346 """Creates a file object clone to automatically prepends every line in worker |
| 344 def __init__(self, prepend, stdout): | 347 threads with a NN> prefix.""" |
| 345 self.prepend = prepend | 348 if hasattr(fileobj, 'output_buffers'): |
| 346 self.buf = '' | 349 # Already patched. |
| 347 self.stdout = stdout | 350 return fileobj |
| 348 | 351 |
| 349 def write(self, out): | 352 new_fileobj = SoftClone(fileobj) |
| 350 self.buf += out | 353 if not hasattr(new_fileobj, 'lock'): |
| 351 while '\n' in self.buf: | 354 new_fileobj.lock = threading.Lock() |
| 352 line, self.buf = self.buf.split('\n', 1) | 355 new_fileobj.output_buffers = {} |
| 353 self.stdout.write(self.prepend + line + '\n') | 356 new_fileobj.old_annotated_write = new_fileobj.write |
| 354 | 357 |
| 355 def flush(self): | 358 def annotated_write(out): |
| 356 pass | 359 index = getattr(threading.currentThread(), 'index', None) |
| 360 if index is None: |
| 361 # Undexed threads aren't buffered. |
| 362 new_fileobj.old_annotated_write(out) |
| 363 return |
| 357 | 364 |
| 358 def full_flush(self): | 365 new_fileobj.lock.acquire() |
| 359 if self.buf: | 366 try: |
| 360 self.stdout.write(self.prepend + self.buf) | 367 # Use a dummy array to hold the string so the code can be lockless. |
| 361 self.stdout.flush() | 368 # Strings are immutable, requiring to keep a lock for the whole dictionary |
| 362 self.buf = '' | 369 # otherwise. Using an array is faster than using a dummy object. |
| 370 if not index in new_fileobj.output_buffers: |
| 371 obj = new_fileobj.output_buffers[index] = [''] |
| 372 else: |
| 373 obj = new_fileobj.output_buffers[index] |
| 374 finally: |
| 375 new_fileobj.lock.release() |
| 376 |
| 377 # Continue lockless. |
| 378 obj[0] += out |
| 379 while '\n' in obj[0]: |
| 380 line, remaining = obj[0].split('\n', 1) |
| 381 new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) |
| 382 obj[0] = remaining |
| 383 |
| 384 def full_flush(): |
| 385 """Flush buffered output.""" |
| 386 orphans = [] |
| 387 new_fileobj.lock.acquire() |
| 388 try: |
| 389 # Detect threads no longer existing. |
| 390 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) |
| 391 indexed = filter(None, indexes) |
| 392 for index in new_fileobj.output_buffers: |
| 393 if not index in indexes: |
| 394 orphans.append((index, new_fileobj.output_buffers[index][0])) |
| 395 for orphan in orphans: |
| 396 del new_fileobj.output_buffers[orphan[0]] |
| 397 finally: |
| 398 new_fileobj.lock.release() |
| 399 |
| 400 # Don't keep the lock while writting. Will append \n when it shouldn't. |
| 401 for orphan in orphans: |
| 402 new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) |
| 403 |
| 404 new_fileobj.write = annotated_write |
| 405 new_fileobj.full_flush = full_flush |
| 406 return new_fileobj |
| 363 | 407 |
| 364 | 408 |
| 365 def CheckCallAndFilter(args, stdout=None, filter_fn=None, | 409 def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
| 366 print_stdout=None, call_filter_on_first_line=False, | 410 print_stdout=None, call_filter_on_first_line=False, |
| 367 **kwargs): | 411 **kwargs): |
| 368 """Runs a command and calls back a filter function if needed. | 412 """Runs a command and calls back a filter function if needed. |
| 369 | 413 |
| 370 Accepts all subprocess.Popen() parameters plus: | 414 Accepts all subprocess.Popen() parameters plus: |
| 371 print_stdout: If True, the command's stdout is forwarded to stdout. | 415 print_stdout: If True, the command's stdout is forwarded to stdout. |
| 372 filter_fn: A function taking a single string argument called with each line | 416 filter_fn: A function taking a single string argument called with each line |
| (...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 621 if self.progress: | 665 if self.progress: |
| 622 self.progress.update(1) | 666 self.progress.update(1) |
| 623 assert not t.item.name in self.ran | 667 assert not t.item.name in self.ran |
| 624 if not t.item.name in self.ran: | 668 if not t.item.name in self.ran: |
| 625 self.ran.append(t.item.name) | 669 self.ran.append(t.item.name) |
| 626 | 670 |
| 627 def _run_one_task(self, task_item, args, kwargs): | 671 def _run_one_task(self, task_item, args, kwargs): |
| 628 if self.jobs > 1: | 672 if self.jobs > 1: |
| 629 # Start the thread. | 673 # Start the thread. |
| 630 index = len(self.ran) + len(self.running) + 1 | 674 index = len(self.ran) + len(self.running) + 1 |
| 631 # Copy 'options' and add annotated stdout. | 675 # Copy 'options'. |
| 632 task_kwargs = kwargs.copy() | 676 task_kwargs = kwargs.copy() |
| 633 task_kwargs['options'] = copy.copy(task_kwargs['options']) | 677 task_kwargs['options'] = copy.copy(task_kwargs['options']) |
| 634 task_kwargs['options'].stdout = StdoutAnnotated( | 678 new_thread = self._Worker(task_item, index, args, task_kwargs) |
| 635 '%d>' % index, task_kwargs['options'].stdout) | |
| 636 new_thread = self._Worker(task_item, args, task_kwargs) | |
| 637 self.running.append(new_thread) | 679 self.running.append(new_thread) |
| 638 new_thread.start() | 680 new_thread.start() |
| 639 else: | 681 else: |
| 640 # Run the 'thread' inside the main thread. Don't try to catch any | 682 # Run the 'thread' inside the main thread. Don't try to catch any |
| 641 # exception. | 683 # exception. |
| 642 task_item.run(*args, **kwargs) | 684 task_item.run(*args, **kwargs) |
| 643 self.ran.append(task_item.name) | 685 self.ran.append(task_item.name) |
| 644 if self.progress: | 686 if self.progress: |
| 645 self.progress.update(1) | 687 self.progress.update(1) |
| 646 | 688 |
| 647 class _Worker(threading.Thread): | 689 class _Worker(threading.Thread): |
| 648 """One thread to execute one WorkItem.""" | 690 """One thread to execute one WorkItem.""" |
| 649 def __init__(self, item, args, kwargs): | 691 def __init__(self, item, index, args, kwargs): |
| 650 threading.Thread.__init__(self, name=item.name or 'Worker') | 692 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 651 logging.info(item.name) | 693 logging.info(item.name) |
| 652 self.item = item | 694 self.item = item |
| 695 self.index = index |
| 653 self.args = args | 696 self.args = args |
| 654 self.kwargs = kwargs | 697 self.kwargs = kwargs |
| 655 | 698 |
| 656 def run(self): | 699 def run(self): |
| 657 """Runs in its own thread.""" | 700 """Runs in its own thread.""" |
| 658 logging.debug('running(%s)' % self.item.name) | 701 logging.debug('running(%s)' % self.item.name) |
| 659 work_queue = self.kwargs['work_queue'] | 702 work_queue = self.kwargs['work_queue'] |
| 660 try: | 703 try: |
| 661 self.item.run(*self.args, **self.kwargs) | 704 self.item.run(*self.args, **self.kwargs) |
| 662 except Exception: | 705 except Exception: |
| 663 # Catch exception location. | 706 # Catch exception location. |
| 664 logging.info('Caught exception in thread %s' % self.item.name) | 707 logging.info('Caught exception in thread %s' % self.item.name) |
| 665 logging.info(str(sys.exc_info())) | 708 logging.info(str(sys.exc_info())) |
| 666 work_queue.exceptions.put(sys.exc_info()) | 709 work_queue.exceptions.put(sys.exc_info()) |
| 667 logging.info('Task %s done' % self.item.name) | 710 logging.info('Task %s done' % self.item.name) |
| 668 | 711 |
| 669 work_queue.ready_cond.acquire() | 712 work_queue.ready_cond.acquire() |
| 670 try: | 713 try: |
| 671 work_queue.ready_cond.notifyAll() | 714 work_queue.ready_cond.notifyAll() |
| 672 finally: | 715 finally: |
| 673 work_queue.ready_cond.release() | 716 work_queue.ready_cond.release() |
| OLD | NEW |