Index: recipe_engine/run.py |
diff --git a/recipe_engine/run.py b/recipe_engine/run.py |
index 8878152176499e68f32dff9e53b2094ee9c72dc5..97dcba16ae7ab6dcf70a2f96afa412dfb19c04ab 100644 |
--- a/recipe_engine/run.py |
+++ b/recipe_engine/run.py |
@@ -66,6 +66,7 @@ iterable_of_things. |
import collections |
import json |
import logging |
+import argparse |
dnj
2017/04/27 17:14:01
nit: order
iannucci
2017/04/29 15:45:59
Done.
|
import os |
import sys |
import traceback |
@@ -75,9 +76,15 @@ from . import recipe_api |
from . import recipe_test_api |
from . import types |
from . import util |
-from . import result_pb2 |
+ |
+from . import env |
dnj
2017/04/27 17:14:01
Shouldn't this be merged above?
iannucci
2017/04/29 15:45:59
I like to keep the env import line separate becaus
|
+ |
import subprocess42 |
+from . import result_pb2 |
+ |
+from google.protobuf import json_format as jsonpb |
+ |
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__)) |
@@ -434,3 +441,280 @@ class RecipeEngine(object): |
e, recipe, properties)) |
return results |
+ |
+ |
+def add_subparser(parser): |
+ def properties_file_type(filename): |
+ with (sys.stdin if filename == '-' else open(filename)) as f: |
+ obj = json.load(f) |
+ if not isinstance(obj, dict): |
+ raise argparse.ArgumentTypeError( |
+ 'must contain a JSON object, i.e. `{}`.') |
+ return obj |
+ |
+ def parse_prop(prop): |
+ key, val = prop.split('=', 1) |
+ try: |
+ val = json.loads(val) |
+ except (ValueError, SyntaxError): |
+ pass # If a value couldn't be evaluated, keep the string version |
+ return {key: val} |
+ |
+ def properties_type(value): |
+ obj = json.loads(value) |
+ if not isinstance(obj, dict): |
+ raise argparse.ArgumentTypeError('must contain a JSON object, i.e. `{}`.') |
+ return obj |
+ |
+ run_p = parser.add_parser( |
+ 'run', |
+ description='Run a recipe locally') |
+ |
+ run_p.add_argument( |
+ '--workdir', |
+ type=os.path.abspath, |
+ help='The working directory of recipe execution') |
+ run_p.add_argument( |
+ '--output-result-json', |
+ type=os.path.abspath, |
+ help='The file to write the JSON serialized returned value \ |
+ of the recipe to') |
+ run_p.add_argument( |
+ '--timestamps', |
+ action='store_true', |
+ help='If true, emit CURRENT_TIMESTAMP annotations. ' |
+ 'Default: false. ' |
+ 'CURRENT_TIMESTAMP annotation has one parameter, current time in ' |
+ 'Unix timestamp format. ' |
+ 'CURRENT_TIMESTAMP annotation will be printed at the beginning and ' |
+ 'end of the annotation stream and also immediately before each ' |
+ 'STEP_STARTED and STEP_CLOSED annotations.', |
+ ) |
+ prop_group = run_p.add_mutually_exclusive_group() |
+ prop_group.add_argument( |
+ '--properties-file', |
+ dest='properties', |
+ type=properties_file_type, |
+ help=('A file containing a json blob of properties. ' |
+ 'Pass "-" to read from stdin')) |
+ prop_group.add_argument( |
+ '--properties', |
+ type=properties_type, |
+ help='A json string containing the properties') |
+ |
+ run_p.add_argument( |
+ 'recipe', |
+ help='The recipe to execute') |
+ run_p.add_argument( |
+ 'props', |
+ nargs=argparse.REMAINDER, |
+ type=parse_prop, |
+ help='A list of property pairs; e.g. mastername=chromium.linux ' |
+ 'issue=12345. The property value will be decoded as JSON, but if ' |
+ 'this decoding fails the value will be interpreted as a string.') |
+ |
+ run_p.set_defaults(command='run', properties={}, func=main) |
+ |
dnj
2017/04/27 17:14:01
nit: two spaces (here and elsewhere)
iannucci
2017/04/29 15:45:59
Done.
|
+def handle_recipe_return(recipe_result, result_filename, stream_engine, |
+ engine_flags): |
+ if engine_flags and engine_flags.use_result_proto: |
+ return new_handle_recipe_return( |
+ recipe_result, result_filename, stream_engine) |
+ |
+ if 'recipe_result' in recipe_result.result: |
+ result_string = json.dumps( |
+ recipe_result.result['recipe_result'], indent=2) |
+ if result_filename: |
+ with open(result_filename, 'w') as f: |
+ f.write(result_string) |
+ with stream_engine.make_step_stream('recipe result') as s: |
+ with s.new_log_stream('result') as l: |
+ l.write_split(result_string) |
+ |
+ if 'traceback' in recipe_result.result: |
+ with stream_engine.make_step_stream('Uncaught Exception') as s: |
+ with s.new_log_stream('exception') as l: |
+ for line in recipe_result.result['traceback']: |
+ l.write_line(line) |
+ |
+ if 'reason' in recipe_result.result: |
+ with stream_engine.make_step_stream('Failure reason') as s: |
+ with s.new_log_stream('reason') as l: |
+ for line in recipe_result.result['reason'].splitlines(): |
+ l.write_line(line) |
+ |
+ if 'status_code' in recipe_result.result: |
+ return recipe_result.result['status_code'] |
+ else: |
+ return 0 |
+ |
+def new_handle_recipe_return(result, result_filename, stream_engine): |
+ if result_filename: |
+ with open(result_filename, 'w') as fil: |
+ fil.write(jsonpb.MessageToJson( |
+ result, including_default_value_fields=True)) |
+ |
+ if result.json_result: |
+ with stream_engine.make_step_stream('recipe result') as s: |
+ with s.new_log_stream('result') as l: |
+ l.write_split(result.json_result) |
+ |
+ if result.HasField('failure'): |
+ f = result.failure |
+ if f.HasField('exception'): |
+ with stream_engine.make_step_stream('Uncaught Exception') as s: |
+ s.add_step_text(f.human_reason) |
+ with s.new_log_stream('exception') as l: |
+ for line in f.exception.traceback: |
+ l.write_line(line) |
+ # TODO(martiniss): Remove this code once calling code handles these states |
+ elif f.HasField('timeout'): |
+ with stream_engine.make_step_stream('Step Timed Out') as s: |
+ with s.new_log_stream('timeout_s') as l: |
+ l.write_line(f.timeout.timeout_s) |
+ elif f.HasField('step_data'): |
+ with stream_engine.make_step_stream('Invalid Step Data Access') as s: |
+ with s.new_log_stream('step') as l: |
+ l.write_line(f.step_data.step) |
+ |
+ with stream_engine.make_step_stream('Failure reason') as s: |
+ with s.new_log_stream('reason') as l: |
+ l.write_split(f.human_reason) |
+ |
+ return 1 |
+ |
+ return 0 |
+ |
+ |
+# Map of arguments_pb2.Property "value" oneof conversion functions. |
+# |
+# The fields here should be kept in sync with the "value" oneof field names in |
+# the arguments_pb2.Arguments.Property protobuf message. |
+_OP_PROPERTY_CONV = { |
+ 's': lambda prop: prop.s, |
+ 'int': lambda prop: prop.int, |
+ 'uint': lambda prop: prop.uint, |
+ 'd': lambda prop: prop.d, |
+ 'b': lambda prop: prop.b, |
+ 'data': lambda prop: prop.data, |
+ 'map': lambda prop: _op_properties_to_dict(prop.map.property), |
+ 'list': lambda prop: [_op_property_value(v) for v in prop.list.property], |
+} |
+ |
+def _op_property_value(prop): |
+ """Returns the Python-converted value of an arguments_pb2.Property. |
+ |
+ Args: |
+ prop (arguments_pb2.Property): property to convert. |
+ Returns: The converted value. |
+ Raises: |
+ ValueError: If 'prop' is incomplete or invalid. |
+ """ |
+ typ = prop.WhichOneof('value') |
+ conv = _OP_PROPERTY_CONV.get(typ) |
+ if not conv: |
+ raise ValueError('Unknown property field [%s]' % (typ,)) |
+ return conv(prop) |
+ |
+ |
+def _op_properties_to_dict(pmap): |
+ """Creates a properties dictionary from an arguments_pb2.PropertyMap entry. |
+ |
+ Args: |
+ pmap (arguments_pb2.PropertyMap): Map to convert to dictionary form. |
+ Returns (dict): A dictionary derived from the properties in 'pmap'. |
+ """ |
+ return dict((k, _op_property_value(pmap[k])) for k in pmap) |
+ |
+ |
+def main(package_deps, args): |
+ from recipe_engine import step_runner |
+ from recipe_engine import stream |
+ from recipe_engine import stream_logdog |
+ |
+ config_file = args.package |
+ |
+ if args.props: |
+ for p in args.props: |
+ args.properties.update(p) |
+ |
+ def get_properties_from_operational_args(op_args): |
+ if not op_args.properties.property: |
+ return None |
+ return _op_properties_to_dict(op_args.properties.property) |
+ |
+ op_args = args.operational_args |
+ op_properties = get_properties_from_operational_args(op_args) |
+ if args.properties and op_properties: |
+ raise ValueError( |
+ 'Got operational args properties as well as CLI properties.') |
+ |
+ properties = op_properties |
+ if not properties: |
+ properties = args.properties |
+ |
+ properties['recipe'] = args.recipe |
+ |
+ properties = util.strip_unicode(properties) |
+ |
+ os.environ['PYTHONUNBUFFERED'] = '1' |
+ os.environ['PYTHONIOENCODING'] = 'UTF-8' |
+ |
+ universe_view = loader.UniverseView( |
+ loader.RecipeUniverse( |
+ package_deps, config_file), package_deps.root_package) |
+ |
+ # TODO(iannucci): this is horrible; why do we want to set a workdir anyway? |
+ # Shouldn't the caller of recipes just CD somewhere if they want a different |
+ # workdir? |
+ workdir = (args.workdir or |
+ os.path.join(SCRIPT_PATH, os.path.pardir, 'workdir')) |
+ logging.info('Using %s as work directory' % workdir) |
+ if not os.path.exists(workdir): |
+ os.makedirs(workdir) |
+ |
+ old_cwd = os.getcwd() |
+ os.chdir(workdir) |
+ |
+ # Construct our stream engines. We may want to share stream events with more |
+ # than one StreamEngine implementation, so we will accumulate them in a |
+ # "stream_engines" list and compose them into a MultiStreamEngine. |
+ def build_annotation_stream_engine(): |
+ return stream.AnnotatorStreamEngine( |
+ sys.stdout, |
+ emit_timestamps=(args.timestamps or |
+ op_args.annotation_flags.emit_timestamp)) |
+ |
+ stream_engines = [] |
+ if op_args.logdog.streamserver_uri: |
+ logging.debug('Using LogDog with parameters: [%s]', op_args.logdog) |
+ stream_engines.append(stream_logdog.StreamEngine( |
+ streamserver_uri=op_args.logdog.streamserver_uri, |
+ name_base=(op_args.logdog.name_base or None), |
+ dump_path=op_args.logdog.final_annotation_dump_path, |
+ )) |
+ |
+ # If we're teeing, also fold in a standard annotation stream engine. |
+ if op_args.logdog.tee: |
+ stream_engines.append(build_annotation_stream_engine()) |
+ else: |
+ # Not using LogDog; use a standard annotation stream engine. |
+ stream_engines.append(build_annotation_stream_engine()) |
+ multi_stream_engine = stream.MultiStreamEngine.create(*stream_engines) |
+ |
+ emit_initial_properties = op_args.annotation_flags.emit_initial_properties |
+ engine_flags = op_args.engine_flags |
+ |
+ # Have a top-level set of invariants to enforce StreamEngine expectations. |
+ with stream.StreamEngineInvariants.wrap(multi_stream_engine) as stream_engine: |
+ try: |
+ ret = run_steps( |
+ properties, stream_engine, |
+ step_runner.SubprocessStepRunner(stream_engine, engine_flags), |
+ universe_view, engine_flags=engine_flags, |
+ emit_initial_properties=emit_initial_properties) |
+ finally: |
+ os.chdir(old_cwd) |
+ |
+ return handle_recipe_return( |
+ ret, args.output_result_json, stream_engine, engine_flags) |