OLD | NEW |
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Generic utils.""" | 5 """Generic utils.""" |
6 | 6 |
7 import errno | 7 import errno |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import Queue | 10 import Queue |
(...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
240 new_fileobj.last_flushed_at = time.time() | 240 new_fileobj.last_flushed_at = time.time() |
241 finally: | 241 finally: |
242 new_fileobj.lock.release() | 242 new_fileobj.lock.release() |
243 if should_flush: | 243 if should_flush: |
244 new_fileobj.flush() | 244 new_fileobj.flush() |
245 | 245 |
246 new_fileobj.write = auto_flush_write | 246 new_fileobj.write = auto_flush_write |
247 return new_fileobj | 247 return new_fileobj |
248 | 248 |
249 | 249 |
250 def MakeFileAnnotated(fileobj): | 250 def MakeFileAnnotated(fileobj, include_zero=False): |
251 """Creates a file object clone to automatically prepends every line in worker | 251 """Creates a file object clone to automatically prepends every line in worker |
252 threads with a NN> prefix.""" | 252 threads with a NN> prefix.""" |
253 if hasattr(fileobj, 'output_buffers'): | 253 if hasattr(fileobj, 'output_buffers'): |
254 # Already patched. | 254 # Already patched. |
255 return fileobj | 255 return fileobj |
256 | 256 |
257 # Attribute 'XXX' defined outside __init__ | 257 # Attribute 'XXX' defined outside __init__ |
258 # pylint: disable=W0201 | 258 # pylint: disable=W0201 |
259 new_fileobj = SoftClone(fileobj) | 259 new_fileobj = SoftClone(fileobj) |
260 if not hasattr(new_fileobj, 'lock'): | 260 if not hasattr(new_fileobj, 'lock'): |
261 new_fileobj.lock = threading.Lock() | 261 new_fileobj.lock = threading.Lock() |
262 new_fileobj.output_buffers = {} | 262 new_fileobj.output_buffers = {} |
263 new_fileobj.old_annotated_write = new_fileobj.write | 263 new_fileobj.old_annotated_write = new_fileobj.write |
264 | 264 |
265 def annotated_write(out): | 265 def annotated_write(out): |
266 index = getattr(threading.currentThread(), 'index', None) | 266 index = getattr(threading.currentThread(), 'index', None) |
267 if index is None: | 267 if index is None: |
268 # Undexed threads aren't buffered. | 268 if not include_zero: |
269 new_fileobj.old_annotated_write(out) | 269 # Unindexed threads aren't buffered. |
270 return | 270 new_fileobj.old_annotated_write(out) |
| 271 return |
| 272 index = 0 |
271 | 273 |
272 new_fileobj.lock.acquire() | 274 new_fileobj.lock.acquire() |
273 try: | 275 try: |
274 # Use a dummy array to hold the string so the code can be lockless. | 276 # Use a dummy array to hold the string so the code can be lockless. |
275 # Strings are immutable, requiring to keep a lock for the whole dictionary | 277 # Strings are immutable, requiring to keep a lock for the whole dictionary |
276 # otherwise. Using an array is faster than using a dummy object. | 278 # otherwise. Using an array is faster than using a dummy object. |
277 if not index in new_fileobj.output_buffers: | 279 if not index in new_fileobj.output_buffers: |
278 obj = new_fileobj.output_buffers[index] = [''] | 280 obj = new_fileobj.output_buffers[index] = [''] |
279 else: | 281 else: |
280 obj = new_fileobj.output_buffers[index] | 282 obj = new_fileobj.output_buffers[index] |
(...skipping 269 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
550 try: | 552 try: |
551 while True: | 553 while True: |
552 # Check for task to run first, then wait. | 554 # Check for task to run first, then wait. |
553 while True: | 555 while True: |
554 if not self.exceptions.empty(): | 556 if not self.exceptions.empty(): |
555 # Systematically flush the queue when an exception logged. | 557 # Systematically flush the queue when an exception logged. |
556 self.queued = [] | 558 self.queued = [] |
557 self._flush_terminated_threads() | 559 self._flush_terminated_threads() |
558 if (not self.queued and not self.running or | 560 if (not self.queued and not self.running or |
559 self.jobs == len(self.running)): | 561 self.jobs == len(self.running)): |
560 # No more worker threads or can't queue anything. | 562 logging.debug('No more worker threads or can\'t queue anything.') |
561 break | 563 break |
562 | 564 |
563 # Check for new tasks to start. | 565 # Check for new tasks to start. |
564 for i in xrange(len(self.queued)): | 566 for i in xrange(len(self.queued)): |
565 # Verify its requirements. | 567 # Verify its requirements. |
566 for r in self.queued[i].requirements: | 568 for r in self.queued[i].requirements: |
567 if not r in self.ran: | 569 if not r in self.ran: |
568 # Requirement not met. | 570 # Requirement not met. |
569 break | 571 break |
570 else: | 572 else: |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
637 # exception. | 639 # exception. |
638 task_item.run(*args, **kwargs) | 640 task_item.run(*args, **kwargs) |
639 self.ran.append(task_item.name) | 641 self.ran.append(task_item.name) |
640 if self.progress: | 642 if self.progress: |
641 self.progress.update(1, ', '.join(t.item.name for t in self.running)) | 643 self.progress.update(1, ', '.join(t.item.name for t in self.running)) |
642 | 644 |
643 class _Worker(threading.Thread): | 645 class _Worker(threading.Thread): |
644 """One thread to execute one WorkItem.""" | 646 """One thread to execute one WorkItem.""" |
645 def __init__(self, item, index, args, kwargs): | 647 def __init__(self, item, index, args, kwargs): |
646 threading.Thread.__init__(self, name=item.name or 'Worker') | 648 threading.Thread.__init__(self, name=item.name or 'Worker') |
647 logging.info(item.name) | 649 logging.info('_Worker(%s) reqs:%s' % (item.name, item.requirements)) |
648 self.item = item | 650 self.item = item |
649 self.index = index | 651 self.index = index |
650 self.args = args | 652 self.args = args |
651 self.kwargs = kwargs | 653 self.kwargs = kwargs |
652 | 654 |
653 def run(self): | 655 def run(self): |
654 """Runs in its own thread.""" | 656 """Runs in its own thread.""" |
655 logging.debug('running(%s)' % self.item.name) | 657 logging.debug('_Worker.run(%s)' % self.item.name) |
656 work_queue = self.kwargs['work_queue'] | 658 work_queue = self.kwargs['work_queue'] |
657 try: | 659 try: |
658 self.item.run(*self.args, **self.kwargs) | 660 self.item.run(*self.args, **self.kwargs) |
659 except Exception: | 661 except Exception: |
660 # Catch exception location. | 662 # Catch exception location. |
661 logging.info('Caught exception in thread %s' % self.item.name) | 663 logging.info('Caught exception in thread %s' % self.item.name) |
662 logging.info(str(sys.exc_info())) | 664 logging.info(str(sys.exc_info())) |
663 work_queue.exceptions.put(sys.exc_info()) | 665 work_queue.exceptions.put(sys.exc_info()) |
664 logging.info('Task %s done' % self.item.name) | 666 logging.info('_Worker.run(%s) done' % self.item.name) |
665 | 667 |
666 work_queue.ready_cond.acquire() | 668 work_queue.ready_cond.acquire() |
667 try: | 669 try: |
668 work_queue.ready_cond.notifyAll() | 670 work_queue.ready_cond.notifyAll() |
669 finally: | 671 finally: |
670 work_queue.ready_cond.release() | 672 work_queue.ready_cond.release() |
OLD | NEW |