Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2656)

Unified Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/task_runner.py
diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py
index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..8f05f735fa01c4c86f1a183eb97eb46b9635b1f6 100644
--- a/appengine/swarming/swarming_bot/bot_code/task_runner.py
+++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py
@@ -28,6 +28,9 @@ from utils import on_error
from utils import subprocess42
from utils import zip_package
+import bot_auth
+import file_reader
+
# Path to this file or the zip containing this file.
THIS_FILE = os.path.abspath(zip_package.get_main_script_path())
@@ -182,8 +185,8 @@ def load_and_run(
task_details = TaskDetails(json.load(f))
task_result = run_command(
- swarming_server, task_details, work_dir, cost_usd_hour, start,
- min_free_space)
+ swarming_server, task_details, work_dir,
+ cost_usd_hour, start, min_free_space)
except MustExit as e:
# This normally means run_command() didn't get the chance to run, as it
# itself trap MustExit and will report accordingly. In this case, we want
@@ -206,11 +209,14 @@ def load_and_run(
json.dump(task_result, f)
-def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
+def post_update(
+ swarming_server, auth_headers, params, exit_code,
+ stdout, output_chunk_start):
"""Posts task update to task_update.
Arguments:
swarming_server: Base URL to Swarming server.
+ auth_headers: dict with HTTP authentication headers.
params: Default JSON parameters for the POST.
exit_code: Process exit code, only when a command completed.
stdout: Incremental output since last call, if any.
@@ -229,7 +235,9 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
# https://code.google.com/p/swarming/issues/detail?id=62
resp = net.url_read_json(
swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
- data=params)
+ data=params,
+ headers=auth_headers,
+ follow_redirects=False)
logging.debug('post_update() = %s', resp)
if not resp or resp.get('error'):
# Abandon it. This will force a process exit.
@@ -281,9 +289,32 @@ def kill_and_wait(proc, grace_period, reason):
return exit_code
+def start_reading_headers(auth_params_file):
+ """Spawns a thread that rereads headers from SWARMING_AUTH_PARAMS file.
+
+ Returns:
+ Tuple (callback that returns the last known headers, stop callback).
+
+ Raises:
+ file_reader.FatalReadError if headers file can't be read.
+ ValueError if it can be read, but its body is invalid.
+ """
+ # Read headers more often than bot_main writes them, to reduce maximum
+ # possible latency between headers are updated and read.
+ reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30)
+
+ def read_and_validate_headers():
+ val = bot_auth.process_auth_params_json(reader.last_value or {})
+ return val.swarming_http_headers
+
+ reader.start()
+ read_and_validate_headers() # initial validation, may raise ValueError
+ return read_and_validate_headers, reader.stop
+
+
def run_command(
- swarming_server, task_details, work_dir, cost_usd_hour, task_start,
- min_free_space):
+ swarming_server, task_details, work_dir, cost_usd_hour,
+ task_start, min_free_space):
"""Runs a command and sends packets to the server to stream results back.
Implements both I/O and hard timeouts. Sends the packets numbered, so the
@@ -293,6 +324,24 @@ def run_command(
Metadata about the command.
"""
# TODO(maruel): This function is incomprehensible, split and refactor.
+
+ # Grab initial auth headers and start rereading them in parallel thread. They
+ # MUST be there already. It's fatal internal error if they are not.
+ headers_cb = lambda: {}
+ stop_headers_reader = lambda: None
+ auth_params_file = os.environ.get('SWARMING_AUTH_PARAMS')
+ if auth_params_file:
+ try:
+ headers_cb, stop_headers_reader = start_reading_headers(auth_params_file)
+ except (ValueError, file_reader.FatalReadError) as e:
+ return {
+ u'exit_code': 1,
+ u'hard_timeout': False,
+ u'io_timeout': False,
+ u'must_signal_internal_failure': str(e),
+ u'version': OUT_VERSION,
+ }
+
# Signal the command is about to be started.
last_packet = start = now = monotonic_time()
params = {
@@ -300,7 +349,7 @@ def run_command(
'id': task_details.bot_id,
'task_id': task_details.task_id,
}
- post_update(swarming_server, params, None, '', 0)
+ post_update(swarming_server, headers_cb(), params, None, '', 0)
isolated_result = os.path.join(work_dir, 'isolated_result.json')
cmd = get_isolated_cmd(
@@ -342,7 +391,7 @@ def run_command(
params['duration'] = now - start
params['io_timeout'] = False
params['hard_timeout'] = False
- post_update(swarming_server, params, 1, stdout, 0)
+ post_update(swarming_server, headers_cb(), params, 1, stdout, 0)
return {
u'exit_code': 1,
u'hard_timeout': False,
@@ -374,7 +423,9 @@ def run_command(
last_packet = monotonic_time()
params['cost_usd'] = (
cost_usd_hour * (last_packet - task_start) / 60. / 60.)
- post_update(swarming_server, params, None, stdout, output_chunk_start)
+ post_update(
+ swarming_server, headers_cb(), params, None,
+ stdout, output_chunk_start)
output_chunk_start += len(stdout)
stdout = ''
@@ -480,7 +531,9 @@ def run_command(
if exit_code is None:
exit_code = -1
params['hard_timeout'] = had_hard_timeout
- post_update(swarming_server, params, exit_code, stdout, output_chunk_start)
+ post_update(
+ swarming_server, headers_cb(), params, exit_code,
+ stdout, output_chunk_start)
return {
u'exit_code': exit_code,
u'hard_timeout': had_hard_timeout,
@@ -493,6 +546,7 @@ def run_command(
os.remove(isolated_result)
except OSError:
pass
+ stop_headers_reader()
def main(args):

Powered by Google App Engine
This is Rietveld 408576698