Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """stream.StreamEngine implementation for LogDog, using Milo annotation | 5 """stream.StreamEngine implementation for LogDog, using Milo annotation |
| 6 protobuf. | 6 protobuf. |
| 7 """ | 7 """ |
| 8 | 8 |
| 9 import collections | 9 import collections |
| 10 import contextlib | 10 import contextlib |
| (...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 147 """ | 147 """ |
| 148 | 148 |
| 149 # The name of the annotation stream. | 149 # The name of the annotation stream. |
| 150 ANNOTATION_NAME = 'annotations' | 150 ANNOTATION_NAME = 'annotations' |
| 151 | 151 |
| 152 # The default amount of time in between anotation pushes. | 152 # The default amount of time in between anotation pushes. |
| 153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) | 153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) |
| 154 | 154 |
| 155 | 155 |
| 156 def __init__(self, client=None, streamserver_uri=None, name_base=None, | 156 def __init__(self, client=None, streamserver_uri=None, name_base=None, |
| 157 ignore_triggers=False, environment=None, update_interval=None): | 157 dump_path=None, ignore_triggers=False, environment=None, |
| 158 update_interval=None): | |
| 158 """Initializes a new LogDog/Annotation StreamEngine. | 159 """Initializes a new LogDog/Annotation StreamEngine. |
| 159 | 160 |
| 160 Args: | 161 Args: |
| 161 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | 162 client (libs.logdog.stream.StreamClient or None): the LogDog stream client |
| 162 to use. If this is None, a new StreamClient will be instantiated when | 163 to use. If this is None, a new StreamClient will be instantiated when |
| 163 this StreamEngine is opened. | 164 this StreamEngine is opened. |
| 164 streamserver_uri (str or None): The LogDog Butler stream server URI. See | 165 streamserver_uri (str or None): The LogDog Butler stream server URI. See |
| 165 LogDog client library docs for details on supported protocols and | 166 LogDog client library docs for details on supported protocols and |
| 166 format. This will only be used when "client" is None. If this is also | 167 format. This will only be used when "client" is None. If this is also |
| 167 None, a StreamClient will be created through probing. | 168 None, a StreamClient will be created through probing. |
| 168 name_base (str or None): The default stream name prefix that will be added | 169 name_base (str or None): The default stream name prefix that will be added |
| 169 to generated LogDog stream names. If None, no prefix will be applied. | 170 to generated LogDog stream names. If None, no prefix will be applied. |
| 171 dump_path (str or None): If provided, a filesystem path where the final | |
| 172 recipe annotation protobuf binary will be dumped. | |
| 170 ignore_triggers (bool): Triggers are not supported in LogDog annotation | 173 ignore_triggers (bool): Triggers are not supported in LogDog annotation |
| 171 streams. If True, attempts to trigger will be silently ignored. If | 174 streams. If True, attempts to trigger will be silently ignored. If |
| 172 False, they will cause a NotImplementedError to be raised. | 175 False, they will cause a NotImplementedError to be raised. |
| 173 environment (_Environment or None): The _Environment instance to use for | 176 environment (_Environment or None): The _Environment instance to use for |
| 174 operations. This will be None at production, but can be overridden | 177 operations. This will be None at production, but can be overridden |
| 175 here for testing. | 178 here for testing. |
| 176 update_interval (datetime.timedelta or None): The interval of time between | 179 update_interval (datetime.timedelta or None): The interval of time between |
| 177 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be | 180 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be |
| 178 used. | 181 used. |
| 179 """ | 182 """ |
| 180 | 183 |
| 181 self._client = client | 184 self._client = client |
| 182 self._streamserver_uri = streamserver_uri | 185 self._streamserver_uri = streamserver_uri |
| 183 self._name_base = _StreamName(name_base) | 186 self._name_base = _StreamName(name_base) |
| 187 self._dump_path = dump_path | |
| 184 self._ignore_triggers = ignore_triggers | 188 self._ignore_triggers = ignore_triggers |
| 185 self._env = environment or _Environment.real() | 189 self._env = environment or _Environment.real() |
| 186 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL | 190 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL |
| 187 | 191 |
| 188 self._astate = None | 192 self._astate = None |
| 189 | 193 |
| 190 self._annotation_stream = None | 194 self._annotation_stream = None |
| 191 self._annotation_monitor = None | 195 self._annotation_monitor = None |
| 192 self._streams = collections.OrderedDict() | 196 self._streams = collections.OrderedDict() |
| 193 | 197 |
| (...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 415 # Shut down any outstanding streams that may not have been closed for | 419 # Shut down any outstanding streams that may not have been closed for |
| 416 # whatever reason. | 420 # whatever reason. |
| 417 for s in reversed(self._streams.values()): | 421 for s in reversed(self._streams.values()): |
| 418 s.close() | 422 s.close() |
| 419 | 423 |
| 420 # Close out our root Step. Manually check annotation state afterwards. | 424 # Close out our root Step. Manually check annotation state afterwards. |
| 421 self._close_step(self._astate.base) | 425 self._close_step(self._astate.base) |
| 422 self._notify_annotation_changed() | 426 self._notify_annotation_changed() |
| 423 | 427 |
| 424 # Shut down our annotation monitor and close our annotation stream. | 428 # Shut down our annotation monitor and close our annotation stream. |
| 425 self._annotation_monitor.flush_and_join() | 429 last_step_data = self._annotation_monitor.flush_and_join() |
| 426 self._annotation_stream.close() | 430 self._annotation_stream.close() |
| 427 | 431 |
| 428 # Clear our client and state. We are now closed. | 432 # Clear our client and state. We are now closed. |
| 429 self._streams.clear() | 433 self._streams.clear() |
| 430 self._client = None | 434 self._client = None |
| 431 self._astate = None | 435 self._astate = None |
| 432 | 436 |
| 437 # If requested, write out the last step data. | |
| 438 # | |
| 439 # If there is no last step data, this will write an empty file, which is | |
| 440 # still a valid protobuf. | |
| 441 if self._dump_path and last_step_data: | |
| 442 with open(self._dump_path, 'wb') as fd: | |
| 443 fd.write(last_step_data) | |
| 444 | |
| 433 def _notify_annotation_changed(self): | 445 def _notify_annotation_changed(self): |
| 434 if self._astate is None: | 446 if self._astate is None: |
| 435 return | 447 return |
| 436 | 448 |
| 437 step_data = self._astate.check() | 449 step_data = self._astate.check() |
| 438 if step_data is not None: | 450 if step_data is not None: |
| 439 self._annotation_monitor.signal_update(step_data) | 451 self._annotation_monitor.signal_update(step_data) |
| 440 | 452 |
| 441 def _set_timestamp(self, dst, dt=None): | 453 def _set_timestamp(self, dst, dt=None): |
| 442 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. | 454 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. |
| (...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 565 # Note that the timer may also suggest that we flush immediately if we're | 577 # Note that the timer may also suggest that we flush immediately if we're |
| 566 # already past our last flush interval. | 578 # already past our last flush interval. |
| 567 now = self._env.now | 579 now = self._env.now |
| 568 self._current_data = step_data | 580 self._current_data = step_data |
| 569 if structural or self._set_flush_timer_locked(now): | 581 if structural or self._set_flush_timer_locked(now): |
| 570 # We should flush immediately. | 582 # We should flush immediately. |
| 571 self._flush_now_locked(now) | 583 self._flush_now_locked(now) |
| 572 | 584 |
| 573 def flush_and_join(self): | 585 def flush_and_join(self): |
| 574 """Flushes any remaining updates and blocks until the monitor is complete. | 586 """Flushes any remaining updates and blocks until the monitor is complete. |
| 587 | |
| 588 Returns (pb.Step): The final Step protobuf, or None if no step data was | |
| 589 sent. | |
| 575 """ | 590 """ |
| 576 # Mark that we're finished and signal our event. | 591 # Mark that we're finished and signal our event. |
| 577 with self._lock: | 592 with self._lock: |
| 578 self._flush_now_locked(self._env.now) | 593 self._flush_now_locked(self._env.now) |
| 594 return self._last_flush_data | |
|
martiniss
2016/10/12 23:01:25
this is the data we last flushed. We always flush
dnj
2016/10/12 23:15:02
Yep.
| |
| 579 | 595 |
| 580 @property | 596 @property |
| 581 def latest(self): | 597 def latest(self): |
| 582 with self._lock: | 598 with self._lock: |
| 583 return self._last_flush_data | 599 return self._last_flush_data |
| 584 | 600 |
| 585 def _flush_now_locked(self, now): | 601 def _flush_now_locked(self, now): |
| 586 # Clear any current flush timer. | 602 # Clear any current flush timer. |
| 587 self._clear_flush_timer_locked() | 603 self._clear_flush_timer_locked() |
| 588 | 604 |
| (...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 795 return v.replace('/', '_') | 811 return v.replace('/', '_') |
| 796 | 812 |
| 797 @staticmethod | 813 @staticmethod |
| 798 def _normalize(v): | 814 def _normalize(v): |
| 799 return libs.logdog.streamname.normalize(v, prefix='s_') | 815 return libs.logdog.streamname.normalize(v, prefix='s_') |
| 800 | 816 |
| 801 def __str__(self): | 817 def __str__(self): |
| 802 if not self._base: | 818 if not self._base: |
| 803 raise ValueError('Cannot generate string from empty StreamName.') | 819 raise ValueError('Cannot generate string from empty StreamName.') |
| 804 return self._base | 820 return self._base |
| OLD | NEW |