Index: recipe_engine/stream_logdog.py |
diff --git a/recipe_engine/stream_logdog.py b/recipe_engine/stream_logdog.py |
index 5531a04911012027a2f46b0f0dd8a373af03bf5c..8c8c69703ccfd7efa4a0a98496f5b3f8237e8827 100644 |
--- a/recipe_engine/stream_logdog.py |
+++ b/recipe_engine/stream_logdog.py |
@@ -154,7 +154,8 @@ class StreamEngine(stream.StreamEngine): |
def __init__(self, client=None, streamserver_uri=None, name_base=None, |
- ignore_triggers=False, environment=None, update_interval=None): |
+ dump_path=None, ignore_triggers=False, environment=None, |
+ update_interval=None): |
"""Initializes a new LogDog/Annotation StreamEngine. |
Args: |
@@ -167,6 +168,8 @@ class StreamEngine(stream.StreamEngine): |
None, a StreamClient will be created through probing. |
name_base (str or None): The default stream name prefix that will be added |
to generated LogDog stream names. If None, no prefix will be applied. |
+ dump_path (str or None): If provided, a filesystem path where the final |
+ recipe annotation protobuf binary will be dumped. |
ignore_triggers (bool): Triggers are not supported in LogDog annotation |
streams. If True, attempts to trigger will be silently ignored. If |
False, they will cause a NotImplementedError to be raised. |
@@ -181,6 +184,7 @@ class StreamEngine(stream.StreamEngine): |
self._client = client |
self._streamserver_uri = streamserver_uri |
self._name_base = _StreamName(name_base) |
+ self._dump_path = dump_path |
self._ignore_triggers = ignore_triggers |
self._env = environment or _Environment.real() |
self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL |
@@ -422,7 +426,7 @@ class StreamEngine(stream.StreamEngine): |
self._notify_annotation_changed() |
# Shut down our annotation monitor and close our annotation stream. |
- self._annotation_monitor.flush_and_join() |
+ last_step_data = self._annotation_monitor.flush_and_join() |
self._annotation_stream.close() |
# Clear our client and state. We are now closed. |
@@ -430,6 +434,14 @@ class StreamEngine(stream.StreamEngine): |
self._client = None |
self._astate = None |
+ # If requested, write out the last step data. |
+ # |
+ # If there is no last step data, this will write an empty file, which is |
+ # still a valid protobuf. |
+ if self._dump_path and last_step_data: |
+ with open(self._dump_path, 'wb') as fd: |
+ fd.write(last_step_data) |
+ |
def _notify_annotation_changed(self): |
if self._astate is None: |
return |
@@ -572,10 +584,14 @@ class _AnnotationMonitor(object): |
def flush_and_join(self): |
"""Flushes any remaining updates and blocks until the monitor is complete. |
+ |
+ Returns (pb.Step): The final Step protobuf, or None if no step data was |
+ sent. |
""" |
# Mark that we're finished and signal our event. |
with self._lock: |
self._flush_now_locked(self._env.now) |
+ return self._last_flush_data |
martiniss
2016/10/12 23:01:25
this is the data we last flushed. We always flush
dnj
2016/10/12 23:15:02
Yep.
|
@property |
def latest(self): |