Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: gclient_utils.py

Issue 3336015: Add the infrastructure necessary to support annotated stdout. (Closed)
Patch Set: add progress.update() Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2009 Google Inc. All Rights Reserved. 1 # Copyright 2009 Google Inc. All Rights Reserved.
2 # 2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); 3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License. 4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at 5 # You may obtain a copy of the License at
6 # 6 #
7 # http://www.apache.org/licenses/LICENSE-2.0 7 # http://www.apache.org/licenses/LICENSE-2.0
8 # 8 #
9 # Unless required by applicable law or agreed to in writing, software 9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, 10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and 12 # See the License for the specific language governing permissions and
13 # limitations under the License. 13 # limitations under the License.
14 14
15 """Generic utils.""" 15 """Generic utils."""
16 16
17 import copy
17 import errno 18 import errno
18 import logging 19 import logging
19 import os 20 import os
21 import Queue
20 import re 22 import re
21 import stat 23 import stat
22 import subprocess 24 import subprocess
23 import sys 25 import sys
24 import threading 26 import threading
25 import time 27 import time
26 import xml.dom.minidom 28 import xml.dom.minidom
27 import xml.parsers.expat 29 import xml.parsers.expat
28 30
29 31
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
393 exec(FileRead(entries_filename), scope) 395 exec(FileRead(entries_filename), scope)
394 except SyntaxError, e: 396 except SyntaxError, e:
395 SyntaxErrorToError(filename, e) 397 SyntaxErrorToError(filename, e)
396 all_directories = scope['entries'].keys() 398 all_directories = scope['entries'].keys()
397 path_to_check = real_from_dir[len(path)+1:] 399 path_to_check = real_from_dir[len(path)+1:]
398 while path_to_check: 400 while path_to_check:
399 if path_to_check in all_directories: 401 if path_to_check in all_directories:
400 return path 402 return path
401 path_to_check = os.path.dirname(path_to_check) 403 path_to_check = os.path.dirname(path_to_check)
402 return None 404 return None
403 405
404 logging.info('Found gclient root at ' + path) 406 logging.info('Found gclient root at ' + path)
405 return path 407 return path
406 408
407 409
408 def PathDifference(root, subpath): 410 def PathDifference(root, subpath):
409 """Returns the difference subpath minus root.""" 411 """Returns the difference subpath minus root."""
410 root = os.path.realpath(root) 412 root = os.path.realpath(root)
411 subpath = os.path.realpath(subpath) 413 subpath = os.path.realpath(subpath)
412 if not subpath.startswith(root): 414 if not subpath.startswith(root):
413 return None 415 return None
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
448 return config_dir, env['entries'] 450 return config_dir, env['entries']
449 451
450 452
451 class WorkItem(object): 453 class WorkItem(object):
452 """One work item.""" 454 """One work item."""
453 # A list of string, each being a WorkItem name. 455 # A list of string, each being a WorkItem name.
454 requirements = [] 456 requirements = []
455 # A unique string representing this work item. 457 # A unique string representing this work item.
456 name = None 458 name = None
457 459
458 def run(self): 460 def run(self, work_queue, options):
461 """work_queue and options are passed as keyword arguments so they should be
462 the last parameters of the function when you override it."""
459 pass 463 pass
460 464
461 465
462 class ExecutionQueue(object): 466 class ExecutionQueue(object):
463 """Runs a set of WorkItem that have interdependencies and were WorkItem are 467 """Runs a set of WorkItem that have interdependencies and were WorkItem are
464 added as they are processed. 468 added as they are processed.
465 469
466 In gclient's case, Dependencies sometime needs to be run out of order due to 470 In gclient's case, Dependencies sometime needs to be run out of order due to
467 From() keyword. This class manages that all the required dependencies are run 471 From() keyword. This class manages that all the required dependencies are run
468 before running each one. 472 before running each one.
469 473
470 Methods of this class are thread safe. 474 Methods of this class are thread safe.
471 """ 475 """
472 def __init__(self, jobs, progress): 476 def __init__(self, jobs, progress):
473 """jobs specifies the number of concurrent tasks to allow. progress is a 477 """jobs specifies the number of concurrent tasks to allow. progress is a
474 Progress instance.""" 478 Progress instance."""
475 # Set when a thread is done or a new item is enqueued. 479 # Set when a thread is done or a new item is enqueued.
476 self.ready_cond = threading.Condition() 480 self.ready_cond = threading.Condition()
477 # Maximum number of concurrent tasks. 481 # Maximum number of concurrent tasks.
478 self.jobs = jobs 482 self.jobs = jobs
479 # List of WorkItem, for gclient, these are Dependency instances. 483 # List of WorkItem, for gclient, these are Dependency instances.
480 self.queued = [] 484 self.queued = []
481 # List of strings representing each Dependency.name that was run. 485 # List of strings representing each Dependency.name that was run.
482 self.ran = [] 486 self.ran = []
483 # List of items currently running. 487 # List of items currently running.
484 self.running = [] 488 self.running = []
485 # Exceptions thrown if any. 489 # Exceptions thrown if any.
486 self.exceptions = [] 490 self.exceptions = Queue.Queue()
491 # Progress status
487 self.progress = progress 492 self.progress = progress
488 if self.progress: 493 if self.progress:
489 self.progress.update() 494 self.progress.update(0)
490 495
491 def enqueue(self, d): 496 def enqueue(self, d):
492 """Enqueue one Dependency to be executed later once its requirements are 497 """Enqueue one Dependency to be executed later once its requirements are
493 satisfied. 498 satisfied.
494 """ 499 """
495 assert isinstance(d, WorkItem) 500 assert isinstance(d, WorkItem)
496 self.ready_cond.acquire() 501 self.ready_cond.acquire()
497 try: 502 try:
498 self.queued.append(d) 503 self.queued.append(d)
499 total = len(self.queued) + len(self.ran) + len(self.running) 504 total = len(self.queued) + len(self.ran) + len(self.running)
500 logging.debug('enqueued(%s)' % d.name) 505 logging.debug('enqueued(%s)' % d.name)
501 if self.progress: 506 if self.progress:
502 self.progress._total = total + 1 507 self.progress._total = total + 1
503 self.progress.update(0) 508 self.progress.update(0)
504 self.ready_cond.notifyAll() 509 self.ready_cond.notifyAll()
505 finally: 510 finally:
506 self.ready_cond.release() 511 self.ready_cond.release()
507 512
508 def flush(self, *args, **kwargs): 513 def flush(self, *args, **kwargs):
509 """Runs all enqueued items until all are executed.""" 514 """Runs all enqueued items until all are executed."""
515 kwargs['work_queue'] = self
510 self.ready_cond.acquire() 516 self.ready_cond.acquire()
511 try: 517 try:
512 while True: 518 while True:
513 # Check for task to run first, then wait. 519 # Check for task to run first, then wait.
514 while True: 520 while True:
515 if self.exceptions: 521 if not self.exceptions.empty():
516 # Systematically flush the queue when there is an exception logged 522 # Systematically flush the queue when an exception logged.
517 # in.
518 self.queued = [] 523 self.queued = []
519 # Flush threads that have terminated. 524 self._flush_terminated_threads()
520 self.running = [t for t in self.running if t.isAlive()] 525 if (not self.queued and not self.running or
521 if not self.queued and not self.running: 526 self.jobs == len(self.running)):
527 # No more worker threads or can't queue anything.
522 break 528 break
523 if self.jobs == len(self.running): 529
524 break 530 # Check for new tasks to start.
525 for i in xrange(len(self.queued)): 531 for i in xrange(len(self.queued)):
526 # Verify its requirements. 532 # Verify its requirements.
527 for r in self.queued[i].requirements: 533 for r in self.queued[i].requirements:
528 if not r in self.ran: 534 if not r in self.ran:
529 # Requirement not met. 535 # Requirement not met.
530 break 536 break
531 else: 537 else:
532 # Start one work item: all its requirements are satisfied. 538 # Start one work item: all its requirements are satisfied.
533 d = self.queued.pop(i) 539 self._run_one_task(self.queued.pop(i), args, kwargs)
534 new_thread = self._Worker(self, d, args=args, kwargs=kwargs)
535 if self.jobs > 1:
536 # Start the thread.
537 self.running.append(new_thread)
538 new_thread.start()
539 else:
540 # Run the 'thread' inside the main thread.
541 new_thread.run()
542 break 540 break
543 else: 541 else:
544 # Couldn't find an item that could run. Break out the outher loop. 542 # Couldn't find an item that could run. Break out the outher loop.
545 break 543 break
544
546 if not self.queued and not self.running: 545 if not self.queued and not self.running:
546 # We're done.
547 break 547 break
548 # We need to poll here otherwise Ctrl-C isn't processed. 548 # We need to poll here otherwise Ctrl-C isn't processed.
549 self.ready_cond.wait(10) 549 self.ready_cond.wait(10)
550 # Something happened: self.enqueue() or a thread terminated. Loop again. 550 # Something happened: self.enqueue() or a thread terminated. Loop again.
551 finally: 551 finally:
552 self.ready_cond.release() 552 self.ready_cond.release()
553
553 assert not self.running, 'Now guaranteed to be single-threaded' 554 assert not self.running, 'Now guaranteed to be single-threaded'
554 if self.exceptions: 555 if not self.exceptions.empty():
555 # To get back the stack location correctly, the raise a, b, c form must be 556 # To get back the stack location correctly, the raise a, b, c form must be
556 # used, passing a tuple as the first argument doesn't work. 557 # used, passing a tuple as the first argument doesn't work.
557 e = self.exceptions.pop(0) 558 e = self.exceptions.get()
558 raise e[0], e[1], e[2] 559 raise e[0], e[1], e[2]
559 if self.progress: 560 if self.progress:
560 self.progress.end() 561 self.progress.end()
561 562
563 def _flush_terminated_threads(self):
564 """Flush threads that have terminated."""
565 running = self.running
566 self.running = []
567 for t in running:
568 if t.isAlive():
569 self.running.append(t)
570 else:
571 t.join()
572 t.kwargs['options'].stdout.flush()
573 if self.progress:
574 self.progress.update(1)
575 assert not t.name in self.ran
576 if not t.name in self.ran:
577 self.ran.append(t.name)
578
579 def _run_one_task(self, task_item, args, kwargs):
580 if self.jobs > 1:
581 # Start the thread.
582 index = len(self.ran) + len(self.running) + 1
583 # Copy 'options' just to be safe.
584 task_kwargs = kwargs.copy()
585 task_kwargs['options'] = copy.copy(task_kwargs['options'])
586 new_thread = self._Worker(task_item, args, task_kwargs)
587 self.running.append(new_thread)
588 new_thread.start()
589 else:
590 # Run the 'thread' inside the main thread. Don't try to catch any
591 # exception.
592 task_item.run(*args, **kwargs)
593 self.ran.append(task_item.name)
594 if self.progress:
595 self.progress.update(1)
596
562 class _Worker(threading.Thread): 597 class _Worker(threading.Thread):
563 """One thread to execute one WorkItem.""" 598 """One thread to execute one WorkItem."""
564 def __init__(self, parent, item, args=(), kwargs=None): 599 def __init__(self, item, args, kwargs):
565 threading.Thread.__init__(self, name=item.name or 'Worker') 600 threading.Thread.__init__(self, name=item.name or 'Worker')
601 logging.info(item.name)
602 self.item = item
566 self.args = args 603 self.args = args
567 self.kwargs = kwargs or {} 604 self.kwargs = kwargs
568 self.item = item
569 self.parent = parent
570 605
571 def run(self): 606 def run(self):
572 """Runs in its own thread.""" 607 """Runs in its own thread."""
573 logging.debug('running(%s)' % self.item.name) 608 logging.debug('running(%s)' % self.item.name)
574 exception = None 609 work_queue = self.kwargs['work_queue']
575 try: 610 try:
576 self.item.run(*self.args, **self.kwargs) 611 self.item.run(*self.args, **self.kwargs)
577 except Exception: 612 except Exception:
578 # Catch exception location. 613 # Catch exception location.
579 exception = sys.exc_info() 614 logging.info('Caught exception in thread %s' % self.item.name)
615 logging.info(str(sys.exc_info()))
616 work_queue.exceptions.put(sys.exc_info())
617 logging.info('Task %s done' % self.item.name)
580 618
581 # This assumes the following code won't throw an exception. Bad. 619 work_queue.ready_cond.acquire()
582 self.parent.ready_cond.acquire()
583 try: 620 try:
584 if exception: 621 work_queue.ready_cond.notifyAll()
585 self.parent.exceptions.append(exception)
586 if self.parent.progress:
587 self.parent.progress.update(1)
588 assert not self.item.name in self.parent.ran
589 if not self.item.name in self.parent.ran:
590 self.parent.ran.append(self.item.name)
591 finally: 622 finally:
592 self.parent.ready_cond.notifyAll() 623 work_queue.ready_cond.release()
593 self.parent.ready_cond.release()
OLDNEW
« no previous file with comments | « gclient.py ('k') | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698