Chromium Code Reviews| Index: recipe_engine/stream.py |
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py |
| index 4aeb9a6fcb68eddd741d7fc1c8f11b99bf07420d..26bd989310ffa31fd624fe4e720d8dc35f9bb9d9 100644 |
| --- a/recipe_engine/stream.py |
| +++ b/recipe_engine/stream.py |
| @@ -62,16 +62,14 @@ class StreamEngine(object): |
| def set_step_status(self, status): |
| raise NotImplementedError() |
| - def set_nest_level(self, nest_level): |
|
dnj
2016/08/15 17:33:53
diediediediediediedie
|
| - raise NotImplementedError() |
| - |
| def set_build_property(self, key, value): |
| raise NotImplementedError() |
| def trigger(self, trigger_spec): |
| raise NotImplementedError() |
| - def new_step_stream(self, step_name, allow_subannotations=False): |
| + def new_step_stream(self, step_name, allow_subannotations=False, |
| + nested=False): |
| """Creates a new StepStream in this engine. |
| The step will be considered started at the moment this method is called. |
| @@ -81,8 +79,11 @@ class StreamEngine(object): |
| guard them by prefixing them with ! (False). The proper way to do this |
| is to implement an annotations parser that converts to StreamEngine calls; |
| i.e. parse -> re-emit. |
| - """ |
| + Args: |
| + nested (bool): If True, this is a nested step, and should be visually |
| + nested under the current step. |
| + """ |
| raise NotImplementedError() |
| def open(self): |
| @@ -104,11 +105,13 @@ class StreamEngine(object): |
| # had them formalized...). |
| class ProductStreamEngine(StreamEngine): |
| def __init__(self, engine_a, engine_b): |
| + assert engine_a and engine_b |
|
dnj
2016/08/15 17:33:53
Useful assertions, not relevant to CL.
|
| self._engine_a = engine_a |
| self._engine_b = engine_b |
| class Stream(StreamEngine.Stream): |
| def __init__(self, stream_a, stream_b): |
| + assert stream_a and stream_b |
| self._stream_a = stream_a |
| self._stream_b = stream_b |
| @@ -137,14 +140,13 @@ class ProductStreamEngine(StreamEngine): |
| add_step_link = _void_product('add_step_link') |
| reset_subannotation_state = _void_product('reset_subannotation_state') |
| set_step_status = _void_product('set_step_status') |
| - set_nest_level = _void_product('set_nest_level') |
| set_build_property = _void_product('set_build_property') |
| trigger = _void_product('trigger') |
| - def new_step_stream(self, step_name, allow_subannotations=False): |
| + def new_step_stream(self, step_name, **kwargs): |
| return self.StepStream( |
| - self._engine_a.new_step_stream(step_name, allow_subannotations), |
| - self._engine_b.new_step_stream(step_name, allow_subannotations)) |
| + self._engine_a.new_step_stream(step_name, **kwargs), |
| + self._engine_b.new_step_stream(step_name, **kwargs)) |
| def open(self): |
| self._engine_a.open() |
| @@ -171,11 +173,10 @@ class NoopStreamEngine(StreamEngine): |
| add_step_link = _noop |
| reset_subannotation_state = _noop |
| set_step_status = _noop |
| - set_nest_level = _noop |
| set_build_property = _noop |
| trigger = _noop |
| - def new_step_stream(self, step_name, allow_subannotations=False): |
| + def new_step_stream(self, step_name, **kwargs): |
| return self.StepStream() |
| @@ -233,9 +234,6 @@ class StreamEngineInvariants(StreamEngine): |
| 'Cannot set successful status after status is %s' % self._status) |
| self._status = status |
| - def set_nest_level(self, nest_level): |
| - assert isinstance(nest_level, int) |
| - |
| def set_build_property(self, key, value): |
| pass |
| @@ -259,83 +257,16 @@ class StreamEngineInvariants(StreamEngine): |
| assert self._open |
| self._open = False |
| - def new_step_stream(self, step_name, allow_subannotations=False): |
| + def new_step_stream(self, step_name, **kwargs): |
| assert step_name not in self._streams, 'Step %s already exists' % step_name |
| self._streams.add(step_name) |
| return self.StepStream(self, step_name) |
| -class AnnotationStepStream(StreamEngine.StepStream): |
|
dnj
2016/08/15 17:33:53
Because we no longer have a separate simulation An
|
| - def __init__(self, emit_timestamps=False, time_fn=None): |
| - self.emit_timestamps = emit_timestamps |
| - self.time_fn = time_fn or time.time |
| - |
| - def basic_write(self, line): |
| - raise NotImplementedError() |
| - |
| - def output_annotation(self, *args): |
| - self.basic_write( |
| - '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') |
| - |
| - def write_line(self, line): |
| - if line.startswith('@@@'): |
| - self.basic_write('!' + line + '\n') |
| - else: |
| - self.basic_write(line + '\n') |
| - |
| - def close(self): |
| - if self.emit_timestamps: |
| - self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
| - self.output_annotation('STEP_CLOSED') |
| - |
| - def new_log_stream(self, log_name): |
| - return self.StepLogStream(self, log_name) |
| - |
| - def add_step_text(self, text): |
| - self.output_annotation('STEP_TEXT', text) |
| - |
| - def add_step_summary_text(self, text): |
| - self.output_annotation('STEP_SUMMARY_TEXT', text) |
| - |
| - def add_step_link(self, name, url): |
| - self.output_annotation('STEP_LINK', name, url) |
| - |
| - def set_step_status(self, status): |
| - if status == 'SUCCESS': |
| - pass |
| - elif status == 'WARNING': |
| - self.output_annotation('STEP_WARNINGS') |
| - elif status == 'FAILURE': |
| - self.output_annotation('STEP_FAILURE') |
| - elif status == 'EXCEPTION': |
| - self.output_annotation('STEP_EXCEPTION') |
| - else: |
| - raise Exception('Impossible status %s' % status) |
| - |
| - def set_nest_level(self, nest_level): |
| - self.output_annotation('STEP_NEST_LEVEL', str(nest_level)) |
| - |
| - def set_build_property(self, key, value): |
| - self.output_annotation('SET_BUILD_PROPERTY', key, value) |
| - |
| - def trigger(self, spec): |
| - self.output_annotation('STEP_TRIGGER', spec) |
| - |
| - class StepLogStream(StreamEngine.Stream): |
| - def __init__(self, step_stream, log_name): |
| - self._step_stream = step_stream |
| - self._log_name = log_name.replace('/', '/') |
| - |
| - def write_line(self, line): |
| - self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) |
| - |
| - def close(self): |
| - self._step_stream.output_annotation('STEP_LOG_END', self._log_name) |
| - |
| - |
| class AnnotatorStreamEngine(StreamEngine): |
| def __init__(self, outstream, emit_timestamps=False, time_fn=None): |
| self._current_step = None |
| + self._current_nest_level = 0 |
|
dnj
2016/08/15 17:33:53
The AnnotatorStreamEngine now tracks the nest leve
|
| self._opened = set() |
| self._outstream = outstream |
| self.emit_timestamps = emit_timestamps |
| @@ -343,47 +274,109 @@ class AnnotatorStreamEngine(StreamEngine): |
| def open(self): |
| super(AnnotatorStreamEngine, self).open() |
| - if self.emit_timestamps: |
| - self.output_current_time() |
| - self.output_annotation('HONOR_ZERO_RETURN_CODE') |
| + self.output_current_time() |
| + self.output_root_annotation('HONOR_ZERO_RETURN_CODE') |
| def close(self): |
| super(AnnotatorStreamEngine, self).close() |
| - if self.emit_timestamps: |
| - self.output_current_time() |
| + self.output_current_time() |
| - def output_current_time(self): |
| + def output_current_time(self, step=None): |
| """Prints CURRENT_TIMESTAMP annotation with current time.""" |
| - self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
| + if step: |
| + self._step_cursor(step) |
| + if self.emit_timestamps: |
| + self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
| - def output_annotation(self, *args): |
| + @staticmethod |
| + def write_annotation(outstream, *args): |
| # Flush the stream before & after engine annotations, because they can |
| # change which step we are talking about and this matters to buildbot. |
| - self._outstream.flush() |
| - self._outstream.write( |
| + outstream.flush() |
| + outstream.write( |
| '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') |
| - self._outstream.flush() |
| + outstream.flush() |
| + |
| + def output_root_annotation(self, *args): |
| + self.write_annotation(self._outstream, *args) |
| def _step_cursor(self, name): |
| if self._current_step != name: |
| - self.output_annotation('STEP_CURSOR', name) |
| + self.output_root_annotation('STEP_CURSOR', name) |
| self._current_step = name |
| if name not in self._opened: |
| - if self.emit_timestamps: |
| - self.output_current_time() |
| - self.output_annotation('STEP_STARTED') |
| + self.output_current_time() |
| + self.output_root_annotation('STEP_STARTED') |
| self._opened.add(name) |
| - class StepStream(AnnotationStepStream): |
| - def __init__(self, engine, step_name): |
| - AnnotationStepStream.__init__( |
| - self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn) |
| + class StepStream(StreamEngine.StepStream): |
| + def __init__(self, engine, outstream, step_name, nested): |
| + super(StreamEngine.StepStream, self).__init__() |
| + |
| self._engine = engine |
| + self._outstream = outstream |
| self._step_name = step_name |
| + self._nested = nested |
| def basic_write(self, line): |
| self._engine._step_cursor(self._step_name) |
| - self._engine._outstream.write(line) |
| + self._outstream.write(line) |
| + |
| + def close(self): |
| + self._engine._notify_step_finished(self) |
|
dnj
2016/08/15 17:33:53
This is new, see below.
|
| + |
| + def output_annotation(self, *args): |
|
dnj
2016/08/15 17:33:53
Ack, am going to rewrite this in terms of 'write_a
|
| + self.basic_write( |
| + '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') |
| + |
| + def write_line(self, line): |
|
dnj
2016/08/15 17:33:53
(All of this is copy/paste from outer class when i
|
| + if line.startswith('@@@'): |
| + self.basic_write('!' + line + '\n') |
| + else: |
| + self.basic_write(line + '\n') |
| + |
| + def new_log_stream(self, log_name): |
| + return self._engine.StepLogStream(self, log_name) |
| + |
| + def add_step_text(self, text): |
| + self.output_annotation('STEP_TEXT', text) |
| + |
| + def add_step_summary_text(self, text): |
| + self.output_annotation('STEP_SUMMARY_TEXT', text) |
| + |
| + def add_step_link(self, name, url): |
| + self.output_annotation('STEP_LINK', name, url) |
| + |
| + def set_step_status(self, status): |
| + if status == 'SUCCESS': |
| + pass |
| + elif status == 'WARNING': |
| + self.output_annotation('STEP_WARNINGS') |
| + elif status == 'FAILURE': |
| + self.output_annotation('STEP_FAILURE') |
| + elif status == 'EXCEPTION': |
| + self.output_annotation('STEP_EXCEPTION') |
| + else: |
| + raise Exception('Impossible status %s' % status) |
| + |
| + def set_build_property(self, key, value): |
| + self.output_annotation('SET_BUILD_PROPERTY', key, value) |
| + |
| + def trigger(self, spec): |
| + self.output_annotation('STEP_TRIGGER', spec) |
| + |
| + |
| + class StepLogStream(StreamEngine.Stream): |
| + def __init__(self, step_stream, log_name): |
| + self._step_stream = step_stream |
| + self._log_name = log_name.replace('/', '/') |
| + |
| + def write_line(self, line): |
| + self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) |
| + |
| + def close(self): |
| + self._step_stream.output_annotation('STEP_LOG_END', self._log_name) |
| + |
| class AllowSubannotationsStepStream(StepStream): |
| def write_line(self, line): |
| @@ -396,25 +389,36 @@ class AnnotatorStreamEngine(StreamEngine): |
| def reset_subannotation_state(self): |
| self._engine._current_step = None |
| - def new_step_stream(self, step_name, allow_subannotations=False): |
| - self.output_annotation('SEED_STEP', step_name) |
| - if allow_subannotations: |
| - return self.AllowSubannotationsStepStream(self, step_name) |
| - else: |
| - return self.StepStream(self, step_name) |
| - |
| -class BareAnnotationStepStream(AnnotationStepStream): |
| - """A StepStream that is not tied to any engine, and emits assuming it has the |
| - cursor. |
| + def new_step_stream(self, step_name, allow_subannotations=False, |
| + nested=False): |
| + self.output_root_annotation('SEED_STEP', step_name) |
| + return self._create_step_stream(step_name, self._outstream, |
| + allow_subannotations, nested) |
| - This is used for capturing the annotations in the engine. |
| - """ |
| - def __init__(self, outstream): |
| - self._outstream = outstream |
| - |
| - def basic_write(self, line): |
| - self._outstream.write(encode_str(line)) |
| + def _create_step_stream(self, step_name, outstream, |
| + allow_subannotations, nested): |
| + if allow_subannotations: |
| + stream = self.AllowSubannotationsStepStream(self, outstream, step_name, |
| + nested) |
| + else: |
| + stream = self.StepStream(self, outstream, step_name, nested) |
| + |
| + if nested: |
| + # Increase our current nest level and emit a nested annotation for this |
| + # step. This will be decreased when the step finishes. |
| + self._current_nest_level += 1 |
| + stream.output_annotation('STEP_NEST_LEVEL', str(self._current_nest_level)) |
| + return stream |
| + |
| + def _notify_step_finished(self, step_stream): |
| + if step_stream._nested: |
|
dnj
2016/08/15 17:33:53
(Called when a step is closed. I put this here ins
|
| + orig = self._current_nest_level |
| + self._current_nest_level -= 1 |
| + assert self._current_nest_level >= 0 |
| + |
| + self.output_current_time(step=step_stream._step_name) |
| + step_stream.output_annotation('STEP_CLOSED') |
| def encode_str(s): |