OLD | NEW |
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
6 | 6 |
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
10 a way to distinguish. | 10 a way to distinguish. |
11 | 11 |
12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
15 | 15 |
16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
17 can just write to without worrying. | 17 can just write to without worrying. |
18 """ | 18 """ |
19 | 19 |
20 import json | 20 import json |
21 import time | 21 import time |
22 | 22 |
23 from . import env | 23 from . import env |
24 from . import recipe_api | 24 from . import recipe_api |
| 25 from . import util |
25 | 26 |
26 | 27 |
27 class StreamEngine(object): | 28 class StreamEngine(object): |
28 class Stream(object): | 29 class Stream(object): |
29 def write_line(self, line): | 30 def write_line(self, line): |
30 raise NotImplementedError() | 31 raise NotImplementedError() |
31 | 32 |
32 def write_split(self, string): | 33 def write_split(self, string): |
33 """Write a string (which may contain newlines) to the stream. It will | 34 """Write a string (which may contain newlines) to the stream. It will |
34 be terminated by a newline.""" | 35 be terminated by a newline.""" |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
97 pass | 98 pass |
98 | 99 |
99 def __enter__(self): | 100 def __enter__(self): |
100 self.open() | 101 self.open() |
101 return self | 102 return self |
102 | 103 |
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 104 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
104 self.close() | 105 self.close() |
105 | 106 |
106 | 107 |
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | |
108 # form products. This code is entirely mechanical from the types (if we | |
109 # had them formalized...). | |
110 class ProductStreamEngine(StreamEngine): | 108 class ProductStreamEngine(StreamEngine): |
| 109 """A StreamEngine that forms the non-commutative product of two other |
| 110 StreamEngines. |
| 111 |
| 112 Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
| 113 form products. This code is entirely mechanical from the types (if we |
| 114 had them formalized...). |
| 115 |
| 116 This product is non-commutative, meaning order matters. Specifically, an |
| 117 exception in "engine_a" will prevent "engine_b" from being evaluated. |
| 118 """ |
| 119 |
111 def __init__(self, engine_a, engine_b): | 120 def __init__(self, engine_a, engine_b): |
112 assert engine_a and engine_b | 121 assert engine_a and engine_b |
113 self._engine_a = engine_a | 122 self._engine_a = engine_a |
114 self._engine_b = engine_b | 123 self._engine_b = engine_b |
115 | 124 |
116 class Stream(StreamEngine.Stream): | 125 class Stream(StreamEngine.Stream): |
117 def __init__(self, stream_a, stream_b): | 126 def __init__(self, stream_a, stream_b): |
118 assert stream_a and stream_b | 127 assert stream_a and stream_b |
119 self._stream_a = stream_a | 128 self._stream_a = stream_a |
120 self._stream_b = stream_b | 129 self._stream_b = stream_b |
121 | 130 |
122 def write_line(self, line): | 131 def write_line(self, line): |
123 self._stream_a.write_line(line) | 132 self._stream_a.write_line(line) |
124 self._stream_b.write_line(line) | 133 self._stream_b.write_line(line) |
125 | 134 |
126 def close(self): | 135 def close(self): |
127 self._stream_a.close() | 136 self._stream_a.close() |
128 self._stream_b.close() | 137 self._stream_b.close() |
129 | 138 |
130 class StepStream(Stream): | 139 class StepStream(Stream): |
| 140 # pylint: disable=no-self-argument |
131 def _void_product(method_name): | 141 def _void_product(method_name): |
132 def inner(self, *args): | 142 def inner(self, *args): |
133 getattr(self._stream_a, method_name)(*args) | 143 getattr(self._stream_a, method_name)(*args) |
134 getattr(self._stream_b, method_name)(*args) | 144 getattr(self._stream_b, method_name)(*args) |
135 return inner | 145 return inner |
136 | 146 |
137 def new_log_stream(self, log_name): | 147 def new_log_stream(self, log_name): |
138 return ProductStreamEngine.Stream( | 148 return ProductStreamEngine.Stream( |
139 self._stream_a.new_log_stream(log_name), | 149 self._stream_a.new_log_stream(log_name), |
140 self._stream_b.new_log_stream(log_name)) | 150 self._stream_b.new_log_stream(log_name)) |
(...skipping 13 matching lines...) Expand all Loading... |
154 | 164 |
155 def open(self): | 165 def open(self): |
156 self._engine_a.open() | 166 self._engine_a.open() |
157 self._engine_b.open() | 167 self._engine_b.open() |
158 | 168 |
159 def close(self): | 169 def close(self): |
160 self._engine_a.close() | 170 self._engine_a.close() |
161 self._engine_b.close() | 171 self._engine_b.close() |
162 | 172 |
163 | 173 |
164 def _noop(*args, **kwargs): | 174 class MultiStreamEngine(StreamEngine): |
| 175 """A StreamEngine consisting of one or more inner StreamEngines. |
| 176 |
| 177 A call to this StreamEngine will be distributed to the inner StreamEngines. |
| 178 Any exceptions that are caught during an inner handler will be deferred until |
| 179 all inner handlers have been executed. |
| 180 """ |
| 181 |
| 182 def __init__(self, base, *engines): |
| 183 self._engines = (base,) + engines |
| 184 assert None not in self._engines |
| 185 |
| 186 @classmethod |
| 187 def create(cls, *engines): |
| 188 assert len(engines) > 0, 'At least one engine must be provided.' |
| 189 if len(engines) == 1: |
| 190 return engines[0] |
| 191 return cls(engines[0], *engines[1:]) |
| 192 |
| 193 class Stream(StreamEngine.Stream): |
| 194 def __init__(self, *streams): |
| 195 assert all(streams) |
| 196 self._streams = streams |
| 197 |
| 198 def write_line(self, line): |
| 199 util.map_defer_exceptions(lambda s: s.write_line(line), self._streams) |
| 200 |
| 201 def close(self): |
| 202 util.map_defer_exceptions(lambda s: s.close(), self._streams) |
| 203 |
| 204 class StepStream(Stream): |
| 205 # pylint: disable=no-self-argument |
| 206 def _multiplex(method_name): |
| 207 def inner(self, *args): |
| 208 util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args), |
| 209 self._streams) |
| 210 return inner |
| 211 |
| 212 def new_log_stream(self, log_name): |
| 213 log_streams = [] |
| 214 try: |
| 215 for s in self._streams: |
| 216 log_streams.append(s.new_log_stream(log_name)) |
| 217 return MultiStreamEngine.Stream(*log_streams) |
| 218 except Exception: |
| 219 # Close any opened log streams. |
| 220 util.map_defer_exceptions(lambda ls: ls.close(), log_streams) |
| 221 raise |
| 222 |
| 223 add_step_text = _multiplex('add_step_text') |
| 224 add_step_summary_text = _multiplex('add_step_summary_text') |
| 225 add_step_link = _multiplex('add_step_link') |
| 226 reset_subannotation_state = _multiplex('reset_subannotation_state') |
| 227 set_step_status = _multiplex('set_step_status') |
| 228 set_build_property = _multiplex('set_build_property') |
| 229 trigger = _multiplex('trigger') |
| 230 |
| 231 def new_step_stream(self, step_config): |
| 232 return self.StepStream( |
| 233 *(se.new_step_stream(step_config) |
| 234 for se in self._engines)) |
| 235 |
| 236 def open(self): |
| 237 for se in self._engines: |
| 238 se.open() |
| 239 |
| 240 def close(self): |
| 241 util.map_defer_exceptions(lambda se: se.close(), self._engines) |
| 242 |
| 243 def append_stream_engine(self, se): |
| 244 assert isinstance(se, StreamEngine) |
| 245 self._engines.append(se) |
| 246 |
| 247 |
| 248 def _noop(*_args, **_kwargs): |
165 pass | 249 pass |
166 | 250 |
167 class NoopStreamEngine(StreamEngine): | 251 class NoopStreamEngine(StreamEngine): |
168 class Stream(StreamEngine.Stream): | 252 class Stream(StreamEngine.Stream): |
169 write_line = _noop | 253 write_line = _noop |
170 close = _noop | 254 close = _noop |
171 | 255 |
172 class StepStream(Stream): | 256 class StepStream(Stream): |
173 def new_log_stream(self, log_name): | 257 def new_log_stream(self, _log_name): |
174 return NoopStreamEngine.Stream() | 258 return NoopStreamEngine.Stream() |
175 add_step_text = _noop | 259 add_step_text = _noop |
176 add_step_summary_text = _noop | 260 add_step_summary_text = _noop |
177 add_step_link = _noop | 261 add_step_link = _noop |
178 reset_subannotation_state = _noop | 262 reset_subannotation_state = _noop |
179 set_step_status = _noop | 263 set_step_status = _noop |
180 set_build_property = _noop | 264 set_build_property = _noop |
181 trigger = _noop | 265 trigger = _noop |
182 | 266 |
183 def new_step_stream(self, step_config): | 267 def new_step_stream(self, step_config): |
184 return self.StepStream() | 268 return self.StepStream() |
185 | 269 |
186 | 270 |
187 class StreamEngineInvariants(StreamEngine): | 271 class StreamEngineInvariants(StreamEngine): |
188 """Checks that the users are using a StreamEngine hygenically. | 272 """Checks that the users are using a StreamEngine hygenically. |
189 | 273 |
190 Multiply with actually functional StreamEngines so you don't have to check | 274 Multiply with actually functional StreamEngines so you don't have to check |
191 these all over the place. | 275 these all over the place. |
192 """ | 276 """ |
193 def __init__(self): | 277 def __init__(self): |
194 self._streams = set() | 278 self._streams = set() |
195 | 279 |
| 280 @classmethod |
| 281 def wrap(cls, other): |
| 282 """Returns (ProductStreamEngine): A product applying invariants to "other". |
| 283 """ |
| 284 return ProductStreamEngine(cls(), other) |
| 285 |
196 class StepStream(StreamEngine.StepStream): | 286 class StepStream(StreamEngine.StepStream): |
197 def __init__(self, engine, step_name): | 287 def __init__(self, engine, step_name): |
| 288 super(StreamEngineInvariants.StepStream, self).__init__() |
198 self._engine = engine | 289 self._engine = engine |
199 self._step_name = step_name | 290 self._step_name = step_name |
200 self._open = True | 291 self._open = True |
201 self._logs = {} | 292 self._logs = {} |
202 self._status = 'SUCCESS' | 293 self._status = 'SUCCESS' |
203 | 294 |
204 def write_line(self, line): | 295 def write_line(self, line): |
205 assert '\n' not in line | 296 assert '\n' not in line |
206 assert self._open | 297 assert self._open |
207 | 298 |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
308 if self._current_step != step_name: | 399 if self._current_step != step_name: |
309 self.output_root_annotation('STEP_CURSOR', step_name) | 400 self.output_root_annotation('STEP_CURSOR', step_name) |
310 self._current_step = step_name | 401 self._current_step = step_name |
311 if step_name not in self._opened: | 402 if step_name not in self._opened: |
312 self.output_current_time() | 403 self.output_current_time() |
313 self.output_root_annotation('STEP_STARTED') | 404 self.output_root_annotation('STEP_STARTED') |
314 self._opened.add(step_name) | 405 self._opened.add(step_name) |
315 | 406 |
316 class StepStream(StreamEngine.StepStream): | 407 class StepStream(StreamEngine.StepStream): |
317 def __init__(self, engine, outstream, step_name): | 408 def __init__(self, engine, outstream, step_name): |
318 super(StreamEngine.StepStream, self).__init__() | 409 super(AnnotatorStreamEngine.StepStream, self).__init__() |
319 | 410 |
320 self._engine = engine | 411 self._engine = engine |
321 self._outstream = outstream | 412 self._outstream = outstream |
322 self._step_name = step_name | 413 self._step_name = step_name |
323 | 414 |
324 def basic_write(self, line): | 415 def basic_write(self, line): |
325 self._engine._step_cursor(self._step_name) | 416 self._engine._step_cursor(self._step_name) |
326 self._outstream.write(line) | 417 self._outstream.write(line) |
327 | 418 |
328 def close(self): | 419 def close(self): |
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
418 string (which means it might not be valid ascii), we decode the string with | 509 string (which means it might not be valid ascii), we decode the string with |
419 the 'replace' error mode, which replaces invalid characters with a suitable | 510 the 'replace' error mode, which replaces invalid characters with a suitable |
420 replacement character. | 511 replacement character. |
421 """ | 512 """ |
422 try: | 513 try: |
423 return str(s) | 514 return str(s) |
424 except UnicodeEncodeError: | 515 except UnicodeEncodeError: |
425 return s.encode('utf-8', 'replace') | 516 return s.encode('utf-8', 'replace') |
426 except UnicodeDecodeError: | 517 except UnicodeDecodeError: |
427 return s.decode('utf-8', 'replace') | 518 return s.decode('utf-8', 'replace') |
OLD | NEW |