OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
290 filter_fn(line) | 290 filter_fn(line) |
291 kwargs['filter_fn'] = filter_msg | 291 kwargs['filter_fn'] = filter_msg |
292 kwargs['call_filter_on_first_line'] = True | 292 kwargs['call_filter_on_first_line'] = True |
293 # Obviously. | 293 # Obviously. |
294 kwargs['print_stdout'] = True | 294 kwargs['print_stdout'] = True |
295 return CheckCallAndFilter(args, **kwargs) | 295 return CheckCallAndFilter(args, **kwargs) |
296 | 296 |
297 | 297 |
298 def SoftClone(obj): | 298 def SoftClone(obj): |
299 """Clones an object. copy.copy() doesn't work on 'file' objects.""" | 299 """Clones an object. copy.copy() doesn't work on 'file' objects.""" |
300 class NewObject(object): pass | 300 if obj.__class__.__name__ == 'SoftCloned': |
301 new_obj = NewObject() | 301 return obj |
| 302 class SoftCloned(object): pass |
| 303 new_obj = SoftCloned() |
302 for member in dir(obj): | 304 for member in dir(obj): |
303 if member.startswith('_'): | 305 if member.startswith('_'): |
304 continue | 306 continue |
305 setattr(new_obj, member, getattr(obj, member)) | 307 setattr(new_obj, member, getattr(obj, member)) |
306 return new_obj | 308 return new_obj |
307 | 309 |
308 | 310 |
309 def MakeFileAutoFlush(fileobj, delay=10): | 311 def MakeFileAutoFlush(fileobj, delay=10): |
310 """Creates a file object clone to automatically flush after N seconds.""" | 312 """Creates a file object clone to automatically flush after N seconds.""" |
311 if hasattr(fileobj, 'last_flushed_at'): | 313 if hasattr(fileobj, 'last_flushed_at'): |
312 # Already patched. Just update delay. | 314 # Already patched. Just update delay. |
313 fileobj.delay = delay | 315 fileobj.delay = delay |
314 return fileobj | 316 return fileobj |
315 | 317 |
316 new_fileobj = SoftClone(fileobj) | 318 new_fileobj = SoftClone(fileobj) |
317 new_fileobj.lock = threading.Lock() | 319 if not hasattr(new_fileobj, 'lock'): |
| 320 new_fileobj.lock = threading.Lock() |
318 new_fileobj.last_flushed_at = time.time() | 321 new_fileobj.last_flushed_at = time.time() |
319 new_fileobj.delay = delay | 322 new_fileobj.delay = delay |
320 new_fileobj.old_auto_flush_write = fileobj.write | 323 new_fileobj.old_auto_flush_write = new_fileobj.write |
321 # Silence pylint. | 324 # Silence pylint. |
322 new_fileobj.flush = fileobj.flush | 325 new_fileobj.flush = fileobj.flush |
323 | 326 |
324 def auto_flush_write(out): | 327 def auto_flush_write(out): |
325 new_fileobj.old_auto_flush_write(out) | 328 new_fileobj.old_auto_flush_write(out) |
326 should_flush = False | 329 should_flush = False |
327 new_fileobj.lock.acquire() | 330 new_fileobj.lock.acquire() |
328 try: | 331 try: |
329 if (new_fileobj.delay and | 332 if (new_fileobj.delay and |
330 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): | 333 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): |
331 should_flush = True | 334 should_flush = True |
332 new_fileobj.last_flushed_at = time.time() | 335 new_fileobj.last_flushed_at = time.time() |
333 finally: | 336 finally: |
334 new_fileobj.lock.release() | 337 new_fileobj.lock.release() |
335 if should_flush: | 338 if should_flush: |
336 new_fileobj.flush() | 339 new_fileobj.flush() |
337 | 340 |
338 new_fileobj.write = auto_flush_write | 341 new_fileobj.write = auto_flush_write |
339 return new_fileobj | 342 return new_fileobj |
340 | 343 |
341 | 344 |
342 class StdoutAnnotated(object): | 345 def MakeFileAnnotated(fileobj): |
343 """Prepends every line with a string.""" | 346 """Creates a file object clone to automatically prepends every line in worker |
344 def __init__(self, prepend, stdout): | 347 threads with a NN> prefix.""" |
345 self.prepend = prepend | 348 if hasattr(fileobj, 'output_buffers'): |
346 self.buf = '' | 349 # Already patched. |
347 self.stdout = stdout | 350 return fileobj |
348 | 351 |
349 def write(self, out): | 352 new_fileobj = SoftClone(fileobj) |
350 self.buf += out | 353 if not hasattr(new_fileobj, 'lock'): |
351 while '\n' in self.buf: | 354 new_fileobj.lock = threading.Lock() |
352 line, self.buf = self.buf.split('\n', 1) | 355 new_fileobj.output_buffers = {} |
353 self.stdout.write(self.prepend + line + '\n') | 356 new_fileobj.old_annotated_write = new_fileobj.write |
354 | 357 |
355 def flush(self): | 358 def annotated_write(out): |
356 pass | 359 index = getattr(threading.currentThread(), 'index', None) |
| 360 if index is None: |
| 361 # Undexed threads aren't buffered. |
| 362 new_fileobj.old_annotated_write(out) |
| 363 return |
357 | 364 |
358 def full_flush(self): | 365 new_fileobj.lock.acquire() |
359 if self.buf: | 366 try: |
360 self.stdout.write(self.prepend + self.buf) | 367 # Use a dummy array to hold the string so the code can be lockless. |
361 self.stdout.flush() | 368 # Strings are immutable, requiring to keep a lock for the whole dictionary |
362 self.buf = '' | 369 # otherwise. Using an array is faster than using a dummy object. |
| 370 if not index in new_fileobj.output_buffers: |
| 371 obj = new_fileobj.output_buffers[index] = [''] |
| 372 else: |
| 373 obj = new_fileobj.output_buffers[index] |
| 374 finally: |
| 375 new_fileobj.lock.release() |
| 376 |
| 377 # Continue lockless. |
| 378 obj[0] += out |
| 379 while '\n' in obj[0]: |
| 380 line, remaining = obj[0].split('\n', 1) |
| 381 new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) |
| 382 obj[0] = remaining |
| 383 |
| 384 def full_flush(): |
| 385 """Flush buffered output.""" |
| 386 orphans = [] |
| 387 new_fileobj.lock.acquire() |
| 388 try: |
| 389 # Detect threads no longer existing. |
| 390 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) |
| 391 indexed = filter(None, indexes) |
| 392 for index in new_fileobj.output_buffers: |
| 393 if not index in indexes: |
| 394 orphans.append((index, new_fileobj.output_buffers[index][0])) |
| 395 for orphan in orphans: |
| 396 del new_fileobj.output_buffers[orphan[0]] |
| 397 finally: |
| 398 new_fileobj.lock.release() |
| 399 |
| 400 # Don't keep the lock while writting. Will append \n when it shouldn't. |
| 401 for orphan in orphans: |
| 402 new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) |
| 403 |
| 404 new_fileobj.write = annotated_write |
| 405 new_fileobj.full_flush = full_flush |
| 406 return new_fileobj |
363 | 407 |
364 | 408 |
365 def CheckCallAndFilter(args, stdout=None, filter_fn=None, | 409 def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
366 print_stdout=None, call_filter_on_first_line=False, | 410 print_stdout=None, call_filter_on_first_line=False, |
367 **kwargs): | 411 **kwargs): |
368 """Runs a command and calls back a filter function if needed. | 412 """Runs a command and calls back a filter function if needed. |
369 | 413 |
370 Accepts all subprocess.Popen() parameters plus: | 414 Accepts all subprocess.Popen() parameters plus: |
371 print_stdout: If True, the command's stdout is forwarded to stdout. | 415 print_stdout: If True, the command's stdout is forwarded to stdout. |
372 filter_fn: A function taking a single string argument called with each line | 416 filter_fn: A function taking a single string argument called with each line |
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
621 if self.progress: | 665 if self.progress: |
622 self.progress.update(1) | 666 self.progress.update(1) |
623 assert not t.item.name in self.ran | 667 assert not t.item.name in self.ran |
624 if not t.item.name in self.ran: | 668 if not t.item.name in self.ran: |
625 self.ran.append(t.item.name) | 669 self.ran.append(t.item.name) |
626 | 670 |
627 def _run_one_task(self, task_item, args, kwargs): | 671 def _run_one_task(self, task_item, args, kwargs): |
628 if self.jobs > 1: | 672 if self.jobs > 1: |
629 # Start the thread. | 673 # Start the thread. |
630 index = len(self.ran) + len(self.running) + 1 | 674 index = len(self.ran) + len(self.running) + 1 |
631 # Copy 'options' and add annotated stdout. | 675 # Copy 'options'. |
632 task_kwargs = kwargs.copy() | 676 task_kwargs = kwargs.copy() |
633 task_kwargs['options'] = copy.copy(task_kwargs['options']) | 677 task_kwargs['options'] = copy.copy(task_kwargs['options']) |
634 task_kwargs['options'].stdout = StdoutAnnotated( | 678 new_thread = self._Worker(task_item, index, args, task_kwargs) |
635 '%d>' % index, task_kwargs['options'].stdout) | |
636 new_thread = self._Worker(task_item, args, task_kwargs) | |
637 self.running.append(new_thread) | 679 self.running.append(new_thread) |
638 new_thread.start() | 680 new_thread.start() |
639 else: | 681 else: |
640 # Run the 'thread' inside the main thread. Don't try to catch any | 682 # Run the 'thread' inside the main thread. Don't try to catch any |
641 # exception. | 683 # exception. |
642 task_item.run(*args, **kwargs) | 684 task_item.run(*args, **kwargs) |
643 self.ran.append(task_item.name) | 685 self.ran.append(task_item.name) |
644 if self.progress: | 686 if self.progress: |
645 self.progress.update(1) | 687 self.progress.update(1) |
646 | 688 |
647 class _Worker(threading.Thread): | 689 class _Worker(threading.Thread): |
648 """One thread to execute one WorkItem.""" | 690 """One thread to execute one WorkItem.""" |
649 def __init__(self, item, args, kwargs): | 691 def __init__(self, item, index, args, kwargs): |
650 threading.Thread.__init__(self, name=item.name or 'Worker') | 692 threading.Thread.__init__(self, name=item.name or 'Worker') |
651 logging.info(item.name) | 693 logging.info(item.name) |
652 self.item = item | 694 self.item = item |
| 695 self.index = index |
653 self.args = args | 696 self.args = args |
654 self.kwargs = kwargs | 697 self.kwargs = kwargs |
655 | 698 |
656 def run(self): | 699 def run(self): |
657 """Runs in its own thread.""" | 700 """Runs in its own thread.""" |
658 logging.debug('running(%s)' % self.item.name) | 701 logging.debug('running(%s)' % self.item.name) |
659 work_queue = self.kwargs['work_queue'] | 702 work_queue = self.kwargs['work_queue'] |
660 try: | 703 try: |
661 self.item.run(*self.args, **self.kwargs) | 704 self.item.run(*self.args, **self.kwargs) |
662 except Exception: | 705 except Exception: |
663 # Catch exception location. | 706 # Catch exception location. |
664 logging.info('Caught exception in thread %s' % self.item.name) | 707 logging.info('Caught exception in thread %s' % self.item.name) |
665 logging.info(str(sys.exc_info())) | 708 logging.info(str(sys.exc_info())) |
666 work_queue.exceptions.put(sys.exc_info()) | 709 work_queue.exceptions.put(sys.exc_info()) |
667 logging.info('Task %s done' % self.item.name) | 710 logging.info('Task %s done' % self.item.name) |
668 | 711 |
669 work_queue.ready_cond.acquire() | 712 work_queue.ready_cond.acquire() |
670 try: | 713 try: |
671 work_queue.ready_cond.notifyAll() | 714 work_queue.ready_cond.notifyAll() |
672 finally: | 715 finally: |
673 work_queue.ready_cond.release() | 716 work_queue.ready_cond.release() |
OLD | NEW |