Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2101)

Unified Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/task_runner.py
diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py
index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..0d21d0b26401223596fff7c24686bc0e0bc6f90c 100644
--- a/appengine/swarming/swarming_bot/bot_code/task_runner.py
+++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py
@@ -157,7 +157,8 @@ class MustExit(Exception):
def load_and_run(
- in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space):
+ in_file, swarming_server, auth_headers_file, cost_usd_hour, start,
+ out_file, min_free_space):
"""Loads the task's metadata and execute it.
This may throw all sorts of exceptions in case of failure. It's up to the
@@ -182,8 +183,8 @@ def load_and_run(
task_details = TaskDetails(json.load(f))
task_result = run_command(
- swarming_server, task_details, work_dir, cost_usd_hour, start,
- min_free_space)
+ swarming_server, task_details, work_dir, auth_headers_file,
+ cost_usd_hour, start, min_free_space)
except MustExit as e:
# This normally means run_command() didn't get the chance to run, as it
# itself traps MustExit and will report accordingly. In this case, we want
@@ -206,11 +207,14 @@ def load_and_run(
json.dump(task_result, f)
-def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
+def post_update(
+ swarming_server, auth_headers_file, params, exit_code,
+ stdout, output_chunk_start):
"""Posts task update to task_update.
Arguments:
swarming_server: Base URL to Swarming server.
+ auth_headers_file: file to read HTTP authentication headers from.
params: Default JSON parameters for the POST.
exit_code: Process exit code, only when a command completed.
stdout: Incremental output since last call, if any.
@@ -225,11 +229,14 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start):
# chunks are processed and saved in the DB in order.
params['output'] = base64.b64encode(stdout)
params['output_chunk_start'] = output_chunk_start
+ headers = read_auth_headers(auth_headers_file) if auth_headers_file else {}
M-A Ruel 2016/06/03 20:00:49 I think I'd prefer to have it update the token in
Vadim Sh. 2016/06/03 23:37:55 Done. I made it symmetrical to FileRefresherThread
# TODO(maruel): Support early cancellation.
# https://code.google.com/p/swarming/issues/detail?id=62
resp = net.url_read_json(
swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
- data=params)
+ data=params,
+ headers=headers,
+ follow_redirects=False)
logging.debug('post_update() = %s', resp)
if not resp or resp.get('error'):
# Abandon it. This will force a process exit.
@@ -249,6 +256,35 @@ def should_post_update(stdout, now, last_packet):
return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval
+def read_auth_headers(path):
+ """Reads authentication headers from the given file.
+
+ The file is kept up-to-date by the main bot process (see AuthHeadersDumper in
+ bot_main.py).
+
+ Retries a bunch of times on errors to work around Windows file locking issues.
+ If it fails to read the file even after a bunch of retries, raises ValueError
+ that eventually aborts the task (since we can't run it without
+ authentication).
+ """
+ attempts = 100
+ while True:
+ try:
+ with open(path, 'rb') as f:
+ headers = json.load(f)
+ if not isinstance(headers, dict):
+ raise ValueError('Expecting dict, got %r' % (headers,))
+ # The headers are ASCII for sure, so don't bother with picking the
+ # correct unicode encoding, default would work.
+ return {str(k): str(v) for k, v in headers.iteritems()}
+ except (OSError, IOError, ValueError) as e:
+ last_error = 'Failed to read auth headers from %s: %s' % (path, e)
+ attempts -= 1
+ if not attempts:
+ raise ValueError(last_error)
+ time.sleep(0.05)
+
+
def calc_yield_wait(task_details, start, last_io, timed_out, stdout):
"""Calculates the maximum number of seconds to wait in yield_any()."""
now = monotonic_time()
@@ -282,8 +318,8 @@ def kill_and_wait(proc, grace_period, reason):
def run_command(
- swarming_server, task_details, work_dir, cost_usd_hour, task_start,
- min_free_space):
+ swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour,
+ task_start, min_free_space):
"""Runs a command and sends packets to the server to stream results back.
Implements both I/O and hard timeouts. Sends the packets numbered, so the
@@ -300,7 +336,7 @@ def run_command(
'id': task_details.bot_id,
'task_id': task_details.task_id,
}
- post_update(swarming_server, params, None, '', 0)
+ post_update(swarming_server, auth_headers_file, params, None, '', 0)
isolated_result = os.path.join(work_dir, 'isolated_result.json')
cmd = get_isolated_cmd(
@@ -342,7 +378,7 @@ def run_command(
params['duration'] = now - start
params['io_timeout'] = False
params['hard_timeout'] = False
- post_update(swarming_server, params, 1, stdout, 0)
+ post_update(swarming_server, auth_headers_file, params, 1, stdout, 0)
return {
u'exit_code': 1,
u'hard_timeout': False,
@@ -374,7 +410,9 @@ def run_command(
last_packet = monotonic_time()
params['cost_usd'] = (
cost_usd_hour * (last_packet - task_start) / 60. / 60.)
- post_update(swarming_server, params, None, stdout, output_chunk_start)
+ post_update(
+ swarming_server, auth_headers_file, params, None,
+ stdout, output_chunk_start)
output_chunk_start += len(stdout)
stdout = ''
@@ -480,7 +518,9 @@ def run_command(
if exit_code is None:
exit_code = -1
params['hard_timeout'] = had_hard_timeout
- post_update(swarming_server, params, exit_code, stdout, output_chunk_start)
+ post_update(
+ swarming_server, auth_headers_file, params, exit_code,
+ stdout, output_chunk_start)
return {
u'exit_code': exit_code,
u'hard_timeout': had_hard_timeout,
@@ -504,6 +544,9 @@ def main(args):
parser.add_option(
'--swarming-server', help='Swarming server to send data back')
parser.add_option(
+ '--auth-headers-file',
+ help='Name of the file to read authentication headers from')
+ parser.add_option(
'--cost-usd-hour', type='float', help='Cost of this VM in $/h')
parser.add_option('--start', type='float', help='Time this task was started')
parser.add_option(
@@ -523,8 +566,9 @@ def main(args):
try:
load_and_run(
- options.in_file, options.swarming_server, options.cost_usd_hour,
- options.start, options.out_file, options.min_free_space)
+ options.in_file, options.swarming_server, options.auth_headers_file,
+ options.cost_usd_hour, options.start, options.out_file,
+ options.min_free_space)
return 0
finally:
logging.info('quitting')

Powered by Google App Engine
This is Rietveld 408576698