OLD | NEW |
---|---|
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Generic utils.""" | 5 """Generic utils.""" |
6 | 6 |
7 import errno | 7 import errno |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import Queue | 10 import Queue |
(...skipping 229 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
240 new_fileobj.last_flushed_at = time.time() | 240 new_fileobj.last_flushed_at = time.time() |
241 finally: | 241 finally: |
242 new_fileobj.lock.release() | 242 new_fileobj.lock.release() |
243 if should_flush: | 243 if should_flush: |
244 new_fileobj.flush() | 244 new_fileobj.flush() |
245 | 245 |
246 new_fileobj.write = auto_flush_write | 246 new_fileobj.write = auto_flush_write |
247 return new_fileobj | 247 return new_fileobj |
248 | 248 |
249 | 249 |
250 def MakeFileAnnotated(fileobj, include_zero=False): | 250 def MakeFileAnnotated(fileobj): |
M-A Ruel
2011/10/06 00:57:34
Please git svn rebase or svn up correctly.
| |
251 """Creates a file object clone that automatically prepends every line in worker | 251 """Creates a file object clone that automatically prepends every line in worker |
252 threads with a NN> prefix.""" | 252 threads with a NN> prefix.""" |
253 if hasattr(fileobj, 'output_buffers'): | 253 if hasattr(fileobj, 'output_buffers'): |
254 # Already patched. | 254 # Already patched. |
255 return fileobj | 255 return fileobj |
256 | 256 |
257 # Attribute 'XXX' defined outside __init__ | 257 # Attribute 'XXX' defined outside __init__ |
258 # pylint: disable=W0201 | 258 # pylint: disable=W0201 |
259 new_fileobj = SoftClone(fileobj) | 259 new_fileobj = SoftClone(fileobj) |
260 if not hasattr(new_fileobj, 'lock'): | 260 if not hasattr(new_fileobj, 'lock'): |
261 new_fileobj.lock = threading.Lock() | 261 new_fileobj.lock = threading.Lock() |
262 new_fileobj.output_buffers = {} | 262 new_fileobj.output_buffers = {} |
263 new_fileobj.old_annotated_write = new_fileobj.write | 263 new_fileobj.old_annotated_write = new_fileobj.write |
264 | 264 |
265 def annotated_write(out): | 265 def annotated_write(out): |
266 index = getattr(threading.currentThread(), 'index', None) | 266 index = getattr(threading.currentThread(), 'index', None) |
267 if index is None: | 267 if index is None: |
268 if not include_zero: | 268 # Unindexed threads aren't buffered. |
269 # Unindexed threads aren't buffered. | 269 new_fileobj.old_annotated_write(out) |
270 new_fileobj.old_annotated_write(out) | 270 return |
271 return | |
272 index = 0 | |
273 | 271 |
274 new_fileobj.lock.acquire() | 272 new_fileobj.lock.acquire() |
275 try: | 273 try: |
276 # Use a dummy array to hold the string so the code can be lockless. | 274 # Use a dummy array to hold the string so the code can be lockless. |
277 # Strings are immutable, requiring to keep a lock for the whole dictionary | 275 # Strings are immutable, requiring to keep a lock for the whole dictionary |
278 # otherwise. Using an array is faster than using a dummy object. | 276 # otherwise. Using an array is faster than using a dummy object. |
279 if not index in new_fileobj.output_buffers: | 277 if not index in new_fileobj.output_buffers: |
280 obj = new_fileobj.output_buffers[index] = [''] | 278 obj = new_fileobj.output_buffers[index] = [''] |
281 else: | 279 else: |
282 obj = new_fileobj.output_buffers[index] | 280 obj = new_fileobj.output_buffers[index] |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
353 in_line = '' | 351 in_line = '' |
354 while in_byte: | 352 while in_byte: |
355 if in_byte != '\r': | 353 if in_byte != '\r': |
356 if print_stdout: | 354 if print_stdout: |
357 stdout.write(in_byte) | 355 stdout.write(in_byte) |
358 if in_byte != '\n': | 356 if in_byte != '\n': |
359 in_line += in_byte | 357 in_line += in_byte |
360 else: | 358 else: |
361 filter_fn(in_line) | 359 filter_fn(in_line) |
362 in_line = '' | 360 in_line = '' |
361 else: | |
362 filter_fn(in_line) | |
363 in_line = '' | |
363 in_byte = kid.stdout.read(1) | 364 in_byte = kid.stdout.read(1) |
364 # Flush the rest of buffered output. This is only an issue with | 365 # Flush the rest of buffered output. This is only an issue with |
365 # stdout/stderr not ending with a \n. | 366 # stdout/stderr not ending with a \n. |
366 if len(in_line): | 367 if len(in_line): |
367 filter_fn(in_line) | 368 filter_fn(in_line) |
368 rv = kid.wait() | 369 rv = kid.wait() |
369 except KeyboardInterrupt: | 370 except KeyboardInterrupt: |
370 print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) | 371 print >> sys.stderr, 'Failed while running "%s"' % ' '.join(args) |
371 raise | 372 raise |
372 | 373 |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
490 | 491 |
491 @property | 492 @property |
492 def name(self): | 493 def name(self): |
493 return self._name | 494 return self._name |
494 | 495 |
495 @property | 496 @property |
496 @lockedmethod | 497 @lockedmethod |
497 def requirements(self): | 498 def requirements(self): |
498 return tuple(self._requirements) | 499 return tuple(self._requirements) |
499 | 500 |
500 @lockedmethod | |
501 def add_requirement(self, new): | |
502 self._requirements.add(new) | |
503 | |
504 | 501 |
505 class ExecutionQueue(object): | 502 class ExecutionQueue(object): |
506 """Runs a set of WorkItem that have interdependencies and where WorkItems are | 503 """Runs a set of WorkItem that have interdependencies and where WorkItems are |
507 added as they are processed. | 504 added as they are processed. |
508 | 505 |
509 In gclient's case, Dependencies sometimes need to be run out of order due to | 506 In gclient's case, Dependencies sometimes need to be run out of order due to |
510 From() keyword. This class manages that all the required dependencies are run | 507 From() keyword. This class manages that all the required dependencies are run |
511 before running each one. | 508 before running each one. |
512 | 509 |
513 Methods of this class are thread safe. | 510 Methods of this class are thread safe. |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
556 try: | 553 try: |
557 while True: | 554 while True: |
558 # Check for task to run first, then wait. | 555 # Check for task to run first, then wait. |
559 while True: | 556 while True: |
560 if not self.exceptions.empty(): | 557 if not self.exceptions.empty(): |
561 # Systematically flush the queue when an exception logged. | 558 # Systematically flush the queue when an exception logged. |
562 self.queued = [] | 559 self.queued = [] |
563 self._flush_terminated_threads() | 560 self._flush_terminated_threads() |
564 if (not self.queued and not self.running or | 561 if (not self.queued and not self.running or |
565 self.jobs == len(self.running)): | 562 self.jobs == len(self.running)): |
566 logging.debug('No more worker threads or can\'t queue anything.') | 563 # No more worker threads or can't queue anything. |
567 break | 564 break |
568 | 565 |
569 # Check for new tasks to start. | 566 # Check for new tasks to start. |
570 for i in xrange(len(self.queued)): | 567 for i in xrange(len(self.queued)): |
571 # Verify its requirements. | 568 # Verify its requirements. |
572 for r in self.queued[i].requirements: | 569 for r in self.queued[i].requirements: |
573 if not r in self.ran: | 570 if not r in self.ran: |
574 # Requirement not met. | 571 # Requirement not met. |
575 break | 572 break |
576 else: | 573 else: |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
643 # exception. | 640 # exception. |
644 task_item.run(*args, **kwargs) | 641 task_item.run(*args, **kwargs) |
645 self.ran.append(task_item.name) | 642 self.ran.append(task_item.name) |
646 if self.progress: | 643 if self.progress: |
647 self.progress.update(1, ', '.join(t.item.name for t in self.running)) | 644 self.progress.update(1, ', '.join(t.item.name for t in self.running)) |
648 | 645 |
649 class _Worker(threading.Thread): | 646 class _Worker(threading.Thread): |
650 """One thread to execute one WorkItem.""" | 647 """One thread to execute one WorkItem.""" |
651 def __init__(self, item, index, args, kwargs): | 648 def __init__(self, item, index, args, kwargs): |
652 threading.Thread.__init__(self, name=item.name or 'Worker') | 649 threading.Thread.__init__(self, name=item.name or 'Worker') |
653 logging.info('_Worker(%s) reqs:%s' % (item.name, item.requirements)) | 650 logging.info(item.name) |
654 self.item = item | 651 self.item = item |
655 self.index = index | 652 self.index = index |
656 self.args = args | 653 self.args = args |
657 self.kwargs = kwargs | 654 self.kwargs = kwargs |
658 | 655 |
659 def run(self): | 656 def run(self): |
660 """Runs in its own thread.""" | 657 """Runs in its own thread.""" |
661 logging.debug('_Worker.run(%s)' % self.item.name) | 658 logging.debug('running(%s)' % self.item.name) |
662 work_queue = self.kwargs['work_queue'] | 659 work_queue = self.kwargs['work_queue'] |
663 try: | 660 try: |
664 self.item.run(*self.args, **self.kwargs) | 661 self.item.run(*self.args, **self.kwargs) |
665 except Exception: | 662 except Exception: |
666 # Catch exception location. | 663 # Catch exception location. |
667 logging.info('Caught exception in thread %s' % self.item.name) | 664 logging.info('Caught exception in thread %s' % self.item.name) |
668 logging.info(str(sys.exc_info())) | 665 logging.info(str(sys.exc_info())) |
669 work_queue.exceptions.put(sys.exc_info()) | 666 work_queue.exceptions.put(sys.exc_info()) |
670 logging.info('_Worker.run(%s) done' % self.item.name) | 667 logging.info('Task %s done' % self.item.name) |
671 | 668 |
672 work_queue.ready_cond.acquire() | 669 work_queue.ready_cond.acquire() |
673 try: | 670 try: |
674 work_queue.ready_cond.notifyAll() | 671 work_queue.ready_cond.notifyAll() |
675 finally: | 672 finally: |
676 work_queue.ready_cond.release() | 673 work_queue.ready_cond.release() |
OLD | NEW |