Chromium Code Reviews| Index: recipe_engine/stream.py |
| diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py |
| index e9f28be6011ed13769e69c9f67c02fca0b24a441..f18c3d1204dcec37aa8fd566a3b25a9aecf19bb2 100644 |
| --- a/recipe_engine/stream.py |
| +++ b/recipe_engine/stream.py |
| @@ -22,6 +22,11 @@ import time |
| from . import env |
| from . import recipe_api |
| +from . import util |
| + |
| +import libs.logdog.bootstrap |
| +import libs.logdog.stream |
| +import annotations_pb2 as miloproto |
|
martiniss
2016/09/01 21:59:47
Unneeded.
dnj
2016/09/07 17:54:57
Done.
|
| class StreamEngine(object): |
| @@ -108,36 +113,33 @@ class StreamEngine(object): |
| # form products. This code is entirely mechanical from the types (if we |
| # had them formalized...). |
| class ProductStreamEngine(StreamEngine): |
| - def __init__(self, engine_a, engine_b): |
| - assert engine_a and engine_b |
| - self._engine_a = engine_a |
| - self._engine_b = engine_b |
| + def __init__(self, base, *engines): |
| + self._engines = [base] + list(engines) |
| + assert all(self._engines) |
| class Stream(StreamEngine.Stream): |
| - def __init__(self, stream_a, stream_b): |
| - assert stream_a and stream_b |
| - self._stream_a = stream_a |
| - self._stream_b = stream_b |
| + def __init__(self, *streams): |
| + assert all(streams) |
| + self._streams = streams |
| def write_line(self, line): |
| - self._stream_a.write_line(line) |
| - self._stream_b.write_line(line) |
| + for s in self._streams: |
| + s.write_line(line) |
| def close(self): |
| - self._stream_a.close() |
| - self._stream_b.close() |
| + for s in util.defer_exceptions_for(self._streams): |
| + s.close() |
| class StepStream(Stream): |
| def _void_product(method_name): |
| def inner(self, *args): |
| - getattr(self._stream_a, method_name)(*args) |
| - getattr(self._stream_b, method_name)(*args) |
| + for s in self._streams: |
| + getattr(s, method_name)(*args) |
| return inner |
| def new_log_stream(self, log_name): |
| return ProductStreamEngine.Stream( |
| - self._stream_a.new_log_stream(log_name), |
| - self._stream_b.new_log_stream(log_name)) |
| + *(se.new_log_stream(log_name) for se in self._streams)) |
| add_step_text = _void_product('add_step_text') |
| add_step_summary_text = _void_product('add_step_summary_text') |
| @@ -149,16 +151,20 @@ class ProductStreamEngine(StreamEngine): |
| def new_step_stream(self, step_config): |
| return self.StepStream( |
| - self._engine_a.new_step_stream(step_config), |
| - self._engine_b.new_step_stream(step_config)) |
| + *list(se.new_step_stream(step_config) |
|
martiniss
2016/09/01 21:59:47
why the "*list"? Why not *(
dnj
2016/09/07 17:54:57
Done.
|
| + for se in self._engines)) |
| def open(self): |
| - self._engine_a.open() |
| - self._engine_b.open() |
| + for se in self._engines: |
| + se.open() |
| def close(self): |
| - self._engine_a.close() |
| - self._engine_b.close() |
| + for se in util.defer_exceptions_for(self._engines): |
| + se.close() |
| + |
| + def append_stream_engine(self, se): |
| + assert isinstance(se, StreamEngine) |
| + self._engines.append(se) |
| def _noop(*args, **kwargs): |