| OLD | NEW |
| 1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
| 2 # | 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
| 6 # | 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # | 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and | 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. | 13 # limitations under the License. |
| 14 | 14 |
| 15 """Generic utils.""" | 15 """Generic utils.""" |
| 16 | 16 |
| 17 import copy |
| 17 import errno | 18 import errno |
| 18 import logging | 19 import logging |
| 19 import os | 20 import os |
| 21 import Queue |
| 20 import re | 22 import re |
| 21 import stat | 23 import stat |
| 22 import subprocess | 24 import subprocess |
| 23 import sys | 25 import sys |
| 24 import threading | 26 import threading |
| 25 import time | 27 import time |
| 26 import xml.dom.minidom | 28 import xml.dom.minidom |
| 27 import xml.parsers.expat | 29 import xml.parsers.expat |
| 28 | 30 |
| 29 | 31 |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 393 exec(FileRead(entries_filename), scope) | 395 exec(FileRead(entries_filename), scope) |
| 394 except SyntaxError, e: | 396 except SyntaxError, e: |
| 395 SyntaxErrorToError(filename, e) | 397 SyntaxErrorToError(filename, e) |
| 396 all_directories = scope['entries'].keys() | 398 all_directories = scope['entries'].keys() |
| 397 path_to_check = real_from_dir[len(path)+1:] | 399 path_to_check = real_from_dir[len(path)+1:] |
| 398 while path_to_check: | 400 while path_to_check: |
| 399 if path_to_check in all_directories: | 401 if path_to_check in all_directories: |
| 400 return path | 402 return path |
| 401 path_to_check = os.path.dirname(path_to_check) | 403 path_to_check = os.path.dirname(path_to_check) |
| 402 return None | 404 return None |
| 403 | 405 |
| 404 logging.info('Found gclient root at ' + path) | 406 logging.info('Found gclient root at ' + path) |
| 405 return path | 407 return path |
| 406 | 408 |
| 407 | 409 |
| 408 def PathDifference(root, subpath): | 410 def PathDifference(root, subpath): |
| 409 """Returns the difference subpath minus root.""" | 411 """Returns the difference subpath minus root.""" |
| 410 root = os.path.realpath(root) | 412 root = os.path.realpath(root) |
| 411 subpath = os.path.realpath(subpath) | 413 subpath = os.path.realpath(subpath) |
| 412 if not subpath.startswith(root): | 414 if not subpath.startswith(root): |
| 413 return None | 415 return None |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 448 return config_dir, env['entries'] | 450 return config_dir, env['entries'] |
| 449 | 451 |
| 450 | 452 |
| 451 class WorkItem(object): | 453 class WorkItem(object): |
| 452 """One work item.""" | 454 """One work item.""" |
| 453 # A list of string, each being a WorkItem name. | 455 # A list of string, each being a WorkItem name. |
| 454 requirements = [] | 456 requirements = [] |
| 455 # A unique string representing this work item. | 457 # A unique string representing this work item. |
| 456 name = None | 458 name = None |
| 457 | 459 |
| 458 def run(self): | 460 def run(self, work_queue, options): |
| 461 """work_queue and options are passed as keyword arguments so they should be |
| 462 the last parameters of the function when you override it.""" |
| 459 pass | 463 pass |
| 460 | 464 |
| 461 | 465 |
| 462 class ExecutionQueue(object): | 466 class ExecutionQueue(object): |
| 463 """Runs a set of WorkItem that have interdependencies and were WorkItem are | 467 """Runs a set of WorkItem that have interdependencies and were WorkItem are |
| 464 added as they are processed. | 468 added as they are processed. |
| 465 | 469 |
| 466 In gclient's case, Dependencies sometime needs to be run out of order due to | 470 In gclient's case, Dependencies sometime needs to be run out of order due to |
| 467 From() keyword. This class manages that all the required dependencies are run | 471 From() keyword. This class manages that all the required dependencies are run |
| 468 before running each one. | 472 before running each one. |
| 469 | 473 |
| 470 Methods of this class are thread safe. | 474 Methods of this class are thread safe. |
| 471 """ | 475 """ |
| 472 def __init__(self, jobs, progress): | 476 def __init__(self, jobs, progress): |
| 473 """jobs specifies the number of concurrent tasks to allow. progress is a | 477 """jobs specifies the number of concurrent tasks to allow. progress is a |
| 474 Progress instance.""" | 478 Progress instance.""" |
| 475 # Set when a thread is done or a new item is enqueued. | 479 # Set when a thread is done or a new item is enqueued. |
| 476 self.ready_cond = threading.Condition() | 480 self.ready_cond = threading.Condition() |
| 477 # Maximum number of concurrent tasks. | 481 # Maximum number of concurrent tasks. |
| 478 self.jobs = jobs | 482 self.jobs = jobs |
| 479 # List of WorkItem, for gclient, these are Dependency instances. | 483 # List of WorkItem, for gclient, these are Dependency instances. |
| 480 self.queued = [] | 484 self.queued = [] |
| 481 # List of strings representing each Dependency.name that was run. | 485 # List of strings representing each Dependency.name that was run. |
| 482 self.ran = [] | 486 self.ran = [] |
| 483 # List of items currently running. | 487 # List of items currently running. |
| 484 self.running = [] | 488 self.running = [] |
| 485 # Exceptions thrown if any. | 489 # Exceptions thrown if any. |
| 486 self.exceptions = [] | 490 self.exceptions = Queue.Queue() |
| 491 # Progress status |
| 487 self.progress = progress | 492 self.progress = progress |
| 488 if self.progress: | 493 if self.progress: |
| 489 self.progress.update() | 494 self.progress.update(0) |
| 490 | 495 |
| 491 def enqueue(self, d): | 496 def enqueue(self, d): |
| 492 """Enqueue one Dependency to be executed later once its requirements are | 497 """Enqueue one Dependency to be executed later once its requirements are |
| 493 satisfied. | 498 satisfied. |
| 494 """ | 499 """ |
| 495 assert isinstance(d, WorkItem) | 500 assert isinstance(d, WorkItem) |
| 496 self.ready_cond.acquire() | 501 self.ready_cond.acquire() |
| 497 try: | 502 try: |
| 498 self.queued.append(d) | 503 self.queued.append(d) |
| 499 total = len(self.queued) + len(self.ran) + len(self.running) | 504 total = len(self.queued) + len(self.ran) + len(self.running) |
| 500 logging.debug('enqueued(%s)' % d.name) | 505 logging.debug('enqueued(%s)' % d.name) |
| 501 if self.progress: | 506 if self.progress: |
| 502 self.progress._total = total + 1 | 507 self.progress._total = total + 1 |
| 503 self.progress.update(0) | 508 self.progress.update(0) |
| 504 self.ready_cond.notifyAll() | 509 self.ready_cond.notifyAll() |
| 505 finally: | 510 finally: |
| 506 self.ready_cond.release() | 511 self.ready_cond.release() |
| 507 | 512 |
| 508 def flush(self, *args, **kwargs): | 513 def flush(self, *args, **kwargs): |
| 509 """Runs all enqueued items until all are executed.""" | 514 """Runs all enqueued items until all are executed.""" |
| 515 kwargs['work_queue'] = self |
| 510 self.ready_cond.acquire() | 516 self.ready_cond.acquire() |
| 511 try: | 517 try: |
| 512 while True: | 518 while True: |
| 513 # Check for task to run first, then wait. | 519 # Check for task to run first, then wait. |
| 514 while True: | 520 while True: |
| 515 if self.exceptions: | 521 if not self.exceptions.empty(): |
| 516 # Systematically flush the queue when there is an exception logged | 522 # Systematically flush the queue when an exception logged. |
| 517 # in. | |
| 518 self.queued = [] | 523 self.queued = [] |
| 519 # Flush threads that have terminated. | 524 self._flush_terminated_threads() |
| 520 self.running = [t for t in self.running if t.isAlive()] | 525 if (not self.queued and not self.running or |
| 521 if not self.queued and not self.running: | 526 self.jobs == len(self.running)): |
| 527 # No more worker threads or can't queue anything. |
| 522 break | 528 break |
| 523 if self.jobs == len(self.running): | 529 |
| 524 break | 530 # Check for new tasks to start. |
| 525 for i in xrange(len(self.queued)): | 531 for i in xrange(len(self.queued)): |
| 526 # Verify its requirements. | 532 # Verify its requirements. |
| 527 for r in self.queued[i].requirements: | 533 for r in self.queued[i].requirements: |
| 528 if not r in self.ran: | 534 if not r in self.ran: |
| 529 # Requirement not met. | 535 # Requirement not met. |
| 530 break | 536 break |
| 531 else: | 537 else: |
| 532 # Start one work item: all its requirements are satisfied. | 538 # Start one work item: all its requirements are satisfied. |
| 533 d = self.queued.pop(i) | 539 self._run_one_task(self.queued.pop(i), args, kwargs) |
| 534 new_thread = self._Worker(self, d, args=args, kwargs=kwargs) | |
| 535 if self.jobs > 1: | |
| 536 # Start the thread. | |
| 537 self.running.append(new_thread) | |
| 538 new_thread.start() | |
| 539 else: | |
| 540 # Run the 'thread' inside the main thread. | |
| 541 new_thread.run() | |
| 542 break | 540 break |
| 543 else: | 541 else: |
| 544 # Couldn't find an item that could run. Break out the outher loop. | 542 # Couldn't find an item that could run. Break out the outher loop. |
| 545 break | 543 break |
| 544 |
| 546 if not self.queued and not self.running: | 545 if not self.queued and not self.running: |
| 546 # We're done. |
| 547 break | 547 break |
| 548 # We need to poll here otherwise Ctrl-C isn't processed. | 548 # We need to poll here otherwise Ctrl-C isn't processed. |
| 549 self.ready_cond.wait(10) | 549 self.ready_cond.wait(10) |
| 550 # Something happened: self.enqueue() or a thread terminated. Loop again. | 550 # Something happened: self.enqueue() or a thread terminated. Loop again. |
| 551 finally: | 551 finally: |
| 552 self.ready_cond.release() | 552 self.ready_cond.release() |
| 553 |
| 553 assert not self.running, 'Now guaranteed to be single-threaded' | 554 assert not self.running, 'Now guaranteed to be single-threaded' |
| 554 if self.exceptions: | 555 if not self.exceptions.empty(): |
| 555 # To get back the stack location correctly, the raise a, b, c form must be | 556 # To get back the stack location correctly, the raise a, b, c form must be |
| 556 # used, passing a tuple as the first argument doesn't work. | 557 # used, passing a tuple as the first argument doesn't work. |
| 557 e = self.exceptions.pop(0) | 558 e = self.exceptions.get() |
| 558 raise e[0], e[1], e[2] | 559 raise e[0], e[1], e[2] |
| 559 if self.progress: | 560 if self.progress: |
| 560 self.progress.end() | 561 self.progress.end() |
| 561 | 562 |
| 563 def _flush_terminated_threads(self): |
| 564 """Flush threads that have terminated.""" |
| 565 running = self.running |
| 566 self.running = [] |
| 567 for t in running: |
| 568 if t.isAlive(): |
| 569 self.running.append(t) |
| 570 else: |
| 571 t.join() |
| 572 t.kwargs['options'].stdout.flush() |
| 573 if self.progress: |
| 574 self.progress.update(1) |
| 575 assert not t.name in self.ran |
| 576 if not t.name in self.ran: |
| 577 self.ran.append(t.name) |
| 578 |
| 579 def _run_one_task(self, task_item, args, kwargs): |
| 580 if self.jobs > 1: |
| 581 # Start the thread. |
| 582 index = len(self.ran) + len(self.running) + 1 |
| 583 # Copy 'options' just to be safe. |
| 584 task_kwargs = kwargs.copy() |
| 585 task_kwargs['options'] = copy.copy(task_kwargs['options']) |
| 586 new_thread = self._Worker(task_item, args, task_kwargs) |
| 587 self.running.append(new_thread) |
| 588 new_thread.start() |
| 589 else: |
| 590 # Run the 'thread' inside the main thread. Don't try to catch any |
| 591 # exception. |
| 592 task_item.run(*args, **kwargs) |
| 593 self.ran.append(task_item.name) |
| 594 if self.progress: |
| 595 self.progress.update(1) |
| 596 |
| 562 class _Worker(threading.Thread): | 597 class _Worker(threading.Thread): |
| 563 """One thread to execute one WorkItem.""" | 598 """One thread to execute one WorkItem.""" |
| 564 def __init__(self, parent, item, args=(), kwargs=None): | 599 def __init__(self, item, args, kwargs): |
| 565 threading.Thread.__init__(self, name=item.name or 'Worker') | 600 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 601 logging.info(item.name) |
| 602 self.item = item |
| 566 self.args = args | 603 self.args = args |
| 567 self.kwargs = kwargs or {} | 604 self.kwargs = kwargs |
| 568 self.item = item | |
| 569 self.parent = parent | |
| 570 | 605 |
| 571 def run(self): | 606 def run(self): |
| 572 """Runs in its own thread.""" | 607 """Runs in its own thread.""" |
| 573 logging.debug('running(%s)' % self.item.name) | 608 logging.debug('running(%s)' % self.item.name) |
| 574 exception = None | 609 work_queue = self.kwargs['work_queue'] |
| 575 try: | 610 try: |
| 576 self.item.run(*self.args, **self.kwargs) | 611 self.item.run(*self.args, **self.kwargs) |
| 577 except Exception: | 612 except Exception: |
| 578 # Catch exception location. | 613 # Catch exception location. |
| 579 exception = sys.exc_info() | 614 logging.info('Caught exception in thread %s' % self.item.name) |
| 615 logging.info(str(sys.exc_info())) |
| 616 work_queue.exceptions.put(sys.exc_info()) |
| 617 logging.info('Task %s done' % self.item.name) |
| 580 | 618 |
| 581 # This assumes the following code won't throw an exception. Bad. | 619 work_queue.ready_cond.acquire() |
| 582 self.parent.ready_cond.acquire() | |
| 583 try: | 620 try: |
| 584 if exception: | 621 work_queue.ready_cond.notifyAll() |
| 585 self.parent.exceptions.append(exception) | |
| 586 if self.parent.progress: | |
| 587 self.parent.progress.update(1) | |
| 588 assert not self.item.name in self.parent.ran | |
| 589 if not self.item.name in self.parent.ran: | |
| 590 self.parent.ran.append(self.item.name) | |
| 591 finally: | 622 finally: |
| 592 self.parent.ready_cond.notifyAll() | 623 work_queue.ready_cond.release() |
| 593 self.parent.ready_cond.release() | |
| OLD | NEW |