Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(284)

Unified Diff: recipe_engine/stream.py

Issue 2245113002: Track step nesting in StreamEngine. (Closed) Base URL: https://github.com/luci/recipes-py@emit-initial-properties
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: recipe_engine/stream.py
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
index 4aeb9a6fcb68eddd741d7fc1c8f11b99bf07420d..26bd989310ffa31fd624fe4e720d8dc35f9bb9d9 100644
--- a/recipe_engine/stream.py
+++ b/recipe_engine/stream.py
@@ -62,16 +62,14 @@ class StreamEngine(object):
def set_step_status(self, status):
raise NotImplementedError()
- def set_nest_level(self, nest_level):
dnj 2016/08/15 17:33:53 diediediediediediedie
- raise NotImplementedError()
-
def set_build_property(self, key, value):
raise NotImplementedError()
def trigger(self, trigger_spec):
raise NotImplementedError()
- def new_step_stream(self, step_name, allow_subannotations=False):
+ def new_step_stream(self, step_name, allow_subannotations=False,
+ nested=False):
"""Creates a new StepStream in this engine.
The step will be considered started at the moment this method is called.
@@ -81,8 +79,11 @@ class StreamEngine(object):
guard them by prefixing them with ! (False). The proper way to do this
is to implement an annotations parser that converts to StreamEngine calls;
i.e. parse -> re-emit.
- """
+ Args:
+ nested (bool): If True, this is a nested step, and should be visually
+ nested under the current step.
+ """
raise NotImplementedError()
def open(self):
@@ -104,11 +105,13 @@ class StreamEngine(object):
# had them formalized...).
class ProductStreamEngine(StreamEngine):
def __init__(self, engine_a, engine_b):
+ assert engine_a and engine_b
dnj 2016/08/15 17:33:53 Useful assertions, not relevant to CL.
self._engine_a = engine_a
self._engine_b = engine_b
class Stream(StreamEngine.Stream):
def __init__(self, stream_a, stream_b):
+ assert stream_a and stream_b
self._stream_a = stream_a
self._stream_b = stream_b
@@ -137,14 +140,13 @@ class ProductStreamEngine(StreamEngine):
add_step_link = _void_product('add_step_link')
reset_subannotation_state = _void_product('reset_subannotation_state')
set_step_status = _void_product('set_step_status')
- set_nest_level = _void_product('set_nest_level')
set_build_property = _void_product('set_build_property')
trigger = _void_product('trigger')
- def new_step_stream(self, step_name, allow_subannotations=False):
+ def new_step_stream(self, step_name, **kwargs):
return self.StepStream(
- self._engine_a.new_step_stream(step_name, allow_subannotations),
- self._engine_b.new_step_stream(step_name, allow_subannotations))
+ self._engine_a.new_step_stream(step_name, **kwargs),
+ self._engine_b.new_step_stream(step_name, **kwargs))
def open(self):
self._engine_a.open()
@@ -171,11 +173,10 @@ class NoopStreamEngine(StreamEngine):
add_step_link = _noop
reset_subannotation_state = _noop
set_step_status = _noop
- set_nest_level = _noop
set_build_property = _noop
trigger = _noop
- def new_step_stream(self, step_name, allow_subannotations=False):
+ def new_step_stream(self, step_name, **kwargs):
return self.StepStream()
@@ -233,9 +234,6 @@ class StreamEngineInvariants(StreamEngine):
'Cannot set successful status after status is %s' % self._status)
self._status = status
- def set_nest_level(self, nest_level):
- assert isinstance(nest_level, int)
-
def set_build_property(self, key, value):
pass
@@ -259,83 +257,16 @@ class StreamEngineInvariants(StreamEngine):
assert self._open
self._open = False
- def new_step_stream(self, step_name, allow_subannotations=False):
+ def new_step_stream(self, step_name, **kwargs):
assert step_name not in self._streams, 'Step %s already exists' % step_name
self._streams.add(step_name)
return self.StepStream(self, step_name)
-class AnnotationStepStream(StreamEngine.StepStream):
dnj 2016/08/15 17:33:53 Because we no longer have a separate simulation An
- def __init__(self, emit_timestamps=False, time_fn=None):
- self.emit_timestamps = emit_timestamps
- self.time_fn = time_fn or time.time
-
- def basic_write(self, line):
- raise NotImplementedError()
-
- def output_annotation(self, *args):
- self.basic_write(
- '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
-
- def write_line(self, line):
- if line.startswith('@@@'):
- self.basic_write('!' + line + '\n')
- else:
- self.basic_write(line + '\n')
-
- def close(self):
- if self.emit_timestamps:
- self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
- self.output_annotation('STEP_CLOSED')
-
- def new_log_stream(self, log_name):
- return self.StepLogStream(self, log_name)
-
- def add_step_text(self, text):
- self.output_annotation('STEP_TEXT', text)
-
- def add_step_summary_text(self, text):
- self.output_annotation('STEP_SUMMARY_TEXT', text)
-
- def add_step_link(self, name, url):
- self.output_annotation('STEP_LINK', name, url)
-
- def set_step_status(self, status):
- if status == 'SUCCESS':
- pass
- elif status == 'WARNING':
- self.output_annotation('STEP_WARNINGS')
- elif status == 'FAILURE':
- self.output_annotation('STEP_FAILURE')
- elif status == 'EXCEPTION':
- self.output_annotation('STEP_EXCEPTION')
- else:
- raise Exception('Impossible status %s' % status)
-
- def set_nest_level(self, nest_level):
- self.output_annotation('STEP_NEST_LEVEL', str(nest_level))
-
- def set_build_property(self, key, value):
- self.output_annotation('SET_BUILD_PROPERTY', key, value)
-
- def trigger(self, spec):
- self.output_annotation('STEP_TRIGGER', spec)
-
- class StepLogStream(StreamEngine.Stream):
- def __init__(self, step_stream, log_name):
- self._step_stream = step_stream
- self._log_name = log_name.replace('/', '/')
-
- def write_line(self, line):
- self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
-
- def close(self):
- self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
-
-
class AnnotatorStreamEngine(StreamEngine):
def __init__(self, outstream, emit_timestamps=False, time_fn=None):
self._current_step = None
+ self._current_nest_level = 0
dnj 2016/08/15 17:33:53 The AnnotatorStreamEngine now tracks the nest leve
self._opened = set()
self._outstream = outstream
self.emit_timestamps = emit_timestamps
@@ -343,47 +274,109 @@ class AnnotatorStreamEngine(StreamEngine):
def open(self):
super(AnnotatorStreamEngine, self).open()
- if self.emit_timestamps:
- self.output_current_time()
- self.output_annotation('HONOR_ZERO_RETURN_CODE')
+ self.output_current_time()
+ self.output_root_annotation('HONOR_ZERO_RETURN_CODE')
def close(self):
super(AnnotatorStreamEngine, self).close()
- if self.emit_timestamps:
- self.output_current_time()
+ self.output_current_time()
- def output_current_time(self):
+ def output_current_time(self, step=None):
"""Prints CURRENT_TIMESTAMP annotation with current time."""
- self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
+ if step:
+ self._step_cursor(step)
+ if self.emit_timestamps:
+ self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn())
- def output_annotation(self, *args):
+ @staticmethod
+ def write_annotation(outstream, *args):
# Flush the stream before & after engine annotations, because they can
# change which step we are talking about and this matters to buildbot.
- self._outstream.flush()
- self._outstream.write(
+ outstream.flush()
+ outstream.write(
'@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
- self._outstream.flush()
+ outstream.flush()
+
+ def output_root_annotation(self, *args):
+ self.write_annotation(self._outstream, *args)
def _step_cursor(self, name):
if self._current_step != name:
- self.output_annotation('STEP_CURSOR', name)
+ self.output_root_annotation('STEP_CURSOR', name)
self._current_step = name
if name not in self._opened:
- if self.emit_timestamps:
- self.output_current_time()
- self.output_annotation('STEP_STARTED')
+ self.output_current_time()
+ self.output_root_annotation('STEP_STARTED')
self._opened.add(name)
- class StepStream(AnnotationStepStream):
- def __init__(self, engine, step_name):
- AnnotationStepStream.__init__(
- self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
+ class StepStream(StreamEngine.StepStream):
+ def __init__(self, engine, outstream, step_name, nested):
+ super(StreamEngine.StepStream, self).__init__()
+
self._engine = engine
+ self._outstream = outstream
self._step_name = step_name
+ self._nested = nested
def basic_write(self, line):
self._engine._step_cursor(self._step_name)
- self._engine._outstream.write(line)
+ self._outstream.write(line)
+
+ def close(self):
+ self._engine._notify_step_finished(self)
dnj 2016/08/15 17:33:53 This is new, see below.
+
+ def output_annotation(self, *args):
dnj 2016/08/15 17:33:53 Ack, am going to rewrite this in terms of 'write_a
+ self.basic_write(
+ '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
+
+ def write_line(self, line):
dnj 2016/08/15 17:33:53 (All of this is copy/paste from outer class when i
+ if line.startswith('@@@'):
+ self.basic_write('!' + line + '\n')
+ else:
+ self.basic_write(line + '\n')
+
+ def new_log_stream(self, log_name):
+ return self._engine.StepLogStream(self, log_name)
+
+ def add_step_text(self, text):
+ self.output_annotation('STEP_TEXT', text)
+
+ def add_step_summary_text(self, text):
+ self.output_annotation('STEP_SUMMARY_TEXT', text)
+
+ def add_step_link(self, name, url):
+ self.output_annotation('STEP_LINK', name, url)
+
+ def set_step_status(self, status):
+ if status == 'SUCCESS':
+ pass
+ elif status == 'WARNING':
+ self.output_annotation('STEP_WARNINGS')
+ elif status == 'FAILURE':
+ self.output_annotation('STEP_FAILURE')
+ elif status == 'EXCEPTION':
+ self.output_annotation('STEP_EXCEPTION')
+ else:
+ raise Exception('Impossible status %s' % status)
+
+ def set_build_property(self, key, value):
+ self.output_annotation('SET_BUILD_PROPERTY', key, value)
+
+ def trigger(self, spec):
+ self.output_annotation('STEP_TRIGGER', spec)
+
+
+ class StepLogStream(StreamEngine.Stream):
+ def __init__(self, step_stream, log_name):
+ self._step_stream = step_stream
+ self._log_name = log_name.replace('/', '/')
+
+ def write_line(self, line):
+ self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
+
+ def close(self):
+ self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
+
class AllowSubannotationsStepStream(StepStream):
def write_line(self, line):
@@ -396,25 +389,36 @@ class AnnotatorStreamEngine(StreamEngine):
def reset_subannotation_state(self):
self._engine._current_step = None
- def new_step_stream(self, step_name, allow_subannotations=False):
- self.output_annotation('SEED_STEP', step_name)
- if allow_subannotations:
- return self.AllowSubannotationsStepStream(self, step_name)
- else:
- return self.StepStream(self, step_name)
-
-class BareAnnotationStepStream(AnnotationStepStream):
- """A StepStream that is not tied to any engine, and emits assuming it has the
- cursor.
+ def new_step_stream(self, step_name, allow_subannotations=False,
+ nested=False):
+ self.output_root_annotation('SEED_STEP', step_name)
+ return self._create_step_stream(step_name, self._outstream,
+ allow_subannotations, nested)
- This is used for capturing the annotations in the engine.
- """
- def __init__(self, outstream):
- self._outstream = outstream
-
- def basic_write(self, line):
- self._outstream.write(encode_str(line))
+ def _create_step_stream(self, step_name, outstream,
+ allow_subannotations, nested):
+ if allow_subannotations:
+ stream = self.AllowSubannotationsStepStream(self, outstream, step_name,
+ nested)
+ else:
+ stream = self.StepStream(self, outstream, step_name, nested)
+
+ if nested:
+ # Increase our current nest level and emit a nested annotation for this
+ # step. This will be decreased when the step finishes.
+ self._current_nest_level += 1
+ stream.output_annotation('STEP_NEST_LEVEL', str(self._current_nest_level))
+ return stream
+
+ def _notify_step_finished(self, step_stream):
+ if step_stream._nested:
dnj 2016/08/15 17:33:53 (Called when a step is closed. I put this here ins
+ orig = self._current_nest_level
+ self._current_nest_level -= 1
+ assert self._current_nest_level >= 0
+
+ self.output_current_time(step=step_stream._step_name)
+ step_stream.output_annotation('STEP_CLOSED')
def encode_str(s):

Powered by Google App Engine
This is Rietveld 408576698