OLD | NEW |
1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed by the Apache v2.0 license that can be | 2 # Use of this source code is governed by the Apache v2.0 license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
6 | 6 |
7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
9 | 9 |
10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
(...skipping 66 matching lines...)
77 return [sys.executable, THIS_FILE, 'run_isolated'] | 77 return [sys.executable, THIS_FILE, 'run_isolated'] |
78 | 78 |
79 | 79 |
80 def get_isolated_cmd( | 80 def get_isolated_cmd( |
81 work_dir, task_details, isolated_result, min_free_space): | 81 work_dir, task_details, isolated_result, min_free_space): |
82 """Returns the command to call run_isolated. Mocked in tests.""" | 82 """Returns the command to call run_isolated. Mocked in tests.""" |
83 bot_dir = os.path.dirname(work_dir) | 83 bot_dir = os.path.dirname(work_dir) |
84 if os.path.isfile(isolated_result): | 84 if os.path.isfile(isolated_result): |
85 os.remove(isolated_result) | 85 os.remove(isolated_result) |
86 cmd = get_run_isolated() | 86 cmd = get_run_isolated() |
| 87 if task_details.inputs_ref: |
| 88 cmd.extend( |
| 89 [ |
| 90 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'), |
| 91 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'), |
| 92 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'), |
| 93 ]) |
| 94 |
87 cmd.extend( | 95 cmd.extend( |
88 [ | 96 [ |
89 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'), | |
90 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'), | |
91 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'), | |
92 '--json', isolated_result, | 97 '--json', isolated_result, |
93 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'), | 98 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'), |
94 '--cache', os.path.join(bot_dir, 'cache'), | 99 '--cache', os.path.join(bot_dir, 'cache'), |
95 '--root-dir', os.path.join(work_dir, 'isolated'), | 100 '--root-dir', os.path.join(work_dir, 'isolated'), |
96 ]) | 101 ]) |
97 if min_free_space: | 102 if min_free_space: |
98 cmd.extend(('--min-free-space', str(min_free_space))) | 103 cmd.extend(('--min-free-space', str(min_free_space))) |
99 | 104 |
100 if task_details.hard_timeout: | 105 if task_details.hard_timeout: |
101 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) | 106 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) |
102 if task_details.grace_period: | 107 if task_details.grace_period: |
103 cmd.extend(('--grace-period', str(task_details.grace_period))) | 108 cmd.extend(('--grace-period', str(task_details.grace_period))) |
104 if task_details.extra_args: | 109 |
105 cmd.append('--') | 110 cmd.append('--') |
| 111 if task_details.command: |
| 112 cmd.extend(task_details.command) |
| 113 elif task_details.extra_args: |
106 cmd.extend(task_details.extra_args) | 114 cmd.extend(task_details.extra_args) |
107 return cmd | 115 return cmd |
108 | 116 |
109 | 117 |
110 class TaskDetails(object): | 118 class TaskDetails(object): |
111 def __init__(self, data): | 119 def __init__(self, data): |
112 """Loads the raw data. | 120 """Loads the raw data. |
113 | 121 |
114 It is expected to have at least: | 122 It is expected to have at least: |
115 - bot_id | 123 - bot_id |
(...skipping 173 matching lines...)
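For orientation, the command assembled by the new get_isolated_cmd() above has roughly the following shape when the task carries both an inputs_ref and a raw command. This is a sketch only: every concrete value (interpreter path, bot paths, hash, server URL, timeouts, and the trailing command) is a made-up placeholder, and the first two entries stand in for sys.executable and THIS_FILE.

# Illustrative run_isolated invocation; all values below are placeholders.
cmd = [
    '/usr/bin/python', '/b/swarming_bot.zip', 'run_isolated',
    # Only present when task_details.inputs_ref is set:
    '--isolated', '<isolated_hash>',
    '--namespace', 'default-gzip',
    '-I', 'https://isolate.example.com',
    # Always present:
    '--json', '/b/work/isolated_result.json',
    '--log-file', '/b/logs/run_isolated.log',
    '--cache', '/b/cache',
    '--root-dir', '/b/work/isolated',
    # Optional flags, added only when the corresponding values are set:
    '--min-free-space', '1073741824',
    '--hard-timeout', '3600',
    '--grace-period', '60',
    # Separator, then the raw command (or extra_args for an isolated command):
    '--',
    'python', 'hello_world.py',
]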
289 # TODO(maruel): This function is incomprehensible, split and refactor. | 297 # TODO(maruel): This function is incomprehensible, split and refactor. |
290 # Signal the command is about to be started. | 298 # Signal the command is about to be started. |
291 last_packet = start = now = monotonic_time() | 299 last_packet = start = now = monotonic_time() |
292 params = { | 300 params = { |
293 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 301 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
294 'id': task_details.bot_id, | 302 'id': task_details.bot_id, |
295 'task_id': task_details.task_id, | 303 'task_id': task_details.task_id, |
296 } | 304 } |
297 post_update(swarming_server, params, None, '', 0) | 305 post_update(swarming_server, params, None, '', 0) |
298 | 306 |
299 if task_details.command: | 307 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
300 # Raw command. | 308 cmd = get_isolated_cmd( |
301 cmd = task_details.command | 309 work_dir, task_details, isolated_result, min_free_space) |
302 isolated_result = None | 310 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
303 else: | 311 # give one 'grace_period' slot to the child process and one slot to upload |
304 # Isolated task. | 312 # the results back. |
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 313 task_details.hard_timeout = 0 |
306 cmd = get_isolated_cmd( | 314 if task_details.grace_period: |
307 work_dir, task_details, isolated_result, min_free_space) | 315 task_details.grace_period *= 2 |
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | |
309 # give one 'grace_period' slot to the child process and one slot to upload | |
310 # the results back. | |
311 task_details.hard_timeout = 0 | |
312 if task_details.grace_period: | |
313 task_details.grace_period *= 2 | |
314 | 316 |
315 try: | 317 try: |
316 # TODO(maruel): Support both channels independently and display stderr in | 318 # TODO(maruel): Support both channels independently and display stderr in |
317 # red. | 319 # red. |
318 env = None | 320 env = None |
319 if task_details.env: | 321 if task_details.env: |
320 env = os.environ.copy() | 322 env = os.environ.copy() |
321 for key, value in task_details.env.iteritems(): | 323 for key, value in task_details.env.iteritems(): |
322 if not value: | 324 if not value: |
323 env.pop(key, None) | 325 env.pop(key, None) |
(...skipping 22 matching lines...)
346 u'exit_code': 1, | 348 u'exit_code': 1, |
347 u'hard_timeout': False, | 349 u'hard_timeout': False, |
348 u'io_timeout': False, | 350 u'io_timeout': False, |
349 u'must_signal_internal_failure': None, | 351 u'must_signal_internal_failure': None, |
350 u'version': OUT_VERSION, | 352 u'version': OUT_VERSION, |
351 } | 353 } |
352 | 354 |
353 output_chunk_start = 0 | 355 output_chunk_start = 0 |
354 stdout = '' | 356 stdout = '' |
355 exit_code = None | 357 exit_code = None |
356 had_hard_timeout = False | |
357 had_io_timeout = False | 358 had_io_timeout = False |
358 must_signal_internal_failure = None | 359 must_signal_internal_failure = None |
359 kill_sent = False | 360 kill_sent = False |
360 timed_out = None | 361 timed_out = None |
361 try: | 362 try: |
362 calc = lambda: calc_yield_wait( | 363 calc = lambda: calc_yield_wait( |
363 task_details, start, last_io, timed_out, stdout) | 364 task_details, start, last_io, timed_out, stdout) |
364 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) | 365 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) |
365 last_io = monotonic_time() | 366 last_io = monotonic_time() |
366 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): | 367 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): |
(...skipping 14 matching lines...)
381 # Send signal on timeout if necessary. Both are failures, not | 382 # Send signal on timeout if necessary. Both are failures, not |
382 # internal_failures. | 383 # internal_failures. |
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. | 384 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. |
384 if not timed_out: | 385 if not timed_out: |
385 if (task_details.io_timeout and | 386 if (task_details.io_timeout and |
386 now - last_io > task_details.io_timeout): | 387 now - last_io > task_details.io_timeout): |
387 had_io_timeout = True | 388 had_io_timeout = True |
388 logging.warning('I/O timeout; sending SIGTERM') | 389 logging.warning('I/O timeout; sending SIGTERM') |
389 proc.terminate() | 390 proc.terminate() |
390 timed_out = monotonic_time() | 391 timed_out = monotonic_time() |
391 elif (task_details.hard_timeout and | |
392 now - start > task_details.hard_timeout): | |
393 had_hard_timeout = True | |
394 logging.warning('Hard timeout; sending SIGTERM') | |
395 proc.terminate() | |
396 timed_out = monotonic_time() | |
397 else: | 392 else: |
398 # During grace period. | 393 # During grace period. |
399 if not kill_sent and now >= timed_out + task_details.grace_period: | 394 if not kill_sent and now >= timed_out + task_details.grace_period: |
400 # Now kill for real. The user can distinguish between the following | 395 # Now kill for real. The user can distinguish between the following |
401 # states: | 396 # states: |
402 # - signal but process exited within grace period, | 397 # - signal but process exited within grace period, |
403 # (hard_|io_)_timed_out will be set but the process exit code will | 398 # (hard_|io_)_timed_out will be set but the process exit code will |
404 # be script provided. | 399 # be script provided. |
405 # - process exited late, exit code will be -9 on posix. | 400 # - process exited late, exit code will be -9 on posix. |
406 logging.warning('Grace exhausted; sending SIGKILL') | 401 logging.warning('Grace exhausted; sending SIGKILL') |
407 proc.kill() | 402 proc.kill() |
408 kill_sent = True | 403 kill_sent = True |
409 logging.info('Waiting for process exit') | 404 logging.info('Waiting for process exit') |
410 exit_code = proc.wait() | 405 exit_code = proc.wait() |
411 except MustExit as e: | 406 except MustExit as e: |
412 # TODO(maruel): Do the send SIGTERM to child process and give it | 407 # TODO(maruel): Do the send SIGTERM to child process and give it |
413 # task_details.grace_period to terminate. | 408 # task_details.grace_period to terminate. |
414 must_signal_internal_failure = ( | 409 must_signal_internal_failure = ( |
415 u'task_runner received signal %s' % e.signal) | 410 u'task_runner received signal %s' % e.signal) |
416 exit_code = kill_and_wait( | 411 exit_code = kill_and_wait( |
417 proc, task_details.grace_period, 'signal %d' % e.signal) | 412 proc, task_details.grace_period, 'signal %d' % e.signal) |
418 except (IOError, OSError) as e: | 413 except (IOError, OSError) as e: |
419 # Something wrong happened, try to kill the child process. | 414 # Something wrong happened, try to kill the child process. |
420 had_hard_timeout = True | |
421 exit_code = kill_and_wait( | 415 exit_code = kill_and_wait( |
422 proc, task_details.grace_period, 'exception %s' % e) | 416 proc, task_details.grace_period, 'exception %s' % e) |
423 | 417 |
424 # This is the very last packet for this command. If it was an isolated task, | 418 # This is the very last packet for this command. If it was an isolated task, |
425 # include the output reference to the archived .isolated file. | 419 # include the output reference to the archived .isolated file. |
426 now = monotonic_time() | 420 now = monotonic_time() |
427 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 421 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
428 params['duration'] = now - start | 422 params['duration'] = now - start |
429 params['io_timeout'] = had_io_timeout | 423 params['io_timeout'] = had_io_timeout |
430 params['hard_timeout'] = had_hard_timeout | 424 had_hard_timeout = False |
431 if isolated_result: | 425 try: |
432 try: | 426 if not os.path.isfile(isolated_result): |
433 if ((had_io_timeout or had_hard_timeout) and | 427 # It's possible if |
434 not os.path.isfile(isolated_result)): | 428 # - run_isolated.py did not start |
435 # It's possible that run_isolated failed to quit quickly enough; it | 429 # - run_isolated.py started, but arguments were invalid |
436 # could be because there was too much data to upload back or something | 430 # - host in a situation unable to fork |
437 # else. Do not create an internal error, just send back the (partial) | 431 # - grand child process outliving the child process deleting everything |
438 # view as task_runner saw it, for example the real exit_code is | 432 # it can |
439 # unknown. | 433 # Do not create an internal error, just send back the (partial) |
440 logging.warning('TIMED_OUT and there\'s no result file') | 434 # view as task_runner saw it, for example the real exit_code is |
| 435 # unknown. |
| 436 logging.warning('there\'s no result file') |
| 437 if exit_code is None: |
441 exit_code = -1 | 438 exit_code = -1 |
442 else: | 439 else: |
443 # See run_isolated.py for the format. | 440 # See run_isolated.py for the format. |
444 with open(isolated_result, 'rb') as f: | 441 with open(isolated_result, 'rb') as f: |
445 run_isolated_result = json.load(f) | 442 run_isolated_result = json.load(f) |
446 logging.debug('run_isolated:\n%s', run_isolated_result) | 443 logging.debug('run_isolated:\n%s', run_isolated_result) |
447 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, | 444 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, |
448 # mapping time, etc) from run_isolated and push them to the server. | 445 # mapping time, etc) from run_isolated and push them to the server. |
449 if run_isolated_result['outputs_ref']: | 446 if run_isolated_result['outputs_ref']: |
450 params['outputs_ref'] = run_isolated_result['outputs_ref'] | 447 params['outputs_ref'] = run_isolated_result['outputs_ref'] |
451 had_hard_timeout = ( | 448 had_hard_timeout = run_isolated_result['had_hard_timeout'] |
452 had_hard_timeout or run_isolated_result['had_hard_timeout']) | 449 if not had_io_timeout and not had_hard_timeout: |
453 params['hard_timeout'] = had_hard_timeout | 450 if run_isolated_result['internal_failure']: |
454 if not had_io_timeout and not had_hard_timeout: | 451 must_signal_internal_failure = ( |
455 if run_isolated_result['internal_failure']: | 452 run_isolated_result['internal_failure']) |
456 must_signal_internal_failure = ( | 453 logging.error('%s', must_signal_internal_failure) |
457 run_isolated_result['internal_failure']) | 454 elif exit_code: |
458 logging.error('%s', must_signal_internal_failure) | 455 # TODO(maruel): Grab stdout from run_isolated. |
459 elif exit_code: | 456 must_signal_internal_failure = ( |
460 # TODO(maruel): Grab stdout from run_isolated. | 457 'run_isolated internal failure %d' % exit_code) |
461 must_signal_internal_failure = ( | 458 logging.error('%s', must_signal_internal_failure) |
462 'run_isolated internal failure %d' % exit_code) | 459 exit_code = run_isolated_result['exit_code'] |
463 logging.error('%s', must_signal_internal_failure) | 460 if run_isolated_result.get('duration') is not None: |
464 exit_code = run_isolated_result['exit_code'] | 461 # Calculate the real task duration as measured by run_isolated and |
465 if run_isolated_result.get('duration') is not None: | 462 # calculate the remaining overhead. |
466 # Calculate the real task duration as measured by run_isolated and | 463 params['bot_overhead'] = params['duration'] |
467 # calculate the remaining overhead. | 464 params['duration'] = run_isolated_result['duration'] |
468 params['bot_overhead'] = params['duration'] | 465 params['bot_overhead'] -= params['duration'] |
469 params['duration'] = run_isolated_result['duration'] | 466 params['bot_overhead'] -= run_isolated_result.get( |
470 params['bot_overhead'] -= params['duration'] | 467 'download', {}).get('duration', 0) |
471 params['bot_overhead'] -= run_isolated_result.get( | 468 params['bot_overhead'] -= run_isolated_result.get( |
472 'download', {}).get('duration', 0) | 469 'upload', {}).get('duration', 0) |
473 params['bot_overhead'] -= run_isolated_result.get( | 470 if params['bot_overhead'] < 0: |
474 'upload', {}).get('duration', 0) | 471 params['bot_overhead'] = 0 |
475 if params['bot_overhead'] < 0: | 472 stats = run_isolated_result.get('stats') |
476 params['bot_overhead'] = 0 | 473 if stats: |
477 stats = run_isolated_result.get('stats') | 474 params['isolated_stats'] = stats |
478 if stats: | 475 except (IOError, OSError, ValueError) as e: |
479 params['isolated_stats'] = stats | 476 logging.error('Swallowing error: %s', e) |
480 except (IOError, OSError, ValueError) as e: | 477 if not must_signal_internal_failure: |
481 logging.error('Swallowing error: %s', e) | 478 must_signal_internal_failure = str(e) |
482 if not must_signal_internal_failure: | |
483 must_signal_internal_failure = str(e) | |
484 # TODO(maruel): Send the internal failure here instead of sending it through | 479 # TODO(maruel): Send the internal failure here instead of sending it through |
485 # bot_main, this causes a race condition. | 480 # bot_main, this causes a race condition. |
486 if exit_code is None: | 481 if exit_code is None: |
487 exit_code = -1 | 482 exit_code = -1 |
| 483 params['hard_timeout'] = had_hard_timeout |
488 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) | 484 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
489 return { | 485 return { |
490 u'exit_code': exit_code, | 486 u'exit_code': exit_code, |
491 u'hard_timeout': had_hard_timeout, | 487 u'hard_timeout': had_hard_timeout, |
492 u'io_timeout': had_io_timeout, | 488 u'io_timeout': had_io_timeout, |
493 u'must_signal_internal_failure': must_signal_internal_failure, | 489 u'must_signal_internal_failure': must_signal_internal_failure, |
494 u'version': OUT_VERSION, | 490 u'version': OUT_VERSION, |
495 } | 491 } |
496 finally: | 492 finally: |
497 if isolated_result: | 493 try: |
498 try: | 494 os.remove(isolated_result) |
499 os.remove(isolated_result) | 495 except OSError: |
500 except OSError: | 496 pass |
501 pass | |
502 | 497 |
503 | 498 |
504 def main(args): | 499 def main(args): |
505 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
506 parser.add_option('--in-file', help='Name of the request file') | 501 parser.add_option('--in-file', help='Name of the request file') |
507 parser.add_option( | 502 parser.add_option( |
508 '--out-file', help='Name of the JSON file to write a task summary to') | 503 '--out-file', help='Name of the JSON file to write a task summary to') |
509 parser.add_option( | 504 parser.add_option( |
510 '--swarming-server', help='Swarming server to send data back') | 505 '--swarming-server', help='Swarming server to send data back') |
511 parser.add_option( | 506 parser.add_option( |
(...skipping 14 matching lines...)
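As a worked summary of the duration bookkeeping in the new code above: task_runner's wall-clock measurement is split into the task's own duration, as reported by run_isolated, and the bot overhead, with the isolate download and upload times also subtracted from the overhead before it is clamped at zero. A minimal sketch, assuming the result keys shown in the diff; split_durations is a hypothetical helper name, not a function in this file.

def split_durations(wall_clock, run_isolated_result):
  # Mirrors the bot_overhead arithmetic in run_command(); sketch only.
  task_duration = run_isolated_result['duration']
  overhead = wall_clock - task_duration
  overhead -= run_isolated_result.get('download', {}).get('duration', 0)
  overhead -= run_isolated_result.get('upload', {}).get('duration', 0)
  return task_duration, max(overhead, 0)

# Example: 12.0s wall clock, 9.5s task, 1.0s download, 0.8s upload
# -> duration 9.5s, bot_overhead 0.7s.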
526 if options.start > now: | 521 if options.start > now: |
527 options.start = now | 522 options.start = now |
528 | 523 |
529 try: | 524 try: |
530 load_and_run( | 525 load_and_run( |
531 options.in_file, options.swarming_server, options.cost_usd_hour, | 526 options.in_file, options.swarming_server, options.cost_usd_hour, |
532 options.start, options.out_file, options.min_free_space) | 527 options.start, options.out_file, options.min_free_space) |
533 return 0 | 528 return 0 |
534 finally: | 529 finally: |
535 logging.info('quitting') | 530 logging.info('quitting') |