OLD | NEW |
1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
6 | 6 |
7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
9 | 9 |
10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
107 '--cache', os.path.join(bot_dir, 'cache'), | 107 '--cache', os.path.join(bot_dir, 'cache'), |
108 '--root-dir', os.path.join(work_dir, 'isolated'), | 108 '--root-dir', os.path.join(work_dir, 'isolated'), |
109 ]) | 109 ]) |
110 if min_free_space: | 110 if min_free_space: |
111 cmd.extend(('--min-free-space', str(min_free_space))) | 111 cmd.extend(('--min-free-space', str(min_free_space))) |
112 | 112 |
113 if task_details.hard_timeout: | 113 if task_details.hard_timeout: |
114 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) | 114 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) |
115 if task_details.grace_period: | 115 if task_details.grace_period: |
116 cmd.extend(('--grace-period', str(task_details.grace_period))) | 116 cmd.extend(('--grace-period', str(task_details.grace_period))) |
117 if task_details.extra_args: | 117 |
118 cmd.append('--') | 118 cmd.append('--') |
| 119 if task_details.command: |
| 120 cmd.extend(task_details.command) |
| 121 elif task_details.extra_args: |
119 cmd.extend(task_details.extra_args) | 122 cmd.extend(task_details.extra_args) |
120 return cmd | 123 return cmd |
121 | 124 |
122 | 125 |
123 class TaskDetails(object): | 126 class TaskDetails(object): |
124 def __init__(self, data): | 127 def __init__(self, data): |
125 """Loads the raw data from a manifest file specified by --in-file.""" | 128 """Loads the raw data from a manifest file specified by --in-file.""" |
126 logging.info('TaskDetails(%s)', data) | 129 logging.info('TaskDetails(%s)', data) |
127 if not isinstance(data, dict): | 130 if not isinstance(data, dict): |
128 raise ValueError('Expected dict, got %r' % data) | 131 raise ValueError('Expected dict, got %r' % data) |
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
292 # TODO(maruel): This function is incomprehensible, split and refactor. | 295 # TODO(maruel): This function is incomprehensible, split and refactor. |
293 # Signal the command is about to be started. | 296 # Signal the command is about to be started. |
294 last_packet = start = now = monotonic_time() | 297 last_packet = start = now = monotonic_time() |
295 params = { | 298 params = { |
296 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
297 'id': task_details.bot_id, | 300 'id': task_details.bot_id, |
298 'task_id': task_details.task_id, | 301 'task_id': task_details.task_id, |
299 } | 302 } |
300 post_update(swarming_server, params, None, '', 0) | 303 post_update(swarming_server, params, None, '', 0) |
301 | 304 |
302 if task_details.command: | 305 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
303 # Raw command. | 306 cmd = get_isolated_cmd( |
304 cmd = task_details.command | 307 work_dir, task_details, isolated_result, min_free_space) |
305 isolated_result = None | 308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
306 else: | 309 # give one 'grace_period' slot to the child process and one slot to upload |
307 # Isolated task. | 310 # the results back. |
308 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 311 task_details.hard_timeout = 0 |
309 cmd = get_isolated_cmd( | 312 if task_details.grace_period: |
310 work_dir, task_details, isolated_result, min_free_space) | 313 task_details.grace_period *= 2 |
311 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | |
312 # give one 'grace_period' slot to the child process and one slot to upload | |
313 # the results back. | |
314 task_details.hard_timeout = 0 | |
315 if task_details.grace_period: | |
316 task_details.grace_period *= 2 | |
317 | 314 |
318 try: | 315 try: |
319 # TODO(maruel): Support both channels independently and display stderr in | 316 # TODO(maruel): Support both channels independently and display stderr in |
320 # red. | 317 # red. |
321 env = None | 318 env = None |
322 if task_details.env: | 319 if task_details.env: |
323 env = os.environ.copy() | 320 env = os.environ.copy() |
324 for key, value in task_details.env.iteritems(): | 321 for key, value in task_details.env.iteritems(): |
325 if not value: | 322 if not value: |
326 env.pop(key, None) | 323 env.pop(key, None) |
(...skipping 23 matching lines...) Expand all Loading... |
350 u'exit_code': 1, | 347 u'exit_code': 1, |
351 u'hard_timeout': False, | 348 u'hard_timeout': False, |
352 u'io_timeout': False, | 349 u'io_timeout': False, |
353 u'must_signal_internal_failure': None, | 350 u'must_signal_internal_failure': None, |
354 u'version': OUT_VERSION, | 351 u'version': OUT_VERSION, |
355 } | 352 } |
356 | 353 |
357 output_chunk_start = 0 | 354 output_chunk_start = 0 |
358 stdout = '' | 355 stdout = '' |
359 exit_code = None | 356 exit_code = None |
360 had_hard_timeout = False | |
361 had_io_timeout = False | 357 had_io_timeout = False |
362 must_signal_internal_failure = None | 358 must_signal_internal_failure = None |
363 kill_sent = False | 359 kill_sent = False |
364 timed_out = None | 360 timed_out = None |
365 try: | 361 try: |
366 calc = lambda: calc_yield_wait( | 362 calc = lambda: calc_yield_wait( |
367 task_details, start, last_io, timed_out, stdout) | 363 task_details, start, last_io, timed_out, stdout) |
368 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) | 364 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) |
369 last_io = monotonic_time() | 365 last_io = monotonic_time() |
370 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): | 366 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): |
(...skipping 14 matching lines...) Expand all Loading... |
385 # Send signal on timeout if necessary. Both are failures, not | 381 # Send signal on timeout if necessary. Both are failures, not |
386 # internal_failures. | 382 # internal_failures. |
387 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. | 383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. |
388 if not timed_out: | 384 if not timed_out: |
389 if (task_details.io_timeout and | 385 if (task_details.io_timeout and |
390 now - last_io > task_details.io_timeout): | 386 now - last_io > task_details.io_timeout): |
391 had_io_timeout = True | 387 had_io_timeout = True |
392 logging.warning('I/O timeout; sending SIGTERM') | 388 logging.warning('I/O timeout; sending SIGTERM') |
393 proc.terminate() | 389 proc.terminate() |
394 timed_out = monotonic_time() | 390 timed_out = monotonic_time() |
395 elif (task_details.hard_timeout and | |
396 now - start > task_details.hard_timeout): | |
397 had_hard_timeout = True | |
398 logging.warning('Hard timeout; sending SIGTERM') | |
399 proc.terminate() | |
400 timed_out = monotonic_time() | |
401 else: | 391 else: |
402 # During grace period. | 392 # During grace period. |
403 if not kill_sent and now >= timed_out + task_details.grace_period: | 393 if not kill_sent and now >= timed_out + task_details.grace_period: |
404 # Now kill for real. The user can distinguish between the following | 394 # Now kill for real. The user can distinguish between the following |
405 # states: | 395 # states: |
406 # - signal but process exited within grace period, | 396 # - signal but process exited within grace period, |
407 # (hard_|io_)_timed_out will be set but the process exit code will | 397 # (hard_|io_)_timed_out will be set but the process exit code will |
408 # be script provided. | 398 # be script provided. |
409 # - process exited late, exit code will be -9 on posix. | 399 # - process exited late, exit code will be -9 on posix. |
410 logging.warning('Grace exhausted; sending SIGKILL') | 400 logging.warning('Grace exhausted; sending SIGKILL') |
411 proc.kill() | 401 proc.kill() |
412 kill_sent = True | 402 kill_sent = True |
413 logging.info('Waiting for proces exit') | 403 logging.info('Waiting for proces exit') |
414 exit_code = proc.wait() | 404 exit_code = proc.wait() |
415 except MustExit as e: | 405 except MustExit as e: |
416 # TODO(maruel): Do the send SIGTERM to child process and give it | 406 # TODO(maruel): Do the send SIGTERM to child process and give it |
417 # task_details.grace_period to terminate. | 407 # task_details.grace_period to terminate. |
418 must_signal_internal_failure = ( | 408 must_signal_internal_failure = ( |
419 u'task_runner received signal %s' % e.signal) | 409 u'task_runner received signal %s' % e.signal) |
420 exit_code = kill_and_wait( | 410 exit_code = kill_and_wait( |
421 proc, task_details.grace_period, 'signal %d' % e.signal) | 411 proc, task_details.grace_period, 'signal %d' % e.signal) |
422 except (IOError, OSError): | 412 except (IOError, OSError): |
423 # Something wrong happened, try to kill the child process. | 413 # Something wrong happened, try to kill the child process. |
424 had_hard_timeout = True | |
425 exit_code = kill_and_wait( | 414 exit_code = kill_and_wait( |
426 proc, task_details.grace_period, 'exception %s' % e) | 415 proc, task_details.grace_period, 'exception %s' % e) |
427 | 416 |
428 # This is the very last packet for this command. If it was an isolated task, | 417 # This is the very last packet for this command. If it was an isolated task, |
429 # include the output reference to the archived .isolated file. | 418 # include the output reference to the archived .isolated file. |
430 now = monotonic_time() | 419 now = monotonic_time() |
431 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 420 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
432 params['duration'] = now - start | 421 params['duration'] = now - start |
433 params['io_timeout'] = had_io_timeout | 422 params['io_timeout'] = had_io_timeout |
434 params['hard_timeout'] = had_hard_timeout | 423 had_hard_timeout = False |
435 if isolated_result: | 424 try: |
436 try: | 425 if not os.path.isfile(isolated_result): |
437 if ((had_io_timeout or had_hard_timeout) and | 426 # It's possible if |
438 not os.path.isfile(isolated_result)): | 427 # - run_isolated.py did not start |
439 # It's possible that run_isolated failed to quit quickly enough; it | 428 # - run_isolated.py started, but arguments were invalid |
440 # could be because there was too much data to upload back or something | 429 # - host in a situation unable to fork |
441 # else. Do not create an internal error, just send back the (partial) | 430 # - grand child process outliving the child process deleting everything |
442 # view as task_runner saw it, for example the real exit_code is | 431 # it can |
443 # unknown. | 432 # Do not create an internal error, just send back the (partial) |
444 logging.warning('TIMED_OUT and there\'s no result file') | 433 # view as task_runner saw it, for example the real exit_code is |
| 434 # unknown. |
| 435 logging.warning('there\'s no result file') |
| 436 if exit_code is None: |
445 exit_code = -1 | 437 exit_code = -1 |
446 else: | 438 else: |
447 # See run_isolated.py for the format. | 439 # See run_isolated.py for the format. |
448 with open(isolated_result, 'rb') as f: | 440 with open(isolated_result, 'rb') as f: |
449 run_isolated_result = json.load(f) | 441 run_isolated_result = json.load(f) |
450 logging.debug('run_isolated:\n%s', run_isolated_result) | 442 logging.debug('run_isolated:\n%s', run_isolated_result) |
451 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, | 443 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, |
452 # mapping time, etc) from run_isolated and push them to the server. | 444 # mapping time, etc) from run_isolated and push them to the server. |
453 if run_isolated_result['outputs_ref']: | 445 if run_isolated_result['outputs_ref']: |
454 params['outputs_ref'] = run_isolated_result['outputs_ref'] | 446 params['outputs_ref'] = run_isolated_result['outputs_ref'] |
455 had_hard_timeout = ( | 447 had_hard_timeout = run_isolated_result['had_hard_timeout'] |
456 had_hard_timeout or run_isolated_result['had_hard_timeout']) | 448 if not had_io_timeout and not had_hard_timeout: |
457 params['hard_timeout'] = had_hard_timeout | 449 if run_isolated_result['internal_failure']: |
458 if not had_io_timeout and not had_hard_timeout: | 450 must_signal_internal_failure = ( |
459 if run_isolated_result['internal_failure']: | 451 run_isolated_result['internal_failure']) |
460 must_signal_internal_failure = ( | 452 logging.error('%s', must_signal_internal_failure) |
461 run_isolated_result['internal_failure']) | 453 elif exit_code: |
462 logging.error('%s', must_signal_internal_failure) | 454 # TODO(maruel): Grab stdout from run_isolated. |
463 elif exit_code: | 455 must_signal_internal_failure = ( |
464 # TODO(maruel): Grab stdout from run_isolated. | 456 'run_isolated internal failure %d' % exit_code) |
465 must_signal_internal_failure = ( | 457 logging.error('%s', must_signal_internal_failure) |
466 'run_isolated internal failure %d' % exit_code) | 458 exit_code = run_isolated_result['exit_code'] |
467 logging.error('%s', must_signal_internal_failure) | 459 if run_isolated_result.get('duration') is not None: |
468 exit_code = run_isolated_result['exit_code'] | 460 # Calculate the real task duration as measured by run_isolated and |
469 if run_isolated_result.get('duration') is not None: | 461 # calculate the remaining overhead. |
470 # Calculate the real task duration as measured by run_isolated and | 462 params['bot_overhead'] = params['duration'] |
471 # calculate the remaining overhead. | 463 params['duration'] = run_isolated_result['duration'] |
472 params['bot_overhead'] = params['duration'] | 464 params['bot_overhead'] -= params['duration'] |
473 params['duration'] = run_isolated_result['duration'] | 465 params['bot_overhead'] -= run_isolated_result.get( |
474 params['bot_overhead'] -= params['duration'] | 466 'download', {}).get('duration', 0) |
475 params['bot_overhead'] -= run_isolated_result.get( | 467 params['bot_overhead'] -= run_isolated_result.get( |
476 'download', {}).get('duration', 0) | 468 'upload', {}).get('duration', 0) |
477 params['bot_overhead'] -= run_isolated_result.get( | 469 if params['bot_overhead'] < 0: |
478 'upload', {}).get('duration', 0) | 470 params['bot_overhead'] = 0 |
479 if params['bot_overhead'] < 0: | 471 stats = run_isolated_result.get('stats') |
480 params['bot_overhead'] = 0 | 472 if stats: |
481 stats = run_isolated_result.get('stats') | 473 params['isolated_stats'] = stats |
482 if stats: | 474 except (IOError, OSError, ValueError) as e: |
483 params['isolated_stats'] = stats | 475 logging.error('Swallowing error: %s', e) |
484 except (IOError, OSError, ValueError) as e: | 476 if not must_signal_internal_failure: |
485 logging.error('Swallowing error: %s', e) | 477 must_signal_internal_failure = str(e) |
486 if not must_signal_internal_failure: | |
487 must_signal_internal_failure = str(e) | |
488 # TODO(maruel): Send the internal failure here instead of sending it through | 478 # TODO(maruel): Send the internal failure here instead of sending it through |
489 # bot_main, this causes a race condition. | 479 # bot_main, this causes a race condition. |
490 if exit_code is None: | 480 if exit_code is None: |
491 exit_code = -1 | 481 exit_code = -1 |
| 482 params['hard_timeout'] = had_hard_timeout |
492 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) | 483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
493 return { | 484 return { |
494 u'exit_code': exit_code, | 485 u'exit_code': exit_code, |
495 u'hard_timeout': had_hard_timeout, | 486 u'hard_timeout': had_hard_timeout, |
496 u'io_timeout': had_io_timeout, | 487 u'io_timeout': had_io_timeout, |
497 u'must_signal_internal_failure': must_signal_internal_failure, | 488 u'must_signal_internal_failure': must_signal_internal_failure, |
498 u'version': OUT_VERSION, | 489 u'version': OUT_VERSION, |
499 } | 490 } |
500 finally: | 491 finally: |
501 if isolated_result: | 492 try: |
502 try: | 493 os.remove(isolated_result) |
503 os.remove(isolated_result) | 494 except OSError: |
504 except OSError: | 495 pass |
505 pass | |
506 | 496 |
507 | 497 |
508 def main(args): | 498 def main(args): |
509 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 499 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
510 parser.add_option('--in-file', help='Name of the request file') | 500 parser.add_option('--in-file', help='Name of the request file') |
511 parser.add_option( | 501 parser.add_option( |
512 '--out-file', help='Name of the JSON file to write a task summary to') | 502 '--out-file', help='Name of the JSON file to write a task summary to') |
513 parser.add_option( | 503 parser.add_option( |
514 '--swarming-server', help='Swarming server to send data back') | 504 '--swarming-server', help='Swarming server to send data back') |
515 parser.add_option( | 505 parser.add_option( |
(...skipping 14 matching lines...) Expand all Loading... |
530 if options.start > now: | 520 if options.start > now: |
531 options.start = now | 521 options.start = now |
532 | 522 |
533 try: | 523 try: |
534 load_and_run( | 524 load_and_run( |
535 options.in_file, options.swarming_server, options.cost_usd_hour, | 525 options.in_file, options.swarming_server, options.cost_usd_hour, |
536 options.start, options.out_file, options.min_free_space) | 526 options.start, options.out_file, options.min_free_space) |
537 return 0 | 527 return 0 |
538 finally: | 528 finally: |
539 logging.info('quitting') | 529 logging.info('quitting') |
OLD | NEW |