Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 # that can be found in the LICENSE file. | |
| 4 | |
| 5 """stream.StreamEngine implementation for LogDog, using Milo annotation | |
| 6 protobuf. | |
| 7 """ | |
| 8 | |
| 9 import collections | |
| 10 import contextlib | |
| 11 import copy | |
| 12 import datetime | |
| 13 import os | |
| 14 import threading | |
| 15 import sys | |
| 16 | |
| 17 from . import env | |
| 18 from . import stream | |
| 19 from . import util | |
| 20 | |
| 21 import google.protobuf.message | |
| 22 import google.protobuf.timestamp_pb2 as timestamp_pb2 | |
| 23 import libs.logdog.bootstrap | |
| 24 import libs.logdog.stream | |
| 25 import libs.logdog.streamname | |
| 26 import annotations_pb2 as pb | |
| 27 | |
| 28 | |
| 29 # The datetime for the epoch. | |
| 30 _EPOCH = datetime.datetime.utcfromtimestamp(0) | |
| 31 | |
| 32 # The annotation stream ContentType. | |
| 33 # | |
| 34 # This must match the ContentType for the annotation binary protobuf, which | |
| 35 # is specified in "<luci-go>/common/proto/milo/util.go". | |
| 36 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2' | |
| 37 | |
| 38 | |
| 39 class _Environment(object): | |
| 40 """Simulated system environment. The StreamEngine uses this to probe for | |
| 41 system parameters. By default, the environment will be derived from the | |
| 42 actual system. | |
| 43 """ | |
| 44 | |
| 45 def __init__(self, now_fn, argv, environ, cwd): | |
| 46 self._now_fn = now_fn | |
| 47 self.argv = argv | |
| 48 self.environ = environ | |
| 49 self.cwd = cwd | |
| 50 | |
| 51 @property | |
| 52 def now(self): | |
| 53 return self._now_fn() | |
| 54 | |
| 55 @classmethod | |
| 56 def probe(cls): | |
| 57 return cls( | |
| 58 now_fn=datetime.datetime.now, | |
| 59 argv=sys.argv[:], | |
| 60 environ=dict(os.environ), | |
| 61 cwd=os.getcwd(), | |
| 62 ) | |
| 63 | |
| 64 | |
| 65 class StreamEngine(stream.StreamEngine): | |
| 66 """A stream.StreamEngine implementation that uses Logdog streams and Milo | |
| 67 annotation protobufs. | |
| 68 | |
| 69 The generated LogDog streams will be (relative to "name_base"): | |
| 70 /annotations (or "annotation_name", if overridden) | |
| 71 The base annotations stream. | |
| 72 | |
| 73 /steps/<step_name>/ | |
| 74 Base stream name for a given step. Note that if multiple steps normalize | |
| 75 to the same <step_name> value, an index will be appended, such as | |
| 76 <step_name>_0. This can happen because the stream name component of a | |
| 77 step is normalized, so two validly-independent steps ("My Thing" and | |
| 78 "My_Thing") will both normalize to "My_Thing". In this case, the second | |
| 79 one would have the stream name component, "My_Thing_1". | |
| 80 | |
| 81 /steps/<step_name>/stdout | |
| 82 STDOUT stream for step "<step_name>". | |
| 83 /steps/<step_name>/stderr | |
| 84 STDOUT stream for step "<step_name>". | |
| 85 | |
| 86 /steps/<step_name>/logs/<log_name>/<log_name_index> | |
| 87 Stream name for a given step's logs. <log_name_index> is the index of the | |
| 88 log with the given normalize name. This is similar to <step_name>, only | |
|
martiniss
2016/09/01 21:59:47
typo: with the given normalized name
dnj
2016/09/07 17:54:58
Done.
| |
| 89 the index is added as a separate stream name component. | |
| 90 """ | |
| 91 | |
| 92 # The default name of the annotation stream. | |
| 93 DEFAULT_ANNOTATION_NAME = 'annotations' | |
| 94 | |
| 95 | |
| 96 class TextStream(stream.StreamEngine.Stream): | |
|
martiniss
2016/09/01 21:59:47
This looks really general; maybe this should be in
dnj
2016/09/07 17:54:57
I don't think so. I think it's reasonable for impl
| |
| 97 | |
| 98 def __init__(self, fd): | |
| 99 super(StreamEngine.TextStream, self).__init__() | |
| 100 self._fd = fd | |
| 101 | |
| 102 ## | |
| 103 # Implement stream.StreamEngine.Stream | |
| 104 ## | |
| 105 | |
| 106 def write_line(self, line): | |
| 107 self._fd.write(line) | |
| 108 self._fd.write('\n') | |
| 109 | |
| 110 def write_split(self, string): | |
| 111 self._fd.write(string) | |
| 112 if not string.endswith('\n'): | |
| 113 self._fd.write('\n') | |
| 114 | |
| 115 def close(self): | |
| 116 self._fd.close() | |
| 117 | |
| 118 | |
| 119 class StepStream(stream.StreamEngine.StepStream): | |
| 120 """An individual step stream.""" | |
| 121 | |
| 122 def __init__(self, engine, step): | |
| 123 # We will lazily create the STDOUT stream when the first data is written. | |
| 124 super(StreamEngine.StepStream, self).__init__() | |
| 125 | |
| 126 self._engine = engine | |
| 127 self._step = step | |
| 128 | |
| 129 # We keep track of the log streams associated with this step. | |
| 130 self._log_stream_index = {} | |
| 131 | |
| 132 # We will lazily instantiate our stdout stream when content is actually | |
| 133 # written to it. | |
| 134 self._stdout_stream = None | |
| 135 | |
| 136 # The retained step summary text. When generating failure details, this | |
| 137 # will be consumed to populate their text field. | |
| 138 self._summary_text = None | |
| 139 | |
| 140 @classmethod | |
| 141 def create(cls, engine, step): | |
| 142 stream = cls(engine, step) | |
| 143 | |
| 144 # Start our step. | |
| 145 stream._step.msg.status = pb.RUNNING | |
| 146 engine._set_timestamp(stream._step.msg.started) | |
| 147 | |
| 148 return stream | |
| 149 | |
| 150 def _check_annotation_changed(fn): | |
| 151 """Decorator that can be applied to a StepStream instance method to call | |
| 152 our bound Engine's _check after the function is finished. | |
| 153 | |
| 154 This should decorate any method that modifies annotation state. | |
| 155 """ | |
| 156 def check_after(inner, *args, **kwargs): | |
| 157 v = fn(inner, *args, **kwargs) | |
| 158 inner._engine._check() | |
| 159 return v | |
| 160 return check_after | |
| 161 | |
| 162 @_check_annotation_changed | |
| 163 def _get_stdout(self): | |
| 164 if self._stdout_stream is None: | |
| 165 # Create a new STDOUT text stream. | |
| 166 stream_name = self._step.stream_name_base.append('stdout') | |
| 167 self._stdout_stream = self._engine._client.open_text(str(stream_name)) | |
| 168 | |
| 169 self._step.msg.stdout_stream.name = str(stream_name) | |
| 170 return self._stdout_stream | |
| 171 | |
| 172 ## | |
| 173 # Implement stream.StreamEngine.Stream | |
| 174 ## | |
| 175 | |
| 176 def write_line(self, line): | |
| 177 stdout = self._get_stdout() | |
| 178 stdout.write(line) | |
| 179 stdout.write('\n') | |
| 180 | |
| 181 def write_split(self, string): | |
| 182 stdout = self._get_stdout() | |
| 183 stdout.write(string) | |
| 184 if not string.endswith('\n'): | |
| 185 stdout.write('\n') | |
| 186 | |
| 187 @_check_annotation_changed | |
| 188 def close(self): | |
| 189 if self._stdout_stream is not None: | |
| 190 self._stdout_stream.close() | |
| 191 | |
| 192 # If we still have retained summary text, a failure, and no failure detail | |
| 193 # text, copy it there. | |
| 194 if self._summary_text is not None: | |
| 195 if (self._step.msg.HasField('failure_details') and | |
| 196 not self._step.msg.failure_details.text): | |
| 197 self._step.msg.failure_details.text = self._summary_text | |
| 198 | |
| 199 # Close our Step. | |
| 200 self._engine._close_step(self._step.msg) | |
| 201 | |
| 202 | |
|
martiniss
2016/09/01 21:59:47
nit: spaces
dnj
2016/09/07 17:54:57
Done.
| |
| 203 ## | |
| 204 # Implement stream.StreamEngine.StepStream | |
| 205 ## | |
| 206 | |
| 207 @_check_annotation_changed | |
| 208 def new_log_stream(self, log_name): | |
| 209 # Generate the base normalized log stream name for this log. | |
| 210 stream_name = self._step.stream_name_base.append('logs', log_name) | |
| 211 | |
| 212 # Add the log stream index to the end of the stream name. | |
| 213 index = self._log_stream_index.setdefault(str(stream_name), 0) | |
| 214 self._log_stream_index[str(stream_name)] = index + 1 | |
| 215 stream_name = stream_name.append(str(index)) | |
| 216 | |
| 217 # Create a new log stream for this name. | |
| 218 fd = self._engine._client.open_text(str(stream_name)) | |
| 219 | |
| 220 # Update our step to include the log stream. | |
| 221 link = self._step.msg.other_links.add(label=log_name) | |
| 222 link.logdog_stream.name = str(stream_name) | |
| 223 | |
| 224 return self._engine.TextStream(fd) | |
| 225 | |
| 226 @_check_annotation_changed | |
| 227 def add_step_text(self, text): | |
| 228 self._step.msg.text.append(text) | |
| 229 | |
| 230 @_check_annotation_changed | |
| 231 def add_step_summary_text(self, text): | |
| 232 self._step.msg.text.insert(0, text) | |
| 233 self._summary_text = text | |
| 234 | |
| 235 @_check_annotation_changed | |
| 236 def add_step_link(self, name, url): | |
| 237 self._step.msg.other_links.add(label=name, url=url) | |
| 238 | |
| 239 def reset_subannotation_state(self): | |
| 240 pass | |
| 241 | |
| 242 @_check_annotation_changed | |
| 243 def set_step_status(self, status): | |
| 244 if status == 'SUCCESS': | |
| 245 self._step.msg.status = pb.SUCCESS | |
| 246 elif status == 'WARNING': | |
| 247 self._step.msg.status = pb.SUCCESS | |
| 248 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL | |
| 249 elif status == 'FAILURE': | |
| 250 self._step.msg.status = pb.FAILURE | |
| 251 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL | |
| 252 elif status == 'EXCEPTION': | |
| 253 self._step.msg.status = pb.FAILURE | |
| 254 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION | |
| 255 else: | |
| 256 raise ValueError('Unknown status [%s]' % (status,)) | |
| 257 | |
| 258 @_check_annotation_changed | |
| 259 def set_build_property(self, key, value): | |
| 260 self._engine._anno.update_properties(key=value) | |
| 261 | |
| 262 def trigger(self, trigger_spec): | |
| 263 if self._engine._ignore_triggers: | |
| 264 return | |
| 265 raise NotImplementedError( | |
|
martiniss
2016/09/01 21:59:47
this will probably cause issues when you start rol
dnj
2016/09/07 17:54:57
Understood. Not much to be done here, though, asid
| |
| 266 'Stream-based triggering is not supported for LogDog. Please use ' | |
| 267 'a recipe module (e.g., buildbucket) directly for build scheduling.') | |
| 268 | |
| 269 | |
| 270 def __init__(self, client=None, streamserver_uri=None, name_base=None, | |
|
martiniss
2016/09/01 21:59:47
Can you document this? I'm not sure exactly what c
dnj
2016/09/07 17:54:57
Done.
| |
| 271 annotation_name=None, ignore_triggers=False, env=None): | |
| 272 self._client = client | |
| 273 self._streamserver_uri = streamserver_uri | |
| 274 self._name_base = _StreamName(name_base) | |
| 275 self._annotation_name = annotation_name or self.DEFAULT_ANNOTATION_NAME | |
|
martiniss
2016/09/01 21:59:47
What would be the reason for overriding this? I gu
dnj
2016/09/07 17:54:57
Really the idea is to let users fit this into thei
| |
| 276 self._ignore_triggers = ignore_triggers | |
| 277 self._env = env or _Environment.probe() | |
| 278 | |
| 279 self._astate = None | |
| 280 | |
| 281 self._streams = collections.OrderedDict() | |
| 282 | |
| 283 # For testing, allow our "now" to be overridden. | |
|
martiniss
2016/09/01 21:59:47
???
dnj
2016/09/07 17:54:57
Old comment, deleted.
| |
| 284 | |
| 285 def new_step_stream(self, step_config): | |
| 286 # TODO(dnj): In the current iteration, subannotations are NOT supported. | |
| 287 # In order to support them, they would have to be parsed out of the stream | |
| 288 # and converted into Milo Annotation protobuf. This is a non-trivial effort | |
| 289 # and may be a waste of time, as in a LogDog-enabled world, the component | |
| 290 # emitting sub-annotations would actually just create its own annotation | |
| 291 # stream and emit its own Milo protobuf. | |
| 292 # | |
| 293 # Components that emit subannotations and don't want to be converted to use | |
| 294 # LogDog streams could bootstrap themselves through Annotee and let it do | |
| 295 # the work. | |
| 296 # | |
| 297 # For now, though, we explicitly do NOT support LogDog running with | |
| 298 # subannotations enabled. | |
| 299 if step_config.allow_subannotations: | |
| 300 raise NotImplementedError('Subannotations are not supported with LogDog ' | |
| 301 'output.') | |
| 302 | |
| 303 stream = self.StepStream.create(self, self._astate.create_step(step_config)) | |
| 304 self._check() | |
| 305 return stream | |
| 306 | |
| 307 def open(self): | |
| 308 # Initialize our client, if ont is not provided. | |
|
martiniss
2016/09/01 21:59:47
typo
dnj
2016/09/07 17:54:57
Done.
| |
| 309 if self._client is None: | |
| 310 if self._streamserver_uri: | |
| 311 self._client = libs.logdog.stream.create(self._streamserver_uri) | |
| 312 else: | |
| 313 # Probe the stream client via Bootstrap. | |
| 314 bootstrap = libs.logdog.bootstrap.probe() | |
| 315 self._client = bootstrap.stream_client() | |
| 316 | |
| 317 annotation_stream_name = self._name_base.append(self._annotation_name) | |
| 318 self._annotation_stream = self._client.open_datagram( | |
| 319 str(annotation_stream_name), | |
| 320 content_type=ANNOTATION_CONTENT_TYPE) | |
| 321 | |
| 322 # Kick off our annotation transmission thread. | |
| 323 self._annotation_monitor = _AnnotationMonitor(self._annotation_stream) | |
|
martiniss
2016/09/01 21:59:47
I don't understand why this says "thread", when we
dnj
2016/09/07 17:54:57
Old comment, and kind of. The annotation monitor u
| |
| 324 | |
| 325 # Initialize our open streams list. | |
| 326 self._streams.clear() | |
| 327 | |
| 328 # Initialize our annotation state. | |
| 329 self._astate = _AnnotationState.create(self._name_base, env=self._env) | |
| 330 self._astate.base.status = pb.RUNNING | |
| 331 self._set_timestamp(self._astate.base.started) | |
| 332 self._check() | |
| 333 | |
| 334 def close(self): | |
| 335 assert self._astate is not None, 'StreamEngine is not open.' | |
| 336 | |
| 337 # Shut down any outstanding streams that may not have been closed for | |
| 338 # whatever reason. | |
| 339 for s in reversed(self._streams.values()): | |
|
martiniss
2016/09/01 21:59:47
why reversed?
dnj
2016/09/07 17:54:57
Streams created later should be closed before stre
| |
| 340 s.close() | |
| 341 | |
| 342 # Close out our root Step. Manually check annotation state afterwards. | |
| 343 self._close_step(self._astate.base) | |
| 344 self._check() | |
| 345 | |
| 346 # Shut down our annotation monitor and close our annotation stream. | |
| 347 self._annotation_monitor.flush_and_join() | |
| 348 self._annotation_stream.close() | |
| 349 | |
| 350 # Clear our client and state. We are now closed. | |
| 351 self._streams.clear() | |
| 352 self._client = None | |
| 353 self._astate = None | |
| 354 | |
| 355 def _check(self): | |
| 356 if self._astate is None: | |
| 357 return | |
| 358 | |
| 359 step_data = self._astate.check() | |
| 360 if step_data is not None: | |
| 361 self._annotation_monitor.signal_update(step_data) | |
| 362 | |
| 363 def _set_timestamp(self, dst): | |
| 364 """Returns (timestamp_pb2.Timestamp): Containing the time from dt. | |
| 365 | |
| 366 Args: | |
| 367 dt (datetime.datetime): the time to load into the timestamp. | |
| 368 """ | |
| 369 dt = self._env.now | |
| 370 | |
| 371 # Convert to milliseconds from epoch. | |
| 372 v = (dt - _EPOCH).total_seconds() | |
| 373 | |
| 374 dst.seconds = int(v) | |
| 375 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos. | |
| 376 | |
| 377 def _close_step(self, step): | |
| 378 """Closes a step, and any open substeps, propagating status. | |
| 379 | |
| 380 If all of the substeps are already closed, this will do nothing. However, if | |
| 381 any are open, it will close them with an infra failure state. | |
| 382 | |
| 383 If any substeps failed, the failure will be propagated to step. | |
| 384 | |
| 385 Args: | |
| 386 step (pb.Step): The Step message to close. | |
| 387 """ | |
| 388 # Close any open substeps, in case some of them didn't close. | |
| 389 failed = [] | |
| 390 incomplete = [] | |
| 391 for sub in step.substep: | |
| 392 if not sub.HasField('step'): | |
| 393 # Not an embedded substep. | |
| 394 continue | |
| 395 | |
| 396 # Did this step actually complete? It should have, by now, so if it didn't | |
| 397 # we'll be reporting an infra failure in "step". | |
| 398 if sub.step.status not in (pb.SUCCESS, pb.FAILURE): | |
| 399 incomplete.append(sub.step) | |
| 400 | |
| 401 # Close this substep. This may be a no-op, if the substep is already | |
| 402 # closed. | |
| 403 self._close_step(sub.step) | |
| 404 | |
| 405 # If a substep failed, propagate its failure status to "step". | |
| 406 if sub.step.status == pb.FAILURE: | |
| 407 failed.append(sub.step) | |
| 408 | |
| 409 # If we had any incomplete steps, mark that we failed. | |
| 410 if incomplete: | |
| 411 step.status = pb.FAILURE | |
| 412 if step.failure_details is None: | |
| 413 step.failure_details = pb.FailureDetails( | |
| 414 type=pb.FailureDetails.INFRA, | |
| 415 text='Some substeps did not complete: %s' % ( | |
| 416 ', '.join(s.name for s in incomplete)), | |
| 417 ) | |
| 418 elif failed: | |
| 419 step.status = pb.FAILURE | |
| 420 if step.failure_details is None: | |
| 421 # This step didn't successfully close, so propagate an infra failure. | |
| 422 step.failure_details = pb.FailureDetails( | |
| 423 type=pb.FailureDetails.GENERAL, | |
| 424 text='Some substeps failed: %s' % ( | |
| 425 ', '.join(s.name for s in failed)), | |
| 426 ) | |
| 427 | |
| 428 # Now close "step". If it's still RUNNING, assume that it was successful. | |
| 429 if step.status == pb.RUNNING: | |
| 430 step.status = pb.SUCCESS | |
| 431 if not step.HasField('ended'): | |
| 432 self._set_timestamp(step.ended) | |
| 433 | |
| 434 | |
| 435 | |
| 436 class _AnnotationMonitor(object): | |
|
martiniss
2016/09/01 21:59:47
Did you write this out of necessity, or before you
dnj
2016/09/07 17:54:57
Necessity. Sending a full annotation dump every it
| |
| 437 """The owner of the annotation datagram stream, sending annotation updates in | |
| 438 a controlled manner and buffering them when the content hasn't changed. | |
| 439 | |
| 440 By default, since annotation state can change rapidly, minor annotation | |
| 441 changes are throttled such that they are only actually sent periodically. | |
| 442 | |
| 443 New annotation state updates can be installed by calling `signal_update`. | |
| 444 After being started, the _AnnotationMonitor thread must be shut down by | |
| 445 calling its `flush_and_join` method. | |
| 446 """ | |
| 447 | |
| 448 # Flush interval for non-structural events. | |
| 449 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30) | |
| 450 | |
| 451 def __init__(self, fd, flush_period=None): | |
| 452 self._fd = fd | |
| 453 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD | |
| 454 | |
| 455 # The following group of variables is protected by "_lock". | |
| 456 self._lock = threading.Lock() | |
| 457 self._current_data = None | |
| 458 self._flush_timer = None | |
| 459 self._last_flush_time = None | |
| 460 self._last_flush_data = None | |
| 461 | |
| 462 def signal_update(self, step_data, structural=False): | |
| 463 """Updates the annotation state with new step data. | |
| 464 | |
| 465 This updates our state to include new step data. The annotation monitor | |
| 466 thread will pick this up and dispatch it, either: | |
| 467 - Eventually, when the flush period completes, or | |
| 468 - Immediately, if this is a structural change. | |
| 469 | |
| 470 Args: | |
| 471 step_data (str): The updated binary annotation protobuf step data. | |
| 472 structural (bool): If True, this is a structural update and should be | |
| 473 pushed immediately. | |
| 474 """ | |
| 475 with self._lock: | |
| 476 # Did our data actually change? | |
| 477 if step_data == self._last_flush_data: | |
| 478 # Nope, leave things as-is. | |
| 479 return | |
| 480 | |
| 481 # This is new data. Is it structural? If so, flush immediately. | |
| 482 # If not, make sure our timer is running so it will eventually be flushed. | |
| 483 # Note that the timer may also suggest that we flush immediately. | |
| 484 now = datetime.datetime.now() | |
| 485 self._current_data = step_data | |
| 486 if structural or self._set_flush_timer_locked(now): | |
| 487 # We should flush immediately. | |
| 488 self._flush_now_locked(now) | |
| 489 | |
| 490 def flush_and_join(self): | |
| 491 """Flushes any remaining updates and blocks until the monitor is complete. | |
| 492 """ | |
| 493 # Mark that we're finished and signal our event. | |
| 494 with self._lock: | |
| 495 self._flush_now_locked(datetime.datetime.now()) | |
| 496 | |
| 497 @property | |
| 498 def latest(self): | |
| 499 with self._lock: | |
| 500 return self._last_flush_data | |
| 501 | |
| 502 def _flush_now_locked(self, now): | |
| 503 # Clear any current flush timer. | |
| 504 self._clear_flush_timer_locked() | |
| 505 | |
| 506 # Record this flush. | |
| 507 # | |
| 508 # We set the last flush time to now because even if we don't actually send | |
| 509 # data, we have responded to the flush edge. | |
| 510 flush_data, self._current_data = self._current_data, None | |
| 511 self._last_flush_time = now | |
| 512 | |
| 513 # If the data hasn't changed since the last flush, then don't actually | |
| 514 # do anything. | |
| 515 if flush_data is None or flush_data == self._last_flush_data: | |
| 516 return | |
| 517 | |
| 518 self._last_flush_data = flush_data | |
| 519 self._fd.send(flush_data) | |
| 520 | |
| 521 def _clear_flush_timer_locked(self): | |
| 522 if self._flush_timer is not None: | |
| 523 self._flush_timer.cancel() | |
| 524 self._flush_timer = None | |
| 525 | |
| 526 def _set_flush_timer_locked(self, now): | |
| 527 if self._flush_timer is not None: | |
| 528 # Our flush timer is already running. | |
| 529 return False | |
| 530 | |
| 531 if self._last_flush_time is None: | |
| 532 # We have never flushed before, so flush immediately. | |
| 533 return True | |
| 534 | |
| 535 deadline = self._last_flush_time + self._flush_period | |
| 536 if deadline <= now: | |
| 537 # We're past our flush deadline, and should flush immediately. | |
| 538 return True | |
| 539 | |
| 540 # Start our flush timer. | |
| 541 self._flush_timer = threading.Timer((deadline - now).total_seconds(), | |
| 542 self._flush_timer_expired) | |
| 543 self._flush_timer.daemon = True | |
| 544 self._flush_timer.start() | |
| 545 | |
| 546 def _flush_timer_expired(self): | |
| 547 with self._lock: | |
| 548 self._flush_now_locked(datetime.datetime.now()) | |
| 549 | |
| 550 | |
| 551 class _AnnotationState(object): | |
| 552 """Manages an outer Milo annotation protobuf Step.""" | |
| 553 | |
| 554 Step = collections.namedtuple('Step', ( | |
| 555 'msg', 'stream_name_base', 'substream_name_index')) | |
| 556 | |
| 557 def __init__(self, base_step, stream_name_base): | |
| 558 self._base = self.Step( | |
| 559 msg=base_step, | |
| 560 stream_name_base=stream_name_base, | |
| 561 substream_name_index={}) | |
| 562 self._check_snapshot = None | |
| 563 | |
| 564 # The current step stack. This is built by updating state after new steps' | |
| 565 # nesting levels. | |
| 566 self._nest_stack = [self._base] | |
| 567 | |
| 568 # Index initial properties. | |
| 569 self._properties = dict((p.name, p) for p in self._base.msg.property) | |
| 570 | |
| 571 @classmethod | |
| 572 def create(cls, annotation_name, env=None, properties=None): | |
| 573 base = pb.Step() | |
| 574 base.name = 'steps' | |
| 575 base.status = pb.PENDING | |
| 576 if env: | |
| 577 if env.argv: | |
| 578 base.command.command_line.extend(env.argv) | |
| 579 if env.cwd: | |
| 580 base.command.cwd = env.cwd | |
| 581 if env.environ: | |
| 582 base.command.environ.update(env.environ) | |
| 583 if properties: | |
| 584 for k, v in sorted(properties.iteritems()): | |
| 585 base.property.add(name=k, value=v) | |
| 586 return cls(base, annotation_name) | |
| 587 | |
| 588 @property | |
| 589 def base(self): | |
| 590 return self._base.msg | |
| 591 | |
| 592 def check(self): | |
| 593 """Checks if the annotation state has been updated and, if so, returns it. | |
| 594 | |
| 595 After check returns, the latest annotation state will be used as the current | |
| 596 snapshot for future checks. | |
| 597 | |
| 598 Returns (str/None): A serialized binary Step protobuf if modified, None | |
| 599 otherwise. | |
| 600 """ | |
| 601 if self._check_snapshot is None or self._check_snapshot != self.base: | |
| 602 self._check_snapshot = copy.deepcopy(self.base) | |
| 603 return self._check_snapshot.SerializeToString() | |
| 604 return None | |
| 605 | |
| 606 def create_step(self, step_config): | |
| 607 # Identify our parent Step by examining the nesting level. The first step | |
| 608 # in the nest stack will always be the base (nesting level "-1", since it's | |
| 609 # the parent of level 0). Since the step's "nest_level" is one more than the | |
| 610 # parent, and we need to offset by 1 to reach the stack index, they cancel | |
| 611 # each other out, so the nest level is the same as the parent's stack index. | |
| 612 assert step_config.nest_level < len(self._nest_stack), ( | |
| 613 'Invalid nest level %d (highest is %d)' % ( | |
| 614 step_config.nest_level, len(self._nest_stack)-1)) | |
| 615 | |
| 616 # Clear any items in the nest stack that are deeper than the current | |
| 617 # element. | |
| 618 del(self._nest_stack[step_config.nest_level+1:]) | |
| 619 parent = self._nest_stack[-1] | |
| 620 | |
| 621 # Create a stream name for this step. Even though step names are unique, | |
| 622 # the normalized LogDog step name may overlap with a different step name. | |
| 623 # We keep track of the step names we've issued to this step space and | |
| 624 # add indexes if a conflict is identified. | |
| 625 stream_name_base = parent.stream_name_base.append('steps', | |
| 626 step_config.base_name) | |
| 627 index = parent.substream_name_index.setdefault(str(stream_name_base), 0) | |
| 628 parent.substream_name_index[str(stream_name_base)] += 1 | |
| 629 if index > 0: | |
| 630 stream_name_base += '_%d' % (index,) | |
| 631 | |
| 632 # Create and populate our new step. | |
| 633 msg = parent.msg.substep.add().step | |
| 634 msg.name = step_config.base_name | |
| 635 msg.status = pb.PENDING | |
| 636 if step_config.cmd: | |
| 637 msg.command.command_line.extend(step_config.cmd) | |
| 638 if step_config.cwd: | |
| 639 msg.command.cwd = step_config.cwd | |
| 640 if step_config.env: | |
| 641 msg.command.environ = step_config.env | |
| 642 | |
| 643 step = self.Step( | |
| 644 msg=msg, | |
| 645 stream_name_base=stream_name_base, | |
| 646 substream_name_index={}) | |
| 647 self._nest_stack.append(step) | |
| 648 return step | |
| 649 | |
| 650 def update_properties(self, **kwargs): | |
| 651 """Updates a set's property values to incorporate kwargs.""" | |
| 652 for k, v in sorted(kwargs.iteritems()): | |
| 653 cur = self._properties.get(k) | |
| 654 if cur is None: | |
| 655 cur = self.base.property.add(name=k, value=str(v)) | |
| 656 self._properties[k] = cur | |
| 657 continue | |
| 658 | |
| 659 # A Property message already exists for this key, so update its value. | |
| 660 if cur.value != v: | |
| 661 cur.value = str(v) | |
| 662 | |
| 663 | |
| 664 class _StreamName(object): | |
| 665 """An immutable validated wrapper for a LogDog stream name.""" | |
| 666 | |
| 667 def __init__(self, base): | |
| 668 if base is not None: | |
| 669 libs.logdog.streamname.validate_stream_name(base) | |
| 670 self._base = base | |
| 671 | |
| 672 def append(self, *components): | |
| 673 """Returns (_StreamName): A new _StreamName instance with components added. | |
| 674 | |
| 675 Each component in "components" will become a new normalized stream name | |
| 676 component. Conseqeuntly, any separators (/) in the components will be | |
| 677 replaced with underscores. | |
| 678 | |
| 679 Args: | |
| 680 components: the path components to append to this _StreamName. | |
| 681 """ | |
| 682 if len(components) == 0: | |
| 683 return self | |
| 684 | |
| 685 components = [self._normalize(self._flatten(p)) | |
| 686 for p in reversed(components)] | |
| 687 if self._base: | |
| 688 components.append(self._base) | |
| 689 return type(self)('/'.join(reversed(components))) | |
| 690 | |
| 691 def augment(self, val): | |
| 692 """Returns (_StreamName): A new _StreamName with "val" appended. | |
| 693 | |
| 694 This generates a new, normalized _StreamName with the contents of "val" | |
| 695 appended to the end. For example: | |
| 696 | |
| 697 Original: "foo/bar" | |
| 698 Append "baz qux": "foo/barbaz_qux" | |
| 699 """ | |
| 700 if not val: | |
| 701 return self | |
| 702 val = self._flatten(val) | |
| 703 if self._base: | |
| 704 val = self._base + val | |
| 705 return type(self)(self._normalize(val)) | |
| 706 | |
| 707 def __iadd__(self, val): | |
| 708 return self.augment(val) | |
| 709 | |
| 710 @staticmethod | |
| 711 def _flatten(v): | |
| 712 return v.replace('/', '_') | |
| 713 | |
| 714 @staticmethod | |
| 715 def _normalize(v): | |
| 716 return libs.logdog.streamname.normalize(v, prefix='s_') | |
| 717 | |
| 718 def __str__(self): | |
| 719 if not self._base: | |
| 720 raise ValueError('Cannot generate string from empty StreamName.') | |
| 721 return self._base | |
| OLD | NEW |