| OLD | NEW |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
| 6 | 6 |
| 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
| 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
| 9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
| 10 a way to distinguish. | 10 a way to distinguish. |
| 11 | 11 |
| 12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
| 13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
| 14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
| 15 | 15 |
| 16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
| 17 can just write to without worrying. | 17 can just write to without worrying. |
| 18 """ | 18 """ |
| 19 | 19 |
| 20 import json | 20 import json |
| 21 import time | 21 import time |
| 22 | 22 |
| 23 from . import env | 23 from . import env |
| 24 from . import recipe_api | 24 from . import recipe_api |
| 25 from . import util |
| 25 | 26 |
| 26 | 27 |
| 27 class StreamEngine(object): | 28 class StreamEngine(object): |
| 28 class Stream(object): | 29 class Stream(object): |
| 29 def write_line(self, line): | 30 def write_line(self, line): |
| 30 raise NotImplementedError() | 31 raise NotImplementedError() |
| 31 | 32 |
| 32 def write_split(self, string): | 33 def write_split(self, string): |
| 33 """Write a string (which may contain newlines) to the stream. It will | 34 """Write a string (which may contain newlines) to the stream. It will |
| 34 be terminated by a newline.""" | 35 be terminated by a newline.""" |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 97 pass | 98 pass |
| 98 | 99 |
| 99 def __enter__(self): | 100 def __enter__(self): |
| 100 self.open() | 101 self.open() |
| 101 return self | 102 return self |
| 102 | 103 |
| 103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 104 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
| 104 self.close() | 105 self.close() |
| 105 | 106 |
| 106 | 107 |
| 107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | |
| 108 # form products. This code is entirely mechanical from the types (if we | |
| 109 # had them formalized...). | |
| 110 class ProductStreamEngine(StreamEngine): | 108 class ProductStreamEngine(StreamEngine): |
| 109 """A StreamEngine that forms the non-commutative product of two other |
| 110 StreamEngines. |
| 111 |
| 112 Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
| 113 form products. This code is entirely mechanical from the types (if we |
| 114 had them formalized...). |
| 115 |
| 116 This product is non-commutative, meaning order matters. Specifically, an |
| 117 exception in "engine_a" will prevent "engine_b" from being evaluated. |
| 118 """ |
| 119 |
| 111 def __init__(self, engine_a, engine_b): | 120 def __init__(self, engine_a, engine_b): |
| 112 assert engine_a and engine_b | 121 assert engine_a and engine_b |
| 113 self._engine_a = engine_a | 122 self._engine_a = engine_a |
| 114 self._engine_b = engine_b | 123 self._engine_b = engine_b |
| 115 | 124 |
| 116 class Stream(StreamEngine.Stream): | 125 class Stream(StreamEngine.Stream): |
| 117 def __init__(self, stream_a, stream_b): | 126 def __init__(self, stream_a, stream_b): |
| 118 assert stream_a and stream_b | 127 assert stream_a and stream_b |
| 119 self._stream_a = stream_a | 128 self._stream_a = stream_a |
| 120 self._stream_b = stream_b | 129 self._stream_b = stream_b |
| 121 | 130 |
| 122 def write_line(self, line): | 131 def write_line(self, line): |
| 123 self._stream_a.write_line(line) | 132 self._stream_a.write_line(line) |
| 124 self._stream_b.write_line(line) | 133 self._stream_b.write_line(line) |
| 125 | 134 |
| 126 def close(self): | 135 def close(self): |
| 127 self._stream_a.close() | 136 self._stream_a.close() |
| 128 self._stream_b.close() | 137 self._stream_b.close() |
| 129 | 138 |
| 130 class StepStream(Stream): | 139 class StepStream(Stream): |
| 140 # pylint: disable=no-self-argument |
| 131 def _void_product(method_name): | 141 def _void_product(method_name): |
| 132 def inner(self, *args): | 142 def inner(self, *args): |
| 133 getattr(self._stream_a, method_name)(*args) | 143 getattr(self._stream_a, method_name)(*args) |
| 134 getattr(self._stream_b, method_name)(*args) | 144 getattr(self._stream_b, method_name)(*args) |
| 135 return inner | 145 return inner |
| 136 | 146 |
| 137 def new_log_stream(self, log_name): | 147 def new_log_stream(self, log_name): |
| 138 return ProductStreamEngine.Stream( | 148 return ProductStreamEngine.Stream( |
| 139 self._stream_a.new_log_stream(log_name), | 149 self._stream_a.new_log_stream(log_name), |
| 140 self._stream_b.new_log_stream(log_name)) | 150 self._stream_b.new_log_stream(log_name)) |
| (...skipping 13 matching lines...) Expand all Loading... |
| 154 | 164 |
| 155 def open(self): | 165 def open(self): |
| 156 self._engine_a.open() | 166 self._engine_a.open() |
| 157 self._engine_b.open() | 167 self._engine_b.open() |
| 158 | 168 |
| 159 def close(self): | 169 def close(self): |
| 160 self._engine_a.close() | 170 self._engine_a.close() |
| 161 self._engine_b.close() | 171 self._engine_b.close() |
| 162 | 172 |
| 163 | 173 |
| 164 def _noop(*args, **kwargs): | 174 class MultiStreamEngine(StreamEngine): |
| 175 """A StreamEngine consisting of one or more inner StreamEngines. |
| 176 |
| 177 A call to this StreamEngine will be distributed to the inner StreamEngines. |
| 178 Any exceptions that are caught during an inner handler will be deferred until |
| 179 all inner handlers have been executed. |
| 180 """ |
| 181 |
| 182 def __init__(self, base, *engines): |
| 183 self._engines = (base,) + engines |
| 184 assert None not in self._engines |
| 185 |
| 186 @classmethod |
| 187 def create(cls, *engines): |
| 188 assert len(engines) > 0, 'At least one engine must be provided.' |
| 189 if len(engines) == 1: |
| 190 return engines[0] |
| 191 return cls(engines[0], *engines[1:]) |
| 192 |
| 193 class Stream(StreamEngine.Stream): |
| 194 def __init__(self, *streams): |
| 195 assert all(streams) |
| 196 self._streams = streams |
| 197 |
| 198 def write_line(self, line): |
| 199 util.map_defer_exceptions(lambda s: s.write_line(line), self._streams) |
| 200 |
| 201 def close(self): |
| 202 util.map_defer_exceptions(lambda s: s.close(), self._streams) |
| 203 |
| 204 class StepStream(Stream): |
| 205 # pylint: disable=no-self-argument |
| 206 def _multiplex(method_name): |
| 207 def inner(self, *args): |
| 208 util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args), |
| 209 self._streams) |
| 210 return inner |
| 211 |
| 212 def new_log_stream(self, log_name): |
| 213 log_streams = [] |
| 214 try: |
| 215 for s in self._streams: |
| 216 log_streams.append(s.new_log_stream(log_name)) |
| 217 return MultiStreamEngine.Stream(*log_streams) |
| 218 except Exception: |
| 219 # Close any opened log streams. |
| 220 util.map_defer_exceptions(lambda ls: ls.close(), log_streams) |
| 221 raise |
| 222 |
| 223 add_step_text = _multiplex('add_step_text') |
| 224 add_step_summary_text = _multiplex('add_step_summary_text') |
| 225 add_step_link = _multiplex('add_step_link') |
| 226 reset_subannotation_state = _multiplex('reset_subannotation_state') |
| 227 set_step_status = _multiplex('set_step_status') |
| 228 set_build_property = _multiplex('set_build_property') |
| 229 trigger = _multiplex('trigger') |
| 230 |
| 231 def new_step_stream(self, step_config): |
| 232 return self.StepStream( |
| 233 *(se.new_step_stream(step_config) |
| 234 for se in self._engines)) |
| 235 |
| 236 def open(self): |
| 237 for se in self._engines: |
| 238 se.open() |
| 239 |
| 240 def close(self): |
| 241 util.map_defer_exceptions(lambda se: se.close(), self._engines) |
| 242 |
| 243 def append_stream_engine(self, se): |
| 244 assert isinstance(se, StreamEngine) |
| 245 self._engines.append(se) |
| 246 |
| 247 |
| 248 def _noop(*_args, **_kwargs): |
| 165 pass | 249 pass |
| 166 | 250 |
| 167 class NoopStreamEngine(StreamEngine): | 251 class NoopStreamEngine(StreamEngine): |
| 168 class Stream(StreamEngine.Stream): | 252 class Stream(StreamEngine.Stream): |
| 169 write_line = _noop | 253 write_line = _noop |
| 170 close = _noop | 254 close = _noop |
| 171 | 255 |
| 172 class StepStream(Stream): | 256 class StepStream(Stream): |
| 173 def new_log_stream(self, log_name): | 257 def new_log_stream(self, _log_name): |
| 174 return NoopStreamEngine.Stream() | 258 return NoopStreamEngine.Stream() |
| 175 add_step_text = _noop | 259 add_step_text = _noop |
| 176 add_step_summary_text = _noop | 260 add_step_summary_text = _noop |
| 177 add_step_link = _noop | 261 add_step_link = _noop |
| 178 reset_subannotation_state = _noop | 262 reset_subannotation_state = _noop |
| 179 set_step_status = _noop | 263 set_step_status = _noop |
| 180 set_build_property = _noop | 264 set_build_property = _noop |
| 181 trigger = _noop | 265 trigger = _noop |
| 182 | 266 |
| 183 def new_step_stream(self, step_config): | 267 def new_step_stream(self, step_config): |
| 184 return self.StepStream() | 268 return self.StepStream() |
| 185 | 269 |
| 186 | 270 |
| 187 class StreamEngineInvariants(StreamEngine): | 271 class StreamEngineInvariants(StreamEngine): |
| 188 """Checks that the users are using a StreamEngine hygenically. | 272 """Checks that the users are using a StreamEngine hygenically. |
| 189 | 273 |
| 190 Multiply with actually functional StreamEngines so you don't have to check | 274 Multiply with actually functional StreamEngines so you don't have to check |
| 191 these all over the place. | 275 these all over the place. |
| 192 """ | 276 """ |
| 193 def __init__(self): | 277 def __init__(self): |
| 194 self._streams = set() | 278 self._streams = set() |
| 195 | 279 |
| 280 @classmethod |
| 281 def wrap(cls, other): |
| 282 """Returns (ProductStreamEngine): A product applying invariants to "other". |
| 283 """ |
| 284 return ProductStreamEngine(cls(), other) |
| 285 |
| 196 class StepStream(StreamEngine.StepStream): | 286 class StepStream(StreamEngine.StepStream): |
| 197 def __init__(self, engine, step_name): | 287 def __init__(self, engine, step_name): |
| 288 super(StreamEngineInvariants.StepStream, self).__init__() |
| 198 self._engine = engine | 289 self._engine = engine |
| 199 self._step_name = step_name | 290 self._step_name = step_name |
| 200 self._open = True | 291 self._open = True |
| 201 self._logs = {} | 292 self._logs = {} |
| 202 self._status = 'SUCCESS' | 293 self._status = 'SUCCESS' |
| 203 | 294 |
| 204 def write_line(self, line): | 295 def write_line(self, line): |
| 205 assert '\n' not in line | 296 assert '\n' not in line |
| 206 assert self._open | 297 assert self._open |
| 207 | 298 |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 308 if self._current_step != step_name: | 399 if self._current_step != step_name: |
| 309 self.output_root_annotation('STEP_CURSOR', step_name) | 400 self.output_root_annotation('STEP_CURSOR', step_name) |
| 310 self._current_step = step_name | 401 self._current_step = step_name |
| 311 if step_name not in self._opened: | 402 if step_name not in self._opened: |
| 312 self.output_current_time() | 403 self.output_current_time() |
| 313 self.output_root_annotation('STEP_STARTED') | 404 self.output_root_annotation('STEP_STARTED') |
| 314 self._opened.add(step_name) | 405 self._opened.add(step_name) |
| 315 | 406 |
| 316 class StepStream(StreamEngine.StepStream): | 407 class StepStream(StreamEngine.StepStream): |
| 317 def __init__(self, engine, outstream, step_name): | 408 def __init__(self, engine, outstream, step_name): |
| 318 super(StreamEngine.StepStream, self).__init__() | 409 super(AnnotatorStreamEngine.StepStream, self).__init__() |
| 319 | 410 |
| 320 self._engine = engine | 411 self._engine = engine |
| 321 self._outstream = outstream | 412 self._outstream = outstream |
| 322 self._step_name = step_name | 413 self._step_name = step_name |
| 323 | 414 |
| 324 def basic_write(self, line): | 415 def basic_write(self, line): |
| 325 self._engine._step_cursor(self._step_name) | 416 self._engine._step_cursor(self._step_name) |
| 326 self._outstream.write(line) | 417 self._outstream.write(line) |
| 327 | 418 |
| 328 def close(self): | 419 def close(self): |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 418 string (which means it might not be valid ascii), we decode the string with | 509 string (which means it might not be valid ascii), we decode the string with |
| 419 the 'replace' error mode, which replaces invalid characters with a suitable | 510 the 'replace' error mode, which replaces invalid characters with a suitable |
| 420 replacement character. | 511 replacement character. |
| 421 """ | 512 """ |
| 422 try: | 513 try: |
| 423 return str(s) | 514 return str(s) |
| 424 except UnicodeEncodeError: | 515 except UnicodeEncodeError: |
| 425 return s.encode('utf-8', 'replace') | 516 return s.encode('utf-8', 'replace') |
| 426 except UnicodeDecodeError: | 517 except UnicodeDecodeError: |
| 427 return s.decode('utf-8', 'replace') | 518 return s.decode('utf-8', 'replace') |
| OLD | NEW |