OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 # See the License for the specific language governing permissions and | 12 # See the License for the specific language governing permissions and |
13 # limitations under the License. | 13 # limitations under the License. |
14 | 14 |
15 """Generic utils.""" | 15 """Generic utils.""" |
16 | 16 |
17 import errno | 17 import errno |
18 import logging | 18 import logging |
19 import os | 19 import os |
20 import re | 20 import re |
21 import stat | 21 import stat |
22 import subprocess | 22 import subprocess |
23 import sys | 23 import sys |
| 24 import threading |
24 import time | 25 import time |
25 import threading | 26 import threading |
26 import xml.dom.minidom | 27 import xml.dom.minidom |
27 import xml.parsers.expat | 28 import xml.parsers.expat |
28 | 29 |
29 | 30 |
30 class CheckCallError(OSError): | 31 class CheckCallError(OSError): |
31 """CheckCall() returned non-0.""" | 32 """CheckCall() returned non-0.""" |
32 def __init__(self, command, cwd, retcode, stdout, stderr=None): | 33 def __init__(self, command, cwd, retcode, stdout, stderr=None): |
33 OSError.__init__(self, command, cwd, retcode, stdout, stderr) | 34 OSError.__init__(self, command, cwd, retcode, stdout, stderr) |
(...skipping 337 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
371 # A list of string, each being a WorkItem name. | 372 # A list of string, each being a WorkItem name. |
372 requirements = [] | 373 requirements = [] |
373 # A unique string representing this work item. | 374 # A unique string representing this work item. |
374 name = None | 375 name = None |
375 | 376 |
376 def run(self): | 377 def run(self): |
377 pass | 378 pass |
378 | 379 |
379 | 380 |
380 class ExecutionQueue(object): | 381 class ExecutionQueue(object): |
381 """Dependencies sometime needs to be run out of order due to From() keyword. | 382 """Runs a set of WorkItem that have interdependencies and were WorkItem are |
| 383 added as they are processed. |
382 | 384 |
383 This class manages that all the required dependencies are run before running | 385 In gclient's case, Dependencies sometime needs to be run out of order due to |
384 each one. | 386 From() keyword. This class manages that all the required dependencies are run |
| 387 before running each one. |
385 | 388 |
386 Methods of this class are multithread safe. | 389 Methods of this class are thread safe. |
387 """ | 390 """ |
388 def __init__(self, progress): | 391 def __init__(self, jobs, progress): |
389 self.lock = threading.Lock() | 392 """jobs specifies the number of concurrent tasks to allow. progress is a |
390 # List of WorkItem, Dependency inherits from WorkItem. | 393 Progress instance.""" |
| 394 # Set when a thread is done or a new item is enqueued. |
| 395 self.ready_cond = threading.Condition() |
| 396 # Maximum number of concurrent tasks. |
| 397 self.jobs = jobs |
| 398 # List of WorkItem, for gclient, these are Dependency instances. |
391 self.queued = [] | 399 self.queued = [] |
392 # List of strings representing each Dependency.name that was run. | 400 # List of strings representing each Dependency.name that was run. |
393 self.ran = [] | 401 self.ran = [] |
394 # List of items currently running. | 402 # List of items currently running. |
395 self.running = [] | 403 self.running = [] |
| 404 # Exceptions thrown if any. |
| 405 self.exceptions = [] |
396 self.progress = progress | 406 self.progress = progress |
397 if self.progress: | 407 if self.progress: |
398 self.progress.update() | 408 self.progress.update() |
399 | 409 |
400 def enqueue(self, d): | 410 def enqueue(self, d): |
401 """Enqueue one Dependency to be executed later once its requirements are | 411 """Enqueue one Dependency to be executed later once its requirements are |
402 satisfied. | 412 satisfied. |
403 """ | 413 """ |
404 assert isinstance(d, WorkItem) | 414 assert isinstance(d, WorkItem) |
| 415 self.ready_cond.acquire() |
405 try: | 416 try: |
406 self.lock.acquire() | |
407 self.queued.append(d) | 417 self.queued.append(d) |
408 total = len(self.queued) + len(self.ran) + len(self.running) | 418 total = len(self.queued) + len(self.ran) + len(self.running) |
| 419 logging.debug('enqueued(%s)' % d.name) |
| 420 if self.progress: |
| 421 self.progress._total = total + 1 |
| 422 self.progress.update(0) |
| 423 self.ready_cond.notifyAll() |
409 finally: | 424 finally: |
410 self.lock.release() | 425 self.ready_cond.release() |
411 if self.progress: | |
412 self.progress._total = total + 1 | |
413 self.progress.update(0) | |
414 | 426 |
415 def flush(self, *args, **kwargs): | 427 def flush(self, *args, **kwargs): |
416 """Runs all enqueued items until all are executed.""" | 428 """Runs all enqueued items until all are executed.""" |
417 while self._run_one_item(*args, **kwargs): | 429 self.ready_cond.acquire() |
418 pass | |
419 queued = [] | |
420 running = [] | |
421 try: | 430 try: |
422 self.lock.acquire() | 431 while True: |
423 if self.queued: | 432 # Check for task to run first, then wait. |
424 queued = self.queued | 433 while True: |
425 self.queued = [] | 434 if self.exceptions: |
426 if self.running: | 435 # Systematically flush the queue when there is an exception logged |
427 running = self.running | 436 # in. |
428 self.running = [] | 437 self.queued = [] |
| 438 # Flush threads that have terminated. |
| 439 self.running = [t for t in self.running if t.isAlive()] |
| 440 if not self.queued and not self.running: |
| 441 break |
| 442 if self.jobs == len(self.running): |
| 443 break |
| 444 for i in xrange(len(self.queued)): |
| 445 # Verify its requirements. |
| 446 for r in self.queued[i].requirements: |
| 447 if not r in self.ran: |
| 448 # Requirement not met. |
| 449 break |
| 450 else: |
| 451 # Start one work item: all its requirements are satisfied. |
| 452 d = self.queued.pop(i) |
| 453 new_thread = self._Worker(self, d, args=args, kwargs=kwargs) |
| 454 if self.jobs > 1: |
| 455 # Start the thread. |
| 456 self.running.append(new_thread) |
| 457 new_thread.start() |
| 458 else: |
| 459 # Run the 'thread' inside the main thread. |
| 460 new_thread.run() |
| 461 break |
| 462 else: |
| 463 # Couldn't find an item that could run. Break out the outher loop. |
| 464 break |
| 465 if not self.queued and not self.running: |
| 466 break |
| 467 # We need to poll here otherwise Ctrl-C isn't processed. |
| 468 self.ready_cond.wait(10) |
| 469 # Something happened: self.enqueue() or a thread terminated. Loop again. |
429 finally: | 470 finally: |
430 self.lock.release() | 471 self.ready_cond.release() |
| 472 assert not self.running, 'Now guaranteed to be single-threaded' |
| 473 if self.exceptions: |
| 474 # TODO(maruel): Get back the original stack location. |
| 475 raise self.exceptions.pop(0) |
431 if self.progress: | 476 if self.progress: |
432 self.progress.end() | 477 self.progress.end() |
433 if queued: | |
434 raise gclient_utils.Error('Entries still queued: %s' % str(queued)) | |
435 if running: | |
436 raise gclient_utils.Error('Entries still queued: %s' % str(running)) | |
437 | 478 |
438 def _run_one_item(self, *args, **kwargs): | 479 class _Worker(threading.Thread): |
439 """Removes one item from the queue that has all its requirements completed | 480 """One thread to execute one WorkItem.""" |
440 and execute it. | 481 def __init__(self, parent, item, args=(), kwargs=None): |
| 482 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 483 self.args = args |
| 484 self.kwargs = kwargs or {} |
| 485 self.item = item |
| 486 self.parent = parent |
441 | 487 |
442 Returns False if no item could be run. | 488 def run(self): |
443 """ | 489 """Runs in its own thread.""" |
444 i = 0 | 490 logging.debug('running(%s)' % self.item.name) |
445 d = None | 491 exception = None |
446 try: | 492 try: |
447 self.lock.acquire() | 493 self.item.run(*self.args, **self.kwargs) |
448 while i != len(self.queued) and not d: | 494 except Exception, e: |
449 d = self.queued.pop(i) | 495 # TODO(maruel): Catch exception location. |
450 for r in d.requirements: | 496 exception = e |
451 if not r in self.ran: | 497 |
452 self.queued.insert(i, d) | 498 # This assumes the following code won't throw an exception. Bad. |
453 d = None | 499 self.parent.ready_cond.acquire() |
454 break | 500 try: |
455 i += 1 | 501 if exception: |
456 if not d: | 502 self.parent.exceptions.append(exception) |
457 return False | 503 if self.parent.progress: |
458 self.running.append(d) | 504 self.parent.progress.update(1) |
459 finally: | 505 assert not self.item.name in self.parent.ran |
460 self.lock.release() | 506 if not self.item.name in self.parent.ran: |
461 d.run(*args, **kwargs) | 507 self.parent.ran.append(self.item.name) |
462 try: | 508 finally: |
463 self.lock.acquire() | 509 self.parent.ready_cond.notifyAll() |
464 assert not d.name in self.ran | 510 self.parent.ready_cond.release() |
465 if not d.name in self.ran: | |
466 self.ran.append(d.name) | |
467 self.running.remove(d) | |
468 if self.progress: | |
469 self.progress.update(1) | |
470 finally: | |
471 self.lock.release() | |
472 return True | |
OLD | NEW |