Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/task_runner.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| index 40f4208bd5df969cb1a6cc66ff9708528a5928c5..45d327d1d6ab0217b1e0dbf806b7e5d938d8bf4d 100644 |
| --- a/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| +++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| @@ -28,6 +28,8 @@ from utils import on_error |
| from utils import subprocess42 |
| from utils import zip_package |
| +import file_reader |
| + |
| # Path to this file or the zip containing this file. |
| THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) |
| @@ -157,7 +159,8 @@ class MustExit(Exception): |
| def load_and_run( |
| - in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space): |
| + in_file, swarming_server, auth_headers_file, cost_usd_hour, start, |
| + out_file, min_free_space): |
| """Loads the task's metadata and execute it. |
| This may throw all sorts of exceptions in case of failure. It's up to the |
| @@ -182,8 +185,8 @@ def load_and_run( |
| task_details = TaskDetails(json.load(f)) |
| task_result = run_command( |
| - swarming_server, task_details, work_dir, cost_usd_hour, start, |
| - min_free_space) |
| + swarming_server, task_details, work_dir, auth_headers_file, |
| + cost_usd_hour, start, min_free_space) |
| except MustExit as e: |
| # This normally means run_command() didn't get the chance to run, as it |
| # itself trap MustExit and will report accordingly. In this case, we want |
| @@ -206,11 +209,14 @@ def load_and_run( |
| json.dump(task_result, f) |
| -def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
| +def post_update( |
| + swarming_server, auth_headers, params, exit_code, |
| + stdout, output_chunk_start): |
| """Posts task update to task_update. |
| Arguments: |
| swarming_server: Base URL to Swarming server. |
| + auth_headers: dict with HTTP authentication headers. |
| params: Default JSON parameters for the POST. |
| exit_code: Process exit code, only when a command completed. |
| stdout: Incremental output since last call, if any. |
| @@ -229,7 +235,9 @@ def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): |
| # https://code.google.com/p/swarming/issues/detail?id=62 |
| resp = net.url_read_json( |
| swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
| - data=params) |
| + data=params, |
| + headers=auth_headers, |
| + follow_redirects=False) |
| logging.debug('post_update() = %s', resp) |
| if not resp or resp.get('error'): |
| # Abandon it. This will force a process exit. |
| @@ -249,6 +257,35 @@ def should_post_update(stdout, now, last_packet): |
| return len(stdout) >= MAX_CHUNK_SIZE or (now - last_packet) > packet_interval |
| +def read_auth_headers(path): |
|
M-A Ruel
2016/06/06 21:18:36
dead code?
Vadim Sh.
2016/06/06 22:17:09
oops, removed
|
| + """Reads authentication headers from the given file. |
| + |
| + The file is kept up-to-date by the main bot process (see AuthHeadersDumper in |
| + bot_main.py). |
| + |
| + Retries a bunch of times on errors to work around Windows file locking issues. |
| + If it fails to read the file even after a bunch of retries, raises ValueError |
| + that eventually aborts the task (since we can't run it without |
| + authentication). |
| + """ |
| + attempts = 100 |
| + while True: |
| + try: |
| + with open(path, 'rb') as f: |
| + headers = json.load(f) |
| + if not isinstance(headers, dict): |
| + raise ValueError('Expecting dict, got %r' % (headers,)) |
| + # The headers are ASCII for sure, so don't bother with picking the |
| + # correct unicode encoding, default would work. |
| + return {str(k): str(v) for k, v in headers.iteritems()} |
| + except (OSError, IOError, ValueError) as e: |
| + last_error = 'Failed to read auth headers from %s: %s' % (path, e) |
| + attempts -= 1 |
| + if not attempts: |
| + raise ValueError(last_error) |
| + time.sleep(0.05) |
| + |
| + |
| def calc_yield_wait(task_details, start, last_io, timed_out, stdout): |
| """Calculates the maximum number of seconds to wait in yield_any().""" |
| now = monotonic_time() |
| @@ -281,9 +318,37 @@ def kill_and_wait(proc, grace_period, reason): |
| return exit_code |
| +def start_reading_headers(auth_headers_file): |
| + """Spawns a thread that rereads the headers file. |
| + |
| + Returns: |
| + Tuple (callback that returns the last known headers, stop callback). |
| + |
| + Raises: |
| + file_reader.FatalReadError if headers file can't be read. |
| + ValueError if it can be read, but its body is invalid. |
| + """ |
| + # Read headers more often than bot_main writes them, to reduce the maximum |
| + # possible latency between the headers being updated and being read. |
| + headers_reader = file_reader.FileReaderThread( |
| + auth_headers_file, interval_sec=30) |
| + |
| + def read_and_validate_headers(): |
| + val = headers_reader.last_value or {} |
| + if not isinstance(val, dict): |
| + raise ValueError('Expecting dict with headers, got %r' % (val,)) |
| + # The headers must be ASCII for sure, so don't bother with picking the |
| + # correct unicode encoding, default would work. |
| + return {str(k): str(v) for k, v in val.iteritems()} |
| + |
| + headers_reader.start() |
| + read_and_validate_headers() # initial validation, may raise ValueError |
| + return read_and_validate_headers, headers_reader.stop |
| + |
| + |
| def run_command( |
| - swarming_server, task_details, work_dir, cost_usd_hour, task_start, |
| - min_free_space): |
| + swarming_server, task_details, work_dir, auth_headers_file, cost_usd_hour, |
| + task_start, min_free_space): |
| """Runs a command and sends packets to the server to stream results back. |
| Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| @@ -293,6 +358,23 @@ def run_command( |
| Metadata about the command. |
| """ |
| # TODO(maruel): This function is incomprehensible, split and refactor. |
| + |
| + # Grab initial auth headers and start rereading them in a parallel thread. |
| + # They MUST be there already. It's a fatal internal error if they are not. |
| + headers_cb = lambda: {} |
| + stop_headers_reader = lambda: None |
| + if auth_headers_file: |
| + try: |
| + headers_cb, stop_headers_reader = start_reading_headers(auth_headers_file) |
| + except (ValueError, file_reader.FatalReadError) as e: |
| + return { |
| + u'exit_code': 1, |
| + u'hard_timeout': False, |
| + u'io_timeout': False, |
| + u'must_signal_internal_failure': str(e), |
| + u'version': OUT_VERSION, |
| + } |
| + |
| # Signal the command is about to be started. |
| last_packet = start = now = monotonic_time() |
| params = { |
| @@ -300,7 +382,7 @@ def run_command( |
| 'id': task_details.bot_id, |
| 'task_id': task_details.task_id, |
| } |
| - post_update(swarming_server, params, None, '', 0) |
| + post_update(swarming_server, headers_cb(), params, None, '', 0) |
| isolated_result = os.path.join(work_dir, 'isolated_result.json') |
| cmd = get_isolated_cmd( |
| @@ -342,7 +424,7 @@ def run_command( |
| params['duration'] = now - start |
| params['io_timeout'] = False |
| params['hard_timeout'] = False |
| - post_update(swarming_server, params, 1, stdout, 0) |
| + post_update(swarming_server, headers_cb(), params, 1, stdout, 0) |
| return { |
| u'exit_code': 1, |
| u'hard_timeout': False, |
| @@ -374,7 +456,9 @@ def run_command( |
| last_packet = monotonic_time() |
| params['cost_usd'] = ( |
| cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
| - post_update(swarming_server, params, None, stdout, output_chunk_start) |
| + post_update( |
| + swarming_server, headers_cb(), params, None, |
| + stdout, output_chunk_start) |
| output_chunk_start += len(stdout) |
| stdout = '' |
| @@ -480,7 +564,9 @@ def run_command( |
| if exit_code is None: |
| exit_code = -1 |
| params['hard_timeout'] = had_hard_timeout |
| - post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
| + post_update( |
| + swarming_server, headers_cb(), params, exit_code, |
| + stdout, output_chunk_start) |
| return { |
| u'exit_code': exit_code, |
| u'hard_timeout': had_hard_timeout, |
| @@ -493,6 +579,7 @@ def run_command( |
| os.remove(isolated_result) |
| except OSError: |
| pass |
| + stop_headers_reader() |
| def main(args): |
| @@ -504,6 +591,9 @@ def main(args): |
| parser.add_option( |
| '--swarming-server', help='Swarming server to send data back') |
| parser.add_option( |
| + '--auth-headers-file', |
| + help='Name of the file to read authentication headers from') |
| + parser.add_option( |
| '--cost-usd-hour', type='float', help='Cost of this VM in $/h') |
| parser.add_option('--start', type='float', help='Time this task was started') |
| parser.add_option( |
| @@ -523,8 +613,9 @@ def main(args): |
| try: |
| load_and_run( |
| - options.in_file, options.swarming_server, options.cost_usd_hour, |
| - options.start, options.out_file, options.min_free_space) |
| + options.in_file, options.swarming_server, options.auth_headers_file, |
| + options.cost_usd_hour, options.start, options.out_file, |
| + options.min_free_space) |
| return 0 |
| finally: |
| logging.info('quitting') |