Index: recipe_engine/stream_logdog.py
diff --git a/recipe_engine/stream_logdog.py b/recipe_engine/stream_logdog.py
new file mode 100644
index 0000000000000000000000000000000000000000..f7835ecc0c6d2e9fad2a27b2c15d015807163134
--- /dev/null
+++ b/recipe_engine/stream_logdog.py
@@ -0,0 +1,743 @@
+# Copyright 2015 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+"""stream.StreamEngine implementation for LogDog, using Milo annotation
+protobuf.
+"""
+
+import collections
+import contextlib
+import copy
+import datetime
+import os
+import sys
+import threading
+
+from . import env
+from . import stream
+from . import util
+
+import google.protobuf.message
+import google.protobuf.timestamp_pb2 as timestamp_pb2
+import libs.logdog.bootstrap
+import libs.logdog.stream
+import libs.logdog.streamname
+import annotations_pb2 as pb
+
+
+# The datetime for the epoch.
+_EPOCH = datetime.datetime.utcfromtimestamp(0)
+
+# The annotation stream ContentType.
+#
+# This must match the ContentType for the annotation binary protobuf, which
+# is specified in "<luci-go>/common/proto/milo/util.go".
+ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
+
+
+class _Environment(object):
+  """Simulated system environment. The StreamEngine uses this to probe for
+  system parameters. By default, the environment will be derived from the
+  actual system.
+  """
+
+  def __init__(self, now_fn, argv, environ, cwd):
+    self._now_fn = now_fn
+    self.argv = argv
+    self.environ = environ
+    self.cwd = cwd
+
+  @property
+  def now(self):
+    return self._now_fn()
+
+  @classmethod
+  def probe(cls):
+    return cls(
+        now_fn=datetime.datetime.utcnow,  # UTC, to agree with _EPOCH.
+        argv=sys.argv[:],
+        environ=dict(os.environ),
+        cwd=os.getcwd(),
+    )
+
+
+def _check_annotation_changed(fn):
+  """Decorator that can be applied to a StepStream instance method to call
+  our bound Engine's _check after the decorated function finishes.
+
+  This should decorate any method that modifies annotation state.
+  """
+  def check_after(inner, *args, **kwargs):
+    v = fn(inner, *args, **kwargs)
+    inner._engine._check()
+    return v
+  return check_after
+
+
+class StreamEngine(stream.StreamEngine):
+  """A stream.StreamEngine implementation that uses LogDog streams and Milo
+  annotation protobufs.
+
+  The generated LogDog streams will be (relative to "name_base"):
+    /annotations
+      The base annotations stream.
+
+    /steps/<step_name>/
+      Base stream name for a given step. Note that if multiple steps normalize
+      to the same <step_name> value, an index will be appended, such as
+      <step_name>_1. This can happen because the stream name component of a
+      step is normalized, so two validly-independent steps ("My Thing" and
+      "My_Thing") will both normalize to "My_Thing". In this case, the second
+      one would have the stream name component, "My_Thing_1".
+
+    /steps/<step_name>/stdout
+      STDOUT stream for step "<step_name>".
+    /steps/<step_name>/stderr
+      STDERR stream for step "<step_name>".
+
+    /steps/<step_name>/logs/<log_name>/<log_name_index>
+      Stream name for a given step's logs. <log_name_index> is the index of
+      the log with the given normalized name. This is similar to <step_name>,
+      only the index is added as a separate stream name component.
+  """
+
+  # The name of the annotation stream.
+  ANNOTATION_NAME = 'annotations'
+
+
+  class TextStream(stream.StreamEngine.Stream):
+
+    def __init__(self, fd):
+      super(StreamEngine.TextStream, self).__init__()
+      self._fd = fd
+
+    ##
+    # Implement stream.StreamEngine.Stream
+    ##
+
+    def write_line(self, line):
+      self._fd.write(line)
+      self._fd.write('\n')
+
+    def write_split(self, string):
+      self._fd.write(string)
+      if not string.endswith('\n'):
+        self._fd.write('\n')
+
+    def close(self):
+      self._fd.close()
+
+
+  class StepStream(stream.StreamEngine.StepStream):
+    """An individual step stream."""
+
+    def __init__(self, engine, step):
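+      """Initializes a new StepStream.
+
+      Args:
+        engine (StreamEngine): The StreamEngine that owns this stream.
+        step (_AnnotationState.Step): The annotation state entry for the step
+            that this stream is bound to.
+      """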
+      super(StreamEngine.StepStream, self).__init__()
+
+      self._engine = engine
+      self._step = step
+
+      # We keep track of the log streams associated with this step.
+      self._log_stream_index = {}
+
+      # We will lazily instantiate our stdout stream when content is actually
+      # written to it.
+      self._stdout_stream = None
+
+      # The retained step summary text. When generating failure details, this
+      # will be consumed to populate their text field.
+      self._summary_text = None
+
+    @classmethod
+    def create(cls, engine, step):
+      strm = cls(engine, step)
+
+      # Start our step.
+      strm._step.msg.status = pb.RUNNING
+      engine._set_timestamp(strm._step.msg.started)
+
+      return strm
+
+    @_check_annotation_changed
+    def _get_stdout(self):
+      if self._stdout_stream is None:
+        # Create a new STDOUT text stream.
+        stream_name = self._step.stream_name_base.append('stdout')
+        self._stdout_stream = self._engine._client.open_text(str(stream_name))
+
+        self._step.msg.stdout_stream.name = str(stream_name)
+      return self._stdout_stream
+
+    ##
+    # Implement stream.StreamEngine.Stream
+    ##
+
+    def write_line(self, line):
+      stdout = self._get_stdout()
+      stdout.write(line)
+      stdout.write('\n')
+
+    def write_split(self, string):
+      stdout = self._get_stdout()
+      stdout.write(string)
+      if not string.endswith('\n'):
+        stdout.write('\n')
+
+    @_check_annotation_changed
+    def close(self):
+      if self._stdout_stream is not None:
+        self._stdout_stream.close()
+
+      # If we still have retained summary text, a failure, and no failure
+      # detail text, copy it there.
+      if self._summary_text is not None:
+        if (self._step.msg.HasField('failure_details') and
+            not self._step.msg.failure_details.text):
+          self._step.msg.failure_details.text = self._summary_text
+
+      # Close our Step.
+      self._engine._close_step(self._step.msg)
+
+    ##
+    # Implement stream.StreamEngine.StepStream
+    ##
+
+    @_check_annotation_changed
+    def new_log_stream(self, log_name):
+      # Generate the base normalized log stream name for this log.
+      stream_name = self._step.stream_name_base.append('logs', log_name)
+
+      # Add the log stream index to the end of the stream name.
+      index = self._log_stream_index.setdefault(str(stream_name), 0)
+      self._log_stream_index[str(stream_name)] = index + 1
+      stream_name = stream_name.append(str(index))
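+      # For example, two log streams both named "output" for this step will
+      # be emitted as ".../logs/output/0" and ".../logs/output/1".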
+
+      # Create a new log stream for this name.
+      fd = self._engine._client.open_text(str(stream_name))
+
+      # Update our step to include the log stream.
+      link = self._step.msg.other_links.add(label=log_name)
+      link.logdog_stream.name = str(stream_name)
+
+      return self._engine.TextStream(fd)
+
+    @_check_annotation_changed
+    def add_step_text(self, text):
+      self._step.msg.text.append(text)
+
+    @_check_annotation_changed
+    def add_step_summary_text(self, text):
+      self._step.msg.text.insert(0, text)
+      self._summary_text = text
+
+    @_check_annotation_changed
+    def add_step_link(self, name, url):
+      self._step.msg.other_links.add(label=name, url=url)
+
+    def reset_subannotation_state(self):
+      pass
+
+    @_check_annotation_changed
+    def set_step_status(self, status):
+      if status == 'SUCCESS':
+        self._step.msg.status = pb.SUCCESS
+      elif status == 'WARNING':
+        self._step.msg.status = pb.SUCCESS
+        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
+      elif status == 'FAILURE':
+        self._step.msg.status = pb.FAILURE
+        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
+      elif status == 'EXCEPTION':
+        self._step.msg.status = pb.FAILURE
+        self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
+      else:
+        raise ValueError('Unknown status [%s]' % (status,))
+
+    @_check_annotation_changed
+    def set_build_property(self, key, value):
+      self._engine._astate.update_properties(**{key: value})
+
+    def trigger(self, trigger_spec):
+      if self._engine._ignore_triggers:
+        return
+      raise NotImplementedError(
+          'Stream-based triggering is not supported for LogDog. Please use '
+          'a recipe module (e.g., buildbucket) directly for build scheduling.')
+
+
+  def __init__(self, client=None, streamserver_uri=None, name_base=None,
+               ignore_triggers=False, environment=None):
+    """Initializes a new LogDog/Annotation StreamEngine.
+
+    Args:
+      client (libs.logdog.stream.StreamClient or None): the LogDog stream
+          client to use. If this is None, a new StreamClient will be
+          instantiated when this StreamEngine is opened.
+      streamserver_uri (str or None): The LogDog Butler stream server URI. See
+          LogDog client library docs for details on supported protocols and
+          format. This will only be used when "client" is None. If this is
+          also None, a StreamClient will be created through probing.
+      name_base (str or None): The default stream name prefix that will be
+          added to generated LogDog stream names. If None, no prefix will be
+          applied.
+      ignore_triggers (bool): Triggers are not supported in LogDog annotation
+          streams. If True, attempts to trigger will be silently ignored. If
+          False, they will cause a NotImplementedError to be raised.
+      environment (_Environment or None): The _Environment instance to use for
+          operations. This will be None in production, but can be overridden
+          here for testing.
+    """
+
+    self._client = client
+    self._streamserver_uri = streamserver_uri
+    self._name_base = _StreamName(name_base)
+    self._ignore_triggers = ignore_triggers
+    self._env = environment or _Environment.probe()
+
+    self._astate = None
+
+    self._annotation_stream = None
+    self._annotation_monitor = None
+    self._streams = collections.OrderedDict()
+
+  def new_step_stream(self, step_config):
+    # TODO(dnj): In the current iteration, subannotations are NOT supported.
+    # In order to support them, they would have to be parsed out of the
+    # stream and converted into Milo Annotation protobuf. This is a
+    # non-trivial effort and may be a waste of time, as in a LogDog-enabled
+    # world, the component emitting sub-annotations would actually just
+    # create its own annotation stream and emit its own Milo protobuf.
+    #
+    # Components that emit subannotations and don't want to be converted to
+    # use LogDog streams could bootstrap themselves through Annotee and let
+    # it do the work.
+    #
+    # For now, though, we explicitly do NOT support LogDog running with
+    # subannotations enabled.
+    if step_config.allow_subannotations:
+      raise NotImplementedError('Subannotations are not supported with '
+                                'LogDog output.')
+
+    strm = self.StepStream.create(self, self._astate.create_step(step_config))
+    self._check()
+    return strm
+
+  def open(self):
+    # Initialize our client, if one is not provided.
+    if self._client is None:
+      if self._streamserver_uri:
+        self._client = libs.logdog.stream.create(self._streamserver_uri)
+      else:
+        # Probe the stream client via Bootstrap.
+        bootstrap = libs.logdog.bootstrap.probe()
+        self._client = bootstrap.stream_client()
+
+    annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
+    self._annotation_stream = self._client.open_datagram(
+        str(annotation_stream_name),
+        content_type=ANNOTATION_CONTENT_TYPE)
+
+    self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)
+
+    # Initialize our open streams list.
+    self._streams.clear()
+
+    # Initialize our annotation state.
+    self._astate = _AnnotationState.create(
+        self._name_base, environment=self._env)
+    self._astate.base.status = pb.RUNNING
+    self._set_timestamp(self._astate.base.started)
+    self._check()
+
+  def close(self):
+    assert self._astate is not None, 'StreamEngine is not open.'
+
+    # Shut down any outstanding streams that may not have been closed for
+    # whatever reason.
+    for s in reversed(self._streams.values()):
+      s.close()
+
+    # Close out our root Step. Manually check annotation state afterwards.
+    self._close_step(self._astate.base)
+    self._check()
+
+    # Shut down our annotation monitor and close our annotation stream.
+    self._annotation_monitor.flush_and_join()
+    self._annotation_stream.close()
+
+    # Clear our client and state. We are now closed.
+    self._streams.clear()
+    self._client = None
+    self._astate = None
+
+  def _check(self):
+    if self._astate is None:
+      return
+
+    step_data = self._astate.check()
+    if step_data is not None:
+      self._annotation_monitor.signal_update(step_data)
+
+  def _set_timestamp(self, dst, dt=None):
+    """Populates a timestamp_pb2.Timestamp, "dst", with a datetime.
+
+    Args:
+      dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be
+          loaded with the time.
+      dt (datetime.datetime or None): If not None, the datetime to load. If
+          None, the current time (via "now") will be used.
+    """
+    dt = dt or self._env.now
+
+    # Convert to seconds from the epoch.
+    v = (dt - _EPOCH).total_seconds()
+
+    dst.seconds = int(v)
+    dst.nanos = int((v - dst.seconds) * 1000000000.0)  # Remainder as nanos.
+
+  def _close_step(self, step):
+    """Closes a step, and any open substeps, propagating status.
+
+    If all of the substeps are already closed, this will do nothing to them.
+    However, if any are still open, they will be closed, and an infra failure
+    will be recorded on "step".
+
+    If any substeps failed, the failure will be propagated to "step".
+
+    Args:
+      step (pb.Step): The Step message to close.
+    """
+    # Close any open substeps, in case some of them didn't close.
+    failed = []
+    incomplete = []
+    for sub in step.substep:
+      if not sub.HasField('step'):
+        # Not an embedded substep.
+        continue
+
+      # Did this substep actually complete? It should have, by now, so if it
+      # didn't, we'll be reporting an infra failure in "step".
+      if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
+        incomplete.append(sub.step)
+
+      # Close this substep. This may be a no-op, if the substep is already
+      # closed.
+      self._close_step(sub.step)
+
+      # If a substep failed, propagate its failure status to "step".
+      if sub.step.status == pb.FAILURE:
+        failed.append(sub.step)
+
+    # If we had any incomplete substeps, mark that we failed.
+    if incomplete:
+      step.status = pb.FAILURE
+      if not step.HasField('failure_details'):
+        step.failure_details.type = pb.FailureDetails.INFRA
+        step.failure_details.text = 'Some substeps did not complete: %s' % (
+            ', '.join(s.name for s in incomplete))
+    elif failed:
+      step.status = pb.FAILURE
+      if not step.HasField('failure_details'):
+        # Propagate a general failure noting the failed substeps.
+        step.failure_details.type = pb.FailureDetails.GENERAL
+        step.failure_details.text = 'Some substeps failed: %s' % (
+            ', '.join(s.name for s in failed))
+
+    # Now close "step". If it's still RUNNING, assume that it was successful.
+    if step.status == pb.RUNNING:
+      step.status = pb.SUCCESS
+    if not step.HasField('ended'):
+      self._set_timestamp(step.ended)
+
+
+class _AnnotationMonitor(object):
+  """The owner of the annotation datagram stream, sending annotation updates
+  in a controlled manner and buffering them when the content hasn't changed.
+
+  By default, since annotation state can change rapidly, minor annotation
+  changes are throttled such that they are only actually sent periodically.
+
+  New annotation state updates can be installed by calling signal_update.
+  When annotation updates are complete, the monitor must be shut down by
+  calling its flush_and_join method.
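+
+  A minimal lifecycle sketch (the names here are illustrative):
+
+    monitor = _AnnotationMonitor(annotation_datagram_stream)
+    monitor.signal_update(step.SerializeToString())
+    ...
+    monitor.flush_and_join()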
+ """ |
+ |
+ # Flush interval for non-structural events. |
+ _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30) |
+ |
+ def __init__(self, fd, flush_period=None): |
+ self._fd = fd |
+ self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD |
+ |
+ # The following group of variables is protected by "_lock". |
+ self._lock = threading.Lock() |
+ self._current_data = None |
+ self._flush_timer = None |
+ self._last_flush_time = None |
+ self._last_flush_data = None |
+ |
+ def signal_update(self, step_data, structural=False): |
+ """Updates the annotation state with new step data. |
+ |
+ This updates our state to include new step data. The annotation monitor |
+ thread will pick this up and dispatch it, either: |
+ - Eventually, when the flush period completes, or |
+ - Immediately, if this is a structural change. |
+ |
+ Args: |
+ step_data (str): The updated binary annotation protobuf step data. |
+ structural (bool): If True, this is a structural update and should be |
+ pushed immediately. |
+ """ |
+ with self._lock: |
+ # Did our data actually change? |
+ if step_data == self._last_flush_data: |
+ # Nope, leave things as-is. |
+ return |
+ |
+ # This is new data. Is it structural? If so, flush immediately. |
+ # If not, make sure our timer is running so it will eventually be flushed. |
+ # Note that the timer may also suggest that we flush immediately. |
+ now = datetime.datetime.now() |
+ self._current_data = step_data |
+ if structural or self._set_flush_timer_locked(now): |
+ # We should flush immediately. |
+ self._flush_now_locked(now) |
+ |
+  def flush_and_join(self):
+    """Flushes any remaining updates and blocks until the monitor is complete.
+    """
+    # Flush any remaining buffered data.
+    with self._lock:
+      self._flush_now_locked(datetime.datetime.now())
+
+  @property
+  def latest(self):
+    with self._lock:
+      return self._last_flush_data
+
+  def _flush_now_locked(self, now):
+    # Clear any current flush timer.
+    self._clear_flush_timer_locked()
+
+    # Record this flush.
+    #
+    # We set the last flush time to "now" because, even if we don't actually
+    # send data, we have responded to this flush event.
+    flush_data, self._current_data = self._current_data, None
+    self._last_flush_time = now
+
+    # If the data hasn't changed since the last flush, then don't actually
+    # do anything.
+    if flush_data is None or flush_data == self._last_flush_data:
+      return
+
+    self._last_flush_data = flush_data
+    self._fd.send(flush_data)
+
+  def _clear_flush_timer_locked(self):
+    if self._flush_timer is not None:
+      self._flush_timer.cancel()
+      self._flush_timer = None
+
+  def _set_flush_timer_locked(self, now):
+    """Returns (bool): True if the caller should flush immediately."""
+    if self._flush_timer is not None:
+      # Our flush timer is already running.
+      return False
+
+    if self._last_flush_time is None:
+      # We have never flushed before, so flush immediately.
+      return True
+
+    deadline = self._last_flush_time + self._flush_period
+    if deadline <= now:
+      # We're past our flush deadline, and should flush immediately.
+      return True
+
+    # Start our flush timer.
+    self._flush_timer = threading.Timer(
+        (deadline - now).total_seconds(), self._flush_timer_expired)
+    self._flush_timer.daemon = True
+    self._flush_timer.start()
+    return False
+
+  def _flush_timer_expired(self):
+    with self._lock:
+      self._flush_now_locked(datetime.datetime.now())
+
+
+class _AnnotationState(object):
+  """Manages an outer Milo annotation protobuf Step."""
+
+  Step = collections.namedtuple('Step', (
+      'msg', 'stream_name_base', 'substream_name_index'))
+
+  def __init__(self, base_step, stream_name_base):
+    self._base = self.Step(
+        msg=base_step,
+        stream_name_base=stream_name_base,
+        substream_name_index={})
+    self._check_snapshot = None
+
+    # The current step stack. Entries are pushed and popped as new steps'
+    # nesting levels are processed.
+    self._nest_stack = [self._base]
+
+    # Index initial properties.
+    self._properties = dict((p.name, p) for p in self._base.msg.property)
+
+  @classmethod
+  def create(cls, stream_name_base, environment=None, properties=None):
+    base = pb.Step()
+    base.name = 'steps'
+    base.status = pb.PENDING
+    if environment:
+      if environment.argv:
+        base.command.command_line.extend(environment.argv)
+      if environment.cwd:
+        base.command.cwd = environment.cwd
+      if environment.environ:
+        base.command.environ.update(environment.environ)
+    if properties:
+      for k, v in sorted(properties.iteritems()):
+        base.property.add(name=k, value=v)
+    return cls(base, stream_name_base)
+
+  @property
+  def base(self):
+    return self._base.msg
+
+  def check(self):
+    """Checks if the annotation state has been updated and, if so, returns it.
+
+    After check returns, the latest annotation state will be used as the
+    current snapshot for future checks.
+
+    Returns (str/None): A serialized binary Step protobuf if modified, None
+        otherwise.
+    """
+    if self._check_snapshot is None or self._check_snapshot != self.base:
+      self._check_snapshot = copy.deepcopy(self.base)
+      return self._check_snapshot.SerializeToString()
+    return None
+
+  def create_step(self, step_config):
+    # Identify our parent Step by examining the nesting level. The first
+    # entry in the nest stack will always be the base (nesting level "-1",
+    # since it's the parent of level 0). Since a step's "nest_level" is one
+    # more than its parent's, and we need to offset by 1 to reach the stack
+    # index, the two cancel out, so the nest level is the same as the
+    # parent's stack index.
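+    # For example, a step with nest_level 0 is a child of the base step at
+    # stack index 0, and a step with nest_level 2 is a child of the
+    # nest_level-1 step at stack index 2.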
+    assert step_config.nest_level < len(self._nest_stack), (
+        'Invalid nest level %d (highest is %d)' % (
+            step_config.nest_level, len(self._nest_stack)-1))
+
+    # Clear any items in the nest stack that are deeper than the current
+    # element.
+    del self._nest_stack[step_config.nest_level+1:]
+    parent = self._nest_stack[-1]
+
+    # Create a stream name for this step. Even though step names are unique,
+    # the normalized LogDog step name may overlap with a different step name.
+    # We keep track of the step names we've issued to this step space and
+    # add indexes if a conflict is identified.
+    stream_name_base = parent.stream_name_base.append(
+        'steps', step_config.base_name)
+    index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
+    parent.substream_name_index[str(stream_name_base)] += 1
+    if index > 0:
+      stream_name_base += '_%d' % (index,)
+
+    # Create and populate our new step.
+    msg = parent.msg.substep.add().step
+    msg.name = step_config.base_name
+    msg.status = pb.PENDING
+    if step_config.cmd:
+      msg.command.command_line.extend(step_config.cmd)
+    if step_config.cwd:
+      msg.command.cwd = step_config.cwd
+    if step_config.env:
+      msg.command.environ.update(step_config.env)
+
+    step = self.Step(
+        msg=msg,
+        stream_name_base=stream_name_base,
+        substream_name_index={})
+    self._nest_stack.append(step)
+    return step
+
+  def update_properties(self, **kwargs):
+    """Updates the step's property values to incorporate kwargs."""
+    for k, v in sorted(kwargs.iteritems()):
+      cur = self._properties.get(k)
+      if cur is None:
+        cur = self.base.property.add(name=k, value=str(v))
+        self._properties[k] = cur
+        continue
+
+      # A Property message already exists for this key, so update its value.
+      if cur.value != str(v):
+        cur.value = str(v)
+
+
+class _StreamName(object):
+  """An immutable validated wrapper for a LogDog stream name."""
+
+  def __init__(self, base):
+    if base is not None:
+      libs.logdog.streamname.validate_stream_name(base)
+    self._base = base
+
+  def append(self, *components):
+    """Returns (_StreamName): A new _StreamName instance with components
+    added.
+
+    Each component in "components" will become a new normalized stream name
+    component. Consequently, any separators (/) in the components will be
+    replaced with underscores.
+
+    Args:
+      components: the path components to append to this _StreamName.
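+
+    For example:
+
+      Original: "foo"
+      Append "bar", "baz/qux": "foo/bar/baz_qux"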
+ """ |
+ if len(components) == 0: |
+ return self |
+ |
+ components = [self._normalize(self._flatten(p)) |
+ for p in reversed(components)] |
+ if self._base: |
+ components.append(self._base) |
+ return type(self)('/'.join(reversed(components))) |
+ |
+ def augment(self, val): |
+ """Returns (_StreamName): A new _StreamName with "val" appended. |
+ |
+ This generates a new, normalized _StreamName with the contents of "val" |
+ appended to the end. For example: |
+ |
+ Original: "foo/bar" |
+ Append "baz qux": "foo/barbaz_qux" |
+ """ |
+ if not val: |
+ return self |
+ val = self._flatten(val) |
+ if self._base: |
+ val = self._base + val |
+ return type(self)(self._normalize(val)) |
+ |
+ def __iadd__(self, val): |
+ return self.augment(val) |
+ |
+ @staticmethod |
+ def _flatten(v): |
+ return v.replace('/', '_') |
+ |
+ @staticmethod |
+ def _normalize(v): |
+ return libs.logdog.streamname.normalize(v, prefix='s_') |
+ |
+ def __str__(self): |
+ if not self._base: |
+ raise ValueError('Cannot generate string from empty StreamName.') |
+ return self._base |