Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2378903002: Annotation protobuf code can now dump final. (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/arguments_pb2.py ('k') | recipes.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation 5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf. 6 protobuf.
7 """ 7 """
8 8
9 import collections 9 import collections
10 import contextlib 10 import contextlib
(...skipping 250 matching lines...) Expand 10 before | Expand all | Expand 10 after
261 261
262 def trigger(self, trigger_spec): 262 def trigger(self, trigger_spec):
263 if self._engine._ignore_triggers: 263 if self._engine._ignore_triggers:
264 return 264 return
265 raise NotImplementedError( 265 raise NotImplementedError(
266 'Stream-based triggering is not supported for LogDog. Please use ' 266 'Stream-based triggering is not supported for LogDog. Please use '
267 'a recipe module (e.g., buildbucket) directly for build scheduling.') 267 'a recipe module (e.g., buildbucket) directly for build scheduling.')
268 268
269 269
270 def __init__(self, client=None, streamserver_uri=None, name_base=None, 270 def __init__(self, client=None, streamserver_uri=None, name_base=None,
271 ignore_triggers=False, environment=None): 271 dump_path=None, ignore_triggers=False, environment=None):
272 """Initializes a new LogDog/Annotation StreamEngine. 272 """Initializes a new LogDog/Annotation StreamEngine.
273 273
274 Args: 274 Args:
275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client 275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client
276 to use. If this is None, a new StreamClient will be instantiated when 276 to use. If this is None, a new StreamClient will be instantiated when
277 this StreamEngine is opened. 277 this StreamEngine is opened.
278 streamserver_uri (str or None): The LogDog Butler stream server URI. See 278 streamserver_uri (str or None): The LogDog Butler stream server URI. See
279 LogDog client library docs for details on supported protocols and 279 LogDog client library docs for details on supported protocols and
280 format. This will only be used when "client" is None. If this is also 280 format. This will only be used when "client" is None. If this is also
281 None, a StreamClient will be created through probing. 281 None, a StreamClient will be created through probing.
282 name_base (str or None): The default stream name prefix that will be added 282 name_base (str or None): The default stream name prefix that will be added
283 to generated LogDog stream names. If None, no prefix will be applied. 283 to generated LogDog stream names. If None, no prefix will be applied.
284 dump_path (str or None): If provided, a filesystem path where the final
285 recipe annotation protobuf binary will be dumped.
284 ignore_triggers (bool): Triggers are not supported in LogDog annotation 286 ignore_triggers (bool): Triggers are not supported in LogDog annotation
285 streams. If True, attempts to trigger will be silently ignored. If 287 streams. If True, attempts to trigger will be silently ignored. If
286 False, they will cause a NotImplementedError to be raised. 288 False, they will cause a NotImplementedError to be raised.
287 environment (_Environment or None): The _Environment instance to use for 289 environment (_Environment or None): The _Environment instance to use for
288 operations. This will be None at production, but can be overridden 290 operations. This will be None at production, but can be overridden
289 here for testing. 291 here for testing.
290 """ 292 """
291 293
292 self._client = client 294 self._client = client
293 self._streamserver_uri = streamserver_uri 295 self._streamserver_uri = streamserver_uri
294 self._name_base = _StreamName(name_base) 296 self._name_base = _StreamName(name_base)
297 self._dump_path = dump_path
295 self._ignore_triggers = ignore_triggers 298 self._ignore_triggers = ignore_triggers
296 self._env = environment or _Environment.probe() 299 self._env = environment or _Environment.probe()
297 300
298 self._astate = None 301 self._astate = None
299 302
300 self._annotation_stream = None 303 self._annotation_stream = None
301 self._annotation_monitor = None 304 self._annotation_monitor = None
302 self._streams = collections.OrderedDict() 305 self._streams = collections.OrderedDict()
303 306
304 def new_step_stream(self, step_config): 307 def new_step_stream(self, step_config):
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 # Shut down any outstanding streams that may not have been closed for 359 # Shut down any outstanding streams that may not have been closed for
357 # whatever reason. 360 # whatever reason.
358 for s in reversed(self._streams.values()): 361 for s in reversed(self._streams.values()):
359 s.close() 362 s.close()
360 363
361 # Close out our root Step. Manually check annotation state afterwards. 364 # Close out our root Step. Manually check annotation state afterwards.
362 self._close_step(self._astate.base) 365 self._close_step(self._astate.base)
363 self._check() 366 self._check()
364 367
365 # Shut down our annotation monitor and close our annotation stream. 368 # Shut down our annotation monitor and close our annotation stream.
366 self._annotation_monitor.flush_and_join() 369 last_step_data = self._annotation_monitor.flush_and_join()
367 self._annotation_stream.close() 370 self._annotation_stream.close()
368 371
369 # Clear our client and state. We are now closed. 372 # Clear our client and state. We are now closed.
370 self._streams.clear() 373 self._streams.clear()
371 self._client = None 374 self._client = None
372 self._astate = None 375 self._astate = None
373 376
377 # If requested, write out the last step data.
378 if self._dump_path and last_step_data:
379 with open(self._dump_path, 'wb') as fd:
nodir 2016/09/28 23:02:22 please swap these two lines, so in the worst case
dnj 2016/09/28 23:04:11 Done.
380 fd.write(last_step_data)
381
374 def _check(self): 382 def _check(self):
375 if self._astate is None: 383 if self._astate is None:
376 return 384 return
377 385
378 step_data = self._astate.check() 386 step_data = self._astate.check()
379 if step_data is not None: 387 if step_data is not None:
380 self._annotation_monitor.signal_update(step_data) 388 self._annotation_monitor.signal_update(step_data)
381 389
382 def _set_timestamp(self, dst, dt=None): 390 def _set_timestamp(self, dst, dt=None):
383 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. 391 """Populates a timestamp_pb2.Timestamp, dst, with a datetime.
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
504 # If not, make sure our timer is running so it will eventually be flushed. 512 # If not, make sure our timer is running so it will eventually be flushed.
505 # Note that the timer may also suggest that we flush immediately. 513 # Note that the timer may also suggest that we flush immediately.
506 now = datetime.datetime.now() 514 now = datetime.datetime.now()
507 self._current_data = step_data 515 self._current_data = step_data
508 if structural or self._set_flush_timer_locked(now): 516 if structural or self._set_flush_timer_locked(now):
509 # We should flush immediately. 517 # We should flush immediately.
510 self._flush_now_locked(now) 518 self._flush_now_locked(now)
511 519
512 def flush_and_join(self): 520 def flush_and_join(self):
513 """Flushes any remaining updates and blocks until the monitor is complete. 521 """Flushes any remaining updates and blocks until the monitor is complete.
522
523 Returns (pb.Step): The final Step protobuf, or None if no step data was
524 sent.
514 """ 525 """
515 # Mark that we're finished and signal our event. 526 # Mark that we're finished and signal our event.
516 with self._lock: 527 with self._lock:
517 self._flush_now_locked(datetime.datetime.now()) 528 self._flush_now_locked(datetime.datetime.now())
529 return self._last_flush_data
518 530
519 @property 531 @property
520 def latest(self): 532 def latest(self):
521 with self._lock: 533 with self._lock:
522 return self._last_flush_data 534 return self._last_flush_data
523 535
524 def _flush_now_locked(self, now): 536 def _flush_now_locked(self, now):
525 # Clear any current flush timer. 537 # Clear any current flush timer.
526 self._clear_flush_timer_locked() 538 self._clear_flush_timer_locked()
527 539
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after
734 return v.replace('/', '_') 746 return v.replace('/', '_')
735 747
736 @staticmethod 748 @staticmethod
737 def _normalize(v): 749 def _normalize(v):
738 return libs.logdog.streamname.normalize(v, prefix='s_') 750 return libs.logdog.streamname.normalize(v, prefix='s_')
739 751
740 def __str__(self): 752 def __str__(self):
741 if not self._base: 753 if not self._base:
742 raise ValueError('Cannot generate string from empty StreamName.') 754 raise ValueError('Cannot generate string from empty StreamName.')
743 return self._base 755 return self._base
OLDNEW
« no previous file with comments | « recipe_engine/arguments_pb2.py ('k') | recipes.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698