OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 # See the License for the specific language governing permissions and | 12 # See the License for the specific language governing permissions and |
13 # limitations under the License. | 13 # limitations under the License. |
14 | 14 |
15 """Generic utils.""" | 15 """Generic utils.""" |
16 | 16 |
| 17 import copy |
17 import errno | 18 import errno |
18 import logging | 19 import logging |
19 import os | 20 import os |
| 21 import Queue |
20 import re | 22 import re |
21 import stat | 23 import stat |
22 import subprocess | 24 import subprocess |
23 import sys | 25 import sys |
24 import threading | 26 import threading |
25 import time | 27 import time |
26 import xml.dom.minidom | 28 import xml.dom.minidom |
27 import xml.parsers.expat | 29 import xml.parsers.expat |
28 | 30 |
29 | 31 |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
393 exec(FileRead(entries_filename), scope) | 395 exec(FileRead(entries_filename), scope) |
394 except SyntaxError, e: | 396 except SyntaxError, e: |
395 SyntaxErrorToError(filename, e) | 397 SyntaxErrorToError(filename, e) |
396 all_directories = scope['entries'].keys() | 398 all_directories = scope['entries'].keys() |
397 path_to_check = real_from_dir[len(path)+1:] | 399 path_to_check = real_from_dir[len(path)+1:] |
398 while path_to_check: | 400 while path_to_check: |
399 if path_to_check in all_directories: | 401 if path_to_check in all_directories: |
400 return path | 402 return path |
401 path_to_check = os.path.dirname(path_to_check) | 403 path_to_check = os.path.dirname(path_to_check) |
402 return None | 404 return None |
403 | 405 |
404 logging.info('Found gclient root at ' + path) | 406 logging.info('Found gclient root at ' + path) |
405 return path | 407 return path |
406 | 408 |
407 | 409 |
408 def PathDifference(root, subpath): | 410 def PathDifference(root, subpath): |
409 """Returns the difference subpath minus root.""" | 411 """Returns the difference subpath minus root.""" |
410 root = os.path.realpath(root) | 412 root = os.path.realpath(root) |
411 subpath = os.path.realpath(subpath) | 413 subpath = os.path.realpath(subpath) |
412 if not subpath.startswith(root): | 414 if not subpath.startswith(root): |
413 return None | 415 return None |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
448 return config_dir, env['entries'] | 450 return config_dir, env['entries'] |
449 | 451 |
450 | 452 |
451 class WorkItem(object): | 453 class WorkItem(object): |
452 """One work item.""" | 454 """One work item.""" |
453 # A list of string, each being a WorkItem name. | 455 # A list of string, each being a WorkItem name. |
454 requirements = [] | 456 requirements = [] |
455 # A unique string representing this work item. | 457 # A unique string representing this work item. |
456 name = None | 458 name = None |
457 | 459 |
458 def run(self): | 460 def run(self, work_queue, options): |
| 461 """work_queue and options are passed as keyword arguments so they should be |
| 462 the last parameters of the function when you override it.""" |
459 pass | 463 pass |
460 | 464 |
461 | 465 |
462 class ExecutionQueue(object): | 466 class ExecutionQueue(object): |
463 """Runs a set of WorkItem that have interdependencies and were WorkItem are | 467 """Runs a set of WorkItem that have interdependencies and were WorkItem are |
464 added as they are processed. | 468 added as they are processed. |
465 | 469 |
466 In gclient's case, Dependencies sometime needs to be run out of order due to | 470 In gclient's case, Dependencies sometime needs to be run out of order due to |
467 From() keyword. This class manages that all the required dependencies are run | 471 From() keyword. This class manages that all the required dependencies are run |
468 before running each one. | 472 before running each one. |
469 | 473 |
470 Methods of this class are thread safe. | 474 Methods of this class are thread safe. |
471 """ | 475 """ |
472 def __init__(self, jobs, progress): | 476 def __init__(self, jobs, progress): |
473 """jobs specifies the number of concurrent tasks to allow. progress is a | 477 """jobs specifies the number of concurrent tasks to allow. progress is a |
474 Progress instance.""" | 478 Progress instance.""" |
475 # Set when a thread is done or a new item is enqueued. | 479 # Set when a thread is done or a new item is enqueued. |
476 self.ready_cond = threading.Condition() | 480 self.ready_cond = threading.Condition() |
477 # Maximum number of concurrent tasks. | 481 # Maximum number of concurrent tasks. |
478 self.jobs = jobs | 482 self.jobs = jobs |
479 # List of WorkItem, for gclient, these are Dependency instances. | 483 # List of WorkItem, for gclient, these are Dependency instances. |
480 self.queued = [] | 484 self.queued = [] |
481 # List of strings representing each Dependency.name that was run. | 485 # List of strings representing each Dependency.name that was run. |
482 self.ran = [] | 486 self.ran = [] |
483 # List of items currently running. | 487 # List of items currently running. |
484 self.running = [] | 488 self.running = [] |
485 # Exceptions thrown if any. | 489 # Exceptions thrown if any. |
486 self.exceptions = [] | 490 self.exceptions = Queue.Queue() |
| 491 # Progress status |
487 self.progress = progress | 492 self.progress = progress |
488 if self.progress: | 493 if self.progress: |
489 self.progress.update() | 494 self.progress.update(0) |
490 | 495 |
491 def enqueue(self, d): | 496 def enqueue(self, d): |
492 """Enqueue one Dependency to be executed later once its requirements are | 497 """Enqueue one Dependency to be executed later once its requirements are |
493 satisfied. | 498 satisfied. |
494 """ | 499 """ |
495 assert isinstance(d, WorkItem) | 500 assert isinstance(d, WorkItem) |
496 self.ready_cond.acquire() | 501 self.ready_cond.acquire() |
497 try: | 502 try: |
498 self.queued.append(d) | 503 self.queued.append(d) |
499 total = len(self.queued) + len(self.ran) + len(self.running) | 504 total = len(self.queued) + len(self.ran) + len(self.running) |
500 logging.debug('enqueued(%s)' % d.name) | 505 logging.debug('enqueued(%s)' % d.name) |
501 if self.progress: | 506 if self.progress: |
502 self.progress._total = total + 1 | 507 self.progress._total = total + 1 |
503 self.progress.update(0) | 508 self.progress.update(0) |
504 self.ready_cond.notifyAll() | 509 self.ready_cond.notifyAll() |
505 finally: | 510 finally: |
506 self.ready_cond.release() | 511 self.ready_cond.release() |
507 | 512 |
508 def flush(self, *args, **kwargs): | 513 def flush(self, *args, **kwargs): |
509 """Runs all enqueued items until all are executed.""" | 514 """Runs all enqueued items until all are executed.""" |
| 515 kwargs['work_queue'] = self |
510 self.ready_cond.acquire() | 516 self.ready_cond.acquire() |
511 try: | 517 try: |
512 while True: | 518 while True: |
513 # Check for task to run first, then wait. | 519 # Check for task to run first, then wait. |
514 while True: | 520 while True: |
515 if self.exceptions: | 521 if not self.exceptions.empty(): |
516 # Systematically flush the queue when there is an exception logged | 522 # Systematically flush the queue when an exception logged. |
517 # in. | |
518 self.queued = [] | 523 self.queued = [] |
519 # Flush threads that have terminated. | 524 self._flush_terminated_threads() |
520 self.running = [t for t in self.running if t.isAlive()] | 525 if (not self.queued and not self.running or |
521 if not self.queued and not self.running: | 526 self.jobs == len(self.running)): |
| 527 # No more worker threads or can't queue anything. |
522 break | 528 break |
523 if self.jobs == len(self.running): | 529 |
524 break | 530 # Check for new tasks to start. |
525 for i in xrange(len(self.queued)): | 531 for i in xrange(len(self.queued)): |
526 # Verify its requirements. | 532 # Verify its requirements. |
527 for r in self.queued[i].requirements: | 533 for r in self.queued[i].requirements: |
528 if not r in self.ran: | 534 if not r in self.ran: |
529 # Requirement not met. | 535 # Requirement not met. |
530 break | 536 break |
531 else: | 537 else: |
532 # Start one work item: all its requirements are satisfied. | 538 # Start one work item: all its requirements are satisfied. |
533 d = self.queued.pop(i) | 539 self._run_one_task(self.queued.pop(i), args, kwargs) |
534 new_thread = self._Worker(self, d, args=args, kwargs=kwargs) | |
535 if self.jobs > 1: | |
536 # Start the thread. | |
537 self.running.append(new_thread) | |
538 new_thread.start() | |
539 else: | |
540 # Run the 'thread' inside the main thread. | |
541 new_thread.run() | |
542 break | 540 break |
543 else: | 541 else: |
544 # Couldn't find an item that could run. Break out the outher loop. | 542 # Couldn't find an item that could run. Break out the outher loop. |
545 break | 543 break |
| 544 |
546 if not self.queued and not self.running: | 545 if not self.queued and not self.running: |
| 546 # We're done. |
547 break | 547 break |
548 # We need to poll here otherwise Ctrl-C isn't processed. | 548 # We need to poll here otherwise Ctrl-C isn't processed. |
549 self.ready_cond.wait(10) | 549 self.ready_cond.wait(10) |
550 # Something happened: self.enqueue() or a thread terminated. Loop again. | 550 # Something happened: self.enqueue() or a thread terminated. Loop again. |
551 finally: | 551 finally: |
552 self.ready_cond.release() | 552 self.ready_cond.release() |
| 553 |
553 assert not self.running, 'Now guaranteed to be single-threaded' | 554 assert not self.running, 'Now guaranteed to be single-threaded' |
554 if self.exceptions: | 555 if not self.exceptions.empty(): |
555 # To get back the stack location correctly, the raise a, b, c form must be | 556 # To get back the stack location correctly, the raise a, b, c form must be |
556 # used, passing a tuple as the first argument doesn't work. | 557 # used, passing a tuple as the first argument doesn't work. |
557 e = self.exceptions.pop(0) | 558 e = self.exceptions.get() |
558 raise e[0], e[1], e[2] | 559 raise e[0], e[1], e[2] |
559 if self.progress: | 560 if self.progress: |
560 self.progress.end() | 561 self.progress.end() |
561 | 562 |
| 563 def _flush_terminated_threads(self): |
| 564 """Flush threads that have terminated.""" |
| 565 running = self.running |
| 566 self.running = [] |
| 567 for t in running: |
| 568 if t.isAlive(): |
| 569 self.running.append(t) |
| 570 else: |
| 571 t.join() |
| 572 t.kwargs['options'].stdout.flush() |
| 573 if self.progress: |
| 574 self.progress.update(1) |
| 575 assert not t.name in self.ran |
| 576 if not t.name in self.ran: |
| 577 self.ran.append(t.name) |
| 578 |
| 579 def _run_one_task(self, task_item, args, kwargs): |
| 580 if self.jobs > 1: |
| 581 # Start the thread. |
| 582 index = len(self.ran) + len(self.running) + 1 |
| 583 # Copy 'options' just to be safe. |
| 584 task_kwargs = kwargs.copy() |
| 585 task_kwargs['options'] = copy.copy(task_kwargs['options']) |
| 586 new_thread = self._Worker(task_item, args, task_kwargs) |
| 587 self.running.append(new_thread) |
| 588 new_thread.start() |
| 589 else: |
| 590 # Run the 'thread' inside the main thread. Don't try to catch any |
| 591 # exception. |
| 592 task_item.run(*args, **kwargs) |
| 593 self.ran.append(task_item.name) |
| 594 if self.progress: |
| 595 self.progress.update(1) |
| 596 |
562 class _Worker(threading.Thread): | 597 class _Worker(threading.Thread): |
563 """One thread to execute one WorkItem.""" | 598 """One thread to execute one WorkItem.""" |
564 def __init__(self, parent, item, args=(), kwargs=None): | 599 def __init__(self, item, args, kwargs): |
565 threading.Thread.__init__(self, name=item.name or 'Worker') | 600 threading.Thread.__init__(self, name=item.name or 'Worker') |
| 601 logging.info(item.name) |
| 602 self.item = item |
566 self.args = args | 603 self.args = args |
567 self.kwargs = kwargs or {} | 604 self.kwargs = kwargs |
568 self.item = item | |
569 self.parent = parent | |
570 | 605 |
571 def run(self): | 606 def run(self): |
572 """Runs in its own thread.""" | 607 """Runs in its own thread.""" |
573 logging.debug('running(%s)' % self.item.name) | 608 logging.debug('running(%s)' % self.item.name) |
574 exception = None | 609 work_queue = self.kwargs['work_queue'] |
575 try: | 610 try: |
576 self.item.run(*self.args, **self.kwargs) | 611 self.item.run(*self.args, **self.kwargs) |
577 except Exception: | 612 except Exception: |
578 # Catch exception location. | 613 # Catch exception location. |
579 exception = sys.exc_info() | 614 logging.info('Caught exception in thread %s' % self.item.name) |
| 615 logging.info(str(sys.exc_info())) |
| 616 work_queue.exceptions.put(sys.exc_info()) |
| 617 logging.info('Task %s done' % self.item.name) |
580 | 618 |
581 # This assumes the following code won't throw an exception. Bad. | 619 work_queue.ready_cond.acquire() |
582 self.parent.ready_cond.acquire() | |
583 try: | 620 try: |
584 if exception: | 621 work_queue.ready_cond.notifyAll() |
585 self.parent.exceptions.append(exception) | |
586 if self.parent.progress: | |
587 self.parent.progress.update(1) | |
588 assert not self.item.name in self.parent.ran | |
589 if not self.item.name in self.parent.ran: | |
590 self.parent.ran.append(self.item.name) | |
591 finally: | 622 finally: |
592 self.parent.ready_cond.notifyAll() | 623 work_queue.ready_cond.release() |
593 self.parent.ready_cond.release() | |
OLD | NEW |