Chromium Code Reviews

Unified Diff: client/run_isolated.py

Issue 1342673003: Significant refactoring of run_isolated. (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: . Created 5 years, 3 months ago
Index: client/run_isolated.py
diff --git a/client/run_isolated.py b/client/run_isolated.py
index 75e8b325a622ef215d9ecc648cbbc918e0732de9..07cd42141bb144dc5cd4f771da4485a2db3aae12 100755
--- a/client/run_isolated.py
+++ b/client/run_isolated.py
@@ -14,7 +14,7 @@ file. All content written to this directory will be uploaded upon termination
and the .isolated file describing this directory will be printed to stdout.
"""
-__version__ = '0.4.4'
+__version__ = '0.5'
import logging
import optparse
@@ -122,12 +122,137 @@ def change_tree_read_only(rootdir, read_only):
def process_command(command, out_dir):
"""Replaces isolated specific variables in a command line."""
- filtered = []
- for arg in command:
+ def fix(arg):
if '${ISOLATED_OUTDIR}' in arg:
- arg = arg.replace('${ISOLATED_OUTDIR}', out_dir).replace('/', os.sep)
- filtered.append(arg)
- return filtered
+ return arg.replace('${ISOLATED_OUTDIR}', out_dir).replace('/', os.sep)
+ return arg
+
+ return [fix(arg) for arg in command]
+
+
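
For illustration, the refactored process_command can be exercised standalone; the command line and out_dir below are hypothetical:

    import os

    def process_command(command, out_dir):
      # Same substitution as in the patch: expand ${ISOLATED_OUTDIR} and
      # normalize path separators for the current OS.
      def fix(arg):
        if '${ISOLATED_OUTDIR}' in arg:
          return arg.replace('${ISOLATED_OUTDIR}', out_dir).replace('/', os.sep)
        return arg
      return [fix(arg) for arg in command]

    print(process_command(
        ['./bin/foo', '--out=${ISOLATED_OUTDIR}/results.json'], '/tmp/out'))
    # On POSIX this prints: ['./bin/foo', '--out=/tmp/out/results.json']
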
+def run_command(command, cwd):
+ """Runs the command, returns the process exit code."""
+  logging.info('run_command(%s, %s)', command, cwd)
+ sys.stdout.flush()
+ with tools.Profiler('RunTest'):
+ try:
+ with subprocess42.Popen_with_handler(command, cwd=cwd) as p:
+ p.communicate()
+ exit_code = p.returncode
+ except OSError:
+ # This is not considered to be an internal error. The executable simply
+      # does not exist.
+ exit_code = 1
+ logging.info(
+ 'Command finished with exit code %d (%s)',
+ exit_code, hex(0xffffffff & exit_code))
+ return exit_code
+
+
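
subprocess42.Popen_with_handler is luci-py's signal-aware Popen wrapper; as a rough standard-library approximation of run_command's control flow (a sketch, not the real wrapper), assuming plain subprocess semantics:

    import logging
    import subprocess
    import sys

    def run_command_sketch(command, cwd):
      # Approximation only: the real code uses subprocess42 so that SIGTERM
      # is delivered to the child process properly.
      logging.info('run_command(%s, %s)', command, cwd)
      sys.stdout.flush()
      try:
        proc = subprocess.Popen(command, cwd=cwd)
        proc.communicate()
        exit_code = proc.returncode
      except OSError:
        # The executable does not exist; a task failure, not an internal one.
        exit_code = 1
      return exit_code
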
+def delete_and_upload(storage, out_dir, leak_temp_dir):
+ """Deletes the temporary run directory and uploads results back.
+
+ Returns:
+ tuple(outputs_ref, success)
+ - outputs_ref is a dict referring to the results archived back to the
+ isolated server, if applicable.
+    - success is False if something occurred that forces the task to be
+      considered a failure, e.g. zombie processes were left behind.
+ """
+
+ # Upload out_dir and generate a .isolated file out of this directory. It is
+ # only done if files were written in the directory.
+ outputs_ref = None
+ if os.path.isdir(out_dir) and os.listdir(out_dir):
+ with tools.Profiler('ArchiveOutput'):
+ try:
+ results = isolateserver.archive_files_to_storage(
+ storage, [out_dir], None)
+ outputs_ref = {
+ 'isolated': results[0][0],
+ 'isolatedserver': storage.location,
+ 'namespace': storage.namespace,
+ }
+ except isolateserver.Aborted:
+        # This happens when a SIGTERM signal was received while uploading
+        # data. There are two causes:
+ # - The task was too slow and was about to be killed anyway due to
+ # exceeding the hard timeout.
+ # - The amount of data uploaded back is very large and took too much
+ # time to archive.
+        sys.stderr.write('Received SIGTERM while uploading\n')
+ # Re-raise, so it will be treated as an internal failure.
+ raise
+ try:
+ if not leak_temp_dir and not file_path.rmtree(out_dir):
+ logging.error('Had difficulties removing out_dir %s', out_dir)
+ return outputs_ref, False
+ except OSError as e:
+    # When this happens, it usually means a task child process is still
+    # running and holding a file open in out_dir.
+ logging.error('Had difficulties removing out_dir %s: %s', out_dir, e)
+ return outputs_ref, False
+ return outputs_ref, True
+
+
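
When output files exist and the upload succeeds, outputs_ref takes the shape below (all values are hypothetical placeholders):

    outputs_ref = {
      'isolated': 'deadbeefdeadbeefdeadbeefdeadbeefdeadbeef',  # results[0][0]
      'isolatedserver': 'https://isolateserver.example.com',   # storage.location
      'namespace': 'default-gzip',                             # storage.namespace
    }
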
+def map_and_run(isolated_hash, storage, cache, leak_temp_dir, extra_args):
+ """Maps and run the command. Returns metadata about the result."""
+ # TODO(maruel): Include performance statistics.
+ result = {
+ 'exit_code': None,
+ 'internal_failure': None,
+ 'outputs_ref': None,
+ 'version': 1,
+ }
+ tmp_root = os.path.dirname(cache.cache_dir) if cache.cache_dir else None
+ run_dir = make_temp_dir(u'run_tha_test', tmp_root)
+ out_dir = unicode(make_temp_dir(u'isolated_out', tmp_root))
+ try:
+ bundle = isolateserver.fetch_isolated(
+ isolated_hash=isolated_hash,
+ storage=storage,
+ cache=cache,
+ outdir=run_dir,
+ require_command=True)
+
+ change_tree_read_only(run_dir, bundle.read_only)
+ cwd = os.path.normpath(os.path.join(run_dir, bundle.relative_cwd))
+ command = bundle.command + extra_args
+ file_path.ensure_command_has_abs_path(command, cwd)
+ result['exit_code'] = run_command(process_command(command, out_dir), cwd)
+ except Exception as e:
+    # An internal error occurred. Report accordingly so the swarming task
+    # will be retried automatically.
+ logging.error('internal failure: %s', e)
+ result['internal_failure'] = str(e)
+ on_error.report(None)
+ finally:
+ try:
+ if leak_temp_dir:
+ logging.warning(
+ 'Deliberately leaking %s for later examination', run_dir)
+ elif not file_path.rmtree(run_dir):
+        # On Windows, the rmtree(run_dir) call above has a synchronization
+        # effect: it finishes only when all task child processes terminate
+        # (since a running process locks its *.exe file). Examine out_dir only
+        # after that call completes, since child processes may write to
+        # out_dir too and we need to wait for them to finish.
+ print >> sys.stderr, (
+ 'Failed to delete the temporary directory, forcibly failing\n'
+ 'the task because of it. No zombie process can outlive a\n'
+ 'successful task run and still be marked as successful.\n'
+ 'Fix your stuff.')
+ if result['exit_code'] == 0:
+ result['exit_code'] = 1
+
+ result['outputs_ref'], success = delete_and_upload(
+ storage, out_dir, leak_temp_dir)
+ if not success and result['exit_code'] == 0:
+ result['exit_code'] = 1
+ except Exception as e:
+ # Swallow any exception in the main finally clause.
+ logging.error('Leaking out_dir %s: %s', out_dir, e)
+ result['internal_failure'] = str(e)
+ return result
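
Putting the pieces together, a run that succeeded but produced no output files would return version-1 metadata along these lines (values hypothetical):

    result = {
      'exit_code': 0,            # forced to 1 if run_dir/out_dir cleanup failed
      'internal_failure': None,  # str(exception) when an internal error occurred
      'outputs_ref': None,       # or the dict built by delete_and_upload()
      'version': 1,
    }
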
def run_tha_test(
@@ -150,141 +275,35 @@ def run_tha_test(
in-memory.
leak_temp_dir: if true, the temporary directory will be deliberately leaked
for later examination.
- result_json: file path to dump result metadata into.
+ result_json: file path to dump result metadata into. If set, the process
+    exit code is always 0 unless an internal error occurred.
extra_args: optional arguments to add to the command stated in the .isolate
file.
- """
- tmp_root = os.path.dirname(cache.cache_dir) if cache.cache_dir else None
- run_dir = make_temp_dir(u'run_tha_test', tmp_root)
- out_dir = unicode(make_temp_dir(u'isolated_out', tmp_root))
- result = 0
- try:
- try:
- bundle = isolateserver.fetch_isolated(
- isolated_hash=isolated_hash,
- storage=storage,
- cache=cache,
- outdir=run_dir,
- require_command=True)
- except isolated_format.IsolatedError:
- on_error.report(None)
- return 1
-
- change_tree_read_only(run_dir, bundle.read_only)
- cwd = os.path.normpath(os.path.join(run_dir, bundle.relative_cwd))
- command = bundle.command + extra_args
- file_path.ensure_command_has_abs_path(command, cwd)
- command = process_command(command, out_dir)
- logging.info('Running %s, cwd=%s' % (command, cwd))
-
- # TODO(csharp): This should be specified somewhere else.
- # TODO(vadimsh): Pass it via 'env_vars' in manifest.
- # Add a rotating log file if one doesn't already exist.
- env = os.environ.copy()
- if MAIN_DIR:
- env.setdefault('RUN_TEST_CASES_LOG_FILE',
- os.path.join(MAIN_DIR, RUN_TEST_CASES_LOG))
+ Returns:
+ Process exit code that should be used.
+ """
+  # run_isolated's own exit code depends on whether result_json is used.
+ result = map_and_run(
+ isolated_hash, storage, cache, leak_temp_dir, extra_args)
+ logging.info('Result:\n%s', tools.format_json(result, dense=True))
+ if result_json:
+ tools.write_json(result_json, result, dense=True)
+ # Only return 1 if there was an internal error.
+ return int(bool(result['internal_failure']))
+
+  # Marshal into old-style inline output.
+ if result['outputs_ref']:
+ data = {
+ 'hash': result['outputs_ref']['isolated'],
+ 'namespace': result['outputs_ref']['namespace'],
+ 'storage': result['outputs_ref']['isolatedserver'],
+ }
sys.stdout.flush()
- with tools.Profiler('RunTest'):
- try:
- with subprocess42.Popen_with_handler(command, cwd=cwd, env=env) as p:
- p.communicate()
- result = p.returncode
- except OSError:
- on_error.report('Failed to run %s; cwd=%s' % (command, cwd))
- result = 1
- logging.info(
- 'Command finished with exit code %d (%s)',
- result, hex(0xffffffff & result))
- finally:
- try:
- if leak_temp_dir:
- logging.warning('Deliberately leaking %s for later examination',
- run_dir)
- else:
- try:
- if not file_path.rmtree(run_dir):
- print >> sys.stderr, (
- 'Failed to delete the temporary directory, forcibly failing\n'
- 'the task because of it. No zombie process can outlive a\n'
- 'successful task run and still be marked as successful.\n'
- 'Fix your stuff.')
- result = result or 1
- except OSError as exc:
- logging.error('Leaking run_dir %s: %s', run_dir, exc)
- result = 1
-
- # HACK(vadimsh): On Windows rmtree(run_dir) call above has
- # a synchronization effect: it finishes only when all task child processes
- # terminate (since a running process locks *.exe file). Examine out_dir
- # only after that call completes (since child processes may
- # write to out_dir too and we need to wait for them to finish).
-
- # Upload out_dir and generate a .isolated file out of this directory.
- # It is only done if files were written in the directory.
- if os.path.isdir(out_dir) and os.listdir(out_dir):
- with tools.Profiler('ArchiveOutput'):
- try:
- results = isolateserver.archive_files_to_storage(
- storage, [out_dir], None)
- except isolateserver.Aborted:
- # This happens when a signal SIGTERM was received while uploading
- # data. There is 2 causes:
- # - The task was too slow and was about to be killed anyway due to
- # exceeding the hard timeout.
- # - The amount of data uploaded back is very large and took too much
- # time to archive.
- #
- # There's 3 options to handle this:
- # - Ignore the upload failure as a silent failure. This can be
- # detected client side by the fact no result file exists.
- # - Return as if the task failed. This is not factually correct.
- # - Return an internal failure. Sadly, it's impossible at this level
- # at the moment.
- #
- # For now, silently drop the upload.
- #
- # In any case, the process only has a very short grace period so it
- # needs to exit right away.
- sys.stderr.write('Received SIGTERM while uploading')
- results = None
-
- if results:
- if result_json:
- data = {
- 'isolated': results[0][0],
- 'isolatedserver': storage.location,
- 'namespace': storage.namespace,
- }
- tools.write_json(result_json, data, dense=True)
- else:
- data = {
- 'hash': results[0][0],
- 'namespace': storage.namespace,
- 'storage': storage.location,
- }
- sys.stdout.flush()
- print(
- '[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
- tools.format_json(data, dense=True))
- logging.info('%s', data)
-
- finally:
- try:
- if os.path.isdir(out_dir) and not file_path.rmtree(out_dir):
- logging.error('Had difficulties removing out_dir %s', out_dir)
- result = result or 1
- except OSError as exc:
- # Only report on non-Windows or on Windows when the process had
- # succeeded. Due to the way file sharing works on Windows, it's sadly
- # expected that file deletion may fail when a test failed.
- logging.error('Failed to remove out_dir %s: %s', out_dir, exc)
- if sys.platform != 'win32' or not result:
- on_error.report(None)
- result = 1
-
- return result
+ print(
+ '[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
+ tools.format_json(data, dense=True))
+ return result['exit_code'] or int(bool(result['internal_failure']))
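
Under the new contract, a caller passing --json recovers the task's real exit code from the metadata file rather than from run_isolated's own exit code; a minimal sketch, assuming the metadata was written to result.json:

    import json

    with open('result.json') as f:  # the path that was passed to --json
      result = json.load(f)
    # In this mode run_isolated itself exits 0 unless internal_failure is set.
    task_failed = bool(result['exit_code']) or bool(result['internal_failure'])
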
def main(args):
@@ -293,8 +312,10 @@ def main(args):
usage='%prog <options>',
version=__version__,
log_file=RUN_ISOLATED_LOG_FILE)
-
- parser.add_option('--json', help='dump output metadata to json file')
+ parser.add_option(
+ '--json',
+      help='dump output metadata to a JSON file. When used, run_isolated '
+          'returns non-zero only on internal failure')
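
An example invocation of the new mode (the hash is a fake placeholder, and --isolate-server is assumed from the standard client data-source options):

    python run_isolated.py \
        --isolated deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
        --isolate-server https://isolateserver.example.com \
        --json result.json
    # Exits 0 unless an internal failure occurred; the task's exit code is
    # in result.json.
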
data_group = optparse.OptionGroup(parser, 'Data source')
data_group.add_option(
'-s', '--isolated',
