OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 296 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
307 self.last_flushed_at = time.time() | 307 self.last_flushed_at = time.time() |
308 finally: | 308 finally: |
309 self.lock.release() | 309 self.lock.release() |
310 if should_flush: | 310 if should_flush: |
311 self.stdout.flush() | 311 self.stdout.flush() |
312 | 312 |
313 def flush(self): | 313 def flush(self): |
314 self.stdout.flush() | 314 self.stdout.flush() |
315 | 315 |
316 | 316 |
| 317 class StdoutAnnotated(object): |
| 318 """Prepends every line with a string.""" |
| 319 def __init__(self, prepend, stdout): |
| 320 self.prepend = prepend |
| 321 self.buf = '' |
| 322 self.stdout = stdout |
| 323 |
| 324 def write(self, out): |
| 325 self.buf += out |
| 326 while '\n' in self.buf: |
| 327 line, self.buf = self.buf.split('\n', 1) |
| 328 self.stdout.write(self.prepend + line + '\n') |
| 329 |
| 330 def flush(self): |
| 331 pass |
| 332 |
| 333 def full_flush(self): |
| 334 if self.buf: |
| 335 self.stdout.write(self.prepend + self.buf) |
| 336 self.stdout.flush() |
| 337 self.buf = '' |
| 338 |
| 339 |
317 def CheckCallAndFilter(args, stdout=None, filter_fn=None, | 340 def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
318 print_stdout=None, call_filter_on_first_line=False, | 341 print_stdout=None, call_filter_on_first_line=False, |
319 **kwargs): | 342 **kwargs): |
320 """Runs a command and calls back a filter function if needed. | 343 """Runs a command and calls back a filter function if needed. |
321 | 344 |
322 Accepts all subprocess.Popen() parameters plus: | 345 Accepts all subprocess.Popen() parameters plus: |
323 print_stdout: If True, the command's stdout is forwarded to stdout. | 346 print_stdout: If True, the command's stdout is forwarded to stdout. |
324 filter_fn: A function taking a single string argument called with each line | 347 filter_fn: A function taking a single string argument called with each line |
325 of the subprocess's output. Each line has the trailing newline | 348 of the subprocess's output. Each line has the trailing newline |
326 character trimmed. | 349 character trimmed. |
(...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
562 | 585 |
563 def _flush_terminated_threads(self): | 586 def _flush_terminated_threads(self): |
564 """Flush threads that have terminated.""" | 587 """Flush threads that have terminated.""" |
565 running = self.running | 588 running = self.running |
566 self.running = [] | 589 self.running = [] |
567 for t in running: | 590 for t in running: |
568 if t.isAlive(): | 591 if t.isAlive(): |
569 self.running.append(t) | 592 self.running.append(t) |
570 else: | 593 else: |
571 t.join() | 594 t.join() |
572 t.kwargs['options'].stdout.flush() | 595 t.kwargs['options'].stdout.full_flush() |
573 if self.progress: | 596 if self.progress: |
574 self.progress.update(1) | 597 self.progress.update(1) |
575 assert not t.name in self.ran | 598 assert not t.name in self.ran |
576 if not t.name in self.ran: | 599 if not t.name in self.ran: |
577 self.ran.append(t.name) | 600 self.ran.append(t.name) |
578 | 601 |
579 def _run_one_task(self, task_item, args, kwargs): | 602 def _run_one_task(self, task_item, args, kwargs): |
580 if self.jobs > 1: | 603 if self.jobs > 1: |
581 # Start the thread. | 604 # Start the thread. |
582 index = len(self.ran) + len(self.running) + 1 | 605 index = len(self.ran) + len(self.running) + 1 |
583 # Copy 'options' just to be safe. | 606 # Copy 'options' and add annotated stdout. |
584 task_kwargs = kwargs.copy() | 607 task_kwargs = kwargs.copy() |
585 task_kwargs['options'] = copy.copy(task_kwargs['options']) | 608 task_kwargs['options'] = copy.copy(task_kwargs['options']) |
| 609 task_kwargs['options'].stdout = StdoutAnnotated( |
| 610 '%d>' % index, task_kwargs['options'].stdout) |
586 new_thread = self._Worker(task_item, args, task_kwargs) | 611 new_thread = self._Worker(task_item, args, task_kwargs) |
587 self.running.append(new_thread) | 612 self.running.append(new_thread) |
588 new_thread.start() | 613 new_thread.start() |
589 else: | 614 else: |
590 # Run the 'thread' inside the main thread. Don't try to catch any | 615 # Run the 'thread' inside the main thread. Don't try to catch any |
591 # exception. | 616 # exception. |
592 task_item.run(*args, **kwargs) | 617 task_item.run(*args, **kwargs) |
593 self.ran.append(task_item.name) | 618 self.ran.append(task_item.name) |
594 | 619 |
595 class _Worker(threading.Thread): | 620 class _Worker(threading.Thread): |
(...skipping 16 matching lines...) Expand all Loading... |
612 logging.info('Caught exception in thread %s' % self.item.name) | 637 logging.info('Caught exception in thread %s' % self.item.name) |
613 logging.info(str(sys.exc_info())) | 638 logging.info(str(sys.exc_info())) |
614 work_queue.exceptions.put(sys.exc_info()) | 639 work_queue.exceptions.put(sys.exc_info()) |
615 logging.info('Task %s done' % self.item.name) | 640 logging.info('Task %s done' % self.item.name) |
616 | 641 |
617 work_queue.ready_cond.acquire() | 642 work_queue.ready_cond.acquire() |
618 try: | 643 try: |
619 work_queue.ready_cond.notifyAll() | 644 work_queue.ready_cond.notifyAll() |
620 finally: | 645 finally: |
621 work_queue.ready_cond.release() | 646 work_queue.ready_cond.release() |
OLD | NEW |