Index: appengine/swarming/swarming_bot/bot_code/task_runner.py |
diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..8f05f735fa01c4c86f1a183eb97eb46b9635b1f6 100644 |
--- a/appengine/swarming/swarming_bot/bot_code/task_runner.py |
+++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
@@ -28,6 +28,9 @@ from utils import on_error |
from utils import subprocess42 |
from utils import zip_package |
+import bot_auth |
+import file_reader |
+ |
# Path to this file or the zip containing this file. |
THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) |
@@ -182,8 +185,8 @@ def load_and_run( |
task_details = TaskDetails(json.load(f)) |
task_result = run_command( |
- swarming_server, task_details, work_dir, cost_usd_hour, start, |
- min_free_space) |
+ swarming_server, task_details, work_dir, |
+ cost_usd_hour, start, min_free_space) |
except MustExit as e: |
# This normally means run_command() didn't get the chance to run, as it |
# itself trap MustExit and will report accordingly. In this case, we want |
@@ -206,11 +209,14 @@ def load_and_run( |
json.dump(task_result, f) |
-def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
+def post_update( |
+ swarming_server, auth_headers, params, exit_code, |
+ stdout, output_chunk_start): |
"""Posts task update to task_update. |
Arguments: |
swarming_server: Base URL to Swarming server. |
+ auth_headers: dict with HTTP authentication headers. |
params: Default JSON parameters for the POST. |
exit_code: Process exit code, only when a command completed. |
stdout: Incremental output since last call, if any. |
@@ -229,7 +235,9 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
# https://code.google.com/p/swarming/issues/detail?id=62 |
resp = net.url_read_json( |
swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
- data=params) |
+ data=params, |
+ headers=auth_headers, |
+ follow_redirects=False) |
logging.debug('post_update() = %s', resp) |
if not resp or resp.get('error'): |
# Abandon it. This will force a process exit. |
@@ -281,9 +289,32 @@ def kill_and_wait(proc, grace_period, reason): |
return exit_code |
+def start_reading_headers(auth_params_file): |
+ """Spawns a thread that rereads headers from SWARMING_AUTH_PARAMS file. |
+ |
+ Returns: |
+ Tuple (callback that returns the last known headers, stop callback). |
+ |
+ Raises: |
+ file_reader.FatalReadError if headers file can't be read. |
+ ValueError if it can be read, but its body is invalid. |
+ """ |
+ # Read headers more often than bot_main writes them, to reduce maximum |
+ # possible latency between headers are updated and read. |
+ reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30) |
+ |
+ def read_and_validate_headers(): |
+ val = bot_auth.process_auth_params_json(reader.last_value or {}) |
+ return val.swarming_http_headers |
+ |
+ reader.start() |
+ read_and_validate_headers() # initial validation, may raise ValueError |
+ return read_and_validate_headers, reader.stop |
+ |
+ |
def run_command( |
- swarming_server, task_details, work_dir, cost_usd_hour, task_start, |
- min_free_space): |
+ swarming_server, task_details, work_dir, cost_usd_hour, |
+ task_start, min_free_space): |
"""Runs a command and sends packets to the server to stream results back. |
Implements both I/O and hard timeouts. Sends the packets numbered, so the |
@@ -293,6 +324,24 @@ def run_command( |
Metadata about the command. |
""" |
# TODO(maruel): This function is incomprehensible, split and refactor. |
+ |
+ # Grab initial auth headers and start rereading them in parallel thread. They |
+ # MUST be there already. It's fatal internal error if they are not. |
+ headers_cb = lambda: {} |
+ stop_headers_reader = lambda: None |
+ auth_params_file = os.environ.get('SWARMING_AUTH_PARAMS') |
+ if auth_params_file: |
+ try: |
+ headers_cb, stop_headers_reader = start_reading_headers(auth_params_file) |
+ except (ValueError, file_reader.FatalReadError) as e: |
+ return { |
+ u'exit_code': 1, |
+ u'hard_timeout': False, |
+ u'io_timeout': False, |
+ u'must_signal_internal_failure': str(e), |
+ u'version': OUT_VERSION, |
+ } |
+ |
# Signal the command is about to be started. |
last_packet = start = now = monotonic_time() |
params = { |
@@ -300,7 +349,7 @@ def run_command( |
'id': task_details.bot_id, |
'task_id': task_details.task_id, |
} |
- post_update(swarming_server, params, None, '', 0) |
+ post_update(swarming_server, headers_cb(), params, None, '', 0) |
isolated_result = os.path.join(work_dir, 'isolated_result.json') |
cmd = get_isolated_cmd( |
@@ -342,7 +391,7 @@ def run_command( |
params['duration'] = now - start |
params['io_timeout'] = False |
params['hard_timeout'] = False |
- post_update(swarming_server, params, 1, stdout, 0) |
+ post_update(swarming_server, headers_cb(), params, 1, stdout, 0) |
return { |
u'exit_code': 1, |
u'hard_timeout': False, |
@@ -374,7 +423,9 @@ def run_command( |
last_packet = monotonic_time() |
params['cost_usd'] = ( |
cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
- post_update(swarming_server, params, None, stdout, output_chunk_start) |
+ post_update( |
+ swarming_server, headers_cb(), params, None, |
+ stdout, output_chunk_start) |
output_chunk_start += len(stdout) |
stdout = '' |
@@ -480,7 +531,9 @@ def run_command( |
if exit_code is None: |
exit_code = -1 |
params['hard_timeout'] = had_hard_timeout |
- post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
+ post_update( |
+ swarming_server, headers_cb(), params, exit_code, |
+ stdout, output_chunk_start) |
return { |
u'exit_code': exit_code, |
u'hard_timeout': had_hard_timeout, |
@@ -493,6 +546,7 @@ def run_command( |
os.remove(isolated_result) |
except OSError: |
pass |
+ stop_headers_reader() |
def main(args): |