Index: scons-2.0.1/engine/SCons/Job.py |
=================================================================== |
--- scons-2.0.1/engine/SCons/Job.py (revision 0) |
+++ scons-2.0.1/engine/SCons/Job.py (revision 0) |
@@ -0,0 +1,435 @@ |
+"""SCons.Job |
+ |
+This module defines the Serial and Parallel classes that execute tasks to |
+complete a build. The Jobs class provides a higher-level interface to start,
+stop, and wait on jobs. |
+ |
+""" |
+ |
+# |
+# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 The SCons Foundation |
+# |
+# Permission is hereby granted, free of charge, to any person obtaining |
+# a copy of this software and associated documentation files (the |
+# "Software"), to deal in the Software without restriction, including |
+# without limitation the rights to use, copy, modify, merge, publish, |
+# distribute, sublicense, and/or sell copies of the Software, and to |
+# permit persons to whom the Software is furnished to do so, subject to |
+# the following conditions: |
+# |
+# The above copyright notice and this permission notice shall be included |
+# in all copies or substantial portions of the Software. |
+# |
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY |
+# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE |
+# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE |
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
+# |
+ |
+__revision__ = "src/engine/SCons/Job.py 5134 2010/08/16 23:02:40 bdeegan" |
+ |
+import SCons.compat |
+ |
+import os |
+import signal |
+ |
+import SCons.Errors
+import SCons.Warnings
+ |
+# The default stack size (in kilobytes) of the threads used to execute |
+# jobs in parallel. |
+# |
+# We use a stack size of 256 kilobytes. The default on some platforms
+# is too large and prevents us from creating enough threads to fully
+# parallelize the build. For example, the default stack size on Linux
+# is 8 MBytes.
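+#
+# A wrapping interface that wants a different size can assign to
+# explicit_stack_size before constructing a Jobs object; a hypothetical
+# sketch (the value is in kilobytes, like default_stack_size below):
+#
+#     import SCons.Job
+#     SCons.Job.explicit_stack_size = 512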
+ |
+explicit_stack_size = None |
+default_stack_size = 256 |
+ |
+interrupt_msg = 'Build interrupted.' |
+ |
+ |
+class InterruptState(object): |
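+    """Shared flag recording whether a signal interrupted the build.
+
+    Calling an instance returns the current value of the flag."""
+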
+ def __init__(self): |
+ self.interrupted = False |
+ |
+ def set(self): |
+ self.interrupted = True |
+ |
+ def __call__(self): |
+ return self.interrupted |
+ |
+ |
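+# A typical caller (the real driver lives in SCons.Script.Main) uses
+# this module roughly as follows (a sketch, not the exact code):
+#
+#     jobs = Jobs(num_jobs, taskmaster)
+#     jobs.run()
+#     if jobs.were_interrupted():
+#         ...report the interrupt to the user...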
+class Jobs(object): |
+ """An instance of this class initializes N jobs, and provides |
+ methods for starting, stopping, and waiting on all N jobs. |
+ """ |
+ |
+ def __init__(self, num, taskmaster): |
+ """ |
+        Create 'num' jobs using the given taskmaster.
+ |
+ If 'num' is 1 or less, then a serial job will be used, |
+ otherwise a parallel job with 'num' worker threads will |
+ be used. |
+ |
+        The 'num_jobs' attribute will be set to the actual number of jobs
+        allocated. If more than one job is requested but the Parallel
+        class can't be used (e.g. because the interpreter has no thread
+        support), it gets reset to 1. Wrapping interfaces that care
+        should check the value of 'num_jobs' after initialization.
+ """ |
+ |
+ self.job = None |
+ if num > 1: |
+ stack_size = explicit_stack_size |
+ if stack_size is None: |
+ stack_size = default_stack_size |
+ |
+ try: |
+ self.job = Parallel(taskmaster, num, stack_size) |
+ self.num_jobs = num |
+ except NameError: |
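+                # The Parallel class is defined only if the threading
+                # module imported successfully; see the try/except
+                # around "import threading" below.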
+ pass |
+ if self.job is None: |
+ self.job = Serial(taskmaster) |
+ self.num_jobs = 1 |
+ |
+ def run(self, postfunc=lambda: None): |
+ """Run the jobs. |
+ |
+        postfunc() will be invoked after the jobs have run. It will be
+        invoked even if the jobs are interrupted by a keyboard
+        interrupt (or, more precisely, by a signal such as SIGINT,
+        SIGTERM or SIGHUP). The execution of postfunc() is protected
+ against keyboard interrupts and is guaranteed to run to |
+ completion.""" |
+ self._setup_sig_handler() |
+ try: |
+ self.job.start() |
+ finally: |
+ postfunc() |
+ self._reset_sig_handler() |
+ |
+ def were_interrupted(self): |
+ """Returns whether the jobs were interrupted by a signal.""" |
+ return self.job.interrupted() |
+ |
+ def _setup_sig_handler(self): |
+ """Setup an interrupt handler so that SCons can shutdown cleanly in |
+ various conditions: |
+ |
+ a) SIGINT: Keyboard interrupt |
+ b) SIGTERM: kill or system shutdown |
+ c) SIGHUP: Controlling shell exiting |
+ |
+        We handle all of these cases by stopping the taskmaster. It
+        turns out that it is very difficult to stop the build process
+        by asynchronously throwing an exception such as
+        KeyboardInterrupt. For example, the Python condition
+        variables (threading.Condition) and queues do not appear to
+        be asynchronous-exception-safe; coping with that would require
+        adding try/finally blocks and "except KeyboardInterrupt"
+        handlers all over the place.
+ |
+ Note also that we have to be careful to handle the case when |
+ SCons forks before executing another process. In that case, we |
+ want the child to exit immediately. |
+ """ |
+ def handler(signum, stack, self=self, parentpid=os.getpid()): |
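+            # The default arguments capture self and the parent's pid
+            # when the handler is defined. If SCons has forked and the
+            # child receives the signal before exec'ing, os.getpid()
+            # differs from parentpid and the child exits immediately.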
+ if os.getpid() == parentpid: |
+ self.job.taskmaster.stop() |
+ self.job.interrupted.set() |
+ else: |
+ os._exit(2) |
+ |
+ self.old_sigint = signal.signal(signal.SIGINT, handler) |
+ self.old_sigterm = signal.signal(signal.SIGTERM, handler) |
+ try: |
+ self.old_sighup = signal.signal(signal.SIGHUP, handler) |
+ except AttributeError: |
+ pass |
+ |
+ def _reset_sig_handler(self): |
+ """Restore the signal handlers to their previous state (before the |
+ call to _setup_sig_handler().""" |
+ |
+ signal.signal(signal.SIGINT, self.old_sigint) |
+ signal.signal(signal.SIGTERM, self.old_sigterm) |
+ try: |
+ signal.signal(signal.SIGHUP, self.old_sighup) |
+ except AttributeError: |
+ pass |
+ |
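+# The taskmaster and its tasks are duck typed. A minimal conforming
+# pair, sketched here only to illustrate the interface this module
+# relies on (the real implementations live in SCons.Taskmaster):
+#
+#     class Task(object):
+#         targets = []                      # nodes, used in error reports
+#         def prepare(self): pass
+#         def needs_execute(self): return True
+#         def execute(self): pass
+#         def executed(self): pass
+#         def failed(self): pass
+#         def postprocess(self): pass
+#         def exception_set(self): pass
+#
+#     class Taskmaster(object):
+#         def next_task(self): return None  # None means no more tasks
+#         def stop(self): pass
+#         def cleanup(self): pass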
+class Serial(object): |
+ """This class is used to execute tasks in series, and is more efficient |
+ than Parallel, but is only appropriate for non-parallel builds. Only |
+ one instance of this class should be in existence at a time. |
+ |
+ This class is not thread safe. |
+ """ |
+ |
+ def __init__(self, taskmaster): |
+ """Create a new serial job given a taskmaster. |
+ |
+ The taskmaster's next_task() method should return the next task |
+ that needs to be executed, or None if there are no more tasks. The |
+        taskmaster's executed() method will be called for each task that
+        is successfully executed, and its failed() method will be called
+        for each task that fails (e.g. execute() raised an exception)."""
+ |
+ self.taskmaster = taskmaster |
+ self.interrupted = InterruptState() |
+ |
+ def start(self): |
+ """Start the job. This will begin pulling tasks from the taskmaster |
+ and executing them, and return when there are no more tasks. If a task |
+ fails to execute (i.e. execute() raises an exception), then the job will |
+ stop.""" |
+ |
+ while True: |
+ task = self.taskmaster.next_task() |
+ |
+ if task is None: |
+ break |
+ |
+ try: |
+ task.prepare() |
+ if task.needs_execute(): |
+ task.execute() |
+ except: |
+ if self.interrupted(): |
+ try: |
+ raise SCons.Errors.BuildError( |
+ task.targets[0], errstr=interrupt_msg) |
+ except: |
+ task.exception_set() |
+ else: |
+ task.exception_set() |
+ |
+ # Let the failed() callback function arrange for the |
+ # build to stop if that's appropriate. |
+ task.failed() |
+ else: |
+ task.executed() |
+ |
+ task.postprocess() |
+ self.taskmaster.cleanup() |
+ |
+ |
+# Trap import failure so that everything in the Job module but the |
+# Parallel class (and its dependent classes) will work if the interpreter |
+# doesn't support threads. |
+try: |
+ import queue |
+ import threading |
+except ImportError: |
+ pass |
+else: |
+ class Worker(threading.Thread): |
+ """A worker thread waits on a task to be posted to its request queue, |
+ dequeues the task, executes it, and posts a tuple including the task |
+ and a boolean indicating whether the task executed successfully. """ |
+ |
+ def __init__(self, requestQueue, resultsQueue, interrupted): |
+ threading.Thread.__init__(self) |
+ self.setDaemon(1) |
+ self.requestQueue = requestQueue |
+ self.resultsQueue = resultsQueue |
+ self.interrupted = interrupted |
+ self.start() |
+ |
+ def run(self): |
+ while True: |
+ task = self.requestQueue.get() |
+ |
+ if task is None: |
+ # The "None" value is used as a sentinel by |
+ # ThreadPool.cleanup(). This indicates that there |
+ # are no more tasks, so we should quit. |
+ break |
+ |
+ try: |
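+                    # Once a signal has interrupted the build, stop
+                    # executing new tasks: fail each remaining task
+                    # immediately so the build can wind down.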
+ if self.interrupted(): |
+ raise SCons.Errors.BuildError( |
+ task.targets[0], errstr=interrupt_msg) |
+ task.execute() |
+ except: |
+ task.exception_set() |
+ ok = False |
+ else: |
+ ok = True |
+ |
+ self.resultsQueue.put((task, ok)) |
+ |
+ class ThreadPool(object): |
+ """This class is responsible for spawning and managing worker threads.""" |
+ |
+ def __init__(self, num, stack_size, interrupted): |
+ """Create the request and reply queues, and 'num' worker threads. |
+ |
+ One must specify the stack size of the worker threads. The |
+ stack size is specified in kilobytes. |
+ """ |
+ self.requestQueue = queue.Queue(0) |
+ self.resultsQueue = queue.Queue(0) |
+ |
+ try: |
+ prev_size = threading.stack_size(stack_size*1024) |
+ except AttributeError, e: |
+ # Only print a warning if the stack size has been |
+ # explicitly set. |
+                if explicit_stack_size is not None:
+ msg = "Setting stack size is unsupported by this version of Python:\n " + \ |
+ e.args[0] |
+ SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) |
+ except ValueError, e: |
+ msg = "Setting stack size failed:\n " + str(e) |
+ SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) |
+ |
+            # Create worker threads; each thread created here inherits
+            # the process-global stack size set above.
+ self.workers = [] |
+ for _ in range(num): |
+ worker = Worker(self.requestQueue, self.resultsQueue, interrupted) |
+ self.workers.append(worker) |
+ |
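+            # Restore the previous process-global stack size. prev_size
+            # is bound only if the stack_size() call above succeeded.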
+ if 'prev_size' in locals(): |
+ threading.stack_size(prev_size) |
+ |
+ def put(self, task): |
+ """Put task into request queue.""" |
+ self.requestQueue.put(task) |
+ |
+ def get(self): |
+ """Remove and return a result tuple from the results queue.""" |
+ return self.resultsQueue.get() |
+ |
+ def preparation_failed(self, task): |
+ self.resultsQueue.put((task, False)) |
+ |
+ def cleanup(self): |
+ """ |
+ Shuts down the thread pool, giving each worker thread a |
+ chance to shut down gracefully. |
+ """ |
+ # For each worker thread, put a sentinel "None" value |
+ # on the requestQueue (indicating that there's no work |
+ # to be done) so that each worker thread will get one and |
+ # terminate gracefully. |
+ for _ in self.workers: |
+ self.requestQueue.put(None) |
+ |
+ # Wait for all of the workers to terminate. |
+ # |
+ # If we don't do this, later Python versions (2.4, 2.5) often |
+ # seem to raise exceptions during shutdown. This happens |
+ # in requestQueue.get(), as an assertion failure that |
+ # requestQueue.not_full is notified while not acquired, |
+ # seemingly because the main thread has shut down (or is |
+ # in the process of doing so) while the workers are still |
+ # trying to pull sentinels off the requestQueue. |
+ # |
+ # Normally these terminations should happen fairly quickly, |
+ # but we'll stick a one-second timeout on here just in case |
+ # someone gets hung. |
+ for worker in self.workers: |
+ worker.join(1.0) |
+ self.workers = [] |
+ |
+ class Parallel(object): |
+ """This class is used to execute tasks in parallel, and is somewhat |
+ less efficient than Serial, but is appropriate for parallel builds. |
+ |
+ This class is thread safe. |
+ """ |
+ |
+ def __init__(self, taskmaster, num, stack_size): |
+ """Create a new parallel job given a taskmaster. |
+ |
+ The taskmaster's next_task() method should return the next |
+ task that needs to be executed, or None if there are no more |
+        tasks. The taskmaster's executed() method will be called for
+        each task that is successfully executed, and its failed()
+        method will be called for each task that fails to execute
+        (i.e. execute() raised an exception).
+ |
+ Note: calls to taskmaster are serialized, but calls to |
+ execute() on distinct tasks are not serialized, because |
+ that is the whole point of parallel jobs: they can execute |
+ multiple tasks simultaneously. """ |
+ |
+ self.taskmaster = taskmaster |
+ self.interrupted = InterruptState() |
+ self.tp = ThreadPool(num, stack_size, self.interrupted) |
+ |
+ self.maxjobs = num |
+ |
+ def start(self): |
+ """Start the job. This will begin pulling tasks from the |
+ taskmaster and executing them, and return when there are no |
+ more tasks. If a task fails to execute (i.e. execute() raises |
+ an exception), then the job will stop.""" |
+ |
+ jobs = 0 |
+ |
+ while True: |
+ # Start up as many available tasks as we're |
+ # allowed to. |
+ while jobs < self.maxjobs: |
+ task = self.taskmaster.next_task() |
+ if task is None: |
+ break |
+ |
+ try: |
+ # prepare task for execution |
+ task.prepare() |
+ except: |
+ task.exception_set() |
+ task.failed() |
+ task.postprocess() |
+ else: |
+ if task.needs_execute(): |
+ # dispatch task |
+ self.tp.put(task) |
+ jobs = jobs + 1 |
+ else: |
+ task.executed() |
+ task.postprocess() |
+ |
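+                # If the taskmaster returned no more tasks and none are
+                # still in flight, the build is complete.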
+ if not task and not jobs: break |
+ |
+ # Let any/all completed tasks finish up before we go |
+ # back and put the next batch of tasks on the queue. |
+ while True: |
+ task, ok = self.tp.get() |
+ jobs = jobs - 1 |
+ |
+ if ok: |
+ task.executed() |
+ else: |
+ if self.interrupted(): |
+ try: |
+ raise SCons.Errors.BuildError( |
+ task.targets[0], errstr=interrupt_msg) |
+ except: |
+ task.exception_set() |
+ |
+ # Let the failed() callback function arrange |
+ # for the build to stop if that's appropriate. |
+ task.failed() |
+ |
+ task.postprocess() |
+ |
+ if self.tp.resultsQueue.empty(): |
+ break |
+ |
+ self.tp.cleanup() |
+ self.taskmaster.cleanup() |
+ |
+# Local Variables: |
+# tab-width:4 |
+# indent-tabs-mode:nil |
+# End: |
+# vim: set expandtab tabstop=4 shiftwidth=4: |
Property changes on: scons-2.0.1/engine/SCons/Job.py |
___________________________________________________________________ |
Added: svn:eol-style |
+ LF |