Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 # that can be found in the LICENSE file. | |
| 4 | |
| 5 """stream.StreamEngine implementation for LogDog, using Milo annotation | |
| 6 protobuf. | |
| 7 """ | |
| 8 | |
| 9 import collections | |
| 10 import contextlib | |
| 11 import copy | |
| 12 import datetime | |
| 13 import os | |
| 14 import threading | |
| 15 import sys | |
| 16 | |
| 17 from . import env | |
| 18 from . import stream | |
| 19 from . import util | |
| 20 | |
| 21 import google.protobuf.message | |
| 22 import google.protobuf.timestamp_pb2 as timestamp_pb2 | |
| 23 import libs.logdog.bootstrap | |
| 24 import libs.logdog.stream | |
| 25 import libs.logdog.streamname | |
| 26 import annotations_pb2 as pb | |
| 27 | |
| 28 | |
| 29 # The datetime for the epoch. | |
| 30 _EPOCH = datetime.datetime.utcfromtimestamp(0) | |
| 31 | |
| 32 # The annotation stream ContentType. | |
| 33 # | |
| 34 # This must match the ContentType for the annotation binary protobuf, which | |
| 35 # is specified in "<luci-go>/common/proto/milo/util.go". | |
| 36 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2' | |
| 37 | |
| 38 | |
| 39 class _Environment(object): | |
| 40 """Simulated system environment. The StreamEngine uses this to probe for | |
| 41 system parameters. By default, the environment will be derived from the | |
| 42 actual system. | |
| 43 """ | |
| 44 | |
| 45 def __init__(self, now_fn, argv, environ, cwd): | |
| 46 self._now_fn = now_fn | |
| 47 self.argv = argv | |
| 48 self.environ = environ | |
| 49 self.cwd = cwd | |
| 50 | |
| 51 @property | |
| 52 def now(self): | |
| 53 return self._now_fn() | |
| 54 | |
| 55 @classmethod | |
| 56 def probe(cls): | |
| 57 return cls( | |
| 58 now_fn=datetime.datetime.now, | |
| 59 argv=sys.argv[:], | |
| 60 environ=dict(os.environ), | |
| 61 cwd=os.getcwd(), | |
| 62 ) | |
| 63 | |
| 64 | |
| 65 class StreamEngine(stream.StreamEngine): | |
| 66 """A stream.StreamEngine implementation that uses Logdog streams and Milo | |
| 67 annotation protobufs. | |
| 68 | |
| 69 The generated LogDog streams will be (relative to "name_base"): | |
| 70 /annotations | |
| 71 The base annotations stream. | |
| 72 | |
| 73 /steps/<step_name>/ | |
| 74 Base stream name for a given step. Note that if multiple steps normalize | |
| 75 to the same <step_name> value, an index will be appended, such as | |
| 76 <step_name>_0. This can happen because the stream name component of a | |
| 77 step is normalized, so two validly-independent steps ("My Thing" and | |
| 78 "My_Thing") will both normalize to "My_Thing". In this case, the second | |
| 79 one would have the stream name component, "My_Thing_1". | |
| 80 | |
| 81 /steps/<step_name>/stdout | |
| 82 STDOUT stream for step "<step_name>". | |
| 83 /steps/<step_name>/stderr | |
| 84 STDOUT stream for step "<step_name>". | |
| 85 | |
| 86 /steps/<step_name>/logs/<log_name>/<log_name_index> | |
| 87 Stream name for a given step's logs. <log_name_index> is the index of the | |
| 88 log with the given normalized name. This is similar to <step_name>, only | |
| 89 the index is added as a separate stream name component. | |
| 90 """ | |
| 91 | |
| 92 # The name of the annotation stream. | |
| 93 ANNOTATION_NAME = 'annotations' | |
| 94 | |
| 95 | |
| 96 class TextStream(stream.StreamEngine.Stream): | |
| 97 | |
| 98 def __init__(self, fd): | |
| 99 super(StreamEngine.TextStream, self).__init__() | |
| 100 self._fd = fd | |
| 101 | |
| 102 ## | |
| 103 # Implement stream.StreamEngine.Stream | |
| 104 ## | |
| 105 | |
| 106 def write_line(self, line): | |
| 107 self._fd.write(line) | |
| 108 self._fd.write('\n') | |
| 109 | |
| 110 def write_split(self, string): | |
| 111 self._fd.write(string) | |
| 112 if not string.endswith('\n'): | |
| 113 self._fd.write('\n') | |
| 114 | |
| 115 def close(self): | |
| 116 self._fd.close() | |
| 117 | |
| 118 | |
| 119 class StepStream(stream.StreamEngine.StepStream): | |
| 120 """An individual step stream.""" | |
| 121 | |
| 122 def __init__(self, engine, step): | |
| 123 # We will lazily create the STDOUT stream when the first data is written. | |
| 124 super(StreamEngine.StepStream, self).__init__() | |
| 125 | |
| 126 self._engine = engine | |
| 127 self._step = step | |
| 128 | |
| 129 # We keep track of the log streams associated with this step. | |
| 130 self._log_stream_index = {} | |
| 131 | |
| 132 # We will lazily instantiate our stdout stream when content is actually | |
| 133 # written to it. | |
| 134 self._stdout_stream = None | |
| 135 | |
| 136 # The retained step summary text. When generating failure details, this | |
| 137 # will be consumed to populate their text field. | |
| 138 self._summary_text = None | |
| 139 | |
| 140 @classmethod | |
| 141 def create(cls, engine, step): | |
| 142 stream = cls(engine, step) | |
| 143 | |
| 144 # Start our step. | |
| 145 stream._step.msg.status = pb.RUNNING | |
| 146 engine._set_timestamp(stream._step.msg.started) | |
| 147 | |
| 148 return stream | |
| 149 | |
| 150 def _check_annotation_changed(fn): | |
|
martiniss
2016/09/27 19:24:48
Doesn't pylint complain about this?
dnj
2016/09/27 23:13:58
Hmm, it does. I'll move it out. I have run pylint
martiniss
2016/09/30 20:41:44
Yeah, it isn't :(
| |
| 151 """Decorator that can be applied to a StepStream instance method to call | |
| 152 our bound Engine's _check after the function is finished. | |
| 153 | |
| 154 This should decorate any method that modifies annotation state. | |
| 155 """ | |
| 156 def check_after(inner, *args, **kwargs): | |
| 157 v = fn(inner, *args, **kwargs) | |
| 158 inner._engine._check() | |
| 159 return v | |
| 160 return check_after | |
| 161 | |
| 162 @_check_annotation_changed | |
| 163 def _get_stdout(self): | |
| 164 if self._stdout_stream is None: | |
| 165 # Create a new STDOUT text stream. | |
| 166 stream_name = self._step.stream_name_base.append('stdout') | |
| 167 self._stdout_stream = self._engine._client.open_text(str(stream_name)) | |
| 168 | |
| 169 self._step.msg.stdout_stream.name = str(stream_name) | |
| 170 return self._stdout_stream | |
| 171 | |
| 172 ## | |
| 173 # Implement stream.StreamEngine.Stream | |
| 174 ## | |
| 175 | |
| 176 def write_line(self, line): | |
| 177 stdout = self._get_stdout() | |
| 178 stdout.write(line) | |
| 179 stdout.write('\n') | |
| 180 | |
| 181 def write_split(self, string): | |
| 182 stdout = self._get_stdout() | |
| 183 stdout.write(string) | |
| 184 if not string.endswith('\n'): | |
| 185 stdout.write('\n') | |
| 186 | |
| 187 @_check_annotation_changed | |
| 188 def close(self): | |
| 189 if self._stdout_stream is not None: | |
| 190 self._stdout_stream.close() | |
| 191 | |
| 192 # If we still have retained summary text, a failure, and no failure detail | |
| 193 # text, copy it there. | |
| 194 if self._summary_text is not None: | |
| 195 if (self._step.msg.HasField('failure_details') and | |
| 196 not self._step.msg.failure_details.text): | |
| 197 self._step.msg.failure_details.text = self._summary_text | |
| 198 | |
| 199 # Close our Step. | |
| 200 self._engine._close_step(self._step.msg) | |
| 201 | |
| 202 ## | |
| 203 # Implement stream.StreamEngine.StepStream | |
| 204 ## | |
| 205 | |
| 206 @_check_annotation_changed | |
| 207 def new_log_stream(self, log_name): | |
| 208 # Generate the base normalized log stream name for this log. | |
| 209 stream_name = self._step.stream_name_base.append('logs', log_name) | |
| 210 | |
| 211 # Add the log stream index to the end of the stream name. | |
| 212 index = self._log_stream_index.setdefault(str(stream_name), 0) | |
| 213 self._log_stream_index[str(stream_name)] = index + 1 | |
| 214 stream_name = stream_name.append(str(index)) | |
| 215 | |
| 216 # Create a new log stream for this name. | |
| 217 fd = self._engine._client.open_text(str(stream_name)) | |
| 218 | |
| 219 # Update our step to include the log stream. | |
| 220 link = self._step.msg.other_links.add(label=log_name) | |
| 221 link.logdog_stream.name = str(stream_name) | |
| 222 | |
| 223 return self._engine.TextStream(fd) | |
| 224 | |
| 225 @_check_annotation_changed | |
| 226 def add_step_text(self, text): | |
| 227 self._step.msg.text.append(text) | |
| 228 | |
| 229 @_check_annotation_changed | |
| 230 def add_step_summary_text(self, text): | |
| 231 self._step.msg.text.insert(0, text) | |
| 232 self._summary_text = text | |
| 233 | |
| 234 @_check_annotation_changed | |
| 235 def add_step_link(self, name, url): | |
| 236 self._step.msg.other_links.add(label=name, url=url) | |
| 237 | |
| 238 def reset_subannotation_state(self): | |
| 239 pass | |
| 240 | |
| 241 @_check_annotation_changed | |
| 242 def set_step_status(self, status): | |
| 243 if status == 'SUCCESS': | |
| 244 self._step.msg.status = pb.SUCCESS | |
| 245 elif status == 'WARNING': | |
| 246 self._step.msg.status = pb.SUCCESS | |
| 247 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL | |
| 248 elif status == 'FAILURE': | |
| 249 self._step.msg.status = pb.FAILURE | |
| 250 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL | |
| 251 elif status == 'EXCEPTION': | |
| 252 self._step.msg.status = pb.FAILURE | |
| 253 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION | |
| 254 else: | |
| 255 raise ValueError('Unknown status [%s]' % (status,)) | |
| 256 | |
| 257 @_check_annotation_changed | |
| 258 def set_build_property(self, key, value): | |
| 259 self._engine._anno.update_properties(key=value) | |
| 260 | |
| 261 def trigger(self, trigger_spec): | |
| 262 if self._engine._ignore_triggers: | |
| 263 return | |
| 264 raise NotImplementedError( | |
| 265 'Stream-based triggering is not supported for LogDog. Please use ' | |
| 266 'a recipe module (e.g., buildbucket) directly for build scheduling.') | |
| 267 | |
| 268 | |
| 269 def __init__(self, client=None, streamserver_uri=None, name_base=None, | |
| 270 ignore_triggers=False, env=None): | |
| 271 """Initializes a new LogDog/Annotation StreamEngine. | |
| 272 | |
| 273 Args: | |
| 274 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | |
| 275 to use. If this is None, a new StreamClient will be instantiated when | |
| 276 this StreamEngine is opened. | |
| 277 streamserver_uri (str or None): The LogDog Butler stream server URI. See | |
| 278 LogDog client library docs for details on supported protocols and | |
| 279 format. This will only be used when "client" is None. If this is also | |
| 280 None, a StreamClient will be created through probing. | |
| 281 name_base (str or None): The default stream name prefix that will be added | |
| 282 to generated LogDog stream names. If None, no prefix will be applied. | |
| 283 ignore_triggers (bool): Triggers are not supported in LogDog annotation | |
| 284 streams. If True, attempts to trigger will be silently ignored. If | |
| 285 False, they will cause a NotImplementedError to be raised. | |
| 286 env (_Environment or None): The _Environment instance to use for | |
| 287 operations. This will be None at production, but can be overridden | |
| 288 here for testing. | |
| 289 """ | |
| 290 | |
| 291 self._client = client | |
| 292 self._streamserver_uri = streamserver_uri | |
| 293 self._name_base = _StreamName(name_base) | |
| 294 self._ignore_triggers = ignore_triggers | |
| 295 self._env = env or _Environment.probe() | |
| 296 | |
| 297 self._astate = None | |
| 298 | |
| 299 self._streams = collections.OrderedDict() | |
| 300 | |
| 301 def new_step_stream(self, step_config): | |
| 302 # TODO(dnj): In the current iteration, subannotations are NOT supported. | |
|
martiniss
2016/09/27 19:24:48
The subannotation feature is used by chromium I th
dnj
2016/09/27 23:13:58
Used by CrOS too.
| |
| 303 # In order to support them, they would have to be parsed out of the stream | |
| 304 # and converted into Milo Annotation protobuf. This is a non-trivial effort | |
| 305 # and may be a waste of time, as in a LogDog-enabled world, the component | |
| 306 # emitting sub-annotations would actually just create its own annotation | |
| 307 # stream and emit its own Milo protobuf. | |
| 308 # | |
| 309 # Components that emit subannotations and don't want to be converted to use | |
| 310 # LogDog streams could bootstrap themselves through Annotee and let it do | |
| 311 # the work. | |
| 312 # | |
| 313 # For now, though, we explicitly do NOT support LogDog running with | |
| 314 # subannotations enabled. | |
| 315 if step_config.allow_subannotations: | |
| 316 raise NotImplementedError('Subannotations are not supported with LogDog ' | |
| 317 'output.') | |
| 318 | |
| 319 stream = self.StepStream.create(self, self._astate.create_step(step_config)) | |
| 320 self._check() | |
| 321 return stream | |
| 322 | |
| 323 def open(self): | |
| 324 # Initialize our client, if one is not provided. | |
| 325 if self._client is None: | |
| 326 if self._streamserver_uri: | |
| 327 self._client = libs.logdog.stream.create(self._streamserver_uri) | |
| 328 else: | |
| 329 # Probe the stream client via Bootstrap. | |
| 330 bootstrap = libs.logdog.bootstrap.probe() | |
| 331 self._client = bootstrap.stream_client() | |
| 332 | |
| 333 annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME) | |
| 334 self._annotation_stream = self._client.open_datagram( | |
| 335 str(annotation_stream_name), | |
| 336 content_type=ANNOTATION_CONTENT_TYPE) | |
| 337 | |
| 338 self._annotation_monitor = _AnnotationMonitor(self._annotation_stream) | |
| 339 | |
| 340 # Initialize our open streams list. | |
| 341 self._streams.clear() | |
| 342 | |
| 343 # Initialize our annotation state. | |
| 344 self._astate = _AnnotationState.create(self._name_base, env=self._env) | |
| 345 self._astate.base.status = pb.RUNNING | |
| 346 self._set_timestamp(self._astate.base.started) | |
| 347 self._check() | |
| 348 | |
| 349 def close(self): | |
| 350 assert self._astate is not None, 'StreamEngine is not open.' | |
| 351 | |
| 352 # Shut down any outstanding streams that may not have been closed for | |
| 353 # whatever reason. | |
| 354 for s in reversed(self._streams.values()): | |
| 355 s.close() | |
| 356 | |
| 357 # Close out our root Step. Manually check annotation state afterwards. | |
| 358 self._close_step(self._astate.base) | |
| 359 self._check() | |
| 360 | |
| 361 # Shut down our annotation monitor and close our annotation stream. | |
| 362 self._annotation_monitor.flush_and_join() | |
| 363 self._annotation_stream.close() | |
| 364 | |
| 365 # Clear our client and state. We are now closed. | |
| 366 self._streams.clear() | |
| 367 self._client = None | |
| 368 self._astate = None | |
| 369 | |
| 370 def _check(self): | |
| 371 if self._astate is None: | |
| 372 return | |
| 373 | |
| 374 step_data = self._astate.check() | |
| 375 if step_data is not None: | |
| 376 self._annotation_monitor.signal_update(step_data) | |
| 377 | |
| 378 def _set_timestamp(self, dst): | |
|
martiniss
2016/09/27 19:24:48
I don't understand this function. What's the point
dnj
2016/09/27 23:13:58
It loads data into the protobuf. I've updated the
| |
| 379 """Returns (timestamp_pb2.Timestamp): Containing the time from dt. | |
| 380 | |
| 381 Args: | |
| 382 dt (datetime.datetime): the time to load into the timestamp. | |
|
martiniss
2016/09/27 19:24:48
Args are wrong.
dnj
2016/09/27 23:13:58
Acknowledged.
| |
| 383 """ | |
| 384 dt = self._env.now | |
| 385 | |
| 386 # Convert to milliseconds from epoch. | |
| 387 v = (dt - _EPOCH).total_seconds() | |
| 388 | |
| 389 dst.seconds = int(v) | |
| 390 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos. | |
| 391 | |
| 392 def _close_step(self, step): | |
| 393 """Closes a step, and any open substeps, propagating status. | |
| 394 | |
| 395 If all of the substeps are already closed, this will do nothing. However, if | |
| 396 any are open, it will close them with an infra failure state. | |
| 397 | |
| 398 If any substeps failed, the failure will be propagated to step. | |
| 399 | |
| 400 Args: | |
| 401 step (pb.Step): The Step message to close. | |
| 402 """ | |
| 403 # Close any open substeps, in case some of them didn't close. | |
| 404 failed = [] | |
| 405 incomplete = [] | |
| 406 for sub in step.substep: | |
| 407 if not sub.HasField('step'): | |
| 408 # Not an embedded substep. | |
| 409 continue | |
| 410 | |
| 411 # Did this step actually complete? It should have, by now, so if it didn't | |
| 412 # we'll be reporting an infra failure in "step". | |
| 413 if sub.step.status not in (pb.SUCCESS, pb.FAILURE): | |
| 414 incomplete.append(sub.step) | |
| 415 | |
| 416 # Close this substep. This may be a no-op, if the substep is already | |
| 417 # closed. | |
| 418 self._close_step(sub.step) | |
| 419 | |
| 420 # If a substep failed, propagate its failure status to "step". | |
| 421 if sub.step.status == pb.FAILURE: | |
| 422 failed.append(sub.step) | |
| 423 | |
| 424 # If we had any incomplete steps, mark that we failed. | |
| 425 if incomplete: | |
| 426 step.status = pb.FAILURE | |
| 427 if step.failure_details is None: | |
| 428 step.failure_details = pb.FailureDetails( | |
| 429 type=pb.FailureDetails.INFRA, | |
| 430 text='Some substeps did not complete: %s' % ( | |
| 431 ', '.join(s.name for s in incomplete)), | |
| 432 ) | |
| 433 elif failed: | |
| 434 step.status = pb.FAILURE | |
| 435 if step.failure_details is None: | |
| 436 # This step didn't successfully close, so propagate an infra failure. | |
| 437 step.failure_details = pb.FailureDetails( | |
| 438 type=pb.FailureDetails.GENERAL, | |
| 439 text='Some substeps failed: %s' % ( | |
| 440 ', '.join(s.name for s in failed)), | |
| 441 ) | |
| 442 | |
| 443 # Now close "step". If it's still RUNNING, assume that it was successful. | |
| 444 if step.status == pb.RUNNING: | |
| 445 step.status = pb.SUCCESS | |
| 446 if not step.HasField('ended'): | |
| 447 self._set_timestamp(step.ended) | |
| 448 | |
| 449 | |
| 450 | |
| 451 class _AnnotationMonitor(object): | |
| 452 """The owner of the annotation datagram stream, sending annotation updates in | |
| 453 a controlled manner and buffering them when the content hasn't changed. | |
| 454 | |
| 455 By default, since annotation state can change rapidly, minor annotation | |
| 456 changes are throttled such that they are only actually sent periodically. | |
| 457 | |
| 458 New annotation state updates can be installed by calling `signal_update`. | |
| 459 After being started, the _AnnotationMonitor thread must be shut down by | |
| 460 calling its `flush_and_join` method. | |
| 461 """ | |
| 462 | |
| 463 # Flush interval for non-structural events. | |
| 464 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30) | |
| 465 | |
| 466 def __init__(self, fd, flush_period=None): | |
| 467 self._fd = fd | |
| 468 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD | |
| 469 | |
| 470 # The following group of variables is protected by "_lock". | |
| 471 self._lock = threading.Lock() | |
| 472 self._current_data = None | |
| 473 self._flush_timer = None | |
| 474 self._last_flush_time = None | |
| 475 self._last_flush_data = None | |
| 476 | |
| 477 def signal_update(self, step_data, structural=False): | |
| 478 """Updates the annotation state with new step data. | |
| 479 | |
| 480 This updates our state to include new step data. The annotation monitor | |
| 481 thread will pick this up and dispatch it, either: | |
| 482 - Eventually, when the flush period completes, or | |
| 483 - Immediately, if this is a structural change. | |
| 484 | |
| 485 Args: | |
| 486 step_data (str): The updated binary annotation protobuf step data. | |
| 487 structural (bool): If True, this is a structural update and should be | |
| 488 pushed immediately. | |
| 489 """ | |
| 490 with self._lock: | |
| 491 # Did our data actually change? | |
| 492 if step_data == self._last_flush_data: | |
| 493 # Nope, leave things as-is. | |
| 494 return | |
| 495 | |
| 496 # This is new data. Is it structural? If so, flush immediately. | |
| 497 # If not, make sure our timer is running so it will eventually be flushed. | |
| 498 # Note that the timer may also suggest that we flush immediately. | |
| 499 now = datetime.datetime.now() | |
| 500 self._current_data = step_data | |
| 501 if structural or self._set_flush_timer_locked(now): | |
| 502 # We should flush immediately. | |
| 503 self._flush_now_locked(now) | |
| 504 | |
| 505 def flush_and_join(self): | |
| 506 """Flushes any remaining updates and blocks until the monitor is complete. | |
| 507 """ | |
| 508 # Mark that we're finished and signal our event. | |
| 509 with self._lock: | |
| 510 self._flush_now_locked(datetime.datetime.now()) | |
| 511 | |
| 512 @property | |
| 513 def latest(self): | |
| 514 with self._lock: | |
| 515 return self._last_flush_data | |
| 516 | |
| 517 def _flush_now_locked(self, now): | |
| 518 # Clear any current flush timer. | |
| 519 self._clear_flush_timer_locked() | |
| 520 | |
| 521 # Record this flush. | |
| 522 # | |
| 523 # We set the last flush time to now because even if we don't actually send | |
| 524 # data, we have responded to the flush edge. | |
| 525 flush_data, self._current_data = self._current_data, None | |
| 526 self._last_flush_time = now | |
| 527 | |
| 528 # If the data hasn't changed since the last flush, then don't actually | |
| 529 # do anything. | |
| 530 if flush_data is None or flush_data == self._last_flush_data: | |
|
martiniss
2016/09/27 19:24:48
This means that duplicate data entries get ignored
dnj
2016/09/27 23:13:58
Probably not frequently, but no harm in checking.
| |
| 531 return | |
| 532 | |
| 533 self._last_flush_data = flush_data | |
| 534 self._fd.send(flush_data) | |
| 535 | |
| 536 def _clear_flush_timer_locked(self): | |
| 537 if self._flush_timer is not None: | |
| 538 self._flush_timer.cancel() | |
| 539 self._flush_timer = None | |
| 540 | |
| 541 def _set_flush_timer_locked(self, now): | |
| 542 if self._flush_timer is not None: | |
| 543 # Our flush timer is already running. | |
| 544 return False | |
| 545 | |
| 546 if self._last_flush_time is None: | |
| 547 # We have never flushed before, so flush immediately. | |
| 548 return True | |
| 549 | |
| 550 deadline = self._last_flush_time + self._flush_period | |
| 551 if deadline <= now: | |
| 552 # We're past our flush deadline, and should flush immediately. | |
| 553 return True | |
| 554 | |
| 555 # Start our flush timer. | |
| 556 self._flush_timer = threading.Timer((deadline - now).total_seconds(), | |
| 557 self._flush_timer_expired) | |
| 558 self._flush_timer.daemon = True | |
| 559 self._flush_timer.start() | |
| 560 | |
| 561 def _flush_timer_expired(self): | |
| 562 with self._lock: | |
| 563 self._flush_now_locked(datetime.datetime.now()) | |
| 564 | |
| 565 | |
| 566 class _AnnotationState(object): | |
| 567 """Manages an outer Milo annotation protobuf Step.""" | |
| 568 | |
| 569 Step = collections.namedtuple('Step', ( | |
| 570 'msg', 'stream_name_base', 'substream_name_index')) | |
| 571 | |
| 572 def __init__(self, base_step, stream_name_base): | |
| 573 self._base = self.Step( | |
| 574 msg=base_step, | |
| 575 stream_name_base=stream_name_base, | |
| 576 substream_name_index={}) | |
| 577 self._check_snapshot = None | |
| 578 | |
| 579 # The current step stack. This is built by updating state after new steps' | |
| 580 # nesting levels. | |
| 581 self._nest_stack = [self._base] | |
| 582 | |
| 583 # Index initial properties. | |
| 584 self._properties = dict((p.name, p) for p in self._base.msg.property) | |
| 585 | |
| 586 @classmethod | |
| 587 def create(cls, stream_name_base, env=None, properties=None): | |
| 588 base = pb.Step() | |
| 589 base.name = 'steps' | |
| 590 base.status = pb.PENDING | |
| 591 if env: | |
| 592 if env.argv: | |
| 593 base.command.command_line.extend(env.argv) | |
| 594 if env.cwd: | |
| 595 base.command.cwd = env.cwd | |
| 596 if env.environ: | |
| 597 base.command.environ.update(env.environ) | |
| 598 if properties: | |
| 599 for k, v in sorted(properties.iteritems()): | |
| 600 base.property.add(name=k, value=v) | |
| 601 return cls(base, stream_name_base) | |
| 602 | |
| 603 @property | |
| 604 def base(self): | |
| 605 return self._base.msg | |
| 606 | |
| 607 def check(self): | |
| 608 """Checks if the annotation state has been updated and, if so, returns it. | |
| 609 | |
| 610 After check returns, the latest annotation state will be used as the current | |
| 611 snapshot for future checks. | |
| 612 | |
| 613 Returns (str/None): A serialized binary Step protobuf if modified, None | |
| 614 otherwise. | |
| 615 """ | |
| 616 if self._check_snapshot is None or self._check_snapshot != self.base: | |
| 617 self._check_snapshot = copy.deepcopy(self.base) | |
| 618 return self._check_snapshot.SerializeToString() | |
| 619 return None | |
| 620 | |
| 621 def create_step(self, step_config): | |
| 622 # Identify our parent Step by examining the nesting level. The first step | |
| 623 # in the nest stack will always be the base (nesting level "-1", since it's | |
| 624 # the parent of level 0). Since the step's "nest_level" is one more than the | |
| 625 # parent, and we need to offset by 1 to reach the stack index, they cancel | |
| 626 # each other out, so the nest level is the same as the parent's stack index. | |
| 627 assert step_config.nest_level < len(self._nest_stack), ( | |
| 628 'Invalid nest level %d (highest is %d)' % ( | |
| 629 step_config.nest_level, len(self._nest_stack)-1)) | |
| 630 | |
| 631 # Clear any items in the nest stack that are deeper than the current | |
| 632 # element. | |
| 633 del(self._nest_stack[step_config.nest_level+1:]) | |
| 634 parent = self._nest_stack[-1] | |
| 635 | |
| 636 # Create a stream name for this step. Even though step names are unique, | |
| 637 # the normalized LogDog step name may overlap with a different step name. | |
| 638 # We keep track of the step names we've issued to this step space and | |
| 639 # add indexes if a conflict is identified. | |
| 640 stream_name_base = parent.stream_name_base.append('steps', | |
| 641 step_config.base_name) | |
| 642 index = parent.substream_name_index.setdefault(str(stream_name_base), 0) | |
| 643 parent.substream_name_index[str(stream_name_base)] += 1 | |
| 644 if index > 0: | |
| 645 stream_name_base += '_%d' % (index,) | |
| 646 | |
| 647 # Create and populate our new step. | |
| 648 msg = parent.msg.substep.add().step | |
| 649 msg.name = step_config.base_name | |
| 650 msg.status = pb.PENDING | |
| 651 if step_config.cmd: | |
| 652 msg.command.command_line.extend(step_config.cmd) | |
| 653 if step_config.cwd: | |
| 654 msg.command.cwd = step_config.cwd | |
| 655 if step_config.env: | |
| 656 msg.command.environ = step_config.env | |
| 657 | |
| 658 step = self.Step( | |
| 659 msg=msg, | |
| 660 stream_name_base=stream_name_base, | |
| 661 substream_name_index={}) | |
| 662 self._nest_stack.append(step) | |
| 663 return step | |
| 664 | |
| 665 def update_properties(self, **kwargs): | |
| 666 """Updates a set's property values to incorporate kwargs.""" | |
| 667 for k, v in sorted(kwargs.iteritems()): | |
| 668 cur = self._properties.get(k) | |
| 669 if cur is None: | |
| 670 cur = self.base.property.add(name=k, value=str(v)) | |
| 671 self._properties[k] = cur | |
| 672 continue | |
| 673 | |
| 674 # A Property message already exists for this key, so update its value. | |
| 675 if cur.value != v: | |
| 676 cur.value = str(v) | |
| 677 | |
| 678 | |
| 679 class _StreamName(object): | |
| 680 """An immutable validated wrapper for a LogDog stream name.""" | |
| 681 | |
| 682 def __init__(self, base): | |
| 683 if base is not None: | |
| 684 libs.logdog.streamname.validate_stream_name(base) | |
| 685 self._base = base | |
| 686 | |
| 687 def append(self, *components): | |
| 688 """Returns (_StreamName): A new _StreamName instance with components added. | |
| 689 | |
| 690 Each component in "components" will become a new normalized stream name | |
| 691 component. Conseqeuntly, any separators (/) in the components will be | |
| 692 replaced with underscores. | |
| 693 | |
| 694 Args: | |
| 695 components: the path components to append to this _StreamName. | |
| 696 """ | |
| 697 if len(components) == 0: | |
| 698 return self | |
| 699 | |
| 700 components = [self._normalize(self._flatten(p)) | |
| 701 for p in reversed(components)] | |
| 702 if self._base: | |
| 703 components.append(self._base) | |
| 704 return type(self)('/'.join(reversed(components))) | |
| 705 | |
| 706 def augment(self, val): | |
| 707 """Returns (_StreamName): A new _StreamName with "val" appended. | |
| 708 | |
| 709 This generates a new, normalized _StreamName with the contents of "val" | |
| 710 appended to the end. For example: | |
| 711 | |
| 712 Original: "foo/bar" | |
| 713 Append "baz qux": "foo/barbaz_qux" | |
| 714 """ | |
| 715 if not val: | |
| 716 return self | |
| 717 val = self._flatten(val) | |
| 718 if self._base: | |
| 719 val = self._base + val | |
| 720 return type(self)(self._normalize(val)) | |
| 721 | |
| 722 def __iadd__(self, val): | |
| 723 return self.augment(val) | |
| 724 | |
| 725 @staticmethod | |
| 726 def _flatten(v): | |
| 727 return v.replace('/', '_') | |
| 728 | |
| 729 @staticmethod | |
| 730 def _normalize(v): | |
| 731 return libs.logdog.streamname.normalize(v, prefix='s_') | |
| 732 | |
| 733 def __str__(self): | |
| 734 if not self._base: | |
| 735 raise ValueError('Cannot generate string from empty StreamName.') | |
| 736 return self._base | |
| OLD | NEW |