| Index: recipe_engine/stream.py
|
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
|
| index ce98d2079e88cc4d5f1a3ad443221e681992d483..4be99e2fcd754a39ca5e9c15051cdff511aaa3bb 100644
|
| --- a/recipe_engine/stream.py
|
| +++ b/recipe_engine/stream.py
|
| @@ -22,6 +22,7 @@ import time
|
|
|
| from . import env
|
| from . import recipe_api
|
| +from . import util
|
|
|
|
|
| class StreamEngine(object):
|
| @@ -104,10 +105,18 @@ class StreamEngine(object):
|
| self.close()
|
|
|
|
|
| -# Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
|
| -# form products. This code is entirely mechanical from the types (if we
|
| -# had them formalized...).
|
| class ProductStreamEngine(StreamEngine):
|
| + """A StreamEngine that forms the non-commutative product of two other
|
| + StreamEngines.
|
| +
|
| + Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
|
| + form products. This code is entirely mechanical from the types (if we
|
| + had them formalized...).
|
| +
|
| + This product is non-commutative, meaning order matters. Specifically, an
|
| + exception in "engine_a" will prevent "engine_b" from being evaluated.
|
| + """
|
| +
|
| def __init__(self, engine_a, engine_b):
|
| assert engine_a and engine_b
|
| self._engine_a = engine_a
|
| @@ -128,6 +137,7 @@ class ProductStreamEngine(StreamEngine):
|
| self._stream_b.close()
|
|
|
| class StepStream(Stream):
|
| + # pylint: disable=no-self-argument
|
| def _void_product(method_name):
|
| def inner(self, *args):
|
| getattr(self._stream_a, method_name)(*args)
|
| @@ -161,7 +171,81 @@ class ProductStreamEngine(StreamEngine):
|
| self._engine_b.close()
|
|
|
|
|
| -def _noop(*args, **kwargs):
|
| +class MultiStreamEngine(StreamEngine):
|
| + """A StreamEngine consisting of one or more inner StreamEngines.
|
| +
|
| + A call to this StreamEngine will be distributed to the inner StreamEngines.
|
| + Any exceptions that are caught during an inner handler will be deferred until
|
| + all inner handlers have been executed.
|
| + """
|
| +
|
| + def __init__(self, base, *engines):
|
| + self._engines = (base,) + engines
|
| + assert None not in self._engines
|
| +
|
| + @classmethod
|
| + def create(cls, *engines):
|
| + assert len(engines) > 0, 'At least one engine must be provided.'
|
| + if len(engines) == 1:
|
| + return engines[0]
|
| + return cls(engines[0], *engines[1:])
|
| +
|
| + class Stream(StreamEngine.Stream):
|
| + def __init__(self, *streams):
|
| + assert all(streams)
|
| + self._streams = streams
|
| +
|
| + def write_line(self, line):
|
| + util.map_defer_exceptions(lambda s: s.write_line(line), self._streams)
|
| +
|
| + def close(self):
|
| + util.map_defer_exceptions(lambda s: s.close(), self._streams)
|
| +
|
| + class StepStream(Stream):
|
| + # pylint: disable=no-self-argument
|
| + def _multiplex(method_name):
|
| + def inner(self, *args):
|
| + util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args),
|
| + self._streams)
|
| + return inner
|
| +
|
| + def new_log_stream(self, log_name):
|
| + log_streams = []
|
| + try:
|
| + for s in self._streams:
|
| + log_streams.append(s.new_log_stream(log_name))
|
| + return MultiStreamEngine.Stream(*log_streams)
|
| + except Exception:
|
| + # Close any opened log streams.
|
| + util.map_defer_exceptions(lambda ls: ls.close(), log_streams)
|
| + raise
|
| +
|
| + add_step_text = _multiplex('add_step_text')
|
| + add_step_summary_text = _multiplex('add_step_summary_text')
|
| + add_step_link = _multiplex('add_step_link')
|
| + reset_subannotation_state = _multiplex('reset_subannotation_state')
|
| + set_step_status = _multiplex('set_step_status')
|
| + set_build_property = _multiplex('set_build_property')
|
| + trigger = _multiplex('trigger')
|
| +
|
| + def new_step_stream(self, step_config):
|
| + return self.StepStream(
|
| + *(se.new_step_stream(step_config)
|
| + for se in self._engines))
|
| +
|
| + def open(self):
|
| + for se in self._engines:
|
| + se.open()
|
| +
|
| + def close(self):
|
| + util.map_defer_exceptions(lambda se: se.close(), self._engines)
|
| +
|
| + def append_stream_engine(self, se):
|
| + assert isinstance(se, StreamEngine)
|
| + self._engines.append(se)
|
| +
|
| +
|
| +def _noop(*_args, **_kwargs):
|
| pass
|
|
|
| class NoopStreamEngine(StreamEngine):
|
| @@ -170,7 +254,7 @@ class NoopStreamEngine(StreamEngine):
|
| close = _noop
|
|
|
| class StepStream(Stream):
|
| - def new_log_stream(self, log_name):
|
| + def new_log_stream(self, _log_name):
|
| return NoopStreamEngine.Stream()
|
| add_step_text = _noop
|
| add_step_summary_text = _noop
|
| @@ -193,8 +277,15 @@ class StreamEngineInvariants(StreamEngine):
|
| def __init__(self):
|
| self._streams = set()
|
|
|
| + @classmethod
|
| + def wrap(cls, other):
|
| + """Returns (ProductStreamEngine): A product applying invariants to "other".
|
| + """
|
| + return ProductStreamEngine(cls(), other)
|
| +
|
| class StepStream(StreamEngine.StepStream):
|
| def __init__(self, engine, step_name):
|
| + super(StreamEngineInvariants.StepStream, self).__init__()
|
| self._engine = engine
|
| self._step_name = step_name
|
| self._open = True
|
| @@ -315,7 +406,7 @@ class AnnotatorStreamEngine(StreamEngine):
|
|
|
| class StepStream(StreamEngine.StepStream):
|
| def __init__(self, engine, outstream, step_name):
|
| - super(StreamEngine.StepStream, self).__init__()
|
| + super(AnnotatorStreamEngine.StepStream, self).__init__()
|
|
|
| self._engine = engine
|
| self._outstream = outstream
|
|
|