OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 298 matching lines...) |
309 return new_obj | 309 return new_obj |
310 | 310 |
311 | 311 |
312 def MakeFileAutoFlush(fileobj, delay=10): | 312 def MakeFileAutoFlush(fileobj, delay=10): |
313 """Creates a file object clone to automatically flush after N seconds.""" | 313 """Creates a file object clone to automatically flush after N seconds.""" |
314 if hasattr(fileobj, 'last_flushed_at'): | 314 if hasattr(fileobj, 'last_flushed_at'): |
315 # Already patched. Just update delay. | 315 # Already patched. Just update delay. |
316 fileobj.delay = delay | 316 fileobj.delay = delay |
317 return fileobj | 317 return fileobj |
318 | 318 |
| 319 # Attribute 'XXX' defined outside __init__ |
| 320 # pylint: disable=W0201 |
319 new_fileobj = SoftClone(fileobj) | 321 new_fileobj = SoftClone(fileobj) |
320 if not hasattr(new_fileobj, 'lock'): | 322 if not hasattr(new_fileobj, 'lock'): |
321 new_fileobj.lock = threading.Lock() | 323 new_fileobj.lock = threading.Lock() |
322 new_fileobj.last_flushed_at = time.time() | 324 new_fileobj.last_flushed_at = time.time() |
323 new_fileobj.delay = delay | 325 new_fileobj.delay = delay |
324 new_fileobj.old_auto_flush_write = new_fileobj.write | 326 new_fileobj.old_auto_flush_write = new_fileobj.write |
325 # Silence pylint. | 327 # Silence pylint. |
326 new_fileobj.flush = fileobj.flush | 328 new_fileobj.flush = fileobj.flush |
327 | 329 |
328 def auto_flush_write(out): | 330 def auto_flush_write(out): |
(...skipping 14 matching lines...) |
343 return new_fileobj | 345 return new_fileobj |
344 | 346 |
345 | 347 |
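The body of auto_flush_write is elided in this hunk. As a reading aid only, here is a minimal standalone sketch, under the assumption that the wrapper simply tracks the time of the last flush, of the pattern the last_flushed_at, delay and lock attributes are set up for; the class name AutoFlushSketch is hypothetical and this is not the code under review.

import threading
import time

class AutoFlushSketch(object):
  """Illustrative only: flush the wrapped stream once `delay` seconds have
  elapsed since the previous flush, regardless of how often write() is
  called."""

  def __init__(self, fileobj, delay=10):
    self.fileobj = fileobj
    self.delay = delay
    self.lock = threading.Lock()
    self.last_flushed_at = time.time()

  def write(self, out):
    self.fileobj.write(out)
    should_flush = False
    # Guard last_flushed_at since multiple worker threads may write.
    self.lock.acquire()
    try:
      if time.time() - self.last_flushed_at > self.delay:
        should_flush = True
        self.last_flushed_at = time.time()
    finally:
      self.lock.release()
    if should_flush:
      self.fileobj.flush()

MakeFileAutoFlush applies the same idea by monkey-patching a SoftClone of the original file object instead of defining a wrapper class, which is why pylint's W0201 (attribute defined outside __init__) has to be disabled in the change above.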
346 def MakeFileAnnotated(fileobj): | 348 def MakeFileAnnotated(fileobj): |
347 """Creates a file object clone to automatically prepends every line in worker | 349 """Creates a file object clone to automatically prepends every line in worker |
348 threads with a NN> prefix.""" | 350 threads with a NN> prefix.""" |
349 if hasattr(fileobj, 'output_buffers'): | 351 if hasattr(fileobj, 'output_buffers'): |
350 # Already patched. | 352 # Already patched. |
351 return fileobj | 353 return fileobj |
352 | 354 |
| 355 # Attribute 'XXX' defined outside __init__ |
| 356 # pylint: disable=W0201 |
353 new_fileobj = SoftClone(fileobj) | 357 new_fileobj = SoftClone(fileobj) |
354 if not hasattr(new_fileobj, 'lock'): | 358 if not hasattr(new_fileobj, 'lock'): |
355 new_fileobj.lock = threading.Lock() | 359 new_fileobj.lock = threading.Lock() |
356 new_fileobj.output_buffers = {} | 360 new_fileobj.output_buffers = {} |
357 new_fileobj.old_annotated_write = new_fileobj.write | 361 new_fileobj.old_annotated_write = new_fileobj.write |
358 | 362 |
359 def annotated_write(out): | 363 def annotated_write(out): |
360 index = getattr(threading.currentThread(), 'index', None) | 364 index = getattr(threading.currentThread(), 'index', None) |
361 if index is None: | 365 if index is None: |
362 # Unindexed threads aren't buffered. | 366 # Unindexed threads aren't buffered. |
(...skipping 328 matching lines...) |
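The remainder of annotated_write is elided above. A hedged standalone sketch of the per-thread line-prefixing technique it is set up for follows; the helper name and the explicit output_buffers/lock parameters are assumptions for illustration, not the reviewed code.

import threading

def annotated_write_sketch(fileobj, output_buffers, lock, out):
  """Illustrative only: buffer partial writes per worker-thread index and
  emit only complete lines, each prefixed with 'NN> '."""
  index = getattr(threading.currentThread(), 'index', None)
  if index is None:
    # Unindexed threads aren't buffered.
    fileobj.write(out)
    return
  lock.acquire()
  try:
    # One buffer per thread index; a one-element list keeps it mutable.
    obj = output_buffers.setdefault(index, [''])
  finally:
    lock.release()
  obj[0] += out
  while '\n' in obj[0]:
    line, obj[0] = obj[0].split('\n', 1)
    fileobj.write('%d> %s\n' % (index, line))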
691 logging.info(item.name) | 695 logging.info(item.name) |
692 self.item = item | 696 self.item = item |
693 self.index = index | 697 self.index = index |
694 self.args = args | 698 self.args = args |
695 self.kwargs = kwargs | 699 self.kwargs = kwargs |
696 | 700 |
697 def run(self): | 701 def run(self): |
698 """Runs in its own thread.""" | 702 """Runs in its own thread.""" |
699 logging.debug('running(%s)' % self.item.name) | 703 logging.debug('running(%s)' % self.item.name) |
700 work_queue = self.kwargs['work_queue'] | 704 work_queue = self.kwargs['work_queue'] |
| 705 # It's necessary to catch all exceptions. |
| 706 # pylint: disable=W0703 |
701 try: | 707 try: |
702 self.item.run(*self.args, **self.kwargs) | 708 self.item.run(*self.args, **self.kwargs) |
703 except Exception: | 709 except Exception: |
704 # Catch exception location. | 710 # Catch exception location. |
705 logging.info('Caught exception in thread %s' % self.item.name) | 711 logging.info('Caught exception in thread %s' % self.item.name) |
706 logging.info(str(sys.exc_info())) | 712 logging.info(str(sys.exc_info())) |
707 work_queue.exceptions.put(sys.exc_info()) | 713 work_queue.exceptions.put(sys.exc_info()) |
708 logging.info('Task %s done' % self.item.name) | 714 logging.info('Task %s done' % self.item.name) |
709 | 715 |
710 work_queue.ready_cond.acquire() | 716 work_queue.ready_cond.acquire() |
711 try: | 717 try: |
712 work_queue.ready_cond.notifyAll() | 718 work_queue.ready_cond.notifyAll() |
713 finally: | 719 finally: |
714 work_queue.ready_cond.release() | 720 work_queue.ready_cond.release() |
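run() records any exception's sys.exc_info() on work_queue.exceptions and then wakes the coordinator through ready_cond; the broad except clause is what the new W0703 pragma covers. As a hypothetical sketch (the real work_queue API may differ), the coordinating thread could drain that queue and re-raise the first recorded failure like this:

import Queue

def reraise_first_worker_exception(work_queue):
  """Illustrative only: re-raise the first captured worker exception with
  its original traceback, if one was recorded."""
  try:
    exc_info = work_queue.exceptions.get_nowait()
  except Queue.Empty:
    return
  # Python 2 three-argument raise preserves the worker's traceback.
  raise exc_info[0], exc_info[1], exc_info[2]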