Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
| 6 | 6 |
| 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
| 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
| 9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
| 10 a way to distinguish. | 10 a way to distinguish. |
| 11 | 11 |
| 12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
| 13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
| 14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
| 15 | 15 |
| 16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
| 17 can just write to without worrying. | 17 can just write to without worrying. |
| 18 """ | 18 """ |
| 19 | 19 |
| 20 import json | 20 import json |
| 21 import time | 21 import time |
| 22 | 22 |
| 23 from . import env | 23 from . import env |
| 24 from . import recipe_api | 24 from . import recipe_api |
| 25 from . import util | |
| 26 | |
| 27 import libs.logdog.bootstrap | |
| 28 import libs.logdog.stream | |
| 29 import annotations_pb2 as miloproto | |
|
martiniss
2016/09/01 21:59:47
Unneeded.
dnj
2016/09/07 17:54:57
Done.
| |
| 25 | 30 |
| 26 | 31 |
| 27 class StreamEngine(object): | 32 class StreamEngine(object): |
| 28 class Stream(object): | 33 class Stream(object): |
| 29 def write_line(self, line): | 34 def write_line(self, line): |
| 30 raise NotImplementedError() | 35 raise NotImplementedError() |
| 31 | 36 |
| 32 def write_split(self, string): | 37 def write_split(self, string): |
| 33 """Write a string (which may contain newlines) to the stream. It will | 38 """Write a string (which may contain newlines) to the stream. It will |
| 34 be terminated by a newline.""" | 39 be terminated by a newline.""" |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 101 return self | 106 return self |
| 102 | 107 |
| 103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 108 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
| 104 self.close() | 109 self.close() |
| 105 | 110 |
| 106 | 111 |
| 107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | 112 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
| 108 # form products. This code is entirely mechanical from the types (if we | 113 # form products. This code is entirely mechanical from the types (if we |
| 109 # had them formalized...). | 114 # had them formalized...). |
| 110 class ProductStreamEngine(StreamEngine): | 115 class ProductStreamEngine(StreamEngine): |
| 111 def __init__(self, engine_a, engine_b): | 116 def __init__(self, base, *engines): |
| 112 assert engine_a and engine_b | 117 self._engines = [base] + list(engines) |
| 113 self._engine_a = engine_a | 118 assert all(self._engines) |
| 114 self._engine_b = engine_b | |
| 115 | 119 |
| 116 class Stream(StreamEngine.Stream): | 120 class Stream(StreamEngine.Stream): |
| 117 def __init__(self, stream_a, stream_b): | 121 def __init__(self, *streams): |
| 118 assert stream_a and stream_b | 122 assert all(streams) |
| 119 self._stream_a = stream_a | 123 self._streams = streams |
| 120 self._stream_b = stream_b | |
| 121 | 124 |
| 122 def write_line(self, line): | 125 def write_line(self, line): |
| 123 self._stream_a.write_line(line) | 126 for s in self._streams: |
| 124 self._stream_b.write_line(line) | 127 s.write_line(line) |
| 125 | 128 |
| 126 def close(self): | 129 def close(self): |
| 127 self._stream_a.close() | 130 for s in util.defer_exceptions_for(self._streams): |
| 128 self._stream_b.close() | 131 s.close() |
| 129 | 132 |
| 130 class StepStream(Stream): | 133 class StepStream(Stream): |
| 131 def _void_product(method_name): | 134 def _void_product(method_name): |
| 132 def inner(self, *args): | 135 def inner(self, *args): |
| 133 getattr(self._stream_a, method_name)(*args) | 136 for s in self._streams: |
| 134 getattr(self._stream_b, method_name)(*args) | 137 getattr(s, method_name)(*args) |
| 135 return inner | 138 return inner |
| 136 | 139 |
| 137 def new_log_stream(self, log_name): | 140 def new_log_stream(self, log_name): |
| 138 return ProductStreamEngine.Stream( | 141 return ProductStreamEngine.Stream( |
| 139 self._stream_a.new_log_stream(log_name), | 142 *(se.new_log_stream(log_name) for se in self._streams)) |
| 140 self._stream_b.new_log_stream(log_name)) | |
| 141 | 143 |
| 142 add_step_text = _void_product('add_step_text') | 144 add_step_text = _void_product('add_step_text') |
| 143 add_step_summary_text = _void_product('add_step_summary_text') | 145 add_step_summary_text = _void_product('add_step_summary_text') |
| 144 add_step_link = _void_product('add_step_link') | 146 add_step_link = _void_product('add_step_link') |
| 145 reset_subannotation_state = _void_product('reset_subannotation_state') | 147 reset_subannotation_state = _void_product('reset_subannotation_state') |
| 146 set_step_status = _void_product('set_step_status') | 148 set_step_status = _void_product('set_step_status') |
| 147 set_build_property = _void_product('set_build_property') | 149 set_build_property = _void_product('set_build_property') |
| 148 trigger = _void_product('trigger') | 150 trigger = _void_product('trigger') |
| 149 | 151 |
| 150 def new_step_stream(self, step_config): | 152 def new_step_stream(self, step_config): |
| 151 return self.StepStream( | 153 return self.StepStream( |
| 152 self._engine_a.new_step_stream(step_config), | 154 *list(se.new_step_stream(step_config) |
|
martiniss
2016/09/01 21:59:47
why the "*list"? Why not *(
dnj
2016/09/07 17:54:57
Done.
| |
| 153 self._engine_b.new_step_stream(step_config)) | 155 for se in self._engines)) |
| 154 | 156 |
| 155 def open(self): | 157 def open(self): |
| 156 self._engine_a.open() | 158 for se in self._engines: |
| 157 self._engine_b.open() | 159 se.open() |
| 158 | 160 |
| 159 def close(self): | 161 def close(self): |
| 160 self._engine_a.close() | 162 for se in util.defer_exceptions_for(self._engines): |
| 161 self._engine_b.close() | 163 se.close() |
| 164 | |
| 165 def append_stream_engine(self, se): | |
| 166 assert isinstance(se, StreamEngine) | |
| 167 self._engines.append(se) | |
| 162 | 168 |
| 163 | 169 |
| 164 def _noop(*args, **kwargs): | 170 def _noop(*args, **kwargs): |
| 165 pass | 171 pass |
| 166 | 172 |
| 167 class NoopStreamEngine(StreamEngine): | 173 class NoopStreamEngine(StreamEngine): |
| 168 class Stream(StreamEngine.Stream): | 174 class Stream(StreamEngine.Stream): |
| 169 write_line = _noop | 175 write_line = _noop |
| 170 close = _noop | 176 close = _noop |
| 171 | 177 |
| (...skipping 247 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 419 string (which means it might not be valid ascii), we decode the string with | 425 string (which means it might not be valid ascii), we decode the string with |
| 420 the 'replace' error mode, which replaces invalid characters with a suitable | 426 the 'replace' error mode, which replaces invalid characters with a suitable |
| 421 replacement character. | 427 replacement character. |
| 422 """ | 428 """ |
| 423 try: | 429 try: |
| 424 return str(s) | 430 return str(s) |
| 425 except UnicodeEncodeError: | 431 except UnicodeEncodeError: |
| 426 return s.encode('utf-8', 'replace') | 432 return s.encode('utf-8', 'replace') |
| 427 except UnicodeDecodeError: | 433 except UnicodeDecodeError: |
| 428 return s.decode('utf-8', 'replace') | 434 return s.decode('utf-8', 'replace') |
| OLD | NEW |