Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Generic utils.""" | 5 """Generic utils.""" |
| 6 | 6 |
| 7 import errno | 7 import errno |
| 8 import logging | 8 import logging |
| 9 import os | 9 import os |
| 10 import Queue | 10 import Queue |
| (...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 240 new_fileobj.last_flushed_at = time.time() | 240 new_fileobj.last_flushed_at = time.time() |
| 241 finally: | 241 finally: |
| 242 new_fileobj.lock.release() | 242 new_fileobj.lock.release() |
| 243 if should_flush: | 243 if should_flush: |
| 244 new_fileobj.flush() | 244 new_fileobj.flush() |
| 245 | 245 |
| 246 new_fileobj.write = auto_flush_write | 246 new_fileobj.write = auto_flush_write |
| 247 return new_fileobj | 247 return new_fileobj |
| 248 | 248 |
| 249 | 249 |
| 250 def MakeFileAnnotated(fileobj, include_zero=False): | 250 def MakeFileAnnotated(fileobj): |
|
M-A Ruel
2011/10/06 00:57:34
Please git svn rebase or svn up correctly.
| |
| 251 """Creates a file object clone to automatically prepends every line in worker | 251 """Creates a file object clone to automatically prepends every line in worker |
| 252 threads with a NN> prefix.""" | 252 threads with a NN> prefix.""" |
| 253 if hasattr(fileobj, 'output_buffers'): | 253 if hasattr(fileobj, 'output_buffers'): |
| 254 # Already patched. | 254 # Already patched. |
| 255 return fileobj | 255 return fileobj |
| 256 | 256 |
| 257 # Attribute 'XXX' defined outside __init__ | 257 # Attribute 'XXX' defined outside __init__ |
| 258 # pylint: disable=W0201 | 258 # pylint: disable=W0201 |
| 259 new_fileobj = SoftClone(fileobj) | 259 new_fileobj = SoftClone(fileobj) |
| 260 if not hasattr(new_fileobj, 'lock'): | 260 if not hasattr(new_fileobj, 'lock'): |
| 261 new_fileobj.lock = threading.Lock() | 261 new_fileobj.lock = threading.Lock() |
| 262 new_fileobj.output_buffers = {} | 262 new_fileobj.output_buffers = {} |
| 263 new_fileobj.old_annotated_write = new_fileobj.write | 263 new_fileobj.old_annotated_write = new_fileobj.write |
| 264 | 264 |
| 265 def annotated_write(out): | 265 def annotated_write(out): |
| 266 index = getattr(threading.currentThread(), 'index', None) | 266 index = getattr(threading.currentThread(), 'index', None) |
| 267 if index is None: | 267 if index is None: |
| 268 if not include_zero: | 268 # Undexed threads aren't buffered. |
| 269 # Unindexed threads aren't buffered. | 269 new_fileobj.old_annotated_write(out) |
| 270 new_fileobj.old_annotated_write(out) | 270 return |
| 271 return | |
| 272 index = 0 | |
| 273 | 271 |
| 274 new_fileobj.lock.acquire() | 272 new_fileobj.lock.acquire() |
| 275 try: | 273 try: |
| 276 # Use a dummy array to hold the string so the code can be lockless. | 274 # Use a dummy array to hold the string so the code can be lockless. |
| 277 # Strings are immutable, requiring to keep a lock for the whole dictionary | 275 # Strings are immutable, requiring to keep a lock for the whole dictionary |
| 278 # otherwise. Using an array is faster than using a dummy object. | 276 # otherwise. Using an array is faster than using a dummy object. |
| 279 if not index in new_fileobj.output_buffers: | 277 if not index in new_fileobj.output_buffers: |
| 280 obj = new_fileobj.output_buffers[index] = [''] | 278 obj = new_fileobj.output_buffers[index] = [''] |
| 281 else: | 279 else: |
| 282 obj = new_fileobj.output_buffers[index] | 280 obj = new_fileobj.output_buffers[index] |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 353 in_line = '' | 351 in_line = '' |
| 354 while in_byte: | 352 while in_byte: |
| 355 if in_byte != '\r': | 353 if in_byte != '\r': |
| 356 if print_stdout: | 354 if print_stdout: |
| 357 stdout.write(in_byte) | 355 stdout.write(in_byte) |
| 358 if in_byte != '\n': | 356 if in_byte != '\n': |
| 359 in_line += in_byte | 357 in_line += in_byte |
| 360 else: | 358 else: |
| 361 filter_fn(in_line) | 359 filter_fn(in_line) |
| 362 in_line = '' | 360 in_line = '' |
| 361 else: | |
| 362 filter_fn(in_line) | |
| 363 in_line = '' | |
| 363 in_byte = kid.stdout.read(1) | 364 in_byte = kid.stdout.read(1) |
| 364 # Flush the rest of buffered output. This is only an issue with | 365 # Flush the rest of buffered output. This is only an issue with |
| 365 # stdout/stderr not ending with a \n. | 366 # stdout/stderr not ending with a \n. |
| 366 if len(in_line): | 367 if len(in_line): |
| 367 filter_fn(in_line) | 368 filter_fn(in_line) |
| 368 rv = kid.wait() | 369 rv = kid.wait() |
| 369 except KeyboardInterrupt: | 370 except KeyboardInterrupt: |
| 370 print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) | 371 print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) |
| 371 raise | 372 raise |
| 372 | 373 |
| (...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 490 | 491 |
| 491 @property | 492 @property |
| 492 def name(self): | 493 def name(self): |
| 493 return self._name | 494 return self._name |
| 494 | 495 |
| 495 @property | 496 @property |
| 496 @lockedmethod | 497 @lockedmethod |
| 497 def requirements(self): | 498 def requirements(self): |
| 498 return tuple(self._requirements) | 499 return tuple(self._requirements) |
| 499 | 500 |
| 500 @lockedmethod | |
| 501 def add_requirement(self, new): | |
| 502 self._requirements.add(new) | |
| 503 | |
| 504 | 501 |
| 505 class ExecutionQueue(object): | 502 class ExecutionQueue(object): |
| 506 """Runs a set of WorkItem that have interdependencies and were WorkItem are | 503 """Runs a set of WorkItem that have interdependencies and were WorkItem are |
| 507 added as they are processed. | 504 added as they are processed. |
| 508 | 505 |
| 509 In gclient's case, Dependencies sometime needs to be run out of order due to | 506 In gclient's case, Dependencies sometime needs to be run out of order due to |
| 510 From() keyword. This class manages that all the required dependencies are run | 507 From() keyword. This class manages that all the required dependencies are run |
| 511 before running each one. | 508 before running each one. |
| 512 | 509 |
| 513 Methods of this class are thread safe. | 510 Methods of this class are thread safe. |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 556 try: | 553 try: |
| 557 while True: | 554 while True: |
| 558 # Check for task to run first, then wait. | 555 # Check for task to run first, then wait. |
| 559 while True: | 556 while True: |
| 560 if not self.exceptions.empty(): | 557 if not self.exceptions.empty(): |
| 561 # Systematically flush the queue when an exception logged. | 558 # Systematically flush the queue when an exception logged. |
| 562 self.queued = [] | 559 self.queued = [] |
| 563 self._flush_terminated_threads() | 560 self._flush_terminated_threads() |
| 564 if (not self.queued and not self.running or | 561 if (not self.queued and not self.running or |
| 565 self.jobs == len(self.running)): | 562 self.jobs == len(self.running)): |
| 566 logging.debug('No more worker threads or can\'t queue anything.') | 563 # No more worker threads or can't queue anything. |
| 567 break | 564 break |
| 568 | 565 |
| 569 # Check for new tasks to start. | 566 # Check for new tasks to start. |
| 570 for i in xrange(len(self.queued)): | 567 for i in xrange(len(self.queued)): |
| 571 # Verify its requirements. | 568 # Verify its requirements. |
| 572 for r in self.queued[i].requirements: | 569 for r in self.queued[i].requirements: |
| 573 if not r in self.ran: | 570 if not r in self.ran: |
| 574 # Requirement not met. | 571 # Requirement not met. |
| 575 break | 572 break |
| 576 else: | 573 else: |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 643 # exception. | 640 # exception. |
| 644 task_item.run(*args, **kwargs) | 641 task_item.run(*args, **kwargs) |
| 645 self.ran.append(task_item.name) | 642 self.ran.append(task_item.name) |
| 646 if self.progress: | 643 if self.progress: |
| 647 self.progress.update(1, ', '.join(t.item.name for t in self.running)) | 644 self.progress.update(1, ', '.join(t.item.name for t in self.running)) |
| 648 | 645 |
| 649 class _Worker(threading.Thread): | 646 class _Worker(threading.Thread): |
| 650 """One thread to execute one WorkItem.""" | 647 """One thread to execute one WorkItem.""" |
| 651 def __init__(self, item, index, args, kwargs): | 648 def __init__(self, item, index, args, kwargs): |
| 652 threading.Thread.__init__(self, name=item.name or 'Worker') | 649 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 653 logging.info('_Worker(%s) reqs:%s' % (item.name, item.requirements)) | 650 logging.info(item.name) |
| 654 self.item = item | 651 self.item = item |
| 655 self.index = index | 652 self.index = index |
| 656 self.args = args | 653 self.args = args |
| 657 self.kwargs = kwargs | 654 self.kwargs = kwargs |
| 658 | 655 |
| 659 def run(self): | 656 def run(self): |
| 660 """Runs in its own thread.""" | 657 """Runs in its own thread.""" |
| 661 logging.debug('_Worker.run(%s)' % self.item.name) | 658 logging.debug('running(%s)' % self.item.name) |
| 662 work_queue = self.kwargs['work_queue'] | 659 work_queue = self.kwargs['work_queue'] |
| 663 try: | 660 try: |
| 664 self.item.run(*self.args, **self.kwargs) | 661 self.item.run(*self.args, **self.kwargs) |
| 665 except Exception: | 662 except Exception: |
| 666 # Catch exception location. | 663 # Catch exception location. |
| 667 logging.info('Caught exception in thread %s' % self.item.name) | 664 logging.info('Caught exception in thread %s' % self.item.name) |
| 668 logging.info(str(sys.exc_info())) | 665 logging.info(str(sys.exc_info())) |
| 669 work_queue.exceptions.put(sys.exc_info()) | 666 work_queue.exceptions.put(sys.exc_info()) |
| 670 logging.info('_Worker.run(%s) done' % self.item.name) | 667 logging.info('Task %s done' % self.item.name) |
| 671 | 668 |
| 672 work_queue.ready_cond.acquire() | 669 work_queue.ready_cond.acquire() |
| 673 try: | 670 try: |
| 674 work_queue.ready_cond.notifyAll() | 671 work_queue.ready_cond.notifyAll() |
| 675 finally: | 672 finally: |
| 676 work_queue.ready_cond.release() | 673 work_queue.ready_cond.release() |
| OLD | NEW |