Index: recipe_engine/stream.py |
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py |
index 4aeb9a6fcb68eddd741d7fc1c8f11b99bf07420d..f916c80e01e7e2936e8f9a7c37f5113ee3334043 100644 |
--- a/recipe_engine/stream.py |
+++ b/recipe_engine/stream.py |
@@ -62,16 +62,14 @@ class StreamEngine(object): |
def set_step_status(self, status): |
raise NotImplementedError() |
- def set_nest_level(self, nest_level): |
- raise NotImplementedError() |
- |
def set_build_property(self, key, value): |
raise NotImplementedError() |
def trigger(self, trigger_spec): |
raise NotImplementedError() |
- def new_step_stream(self, step_name, allow_subannotations=False): |
+ def new_step_stream(self, step_name, allow_subannotations=False, |
+ nest_level=None): |
"""Creates a new StepStream in this engine. |
The step will be considered started at the moment this method is called. |
@@ -81,8 +79,14 @@ class StreamEngine(object): |
guard them by prefixing them with ! (False). The proper way to do this |
is to implement an annotations parser that converts to StreamEngine calls; |
i.e. parse -> re-emit. |
+ |
+ Args: |
+ nest_level (int): The nest level of the step. None/0 are top-level. |
""" |
+ return self._new_step_stream(step_name, allow_subannotations, nest_level) |
+ def _new_step_stream(self, step_name, allow_subannotations, nest_level): |
+ """ABC overridable function for "new_step_stream" with no defaults.""" |
raise NotImplementedError() |
def open(self): |
@@ -104,11 +108,13 @@ class StreamEngine(object): |
# had them formalized...). |
class ProductStreamEngine(StreamEngine): |
def __init__(self, engine_a, engine_b): |
+ assert engine_a and engine_b |
self._engine_a = engine_a |
self._engine_b = engine_b |
class Stream(StreamEngine.Stream): |
def __init__(self, stream_a, stream_b): |
+ assert stream_a and stream_b |
self._stream_a = stream_a |
self._stream_b = stream_b |
@@ -137,14 +143,15 @@ class ProductStreamEngine(StreamEngine): |
add_step_link = _void_product('add_step_link') |
reset_subannotation_state = _void_product('reset_subannotation_state') |
set_step_status = _void_product('set_step_status') |
- set_nest_level = _void_product('set_nest_level') |
set_build_property = _void_product('set_build_property') |
trigger = _void_product('trigger') |
- def new_step_stream(self, step_name, allow_subannotations=False): |
+ def _new_step_stream(self, step_name, allow_subannotations, nest_level): |
return self.StepStream( |
- self._engine_a.new_step_stream(step_name, allow_subannotations), |
- self._engine_b.new_step_stream(step_name, allow_subannotations)) |
+ self._engine_a._new_step_stream( |
+ step_name, allow_subannotations, nest_level), |
+ self._engine_b._new_step_stream( |
+ step_name, allow_subannotations, nest_level)) |
def open(self): |
self._engine_a.open() |
@@ -171,11 +178,10 @@ class NoopStreamEngine(StreamEngine): |
add_step_link = _noop |
reset_subannotation_state = _noop |
set_step_status = _noop |
- set_nest_level = _noop |
set_build_property = _noop |
trigger = _noop |
- def new_step_stream(self, step_name, allow_subannotations=False): |
+ def _new_step_stream(self, step_name, allow_subannotations, nest_level): |
return self.StepStream() |
@@ -233,9 +239,6 @@ class StreamEngineInvariants(StreamEngine): |
'Cannot set successful status after status is %s' % self._status) |
self._status = status |
- def set_nest_level(self, nest_level): |
- assert isinstance(nest_level, int) |
- |
def set_build_property(self, key, value): |
pass |
@@ -259,67 +262,113 @@ class StreamEngineInvariants(StreamEngine): |
assert self._open |
self._open = False |
- def new_step_stream(self, step_name, allow_subannotations=False): |
+ def _new_step_stream(self, step_name, allow_subannotations, nest_level): |
assert step_name not in self._streams, 'Step %s already exists' % step_name |
self._streams.add(step_name) |
return self.StepStream(self, step_name) |
-class AnnotationStepStream(StreamEngine.StepStream): |
- def __init__(self, emit_timestamps=False, time_fn=None): |
+class AnnotatorStreamEngine(StreamEngine): |
+ def __init__(self, outstream, emit_timestamps=False, time_fn=None): |
+ self._current_step = None |
+ self._opened = set() |
+ self._outstream = outstream |
self.emit_timestamps = emit_timestamps |
self.time_fn = time_fn or time.time |
- def basic_write(self, line): |
- raise NotImplementedError() |
+ def open(self): |
+ super(AnnotatorStreamEngine, self).open() |
+ self.output_current_time() |
+ self.output_root_annotation('HONOR_ZERO_RETURN_CODE') |
+ |
+ def close(self): |
+ super(AnnotatorStreamEngine, self).close() |
+ self.output_current_time() |
- def output_annotation(self, *args): |
- self.basic_write( |
+ def output_current_time(self, step=None): |
+ """Prints CURRENT_TIMESTAMP annotation with current time.""" |
+ if step: |
+ self._step_cursor(step) |
+ if self.emit_timestamps: |
+ self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
+ |
+ @staticmethod |
+ def write_annotation(outstream, *args): |
+ # Flush the stream before & after engine annotations, because they can |
+ # change which step we are talking about and this matters to buildbot. |
+ outstream.flush() |
+ outstream.write( |
'@@@' + '@'.join(map(encode_str, args)) + '@@@\n') |
+ outstream.flush() |
- def write_line(self, line): |
- if line.startswith('@@@'): |
- self.basic_write('!' + line + '\n') |
- else: |
- self.basic_write(line + '\n') |
+ def output_root_annotation(self, *args): |
+ self.write_annotation(self._outstream, *args) |
- def close(self): |
- if self.emit_timestamps: |
- self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
- self.output_annotation('STEP_CLOSED') |
+ def _step_cursor(self, step_name): |
+ if self._current_step != step_name: |
+ self.output_root_annotation('STEP_CURSOR', step_name) |
+ self._current_step = step_name |
+ if step_name not in self._opened: |
+ self.output_current_time() |
+ self.output_root_annotation('STEP_STARTED') |
+ self._opened.add(step_name) |
- def new_log_stream(self, log_name): |
- return self.StepLogStream(self, log_name) |
+ class StepStream(StreamEngine.StepStream): |
+ def __init__(self, engine, outstream, step_name): |
+ super(StreamEngine.StepStream, self).__init__() |
- def add_step_text(self, text): |
- self.output_annotation('STEP_TEXT', text) |
+ self._engine = engine |
+ self._outstream = outstream |
+ self._step_name = step_name |
- def add_step_summary_text(self, text): |
- self.output_annotation('STEP_SUMMARY_TEXT', text) |
+ def basic_write(self, line): |
+ self._engine._step_cursor(self._step_name) |
+ self._outstream.write(line) |
- def add_step_link(self, name, url): |
- self.output_annotation('STEP_LINK', name, url) |
+ def close(self): |
+ self._engine.output_current_time(step=self._step_name) |
+ self.output_annotation('STEP_CLOSED') |
- def set_step_status(self, status): |
- if status == 'SUCCESS': |
- pass |
- elif status == 'WARNING': |
- self.output_annotation('STEP_WARNINGS') |
- elif status == 'FAILURE': |
- self.output_annotation('STEP_FAILURE') |
- elif status == 'EXCEPTION': |
- self.output_annotation('STEP_EXCEPTION') |
- else: |
- raise Exception('Impossible status %s' % status) |
+ def output_annotation(self, *args): |
+ self._engine._step_cursor(self._step_name) |
+ self._engine.write_annotation(self._outstream, *args) |
+ |
+ def write_line(self, line): |
+ if line.startswith('@@@'): |
+ self.basic_write('!' + line + '\n') |
+ else: |
+ self.basic_write(line + '\n') |
+ |
+ def new_log_stream(self, log_name): |
+ return self._engine.StepLogStream(self, log_name) |
+ |
+ def add_step_text(self, text): |
+ self.output_annotation('STEP_TEXT', text) |
+ |
+ def add_step_summary_text(self, text): |
+ self.output_annotation('STEP_SUMMARY_TEXT', text) |
+ |
+ def add_step_link(self, name, url): |
+ self.output_annotation('STEP_LINK', name, url) |
+ |
+ def set_step_status(self, status): |
+ if status == 'SUCCESS': |
+ pass |
+ elif status == 'WARNING': |
+ self.output_annotation('STEP_WARNINGS') |
+ elif status == 'FAILURE': |
+ self.output_annotation('STEP_FAILURE') |
+ elif status == 'EXCEPTION': |
+ self.output_annotation('STEP_EXCEPTION') |
+ else: |
+ raise Exception('Impossible status %s' % status) |
- def set_nest_level(self, nest_level): |
- self.output_annotation('STEP_NEST_LEVEL', str(nest_level)) |
+ def set_build_property(self, key, value): |
+ self.output_annotation('SET_BUILD_PROPERTY', key, value) |
- def set_build_property(self, key, value): |
- self.output_annotation('SET_BUILD_PROPERTY', key, value) |
+ def trigger(self, spec): |
+ self.output_annotation('STEP_TRIGGER', spec) |
- def trigger(self, spec): |
- self.output_annotation('STEP_TRIGGER', spec) |
class StepLogStream(StreamEngine.Stream): |
def __init__(self, step_stream, log_name): |
@@ -333,58 +382,6 @@ class AnnotationStepStream(StreamEngine.StepStream): |
self._step_stream.output_annotation('STEP_LOG_END', self._log_name) |
-class AnnotatorStreamEngine(StreamEngine): |
- def __init__(self, outstream, emit_timestamps=False, time_fn=None): |
- self._current_step = None |
- self._opened = set() |
- self._outstream = outstream |
- self.emit_timestamps = emit_timestamps |
- self.time_fn = time_fn or time.time |
- |
- def open(self): |
- super(AnnotatorStreamEngine, self).open() |
- if self.emit_timestamps: |
- self.output_current_time() |
- self.output_annotation('HONOR_ZERO_RETURN_CODE') |
- |
- def close(self): |
- super(AnnotatorStreamEngine, self).close() |
- if self.emit_timestamps: |
- self.output_current_time() |
- |
- def output_current_time(self): |
- """Prints CURRENT_TIMESTAMP annotation with current time.""" |
- self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
- |
- def output_annotation(self, *args): |
- # Flush the stream before & after engine annotations, because they can |
- # change which step we are talking about and this matters to buildbot. |
- self._outstream.flush() |
- self._outstream.write( |
- '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') |
- self._outstream.flush() |
- |
- def _step_cursor(self, name): |
- if self._current_step != name: |
- self.output_annotation('STEP_CURSOR', name) |
- self._current_step = name |
- if name not in self._opened: |
- if self.emit_timestamps: |
- self.output_current_time() |
- self.output_annotation('STEP_STARTED') |
- self._opened.add(name) |
- |
- class StepStream(AnnotationStepStream): |
- def __init__(self, engine, step_name): |
- AnnotationStepStream.__init__( |
- self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn) |
- self._engine = engine |
- self._step_name = step_name |
- |
- def basic_write(self, line): |
- self._engine._step_cursor(self._step_name) |
- self._engine._outstream.write(line) |
- |
class AllowSubannotationsStepStream(StepStream): |
def write_line(self, line): |
self.basic_write(line + '\n') |
@@ -396,25 +393,23 @@ class AnnotatorStreamEngine(StreamEngine): |
def reset_subannotation_state(self): |
self._engine._current_step = None |
- def new_step_stream(self, step_name, allow_subannotations=False): |
- self.output_annotation('SEED_STEP', step_name) |
- if allow_subannotations: |
- return self.AllowSubannotationsStepStream(self, step_name) |
- else: |
- return self.StepStream(self, step_name) |
- |
-class BareAnnotationStepStream(AnnotationStepStream): |
- """A StepStream that is not tied to any engine, and emits assuming it has the |
- cursor. |
+ def _new_step_stream(self, step_name, allow_subannotations, nest_level): |
+ return self._create_step_stream(step_name, self._outstream, |
+ allow_subannotations, nest_level) |
- This is used for capturing the annotations in the engine. |
- """ |
- def __init__(self, outstream): |
- self._outstream = outstream |
+ def _create_step_stream(self, step_name, outstream, allow_subannotations, |
+ nest_level): |
+ self.output_root_annotation('SEED_STEP', step_name) |
+ if allow_subannotations: |
+ stream = self.AllowSubannotationsStepStream(self, outstream, step_name) |
+ else: |
+ stream = self.StepStream(self, outstream, step_name) |
- def basic_write(self, line): |
- self._outstream.write(encode_str(line)) |
+ if nest_level > 0: |
+ # Emit our current nest level, if we are nested. |
+ stream.output_annotation('STEP_NEST_LEVEL', str(nest_level)) |
+ return stream |
def encode_str(s): |