Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(808)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2378903002: Annotation protobuf code can now dump final. (Closed)
Patch Set: Rebase, add unit test. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/arguments_pb2.py ('k') | recipe_engine/unittests/stream_logdog_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation 5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf. 6 protobuf.
7 """ 7 """
8 8
9 import collections 9 import collections
10 import contextlib 10 import contextlib
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 """ 147 """
148 148
149 # The name of the annotation stream. 149 # The name of the annotation stream.
150 ANNOTATION_NAME = 'annotations' 150 ANNOTATION_NAME = 'annotations'
151 151
152 # The default amount of time in between anotation pushes. 152 # The default amount of time in between anotation pushes.
153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) 153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30)
154 154
155 155
156 def __init__(self, client=None, streamserver_uri=None, name_base=None, 156 def __init__(self, client=None, streamserver_uri=None, name_base=None,
157 ignore_triggers=False, environment=None, update_interval=None): 157 dump_path=None, ignore_triggers=False, environment=None,
158 update_interval=None):
158 """Initializes a new LogDog/Annotation StreamEngine. 159 """Initializes a new LogDog/Annotation StreamEngine.
159 160
160 Args: 161 Args:
161 client (libs.logdog.stream.StreamClient or None): the LogDog stream client 162 client (libs.logdog.stream.StreamClient or None): the LogDog stream client
162 to use. If this is None, a new StreamClient will be instantiated when 163 to use. If this is None, a new StreamClient will be instantiated when
163 this StreamEngine is opened. 164 this StreamEngine is opened.
164 streamserver_uri (str or None): The LogDog Butler stream server URI. See 165 streamserver_uri (str or None): The LogDog Butler stream server URI. See
165 LogDog client library docs for details on supported protocols and 166 LogDog client library docs for details on supported protocols and
166 format. This will only be used when "client" is None. If this is also 167 format. This will only be used when "client" is None. If this is also
167 None, a StreamClient will be created through probing. 168 None, a StreamClient will be created through probing.
168 name_base (str or None): The default stream name prefix that will be added 169 name_base (str or None): The default stream name prefix that will be added
169 to generated LogDog stream names. If None, no prefix will be applied. 170 to generated LogDog stream names. If None, no prefix will be applied.
171 dump_path (str or None): If provided, a filesystem path where the final
172 recipe annotation protobuf binary will be dumped.
170 ignore_triggers (bool): Triggers are not supported in LogDog annotation 173 ignore_triggers (bool): Triggers are not supported in LogDog annotation
171 streams. If True, attempts to trigger will be silently ignored. If 174 streams. If True, attempts to trigger will be silently ignored. If
172 False, they will cause a NotImplementedError to be raised. 175 False, they will cause a NotImplementedError to be raised.
173 environment (_Environment or None): The _Environment instance to use for 176 environment (_Environment or None): The _Environment instance to use for
174 operations. This will be None at production, but can be overridden 177 operations. This will be None at production, but can be overridden
175 here for testing. 178 here for testing.
176 update_interval (datetime.timedelta or None): The interval of time between 179 update_interval (datetime.timedelta or None): The interval of time between
177 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be 180 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be
178 used. 181 used.
179 """ 182 """
180 183
181 self._client = client 184 self._client = client
182 self._streamserver_uri = streamserver_uri 185 self._streamserver_uri = streamserver_uri
183 self._name_base = _StreamName(name_base) 186 self._name_base = _StreamName(name_base)
187 self._dump_path = dump_path
184 self._ignore_triggers = ignore_triggers 188 self._ignore_triggers = ignore_triggers
185 self._env = environment or _Environment.real() 189 self._env = environment or _Environment.real()
186 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL 190 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL
187 191
188 self._astate = None 192 self._astate = None
189 193
190 self._annotation_stream = None 194 self._annotation_stream = None
191 self._annotation_monitor = None 195 self._annotation_monitor = None
192 self._streams = collections.OrderedDict() 196 self._streams = collections.OrderedDict()
193 197
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after
415 # Shut down any outstanding streams that may not have been closed for 419 # Shut down any outstanding streams that may not have been closed for
416 # whatever reason. 420 # whatever reason.
417 for s in reversed(self._streams.values()): 421 for s in reversed(self._streams.values()):
418 s.close() 422 s.close()
419 423
420 # Close out our root Step. Manually check annotation state afterwards. 424 # Close out our root Step. Manually check annotation state afterwards.
421 self._close_step(self._astate.base) 425 self._close_step(self._astate.base)
422 self._notify_annotation_changed() 426 self._notify_annotation_changed()
423 427
424 # Shut down our annotation monitor and close our annotation stream. 428 # Shut down our annotation monitor and close our annotation stream.
425 self._annotation_monitor.flush_and_join() 429 last_step_data = self._annotation_monitor.flush_and_join()
426 self._annotation_stream.close() 430 self._annotation_stream.close()
427 431
428 # Clear our client and state. We are now closed. 432 # Clear our client and state. We are now closed.
429 self._streams.clear() 433 self._streams.clear()
430 self._client = None 434 self._client = None
431 self._astate = None 435 self._astate = None
432 436
437 # If requested, write out the last step data.
438 #
439 # If there is no last step data, this will write an empty file, which is
440 # still a valid protobuf.
441 if self._dump_path and last_step_data:
442 with open(self._dump_path, 'wb') as fd:
443 fd.write(last_step_data)
444
433 def _notify_annotation_changed(self): 445 def _notify_annotation_changed(self):
434 if self._astate is None: 446 if self._astate is None:
435 return 447 return
436 448
437 step_data = self._astate.check() 449 step_data = self._astate.check()
438 if step_data is not None: 450 if step_data is not None:
439 self._annotation_monitor.signal_update(step_data) 451 self._annotation_monitor.signal_update(step_data)
440 452
441 def _set_timestamp(self, dst, dt=None): 453 def _set_timestamp(self, dst, dt=None):
442 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. 454 """Populates a timestamp_pb2.Timestamp, dst, with a datetime.
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
565 # Note that the timer may also suggest that we flush immediately if we're 577 # Note that the timer may also suggest that we flush immediately if we're
566 # already past our last flush interval. 578 # already past our last flush interval.
567 now = self._env.now 579 now = self._env.now
568 self._current_data = step_data 580 self._current_data = step_data
569 if structural or self._set_flush_timer_locked(now): 581 if structural or self._set_flush_timer_locked(now):
570 # We should flush immediately. 582 # We should flush immediately.
571 self._flush_now_locked(now) 583 self._flush_now_locked(now)
572 584
573 def flush_and_join(self): 585 def flush_and_join(self):
574 """Flushes any remaining updates and blocks until the monitor is complete. 586 """Flushes any remaining updates and blocks until the monitor is complete.
587
588 Returns (pb.Step): The final Step protobuf, or None if no step data was
589 sent.
575 """ 590 """
576 # Mark that we're finished and signal our event. 591 # Mark that we're finished and signal our event.
577 with self._lock: 592 with self._lock:
578 self._flush_now_locked(self._env.now) 593 self._flush_now_locked(self._env.now)
594 return self._last_flush_data
martiniss 2016/10/12 23:01:25 this is the data we last flushed. We always flush
dnj 2016/10/12 23:15:02 Yep.
579 595
580 @property 596 @property
581 def latest(self): 597 def latest(self):
582 with self._lock: 598 with self._lock:
583 return self._last_flush_data 599 return self._last_flush_data
584 600
585 def _flush_now_locked(self, now): 601 def _flush_now_locked(self, now):
586 # Clear any current flush timer. 602 # Clear any current flush timer.
587 self._clear_flush_timer_locked() 603 self._clear_flush_timer_locked()
588 604
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after
795 return v.replace('/', '_') 811 return v.replace('/', '_')
796 812
797 @staticmethod 813 @staticmethod
798 def _normalize(v): 814 def _normalize(v):
799 return libs.logdog.streamname.normalize(v, prefix='s_') 815 return libs.logdog.streamname.normalize(v, prefix='s_')
800 816
801 def __str__(self): 817 def __str__(self):
802 if not self._base: 818 if not self._base:
803 raise ValueError('Cannot generate string from empty StreamName.') 819 raise ValueError('Cannot generate string from empty StreamName.')
804 return self._base 820 return self._base
OLDNEW
« no previous file with comments | « recipe_engine/arguments_pb2.py ('k') | recipe_engine/unittests/stream_logdog_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698