Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(624)

Unified Diff: recipe_engine/stream.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: pylint, fix comments Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: recipe_engine/stream.py
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
index ce98d2079e88cc4d5f1a3ad443221e681992d483..dfd9e7373e27a2dacf284ba19e470aaaaec15267 100644
--- a/recipe_engine/stream.py
+++ b/recipe_engine/stream.py
@@ -22,6 +22,7 @@ import time
from . import env
from . import recipe_api
+from . import util
class StreamEngine(object):
@@ -108,36 +109,32 @@ class StreamEngine(object):
# form products. This code is entirely mechanical from the types (if we
# had them formalized...).
class ProductStreamEngine(StreamEngine):
- def __init__(self, engine_a, engine_b):
- assert engine_a and engine_b
- self._engine_a = engine_a
- self._engine_b = engine_b
+ def __init__(self, base, *engines):
+ self._engines = [base] + list(engines)
+ assert all(self._engines)
class Stream(StreamEngine.Stream):
- def __init__(self, stream_a, stream_b):
- assert stream_a and stream_b
- self._stream_a = stream_a
- self._stream_b = stream_b
+ def __init__(self, *streams):
+ assert all(streams)
+ self._streams = streams
def write_line(self, line):
- self._stream_a.write_line(line)
- self._stream_b.write_line(line)
+ for s in self._streams:
+ s.write_line(line)
def close(self):
- self._stream_a.close()
- self._stream_b.close()
+ util.defer_exceptions_for(self._streams, lambda s: s.close())
class StepStream(Stream):
def _void_product(method_name):
def inner(self, *args):
- getattr(self._stream_a, method_name)(*args)
- getattr(self._stream_b, method_name)(*args)
+ for s in self._streams:
+ getattr(s, method_name)(*args)
return inner
def new_log_stream(self, log_name):
return ProductStreamEngine.Stream(
- self._stream_a.new_log_stream(log_name),
- self._stream_b.new_log_stream(log_name))
+ *(se.new_log_stream(log_name) for se in self._streams))
add_step_text = _void_product('add_step_text')
add_step_summary_text = _void_product('add_step_summary_text')
@@ -149,16 +146,19 @@ class ProductStreamEngine(StreamEngine):
def new_step_stream(self, step_config):
return self.StepStream(
- self._engine_a.new_step_stream(step_config),
- self._engine_b.new_step_stream(step_config))
+ *(se.new_step_stream(step_config)
+ for se in self._engines))
def open(self):
- self._engine_a.open()
- self._engine_b.open()
+ for se in self._engines:
+ se.open()
def close(self):
- self._engine_a.close()
- self._engine_b.close()
+ util.defer_exceptions_for(self._engines, lambda se: se.close())
+
+ def append_stream_engine(self, se):
+ assert isinstance(se, StreamEngine)
+ self._engines.append(se)
def _noop(*args, **kwargs):

Powered by Google App Engine
This is Rietveld 408576698