| Index: recipe_engine/run.py
|
| diff --git a/recipe_engine/run.py b/recipe_engine/run.py
|
| index 8878152176499e68f32dff9e53b2094ee9c72dc5..cf9ec4bb78c1edde391af965326caaf3ba9a5423 100644
|
| --- a/recipe_engine/run.py
|
| +++ b/recipe_engine/run.py
|
| @@ -75,9 +75,16 @@ from . import recipe_api
|
| from . import recipe_test_api
|
| from . import types
|
| from . import util
|
| -from . import result_pb2
|
| +
|
| +from . import env
|
| +
|
| +import argparse # this is vendored
|
| import subprocess42
|
|
|
| +from . import result_pb2
|
| +
|
| +from google.protobuf import json_format as jsonpb
|
| +
|
|
|
| SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
|
|
|
| @@ -85,6 +92,7 @@ SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
|
| # TODO(martiniss): Remove this
|
| RecipeResult = collections.namedtuple('RecipeResult', 'result')
|
|
|
| +
|
| # TODO(dnj): Replace "properties" with a generic runtime instance. This instance
|
| # will be used to seed recipe clients and expanded to include managed runtime
|
| # entities.
|
| @@ -176,6 +184,7 @@ def run_steps(properties, stream_engine, step_runner, universe_view,
|
| # TODO(martiniss): remove this
|
| RecipeResult = collections.namedtuple('RecipeResult', 'result')
|
|
|
| +
|
| class RecipeEngine(object):
|
| """
|
| Knows how to execute steps emitted by a recipe, holds global state such as
|
| @@ -434,3 +443,283 @@ class RecipeEngine(object):
|
| e, recipe, properties))
|
|
|
| return results
|
| +
|
| +
|
| +def add_subparser(parser):
|
| + def properties_file_type(filename):
|
| + with (sys.stdin if filename == '-' else open(filename)) as f:
|
| + obj = json.load(f)
|
| + if not isinstance(obj, dict):
|
| + raise argparse.ArgumentTypeError(
|
| + 'must contain a JSON object, i.e. `{}`.')
|
| + return obj
|
| +
|
| + def parse_prop(prop):
|
| + key, val = prop.split('=', 1)
|
| + try:
|
| + val = json.loads(val)
|
| + except (ValueError, SyntaxError):
|
| + pass # If a value couldn't be evaluated, keep the string version
|
| + return {key: val}
|
| +
|
| + def properties_type(value):
|
| + obj = json.loads(value)
|
| + if not isinstance(obj, dict):
|
| + raise argparse.ArgumentTypeError('must contain a JSON object, i.e. `{}`.')
|
| + return obj
|
| +
|
| + run_p = parser.add_parser(
|
| + 'run',
|
| + description='Run a recipe locally')
|
| +
|
| + run_p.add_argument(
|
| + '--workdir',
|
| + type=os.path.abspath,
|
| + help='The working directory of recipe execution')
|
| + run_p.add_argument(
|
| + '--output-result-json',
|
| + type=os.path.abspath,
|
| + help='The file to write the JSON serialized returned value \
|
| + of the recipe to')
|
| + run_p.add_argument(
|
| + '--timestamps',
|
| + action='store_true',
|
| + help='If true, emit CURRENT_TIMESTAMP annotations. '
|
| + 'Default: false. '
|
| + 'CURRENT_TIMESTAMP annotation has one parameter, current time in '
|
| + 'Unix timestamp format. '
|
| + 'CURRENT_TIMESTAMP annotation will be printed at the beginning and '
|
| + 'end of the annotation stream and also immediately before each '
|
| + 'STEP_STARTED and STEP_CLOSED annotations.',
|
| + )
|
| + prop_group = run_p.add_mutually_exclusive_group()
|
| + prop_group.add_argument(
|
| + '--properties-file',
|
| + dest='properties',
|
| + type=properties_file_type,
|
| + help=('A file containing a json blob of properties. '
|
| + 'Pass "-" to read from stdin'))
|
| + prop_group.add_argument(
|
| + '--properties',
|
| + type=properties_type,
|
| + help='A json string containing the properties')
|
| +
|
| + run_p.add_argument(
|
| + 'recipe',
|
| + help='The recipe to execute')
|
| + run_p.add_argument(
|
| + 'props',
|
| + nargs=argparse.REMAINDER,
|
| + type=parse_prop,
|
| + help='A list of property pairs; e.g. mastername=chromium.linux '
|
| + 'issue=12345. The property value will be decoded as JSON, but if '
|
| + 'this decoding fails the value will be interpreted as a string.')
|
| +
|
| + run_p.set_defaults(command='run', properties={}, func=main)
|
| +
|
| +
|
| +def handle_recipe_return(recipe_result, result_filename, stream_engine,
|
| + engine_flags):
|
| + if engine_flags and engine_flags.use_result_proto:
|
| + return new_handle_recipe_return(
|
| + recipe_result, result_filename, stream_engine)
|
| +
|
| + if 'recipe_result' in recipe_result.result:
|
| + result_string = json.dumps(
|
| + recipe_result.result['recipe_result'], indent=2)
|
| + if result_filename:
|
| + with open(result_filename, 'w') as f:
|
| + f.write(result_string)
|
| + with stream_engine.make_step_stream('recipe result') as s:
|
| + with s.new_log_stream('result') as l:
|
| + l.write_split(result_string)
|
| +
|
| + if 'traceback' in recipe_result.result:
|
| + with stream_engine.make_step_stream('Uncaught Exception') as s:
|
| + with s.new_log_stream('exception') as l:
|
| + for line in recipe_result.result['traceback']:
|
| + l.write_line(line)
|
| +
|
| + if 'reason' in recipe_result.result:
|
| + with stream_engine.make_step_stream('Failure reason') as s:
|
| + with s.new_log_stream('reason') as l:
|
| + for line in recipe_result.result['reason'].splitlines():
|
| + l.write_line(line)
|
| +
|
| + if 'status_code' in recipe_result.result:
|
| + return recipe_result.result['status_code']
|
| + else:
|
| + return 0
|
| +
|
| +
|
| +def new_handle_recipe_return(result, result_filename, stream_engine):
|
| + if result_filename:
|
| + with open(result_filename, 'w') as fil:
|
| + fil.write(jsonpb.MessageToJson(
|
| + result, including_default_value_fields=True))
|
| +
|
| + if result.json_result:
|
| + with stream_engine.make_step_stream('recipe result') as s:
|
| + with s.new_log_stream('result') as l:
|
| + l.write_split(result.json_result)
|
| +
|
| + if result.HasField('failure'):
|
| + f = result.failure
|
| + if f.HasField('exception'):
|
| + with stream_engine.make_step_stream('Uncaught Exception') as s:
|
| + s.add_step_text(f.human_reason)
|
| + with s.new_log_stream('exception') as l:
|
| + for line in f.exception.traceback:
|
| + l.write_line(line)
|
| + # TODO(martiniss): Remove this code once calling code handles these states
|
| + elif f.HasField('timeout'):
|
| + with stream_engine.make_step_stream('Step Timed Out') as s:
|
| + with s.new_log_stream('timeout_s') as l:
|
| + l.write_line(f.timeout.timeout_s)
|
| + elif f.HasField('step_data'):
|
| + with stream_engine.make_step_stream('Invalid Step Data Access') as s:
|
| + with s.new_log_stream('step') as l:
|
| + l.write_line(f.step_data.step)
|
| +
|
| + with stream_engine.make_step_stream('Failure reason') as s:
|
| + with s.new_log_stream('reason') as l:
|
| + l.write_split(f.human_reason)
|
| +
|
| + return 1
|
| +
|
| + return 0
|
| +
|
| +
|
| +# Map of arguments_pb2.Property "value" oneof conversion functions.
|
| +#
|
| +# The fields here should be kept in sync with the "value" oneof field names in
|
| +# the arguments_pb2.Arguments.Property protobuf message.
|
| +_OP_PROPERTY_CONV = {
|
| + 's': lambda prop: prop.s,
|
| + 'int': lambda prop: prop.int,
|
| + 'uint': lambda prop: prop.uint,
|
| + 'd': lambda prop: prop.d,
|
| + 'b': lambda prop: prop.b,
|
| + 'data': lambda prop: prop.data,
|
| + 'map': lambda prop: _op_properties_to_dict(prop.map.property),
|
| + 'list': lambda prop: [_op_property_value(v) for v in prop.list.property],
|
| +}
|
| +
|
| +
|
| +def _op_property_value(prop):
|
| + """Returns the Python-converted value of an arguments_pb2.Property.
|
| +
|
| + Args:
|
| + prop (arguments_pb2.Property): property to convert.
|
| + Returns: The converted value.
|
| + Raises:
|
| + ValueError: If 'prop' is incomplete or invalid.
|
| + """
|
| + typ = prop.WhichOneof('value')
|
| + conv = _OP_PROPERTY_CONV.get(typ)
|
| + if not conv:
|
| + raise ValueError('Unknown property field [%s]' % (typ,))
|
| + return conv(prop)
|
| +
|
| +
|
| +def _op_properties_to_dict(pmap):
|
| + """Creates a properties dictionary from an arguments_pb2.PropertyMap entry.
|
| +
|
| + Args:
|
| + pmap (arguments_pb2.PropertyMap): Map to convert to dictionary form.
|
| + Returns (dict): A dictionary derived from the properties in 'pmap'.
|
| + """
|
| + return dict((k, _op_property_value(pmap[k])) for k in pmap)
|
| +
|
| +
|
| +def main(package_deps, args):
|
| + from recipe_engine import step_runner
|
| + from recipe_engine import stream
|
| + from recipe_engine import stream_logdog
|
| +
|
| + config_file = args.package
|
| +
|
| + if args.props:
|
| + for p in args.props:
|
| + args.properties.update(p)
|
| +
|
| + def get_properties_from_operational_args(op_args):
|
| + if not op_args.properties.property:
|
| + return None
|
| + return _op_properties_to_dict(op_args.properties.property)
|
| +
|
| + op_args = args.operational_args
|
| + op_properties = get_properties_from_operational_args(op_args)
|
| + if args.properties and op_properties:
|
| + raise ValueError(
|
| + 'Got operational args properties as well as CLI properties.')
|
| +
|
| + properties = op_properties
|
| + if not properties:
|
| + properties = args.properties
|
| +
|
| + properties['recipe'] = args.recipe
|
| +
|
| + properties = util.strip_unicode(properties)
|
| +
|
| + os.environ['PYTHONUNBUFFERED'] = '1'
|
| + os.environ['PYTHONIOENCODING'] = 'UTF-8'
|
| +
|
| + universe_view = loader.UniverseView(
|
| + loader.RecipeUniverse(
|
| + package_deps, config_file), package_deps.root_package)
|
| +
|
| + # TODO(iannucci): this is horrible; why do we want to set a workdir anyway?
|
| + # Shouldn't the caller of recipes just CD somewhere if they want a different
|
| + # workdir?
|
| + workdir = (args.workdir or
|
| + os.path.join(SCRIPT_PATH, os.path.pardir, 'workdir'))
|
| + logging.info('Using %s as work directory' % workdir)
|
| + if not os.path.exists(workdir):
|
| + os.makedirs(workdir)
|
| +
|
| + old_cwd = os.getcwd()
|
| + os.chdir(workdir)
|
| +
|
| + # Construct our stream engines. We may want to share stream events with more
|
| + # than one StreamEngine implementation, so we will accumulate them in a
|
| + # "stream_engines" list and compose them into a MultiStreamEngine.
|
| + def build_annotation_stream_engine():
|
| + return stream.AnnotatorStreamEngine(
|
| + sys.stdout,
|
| + emit_timestamps=(args.timestamps or
|
| + op_args.annotation_flags.emit_timestamp))
|
| +
|
| + stream_engines = []
|
| + if op_args.logdog.streamserver_uri:
|
| + logging.debug('Using LogDog with parameters: [%s]', op_args.logdog)
|
| + stream_engines.append(stream_logdog.StreamEngine(
|
| + streamserver_uri=op_args.logdog.streamserver_uri,
|
| + name_base=(op_args.logdog.name_base or None),
|
| + dump_path=op_args.logdog.final_annotation_dump_path,
|
| + ))
|
| +
|
| + # If we're teeing, also fold in a standard annotation stream engine.
|
| + if op_args.logdog.tee:
|
| + stream_engines.append(build_annotation_stream_engine())
|
| + else:
|
| + # Not using LogDog; use a standard annotation stream engine.
|
| + stream_engines.append(build_annotation_stream_engine())
|
| + multi_stream_engine = stream.MultiStreamEngine.create(*stream_engines)
|
| +
|
| + emit_initial_properties = op_args.annotation_flags.emit_initial_properties
|
| + engine_flags = op_args.engine_flags
|
| +
|
| + # Have a top-level set of invariants to enforce StreamEngine expectations.
|
| + with stream.StreamEngineInvariants.wrap(multi_stream_engine) as stream_engine:
|
| + try:
|
| + ret = run_steps(
|
| + properties, stream_engine,
|
| + step_runner.SubprocessStepRunner(stream_engine, engine_flags),
|
| + universe_view, engine_flags=engine_flags,
|
| + emit_initial_properties=emit_initial_properties)
|
| + finally:
|
| + os.chdir(old_cwd)
|
| +
|
| + return handle_recipe_return(
|
| + ret, args.output_result_json, stream_engine, engine_flags)
|
|
|