Chromium Code Reviews| Index: appengine/swarming/swarming_bot/bot_code/task_runner.py |
| diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| index c780731abc5055ce0760a9f158d03317d3a4b6cf..4be7276420ce293a45ec47e379211afde5ad5b95 100644 |
| --- a/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| +++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py |
| @@ -31,7 +31,6 @@ from utils import subprocess42 |
| from utils import zip_package |
| import bot_auth |
| -import file_reader |
| # Path to this file or the zip containing this file. |
| @@ -152,10 +151,9 @@ def get_isolated_cmd( |
| class TaskDetails(object): |
| def __init__(self, data): |
| - """Loads the raw data from a manifest file specified by --in-file.""" |
| logging.info('TaskDetails(%s)', data) |
| if not isinstance(data, dict): |
| - raise ValueError('Expected dict, got %r' % data) |
| + raise InternalError('Expected dict in task_runner_in.json, got %r' % data) |
| # Get all the data first so it fails early if the task details is invalid. |
| self.bot_id = data['bot_id'] |
| @@ -177,61 +175,101 @@ class TaskDetails(object): |
| self.io_timeout = data['io_timeout'] |
| self.task_id = data['task_id'] |
| + @staticmethod |
| + def load(path): |
| + """Loads the TaskDetails from a file on disk (specified via --in-file). |
| -class MustExit(Exception): |
| - """Raised on signal that the process must exit immediately.""" |
| + Raises InternalError if the file can't be read or parsed. |
| + """ |
| + try: |
| + with open(path, 'rb') as f: |
| + return TaskDetails(json.load(f)) |
| + except (IOError, ValueError) as e: |
| + raise InternalError('Cannot load task_runner_in.json: %s' % e) |
| + |
| + |
| +class ExitSignal(Exception): |
| + """Raised on a signal that the process must exit immediately.""" |
| def __init__(self, sig): |
| - super(MustExit, self).__init__() |
| + super(ExitSignal, self).__init__(u'task_runner received signal %s' % sig) |
| self.signal = sig |
| +class InternalError(Exception): |
| + """Raised on unrecoverable errors that abort task with 'internal error'.""" |
| + |
| + |
| def load_and_run( |
| in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space, |
| bot_file, auth_params_file): |
| - """Loads the task's metadata and execute it. |
| + """Loads the task's metadata, prepares auth environment and executes the task. |
| This may throw all sorts of exceptions in case of failure. It's up to the |
| caller to trap them. These shall be considered 'internal_failure' instead of |
| 'failure' from a TaskRunResult standpoint. |
| """ |
| - # The work directory is guaranteed to exist since it was created by |
| - # bot_main.py and contains the manifest. Temporary files will be downloaded |
| - # there. It's bot_main.py that will delete the directory afterward. Tests are |
| - # not run from there. |
| + auth_system = None |
| task_result = None |
| + work_dir = os.path.dirname(out_file) |
| + |
| def handler(sig, _): |
| logging.info('Got signal %s', sig) |
| - raise MustExit(sig) |
| - work_dir = os.path.dirname(out_file) |
| + raise ExitSignal(sig) |
| + |
| try: |
| with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): |
| + # The work directory is guaranteed to exist since it was created by |
|
Vadim Sh.
2016/08/31 03:53:33
moved comment to closer where it is relevant
|
| + # bot_main.py and contains the manifest. Temporary files will be |
| + # downloaded there. It's bot_main.py that will delete the directory |
| + # afterward. Tests are not run from there. |
| if not os.path.isdir(work_dir): |
| - raise ValueError('%s expected to exist' % work_dir) |
| - |
| - with open(in_file, 'rb') as f: |
| - task_details = TaskDetails(json.load(f)) |
| - |
| + raise InternalError('%s expected to exist' % work_dir) |
| + |
| + # Raises InternalError on errors. |
| + task_details = TaskDetails.load(in_file) |
| + |
| + # This will start a thread that occasionally reads bot authentication |
| + # headers from 'auth_params_file'. |
| + if auth_params_file: |
| + try: |
| + auth_system = bot_auth.AuthSystem() |
| + auth_system.start(auth_params_file) |
| + except bot_auth.AuthSystemError as e: |
| + raise InternalError('Failed to init auth: %s' % e) |
| + |
| + # Returns bot authentication headers dict or raises InternalError. |
| + def headers_cb(): |
| + try: |
| + return auth_system.bot_headers if auth_system else {} |
| + except bot_auth.AuthSystemError as e: |
| + raise InternalError('Failed to grab bot auth headers: %s' % e) |
| + |
| + # Auth environment is up, start the command. task_result is dumped to |
| + # disk in 'finally' block. |
| task_result = run_command( |
| swarming_server, task_details, work_dir, |
| - cost_usd_hour, start, min_free_space, bot_file, auth_params_file) |
| - except MustExit as e: |
| + cost_usd_hour, start, min_free_space, bot_file, headers_cb) |
| + |
| + except (ExitSignal, InternalError) as e: |
|
Vadim Sh.
2016/08/31 03:53:33
InternalError here is new. I believe it's more cle
|
| # This normally means run_command() didn't get the chance to run, as it |
| - # itself trap MustExit and will report accordingly. In this case, we want |
| + # itself traps exceptions and will report accordingly. In this case, we want |
| # the parent process to send the message instead. |
| if not task_result: |
| task_result = { |
| - u'exit_code': None, |
| + u'exit_code': -1, |
| u'hard_timeout': False, |
| u'io_timeout': False, |
| - u'must_signal_internal_failure': |
| - u'task_runner received signal %s' % e.signal, |
| + u'must_signal_internal_failure': e.message or 'unknown error', |
|
M-A Ruel
2016/08/31 15:03:47
str(e.message) if e.message else 'unknown error'
Vadim Sh.
2016/09/01 00:15:56
Done.
|
| u'version': OUT_VERSION, |
| } |
| + |
| finally: |
| # We've found tests to delete the working directory work_dir when quitting, |
| # causing an exception here. Try to recreate the directory if necessary. |
| if not os.path.isdir(work_dir): |
| os.mkdir(work_dir) |
| + if auth_system: |
|
M-A Ruel
2016/08/31 15:03:47
One thing that could help is to use contextlib. co
Vadim Sh.
2016/09/01 00:15:56
I've considered this. AuthSystem isn't supposed to
|
| + auth_system.stop() |
| with open(out_file, 'wb') as f: |
| json.dump(task_result, f) |
| @@ -252,6 +290,10 @@ def post_update( |
| Returns: |
| False if the task should stop. |
| + |
| + Raises: |
| + InternalError if can't contact the server after many attempts or the server |
| + replies with an error. |
| """ |
| params = params.copy() |
| if exit_code is not None: |
| @@ -271,7 +313,8 @@ def post_update( |
| logging.debug('post_update() = %s', resp) |
| if not resp or resp.get('error'): |
| # Abandon it. This will force a process exit. |
| - raise ValueError(resp.get('error') if resp else 'Failed to contact server') |
| + raise InternalError( |
| + resp.get('error') if resp else 'Failed to contact server') |
| return not resp.get('must_stop', False) |
| @@ -316,62 +359,27 @@ def kill_and_wait(proc, grace_period, reason): |
| logging.warning('SIGKILL finally due to %s', reason) |
| proc.kill() |
| exit_code = proc.wait() |
| - logging.info('Waiting for proces exit in finally - done') |
| + logging.info('Waiting for process exit in finally - done') |
| return exit_code |
| -def start_reading_headers(auth_params_file): |
| - """Spawns a thread that rereads headers from --auth-params-file path. |
| - |
| - Returns: |
| - Tuple (callback that returns the last known headers, stop callback). |
| - |
| - Raises: |
| - file_reader.FatalReadError if headers file can't be read. |
| - ValueError if it can be read, but its body is invalid. |
| - """ |
| - # Read headers more often than bot_main writes them, to reduce maximum |
| - # possible latency between headers are updated and read. |
| - reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30) |
| - |
| - def read_and_validate_headers(): |
| - val = bot_auth.process_auth_params_json(reader.last_value or {}) |
| - return val.swarming_http_headers |
| - |
| - reader.start() |
| - read_and_validate_headers() # initial validation, may raise ValueError |
| - return read_and_validate_headers, reader.stop |
| - |
| - |
| def run_command( |
| swarming_server, task_details, work_dir, cost_usd_hour, |
| - task_start, min_free_space, bot_file, auth_params_file): |
| + task_start, min_free_space, bot_file, headers_cb): |
| """Runs a command and sends packets to the server to stream results back. |
| Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| server can ensure they are processed in order. |
| Returns: |
| - Metadata about the command. |
| + Metadata dict with the execution result. |
| + |
| + Raises: |
| + ExitSignal if caught some signal when starting or stopping. |
| + InternalError on unexpected internal errors. |
| """ |
| # TODO(maruel): This function is incomprehensible, split and refactor. |
| - # Grab initial auth headers and start rereading them in parallel thread. They |
| - # MUST be there already. It's fatal internal error if they are not. |
| - headers_cb = lambda: {} |
| - stop_headers_reader = lambda: None |
| - if auth_params_file: |
| - try: |
| - headers_cb, stop_headers_reader = start_reading_headers(auth_params_file) |
| - except (ValueError, file_reader.FatalReadError) as e: |
| - return { |
| - u'exit_code': 1, |
| - u'hard_timeout': False, |
| - u'io_timeout': False, |
| - u'must_signal_internal_failure': str(e), |
| - u'version': OUT_VERSION, |
| - } |
| - |
| # Signal the command is about to be started. |
| last_packet = start = now = monotonic_time() |
| params = { |
| @@ -466,7 +474,7 @@ def run_command( |
| if not post_update( |
| swarming_server, headers_cb(), params, None, stdout, |
| output_chunk_start): |
| - # Server is telling us to stop. Normally task cancelation. |
| + # Server is telling us to stop. Normally task cancellation. |
|
M-A Ruel
2016/08/31 15:03:47
"Rarely used outside the U.S. According to graphs
Vadim Sh.
2016/09/01 00:15:56
Oh. My spellchecker is British then or something :
|
| if not kill_sent: |
| logging.warning('Server induced stop; sending SIGKILL') |
| proc.kill() |
| @@ -501,19 +509,12 @@ def run_command( |
| task_details.grace_period, now - timed_out) |
| proc.kill() |
| kill_sent = True |
| - logging.info('Waiting for proces exit') |
| + logging.info('Waiting for process exit') |
| exit_code = proc.wait() |
| - except MustExit as e: |
| - # TODO(maruel): Do the send SIGTERM to child process and give it |
| - # task_details.grace_period to terminate. |
| - must_signal_internal_failure = ( |
| - u'task_runner received signal %s' % e.signal) |
| - exit_code = kill_and_wait( |
| - proc, task_details.grace_period, 'signal %d' % e.signal) |
| - except (IOError, OSError): |
| + except (ExitSignal, InternalError, IOError, OSError) as e: |
|
Vadim Sh.
2016/08/31 03:53:33
same here...
Was there a reason IOError and OSEr
M-A Ruel
2016/08/31 15:03:47
- If the file is missing, subprocess.Popen(['missi
Vadim Sh.
2016/09/01 00:15:56
I believe this is caught sooner (on line 434) when
|
| # Something wrong happened, try to kill the child process. |
| - exit_code = kill_and_wait( |
| - proc, task_details.grace_period, 'exception %s' % e) |
|
Vadim Sh.
2016/08/31 03:53:32
'e' here was undefined :)
M-A Ruel
2016/08/31 15:03:47
I tried to be defensive but sucked at it.
|
| + must_signal_internal_failure = e.message or 'unknown error' |
|
M-A Ruel
2016/08/31 15:03:47
str(e.message) if e.message else 'unknown error'
Vadim Sh.
2016/09/01 00:15:56
Done.
|
| + exit_code = kill_and_wait(proc, task_details.grace_period, e.message) |
| # This is the very last packet for this command. It if was an isolated task, |
| # include the output reference to the archived .isolated file. |
| @@ -586,15 +587,24 @@ def run_command( |
| if not must_signal_internal_failure: |
| must_signal_internal_failure = '%s\n%s' % ( |
| e, traceback.format_exc()[-2048:]) |
| + |
| # TODO(maruel): Send the internal failure here instead of sending it through |
| # bot_main, this causes a race condition. |
| if exit_code is None: |
| exit_code = -1 |
| params['hard_timeout'] = had_hard_timeout |
| - # Ignore server reply to stop. |
| - post_update( |
| - swarming_server, headers_cb(), params, exit_code, |
| - stdout, output_chunk_start) |
| + |
| + # Ignore server reply to stop. Also ignore internal errors here if we are |
| + # already handling some. |
| + try: |
| + post_update( |
| + swarming_server, headers_cb(), params, exit_code, |
| + stdout, output_chunk_start) |
| + except InternalError as e: |
| + logging.error('Internal error while finishing the task: %s', e) |
| + if not must_signal_internal_failure: |
| + must_signal_internal_failure = e.message or 'unknown error' |
|
M-A Ruel
2016/08/31 15:03:47
same
Vadim Sh.
2016/09/01 00:15:56
Done.
|
| + |
| return { |
| u'exit_code': exit_code, |
| u'hard_timeout': had_hard_timeout, |
| @@ -604,7 +614,6 @@ def run_command( |
| } |
| finally: |
| file_path.try_remove(unicode(isolated_result)) |
| - stop_headers_reader() |
| def main(args): |