| Index: recipe_engine/stream.py
|
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
|
| index ce98d2079e88cc4d5f1a3ad443221e681992d483..dfd9e7373e27a2dacf284ba19e470aaaaec15267 100644
|
| --- a/recipe_engine/stream.py
|
| +++ b/recipe_engine/stream.py
|
| @@ -22,6 +22,7 @@ import time
|
|
|
| from . import env
|
| from . import recipe_api
|
| +from . import util
|
|
|
|
|
| class StreamEngine(object):
|
| @@ -108,36 +109,32 @@ class StreamEngine(object):
|
| # form products. This code is entirely mechanical from the types (if we
|
| # had them formalized...).
|
| class ProductStreamEngine(StreamEngine):
|
| - def __init__(self, engine_a, engine_b):
|
| - assert engine_a and engine_b
|
| - self._engine_a = engine_a
|
| - self._engine_b = engine_b
|
| + def __init__(self, base, *engines):
|
| + self._engines = [base] + list(engines)
|
| + assert all(self._engines)
|
|
|
| class Stream(StreamEngine.Stream):
|
| - def __init__(self, stream_a, stream_b):
|
| - assert stream_a and stream_b
|
| - self._stream_a = stream_a
|
| - self._stream_b = stream_b
|
| + def __init__(self, *streams):
|
| + assert all(streams)
|
| + self._streams = streams
|
|
|
| def write_line(self, line):
|
| - self._stream_a.write_line(line)
|
| - self._stream_b.write_line(line)
|
| + for s in self._streams:
|
| + s.write_line(line)
|
|
|
| def close(self):
|
| - self._stream_a.close()
|
| - self._stream_b.close()
|
| + util.defer_exceptions_for(self._streams, lambda s: s.close())
|
|
|
| class StepStream(Stream):
|
| def _void_product(method_name):
|
| def inner(self, *args):
|
| - getattr(self._stream_a, method_name)(*args)
|
| - getattr(self._stream_b, method_name)(*args)
|
| + for s in self._streams:
|
| + getattr(s, method_name)(*args)
|
| return inner
|
|
|
| def new_log_stream(self, log_name):
|
| return ProductStreamEngine.Stream(
|
| - self._stream_a.new_log_stream(log_name),
|
| - self._stream_b.new_log_stream(log_name))
|
| + *(se.new_log_stream(log_name) for se in self._streams))
|
|
|
| add_step_text = _void_product('add_step_text')
|
| add_step_summary_text = _void_product('add_step_summary_text')
|
| @@ -149,16 +146,19 @@ class ProductStreamEngine(StreamEngine):
|
|
|
| def new_step_stream(self, step_config):
|
| return self.StepStream(
|
| - self._engine_a.new_step_stream(step_config),
|
| - self._engine_b.new_step_stream(step_config))
|
| + *(se.new_step_stream(step_config)
|
| + for se in self._engines))
|
|
|
| def open(self):
|
| - self._engine_a.open()
|
| - self._engine_b.open()
|
| + for se in self._engines:
|
| + se.open()
|
|
|
| def close(self):
|
| - self._engine_a.close()
|
| - self._engine_b.close()
|
| + util.defer_exceptions_for(self._engines, lambda se: se.close())
|
| +
|
| + def append_stream_engine(self, se):
|
| + assert isinstance(se, StreamEngine)
|
| + self._engines.append(se)
|
|
|
|
|
| def _noop(*args, **kwargs):
|
|
|