| OLD | NEW |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
| 6 | 6 |
| 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
| 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
| 9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
| 10 a way to distinguish. | 10 a way to distinguish. |
| 11 | 11 |
| 12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
| 13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
| 14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
| 15 | 15 |
| 16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
| 17 can just write to without worrying. | 17 can just write to without worrying. |
| 18 """ | 18 """ |
| 19 | 19 |
| 20 import json | 20 import json |
| 21 import time | 21 import time |
| 22 | 22 |
| 23 from . import env | 23 from . import env |
| 24 from . import recipe_api | 24 from . import recipe_api |
| 25 from . import util |
| 25 | 26 |
| 26 | 27 |
| 27 class StreamEngine(object): | 28 class StreamEngine(object): |
| 28 class Stream(object): | 29 class Stream(object): |
| 29 def write_line(self, line): | 30 def write_line(self, line): |
| 30 raise NotImplementedError() | 31 raise NotImplementedError() |
| 31 | 32 |
| 32 def write_split(self, string): | 33 def write_split(self, string): |
| 33 """Write a string (which may contain newlines) to the stream. It will | 34 """Write a string (which may contain newlines) to the stream. It will |
| 34 be terminated by a newline.""" | 35 be terminated by a newline.""" |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 101 return self | 102 return self |
| 102 | 103 |
| 103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 104 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
| 104 self.close() | 105 self.close() |
| 105 | 106 |
| 106 | 107 |
| 107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | 108 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
| 108 # form products. This code is entirely mechanical from the types (if we | 109 # form products. This code is entirely mechanical from the types (if we |
| 109 # had them formalized...). | 110 # had them formalized...). |
| 110 class ProductStreamEngine(StreamEngine): | 111 class ProductStreamEngine(StreamEngine): |
| 111 def __init__(self, engine_a, engine_b): | 112 def __init__(self, base, *engines): |
| 112 assert engine_a and engine_b | 113 self._engines = [base] + list(engines) |
| 113 self._engine_a = engine_a | 114 assert all(self._engines) |
| 114 self._engine_b = engine_b | |
| 115 | 115 |
| 116 class Stream(StreamEngine.Stream): | 116 class Stream(StreamEngine.Stream): |
| 117 def __init__(self, stream_a, stream_b): | 117 def __init__(self, *streams): |
| 118 assert stream_a and stream_b | 118 assert all(streams) |
| 119 self._stream_a = stream_a | 119 self._streams = streams |
| 120 self._stream_b = stream_b | |
| 121 | 120 |
| 122 def write_line(self, line): | 121 def write_line(self, line): |
| 123 self._stream_a.write_line(line) | 122 for s in self._streams: |
| 124 self._stream_b.write_line(line) | 123 s.write_line(line) |
| 125 | 124 |
| 126 def close(self): | 125 def close(self): |
| 127 self._stream_a.close() | 126 util.defer_exceptions_for(self._streams, lambda s: s.close()) |
| 128 self._stream_b.close() | |
| 129 | 127 |
| 130 class StepStream(Stream): | 128 class StepStream(Stream): |
| 131 def _void_product(method_name): | 129 def _void_product(method_name): |
| 132 def inner(self, *args): | 130 def inner(self, *args): |
| 133 getattr(self._stream_a, method_name)(*args) | 131 for s in self._streams: |
| 134 getattr(self._stream_b, method_name)(*args) | 132 getattr(s, method_name)(*args) |
| 135 return inner | 133 return inner |
| 136 | 134 |
| 137 def new_log_stream(self, log_name): | 135 def new_log_stream(self, log_name): |
| 138 return ProductStreamEngine.Stream( | 136 return ProductStreamEngine.Stream( |
| 139 self._stream_a.new_log_stream(log_name), | 137 *(se.new_log_stream(log_name) for se in self._streams)) |
| 140 self._stream_b.new_log_stream(log_name)) | |
| 141 | 138 |
| 142 add_step_text = _void_product('add_step_text') | 139 add_step_text = _void_product('add_step_text') |
| 143 add_step_summary_text = _void_product('add_step_summary_text') | 140 add_step_summary_text = _void_product('add_step_summary_text') |
| 144 add_step_link = _void_product('add_step_link') | 141 add_step_link = _void_product('add_step_link') |
| 145 reset_subannotation_state = _void_product('reset_subannotation_state') | 142 reset_subannotation_state = _void_product('reset_subannotation_state') |
| 146 set_step_status = _void_product('set_step_status') | 143 set_step_status = _void_product('set_step_status') |
| 147 set_build_property = _void_product('set_build_property') | 144 set_build_property = _void_product('set_build_property') |
| 148 trigger = _void_product('trigger') | 145 trigger = _void_product('trigger') |
| 149 | 146 |
| 150 def new_step_stream(self, step_config): | 147 def new_step_stream(self, step_config): |
| 151 return self.StepStream( | 148 return self.StepStream( |
| 152 self._engine_a.new_step_stream(step_config), | 149 *(se.new_step_stream(step_config) |
| 153 self._engine_b.new_step_stream(step_config)) | 150 for se in self._engines)) |
| 154 | 151 |
| 155 def open(self): | 152 def open(self): |
| 156 self._engine_a.open() | 153 for se in self._engines: |
| 157 self._engine_b.open() | 154 se.open() |
| 158 | 155 |
| 159 def close(self): | 156 def close(self): |
| 160 self._engine_a.close() | 157 util.defer_exceptions_for(self._engines, lambda se: se.close()) |
| 161 self._engine_b.close() | 158 |
| 159 def append_stream_engine(self, se): |
| 160 assert isinstance(se, StreamEngine) |
| 161 self._engines.append(se) |
| 162 | 162 |
| 163 | 163 |
| 164 def _noop(*args, **kwargs): | 164 def _noop(*args, **kwargs): |
| 165 pass | 165 pass |
| 166 | 166 |
| 167 class NoopStreamEngine(StreamEngine): | 167 class NoopStreamEngine(StreamEngine): |
| 168 class Stream(StreamEngine.Stream): | 168 class Stream(StreamEngine.Stream): |
| 169 write_line = _noop | 169 write_line = _noop |
| 170 close = _noop | 170 close = _noop |
| 171 | 171 |
| (...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 418 string (which means it might not be valid ascii), we decode the string with | 418 string (which means it might not be valid ascii), we decode the string with |
| 419 the 'replace' error mode, which replaces invalid characters with a suitable | 419 the 'replace' error mode, which replaces invalid characters with a suitable |
| 420 replacement character. | 420 replacement character. |
| 421 """ | 421 """ |
| 422 try: | 422 try: |
| 423 return str(s) | 423 return str(s) |
| 424 except UnicodeEncodeError: | 424 except UnicodeEncodeError: |
| 425 return s.encode('utf-8', 'replace') | 425 return s.encode('utf-8', 'replace') |
| 426 except UnicodeDecodeError: | 426 except UnicodeDecodeError: |
| 427 return s.decode('utf-8', 'replace') | 427 return s.decode('utf-8', 'replace') |
| OLD | NEW |