| OLD | NEW |
| 1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
| 2 # | 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
| 6 # | 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # | 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 298 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 309 return new_obj | 309 return new_obj |
| 310 | 310 |
| 311 | 311 |
| 312 def MakeFileAutoFlush(fileobj, delay=10): | 312 def MakeFileAutoFlush(fileobj, delay=10): |
| 313 """Creates a file object clone to automatically flush after N seconds.""" | 313 """Creates a file object clone to automatically flush after N seconds.""" |
| 314 if hasattr(fileobj, 'last_flushed_at'): | 314 if hasattr(fileobj, 'last_flushed_at'): |
| 315 # Already patched. Just update delay. | 315 # Already patched. Just update delay. |
| 316 fileobj.delay = delay | 316 fileobj.delay = delay |
| 317 return fileobj | 317 return fileobj |
| 318 | 318 |
| 319 # Attribute 'XXX' defined outside __init__ |
| 320 # pylint: disable=W0201 |
| 319 new_fileobj = SoftClone(fileobj) | 321 new_fileobj = SoftClone(fileobj) |
| 320 if not hasattr(new_fileobj, 'lock'): | 322 if not hasattr(new_fileobj, 'lock'): |
| 321 new_fileobj.lock = threading.Lock() | 323 new_fileobj.lock = threading.Lock() |
| 322 new_fileobj.last_flushed_at = time.time() | 324 new_fileobj.last_flushed_at = time.time() |
| 323 new_fileobj.delay = delay | 325 new_fileobj.delay = delay |
| 324 new_fileobj.old_auto_flush_write = new_fileobj.write | 326 new_fileobj.old_auto_flush_write = new_fileobj.write |
| 325 # Silence pylint. | 327 # Silence pylint. |
| 326 new_fileobj.flush = fileobj.flush | 328 new_fileobj.flush = fileobj.flush |
| 327 | 329 |
| 328 def auto_flush_write(out): | 330 def auto_flush_write(out): |
| (...skipping 14 matching lines...) Expand all Loading... |
| 343 return new_fileobj | 345 return new_fileobj |
| 344 | 346 |
| 345 | 347 |
| 346 def MakeFileAnnotated(fileobj): | 348 def MakeFileAnnotated(fileobj): |
| 347 """Creates a file object clone to automatically prepends every line in worker | 349 """Creates a file object clone to automatically prepends every line in worker |
| 348 threads with a NN> prefix.""" | 350 threads with a NN> prefix.""" |
| 349 if hasattr(fileobj, 'output_buffers'): | 351 if hasattr(fileobj, 'output_buffers'): |
| 350 # Already patched. | 352 # Already patched. |
| 351 return fileobj | 353 return fileobj |
| 352 | 354 |
| 355 # Attribute 'XXX' defined outside __init__ |
| 356 # pylint: disable=W0201 |
| 353 new_fileobj = SoftClone(fileobj) | 357 new_fileobj = SoftClone(fileobj) |
| 354 if not hasattr(new_fileobj, 'lock'): | 358 if not hasattr(new_fileobj, 'lock'): |
| 355 new_fileobj.lock = threading.Lock() | 359 new_fileobj.lock = threading.Lock() |
| 356 new_fileobj.output_buffers = {} | 360 new_fileobj.output_buffers = {} |
| 357 new_fileobj.old_annotated_write = new_fileobj.write | 361 new_fileobj.old_annotated_write = new_fileobj.write |
| 358 | 362 |
| 359 def annotated_write(out): | 363 def annotated_write(out): |
| 360 index = getattr(threading.currentThread(), 'index', None) | 364 index = getattr(threading.currentThread(), 'index', None) |
| 361 if index is None: | 365 if index is None: |
| 362 # Undexed threads aren't buffered. | 366 # Undexed threads aren't buffered. |
| (...skipping 328 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 691 logging.info(item.name) | 695 logging.info(item.name) |
| 692 self.item = item | 696 self.item = item |
| 693 self.index = index | 697 self.index = index |
| 694 self.args = args | 698 self.args = args |
| 695 self.kwargs = kwargs | 699 self.kwargs = kwargs |
| 696 | 700 |
| 697 def run(self): | 701 def run(self): |
| 698 """Runs in its own thread.""" | 702 """Runs in its own thread.""" |
| 699 logging.debug('running(%s)' % self.item.name) | 703 logging.debug('running(%s)' % self.item.name) |
| 700 work_queue = self.kwargs['work_queue'] | 704 work_queue = self.kwargs['work_queue'] |
| 705 # It's necessary to catch all exceptions. |
| 706 # pylint: disable=W0703 |
| 701 try: | 707 try: |
| 702 self.item.run(*self.args, **self.kwargs) | 708 self.item.run(*self.args, **self.kwargs) |
| 703 except Exception: | 709 except Exception: |
| 704 # Catch exception location. | 710 # Catch exception location. |
| 705 logging.info('Caught exception in thread %s' % self.item.name) | 711 logging.info('Caught exception in thread %s' % self.item.name) |
| 706 logging.info(str(sys.exc_info())) | 712 logging.info(str(sys.exc_info())) |
| 707 work_queue.exceptions.put(sys.exc_info()) | 713 work_queue.exceptions.put(sys.exc_info()) |
| 708 logging.info('Task %s done' % self.item.name) | 714 logging.info('Task %s done' % self.item.name) |
| 709 | 715 |
| 710 work_queue.ready_cond.acquire() | 716 work_queue.ready_cond.acquire() |
| 711 try: | 717 try: |
| 712 work_queue.ready_cond.notifyAll() | 718 work_queue.ready_cond.notifyAll() |
| 713 finally: | 719 finally: |
| 714 work_queue.ready_cond.release() | 720 work_queue.ready_cond.release() |
| OLD | NEW |