| Index: recipe_engine/stream.py
|
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
|
| index da532cc89083211c0e86459a59d15231e607eebd..84de2b67c25cff2a1c1e269653ff3068d0575a1a 100644
|
| --- a/recipe_engine/stream.py
|
| +++ b/recipe_engine/stream.py
|
| @@ -18,6 +18,7 @@ can just write to without worrying.
|
| """
|
|
|
| import json
|
| +import time
|
|
|
| class StreamEngine(object):
|
| class Stream(object):
|
| @@ -68,7 +69,7 @@ class StreamEngine(object):
|
| raise NotImplementedError()
|
|
|
| def new_step_stream(self, step_name, allow_subannotations=False):
|
| - """Craete a new StepStream in this engine.
|
| + """Creates a new StepStream in this engine.
|
|
|
| The step will be considered started at the moment this method is called.
|
|
|
| @@ -81,6 +82,19 @@ class StreamEngine(object):
|
|
|
| raise NotImplementedError()
|
|
|
| + def open(self):
|
| + pass
|
| +
|
| + def close(self):
|
| + pass
|
| +
|
| + def __enter__(self):
|
| + self.open()
|
| + return self
|
| +
|
| + def __exit__(self, _exc_type, _exc_val, _exc_tb):
|
| + self.close()
|
| +
|
|
|
| # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
|
| # form products. This code is entirely mechanical from the types (if we
|
| @@ -129,6 +143,14 @@ class ProductStreamEngine(StreamEngine):
|
| self._engine_a.new_step_stream(step_name, allow_subannotations),
|
| self._engine_b.new_step_stream(step_name, allow_subannotations))
|
|
|
| + def open(self):
|
| + self._engine_a.open()
|
| + self._engine_b.open()
|
| +
|
| + def close(self):
|
| + self._engine_a.close()
|
| + self._engine_b.close()
|
| +
|
|
|
| def _noop(*args, **kwargs):
|
| pass
|
| @@ -241,11 +263,15 @@ class StreamEngineInvariants(StreamEngine):
|
|
|
|
|
| class AnnotationStepStream(StreamEngine.StepStream):
|
| + def __init__(self, emit_timestamps=False, time_fn=None):
|
| + self.emit_timestamps = emit_timestamps
|
| + self.time_fn = time_fn or time.time
|
| +
|
| def basic_write(self, line):
|
| raise NotImplementedError()
|
|
|
| def output_annotation(self, *args):
|
| - self.basic_write('@@@' + '@'.join(args) + '@@@\n')
|
| + self.basic_write('@@@' + '@'.join(map(str, args)) + '@@@\n')
|
|
|
| def write_line(self, line):
|
| if line.startswith('@@@'):
|
| @@ -254,6 +280,8 @@ class AnnotationStepStream(StreamEngine.StepStream):
|
| self.basic_write(line + '\n')
|
|
|
| def close(self):
|
| + if self.emit_timestamps:
|
| + self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
|
| self.output_annotation('STEP_CLOSED')
|
|
|
| def new_log_stream(self, log_name):
|
| @@ -302,17 +330,33 @@ class AnnotationStepStream(StreamEngine.StepStream):
|
|
|
|
|
| class AnnotatorStreamEngine(StreamEngine):
|
| - def __init__(self, outstream):
|
| + def __init__(self, outstream, emit_timestamps=False, time_fn=None):
|
| self._current_step = None
|
| self._opened = set()
|
| self._outstream = outstream
|
| + self.emit_timestamps = emit_timestamps
|
| + self.time_fn = time_fn or time.time
|
| +
|
| + def open(self):
|
| + super(AnnotatorStreamEngine, self).open()
|
| + if self.emit_timestamps:
|
| + self.output_current_time()
|
| self.output_annotation('HONOR_ZERO_RETURN_CODE')
|
|
|
| + def close(self):
|
| + super(AnnotatorStreamEngine, self).close()
|
| + if self.emit_timestamps:
|
| + self.output_current_time()
|
| +
|
| + def output_current_time(self):
|
| + """Prints CURRENT_TIMESTAMP annotation with current time."""
|
| + self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
|
| +
|
| def output_annotation(self, *args):
|
| # Flush the stream before & after engine annotations, because they can
|
| # change which step we are talking about and this matters to buildbot.
|
| self._outstream.flush()
|
| - self._outstream.write('@@@' + '@'.join(args) + '@@@\n')
|
| + self._outstream.write('@@@' + '@'.join(map(str, args)) + '@@@\n')
|
| self._outstream.flush()
|
|
|
| def _step_cursor(self, name):
|
| @@ -320,11 +364,15 @@ class AnnotatorStreamEngine(StreamEngine):
|
| self.output_annotation('STEP_CURSOR', name)
|
| self._current_step = name
|
| if name not in self._opened:
|
| + if self.emit_timestamps:
|
| + self.output_current_time()
|
| self.output_annotation('STEP_STARTED')
|
| self._opened.add(name)
|
|
|
| class StepStream(AnnotationStepStream):
|
| def __init__(self, engine, step_name):
|
| + AnnotationStepStream.__init__(
|
| + self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
|
| self._engine = engine
|
| self._step_name = step_name
|
|
|
| @@ -362,4 +410,3 @@ class BareAnnotationStepStream(AnnotationStepStream):
|
|
|
| def basic_write(self, line):
|
| self._outstream.write(line)
|
| -
|
|
|