Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """stream.StreamEngine implementation for LogDog, using Milo annotation | 5 """stream.StreamEngine implementation for LogDog, using Milo annotation |
| 6 protobuf. | 6 protobuf. |
| 7 """ | 7 """ |
| 8 | 8 |
| 9 import collections | 9 import collections |
| 10 import contextlib | 10 import contextlib |
| (...skipping 250 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 261 | 261 |
| 262 def trigger(self, trigger_spec): | 262 def trigger(self, trigger_spec): |
| 263 if self._engine._ignore_triggers: | 263 if self._engine._ignore_triggers: |
| 264 return | 264 return |
| 265 raise NotImplementedError( | 265 raise NotImplementedError( |
| 266 'Stream-based triggering is not supported for LogDog. Please use ' | 266 'Stream-based triggering is not supported for LogDog. Please use ' |
| 267 'a recipe module (e.g., buildbucket) directly for build scheduling.') | 267 'a recipe module (e.g., buildbucket) directly for build scheduling.') |
| 268 | 268 |
| 269 | 269 |
| 270 def __init__(self, client=None, streamserver_uri=None, name_base=None, | 270 def __init__(self, client=None, streamserver_uri=None, name_base=None, |
| 271 ignore_triggers=False, environment=None): | 271 dump_path=None, ignore_triggers=False, environment=None): |
| 272 """Initializes a new LogDog/Annotation StreamEngine. | 272 """Initializes a new LogDog/Annotation StreamEngine. |
| 273 | 273 |
| 274 Args: | 274 Args: |
| 275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | 275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client |
| 276 to use. If this is None, a new StreamClient will be instantiated when | 276 to use. If this is None, a new StreamClient will be instantiated when |
| 277 this StreamEngine is opened. | 277 this StreamEngine is opened. |
| 278 streamserver_uri (str or None): The LogDog Butler stream server URI. See | 278 streamserver_uri (str or None): The LogDog Butler stream server URI. See |
| 279 LogDog client library docs for details on supported protocols and | 279 LogDog client library docs for details on supported protocols and |
| 280 format. This will only be used when "client" is None. If this is also | 280 format. This will only be used when "client" is None. If this is also |
| 281 None, a StreamClient will be created through probing. | 281 None, a StreamClient will be created through probing. |
| 282 name_base (str or None): The default stream name prefix that will be added | 282 name_base (str or None): The default stream name prefix that will be added |
| 283 to generated LogDog stream names. If None, no prefix will be applied. | 283 to generated LogDog stream names. If None, no prefix will be applied. |
| 284 dump_path (str or None): If provided, a filesystem path where the final | |
| 285 recipe annotation protobuf binary will be dumped. | |
| 284 ignore_triggers (bool): Triggers are not supported in LogDog annotation | 286 ignore_triggers (bool): Triggers are not supported in LogDog annotation |
| 285 streams. If True, attempts to trigger will be silently ignored. If | 287 streams. If True, attempts to trigger will be silently ignored. If |
| 286 False, they will cause a NotImplementedError to be raised. | 288 False, they will cause a NotImplementedError to be raised. |
| 287 environment (_Environment or None): The _Environment instance to use for | 289 environment (_Environment or None): The _Environment instance to use for |
| 288 operations. This will be None at production, but can be overridden | 290 operations. This will be None at production, but can be overridden |
| 289 here for testing. | 291 here for testing. |
| 290 """ | 292 """ |
| 291 | 293 |
| 292 self._client = client | 294 self._client = client |
| 293 self._streamserver_uri = streamserver_uri | 295 self._streamserver_uri = streamserver_uri |
| 294 self._name_base = _StreamName(name_base) | 296 self._name_base = _StreamName(name_base) |
| 297 self._dump_path = dump_path | |
| 295 self._ignore_triggers = ignore_triggers | 298 self._ignore_triggers = ignore_triggers |
| 296 self._env = environment or _Environment.probe() | 299 self._env = environment or _Environment.probe() |
| 297 | 300 |
| 298 self._astate = None | 301 self._astate = None |
| 299 | 302 |
| 300 self._annotation_stream = None | 303 self._annotation_stream = None |
| 301 self._annotation_monitor = None | 304 self._annotation_monitor = None |
| 302 self._streams = collections.OrderedDict() | 305 self._streams = collections.OrderedDict() |
| 303 | 306 |
| 304 def new_step_stream(self, step_config): | 307 def new_step_stream(self, step_config): |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 356 # Shut down any outstanding streams that may not have been closed for | 359 # Shut down any outstanding streams that may not have been closed for |
| 357 # whatever reason. | 360 # whatever reason. |
| 358 for s in reversed(self._streams.values()): | 361 for s in reversed(self._streams.values()): |
| 359 s.close() | 362 s.close() |
| 360 | 363 |
| 361 # Close out our root Step. Manually check annotation state afterwards. | 364 # Close out our root Step. Manually check annotation state afterwards. |
| 362 self._close_step(self._astate.base) | 365 self._close_step(self._astate.base) |
| 363 self._check() | 366 self._check() |
| 364 | 367 |
| 365 # Shut down our annotation monitor and close our annotation stream. | 368 # Shut down our annotation monitor and close our annotation stream. |
| 366 self._annotation_monitor.flush_and_join() | 369 last_step_data = self._annotation_monitor.flush_and_join() |
| 367 self._annotation_stream.close() | 370 self._annotation_stream.close() |
| 368 | 371 |
| 369 # Clear our client and state. We are now closed. | 372 # Clear our client and state. We are now closed. |
| 370 self._streams.clear() | 373 self._streams.clear() |
| 371 self._client = None | 374 self._client = None |
| 372 self._astate = None | 375 self._astate = None |
| 373 | 376 |
| 377 # If requested, write out the last step data. | |
| 378 if self._dump_path and last_step_data: | |
| 379 with open(self._dump_path, 'wb') as fd: | |
|
nodir
2016/09/28 23:02:22
please swap these two lines, so in the worst case
dnj
2016/09/28 23:04:11
Done.
| |
| 380 fd.write(last_step_data) | |
| 381 | |
| 374 def _check(self): | 382 def _check(self): |
| 375 if self._astate is None: | 383 if self._astate is None: |
| 376 return | 384 return |
| 377 | 385 |
| 378 step_data = self._astate.check() | 386 step_data = self._astate.check() |
| 379 if step_data is not None: | 387 if step_data is not None: |
| 380 self._annotation_monitor.signal_update(step_data) | 388 self._annotation_monitor.signal_update(step_data) |
| 381 | 389 |
| 382 def _set_timestamp(self, dst, dt=None): | 390 def _set_timestamp(self, dst, dt=None): |
| 383 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. | 391 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. |
| (...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 504 # If not, make sure our timer is running so it will eventually be flushed. | 512 # If not, make sure our timer is running so it will eventually be flushed. |
| 505 # Note that the timer may also suggest that we flush immediately. | 513 # Note that the timer may also suggest that we flush immediately. |
| 506 now = datetime.datetime.now() | 514 now = datetime.datetime.now() |
| 507 self._current_data = step_data | 515 self._current_data = step_data |
| 508 if structural or self._set_flush_timer_locked(now): | 516 if structural or self._set_flush_timer_locked(now): |
| 509 # We should flush immediately. | 517 # We should flush immediately. |
| 510 self._flush_now_locked(now) | 518 self._flush_now_locked(now) |
| 511 | 519 |
| 512 def flush_and_join(self): | 520 def flush_and_join(self): |
| 513 """Flushes any remaining updates and blocks until the monitor is complete. | 521 """Flushes any remaining updates and blocks until the monitor is complete. |
| 522 | |
| 523 Returns (pb.Step): The final Step protobuf, or None if no step data was | |
| 524 sent. | |
| 514 """ | 525 """ |
| 515 # Mark that we're finished and signal our event. | 526 # Mark that we're finished and signal our event. |
| 516 with self._lock: | 527 with self._lock: |
| 517 self._flush_now_locked(datetime.datetime.now()) | 528 self._flush_now_locked(datetime.datetime.now()) |
| 529 return self._last_flush_data | |
| 518 | 530 |
| 519 @property | 531 @property |
| 520 def latest(self): | 532 def latest(self): |
| 521 with self._lock: | 533 with self._lock: |
| 522 return self._last_flush_data | 534 return self._last_flush_data |
| 523 | 535 |
| 524 def _flush_now_locked(self, now): | 536 def _flush_now_locked(self, now): |
| 525 # Clear any current flush timer. | 537 # Clear any current flush timer. |
| 526 self._clear_flush_timer_locked() | 538 self._clear_flush_timer_locked() |
| 527 | 539 |
| (...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 734 return v.replace('/', '_') | 746 return v.replace('/', '_') |
| 735 | 747 |
| 736 @staticmethod | 748 @staticmethod |
| 737 def _normalize(v): | 749 def _normalize(v): |
| 738 return libs.logdog.streamname.normalize(v, prefix='s_') | 750 return libs.logdog.streamname.normalize(v, prefix='s_') |
| 739 | 751 |
| 740 def __str__(self): | 752 def __str__(self): |
| 741 if not self._base: | 753 if not self._base: |
| 742 raise ValueError('Cannot generate string from empty StreamName.') | 754 raise ValueError('Cannot generate string from empty StreamName.') |
| 743 return self._base | 755 return self._base |
| OLD | NEW |