Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(756)

Unified Diff: recipe_engine/stream_logdog.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: pylint, fix comments Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « recipe_engine/stream.py ('k') | recipe_engine/third_party/requests/requests-2.10.0.dist-info/INSTALLER » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: recipe_engine/stream_logdog.py
diff --git a/recipe_engine/stream_logdog.py b/recipe_engine/stream_logdog.py
new file mode 100644
index 0000000000000000000000000000000000000000..f7835ecc0c6d2e9fad2a27b2c15d015807163134
--- /dev/null
+++ b/recipe_engine/stream_logdog.py
@@ -0,0 +1,743 @@
+# Copyright 2015 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+"""stream.StreamEngine implementation for LogDog, using Milo annotation
+protobuf.
+"""
+
+import collections
+import contextlib
+import copy
+import datetime
+import os
+import threading
+import sys
+
+from . import env
+from . import stream
+from . import util
+
+import google.protobuf.message
+import google.protobuf.timestamp_pb2 as timestamp_pb2
+import libs.logdog.bootstrap
+import libs.logdog.stream
+import libs.logdog.streamname
+import annotations_pb2 as pb
+
+
+# The datetime for the epoch.
+_EPOCH = datetime.datetime.utcfromtimestamp(0)
+
+# The annotation stream ContentType.
+#
+# This must match the ContentType for the annotation binary protobuf, which
+# is specified in "<luci-go>/common/proto/milo/util.go".
+ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
+
+
+class _Environment(object):
+ """Simulated system environment. The StreamEngine uses this to probe for
+ system parameters. By default, the environment will be derived from the
+ actual system.
+ """
+
+ def __init__(self, now_fn, argv, environ, cwd):
+ self._now_fn = now_fn
+ self.argv = argv
+ self.environ = environ
+ self.cwd = cwd
+
+ @property
+ def now(self):
+ return self._now_fn()
+
+ @classmethod
+ def probe(cls):
+ return cls(
+ now_fn=datetime.datetime.now,
+ argv=sys.argv[:],
+ environ=dict(os.environ),
+ cwd=os.getcwd(),
+ )
+
+
+def _check_annotation_changed(fn):
+ """Decorator that can be applied to a StepStream instance method to call
+ our bound Engine's _check after the function is finished.
+
+ This should decorate any method that modifies annotation state.
+ """
+ def check_after(inner, *args, **kwargs):
+ v = fn(inner, *args, **kwargs)
+ inner._engine._check()
+ return v
+ return check_after
+
+
+class StreamEngine(stream.StreamEngine):
+ """A stream.StreamEngine implementation that uses Logdog streams and Milo
+ annotation protobufs.
+
+ The generated LogDog streams will be (relative to "name_base"):
+ /annotations
+ The base annotations stream.
+
+ /steps/<step_name>/
+ Base stream name for a given step. Note that if multiple steps normalize
+ to the same <step_name> value, an index will be appended, such as
+ <step_name>_0. This can happen because the stream name component of a
+ step is normalized, so two validly-independent steps ("My Thing" and
+ "My_Thing") will both normalize to "My_Thing". In this case, the second
+ one would have the stream name component, "My_Thing_1".
+
+ /steps/<step_name>/stdout
+ STDOUT stream for step "<step_name>".
+ /steps/<step_name>/stderr
+ STDOUT stream for step "<step_name>".
+
+ /steps/<step_name>/logs/<log_name>/<log_name_index>
+ Stream name for a given step's logs. <log_name_index> is the index of the
+ log with the given normalized name. This is similar to <step_name>, only
+ the index is added as a separate stream name component.
+ """
+
+ # The name of the annotation stream.
+ ANNOTATION_NAME = 'annotations'
+
+
+ class TextStream(stream.StreamEngine.Stream):
+
+ def __init__(self, fd):
+ super(StreamEngine.TextStream, self).__init__()
+ self._fd = fd
+
+ ##
+ # Implement stream.StreamEngine.Stream
+ ##
+
+ def write_line(self, line):
+ self._fd.write(line)
+ self._fd.write('\n')
+
+ def write_split(self, string):
+ self._fd.write(string)
+ if not string.endswith('\n'):
+ self._fd.write('\n')
+
+ def close(self):
+ self._fd.close()
+
+
+ class StepStream(stream.StreamEngine.StepStream):
+ """An individual step stream."""
+
+ def __init__(self, engine, step):
martiniss 2016/09/30 20:41:44 Can you document that step is an instance of Annot
dnj 2016/09/30 22:04:38 Done.
+ # We will lazily create the STDOUT stream when the first data is written.
+ super(StreamEngine.StepStream, self).__init__()
+
+ self._engine = engine
+ self._step = step
+
+ # We keep track of the log streams associated with this step.
+ self._log_stream_index = {}
+
+ # We will lazily instantiate our stdout stream when content is actually
+ # written to it.
+ self._stdout_stream = None
+
+ # The retained step summary text. When generating failure details, this
+ # will be consumed to populate their text field.
+ self._summary_text = None
+
+ @classmethod
+ def create(cls, engine, step):
+ strm = cls(engine, step)
+
+ # Start our step.
+ strm._step.msg.status = pb.RUNNING
+ engine._set_timestamp(strm._step.msg.started)
+
+ return strm
+
+ @_check_annotation_changed
+ def _get_stdout(self):
+ if self._stdout_stream is None:
+ # Create a new STDOUT text stream.
+ stream_name = self._step.stream_name_base.append('stdout')
+ self._stdout_stream = self._engine._client.open_text(str(stream_name))
martiniss 2016/09/30 20:41:44 You're accessing self._engine._client, which looks
dnj 2016/09/30 22:04:38 Yeah - this thing needs access to the client, and
+
+ self._step.msg.stdout_stream.name = str(stream_name)
+ return self._stdout_stream
+
+ ##
+ # Implement stream.StreamEngine.Stream
+ ##
+
+ def write_line(self, line):
+ stdout = self._get_stdout()
+ stdout.write(line)
+ stdout.write('\n')
+
+ def write_split(self, string):
+ stdout = self._get_stdout()
+ stdout.write(string)
+ if not string.endswith('\n'):
+ stdout.write('\n')
+
+ @_check_annotation_changed
+ def close(self):
+ if self._stdout_stream is not None:
+ self._stdout_stream.close()
+
+ # If we still have retained summary text, a failure, and no failure detail
+ # text, copy it there.
+ if self._summary_text is not None:
+ if (self._step.msg.HasField('failure_details') and
+ not self._step.msg.failure_details.text):
+ self._step.msg.failure_details.text = self._summary_text
+
+ # Close our Step.
+ self._engine._close_step(self._step.msg)
+
+ ##
+ # Implement stream.StreamEngine.StepStream
+ ##
+
+ @_check_annotation_changed
+ def new_log_stream(self, log_name):
+ # Generate the base normalized log stream name for this log.
+ stream_name = self._step.stream_name_base.append('logs', log_name)
+
+ # Add the log stream index to the end of the stream name.
+ index = self._log_stream_index.setdefault(str(stream_name), 0)
+ self._log_stream_index[str(stream_name)] = index + 1
+ stream_name = stream_name.append(str(index))
+
+ # Create a new log stream for this name.
+ fd = self._engine._client.open_text(str(stream_name))
+
+ # Update our step to include the log stream.
+ link = self._step.msg.other_links.add(label=log_name)
+ link.logdog_stream.name = str(stream_name)
+
+ return self._engine.TextStream(fd)
+
+ @_check_annotation_changed
+ def add_step_text(self, text):
+ self._step.msg.text.append(text)
+
+ @_check_annotation_changed
+ def add_step_summary_text(self, text):
+ self._step.msg.text.insert(0, text)
+ self._summary_text = text
+
+ @_check_annotation_changed
+ def add_step_link(self, name, url):
+ self._step.msg.other_links.add(label=name, url=url)
martiniss 2016/09/30 20:41:44 We never set the "main" link, only other links (ht
dnj 2016/09/30 22:04:38 I don't know what we'd set it to. Any thoughts? Fu
martiniss 2016/10/01 02:00:36 What was the intention of the original item in the
+
+ def reset_subannotation_state(self):
+ pass
+
+ @_check_annotation_changed
+ def set_step_status(self, status):
+ if status == 'SUCCESS':
+ self._step.msg.status = pb.SUCCESS
+ elif status == 'WARNING':
+ self._step.msg.status = pb.SUCCESS
+ self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
+ elif status == 'FAILURE':
+ self._step.msg.status = pb.FAILURE
+ self._step.msg.failure_details.type=pb.FailureDetails.GENERAL
+ elif status == 'EXCEPTION':
+ self._step.msg.status = pb.FAILURE
+ self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
+ else:
+ raise ValueError('Unknown status [%s]' % (status,))
+
+ @_check_annotation_changed
+ def set_build_property(self, key, value):
+ self._engine._anno.update_properties(key=value)
+
+ def trigger(self, trigger_spec):
+ if self._engine._ignore_triggers:
+ return
+ raise NotImplementedError(
+ 'Stream-based triggering is not supported for LogDog. Please use '
+ 'a recipe module (e.g., buildbucket) directly for build scheduling.')
+
+
+ def __init__(self, client=None, streamserver_uri=None, name_base=None,
martiniss 2016/09/30 20:41:44 Maybe clarify this is the constructor for the Stre
dnj 2016/09/30 22:04:38 I'll just move this sucker up to the top.
+ ignore_triggers=False, environment=None):
+ """Initializes a new LogDog/Annotation StreamEngine.
+
+ Args:
+ client (libs.logdog.stream.StreamClient or None): the LogDog stream client
+ to use. If this is None, a new StreamClient will be instantiated when
+ this StreamEngine is opened.
+ streamserver_uri (str or None): The LogDog Butler stream server URI. See
+ LogDog client library docs for details on supported protocols and
+ format. This will only be used when "client" is None. If this is also
+ None, a StreamClient will be created through probing.
+ name_base (str or None): The default stream name prefix that will be added
+ to generated LogDog stream names. If None, no prefix will be applied.
+ ignore_triggers (bool): Triggers are not supported in LogDog annotation
+ streams. If True, attempts to trigger will be silently ignored. If
+ False, they will cause a NotImplementedError to be raised.
+ environment (_Environment or None): The _Environment instance to use for
+ operations. This will be None at production, but can be overridden
+ here for testing.
+ """
+
+ self._client = client
+ self._streamserver_uri = streamserver_uri
+ self._name_base = _StreamName(name_base)
+ self._ignore_triggers = ignore_triggers
+ self._env = environment or _Environment.probe()
+
+ self._astate = None
+
+ self._annotation_stream = None
+ self._annotation_monitor = None
+ self._streams = collections.OrderedDict()
+
+ def new_step_stream(self, step_config):
+ # TODO(dnj): In the current iteration, subannotations are NOT supported.
+ # In order to support them, they would have to be parsed out of the stream
+ # and converted into Milo Annotation protobuf. This is a non-trivial effort
+ # and may be a waste of time, as in a LogDog-enabled world, the component
+ # emitting sub-annotations would actually just create its own annotation
+ # stream and emit its own Milo protobuf.
+ #
+ # Components that emit subannotations and don't want to be converted to use
+ # LogDog streams could bootstrap themselves through Annotee and let it do
+ # the work.
+ #
+ # For now, though, we explicitly do NOT support LogDog running with
+ # subannotations enabled.
+ if step_config.allow_subannotations:
+ raise NotImplementedError('Subannotations are not supported with LogDog '
+ 'output.')
+
+ strm = self.StepStream.create(self, self._astate.create_step(step_config))
+ self._check()
+ return strm
+
+ def open(self):
+ # Initialize our client, if one is not provided.
+ if self._client is None:
+ if self._streamserver_uri:
+ self._client = libs.logdog.stream.create(self._streamserver_uri)
+ else:
+ # Probe the stream client via Bootstrap.
+ bootstrap = libs.logdog.bootstrap.probe()
+ self._client = bootstrap.stream_client()
+
+ annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
+ self._annotation_stream = self._client.open_datagram(
+ str(annotation_stream_name),
+ content_type=ANNOTATION_CONTENT_TYPE)
+
+ self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)
+
+ # Initialize our open streams list.
+ self._streams.clear()
+
+ # Initialize our annotation state.
+ self._astate = _AnnotationState.create(self._name_base,
+ environment=self._env)
+ self._astate.base.status = pb.RUNNING
+ self._set_timestamp(self._astate.base.started)
+ self._check()
+
+ def close(self):
+ assert self._astate is not None, 'StreamEngine is not open.'
+
+ # Shut down any outstanding streams that may not have been closed for
+ # whatever reason.
+ for s in reversed(self._streams.values()):
+ s.close()
+
+ # Close out our root Step. Manually check annotation state afterwards.
+ self._close_step(self._astate.base)
+ self._check()
+
+ # Shut down our annotation monitor and close our annotation stream.
+ self._annotation_monitor.flush_and_join()
+ self._annotation_stream.close()
+
+ # Clear our client and state. We are now closed.
+ self._streams.clear()
+ self._client = None
+ self._astate = None
+
+ def _check(self):
+ if self._astate is None:
+ return
+
+ step_data = self._astate.check()
+ if step_data is not None:
+ self._annotation_monitor.signal_update(step_data)
+
+ def _set_timestamp(self, dst, dt=None):
+ """Populates a timestamp_pb2.Timestamp, dst, with a datetime.
+
+ Args:
+ dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be loaded
+ with the time.
+ dt (datetime.datetime or None): If not None, the datetime to load. If
+ None, the current time (via now) will be used.
+ """
+ dt = (dt) if dt else (self._env.now)
+
+ # Convert to milliseconds from epoch.
+ v = (dt - _EPOCH).total_seconds()
+
+ dst.seconds = int(v)
+ dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos.
+
+ def _close_step(self, step):
+ """Closes a step, and any open substeps, propagating status.
+
+ If all of the substeps are already closed, this will do nothing. However, if
+ any are open, it will close them with an infra failure state.
+
+ If any substeps failed, the failure will be propagated to step.
+
+ Args:
+ step (pb.Step): The Step message to close.
+ """
+ # Close any open substeps, in case some of them didn't close.
+ failed = []
+ incomplete = []
+ for sub in step.substep:
+ if not sub.HasField('step'):
+ # Not an embedded substep.
+ continue
+
+ # Did this step actually complete? It should have, by now, so if it didn't
+ # we'll be reporting an infra failure in "step".
+ if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
+ incomplete.append(sub.step)
+
+ # Close this substep. This may be a no-op, if the substep is already
+ # closed.
+ self._close_step(sub.step)
+
+ # If a substep failed, propagate its failure status to "step".
+ if sub.step.status == pb.FAILURE:
+ failed.append(sub.step)
+
+ # If we had any incomplete steps, mark that we failed.
+ if incomplete:
+ step.status = pb.FAILURE
+ if step.failure_details is None:
+ step.failure_details = pb.FailureDetails(
+ type=pb.FailureDetails.INFRA,
+ text='Some substeps did not complete: %s' % (
+ ', '.join(s.name for s in incomplete)),
+ )
+ elif failed:
+ step.status = pb.FAILURE
+ if step.failure_details is None:
+ # This step didn't successfully close, so propagate an infra failure.
+ step.failure_details = pb.FailureDetails(
+ type=pb.FailureDetails.GENERAL,
+ text='Some substeps failed: %s' % (
+ ', '.join(s.name for s in failed)),
+ )
+
+ # Now close "step". If it's still RUNNING, assume that it was successful.
+ if step.status == pb.RUNNING:
+ step.status = pb.SUCCESS
+ if not step.HasField('ended'):
+ self._set_timestamp(step.ended)
+
+
+
+class _AnnotationMonitor(object):
+ """The owner of the annotation datagram stream, sending annotation updates in
+ a controlled manner and buffering them when the content hasn't changed.
+
+ By default, since annotation state can change rapidly, minor annotation
+ changes are throttled such that they are only actually sent periodically.
+
+ New annotation state updates can be installed by calling `signal_update`.
+ After being started, the _AnnotationMonitor thread must be shut down by
+ calling its `flush_and_join` method.
+ """
+
+ # Flush interval for non-structural events.
+ _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30)
+
+ def __init__(self, fd, flush_period=None):
+ self._fd = fd
+ self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD
+
+ # The following group of variables is protected by "_lock".
+ self._lock = threading.Lock()
+ self._current_data = None
+ self._flush_timer = None
+ self._last_flush_time = None
+ self._last_flush_data = None
+
+ def signal_update(self, step_data, structural=False):
martiniss 2016/09/30 20:41:44 structural is never used by anything, right? Is it
dnj 2016/09/30 22:04:38 I think it's a good idea. I'm interested in seeing
+ """Updates the annotation state with new step data.
+
+ This updates our state to include new step data. The annotation monitor
+ thread will pick this up and dispatch it, either:
+ - Eventually, when the flush period completes, or
+ - Immediately, if this is a structural change.
+
+ Args:
+ step_data (str): The updated binary annotation protobuf step data.
+ structural (bool): If True, this is a structural update and should be
+ pushed immediately.
+ """
+ with self._lock:
+ # Did our data actually change?
+ if step_data == self._last_flush_data:
+ # Nope, leave things as-is.
+ return
+
+ # This is new data. Is it structural? If so, flush immediately.
+ # If not, make sure our timer is running so it will eventually be flushed.
+ # Note that the timer may also suggest that we flush immediately.
+ now = datetime.datetime.now()
+ self._current_data = step_data
+ if structural or self._set_flush_timer_locked(now):
+ # We should flush immediately.
+ self._flush_now_locked(now)
+
+ def flush_and_join(self):
+ """Flushes any remaining updates and blocks until the monitor is complete.
+ """
+ # Mark that we're finished and signal our event.
+ with self._lock:
+ self._flush_now_locked(datetime.datetime.now())
+
+ @property
+ def latest(self):
+ with self._lock:
+ return self._last_flush_data
+
+ def _flush_now_locked(self, now):
+ # Clear any current flush timer.
+ self._clear_flush_timer_locked()
+
+ # Record this flush.
+ #
+ # We set the last flush time to now because even if we don't actually send
+ # data, we have responded to the flush edge.
martiniss 2016/09/30 20:41:44 flush edge?
dnj 2016/09/30 22:04:38 Edge in the signals sense. I'll change it to say "
+ flush_data, self._current_data = self._current_data, None
+ self._last_flush_time = now
+
+ # If the data hasn't changed since the last flush, then don't actually
+ # do anything.
+ if flush_data is None or flush_data == self._last_flush_data:
+ return
+
+ self._last_flush_data = flush_data
+ self._fd.send(flush_data)
+
+ def _clear_flush_timer_locked(self):
+ if self._flush_timer is not None:
+ self._flush_timer.cancel()
+ self._flush_timer = None
+
+ def _set_flush_timer_locked(self, now):
+ if self._flush_timer is not None:
+ # Our flush timer is already running.
+ return False
+
+ if self._last_flush_time is None:
+ # We have never flushed before, so flush immediately.
+ return True
+
+ deadline = self._last_flush_time + self._flush_period
+ if deadline <= now:
+ # We're past our flush deadline, and should flush immediately.
+ return True
+
+ # Start our flush timer.
+ self._flush_timer = threading.Timer((deadline - now).total_seconds(),
+ self._flush_timer_expired)
+ self._flush_timer.daemon = True
+ self._flush_timer.start()
+
+ def _flush_timer_expired(self):
+ with self._lock:
+ self._flush_now_locked(datetime.datetime.now())
+
+
+class _AnnotationState(object):
+ """Manages an outer Milo annotation protobuf Step."""
+
+ Step = collections.namedtuple('Step', (
+ 'msg', 'stream_name_base', 'substream_name_index'))
+
+ def __init__(self, base_step, stream_name_base):
+ self._base = self.Step(
+ msg=base_step,
+ stream_name_base=stream_name_base,
+ substream_name_index={})
+ self._check_snapshot = None
+
+ # The current step stack. This is built by updating state after new steps'
+ # nesting levels.
+ self._nest_stack = [self._base]
+
+ # Index initial properties.
+ self._properties = dict((p.name, p) for p in self._base.msg.property)
+
+ @classmethod
+ def create(cls, stream_name_base, environment=None, properties=None):
+ base = pb.Step()
+ base.name = 'steps'
+ base.status = pb.PENDING
+ if environment:
+ if environment.argv:
+ base.command.command_line.extend(environment.argv)
+ if environment.cwd:
+ base.command.cwd = environment.cwd
+ if environment.environ:
+ base.command.environ.update(environment.environ)
+ if properties:
+ for k, v in sorted(properties.iteritems()):
+ base.property.add(name=k, value=v)
+ return cls(base, stream_name_base)
+
+ @property
+ def base(self):
+ return self._base.msg
+
+ def check(self):
+ """Checks if the annotation state has been updated and, if so, returns it.
+
+ After check returns, the latest annotation state will be used as the current
+ snapshot for future checks.
+
+ Returns (str/None): A serialized binary Step protobuf if modified, None
+ otherwise.
+ """
+ if self._check_snapshot is None or self._check_snapshot != self.base:
+ self._check_snapshot = copy.deepcopy(self.base)
+ return self._check_snapshot.SerializeToString()
+ return None
+
+ def create_step(self, step_config):
+ # Identify our parent Step by examining the nesting level. The first step
+ # in the nest stack will always be the base (nesting level "-1", since it's
+ # the parent of level 0). Since the step's "nest_level" is one more than the
+ # parent, and we need to offset by 1 to reach the stack index, they cancel
+ # each other out, so the nest level is the same as the parent's stack index.
+ assert step_config.nest_level < len(self._nest_stack), (
+ 'Invalid nest level %d (highest is %d)' % (
+ step_config.nest_level, len(self._nest_stack)-1))
+
+ # Clear any items in the nest stack that are deeper than the current
+ # element.
+ del(self._nest_stack[step_config.nest_level+1:])
+ parent = self._nest_stack[-1]
+
+ # Create a stream name for this step. Even though step names are unique,
+ # the normalized LogDog step name may overlap with a different step name.
+ # We keep track of the step names we've issued to this step space and
+ # add indexes if a conflict is identified.
+ stream_name_base = parent.stream_name_base.append('steps',
+ step_config.base_name)
+ index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
+ parent.substream_name_index[str(stream_name_base)] += 1
+ if index > 0:
+ stream_name_base += '_%d' % (index,)
+
+ # Create and populate our new step.
+ msg = parent.msg.substep.add().step
+ msg.name = step_config.base_name
+ msg.status = pb.PENDING
+ if step_config.cmd:
+ msg.command.command_line.extend(step_config.cmd)
+ if step_config.cwd:
+ msg.command.cwd = step_config.cwd
+ if step_config.env:
+ msg.command.environ = step_config.env
+
+ step = self.Step(
+ msg=msg,
+ stream_name_base=stream_name_base,
+ substream_name_index={})
+ self._nest_stack.append(step)
+ return step
+
+ def update_properties(self, **kwargs):
+ """Updates a set's property values to incorporate kwargs."""
+ for k, v in sorted(kwargs.iteritems()):
+ cur = self._properties.get(k)
+ if cur is None:
+ cur = self.base.property.add(name=k, value=str(v))
+ self._properties[k] = cur
+ continue
+
+ # A Property message already exists for this key, so update its value.
+ if cur.value != v:
+ cur.value = str(v)
+
+
+class _StreamName(object):
+ """An immutable validated wrapper for a LogDog stream name."""
+
+ def __init__(self, base):
+ if base is not None:
+ libs.logdog.streamname.validate_stream_name(base)
+ self._base = base
+
+ def append(self, *components):
+ """Returns (_StreamName): A new _StreamName instance with components added.
+
+ Each component in "components" will become a new normalized stream name
+ component. Conseqeuntly, any separators (/) in the components will be
+ replaced with underscores.
+
+ Args:
+ components: the path components to append to this _StreamName.
+ """
+ if len(components) == 0:
+ return self
+
+ components = [self._normalize(self._flatten(p))
+ for p in reversed(components)]
+ if self._base:
+ components.append(self._base)
+ return type(self)('/'.join(reversed(components)))
+
+ def augment(self, val):
+ """Returns (_StreamName): A new _StreamName with "val" appended.
+
+ This generates a new, normalized _StreamName with the contents of "val"
+ appended to the end. For example:
+
+ Original: "foo/bar"
+ Append "baz qux": "foo/barbaz_qux"
+ """
+ if not val:
+ return self
+ val = self._flatten(val)
+ if self._base:
+ val = self._base + val
+ return type(self)(self._normalize(val))
+
+ def __iadd__(self, val):
+ return self.augment(val)
+
+ @staticmethod
+ def _flatten(v):
+ return v.replace('/', '_')
+
+ @staticmethod
+ def _normalize(v):
+ return libs.logdog.streamname.normalize(v, prefix='s_')
+
+ def __str__(self):
+ if not self._base:
+ raise ValueError('Cannot generate string from empty StreamName.')
+ return self._base
« no previous file with comments | « recipe_engine/stream.py ('k') | recipe_engine/third_party/requests/requests-2.10.0.dist-info/INSTALLER » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698