| Index: recipe_engine/stream.py
|
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
|
| index 4aeb9a6fcb68eddd741d7fc1c8f11b99bf07420d..f916c80e01e7e2936e8f9a7c37f5113ee3334043 100644
|
| --- a/recipe_engine/stream.py
|
| +++ b/recipe_engine/stream.py
|
| @@ -62,16 +62,14 @@ class StreamEngine(object):
|
| def set_step_status(self, status):
|
| raise NotImplementedError()
|
|
|
| - def set_nest_level(self, nest_level):
|
| - raise NotImplementedError()
|
| -
|
| def set_build_property(self, key, value):
|
| raise NotImplementedError()
|
|
|
| def trigger(self, trigger_spec):
|
| raise NotImplementedError()
|
|
|
| - def new_step_stream(self, step_name, allow_subannotations=False):
|
| + def new_step_stream(self, step_name, allow_subannotations=False,
|
| + nest_level=None):
|
| """Creates a new StepStream in this engine.
|
|
|
| The step will be considered started at the moment this method is called.
|
| @@ -81,8 +79,14 @@ class StreamEngine(object):
|
| guard them by prefixing them with ! (False). The proper way to do this
|
| is to implement an annotations parser that converts to StreamEngine calls;
|
| i.e. parse -> re-emit.
|
| +
|
| + Args:
|
| + nest_level (int): The nest level of the step. None/0 are top-level.
|
| """
|
| + return self._new_step_stream(step_name, allow_subannotations, nest_level)
|
|
|
| + def _new_step_stream(self, step_name, allow_subannotations, nest_level):
|
| + """ABC overridable function for "new_step_stream" with no defaults."""
|
| raise NotImplementedError()
|
|
|
| def open(self):
|
| @@ -104,11 +108,13 @@ class StreamEngine(object):
|
| # had them formalized...).
|
| class ProductStreamEngine(StreamEngine):
|
| def __init__(self, engine_a, engine_b):
|
| + assert engine_a and engine_b
|
| self._engine_a = engine_a
|
| self._engine_b = engine_b
|
|
|
| class Stream(StreamEngine.Stream):
|
| def __init__(self, stream_a, stream_b):
|
| + assert stream_a and stream_b
|
| self._stream_a = stream_a
|
| self._stream_b = stream_b
|
|
|
| @@ -137,14 +143,15 @@ class ProductStreamEngine(StreamEngine):
|
| add_step_link = _void_product('add_step_link')
|
| reset_subannotation_state = _void_product('reset_subannotation_state')
|
| set_step_status = _void_product('set_step_status')
|
| - set_nest_level = _void_product('set_nest_level')
|
| set_build_property = _void_product('set_build_property')
|
| trigger = _void_product('trigger')
|
|
|
| - def new_step_stream(self, step_name, allow_subannotations=False):
|
| + def _new_step_stream(self, step_name, allow_subannotations, nest_level):
|
| return self.StepStream(
|
| - self._engine_a.new_step_stream(step_name, allow_subannotations),
|
| - self._engine_b.new_step_stream(step_name, allow_subannotations))
|
| + self._engine_a._new_step_stream(
|
| + step_name, allow_subannotations, nest_level),
|
| + self._engine_b._new_step_stream(
|
| + step_name, allow_subannotations, nest_level))
|
|
|
| def open(self):
|
| self._engine_a.open()
|
| @@ -171,11 +178,10 @@ class NoopStreamEngine(StreamEngine):
|
| add_step_link = _noop
|
| reset_subannotation_state = _noop
|
| set_step_status = _noop
|
| - set_nest_level = _noop
|
| set_build_property = _noop
|
| trigger = _noop
|
|
|
| - def new_step_stream(self, step_name, allow_subannotations=False):
|
| + def _new_step_stream(self, step_name, allow_subannotations, nest_level):
|
| return self.StepStream()
|
|
|
|
|
| @@ -233,9 +239,6 @@ class StreamEngineInvariants(StreamEngine):
|
| 'Cannot set successful status after status is %s' % self._status)
|
| self._status = status
|
|
|
| - def set_nest_level(self, nest_level):
|
| - assert isinstance(nest_level, int)
|
| -
|
| def set_build_property(self, key, value):
|
| pass
|
|
|
| @@ -259,67 +262,113 @@ class StreamEngineInvariants(StreamEngine):
|
| assert self._open
|
| self._open = False
|
|
|
| - def new_step_stream(self, step_name, allow_subannotations=False):
|
| + def _new_step_stream(self, step_name, allow_subannotations, nest_level):
|
| assert step_name not in self._streams, 'Step %s already exists' % step_name
|
| self._streams.add(step_name)
|
| return self.StepStream(self, step_name)
|
|
|
|
|
| -class AnnotationStepStream(StreamEngine.StepStream):
|
| - def __init__(self, emit_timestamps=False, time_fn=None):
|
| +class AnnotatorStreamEngine(StreamEngine):
|
| + def __init__(self, outstream, emit_timestamps=False, time_fn=None):
|
| + self._current_step = None
|
| + self._opened = set()
|
| + self._outstream = outstream
|
| self.emit_timestamps = emit_timestamps
|
| self.time_fn = time_fn or time.time
|
|
|
| - def basic_write(self, line):
|
| - raise NotImplementedError()
|
| + def open(self):
|
| + super(AnnotatorStreamEngine, self).open()
|
| + self.output_current_time()
|
| + self.output_root_annotation('HONOR_ZERO_RETURN_CODE')
|
| +
|
| + def close(self):
|
| + super(AnnotatorStreamEngine, self).close()
|
| + self.output_current_time()
|
|
|
| - def output_annotation(self, *args):
|
| - self.basic_write(
|
| + def output_current_time(self, step=None):
|
| + """Prints CURRENT_TIMESTAMP annotation with current time."""
|
| + if step:
|
| + self._step_cursor(step)
|
| + if self.emit_timestamps:
|
| + self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn())
|
| +
|
| + @staticmethod
|
| + def write_annotation(outstream, *args):
|
| + # Flush the stream before & after engine annotations, because they can
|
| + # change which step we are talking about and this matters to buildbot.
|
| + outstream.flush()
|
| + outstream.write(
|
| '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
|
| + outstream.flush()
|
|
|
| - def write_line(self, line):
|
| - if line.startswith('@@@'):
|
| - self.basic_write('!' + line + '\n')
|
| - else:
|
| - self.basic_write(line + '\n')
|
| + def output_root_annotation(self, *args):
|
| + self.write_annotation(self._outstream, *args)
|
|
|
| - def close(self):
|
| - if self.emit_timestamps:
|
| - self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
|
| - self.output_annotation('STEP_CLOSED')
|
| + def _step_cursor(self, step_name):
|
| + if self._current_step != step_name:
|
| + self.output_root_annotation('STEP_CURSOR', step_name)
|
| + self._current_step = step_name
|
| + if step_name not in self._opened:
|
| + self.output_current_time()
|
| + self.output_root_annotation('STEP_STARTED')
|
| + self._opened.add(step_name)
|
|
|
| - def new_log_stream(self, log_name):
|
| - return self.StepLogStream(self, log_name)
|
| + class StepStream(StreamEngine.StepStream):
|
| + def __init__(self, engine, outstream, step_name):
|
| + super(StreamEngine.StepStream, self).__init__()
|
|
|
| - def add_step_text(self, text):
|
| - self.output_annotation('STEP_TEXT', text)
|
| + self._engine = engine
|
| + self._outstream = outstream
|
| + self._step_name = step_name
|
|
|
| - def add_step_summary_text(self, text):
|
| - self.output_annotation('STEP_SUMMARY_TEXT', text)
|
| + def basic_write(self, line):
|
| + self._engine._step_cursor(self._step_name)
|
| + self._outstream.write(line)
|
|
|
| - def add_step_link(self, name, url):
|
| - self.output_annotation('STEP_LINK', name, url)
|
| + def close(self):
|
| + self._engine.output_current_time(step=self._step_name)
|
| + self.output_annotation('STEP_CLOSED')
|
|
|
| - def set_step_status(self, status):
|
| - if status == 'SUCCESS':
|
| - pass
|
| - elif status == 'WARNING':
|
| - self.output_annotation('STEP_WARNINGS')
|
| - elif status == 'FAILURE':
|
| - self.output_annotation('STEP_FAILURE')
|
| - elif status == 'EXCEPTION':
|
| - self.output_annotation('STEP_EXCEPTION')
|
| - else:
|
| - raise Exception('Impossible status %s' % status)
|
| + def output_annotation(self, *args):
|
| + self._engine._step_cursor(self._step_name)
|
| + self._engine.write_annotation(self._outstream, *args)
|
| +
|
| + def write_line(self, line):
|
| + if line.startswith('@@@'):
|
| + self.basic_write('!' + line + '\n')
|
| + else:
|
| + self.basic_write(line + '\n')
|
| +
|
| + def new_log_stream(self, log_name):
|
| + return self._engine.StepLogStream(self, log_name)
|
| +
|
| + def add_step_text(self, text):
|
| + self.output_annotation('STEP_TEXT', text)
|
| +
|
| + def add_step_summary_text(self, text):
|
| + self.output_annotation('STEP_SUMMARY_TEXT', text)
|
| +
|
| + def add_step_link(self, name, url):
|
| + self.output_annotation('STEP_LINK', name, url)
|
| +
|
| + def set_step_status(self, status):
|
| + if status == 'SUCCESS':
|
| + pass
|
| + elif status == 'WARNING':
|
| + self.output_annotation('STEP_WARNINGS')
|
| + elif status == 'FAILURE':
|
| + self.output_annotation('STEP_FAILURE')
|
| + elif status == 'EXCEPTION':
|
| + self.output_annotation('STEP_EXCEPTION')
|
| + else:
|
| + raise Exception('Impossible status %s' % status)
|
|
|
| - def set_nest_level(self, nest_level):
|
| - self.output_annotation('STEP_NEST_LEVEL', str(nest_level))
|
| + def set_build_property(self, key, value):
|
| + self.output_annotation('SET_BUILD_PROPERTY', key, value)
|
|
|
| - def set_build_property(self, key, value):
|
| - self.output_annotation('SET_BUILD_PROPERTY', key, value)
|
| + def trigger(self, spec):
|
| + self.output_annotation('STEP_TRIGGER', spec)
|
|
|
| - def trigger(self, spec):
|
| - self.output_annotation('STEP_TRIGGER', spec)
|
|
|
| class StepLogStream(StreamEngine.Stream):
|
| def __init__(self, step_stream, log_name):
|
| @@ -333,58 +382,6 @@ class AnnotationStepStream(StreamEngine.StepStream):
|
| self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
|
|
|
|
|
| -class AnnotatorStreamEngine(StreamEngine):
|
| - def __init__(self, outstream, emit_timestamps=False, time_fn=None):
|
| - self._current_step = None
|
| - self._opened = set()
|
| - self._outstream = outstream
|
| - self.emit_timestamps = emit_timestamps
|
| - self.time_fn = time_fn or time.time
|
| -
|
| - def open(self):
|
| - super(AnnotatorStreamEngine, self).open()
|
| - if self.emit_timestamps:
|
| - self.output_current_time()
|
| - self.output_annotation('HONOR_ZERO_RETURN_CODE')
|
| -
|
| - def close(self):
|
| - super(AnnotatorStreamEngine, self).close()
|
| - if self.emit_timestamps:
|
| - self.output_current_time()
|
| -
|
| - def output_current_time(self):
|
| - """Prints CURRENT_TIMESTAMP annotation with current time."""
|
| - self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
|
| -
|
| - def output_annotation(self, *args):
|
| - # Flush the stream before & after engine annotations, because they can
|
| - # change which step we are talking about and this matters to buildbot.
|
| - self._outstream.flush()
|
| - self._outstream.write(
|
| - '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
|
| - self._outstream.flush()
|
| -
|
| - def _step_cursor(self, name):
|
| - if self._current_step != name:
|
| - self.output_annotation('STEP_CURSOR', name)
|
| - self._current_step = name
|
| - if name not in self._opened:
|
| - if self.emit_timestamps:
|
| - self.output_current_time()
|
| - self.output_annotation('STEP_STARTED')
|
| - self._opened.add(name)
|
| -
|
| - class StepStream(AnnotationStepStream):
|
| - def __init__(self, engine, step_name):
|
| - AnnotationStepStream.__init__(
|
| - self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
|
| - self._engine = engine
|
| - self._step_name = step_name
|
| -
|
| - def basic_write(self, line):
|
| - self._engine._step_cursor(self._step_name)
|
| - self._engine._outstream.write(line)
|
| -
|
| class AllowSubannotationsStepStream(StepStream):
|
| def write_line(self, line):
|
| self.basic_write(line + '\n')
|
| @@ -396,25 +393,23 @@ class AnnotatorStreamEngine(StreamEngine):
|
| def reset_subannotation_state(self):
|
| self._engine._current_step = None
|
|
|
| - def new_step_stream(self, step_name, allow_subannotations=False):
|
| - self.output_annotation('SEED_STEP', step_name)
|
| - if allow_subannotations:
|
| - return self.AllowSubannotationsStepStream(self, step_name)
|
| - else:
|
| - return self.StepStream(self, step_name)
|
| -
|
|
|
| -class BareAnnotationStepStream(AnnotationStepStream):
|
| - """A StepStream that is not tied to any engine, and emits assuming it has the
|
| - cursor.
|
| + def _new_step_stream(self, step_name, allow_subannotations, nest_level):
|
| + return self._create_step_stream(step_name, self._outstream,
|
| + allow_subannotations, nest_level)
|
|
|
| - This is used for capturing the annotations in the engine.
|
| - """
|
| - def __init__(self, outstream):
|
| - self._outstream = outstream
|
| + def _create_step_stream(self, step_name, outstream, allow_subannotations,
|
| + nest_level):
|
| + self.output_root_annotation('SEED_STEP', step_name)
|
| + if allow_subannotations:
|
| + stream = self.AllowSubannotationsStepStream(self, outstream, step_name)
|
| + else:
|
| + stream = self.StepStream(self, outstream, step_name)
|
|
|
| - def basic_write(self, line):
|
| - self._outstream.write(encode_str(line))
|
| + if nest_level > 0:
|
| + # Emit our current nest level, if we are nested.
|
| + stream.output_annotation('STEP_NEST_LEVEL', str(nest_level))
|
| + return stream
|
|
|
|
|
| def encode_str(s):
|
|
|