| OLD | NEW |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
| 6 | 6 |
| 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
| 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
| 9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
| 10 a way to distinguish. | 10 a way to distinguish. |
| (...skipping 28 matching lines...) Expand all Loading... |
| 39 Args: | 39 Args: |
| 40 stream (StreamEngine.Stream): The stream to output to. | 40 stream (StreamEngine.Stream): The stream to output to. |
| 41 it (iterable): An iterable that yields strings to write. | 41 it (iterable): An iterable that yields strings to write. |
| 42 """ | 42 """ |
| 43 for text in it: | 43 for text in it: |
| 44 lines = (text.split('\n') if text else ('',)) | 44 lines = (text.split('\n') if text else ('',)) |
| 45 for line in lines: | 45 for line in lines: |
| 46 stream.write_line(line) | 46 stream.write_line(line) |
| 47 | 47 |
| 48 | 48 |
| 49 def _check(cond, msg, **kwargs): |
| 50 """Runtime assertion used for parameter correctness.""" |
| 51 if not cond: |
| 52 raise ValueError('Failed assertion: %s %r' % (msg, kwargs)) |
| 53 |
| 54 |
| 49 class StreamEngine(object): | 55 class StreamEngine(object): |
| 56 |
| 50 class Stream(object): | 57 class Stream(object): |
| 51 def write_line(self, line): | 58 def write_line(self, line): |
| 52 raise NotImplementedError() | 59 raise NotImplementedError() |
| 53 | 60 |
| 54 def write_split(self, string): | 61 def write_split(self, string): |
| 55 """Write a string (which may contain newlines) to the stream. It will | 62 """Write a string (which may contain newlines) to the stream. It will |
| 56 be terminated by a newline.""" | 63 be terminated by a newline.""" |
| 57 for actual_line in string.splitlines() or ['']: # preserve empty lines | 64 for actual_line in string.splitlines() or ['']: # preserve empty lines |
| 58 self.write_line(actual_line) | 65 self.write_line(actual_line) |
| 59 | 66 |
| (...skipping 26 matching lines...) Expand all Loading... |
| 86 raise NotImplementedError() | 93 raise NotImplementedError() |
| 87 | 94 |
| 88 def set_build_property(self, key, value): | 95 def set_build_property(self, key, value): |
| 89 raise NotImplementedError() | 96 raise NotImplementedError() |
| 90 | 97 |
| 91 def trigger(self, trigger_spec): | 98 def trigger(self, trigger_spec): |
| 92 raise NotImplementedError() | 99 raise NotImplementedError() |
| 93 | 100 |
| 94 def make_step_stream(self, name, **kwargs): | 101 def make_step_stream(self, name, **kwargs): |
| 95 """Shorthand for creating a step stream from a step configuration dict.""" | 102 """Shorthand for creating a step stream from a step configuration dict.""" |
| 103 _check('\n' not in name, 'Stream name must not contain a newline.', |
| 104 name=name) |
| 96 kwargs['name'] = name | 105 kwargs['name'] = name |
| 97 return self.new_step_stream(recipe_api.StepConfig.create(**kwargs)) | 106 return self.new_step_stream(recipe_api.StepConfig.create(**kwargs)) |
| 98 | 107 |
| 99 def new_step_stream(self, step_config): | 108 def new_step_stream(self, step_config): |
| 100 """Creates a new StepStream in this engine. | 109 """Creates a new StepStream in this engine. |
| 101 | 110 |
| 102 The step will be considered started at the moment this method is called. | 111 The step will be considered started at the moment this method is called. |
| 103 | 112 |
| 104 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow | 113 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow |
| 105 annotations that this step emits through to the annotator (True), or | 114 annotations that this step emits through to the annotator (True), or |
| (...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 307 class StepStream(StreamEngine.StepStream): | 316 class StepStream(StreamEngine.StepStream): |
| 308 def __init__(self, engine, step_name): | 317 def __init__(self, engine, step_name): |
| 309 super(StreamEngineInvariants.StepStream, self).__init__() | 318 super(StreamEngineInvariants.StepStream, self).__init__() |
| 310 self._engine = engine | 319 self._engine = engine |
| 311 self._step_name = step_name | 320 self._step_name = step_name |
| 312 self._open = True | 321 self._open = True |
| 313 self._logs = {} | 322 self._logs = {} |
| 314 self._status = 'SUCCESS' | 323 self._status = 'SUCCESS' |
| 315 | 324 |
| 316 def write_line(self, line): | 325 def write_line(self, line): |
| 317 assert '\n' not in line | 326 assert '\n' not in line, 'Individual line must not contain a newline.' |
| 318 assert self._open | 327 assert self._open |
| 319 | 328 |
| 320 def close(self): | 329 def close(self): |
| 321 assert self._open | 330 assert self._open |
| 322 for log_name, log in self._logs.iteritems(): | 331 for log_name, log in self._logs.iteritems(): |
| 323 assert not log._open, 'Log %s still open when closing step %s' % ( | 332 assert not log._open, 'Log %s still open when closing step %s' % ( |
| 324 log_name, self._step_name) | 333 log_name, self._step_name) |
| 325 self._open = False | 334 self._open = False |
| 326 | 335 |
| 327 def new_log_stream(self, log_name): | 336 def new_log_stream(self, log_name): |
| 328 assert self._open | 337 assert self._open |
| 329 assert log_name not in self._logs, 'Log %s already exists in step %s' % ( | 338 assert log_name not in self._logs, 'Log %s already exists in step %s' % ( |
| 330 log_name, self._step_name) | 339 log_name, self._step_name) |
| 340 |
| 341 _check('\n' not in log_name, 'Log name must not contain a newline.', |
| 342 log_name=log_name) |
| 331 ret = self._engine.LogStream(self, log_name) | 343 ret = self._engine.LogStream(self, log_name) |
| 332 self._logs[log_name] = ret | 344 self._logs[log_name] = ret |
| 333 return ret | 345 return ret |
| 334 | 346 |
| 335 def add_step_text(self, text): | 347 def add_step_text(self, text): |
| 336 pass | 348 _check('\n' not in text, 'Step text must not contain a newline.', |
| 349 text=text) |
| 337 | 350 |
| 338 def add_step_summary_text(self, text): | 351 def add_step_summary_text(self, text): |
| 339 pass | 352 _check('\n' not in text, 'Step summary text must not contain a newline.', |
| 353 text=text) |
| 340 | 354 |
| 341 def add_step_link(self, name, url): | 355 def add_step_link(self, name, url): |
| 342 assert isinstance(name, basestring), 'Link name %s is not a string' % name | 356 _check(isinstance(name, basestring), 'Link name is not a string', |
| 343 assert isinstance(url, basestring), 'Link url %s is not a string' % url | 357 name=name) |
| 358 _check('\n' not in name, 'Link name must not contain a newline.', |
| 359 name=name) |
| 360 _check(isinstance(url, basestring), |
| 361 'Link URL is not a string', url=url) |
| 362 _check('\n' not in url, 'Link URL must not contain a newline.', |
| 363 url=url) |
| 344 | 364 |
| 345 def set_step_status(self, status): | 365 def set_step_status(self, status): |
| 346 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION') | 366 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION') |
| 347 if status == 'SUCCESS': | 367 if status == 'SUCCESS': |
| 348 # A constraint imposed by the annotations implementation | 368 # A constraint imposed by the annotations implementation |
| 349 assert self._status == 'SUCCESS', ( | 369 assert self._status == 'SUCCESS', ( |
| 350 'Cannot set successful status after status is %s' % self._status) | 370 'Cannot set successful status after status is %s' % self._status) |
| 351 self._status = status | 371 self._status = status |
| 352 | 372 |
| 353 def set_build_property(self, key, value): | 373 def set_build_property(self, key, value): |
| 354 pass | 374 _check('\n' not in key, 'Property key must not contain a newline.', |
| 375 key=key) |
| 376 _check('\n' not in value, 'Property value must not contain a newline.', |
| 377 value=value) |
| 378 json.loads(value) # value must be a valid JSON object. |
| 355 | 379 |
| 356 def trigger(self, spec): | 380 def trigger(self, spec): |
| 357 assert '\n' not in spec # Spec must fit on one line. | 381 _check('\n' not in spec, 'Spec must not contain a newline.', |
| 358 json.loads(spec) # Spec must be a valid json object. | 382 spec=spec) |
| 383 json.loads(spec) # Spec must be a valid JSON object. |
| 359 | 384 |
| 360 class LogStream(StreamEngine.Stream): | 385 class LogStream(StreamEngine.Stream): |
| 361 def __init__(self, step_stream, log_name): | 386 def __init__(self, step_stream, log_name): |
| 362 self._step_stream = step_stream | 387 self._step_stream = step_stream |
| 363 self._log_name = log_name | 388 self._log_name = log_name |
| 364 self._open = True | 389 self._open = True |
| 365 | 390 |
| 366 def write_line(self, line): | 391 def write_line(self, line): |
| 367 assert '\n' not in line | 392 assert '\n' not in line, 'Individual line must not contain a newline.' |
| 368 assert self._step_stream._open | 393 assert self._step_stream._open |
| 369 assert self._open | 394 assert self._open |
| 370 | 395 |
| 371 def close(self): | 396 def close(self): |
| 372 assert self._step_stream._open | 397 assert self._step_stream._open |
| 373 assert self._open | 398 assert self._open |
| 374 self._open = False | 399 self._open = False |
| 375 | 400 |
| 376 def new_step_stream(self, step_config): | 401 def new_step_stream(self, step_config): |
| 377 assert step_config.name not in self._streams, ( | 402 assert step_config.name not in self._streams, ( |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 530 string (which means it might not be valid ascii), we decode the string with | 555 string (which means it might not be valid ascii), we decode the string with |
| 531 the 'replace' error mode, which replaces invalid characters with a suitable | 556 the 'replace' error mode, which replaces invalid characters with a suitable |
| 532 replacement character. | 557 replacement character. |
| 533 """ | 558 """ |
| 534 try: | 559 try: |
| 535 return str(s) | 560 return str(s) |
| 536 except UnicodeEncodeError: | 561 except UnicodeEncodeError: |
| 537 return s.encode('utf-8', 'replace') | 562 return s.encode('utf-8', 'replace') |
| 538 except UnicodeDecodeError: | 563 except UnicodeDecodeError: |
| 539 return s.decode('utf-8', 'replace') | 564 return s.decode('utf-8', 'replace') |
| OLD | NEW |