OLD | NEW |
---|---|
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """stream.StreamEngine implementation for LogDog, using Milo annotation | 5 """stream.StreamEngine implementation for LogDog, using Milo annotation |
6 protobuf. | 6 protobuf. |
7 """ | 7 """ |
8 | 8 |
9 import collections | 9 import collections |
10 import contextlib | 10 import contextlib |
(...skipping 250 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
261 | 261 |
262 def trigger(self, trigger_spec): | 262 def trigger(self, trigger_spec): |
263 if self._engine._ignore_triggers: | 263 if self._engine._ignore_triggers: |
264 return | 264 return |
265 raise NotImplementedError( | 265 raise NotImplementedError( |
266 'Stream-based triggering is not supported for LogDog. Please use ' | 266 'Stream-based triggering is not supported for LogDog. Please use ' |
267 'a recipe module (e.g., buildbucket) directly for build scheduling.') | 267 'a recipe module (e.g., buildbucket) directly for build scheduling.') |
268 | 268 |
269 | 269 |
270 def __init__(self, client=None, streamserver_uri=None, name_base=None, | 270 def __init__(self, client=None, streamserver_uri=None, name_base=None, |
271 ignore_triggers=False, environment=None): | 271 dump_path=None, ignore_triggers=False, environment=None): |
272 """Initializes a new LogDog/Annotation StreamEngine. | 272 """Initializes a new LogDog/Annotation StreamEngine. |
273 | 273 |
274 Args: | 274 Args: |
275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | 275 client (libs.logdog.stream.StreamClient or None): the LogDog stream client |
276 to use. If this is None, a new StreamClient will be instantiated when | 276 to use. If this is None, a new StreamClient will be instantiated when |
277 this StreamEngine is opened. | 277 this StreamEngine is opened. |
278 streamserver_uri (str or None): The LogDog Butler stream server URI. See | 278 streamserver_uri (str or None): The LogDog Butler stream server URI. See |
279 LogDog client library docs for details on supported protocols and | 279 LogDog client library docs for details on supported protocols and |
280 format. This will only be used when "client" is None. If this is also | 280 format. This will only be used when "client" is None. If this is also |
281 None, a StreamClient will be created through probing. | 281 None, a StreamClient will be created through probing. |
282 name_base (str or None): The default stream name prefix that will be added | 282 name_base (str or None): The default stream name prefix that will be added |
283 to generated LogDog stream names. If None, no prefix will be applied. | 283 to generated LogDog stream names. If None, no prefix will be applied. |
284 dump_path (str or None): If provided, a filesystem path where the final | |
285 recipe annotation protobuf binary will be dumped. | |
284 ignore_triggers (bool): Triggers are not supported in LogDog annotation | 286 ignore_triggers (bool): Triggers are not supported in LogDog annotation |
285 streams. If True, attempts to trigger will be silently ignored. If | 287 streams. If True, attempts to trigger will be silently ignored. If |
286 False, they will cause a NotImplementedError to be raised. | 288 False, they will cause a NotImplementedError to be raised. |
287 environment (_Environment or None): The _Environment instance to use for | 289 environment (_Environment or None): The _Environment instance to use for |
288 operations. This will be None at production, but can be overridden | 290 operations. This will be None at production, but can be overridden |
289 here for testing. | 291 here for testing. |
290 """ | 292 """ |
291 | 293 |
292 self._client = client | 294 self._client = client |
293 self._streamserver_uri = streamserver_uri | 295 self._streamserver_uri = streamserver_uri |
294 self._name_base = _StreamName(name_base) | 296 self._name_base = _StreamName(name_base) |
297 self._dump_path = dump_path | |
295 self._ignore_triggers = ignore_triggers | 298 self._ignore_triggers = ignore_triggers |
296 self._env = environment or _Environment.probe() | 299 self._env = environment or _Environment.probe() |
297 | 300 |
298 self._astate = None | 301 self._astate = None |
299 | 302 |
300 self._annotation_stream = None | 303 self._annotation_stream = None |
301 self._annotation_monitor = None | 304 self._annotation_monitor = None |
302 self._streams = collections.OrderedDict() | 305 self._streams = collections.OrderedDict() |
303 | 306 |
304 def new_step_stream(self, step_config): | 307 def new_step_stream(self, step_config): |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
356 # Shut down any outstanding streams that may not have been closed for | 359 # Shut down any outstanding streams that may not have been closed for |
357 # whatever reason. | 360 # whatever reason. |
358 for s in reversed(self._streams.values()): | 361 for s in reversed(self._streams.values()): |
359 s.close() | 362 s.close() |
360 | 363 |
361 # Close out our root Step. Manually check annotation state afterwards. | 364 # Close out our root Step. Manually check annotation state afterwards. |
362 self._close_step(self._astate.base) | 365 self._close_step(self._astate.base) |
363 self._check() | 366 self._check() |
364 | 367 |
365 # Shut down our annotation monitor and close our annotation stream. | 368 # Shut down our annotation monitor and close our annotation stream. |
366 self._annotation_monitor.flush_and_join() | 369 last_step_data = self._annotation_monitor.flush_and_join() |
367 self._annotation_stream.close() | 370 self._annotation_stream.close() |
368 | 371 |
369 # Clear our client and state. We are now closed. | 372 # Clear our client and state. We are now closed. |
370 self._streams.clear() | 373 self._streams.clear() |
371 self._client = None | 374 self._client = None |
372 self._astate = None | 375 self._astate = None |
373 | 376 |
377 # If requested, write out the last step data. | |
378 if self._dump_path and last_step_data: | |
379 with open(self._dump_path, 'wb') as fd: | |
nodir
2016/09/28 23:02:22
please swap these two lines, so in the worst case
dnj
2016/09/28 23:04:11
Done.
| |
380 fd.write(last_step_data) | |
381 | |
374 def _check(self): | 382 def _check(self): |
375 if self._astate is None: | 383 if self._astate is None: |
376 return | 384 return |
377 | 385 |
378 step_data = self._astate.check() | 386 step_data = self._astate.check() |
379 if step_data is not None: | 387 if step_data is not None: |
380 self._annotation_monitor.signal_update(step_data) | 388 self._annotation_monitor.signal_update(step_data) |
381 | 389 |
382 def _set_timestamp(self, dst, dt=None): | 390 def _set_timestamp(self, dst, dt=None): |
383 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. | 391 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. |
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
504 # If not, make sure our timer is running so it will eventually be flushed. | 512 # If not, make sure our timer is running so it will eventually be flushed. |
505 # Note that the timer may also suggest that we flush immediately. | 513 # Note that the timer may also suggest that we flush immediately. |
506 now = datetime.datetime.now() | 514 now = datetime.datetime.now() |
507 self._current_data = step_data | 515 self._current_data = step_data |
508 if structural or self._set_flush_timer_locked(now): | 516 if structural or self._set_flush_timer_locked(now): |
509 # We should flush immediately. | 517 # We should flush immediately. |
510 self._flush_now_locked(now) | 518 self._flush_now_locked(now) |
511 | 519 |
512 def flush_and_join(self): | 520 def flush_and_join(self): |
513 """Flushes any remaining updates and blocks until the monitor is complete. | 521 """Flushes any remaining updates and blocks until the monitor is complete. |
522 | |
523 Returns (pb.Step): The final Step protobuf, or None if no step data was | |
524 sent. | |
514 """ | 525 """ |
515 # Mark that we're finished and signal our event. | 526 # Mark that we're finished and signal our event. |
516 with self._lock: | 527 with self._lock: |
517 self._flush_now_locked(datetime.datetime.now()) | 528 self._flush_now_locked(datetime.datetime.now()) |
529 return self._last_flush_data | |
518 | 530 |
519 @property | 531 @property |
520 def latest(self): | 532 def latest(self): |
521 with self._lock: | 533 with self._lock: |
522 return self._last_flush_data | 534 return self._last_flush_data |
523 | 535 |
524 def _flush_now_locked(self, now): | 536 def _flush_now_locked(self, now): |
525 # Clear any current flush timer. | 537 # Clear any current flush timer. |
526 self._clear_flush_timer_locked() | 538 self._clear_flush_timer_locked() |
527 | 539 |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
734 return v.replace('/', '_') | 746 return v.replace('/', '_') |
735 | 747 |
736 @staticmethod | 748 @staticmethod |
737 def _normalize(v): | 749 def _normalize(v): |
738 return libs.logdog.streamname.normalize(v, prefix='s_') | 750 return libs.logdog.streamname.normalize(v, prefix='s_') |
739 | 751 |
740 def __str__(self): | 752 def __str__(self): |
741 if not self._base: | 753 if not self._base: |
742 raise ValueError('Cannot generate string from empty StreamName.') | 754 raise ValueError('Cannot generate string from empty StreamName.') |
743 return self._base | 755 return self._base |
OLD | NEW |