OLD | NEW |
(Empty) | |
| 1 """SCons.Job |
| 2 |
| 3 This module defines the Serial and Parallel classes that execute tasks to |
| 4 complete a build. The Jobs class provides a higher level interface to start, |
| 5 stop, and wait on jobs. |
| 6 |
| 7 """ |
| 8 |
| 9 # |
| 10 # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 The S
Cons Foundation |
| 11 # |
| 12 # Permission is hereby granted, free of charge, to any person obtaining |
| 13 # a copy of this software and associated documentation files (the |
| 14 # "Software"), to deal in the Software without restriction, including |
| 15 # without limitation the rights to use, copy, modify, merge, publish, |
| 16 # distribute, sublicense, and/or sell copies of the Software, and to |
| 17 # permit persons to whom the Software is furnished to do so, subject to |
| 18 # the following conditions: |
| 19 # |
| 20 # The above copyright notice and this permission notice shall be included |
| 21 # in all copies or substantial portions of the Software. |
| 22 # |
| 23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY |
| 24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE |
| 25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| 26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE |
| 27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
| 28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
| 29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| 30 # |
| 31 |
| 32 __revision__ = "src/engine/SCons/Job.py 5134 2010/08/16 23:02:40 bdeegan" |
| 33 |
| 34 import SCons.compat |
| 35 |
| 36 import os |
| 37 import signal |
| 38 |
| 39 import SCons.Errors |
| 40 |
| 41 # The default stack size (in kilobytes) of the threads used to execute |
| 42 # jobs in parallel. |
| 43 # |
| 44 # We use a stack size of 256 kilobytes. The default on some platforms |
| 45 # is too large and prevents us from creating enough threads to fully |
| 46 # parallelized the build. For example, the default stack size on linux |
| 47 # is 8 MBytes. |
| 48 |
| 49 explicit_stack_size = None |
| 50 default_stack_size = 256 |
| 51 |
| 52 interrupt_msg = 'Build interrupted.' |
| 53 |
| 54 |
| 55 class InterruptState(object): |
| 56 def __init__(self): |
| 57 self.interrupted = False |
| 58 |
| 59 def set(self): |
| 60 self.interrupted = True |
| 61 |
| 62 def __call__(self): |
| 63 return self.interrupted |
| 64 |
| 65 |
| 66 class Jobs(object): |
| 67 """An instance of this class initializes N jobs, and provides |
| 68 methods for starting, stopping, and waiting on all N jobs. |
| 69 """ |
| 70 |
| 71 def __init__(self, num, taskmaster): |
| 72 """ |
| 73 create 'num' jobs using the given taskmaster. |
| 74 |
| 75 If 'num' is 1 or less, then a serial job will be used, |
| 76 otherwise a parallel job with 'num' worker threads will |
| 77 be used. |
| 78 |
| 79 The 'num_jobs' attribute will be set to the actual number of jobs |
| 80 allocated. If more than one job is requested but the Parallel |
| 81 class can't do it, it gets reset to 1. Wrapping interfaces that |
| 82 care should check the value of 'num_jobs' after initialization. |
| 83 """ |
| 84 |
| 85 self.job = None |
| 86 if num > 1: |
| 87 stack_size = explicit_stack_size |
| 88 if stack_size is None: |
| 89 stack_size = default_stack_size |
| 90 |
| 91 try: |
| 92 self.job = Parallel(taskmaster, num, stack_size) |
| 93 self.num_jobs = num |
| 94 except NameError: |
| 95 pass |
| 96 if self.job is None: |
| 97 self.job = Serial(taskmaster) |
| 98 self.num_jobs = 1 |
| 99 |
| 100 def run(self, postfunc=lambda: None): |
| 101 """Run the jobs. |
| 102 |
| 103 postfunc() will be invoked after the jobs has run. It will be |
| 104 invoked even if the jobs are interrupted by a keyboard |
| 105 interrupt (well, in fact by a signal such as either SIGINT, |
| 106 SIGTERM or SIGHUP). The execution of postfunc() is protected |
| 107 against keyboard interrupts and is guaranteed to run to |
| 108 completion.""" |
| 109 self._setup_sig_handler() |
| 110 try: |
| 111 self.job.start() |
| 112 finally: |
| 113 postfunc() |
| 114 self._reset_sig_handler() |
| 115 |
| 116 def were_interrupted(self): |
| 117 """Returns whether the jobs were interrupted by a signal.""" |
| 118 return self.job.interrupted() |
| 119 |
| 120 def _setup_sig_handler(self): |
| 121 """Setup an interrupt handler so that SCons can shutdown cleanly in |
| 122 various conditions: |
| 123 |
| 124 a) SIGINT: Keyboard interrupt |
| 125 b) SIGTERM: kill or system shutdown |
| 126 c) SIGHUP: Controlling shell exiting |
| 127 |
| 128 We handle all of these cases by stopping the taskmaster. It |
| 129 turns out that it very difficult to stop the build process |
| 130 by throwing asynchronously an exception such as |
| 131 KeyboardInterrupt. For example, the python Condition |
| 132 variables (threading.Condition) and queue's do not seem to |
| 133 asynchronous-exception-safe. It would require adding a whole |
| 134 bunch of try/finally block and except KeyboardInterrupt all |
| 135 over the place. |
| 136 |
| 137 Note also that we have to be careful to handle the case when |
| 138 SCons forks before executing another process. In that case, we |
| 139 want the child to exit immediately. |
| 140 """ |
| 141 def handler(signum, stack, self=self, parentpid=os.getpid()): |
| 142 if os.getpid() == parentpid: |
| 143 self.job.taskmaster.stop() |
| 144 self.job.interrupted.set() |
| 145 else: |
| 146 os._exit(2) |
| 147 |
| 148 self.old_sigint = signal.signal(signal.SIGINT, handler) |
| 149 self.old_sigterm = signal.signal(signal.SIGTERM, handler) |
| 150 try: |
| 151 self.old_sighup = signal.signal(signal.SIGHUP, handler) |
| 152 except AttributeError: |
| 153 pass |
| 154 |
| 155 def _reset_sig_handler(self): |
| 156 """Restore the signal handlers to their previous state (before the |
| 157 call to _setup_sig_handler().""" |
| 158 |
| 159 signal.signal(signal.SIGINT, self.old_sigint) |
| 160 signal.signal(signal.SIGTERM, self.old_sigterm) |
| 161 try: |
| 162 signal.signal(signal.SIGHUP, self.old_sighup) |
| 163 except AttributeError: |
| 164 pass |
| 165 |
| 166 class Serial(object): |
| 167 """This class is used to execute tasks in series, and is more efficient |
| 168 than Parallel, but is only appropriate for non-parallel builds. Only |
| 169 one instance of this class should be in existence at a time. |
| 170 |
| 171 This class is not thread safe. |
| 172 """ |
| 173 |
| 174 def __init__(self, taskmaster): |
| 175 """Create a new serial job given a taskmaster. |
| 176 |
| 177 The taskmaster's next_task() method should return the next task |
| 178 that needs to be executed, or None if there are no more tasks. The |
| 179 taskmaster's executed() method will be called for each task when it |
| 180 is successfully executed or failed() will be called if it failed to |
| 181 execute (e.g. execute() raised an exception).""" |
| 182 |
| 183 self.taskmaster = taskmaster |
| 184 self.interrupted = InterruptState() |
| 185 |
| 186 def start(self): |
| 187 """Start the job. This will begin pulling tasks from the taskmaster |
| 188 and executing them, and return when there are no more tasks. If a task |
| 189 fails to execute (i.e. execute() raises an exception), then the job will |
| 190 stop.""" |
| 191 |
| 192 while True: |
| 193 task = self.taskmaster.next_task() |
| 194 |
| 195 if task is None: |
| 196 break |
| 197 |
| 198 try: |
| 199 task.prepare() |
| 200 if task.needs_execute(): |
| 201 task.execute() |
| 202 except: |
| 203 if self.interrupted(): |
| 204 try: |
| 205 raise SCons.Errors.BuildError( |
| 206 task.targets[0], errstr=interrupt_msg) |
| 207 except: |
| 208 task.exception_set() |
| 209 else: |
| 210 task.exception_set() |
| 211 |
| 212 # Let the failed() callback function arrange for the |
| 213 # build to stop if that's appropriate. |
| 214 task.failed() |
| 215 else: |
| 216 task.executed() |
| 217 |
| 218 task.postprocess() |
| 219 self.taskmaster.cleanup() |
| 220 |
| 221 |
| 222 # Trap import failure so that everything in the Job module but the |
| 223 # Parallel class (and its dependent classes) will work if the interpreter |
| 224 # doesn't support threads. |
| 225 try: |
| 226 import queue |
| 227 import threading |
| 228 except ImportError: |
| 229 pass |
| 230 else: |
| 231 class Worker(threading.Thread): |
| 232 """A worker thread waits on a task to be posted to its request queue, |
| 233 dequeues the task, executes it, and posts a tuple including the task |
| 234 and a boolean indicating whether the task executed successfully. """ |
| 235 |
| 236 def __init__(self, requestQueue, resultsQueue, interrupted): |
| 237 threading.Thread.__init__(self) |
| 238 self.setDaemon(1) |
| 239 self.requestQueue = requestQueue |
| 240 self.resultsQueue = resultsQueue |
| 241 self.interrupted = interrupted |
| 242 self.start() |
| 243 |
| 244 def run(self): |
| 245 while True: |
| 246 task = self.requestQueue.get() |
| 247 |
| 248 if task is None: |
| 249 # The "None" value is used as a sentinel by |
| 250 # ThreadPool.cleanup(). This indicates that there |
| 251 # are no more tasks, so we should quit. |
| 252 break |
| 253 |
| 254 try: |
| 255 if self.interrupted(): |
| 256 raise SCons.Errors.BuildError( |
| 257 task.targets[0], errstr=interrupt_msg) |
| 258 task.execute() |
| 259 except: |
| 260 task.exception_set() |
| 261 ok = False |
| 262 else: |
| 263 ok = True |
| 264 |
| 265 self.resultsQueue.put((task, ok)) |
| 266 |
| 267 class ThreadPool(object): |
| 268 """This class is responsible for spawning and managing worker threads.""
" |
| 269 |
| 270 def __init__(self, num, stack_size, interrupted): |
| 271 """Create the request and reply queues, and 'num' worker threads. |
| 272 |
| 273 One must specify the stack size of the worker threads. The |
| 274 stack size is specified in kilobytes. |
| 275 """ |
| 276 self.requestQueue = queue.Queue(0) |
| 277 self.resultsQueue = queue.Queue(0) |
| 278 |
| 279 try: |
| 280 prev_size = threading.stack_size(stack_size*1024) |
| 281 except AttributeError, e: |
| 282 # Only print a warning if the stack size has been |
| 283 # explicitly set. |
| 284 if not explicit_stack_size is None: |
| 285 msg = "Setting stack size is unsupported by this version of
Python:\n " + \ |
| 286 e.args[0] |
| 287 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) |
| 288 except ValueError, e: |
| 289 msg = "Setting stack size failed:\n " + str(e) |
| 290 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) |
| 291 |
| 292 # Create worker threads |
| 293 self.workers = [] |
| 294 for _ in range(num): |
| 295 worker = Worker(self.requestQueue, self.resultsQueue, interrupte
d) |
| 296 self.workers.append(worker) |
| 297 |
| 298 if 'prev_size' in locals(): |
| 299 threading.stack_size(prev_size) |
| 300 |
| 301 def put(self, task): |
| 302 """Put task into request queue.""" |
| 303 self.requestQueue.put(task) |
| 304 |
| 305 def get(self): |
| 306 """Remove and return a result tuple from the results queue.""" |
| 307 return self.resultsQueue.get() |
| 308 |
| 309 def preparation_failed(self, task): |
| 310 self.resultsQueue.put((task, False)) |
| 311 |
| 312 def cleanup(self): |
| 313 """ |
| 314 Shuts down the thread pool, giving each worker thread a |
| 315 chance to shut down gracefully. |
| 316 """ |
| 317 # For each worker thread, put a sentinel "None" value |
| 318 # on the requestQueue (indicating that there's no work |
| 319 # to be done) so that each worker thread will get one and |
| 320 # terminate gracefully. |
| 321 for _ in self.workers: |
| 322 self.requestQueue.put(None) |
| 323 |
| 324 # Wait for all of the workers to terminate. |
| 325 # |
| 326 # If we don't do this, later Python versions (2.4, 2.5) often |
| 327 # seem to raise exceptions during shutdown. This happens |
| 328 # in requestQueue.get(), as an assertion failure that |
| 329 # requestQueue.not_full is notified while not acquired, |
| 330 # seemingly because the main thread has shut down (or is |
| 331 # in the process of doing so) while the workers are still |
| 332 # trying to pull sentinels off the requestQueue. |
| 333 # |
| 334 # Normally these terminations should happen fairly quickly, |
| 335 # but we'll stick a one-second timeout on here just in case |
| 336 # someone gets hung. |
| 337 for worker in self.workers: |
| 338 worker.join(1.0) |
| 339 self.workers = [] |
| 340 |
| 341 class Parallel(object): |
| 342 """This class is used to execute tasks in parallel, and is somewhat |
| 343 less efficient than Serial, but is appropriate for parallel builds. |
| 344 |
| 345 This class is thread safe. |
| 346 """ |
| 347 |
| 348 def __init__(self, taskmaster, num, stack_size): |
| 349 """Create a new parallel job given a taskmaster. |
| 350 |
| 351 The taskmaster's next_task() method should return the next |
| 352 task that needs to be executed, or None if there are no more |
| 353 tasks. The taskmaster's executed() method will be called |
| 354 for each task when it is successfully executed or failed() |
| 355 will be called if the task failed to execute (i.e. execute() |
| 356 raised an exception). |
| 357 |
| 358 Note: calls to taskmaster are serialized, but calls to |
| 359 execute() on distinct tasks are not serialized, because |
| 360 that is the whole point of parallel jobs: they can execute |
| 361 multiple tasks simultaneously. """ |
| 362 |
| 363 self.taskmaster = taskmaster |
| 364 self.interrupted = InterruptState() |
| 365 self.tp = ThreadPool(num, stack_size, self.interrupted) |
| 366 |
| 367 self.maxjobs = num |
| 368 |
| 369 def start(self): |
| 370 """Start the job. This will begin pulling tasks from the |
| 371 taskmaster and executing them, and return when there are no |
| 372 more tasks. If a task fails to execute (i.e. execute() raises |
| 373 an exception), then the job will stop.""" |
| 374 |
| 375 jobs = 0 |
| 376 |
| 377 while True: |
| 378 # Start up as many available tasks as we're |
| 379 # allowed to. |
| 380 while jobs < self.maxjobs: |
| 381 task = self.taskmaster.next_task() |
| 382 if task is None: |
| 383 break |
| 384 |
| 385 try: |
| 386 # prepare task for execution |
| 387 task.prepare() |
| 388 except: |
| 389 task.exception_set() |
| 390 task.failed() |
| 391 task.postprocess() |
| 392 else: |
| 393 if task.needs_execute(): |
| 394 # dispatch task |
| 395 self.tp.put(task) |
| 396 jobs = jobs + 1 |
| 397 else: |
| 398 task.executed() |
| 399 task.postprocess() |
| 400 |
| 401 if not task and not jobs: break |
| 402 |
| 403 # Let any/all completed tasks finish up before we go |
| 404 # back and put the next batch of tasks on the queue. |
| 405 while True: |
| 406 task, ok = self.tp.get() |
| 407 jobs = jobs - 1 |
| 408 |
| 409 if ok: |
| 410 task.executed() |
| 411 else: |
| 412 if self.interrupted(): |
| 413 try: |
| 414 raise SCons.Errors.BuildError( |
| 415 task.targets[0], errstr=interrupt_msg) |
| 416 except: |
| 417 task.exception_set() |
| 418 |
| 419 # Let the failed() callback function arrange |
| 420 # for the build to stop if that's appropriate. |
| 421 task.failed() |
| 422 |
| 423 task.postprocess() |
| 424 |
| 425 if self.tp.resultsQueue.empty(): |
| 426 break |
| 427 |
| 428 self.tp.cleanup() |
| 429 self.taskmaster.cleanup() |
| 430 |
| 431 # Local Variables: |
| 432 # tab-width:4 |
| 433 # indent-tabs-mode:nil |
| 434 # End: |
| 435 # vim: set expandtab tabstop=4 shiftwidth=4: |
OLD | NEW |