| OLD | NEW |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
| 6 | 6 |
| 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
| 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
| 9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
| 10 a way to distinguish. | 10 a way to distinguish. |
| 11 | 11 |
| 12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
| 13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
| 14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
| 15 | 15 |
| 16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
| 17 can just write to without worrying. | 17 can just write to without worrying. |
| 18 """ | 18 """ |
| 19 | 19 |
| 20 import json | 20 import json |
| 21 import time |
| 21 | 22 |
| 22 class StreamEngine(object): | 23 class StreamEngine(object): |
| 23 class Stream(object): | 24 class Stream(object): |
| 24 def write_line(self, line): | 25 def write_line(self, line): |
| 25 raise NotImplementedError() | 26 raise NotImplementedError() |
| 26 | 27 |
| 27 def write_split(self, string): | 28 def write_split(self, string): |
| 28 """Write a string (which may contain newlines) to the stream. It will | 29 """Write a string (which may contain newlines) to the stream. It will |
| 29 be terminated by a newline.""" | 30 be terminated by a newline.""" |
| 30 for actual_line in string.splitlines() or ['']: # preserve empty lines | 31 for actual_line in string.splitlines() or ['']: # preserve empty lines |
| (...skipping 30 matching lines...) Expand all Loading... |
| 61 def set_nest_level(self, nest_level): | 62 def set_nest_level(self, nest_level): |
| 62 raise NotImplementedError() | 63 raise NotImplementedError() |
| 63 | 64 |
| 64 def set_build_property(self, key, value): | 65 def set_build_property(self, key, value): |
| 65 raise NotImplementedError() | 66 raise NotImplementedError() |
| 66 | 67 |
| 67 def trigger(self, trigger_spec): | 68 def trigger(self, trigger_spec): |
| 68 raise NotImplementedError() | 69 raise NotImplementedError() |
| 69 | 70 |
| 70 def new_step_stream(self, step_name, allow_subannotations=False): | 71 def new_step_stream(self, step_name, allow_subannotations=False): |
| 71 """Craete a new StepStream in this engine. | 72 """Creates a new StepStream in this engine. |
| 72 | 73 |
| 73 The step will be considered started at the moment this method is called. | 74 The step will be considered started at the moment this method is called. |
| 74 | 75 |
| 75 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow | 76 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow |
| 76 annotations that this step emits through to the annotator (True), or | 77 annotations that this step emits through to the annotator (True), or |
| 77 guard them by prefixing them with ! (False). The proper way to do this | 78 guard them by prefixing them with ! (False). The proper way to do this |
| 78 is to implement an annotations parser that converts to StreamEngine calls; | 79 is to implement an annotations parser that converts to StreamEngine calls; |
| 79 i.e. parse -> re-emit. | 80 i.e. parse -> re-emit. |
| 80 """ | 81 """ |
| 81 | 82 |
| 82 raise NotImplementedError() | 83 raise NotImplementedError() |
| 83 | 84 |
| 85 def open(self): |
| 86 pass |
| 87 |
| 88 def close(self): |
| 89 pass |
| 90 |
| 91 def __enter__(self): |
| 92 self.open() |
| 93 return self |
| 94 |
| 95 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
| 96 self.close() |
| 97 |
| 84 | 98 |
| 85 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | 99 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
| 86 # form products. This code is entirely mechanical from the types (if we | 100 # form products. This code is entirely mechanical from the types (if we |
| 87 # had them formalized...). | 101 # had them formalized...). |
| 88 class ProductStreamEngine(StreamEngine): | 102 class ProductStreamEngine(StreamEngine): |
| 89 def __init__(self, engine_a, engine_b): | 103 def __init__(self, engine_a, engine_b): |
| 90 self._engine_a = engine_a | 104 self._engine_a = engine_a |
| 91 self._engine_b = engine_b | 105 self._engine_b = engine_b |
| 92 | 106 |
| 93 class Stream(StreamEngine.Stream): | 107 class Stream(StreamEngine.Stream): |
| (...skipping 28 matching lines...) Expand all Loading... |
| 122 set_step_status = _void_product('set_step_status') | 136 set_step_status = _void_product('set_step_status') |
| 123 set_nest_level = _void_product('set_nest_level') | 137 set_nest_level = _void_product('set_nest_level') |
| 124 set_build_property = _void_product('set_build_property') | 138 set_build_property = _void_product('set_build_property') |
| 125 trigger = _void_product('trigger') | 139 trigger = _void_product('trigger') |
| 126 | 140 |
| 127 def new_step_stream(self, step_name, allow_subannotations=False): | 141 def new_step_stream(self, step_name, allow_subannotations=False): |
| 128 return self.StepStream( | 142 return self.StepStream( |
| 129 self._engine_a.new_step_stream(step_name, allow_subannotations), | 143 self._engine_a.new_step_stream(step_name, allow_subannotations), |
| 130 self._engine_b.new_step_stream(step_name, allow_subannotations)) | 144 self._engine_b.new_step_stream(step_name, allow_subannotations)) |
| 131 | 145 |
| 146 def open(self): |
| 147 self._engine_a.open() |
| 148 self._engine_b.open() |
| 149 |
| 150 def close(self): |
| 151 self._engine_a.close() |
| 152 self._engine_b.close() |
| 153 |
| 132 | 154 |
| 133 def _noop(*args, **kwargs): | 155 def _noop(*args, **kwargs): |
| 134 pass | 156 pass |
| 135 | 157 |
| 136 class NoopStreamEngine(StreamEngine): | 158 class NoopStreamEngine(StreamEngine): |
| 137 class Stream(StreamEngine.Stream): | 159 class Stream(StreamEngine.Stream): |
| 138 write_line = _noop | 160 write_line = _noop |
| 139 close = _noop | 161 close = _noop |
| 140 | 162 |
| 141 class StepStream(Stream): | 163 class StepStream(Stream): |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 234 assert self._open | 256 assert self._open |
| 235 self._open = False | 257 self._open = False |
| 236 | 258 |
| 237 def new_step_stream(self, step_name, allow_subannotations=False): | 259 def new_step_stream(self, step_name, allow_subannotations=False): |
| 238 assert step_name not in self._streams, 'Step %s already exists' % step_name | 260 assert step_name not in self._streams, 'Step %s already exists' % step_name |
| 239 self._streams.add(step_name) | 261 self._streams.add(step_name) |
| 240 return self.StepStream(self, step_name) | 262 return self.StepStream(self, step_name) |
| 241 | 263 |
| 242 | 264 |
| 243 class AnnotationStepStream(StreamEngine.StepStream): | 265 class AnnotationStepStream(StreamEngine.StepStream): |
| 266 def __init__(self, emit_timestamps=False, time_fn=None): |
| 267 self.emit_timestamps = emit_timestamps |
| 268 self.time_fn = time_fn or time.time |
| 269 |
| 244 def basic_write(self, line): | 270 def basic_write(self, line): |
| 245 raise NotImplementedError() | 271 raise NotImplementedError() |
| 246 | 272 |
| 247 def output_annotation(self, *args): | 273 def output_annotation(self, *args): |
| 248 self.basic_write('@@@' + '@'.join(args) + '@@@\n') | 274 self.basic_write('@@@' + '@'.join(map(str, args)) + '@@@\n') |
| 249 | 275 |
| 250 def write_line(self, line): | 276 def write_line(self, line): |
| 251 if line.startswith('@@@'): | 277 if line.startswith('@@@'): |
| 252 self.basic_write('!' + line + '\n') | 278 self.basic_write('!' + line + '\n') |
| 253 else: | 279 else: |
| 254 self.basic_write(line + '\n') | 280 self.basic_write(line + '\n') |
| 255 | 281 |
| 256 def close(self): | 282 def close(self): |
| 283 if self.emit_timestamps: |
| 284 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
| 257 self.output_annotation('STEP_CLOSED') | 285 self.output_annotation('STEP_CLOSED') |
| 258 | 286 |
| 259 def new_log_stream(self, log_name): | 287 def new_log_stream(self, log_name): |
| 260 return self.StepLogStream(self, log_name) | 288 return self.StepLogStream(self, log_name) |
| 261 | 289 |
| 262 def add_step_text(self, text): | 290 def add_step_text(self, text): |
| 263 self.output_annotation('STEP_TEXT', text) | 291 self.output_annotation('STEP_TEXT', text) |
| 264 | 292 |
| 265 def add_step_summary_text(self, text): | 293 def add_step_summary_text(self, text): |
| 266 self.output_annotation('STEP_SUMMARY_TEXT', text) | 294 self.output_annotation('STEP_SUMMARY_TEXT', text) |
| (...skipping 28 matching lines...) Expand all Loading... |
| 295 self._log_name = log_name.replace('/', '/') | 323 self._log_name = log_name.replace('/', '/') |
| 296 | 324 |
| 297 def write_line(self, line): | 325 def write_line(self, line): |
| 298 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) | 326 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) |
| 299 | 327 |
| 300 def close(self): | 328 def close(self): |
| 301 self._step_stream.output_annotation('STEP_LOG_END', self._log_name) | 329 self._step_stream.output_annotation('STEP_LOG_END', self._log_name) |
| 302 | 330 |
| 303 | 331 |
| 304 class AnnotatorStreamEngine(StreamEngine): | 332 class AnnotatorStreamEngine(StreamEngine): |
| 305 def __init__(self, outstream): | 333 def __init__(self, outstream, emit_timestamps=False, time_fn=None): |
| 306 self._current_step = None | 334 self._current_step = None |
| 307 self._opened = set() | 335 self._opened = set() |
| 308 self._outstream = outstream | 336 self._outstream = outstream |
| 337 self.emit_timestamps = emit_timestamps |
| 338 self.time_fn = time_fn or time.time |
| 339 |
| 340 def open(self): |
| 341 super(AnnotatorStreamEngine, self).open() |
| 342 if self.emit_timestamps: |
| 343 self.output_current_time() |
| 309 self.output_annotation('HONOR_ZERO_RETURN_CODE') | 344 self.output_annotation('HONOR_ZERO_RETURN_CODE') |
| 310 | 345 |
| 346 def close(self): |
| 347 super(AnnotatorStreamEngine, self).close() |
| 348 if self.emit_timestamps: |
| 349 self.output_current_time() |
| 350 |
| 351 def output_current_time(self): |
| 352 """Prints CURRENT_TIMESTAMP annotation with current time.""" |
| 353 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) |
| 354 |
| 311 def output_annotation(self, *args): | 355 def output_annotation(self, *args): |
| 312 # Flush the stream before & after engine annotations, because they can | 356 # Flush the stream before & after engine annotations, because they can |
| 313 # change which step we are talking about and this matters to buildbot. | 357 # change which step we are talking about and this matters to buildbot. |
| 314 self._outstream.flush() | 358 self._outstream.flush() |
| 315 self._outstream.write('@@@' + '@'.join(args) + '@@@\n') | 359 self._outstream.write('@@@' + '@'.join(map(str, args)) + '@@@\n') |
| 316 self._outstream.flush() | 360 self._outstream.flush() |
| 317 | 361 |
| 318 def _step_cursor(self, name): | 362 def _step_cursor(self, name): |
| 319 if self._current_step != name: | 363 if self._current_step != name: |
| 320 self.output_annotation('STEP_CURSOR', name) | 364 self.output_annotation('STEP_CURSOR', name) |
| 321 self._current_step = name | 365 self._current_step = name |
| 322 if name not in self._opened: | 366 if name not in self._opened: |
| 367 if self.emit_timestamps: |
| 368 self.output_current_time() |
| 323 self.output_annotation('STEP_STARTED') | 369 self.output_annotation('STEP_STARTED') |
| 324 self._opened.add(name) | 370 self._opened.add(name) |
| 325 | 371 |
| 326 class StepStream(AnnotationStepStream): | 372 class StepStream(AnnotationStepStream): |
| 327 def __init__(self, engine, step_name): | 373 def __init__(self, engine, step_name): |
| 374 AnnotationStepStream.__init__( |
| 375 self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn) |
| 328 self._engine = engine | 376 self._engine = engine |
| 329 self._step_name = step_name | 377 self._step_name = step_name |
| 330 | 378 |
| 331 def basic_write(self, line): | 379 def basic_write(self, line): |
| 332 self._engine._step_cursor(self._step_name) | 380 self._engine._step_cursor(self._step_name) |
| 333 self._engine._outstream.write(line) | 381 self._engine._outstream.write(line) |
| 334 | 382 |
| 335 class AllowSubannotationsStepStream(StepStream): | 383 class AllowSubannotationsStepStream(StepStream): |
| 336 def write_line(self, line): | 384 def write_line(self, line): |
| 337 self.basic_write(line + '\n') | 385 self.basic_write(line + '\n') |
| (...skipping 17 matching lines...) Expand all Loading... |
| 355 """A StepStream that is not tied to any engine, and emits assuming it has the | 403 """A StepStream that is not tied to any engine, and emits assuming it has the |
| 356 cursor. | 404 cursor. |
| 357 | 405 |
| 358 This is used for capturing the annotations in the engine. | 406 This is used for capturing the annotations in the engine. |
| 359 """ | 407 """ |
| 360 def __init__(self, outstream): | 408 def __init__(self, outstream): |
| 361 self._outstream = outstream | 409 self._outstream = outstream |
| 362 | 410 |
| 363 def basic_write(self, line): | 411 def basic_write(self, line): |
| 364 self._outstream.write(line) | 412 self._outstream.write(line) |
| 365 | |
| OLD | NEW |