| Index: appengine/swarming/swarming_bot/bot_code/task_runner.py
|
| diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py
|
| index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..8f05f735fa01c4c86f1a183eb97eb46b9635b1f6 100644
|
| --- a/appengine/swarming/swarming_bot/bot_code/task_runner.py
|
| +++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py
|
| @@ -28,6 +28,9 @@ from utils import on_error
|
| from utils import subprocess42
|
| from utils import zip_package
|
|
|
| +import bot_auth
|
| +import file_reader
|
| +
|
|
|
| # Path to this file or the zip containing this file.
|
| THIS_FILE = os.path.abspath(zip_package.get_main_script_path())
|
| @@ -182,8 +185,8 @@ def load_and_run(
|
| task_details = TaskDetails(json.load(f))
|
|
|
| task_result = run_command(
|
| - swarming_server, task_details, work_dir, cost_usd_hour, start,
|
| - min_free_space)
|
| + swarming_server, task_details, work_dir,
|
| + cost_usd_hour, start, min_free_space)
|
| except MustExit as e:
|
| # This normally means run_command() didn't get the chance to run, as it
|
| # itself trap MustExit and will report accordingly. In this case, we want
|
| @@ -206,11 +209,14 @@ def load_and_run(
|
| json.dump(task_result, f)
|
|
|
|
|
| -def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
|
| +def post_update(
|
| + swarming_server, auth_headers, params, exit_code,
|
| + stdout, output_chunk_start):
|
| """Posts task update to task_update.
|
|
|
| Arguments:
|
| swarming_server: Base URL to Swarming server.
|
| + auth_headers: dict with HTTP authentication headers.
|
| params: Default JSON parameters for the POST.
|
| exit_code: Process exit code, only when a command completed.
|
| stdout: Incremental output since last call, if any.
|
| @@ -229,7 +235,9 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
|
| # https://code.google.com/p/swarming/issues/detail?id=62
|
| resp = net.url_read_json(
|
| swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
|
| - data=params)
|
| + data=params,
|
| + headers=auth_headers,
|
| + follow_redirects=False)
|
| logging.debug('post_update() = %s', resp)
|
| if not resp or resp.get('error'):
|
| # Abandon it. This will force a process exit.
|
| @@ -281,9 +289,32 @@ def kill_and_wait(proc, grace_period, reason):
|
| return exit_code
|
|
|
|
|
| +def start_reading_headers(auth_params_file):
|
| + """Spawns a thread that rereads headers from SWARMING_AUTH_PARAMS file.
|
| +
|
| + Returns:
|
| + Tuple (callback that returns the last known headers, stop callback).
|
| +
|
| + Raises:
|
| + file_reader.FatalReadError if headers file can't be read.
|
| + ValueError if it can be read, but its body is invalid.
|
| + """
|
| + # Read headers more often than bot_main writes them, to reduce maximum
|
| + # possible latency between headers are updated and read.
|
| + reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30)
|
| +
|
| + def read_and_validate_headers():
|
| + val = bot_auth.process_auth_params_json(reader.last_value or {})
|
| + return val.swarming_http_headers
|
| +
|
| + reader.start()
|
| + read_and_validate_headers() # initial validation, may raise ValueError
|
| + return read_and_validate_headers, reader.stop
|
| +
|
| +
|
| def run_command(
|
| - swarming_server, task_details, work_dir, cost_usd_hour, task_start,
|
| - min_free_space):
|
| + swarming_server, task_details, work_dir, cost_usd_hour,
|
| + task_start, min_free_space):
|
| """Runs a command and sends packets to the server to stream results back.
|
|
|
| Implements both I/O and hard timeouts. Sends the packets numbered, so the
|
| @@ -293,6 +324,24 @@ def run_command(
|
| Metadata about the command.
|
| """
|
| # TODO(maruel): This function is incomprehensible, split and refactor.
|
| +
|
| + # Grab initial auth headers and start rereading them in parallel thread. They
|
| + # MUST be there already. It's fatal internal error if they are not.
|
| + headers_cb = lambda: {}
|
| + stop_headers_reader = lambda: None
|
| + auth_params_file = os.environ.get('SWARMING_AUTH_PARAMS')
|
| + if auth_params_file:
|
| + try:
|
| + headers_cb, stop_headers_reader = start_reading_headers(auth_params_file)
|
| + except (ValueError, file_reader.FatalReadError) as e:
|
| + return {
|
| + u'exit_code': 1,
|
| + u'hard_timeout': False,
|
| + u'io_timeout': False,
|
| + u'must_signal_internal_failure': str(e),
|
| + u'version': OUT_VERSION,
|
| + }
|
| +
|
| # Signal the command is about to be started.
|
| last_packet = start = now = monotonic_time()
|
| params = {
|
| @@ -300,7 +349,7 @@ def run_command(
|
| 'id': task_details.bot_id,
|
| 'task_id': task_details.task_id,
|
| }
|
| - post_update(swarming_server, params, None, '', 0)
|
| + post_update(swarming_server, headers_cb(), params, None, '', 0)
|
|
|
| isolated_result = os.path.join(work_dir, 'isolated_result.json')
|
| cmd = get_isolated_cmd(
|
| @@ -342,7 +391,7 @@ def run_command(
|
| params['duration'] = now - start
|
| params['io_timeout'] = False
|
| params['hard_timeout'] = False
|
| - post_update(swarming_server, params, 1, stdout, 0)
|
| + post_update(swarming_server, headers_cb(), params, 1, stdout, 0)
|
| return {
|
| u'exit_code': 1,
|
| u'hard_timeout': False,
|
| @@ -374,7 +423,9 @@ def run_command(
|
| last_packet = monotonic_time()
|
| params['cost_usd'] = (
|
| cost_usd_hour * (last_packet - task_start) / 60. / 60.)
|
| - post_update(swarming_server, params, None, stdout, output_chunk_start)
|
| + post_update(
|
| + swarming_server, headers_cb(), params, None,
|
| + stdout, output_chunk_start)
|
| output_chunk_start += len(stdout)
|
| stdout = ''
|
|
|
| @@ -480,7 +531,9 @@ def run_command(
|
| if exit_code is None:
|
| exit_code = -1
|
| params['hard_timeout'] = had_hard_timeout
|
| - post_update(swarming_server, params, exit_code, stdout, output_chunk_start)
|
| + post_update(
|
| + swarming_server, headers_cb(), params, exit_code,
|
| + stdout, output_chunk_start)
|
| return {
|
| u'exit_code': exit_code,
|
| u'hard_timeout': had_hard_timeout,
|
| @@ -493,6 +546,7 @@ def run_command(
|
| os.remove(isolated_result)
|
| except OSError:
|
| pass
|
| + stop_headers_reader()
|
|
|
|
|
| def main(args):
|
|
|