Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/task_runner.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..0d21d0b26401223596fff7c24686bc0e0bc6f90c 100644 |
| --- a/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| +++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| @@ -157,7 +157,8 @@ class MustExit(Exception): |
| def load_and_run( |
| - in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space): |
| + in_file, swarming_server, auth_headers_file, cost_usd_hour, start, |
| + out_file, min_free_space): |
| """Loads the task's metadata and executes it. |
| This may throw all sorts of exceptions in case of failure. It's up to the |
| @@ -182,8 +183,8 @@ def load_and_run( |
| task_details = TaskDetails(json.load(f)) |
| task_result = run_command( |
| - swarming_server, task_details, work_dir, cost_usd_hour, start, |
| - min_free_space) |
| + swarming_server, task_details, work_dir, auth_headers_file, |
| + cost_usd_hour, start, min_free_space) |
| except MustExit as e: |
| # This normally means run_command() didn't get the chance to run, as it |
| # itself traps MustExit and will report accordingly. In this case, we want |
| @@ -206,11 +207,14 @@ def load_and_run( |
| json.dump(task_result, f) |
| -def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
| +def post_update( |
| + swarming_server, auth_headers_file, params, exit_code, |
| + stdout, output_chunk_start): |
| """Posts task update to task_update. |
| Arguments: |
| swarming_server: Base URL to Swarming server. |
| + auth_headers_file: file to read HTTP authentication headers from. |
| params: Default JSON parameters for the POST. |
| exit_code: Process exit code, only when a command completed. |
| stdout: Incremental output since last call, if any. |
| @@ -225,11 +229,14 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
| # chunks are processed and saved in the DB in order. |
| params['output'] = base64.b64encode(stdout) |
| params['output_chunk_start'] = output_chunk_start |
| + headers = read_auth_headers(auth_headers_file) if auth_headers_file else {} |
|
M-A Ruel
2016/06/03 20:00:49
I think I'd prefer to have it update the token in
Vadim Sh.
2016/06/03 23:37:55
Done. I made it symmetrical to FileRefresherThread
|
| # TODO(maruel): Support early cancellation. |
| # https://code.google.com/p/swarming/issues/detail?id=62 |
| resp = net.url_read_json( |
| swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
| - data=params) |
| + data=params, |
| + headers=headers, |
| + follow_redirects=False) |
| logging.debug('post_update() = %s', resp) |
| if not resp or resp.get('error'): |
| # Abandon it. This will force a process exit. |
| @@ -249,6 +256,35 @@ def should_post_update(stdout, now, last_packet): |
| return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval |
| +def read_auth_headers(path): |
| + """Reads authentication headers from the given file. |
| + |
| + The file is kept up-to-date by the main bot process (see AuthHeadersDumper in |
| + bot_main.py). |
| + |
| + Retries a bunch of times on errors to work around Windows file locking issues. |
| + If it fails to read the file even after a bunch of retries, raises ValueError |
| + that eventually aborts the task (since we can't run it without |
| + authentication). |
| + """ |
| + attempts = 100 |
| + while True: |
| + try: |
| + with open(path, 'rb') as f: |
| + headers = json.load(f) |
| + if not isinstance(headers, dict): |
| + raise ValueError('Expecting dict, got %r' % (headers,)) |
| + # The headers are ASCII for sure, so don't bother with picking the |
| + # correct unicode encoding, default would work. |
| + return {str(k): str(v) for k, v in headers.iteritems()} |
| + except (OSError, IOError, ValueError) as e: |
| + last_error = 'Failed to read auth headers from %s: %s' % (path, e) |
| + attempts -= 1 |
| + if not attempts: |
| + raise ValueError(last_error) |
| + time.sleep(0.05) |
| + |
| + |
| def calc_yield_wait(task_details, start, last_io, timed_out, stdout): |
| """Calculates the maximum number of seconds to wait in yield_any().""" |
| now = monotonic_time() |
| @@ -282,8 +318,8 @@ def kill_and_wait(proc, grace_period, reason): |
| def run_command( |
| - swarming_server, task_details, work_dir, cost_usd_hour, task_start, |
| - min_free_space): |
| + swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour, |
| + task_start, min_free_space): |
| """Runs a command and sends packets to the server to stream results back. |
| Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| @@ -300,7 +336,7 @@ def run_command( |
| 'id': task_details.bot_id, |
| 'task_id': task_details.task_id, |
| } |
| - post_update(swarming_server, params, None, '', 0) |
| + post_update(swarming_server, auth_headers_file, params, None, '', 0) |
| isolated_result = os.path.join(work_dir, 'isolated_result.json') |
| cmd = get_isolated_cmd( |
| @@ -342,7 +378,7 @@ def run_command( |
| params['duration'] = now - start |
| params['io_timeout'] = False |
| params['hard_timeout'] = False |
| - post_update(swarming_server, params, 1, stdout, 0) |
| + post_update(swarming_server, auth_headers_file, params, 1, stdout, 0) |
| return { |
| u'exit_code': 1, |
| u'hard_timeout': False, |
| @@ -374,7 +410,9 @@ def run_command( |
| last_packet = monotonic_time() |
| params['cost_usd'] = ( |
| cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
| - post_update(swarming_server, params, None, stdout, output_chunk_start) |
| + post_update( |
| + swarming_server, auth_headers_file, params, None, |
| + stdout, output_chunk_start) |
| output_chunk_start += len(stdout) |
| stdout = '' |
| @@ -480,7 +518,9 @@ def run_command( |
| if exit_code is None: |
| exit_code = -1 |
| params['hard_timeout'] = had_hard_timeout |
| - post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
| + post_update( |
| + swarming_server, auth_headers_file, params, exit_code, |
| + stdout, output_chunk_start) |
| return { |
| u'exit_code': exit_code, |
| u'hard_timeout': had_hard_timeout, |
| @@ -504,6 +544,9 @@ def main(args): |
| parser.add_option( |
| '--swarming-server', help='Swarming server to send data back') |
| parser.add_option( |
| + '--auth-headers-file', |
| + help='Name of the file to read authentication headers from') |
| + parser.add_option( |
| '--cost-usd-hour', type='float', help='Cost of this VM in $/h') |
| parser.add_option('--start', type='float', help='Time this task was started') |
| parser.add_option( |
| @@ -523,8 +566,9 @@ def main(args): |
| try: |
| load_and_run( |
| - options.in_file, options.swarming_server, options.cost_usd_hour, |
| - options.start, options.out_file, options.min_free_space) |
| + options.in_file, options.swarming_server, options.auth_headers_file, |
| + options.cost_usd_hour, options.start, options.out_file, |
| + options.min_free_space) |
| return 0 |
| finally: |
| logging.info('quitting') |