Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3017)

Unified Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2300543002: Minor refactoring in task_runner.py in preparation for adding more code. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Minor refactoring in task_runner.py in preparation for adding more code. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/swarming/swarming_bot/bot_code/task_runner.py
diff --git a/appengine/swarming/swarming_bot/bot_code/task_runner.py b/appengine/swarming/swarming_bot/bot_code/task_runner.py
index c780731abc5055ce0760a9f158d03317d3a4b6cf..b012ed3b140b20f32602be477dc30b3520c97183 100644
--- a/appengine/swarming/swarming_bot/bot_code/task_runner.py
+++ b/appengine/swarming/swarming_bot/bot_code/task_runner.py
@@ -31,7 +31,6 @@ from utils import subprocess42
from utils import zip_package
import bot_auth
-import file_reader
# Path to this file or the zip containing this file.
@@ -152,10 +151,9 @@ def get_isolated_cmd(
class TaskDetails(object):
def __init__(self, data):
- """Loads the raw data from a manifest file specified by --in-file."""
logging.info('TaskDetails(%s)', data)
if not isinstance(data, dict):
- raise ValueError('Expected dict, got %r' % data)
+ raise InternalError('Expected dict in task_runner_in.json, got %r' % data)
# Get all the data first so it fails early if the task details is invalid.
self.bot_id = data['bot_id']
@@ -177,61 +175,101 @@ class TaskDetails(object):
self.io_timeout = data['io_timeout']
self.task_id = data['task_id']
+ @staticmethod
+ def load(path):
+ """Loads the TaskDetails from a file on disk (specified via --in-file).
-class MustExit(Exception):
- """Raised on signal that the process must exit immediately."""
+ Raises InternalError if the file can't be read or parsed.
+ """
+ try:
+ with open(path, 'rb') as f:
+ return TaskDetails(json.load(f))
+ except (IOError, ValueError) as e:
+ raise InternalError('Cannot load task_runner_in.json: %s' % e)
+
+
+class ExitSignal(Exception):
+ """Raised on a signal that the process must exit immediately."""
def __init__(self, sig):
- super(MustExit, self).__init__()
+ super(ExitSignal, self).__init__(u'task_runner received signal %s' % sig)
self.signal = sig
+class InternalError(Exception):
+ """Raised on unrecoverable errors that abort task with 'internal error'."""
+
+
def load_and_run(
in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space,
bot_file, auth_params_file):
- """Loads the task's metadata and execute it.
+ """Loads the task's metadata, prepares auth environment and executes the task.
This may throw all sorts of exceptions in case of failure. It's up to the
caller to trap them. These shall be considered 'internal_failure' instead of
'failure' from a TaskRunResult standpoint.
"""
- # The work directory is guaranteed to exist since it was created by
- # bot_main.py and contains the manifest. Temporary files will be downloaded
- # there. It's bot_main.py that will delete the directory afterward. Tests are
- # not run from there.
+ auth_system = None
task_result = None
+ work_dir = os.path.dirname(out_file)
+
def handler(sig, _):
logging.info('Got signal %s', sig)
- raise MustExit(sig)
- work_dir = os.path.dirname(out_file)
+ raise ExitSignal(sig)
+
try:
with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler):
+ # The work directory is guaranteed to exist since it was created by
+ # bot_main.py and contains the manifest. Temporary files will be
+ # downloaded there. It's bot_main.py that will delete the directory
+ # afterward. Tests are not run from there.
if not os.path.isdir(work_dir):
- raise ValueError('%s expected to exist' % work_dir)
-
- with open(in_file, 'rb') as f:
- task_details = TaskDetails(json.load(f))
-
+ raise InternalError('%s expected to exist' % work_dir)
+
+ # Raises InternalError on errors.
+ task_details = TaskDetails.load(in_file)
+
+ # This will start a thread that occasionally reads bot authentication
+ # headers from 'auth_params_file'.
+ if auth_params_file:
+ try:
+ auth_system = bot_auth.AuthSystem()
+ auth_system.start(auth_params_file)
+ except bot_auth.AuthSystemError as e:
+ raise InternalError('Failed to init auth: %s' % e)
+
+ # Returns bot authentication headers dict or raises InternalError.
+ def headers_cb():
+ try:
+ return auth_system.bot_headers if auth_system else {}
+ except bot_auth.AuthSystemError as e:
+ raise InternalError('Failed to grab bot auth headers: %s' % e)
+
+ # Auth environment is up, start the command. task_result is dumped to
+ # disk in 'finally' block.
task_result = run_command(
swarming_server, task_details, work_dir,
- cost_usd_hour, start, min_free_space, bot_file, auth_params_file)
- except MustExit as e:
+ cost_usd_hour, start, min_free_space, bot_file, headers_cb)
+
+ except (ExitSignal, InternalError) as e:
# This normally means run_command() didn't get the chance to run, as it
- # itself trap MustExit and will report accordingly. In this case, we want
+ # itself traps exceptions and will report accordingly. In this case, we want
# the parent process to send the message instead.
if not task_result:
task_result = {
- u'exit_code': None,
+ u'exit_code': -1,
u'hard_timeout': False,
u'io_timeout': False,
- u'must_signal_internal_failure':
- u'task_runner received signal %s' % e.signal,
+ u'must_signal_internal_failure': str(e.message or 'unknown error'),
u'version': OUT_VERSION,
}
+
finally:
# We've found tests to delete the working directory work_dir when quitting,
# causing an exception here. Try to recreate the directory if necessary.
if not os.path.isdir(work_dir):
os.mkdir(work_dir)
+ if auth_system:
+ auth_system.stop()
with open(out_file, 'wb') as f:
json.dump(task_result, f)
@@ -252,6 +290,10 @@ def post_update(
Returns:
False if the task should stop.
+
+ Raises:
+ InternalError if can't contact the server after many attempts or the server
+ replies with an error.
"""
params = params.copy()
if exit_code is not None:
@@ -271,7 +313,8 @@ def post_update(
logging.debug('post_update() = %s', resp)
if not resp or resp.get('error'):
# Abandon it. This will force a process exit.
- raise ValueError(resp.get('error') if resp else 'Failed to contact server')
+ raise InternalError(
+ resp.get('error') if resp else 'Failed to contact server')
return not resp.get('must_stop', False)
@@ -316,62 +359,27 @@ def kill_and_wait(proc, grace_period, reason):
logging.warning('SIGKILL finally due to %s', reason)
proc.kill()
exit_code = proc.wait()
- logging.info('Waiting for proces exit in finally - done')
+ logging.info('Waiting for process exit in finally - done')
return exit_code
-def start_reading_headers(auth_params_file):
- """Spawns a thread that rereads headers from --auth-params-file path.
-
- Returns:
- Tuple (callback that returns the last known headers, stop callback).
-
- Raises:
- file_reader.FatalReadError if headers file can't be read.
- ValueError if it can be read, but its body is invalid.
- """
- # Read headers more often than bot_main writes them, to reduce maximum
- # possible latency between headers are updated and read.
- reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30)
-
- def read_and_validate_headers():
- val = bot_auth.process_auth_params_json(reader.last_value or {})
- return val.swarming_http_headers
-
- reader.start()
- read_and_validate_headers() # initial validation, may raise ValueError
- return read_and_validate_headers, reader.stop
-
-
def run_command(
swarming_server, task_details, work_dir, cost_usd_hour,
- task_start, min_free_space, bot_file, auth_params_file):
+ task_start, min_free_space, bot_file, headers_cb):
"""Runs a command and sends packets to the server to stream results back.
Implements both I/O and hard timeouts. Sends the packets numbered, so the
server can ensure they are processed in order.
Returns:
- Metadata about the command.
+ Metadata dict with the execution result.
+
+ Raises:
+ ExitSignal if caught some signal when starting or stopping.
+ InternalError on unexpected internal errors.
"""
# TODO(maruel): This function is incomprehensible, split and refactor.
- # Grab initial auth headers and start rereading them in parallel thread. They
- # MUST be there already. It's fatal internal error if they are not.
- headers_cb = lambda: {}
- stop_headers_reader = lambda: None
- if auth_params_file:
- try:
- headers_cb, stop_headers_reader = start_reading_headers(auth_params_file)
- except (ValueError, file_reader.FatalReadError) as e:
- return {
- u'exit_code': 1,
- u'hard_timeout': False,
- u'io_timeout': False,
- u'must_signal_internal_failure': str(e),
- u'version': OUT_VERSION,
- }
-
# Signal the command is about to be started.
last_packet = start = now = monotonic_time()
params = {
@@ -466,7 +474,7 @@ def run_command(
if not post_update(
swarming_server, headers_cb(), params, None, stdout,
output_chunk_start):
- # Server is telling us to stop. Normally task cancelation.
+ # Server is telling us to stop. Normally task cancellation.
if not kill_sent:
logging.warning('Server induced stop; sending SIGKILL')
proc.kill()
@@ -501,19 +509,12 @@ def run_command(
task_details.grace_period, now - timed_out)
proc.kill()
kill_sent = True
- logging.info('Waiting for proces exit')
+ logging.info('Waiting for process exit')
exit_code = proc.wait()
- except MustExit as e:
- # TODO(maruel): Do the send SIGTERM to child process and give it
- # task_details.grace_period to terminate.
- must_signal_internal_failure = (
- u'task_runner received signal %s' % e.signal)
- exit_code = kill_and_wait(
- proc, task_details.grace_period, 'signal %d' % e.signal)
- except (IOError, OSError):
+ except (ExitSignal, InternalError, IOError, OSError) as e:
# Something wrong happened, try to kill the child process.
- exit_code = kill_and_wait(
- proc, task_details.grace_period, 'exception %s' % e)
+ must_signal_internal_failure = str(e.message or 'unknown error')
+ exit_code = kill_and_wait(proc, task_details.grace_period, e.message)
# This is the very last packet for this command. It if was an isolated task,
# include the output reference to the archived .isolated file.
@@ -586,15 +587,24 @@ def run_command(
if not must_signal_internal_failure:
must_signal_internal_failure = '%s\n%s' % (
e, traceback.format_exc()[-2048:])
+
# TODO(maruel): Send the internal failure here instead of sending it through
# bot_main, this causes a race condition.
if exit_code is None:
exit_code = -1
params['hard_timeout'] = had_hard_timeout
- # Ignore server reply to stop.
- post_update(
- swarming_server, headers_cb(), params, exit_code,
- stdout, output_chunk_start)
+
+ # Ignore server reply to stop. Also ignore internal errors here if we are
+ # already handling some.
+ try:
+ post_update(
+ swarming_server, headers_cb(), params, exit_code,
+ stdout, output_chunk_start)
+ except InternalError as e:
+ logging.error('Internal error while finishing the task: %s', e)
+ if not must_signal_internal_failure:
+ must_signal_internal_failure = str(e.message or 'unknown error')
+
return {
u'exit_code': exit_code,
u'hard_timeout': had_hard_timeout,
@@ -604,7 +614,6 @@ def run_command(
}
finally:
file_path.try_remove(unicode(isolated_result))
- stop_headers_reader()
def main(args):
« no previous file with comments | « appengine/swarming/swarming_bot/bot_code/bot_auth.py ('k') | appengine/swarming/swarming_bot/bot_code/task_runner_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698