Index: gclient_utils.py |
diff --git a/gclient_utils.py b/gclient_utils.py |
index 5fd2ef27a7f9fc7da14ca6683792d233574c482c..c530f4cb4cd79078635de9fe26a0912023e426a0 100644 |
--- a/gclient_utils.py |
+++ b/gclient_utils.py |
@@ -22,6 +22,7 @@ import stat |
import subprocess |
import sys |
import time |
+import threading |
import xml.dom.minidom |
import xml.parsers.expat |
@@ -363,3 +364,109 @@ def GetGClientRootAndEntries(path=None): |
execfile(config_path, env) |
config_dir = os.path.dirname(config_path) |
return config_dir, env['entries'] |
+ |
+ |
+class WorkItem(object): |
+ """One work item.""" |
+ # A list of string, each being a WorkItem name. |
+ requirements = [] |
+ # A unique string representing this work item. |
+ name = None |
+ |
+ def run(self): |
+ pass |
+ |
+ |
+class ExecutionQueue(object): |
+ """Dependencies sometime needs to be run out of order due to From() keyword. |
+ |
+ This class manages that all the required dependencies are run before running |
+ each one. |
+ |
+ Methods of this class are multithread safe. |
+ """ |
+ def __init__(self, progress): |
+ self.lock = threading.Lock() |
+ # List of WorkItem, Dependency inherits from WorkItem. |
+ self.queued = [] |
+ # List of strings representing each Dependency.name that was run. |
+ self.ran = [] |
+ # List of items currently running. |
+ self.running = [] |
+ self.progress = progress |
+ if self.progress: |
+ self.progress.update() |
+ |
+ def enqueue(self, d): |
+ """Enqueue one Dependency to be executed later once its requirements are |
+ satisfied. |
+ """ |
+ assert isinstance(d, WorkItem) |
+ try: |
+ self.lock.acquire() |
+ self.queued.append(d) |
+ total = len(self.queued) + len(self.ran) + len(self.running) |
+ finally: |
+ self.lock.release() |
+ if self.progress: |
+ self.progress._total = total + 1 |
+ self.progress.update(0) |
+ |
+ def flush(self, *args, **kwargs): |
+ """Runs all enqueued items until all are executed.""" |
+ while self._run_one_item(*args, **kwargs): |
+ pass |
+ queued = [] |
+ running = [] |
+ try: |
+ self.lock.acquire() |
+ if self.queued: |
+ queued = self.queued |
+ self.queued = [] |
+ if self.running: |
+ running = self.running |
+ self.running = [] |
+ finally: |
+ self.lock.release() |
+ if self.progress: |
+ self.progress.end() |
+ if queued: |
+ raise gclient_utils.Error('Entries still queued: %s' % str(queued)) |
+ if running: |
+ raise gclient_utils.Error('Entries still queued: %s' % str(running)) |
+ |
+ def _run_one_item(self, *args, **kwargs): |
+ """Removes one item from the queue that has all its requirements completed |
+ and execute it. |
+ |
+ Returns False if no item could be run. |
+ """ |
+ i = 0 |
+ d = None |
+ try: |
+ self.lock.acquire() |
+ while i != len(self.queued) and not d: |
+ d = self.queued.pop(i) |
+ for r in d.requirements: |
+ if not r in self.ran: |
+ self.queued.insert(i, d) |
+ d = None |
+ break |
+ i += 1 |
+ if not d: |
+ return False |
+ self.running.append(d) |
+ finally: |
+ self.lock.release() |
+ d.run(*args, **kwargs) |
+ try: |
+ self.lock.acquire() |
+ assert not d.name in self.ran |
+ if not d.name in self.ran: |
+ self.ran.append(d.name) |
+ self.running.remove(d) |
+ if self.progress: |
+ self.progress.update(1) |
+ finally: |
+ self.lock.release() |
+ return True |