Index: recipe_engine/stream.py |
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py |
index ce98d2079e88cc4d5f1a3ad443221e681992d483..dfd9e7373e27a2dacf284ba19e470aaaaec15267 100644 |
--- a/recipe_engine/stream.py |
+++ b/recipe_engine/stream.py |
@@ -22,6 +22,7 @@ import time |
from . import env |
from . import recipe_api |
+from . import util |
class StreamEngine(object): |
@@ -108,36 +109,32 @@ class StreamEngine(object): |
# form products. This code is entirely mechanical from the types (if we |
# had them formalized...). |
class ProductStreamEngine(StreamEngine): |
- def __init__(self, engine_a, engine_b): |
- assert engine_a and engine_b |
- self._engine_a = engine_a |
- self._engine_b = engine_b |
+ def __init__(self, base, *engines): |
+ self._engines = [base] + list(engines) |
+ assert all(self._engines) |
class Stream(StreamEngine.Stream): |
- def __init__(self, stream_a, stream_b): |
- assert stream_a and stream_b |
- self._stream_a = stream_a |
- self._stream_b = stream_b |
+ def __init__(self, *streams): |
+ assert all(streams) |
+ self._streams = streams |
def write_line(self, line): |
- self._stream_a.write_line(line) |
- self._stream_b.write_line(line) |
+ for s in self._streams: |
+ s.write_line(line) |
def close(self): |
- self._stream_a.close() |
- self._stream_b.close() |
+ util.defer_exceptions_for(self._streams, lambda s: s.close()) |
class StepStream(Stream): |
def _void_product(method_name): |
def inner(self, *args): |
- getattr(self._stream_a, method_name)(*args) |
- getattr(self._stream_b, method_name)(*args) |
+ for s in self._streams: |
+ getattr(s, method_name)(*args) |
return inner |
def new_log_stream(self, log_name): |
return ProductStreamEngine.Stream( |
- self._stream_a.new_log_stream(log_name), |
- self._stream_b.new_log_stream(log_name)) |
+ *(se.new_log_stream(log_name) for se in self._streams)) |
add_step_text = _void_product('add_step_text') |
add_step_summary_text = _void_product('add_step_summary_text') |
@@ -149,16 +146,19 @@ class ProductStreamEngine(StreamEngine): |
def new_step_stream(self, step_config): |
return self.StepStream( |
- self._engine_a.new_step_stream(step_config), |
- self._engine_b.new_step_stream(step_config)) |
+ *(se.new_step_stream(step_config) |
+ for se in self._engines)) |
def open(self): |
- self._engine_a.open() |
- self._engine_b.open() |
+ for se in self._engines: |
+ se.open() |
def close(self): |
- self._engine_a.close() |
- self._engine_b.close() |
+ util.defer_exceptions_for(self._engines, lambda se: se.close()) |
+ |
+ def append_stream_engine(self, se): |
+ assert isinstance(se, StreamEngine) |
+ self._engines.append(se) |
def _noop(*args, **kwargs): |