OLD | NEW |
---|---|
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """stream.StreamEngine implementation for LogDog, using Milo annotation | 5 """stream.StreamEngine implementation for LogDog, using Milo annotation |
6 protobuf. | 6 protobuf. |
7 """ | 7 """ |
8 | 8 |
9 import collections | 9 import collections |
10 import contextlib | 10 import contextlib |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
147 """ | 147 """ |
148 | 148 |
149 # The name of the annotation stream. | 149 # The name of the annotation stream. |
150 ANNOTATION_NAME = 'annotations' | 150 ANNOTATION_NAME = 'annotations' |
151 | 151 |
152 # The default amount of time in between anotation pushes. | 152 # The default amount of time in between anotation pushes. |
153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) | 153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) |
154 | 154 |
155 | 155 |
156 def __init__(self, client=None, streamserver_uri=None, name_base=None, | 156 def __init__(self, client=None, streamserver_uri=None, name_base=None, |
157 ignore_triggers=False, environment=None, update_interval=None): | 157 dump_path=None, ignore_triggers=False, environment=None, |
158 update_interval=None): | |
158 """Initializes a new LogDog/Annotation StreamEngine. | 159 """Initializes a new LogDog/Annotation StreamEngine. |
159 | 160 |
160 Args: | 161 Args: |
161 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | 162 client (libs.logdog.stream.StreamClient or None): the LogDog stream client |
162 to use. If this is None, a new StreamClient will be instantiated when | 163 to use. If this is None, a new StreamClient will be instantiated when |
163 this StreamEngine is opened. | 164 this StreamEngine is opened. |
164 streamserver_uri (str or None): The LogDog Butler stream server URI. See | 165 streamserver_uri (str or None): The LogDog Butler stream server URI. See |
165 LogDog client library docs for details on supported protocols and | 166 LogDog client library docs for details on supported protocols and |
166 format. This will only be used when "client" is None. If this is also | 167 format. This will only be used when "client" is None. If this is also |
167 None, a StreamClient will be created through probing. | 168 None, a StreamClient will be created through probing. |
168 name_base (str or None): The default stream name prefix that will be added | 169 name_base (str or None): The default stream name prefix that will be added |
169 to generated LogDog stream names. If None, no prefix will be applied. | 170 to generated LogDog stream names. If None, no prefix will be applied. |
171 dump_path (str or None): If provided, a filesystem path where the final | |
172 recipe annotation protobuf binary will be dumped. | |
170 ignore_triggers (bool): Triggers are not supported in LogDog annotation | 173 ignore_triggers (bool): Triggers are not supported in LogDog annotation |
171 streams. If True, attempts to trigger will be silently ignored. If | 174 streams. If True, attempts to trigger will be silently ignored. If |
172 False, they will cause a NotImplementedError to be raised. | 175 False, they will cause a NotImplementedError to be raised. |
173 environment (_Environment or None): The _Environment instance to use for | 176 environment (_Environment or None): The _Environment instance to use for |
174 operations. This will be None at production, but can be overridden | 177 operations. This will be None at production, but can be overridden |
175 here for testing. | 178 here for testing. |
176 update_interval (datetime.timedelta or None): The interval of time between | 179 update_interval (datetime.timedelta or None): The interval of time between |
177 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be | 180 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be |
178 used. | 181 used. |
179 """ | 182 """ |
180 | 183 |
181 self._client = client | 184 self._client = client |
182 self._streamserver_uri = streamserver_uri | 185 self._streamserver_uri = streamserver_uri |
183 self._name_base = _StreamName(name_base) | 186 self._name_base = _StreamName(name_base) |
187 self._dump_path = dump_path | |
184 self._ignore_triggers = ignore_triggers | 188 self._ignore_triggers = ignore_triggers |
185 self._env = environment or _Environment.real() | 189 self._env = environment or _Environment.real() |
186 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL | 190 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL |
187 | 191 |
188 self._astate = None | 192 self._astate = None |
189 | 193 |
190 self._annotation_stream = None | 194 self._annotation_stream = None |
191 self._annotation_monitor = None | 195 self._annotation_monitor = None |
192 self._streams = collections.OrderedDict() | 196 self._streams = collections.OrderedDict() |
193 | 197 |
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
415 # Shut down any outstanding streams that may not have been closed for | 419 # Shut down any outstanding streams that may not have been closed for |
416 # whatever reason. | 420 # whatever reason. |
417 for s in reversed(self._streams.values()): | 421 for s in reversed(self._streams.values()): |
418 s.close() | 422 s.close() |
419 | 423 |
420 # Close out our root Step. Manually check annotation state afterwards. | 424 # Close out our root Step. Manually check annotation state afterwards. |
421 self._close_step(self._astate.base) | 425 self._close_step(self._astate.base) |
422 self._notify_annotation_changed() | 426 self._notify_annotation_changed() |
423 | 427 |
424 # Shut down our annotation monitor and close our annotation stream. | 428 # Shut down our annotation monitor and close our annotation stream. |
425 self._annotation_monitor.flush_and_join() | 429 last_step_data = self._annotation_monitor.flush_and_join() |
426 self._annotation_stream.close() | 430 self._annotation_stream.close() |
427 | 431 |
428 # Clear our client and state. We are now closed. | 432 # Clear our client and state. We are now closed. |
429 self._streams.clear() | 433 self._streams.clear() |
430 self._client = None | 434 self._client = None |
431 self._astate = None | 435 self._astate = None |
432 | 436 |
437 # If requested, write out the last step data. | |
438 # | |
439 # If there is no last step data, this will write an empty file, which is | |
440 # still a valid protobuf. | |
441 if self._dump_path and last_step_data: | |
442 with open(self._dump_path, 'wb') as fd: | |
443 fd.write(last_step_data) | |
444 | |
433 def _notify_annotation_changed(self): | 445 def _notify_annotation_changed(self): |
434 if self._astate is None: | 446 if self._astate is None: |
435 return | 447 return |
436 | 448 |
437 step_data = self._astate.check() | 449 step_data = self._astate.check() |
438 if step_data is not None: | 450 if step_data is not None: |
439 self._annotation_monitor.signal_update(step_data) | 451 self._annotation_monitor.signal_update(step_data) |
440 | 452 |
441 def _set_timestamp(self, dst, dt=None): | 453 def _set_timestamp(self, dst, dt=None): |
442 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. | 454 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
565 # Note that the timer may also suggest that we flush immediately if we're | 577 # Note that the timer may also suggest that we flush immediately if we're |
566 # already past our last flush interval. | 578 # already past our last flush interval. |
567 now = self._env.now | 579 now = self._env.now |
568 self._current_data = step_data | 580 self._current_data = step_data |
569 if structural or self._set_flush_timer_locked(now): | 581 if structural or self._set_flush_timer_locked(now): |
570 # We should flush immediately. | 582 # We should flush immediately. |
571 self._flush_now_locked(now) | 583 self._flush_now_locked(now) |
572 | 584 |
573 def flush_and_join(self): | 585 def flush_and_join(self): |
574 """Flushes any remaining updates and blocks until the monitor is complete. | 586 """Flushes any remaining updates and blocks until the monitor is complete. |
587 | |
588 Returns (pb.Step): The final Step protobuf, or None if no step data was | |
589 sent. | |
575 """ | 590 """ |
576 # Mark that we're finished and signal our event. | 591 # Mark that we're finished and signal our event. |
577 with self._lock: | 592 with self._lock: |
578 self._flush_now_locked(self._env.now) | 593 self._flush_now_locked(self._env.now) |
594 return self._last_flush_data | |
martiniss
2016/10/12 23:01:25
this is the data we last flushed. We always flush
dnj
2016/10/12 23:15:02
Yep.
| |
579 | 595 |
580 @property | 596 @property |
581 def latest(self): | 597 def latest(self): |
582 with self._lock: | 598 with self._lock: |
583 return self._last_flush_data | 599 return self._last_flush_data |
584 | 600 |
585 def _flush_now_locked(self, now): | 601 def _flush_now_locked(self, now): |
586 # Clear any current flush timer. | 602 # Clear any current flush timer. |
587 self._clear_flush_timer_locked() | 603 self._clear_flush_timer_locked() |
588 | 604 |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
795 return v.replace('/', '_') | 811 return v.replace('/', '_') |
796 | 812 |
797 @staticmethod | 813 @staticmethod |
798 def _normalize(v): | 814 def _normalize(v): |
799 return libs.logdog.streamname.normalize(v, prefix='s_') | 815 return libs.logdog.streamname.normalize(v, prefix='s_') |
800 | 816 |
801 def __str__(self): | 817 def __str__(self): |
802 if not self._base: | 818 if not self._base: |
803 raise ValueError('Cannot generate string from empty StreamName.') | 819 raise ValueError('Cannot generate string from empty StreamName.') |
804 return self._base | 820 return self._base |
OLD | NEW |