Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(245)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 1949613002: task_runner: always use run_isolated (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Fix task_runner.py Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be 2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 return [sys.executable, THIS_FILE, 'run_isolated'] 77 return [sys.executable, THIS_FILE, 'run_isolated']
78 78
79 79
80 def get_isolated_cmd( 80 def get_isolated_cmd(
81 work_dir, task_details, isolated_result, min_free_space): 81 work_dir, task_details, isolated_result, min_free_space):
82 """Returns the command to call run_isolated. Mocked in tests.""" 82 """Returns the command to call run_isolated. Mocked in tests."""
83 bot_dir = os.path.dirname(work_dir) 83 bot_dir = os.path.dirname(work_dir)
84 if os.path.isfile(isolated_result): 84 if os.path.isfile(isolated_result):
85 os.remove(isolated_result) 85 os.remove(isolated_result)
86 cmd = get_run_isolated() 86 cmd = get_run_isolated()
87 if task_details.inputs_ref:
88 cmd.extend(
89 [
90 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'),
91 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'),
92 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'),
93 ])
94
87 cmd.extend( 95 cmd.extend(
88 [ 96 [
89 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'),
90 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'),
91 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'),
92 '--json', isolated_result, 97 '--json', isolated_result,
93 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'), 98 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'),
94 '--cache', os.path.join(bot_dir, 'cache'), 99 '--cache', os.path.join(bot_dir, 'cache'),
95 '--root-dir', os.path.join(work_dir, 'isolated'), 100 '--root-dir', os.path.join(work_dir, 'isolated'),
96 ]) 101 ])
97 if min_free_space: 102 if min_free_space:
98 cmd.extend(('--min-free-space', str(min_free_space))) 103 cmd.extend(('--min-free-space', str(min_free_space)))
99 104
100 if task_details.hard_timeout: 105 if task_details.hard_timeout:
101 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) 106 cmd.extend(('--hard-timeout', str(task_details.hard_timeout)))
102 if task_details.grace_period: 107 if task_details.grace_period:
103 cmd.extend(('--grace-period', str(task_details.grace_period))) 108 cmd.extend(('--grace-period', str(task_details.grace_period)))
104 if task_details.extra_args: 109
105 cmd.append('--') 110 cmd.append('--')
111 if task_details.command:
112 cmd.extend(task_details.command)
113 elif task_details.extra_args:
106 cmd.extend(task_details.extra_args) 114 cmd.extend(task_details.extra_args)
107 return cmd 115 return cmd
108 116
109 117
110 class TaskDetails(object): 118 class TaskDetails(object):
111 def __init__(self, data): 119 def __init__(self, data):
112 """Loads the raw data. 120 """Loads the raw data.
113 121
114 It is expected to have at least: 122 It is expected to have at least:
115 - bot_id 123 - bot_id
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 # TODO(maruel): This function is incomprehensible, split and refactor. 297 # TODO(maruel): This function is incomprehensible, split and refactor.
290 # Signal the command is about to be started. 298 # Signal the command is about to be started.
291 last_packet = start = now = monotonic_time() 299 last_packet = start = now = monotonic_time()
292 params = { 300 params = {
293 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 301 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
294 'id': task_details.bot_id, 302 'id': task_details.bot_id,
295 'task_id': task_details.task_id, 303 'task_id': task_details.task_id,
296 } 304 }
297 post_update(swarming_server, params, None, '', 0) 305 post_update(swarming_server, params, None, '', 0)
298 306
299 if task_details.command: 307 isolated_result = os.path.join(work_dir, 'isolated_result.json')
300 # Raw command. 308 cmd = get_isolated_cmd(
301 cmd = task_details.command 309 work_dir, task_details, isolated_result, min_free_space)
302 isolated_result = None 310 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
303 else: 311 # give one 'grace_period' slot to the child process and one slot to upload
304 # Isolated task. 312 # the results back.
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') 313 task_details.hard_timeout = 0
306 cmd = get_isolated_cmd( 314 if task_details.grace_period:
307 work_dir, task_details, isolated_result, min_free_space) 315 task_details.grace_period *= 2
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
309 # give one 'grace_period' slot to the child process and one slot to upload
310 # the results back.
311 task_details.hard_timeout = 0
312 if task_details.grace_period:
313 task_details.grace_period *= 2
314 316
315 try: 317 try:
316 # TODO(maruel): Support both channels independently and display stderr in 318 # TODO(maruel): Support both channels independently and display stderr in
317 # red. 319 # red.
318 env = None 320 env = None
319 if task_details.env: 321 if task_details.env:
320 env = os.environ.copy() 322 env = os.environ.copy()
321 for key, value in task_details.env.iteritems(): 323 for key, value in task_details.env.iteritems():
322 if not value: 324 if not value:
323 env.pop(key, None) 325 env.pop(key, None)
(...skipping 22 matching lines...) Expand all
346 u'exit_code': 1, 348 u'exit_code': 1,
347 u'hard_timeout': False, 349 u'hard_timeout': False,
348 u'io_timeout': False, 350 u'io_timeout': False,
349 u'must_signal_internal_failure': None, 351 u'must_signal_internal_failure': None,
350 u'version': OUT_VERSION, 352 u'version': OUT_VERSION,
351 } 353 }
352 354
353 output_chunk_start = 0 355 output_chunk_start = 0
354 stdout = '' 356 stdout = ''
355 exit_code = None 357 exit_code = None
356 had_hard_timeout = False
357 had_io_timeout = False 358 had_io_timeout = False
358 must_signal_internal_failure = None 359 must_signal_internal_failure = None
359 kill_sent = False 360 kill_sent = False
360 timed_out = None 361 timed_out = None
361 try: 362 try:
362 calc = lambda: calc_yield_wait( 363 calc = lambda: calc_yield_wait(
363 task_details, start, last_io, timed_out, stdout) 364 task_details, start, last_io, timed_out, stdout)
364 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) 365 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout)
365 last_io = monotonic_time() 366 last_io = monotonic_time()
366 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): 367 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc):
(...skipping 14 matching lines...) Expand all
381 # Send signal on timeout if necessary. Both are failures, not 382 # Send signal on timeout if necessary. Both are failures, not
382 # internal_failures. 383 # internal_failures.
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. 384 # Eventually kill but return 0 so bot_main.py doesn't cancel the task.
384 if not timed_out: 385 if not timed_out:
385 if (task_details.io_timeout and 386 if (task_details.io_timeout and
386 now - last_io > task_details.io_timeout): 387 now - last_io > task_details.io_timeout):
387 had_io_timeout = True 388 had_io_timeout = True
388 logging.warning('I/O timeout; sending SIGTERM') 389 logging.warning('I/O timeout; sending SIGTERM')
389 proc.terminate() 390 proc.terminate()
390 timed_out = monotonic_time() 391 timed_out = monotonic_time()
391 elif (task_details.hard_timeout and
392 now - start > task_details.hard_timeout):
393 had_hard_timeout = True
394 logging.warning('Hard timeout; sending SIGTERM')
395 proc.terminate()
396 timed_out = monotonic_time()
397 else: 392 else:
398 # During grace period. 393 # During grace period.
399 if not kill_sent and now >= timed_out + task_details.grace_period: 394 if not kill_sent and now >= timed_out + task_details.grace_period:
400 # Now kill for real. The user can distinguish between the following 395 # Now kill for real. The user can distinguish between the following
401 # states: 396 # states:
402 # - signal but process exited within grace period, 397 # - signal but process exited within grace period,
403 # (hard_|io_)_timed_out will be set but the process exit code will 398 # (hard_|io_)_timed_out will be set but the process exit code will
404 # be script provided. 399 # be script provided.
405 # - processed exited late, exit code will be -9 on posix. 400 # - processed exited late, exit code will be -9 on posix.
406 logging.warning('Grace exhausted; sending SIGKILL') 401 logging.warning('Grace exhausted; sending SIGKILL')
407 proc.kill() 402 proc.kill()
408 kill_sent = True 403 kill_sent = True
409 logging.info('Waiting for proces exit') 404 logging.info('Waiting for proces exit')
410 exit_code = proc.wait() 405 exit_code = proc.wait()
411 except MustExit as e: 406 except MustExit as e:
412 # TODO(maruel): Do the send SIGTERM to child process and give it 407 # TODO(maruel): Do the send SIGTERM to child process and give it
413 # task_details.grace_period to terminate. 408 # task_details.grace_period to terminate.
414 must_signal_internal_failure = ( 409 must_signal_internal_failure = (
415 u'task_runner received signal %s' % e.signal) 410 u'task_runner received signal %s' % e.signal)
416 exit_code = kill_and_wait( 411 exit_code = kill_and_wait(
417 proc, task_details.grace_period, 'signal %d' % e.signal) 412 proc, task_details.grace_period, 'signal %d' % e.signal)
418 except (IOError, OSError): 413 except (IOError, OSError):
419 # Something wrong happened, try to kill the child process. 414 # Something wrong happened, try to kill the child process.
420 had_hard_timeout = True
421 exit_code = kill_and_wait( 415 exit_code = kill_and_wait(
422 proc, task_details.grace_period, 'exception %s' % e) 416 proc, task_details.grace_period, 'exception %s' % e)
423 417
424 # This is the very last packet for this command. It if was an isolated task, 418 # This is the very last packet for this command. It if was an isolated task,
425 # include the output reference to the archived .isolated file. 419 # include the output reference to the archived .isolated file.
426 now = monotonic_time() 420 now = monotonic_time()
427 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 421 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
428 params['duration'] = now - start 422 params['duration'] = now - start
429 params['io_timeout'] = had_io_timeout 423 params['io_timeout'] = had_io_timeout
430 params['hard_timeout'] = had_hard_timeout 424 had_hard_timeout = False
431 if isolated_result: 425 try:
432 try: 426 if not os.path.isfile(isolated_result):
433 if ((had_io_timeout or had_hard_timeout) and 427 # It's possible if
434 not os.path.isfile(isolated_result)): 428 # - run_isolated.py did not start
435 # It's possible that run_isolated failed to quit quickly enough; it 429 # - run_isolated.py started, but arguments were invalid
436 # could be because there was too much data to upload back or something 430 # - host in a situation unable to fork
437 # else. Do not create an internal error, just send back the (partial) 431 # - grand child process outliving the child process deleting everything
438 # view as task_runner saw it, for example the real exit_code is 432 # it can
439 # unknown. 433 # Do not create an internal error, just send back the (partial)
440 logging.warning('TIMED_OUT and there\'s no result file') 434 # view as task_runner saw it, for example the real exit_code is
435 # unknown.
436 logging.warning('there\'s no result file')
437 if exit_code is None:
441 exit_code = -1 438 exit_code = -1
442 else: 439 else:
443 # See run_isolated.py for the format. 440 # See run_isolated.py for the format.
444 with open(isolated_result, 'rb') as f: 441 with open(isolated_result, 'rb') as f:
445 run_isolated_result = json.load(f) 442 run_isolated_result = json.load(f)
446 logging.debug('run_isolated:\n%s', run_isolated_result) 443 logging.debug('run_isolated:\n%s', run_isolated_result)
447 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, 444 # TODO(maruel): Grab statistics (cache hit rate, data downloaded,
448 # mapping time, etc) from run_isolated and push them to the server. 445 # mapping time, etc) from run_isolated and push them to the server.
449 if run_isolated_result['outputs_ref']: 446 if run_isolated_result['outputs_ref']:
450 params['outputs_ref'] = run_isolated_result['outputs_ref'] 447 params['outputs_ref'] = run_isolated_result['outputs_ref']
451 had_hard_timeout = ( 448 had_hard_timeout = run_isolated_result['had_hard_timeout']
452 had_hard_timeout or run_isolated_result['had_hard_timeout']) 449 if not had_io_timeout and not had_hard_timeout:
453 params['hard_timeout'] = had_hard_timeout 450 if run_isolated_result['internal_failure']:
454 if not had_io_timeout and not had_hard_timeout: 451 must_signal_internal_failure = (
455 if run_isolated_result['internal_failure']: 452 run_isolated_result['internal_failure'])
456 must_signal_internal_failure = ( 453 logging.error('%s', must_signal_internal_failure)
457 run_isolated_result['internal_failure']) 454 elif exit_code:
458 logging.error('%s', must_signal_internal_failure) 455 # TODO(maruel): Grab stdout from run_isolated.
459 elif exit_code: 456 must_signal_internal_failure = (
460 # TODO(maruel): Grab stdout from run_isolated. 457 'run_isolated internal failure %d' % exit_code)
461 must_signal_internal_failure = ( 458 logging.error('%s', must_signal_internal_failure)
462 'run_isolated internal failure %d' % exit_code) 459 exit_code = run_isolated_result['exit_code']
463 logging.error('%s', must_signal_internal_failure) 460 if run_isolated_result.get('duration') is not None:
464 exit_code = run_isolated_result['exit_code'] 461 # Calculate the real task duration as measured by run_isolated and
465 if run_isolated_result.get('duration') is not None: 462 # calculate the remaining overhead.
466 # Calculate the real task duration as measured by run_isolated and 463 params['bot_overhead'] = params['duration']
467 # calculate the remaining overhead. 464 params['duration'] = run_isolated_result['duration']
468 params['bot_overhead'] = params['duration'] 465 params['bot_overhead'] -= params['duration']
469 params['duration'] = run_isolated_result['duration'] 466 params['bot_overhead'] -= run_isolated_result.get(
470 params['bot_overhead'] -= params['duration'] 467 'download', {}).get('duration', 0)
471 params['bot_overhead'] -= run_isolated_result.get( 468 params['bot_overhead'] -= run_isolated_result.get(
472 'download', {}).get('duration', 0) 469 'upload', {}).get('duration', 0)
473 params['bot_overhead'] -= run_isolated_result.get( 470 if params['bot_overhead'] < 0:
474 'upload', {}).get('duration', 0) 471 params['bot_overhead'] = 0
475 if params['bot_overhead'] < 0: 472 stats = run_isolated_result.get('stats')
476 params['bot_overhead'] = 0 473 if stats:
477 stats = run_isolated_result.get('stats') 474 params['isolated_stats'] = stats
478 if stats: 475 except (IOError, OSError, ValueError) as e:
479 params['isolated_stats'] = stats 476 logging.error('Swallowing error: %s', e)
480 except (IOError, OSError, ValueError) as e: 477 if not must_signal_internal_failure:
481 logging.error('Swallowing error: %s', e) 478 must_signal_internal_failure = str(e)
482 if not must_signal_internal_failure:
483 must_signal_internal_failure = str(e)
484 # TODO(maruel): Send the internal failure here instead of sending it through 479 # TODO(maruel): Send the internal failure here instead of sending it through
485 # bot_main, this causes a race condition. 480 # bot_main, this causes a race condition.
486 if exit_code is None: 481 if exit_code is None:
487 exit_code = -1 482 exit_code = -1
483 params['hard_timeout'] = had_hard_timeout
488 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) 484 post_update(swarming_server, params, exit_code, stdout, output_chunk_start)
489 return { 485 return {
490 u'exit_code': exit_code, 486 u'exit_code': exit_code,
491 u'hard_timeout': had_hard_timeout, 487 u'hard_timeout': had_hard_timeout,
492 u'io_timeout': had_io_timeout, 488 u'io_timeout': had_io_timeout,
493 u'must_signal_internal_failure': must_signal_internal_failure, 489 u'must_signal_internal_failure': must_signal_internal_failure,
494 u'version': OUT_VERSION, 490 u'version': OUT_VERSION,
495 } 491 }
496 finally: 492 finally:
497 if isolated_result: 493 try:
498 try: 494 os.remove(isolated_result)
499 os.remove(isolated_result) 495 except OSError:
500 except OSError: 496 pass
501 pass
502 497
503 498
504 def main(args): 499 def main(args):
505 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
506 parser.add_option('--in-file', help='Name of the request file') 501 parser.add_option('--in-file', help='Name of the request file')
507 parser.add_option( 502 parser.add_option(
508 '--out-file', help='Name of the JSON file to write a task summary to') 503 '--out-file', help='Name of the JSON file to write a task summary to')
509 parser.add_option( 504 parser.add_option(
510 '--swarming-server', help='Swarming server to send data back') 505 '--swarming-server', help='Swarming server to send data back')
511 parser.add_option( 506 parser.add_option(
(...skipping 14 matching lines...) Expand all
526 if options.start > now: 521 if options.start > now:
527 options.start = now 522 options.start = now
528 523
529 try: 524 try:
530 load_and_run( 525 load_and_run(
531 options.in_file, options.swarming_server, options.cost_usd_hour, 526 options.in_file, options.swarming_server, options.cost_usd_hour,
532 options.start, options.out_file, options.min_free_space) 527 options.start, options.out_file, options.min_free_space)
533 return 0 528 return 0
534 finally: 529 finally:
535 logging.info('quitting') 530 logging.info('quitting')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698