| OLD | NEW |
| 1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
| 2 # | 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
| 6 # | 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # | 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and | 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. | 13 # limitations under the License. |
| 14 | 14 |
| 15 """Generic utils.""" | 15 """Generic utils.""" |
| 16 | 16 |
| 17 import errno | 17 import errno |
| 18 import logging | 18 import logging |
| 19 import os | 19 import os |
| 20 import re | 20 import re |
| 21 import stat | 21 import stat |
| 22 import subprocess | 22 import subprocess |
| 23 import sys | 23 import sys |
| 24 import threading |
| 24 import time | 25 import time |
| 25 import threading | 26 import threading |
| 26 import xml.dom.minidom | 27 import xml.dom.minidom |
| 27 import xml.parsers.expat | 28 import xml.parsers.expat |
| 28 | 29 |
| 29 | 30 |
| 30 class CheckCallError(OSError): | 31 class CheckCallError(OSError): |
| 31 """CheckCall() returned non-0.""" | 32 """CheckCall() returned non-0.""" |
| 32 def __init__(self, command, cwd, retcode, stdout, stderr=None): | 33 def __init__(self, command, cwd, retcode, stdout, stderr=None): |
| 33 OSError.__init__(self, command, cwd, retcode, stdout, stderr) | 34 OSError.__init__(self, command, cwd, retcode, stdout, stderr) |
| (...skipping 337 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 371 # A list of string, each being a WorkItem name. | 372 # A list of string, each being a WorkItem name. |
| 372 requirements = [] | 373 requirements = [] |
| 373 # A unique string representing this work item. | 374 # A unique string representing this work item. |
| 374 name = None | 375 name = None |
| 375 | 376 |
| 376 def run(self): | 377 def run(self): |
| 377 pass | 378 pass |
| 378 | 379 |
| 379 | 380 |
| 380 class ExecutionQueue(object): | 381 class ExecutionQueue(object): |
| 381 """Dependencies sometime needs to be run out of order due to From() keyword. | 382 """Runs a set of WorkItem that have interdependencies and were WorkItem are |
| 383 added as they are processed. |
| 382 | 384 |
| 383 This class manages that all the required dependencies are run before running | 385 In gclient's case, Dependencies sometime needs to be run out of order due to |
| 384 each one. | 386 From() keyword. This class manages that all the required dependencies are run |
| 387 before running each one. |
| 385 | 388 |
| 386 Methods of this class are multithread safe. | 389 Methods of this class are thread safe. |
| 387 """ | 390 """ |
| 388 def __init__(self, progress): | 391 def __init__(self, jobs, progress): |
| 389 self.lock = threading.Lock() | 392 """jobs specifies the number of concurrent tasks to allow. progress is a |
| 390 # List of WorkItem, Dependency inherits from WorkItem. | 393 Progress instance.""" |
| 394 # Set when a thread is done or a new item is enqueued. |
| 395 self.ready_cond = threading.Condition() |
| 396 # Maximum number of concurrent tasks. |
| 397 self.jobs = jobs |
| 398 # List of WorkItem, for gclient, these are Dependency instances. |
| 391 self.queued = [] | 399 self.queued = [] |
| 392 # List of strings representing each Dependency.name that was run. | 400 # List of strings representing each Dependency.name that was run. |
| 393 self.ran = [] | 401 self.ran = [] |
| 394 # List of items currently running. | 402 # List of items currently running. |
| 395 self.running = [] | 403 self.running = [] |
| 404 # Exceptions thrown if any. |
| 405 self.exceptions = [] |
| 396 self.progress = progress | 406 self.progress = progress |
| 397 if self.progress: | 407 if self.progress: |
| 398 self.progress.update() | 408 self.progress.update() |
| 399 | 409 |
| 400 def enqueue(self, d): | 410 def enqueue(self, d): |
| 401 """Enqueue one Dependency to be executed later once its requirements are | 411 """Enqueue one Dependency to be executed later once its requirements are |
| 402 satisfied. | 412 satisfied. |
| 403 """ | 413 """ |
| 404 assert isinstance(d, WorkItem) | 414 assert isinstance(d, WorkItem) |
| 415 self.ready_cond.acquire() |
| 405 try: | 416 try: |
| 406 self.lock.acquire() | |
| 407 self.queued.append(d) | 417 self.queued.append(d) |
| 408 total = len(self.queued) + len(self.ran) + len(self.running) | 418 total = len(self.queued) + len(self.ran) + len(self.running) |
| 419 logging.debug('enqueued(%s)' % d.name) |
| 420 if self.progress: |
| 421 self.progress._total = total + 1 |
| 422 self.progress.update(0) |
| 423 self.ready_cond.notifyAll() |
| 409 finally: | 424 finally: |
| 410 self.lock.release() | 425 self.ready_cond.release() |
| 411 if self.progress: | |
| 412 self.progress._total = total + 1 | |
| 413 self.progress.update(0) | |
| 414 | 426 |
| 415 def flush(self, *args, **kwargs): | 427 def flush(self, *args, **kwargs): |
| 416 """Runs all enqueued items until all are executed.""" | 428 """Runs all enqueued items until all are executed.""" |
| 417 while self._run_one_item(*args, **kwargs): | 429 self.ready_cond.acquire() |
| 418 pass | |
| 419 queued = [] | |
| 420 running = [] | |
| 421 try: | 430 try: |
| 422 self.lock.acquire() | 431 while True: |
| 423 if self.queued: | 432 # Check for task to run first, then wait. |
| 424 queued = self.queued | 433 while True: |
| 425 self.queued = [] | 434 if self.exceptions: |
| 426 if self.running: | 435 # Systematically flush the queue when there is an exception logged |
| 427 running = self.running | 436 # in. |
| 428 self.running = [] | 437 self.queued = [] |
| 438 # Flush threads that have terminated. |
| 439 self.running = [t for t in self.running if t.isAlive()] |
| 440 if not self.queued and not self.running: |
| 441 break |
| 442 if self.jobs == len(self.running): |
| 443 break |
| 444 for i in xrange(len(self.queued)): |
| 445 # Verify its requirements. |
| 446 for r in self.queued[i].requirements: |
| 447 if not r in self.ran: |
| 448 # Requirement not met. |
| 449 break |
| 450 else: |
| 451 # Start one work item: all its requirements are satisfied. |
| 452 d = self.queued.pop(i) |
| 453 new_thread = self._Worker(self, d, args=args, kwargs=kwargs) |
| 454 if self.jobs > 1: |
| 455 # Start the thread. |
| 456 self.running.append(new_thread) |
| 457 new_thread.start() |
| 458 else: |
| 459 # Run the 'thread' inside the main thread. |
| 460 new_thread.run() |
| 461 break |
| 462 else: |
| 463 # Couldn't find an item that could run. Break out the outher loop. |
| 464 break |
| 465 if not self.queued and not self.running: |
| 466 break |
| 467 # We need to poll here otherwise Ctrl-C isn't processed. |
| 468 self.ready_cond.wait(10) |
| 469 # Something happened: self.enqueue() or a thread terminated. Loop again. |
| 429 finally: | 470 finally: |
| 430 self.lock.release() | 471 self.ready_cond.release() |
| 472 assert not self.running, 'Now guaranteed to be single-threaded' |
| 473 if self.exceptions: |
| 474 # TODO(maruel): Get back the original stack location. |
| 475 raise self.exceptions.pop(0) |
| 431 if self.progress: | 476 if self.progress: |
| 432 self.progress.end() | 477 self.progress.end() |
| 433 if queued: | |
| 434 raise gclient_utils.Error('Entries still queued: %s' % str(queued)) | |
| 435 if running: | |
| 436 raise gclient_utils.Error('Entries still queued: %s' % str(running)) | |
| 437 | 478 |
| 438 def _run_one_item(self, *args, **kwargs): | 479 class _Worker(threading.Thread): |
| 439 """Removes one item from the queue that has all its requirements completed | 480 """One thread to execute one WorkItem.""" |
| 440 and execute it. | 481 def __init__(self, parent, item, args=(), kwargs=None): |
| 482 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 483 self.args = args |
| 484 self.kwargs = kwargs or {} |
| 485 self.item = item |
| 486 self.parent = parent |
| 441 | 487 |
| 442 Returns False if no item could be run. | 488 def run(self): |
| 443 """ | 489 """Runs in its own thread.""" |
| 444 i = 0 | 490 logging.debug('running(%s)' % self.item.name) |
| 445 d = None | 491 exception = None |
| 446 try: | 492 try: |
| 447 self.lock.acquire() | 493 self.item.run(*self.args, **self.kwargs) |
| 448 while i != len(self.queued) and not d: | 494 except Exception, e: |
| 449 d = self.queued.pop(i) | 495 # TODO(maruel): Catch exception location. |
| 450 for r in d.requirements: | 496 exception = e |
| 451 if not r in self.ran: | 497 |
| 452 self.queued.insert(i, d) | 498 # This assumes the following code won't throw an exception. Bad. |
| 453 d = None | 499 self.parent.ready_cond.acquire() |
| 454 break | 500 try: |
| 455 i += 1 | 501 if exception: |
| 456 if not d: | 502 self.parent.exceptions.append(exception) |
| 457 return False | 503 if self.parent.progress: |
| 458 self.running.append(d) | 504 self.parent.progress.update(1) |
| 459 finally: | 505 assert not self.item.name in self.parent.ran |
| 460 self.lock.release() | 506 if not self.item.name in self.parent.ran: |
| 461 d.run(*args, **kwargs) | 507 self.parent.ran.append(self.item.name) |
| 462 try: | 508 finally: |
| 463 self.lock.acquire() | 509 self.parent.ready_cond.notifyAll() |
| 464 assert not d.name in self.ran | 510 self.parent.ready_cond.release() |
| 465 if not d.name in self.ran: | |
| 466 self.ran.append(d.name) | |
| 467 self.running.remove(d) | |
| 468 if self.progress: | |
| 469 self.progress.update(1) | |
| 470 finally: | |
| 471 self.lock.release() | |
| 472 return True | |
| OLD | NEW |