| OLD | NEW |
| 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Generic utils.""" | 5 """Generic utils.""" |
| 6 | 6 |
| 7 import errno | 7 import errno |
| 8 import logging | 8 import logging |
| 9 import os | 9 import os |
| 10 import Queue | 10 import Queue |
| (...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 240 new_fileobj.last_flushed_at = time.time() | 240 new_fileobj.last_flushed_at = time.time() |
| 241 finally: | 241 finally: |
| 242 new_fileobj.lock.release() | 242 new_fileobj.lock.release() |
| 243 if should_flush: | 243 if should_flush: |
| 244 new_fileobj.flush() | 244 new_fileobj.flush() |
| 245 | 245 |
| 246 new_fileobj.write = auto_flush_write | 246 new_fileobj.write = auto_flush_write |
| 247 return new_fileobj | 247 return new_fileobj |
| 248 | 248 |
| 249 | 249 |
| 250 def MakeFileAnnotated(fileobj): | 250 def MakeFileAnnotated(fileobj, include_zero=False): |
| 251 """Creates a file object clone to automatically prepends every line in worker | 251 """Creates a file object clone to automatically prepends every line in worker |
| 252 threads with a NN> prefix.""" | 252 threads with a NN> prefix.""" |
| 253 if hasattr(fileobj, 'output_buffers'): | 253 if hasattr(fileobj, 'output_buffers'): |
| 254 # Already patched. | 254 # Already patched. |
| 255 return fileobj | 255 return fileobj |
| 256 | 256 |
| 257 # Attribute 'XXX' defined outside __init__ | 257 # Attribute 'XXX' defined outside __init__ |
| 258 # pylint: disable=W0201 | 258 # pylint: disable=W0201 |
| 259 new_fileobj = SoftClone(fileobj) | 259 new_fileobj = SoftClone(fileobj) |
| 260 if not hasattr(new_fileobj, 'lock'): | 260 if not hasattr(new_fileobj, 'lock'): |
| 261 new_fileobj.lock = threading.Lock() | 261 new_fileobj.lock = threading.Lock() |
| 262 new_fileobj.output_buffers = {} | 262 new_fileobj.output_buffers = {} |
| 263 new_fileobj.old_annotated_write = new_fileobj.write | 263 new_fileobj.old_annotated_write = new_fileobj.write |
| 264 | 264 |
| 265 def annotated_write(out): | 265 def annotated_write(out): |
| 266 index = getattr(threading.currentThread(), 'index', None) | 266 index = getattr(threading.currentThread(), 'index', None) |
| 267 if index is None: | 267 if index is None: |
| 268 # Undexed threads aren't buffered. | 268 if not include_zero: |
| 269 new_fileobj.old_annotated_write(out) | 269 # Unindexed threads aren't buffered. |
| 270 return | 270 new_fileobj.old_annotated_write(out) |
| 271 return |
| 272 index = 0 |
| 271 | 273 |
| 272 new_fileobj.lock.acquire() | 274 new_fileobj.lock.acquire() |
| 273 try: | 275 try: |
| 274 # Use a dummy array to hold the string so the code can be lockless. | 276 # Use a dummy array to hold the string so the code can be lockless. |
| 275 # Strings are immutable, requiring to keep a lock for the whole dictionary | 277 # Strings are immutable, requiring to keep a lock for the whole dictionary |
| 276 # otherwise. Using an array is faster than using a dummy object. | 278 # otherwise. Using an array is faster than using a dummy object. |
| 277 if not index in new_fileobj.output_buffers: | 279 if not index in new_fileobj.output_buffers: |
| 278 obj = new_fileobj.output_buffers[index] = [''] | 280 obj = new_fileobj.output_buffers[index] = [''] |
| 279 else: | 281 else: |
| 280 obj = new_fileobj.output_buffers[index] | 282 obj = new_fileobj.output_buffers[index] |
| (...skipping 269 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 550 try: | 552 try: |
| 551 while True: | 553 while True: |
| 552 # Check for task to run first, then wait. | 554 # Check for task to run first, then wait. |
| 553 while True: | 555 while True: |
| 554 if not self.exceptions.empty(): | 556 if not self.exceptions.empty(): |
| 555 # Systematically flush the queue when an exception logged. | 557 # Systematically flush the queue when an exception logged. |
| 556 self.queued = [] | 558 self.queued = [] |
| 557 self._flush_terminated_threads() | 559 self._flush_terminated_threads() |
| 558 if (not self.queued and not self.running or | 560 if (not self.queued and not self.running or |
| 559 self.jobs == len(self.running)): | 561 self.jobs == len(self.running)): |
| 560 # No more worker threads or can't queue anything. | 562 logging.debug('No more worker threads or can\'t queue anything.') |
| 561 break | 563 break |
| 562 | 564 |
| 563 # Check for new tasks to start. | 565 # Check for new tasks to start. |
| 564 for i in xrange(len(self.queued)): | 566 for i in xrange(len(self.queued)): |
| 565 # Verify its requirements. | 567 # Verify its requirements. |
| 566 for r in self.queued[i].requirements: | 568 for r in self.queued[i].requirements: |
| 567 if not r in self.ran: | 569 if not r in self.ran: |
| 568 # Requirement not met. | 570 # Requirement not met. |
| 569 break | 571 break |
| 570 else: | 572 else: |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 637 # exception. | 639 # exception. |
| 638 task_item.run(*args, **kwargs) | 640 task_item.run(*args, **kwargs) |
| 639 self.ran.append(task_item.name) | 641 self.ran.append(task_item.name) |
| 640 if self.progress: | 642 if self.progress: |
| 641 self.progress.update(1, ', '.join(t.item.name for t in self.running)) | 643 self.progress.update(1, ', '.join(t.item.name for t in self.running)) |
| 642 | 644 |
| 643 class _Worker(threading.Thread): | 645 class _Worker(threading.Thread): |
| 644 """One thread to execute one WorkItem.""" | 646 """One thread to execute one WorkItem.""" |
| 645 def __init__(self, item, index, args, kwargs): | 647 def __init__(self, item, index, args, kwargs): |
| 646 threading.Thread.__init__(self, name=item.name or 'Worker') | 648 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 647 logging.info(item.name) | 649 logging.info('_Worker(%s) reqs:%s' % (item.name, item.requirements)) |
| 648 self.item = item | 650 self.item = item |
| 649 self.index = index | 651 self.index = index |
| 650 self.args = args | 652 self.args = args |
| 651 self.kwargs = kwargs | 653 self.kwargs = kwargs |
| 652 | 654 |
| 653 def run(self): | 655 def run(self): |
| 654 """Runs in its own thread.""" | 656 """Runs in its own thread.""" |
| 655 logging.debug('running(%s)' % self.item.name) | 657 logging.debug('_Worker.run(%s)' % self.item.name) |
| 656 work_queue = self.kwargs['work_queue'] | 658 work_queue = self.kwargs['work_queue'] |
| 657 try: | 659 try: |
| 658 self.item.run(*self.args, **self.kwargs) | 660 self.item.run(*self.args, **self.kwargs) |
| 659 except Exception: | 661 except Exception: |
| 660 # Catch exception location. | 662 # Catch exception location. |
| 661 logging.info('Caught exception in thread %s' % self.item.name) | 663 logging.info('Caught exception in thread %s' % self.item.name) |
| 662 logging.info(str(sys.exc_info())) | 664 logging.info(str(sys.exc_info())) |
| 663 work_queue.exceptions.put(sys.exc_info()) | 665 work_queue.exceptions.put(sys.exc_info()) |
| 664 logging.info('Task %s done' % self.item.name) | 666 logging.info('_Worker.run(%s) done' % self.item.name) |
| 665 | 667 |
| 666 work_queue.ready_cond.acquire() | 668 work_queue.ready_cond.acquire() |
| 667 try: | 669 try: |
| 668 work_queue.ready_cond.notifyAll() | 670 work_queue.ready_cond.notifyAll() |
| 669 finally: | 671 finally: |
| 670 work_queue.ready_cond.release() | 672 work_queue.ready_cond.release() |
| OLD | NEW |