Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf.
7 """
8
9 import collections
10 import contextlib
11 import copy
12 import datetime
13 import os
14 import threading
15 import sys
16
17 from . import env
18 from . import stream
19 from . import util
20
21 import google.protobuf.message
22 import google.protobuf.timestamp_pb2 as timestamp_pb2
23 import libs.logdog.bootstrap
24 import libs.logdog.stream
25 import libs.logdog.streamname
26 import annotations_pb2 as pb
27
28
# The datetime corresponding to the UNIX epoch (UTC).
_EPOCH = datetime.datetime.utcfromtimestamp(0)

# The annotation stream ContentType.
#
# This must match the ContentType for the annotation binary protobuf, which
# is specified in "<luci-go>/common/proto/milo/util.go".
ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
37
38
39 class _Environment(object):
40 """Simulated system environment. The StreamEngine uses this to probe for
41 system parameters. By default, the environment will be derived from the
42 actual system.
43 """
44
45 def __init__(self, now_fn, argv, environ, cwd):
46 self._now_fn = now_fn
47 self.argv = argv
48 self.environ = environ
49 self.cwd = cwd
50
51 @property
52 def now(self):
53 return self._now_fn()
54
55 @classmethod
56 def probe(cls):
57 return cls(
58 now_fn=datetime.datetime.now,
59 argv=sys.argv[:],
60 environ=dict(os.environ),
61 cwd=os.getcwd(),
62 )
63
64
class StreamEngine(stream.StreamEngine):
  """A stream.StreamEngine implementation that uses LogDog streams and Milo
  annotation protobufs.

  The generated LogDog streams will be (relative to "name_base"):
    /annotations (or "annotation_name", if overridden)
      The base annotations stream.

    /steps/<step_name>/
      Base stream name for a given step. Note that if multiple steps normalize
      to the same <step_name> value, an index will be appended, such as
      <step_name>_0. This can happen because the stream name component of a
      step is normalized, so two validly-independent steps ("My Thing" and
      "My_Thing") will both normalize to "My_Thing". In this case, the second
      one would have the stream name component, "My_Thing_1".

    /steps/<step_name>/stdout
      STDOUT stream for step "<step_name>".
    /steps/<step_name>/stderr
      STDERR stream for step "<step_name>".

    /steps/<step_name>/logs/<log_name>/<log_name_index>
      Stream name for a given step's logs. <log_name_index> is the index of
      the log with the given normalized name. This is similar to <step_name>,
      only the index is added as a separate stream name component.
  """

  # The default name of the annotation stream.
  DEFAULT_ANNOTATION_NAME = 'annotations'
94
95
96 class TextStream(stream.StreamEngine.Stream):
martiniss 2016/09/01 21:59:47 This looks really general; maybe this should be in
dnj 2016/09/07 17:54:57 I don't think so. I think it's reasonable for impl
97
98 def __init__(self, fd):
99 super(StreamEngine.TextStream, self).__init__()
100 self._fd = fd
101
102 ##
103 # Implement stream.StreamEngine.Stream
104 ##
105
106 def write_line(self, line):
107 self._fd.write(line)
108 self._fd.write('\n')
109
110 def write_split(self, string):
111 self._fd.write(string)
112 if not string.endswith('\n'):
113 self._fd.write('\n')
114
115 def close(self):
116 self._fd.close()
117
118
119 class StepStream(stream.StreamEngine.StepStream):
120 """An individual step stream."""
121
122 def __init__(self, engine, step):
123 # We will lazily create the STDOUT stream when the first data is written.
124 super(StreamEngine.StepStream, self).__init__()
125
126 self._engine = engine
127 self._step = step
128
129 # We keep track of the log streams associated with this step.
130 self._log_stream_index = {}
131
132 # We will lazily instantiate our stdout stream when content is actually
133 # written to it.
134 self._stdout_stream = None
135
136 # The retained step summary text. When generating failure details, this
137 # will be consumed to populate their text field.
138 self._summary_text = None
139
140 @classmethod
141 def create(cls, engine, step):
142 stream = cls(engine, step)
143
144 # Start our step.
145 stream._step.msg.status = pb.RUNNING
146 engine._set_timestamp(stream._step.msg.started)
147
148 return stream
149
150 def _check_annotation_changed(fn):
151 """Decorator that can be applied to a StepStream instance method to call
152 our bound Engine's _check after the function is finished.
153
154 This should decorate any method that modifies annotation state.
155 """
156 def check_after(inner, *args, **kwargs):
157 v = fn(inner, *args, **kwargs)
158 inner._engine._check()
159 return v
160 return check_after
161
162 @_check_annotation_changed
163 def _get_stdout(self):
164 if self._stdout_stream is None:
165 # Create a new STDOUT text stream.
166 stream_name = self._step.stream_name_base.append('stdout')
167 self._stdout_stream = self._engine._client.open_text(str(stream_name))
168
169 self._step.msg.stdout_stream.name = str(stream_name)
170 return self._stdout_stream
171
172 ##
173 # Implement stream.StreamEngine.Stream
174 ##
175
176 def write_line(self, line):
177 stdout = self._get_stdout()
178 stdout.write(line)
179 stdout.write('\n')
180
181 def write_split(self, string):
182 stdout = self._get_stdout()
183 stdout.write(string)
184 if not string.endswith('\n'):
185 stdout.write('\n')
186
187 @_check_annotation_changed
188 def close(self):
189 if self._stdout_stream is not None:
190 self._stdout_stream.close()
191
192 # If we still have retained summary text, a failure, and no failure detail
193 # text, copy it there.
194 if self._summary_text is not None:
195 if (self._step.msg.HasField('failure_details') and
196 not self._step.msg.failure_details.text):
197 self._step.msg.failure_details.text = self._summary_text
198
199 # Close our Step.
200 self._engine._close_step(self._step.msg)
201
202
martiniss 2016/09/01 21:59:47 nit: spaces
dnj 2016/09/07 17:54:57 Done.
203 ##
204 # Implement stream.StreamEngine.StepStream
205 ##
206
207 @_check_annotation_changed
208 def new_log_stream(self, log_name):
209 # Generate the base normalized log stream name for this log.
210 stream_name = self._step.stream_name_base.append('logs', log_name)
211
212 # Add the log stream index to the end of the stream name.
213 index = self._log_stream_index.setdefault(str(stream_name), 0)
214 self._log_stream_index[str(stream_name)] = index + 1
215 stream_name = stream_name.append(str(index))
216
217 # Create a new log stream for this name.
218 fd = self._engine._client.open_text(str(stream_name))
219
220 # Update our step to include the log stream.
221 link = self._step.msg.other_links.add(label=log_name)
222 link.logdog_stream.name = str(stream_name)
223
224 return self._engine.TextStream(fd)
225
226 @_check_annotation_changed
227 def add_step_text(self, text):
228 self._step.msg.text.append(text)
229
230 @_check_annotation_changed
231 def add_step_summary_text(self, text):
232 self._step.msg.text.insert(0, text)
233 self._summary_text = text
234
235 @_check_annotation_changed
236 def add_step_link(self, name, url):
237 self._step.msg.other_links.add(label=name, url=url)
238
239 def reset_subannotation_state(self):
240 pass
241
242 @_check_annotation_changed
243 def set_step_status(self, status):
244 if status == 'SUCCESS':
245 self._step.msg.status = pb.SUCCESS
246 elif status == 'WARNING':
247 self._step.msg.status = pb.SUCCESS
248 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
249 elif status == 'FAILURE':
250 self._step.msg.status = pb.FAILURE
251 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL
252 elif status == 'EXCEPTION':
253 self._step.msg.status = pb.FAILURE
254 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
255 else:
256 raise ValueError('Unknown status [%s]' % (status,))
257
258 @_check_annotation_changed
259 def set_build_property(self, key, value):
260 self._engine._anno.update_properties(key=value)
261
262 def trigger(self, trigger_spec):
263 if self._engine._ignore_triggers:
264 return
265 raise NotImplementedError(
martiniss 2016/09/01 21:59:47 this will probably cause issues when you start rol
dnj 2016/09/07 17:54:57 Understood. Not much to be done here, though, asid
266 'Stream-based triggering is not supported for LogDog. Please use '
267 'a recipe module (e.g., buildbucket) directly for build scheduling.')
268
269
270 def __init__(self, client=None, streamserver_uri=None, name_base=None,
martiniss 2016/09/01 21:59:47 Can you document this? I'm not sure exactly what c
dnj 2016/09/07 17:54:57 Done.
271 annotation_name=None, ignore_triggers=False, env=None):
272 self._client = client
273 self._streamserver_uri = streamserver_uri
274 self._name_base = _StreamName(name_base)
275 self._annotation_name = annotation_name or self.DEFAULT_ANNOTATION_NAME
martiniss 2016/09/01 21:59:47 What would be the reason for overriding this? I gu
dnj 2016/09/07 17:54:57 Really the idea is to let users fit this into thei
276 self._ignore_triggers = ignore_triggers
277 self._env = env or _Environment.probe()
278
279 self._astate = None
280
281 self._streams = collections.OrderedDict()
282
283 # For testing, allow our "now" to be overridden.
martiniss 2016/09/01 21:59:47 ???
dnj 2016/09/07 17:54:57 Old comment, deleted.
284
285 def new_step_stream(self, step_config):
286 # TODO(dnj): In the current iteration, subannotations are NOT supported.
287 # In order to support them, they would have to be parsed out of the stream
288 # and converted into Milo Annotation protobuf. This is a non-trivial effort
289 # and may be a waste of time, as in a LogDog-enabled world, the component
290 # emitting sub-annotations would actually just create its own annotation
291 # stream and emit its own Milo protobuf.
292 #
293 # Components that emit subannotations and don't want to be converted to use
294 # LogDog streams could bootstrap themselves through Annotee and let it do
295 # the work.
296 #
297 # For now, though, we explicitly do NOT support LogDog running with
298 # subannotations enabled.
299 if step_config.allow_subannotations:
300 raise NotImplementedError('Subannotations are not supported with LogDog '
301 'output.')
302
303 stream = self.StepStream.create(self, self._astate.create_step(step_config))
304 self._check()
305 return stream
306
307 def open(self):
308 # Initialize our client, if ont is not provided.
martiniss 2016/09/01 21:59:47 typo
dnj 2016/09/07 17:54:57 Done.
309 if self._client is None:
310 if self._streamserver_uri:
311 self._client = libs.logdog.stream.create(self._streamserver_uri)
312 else:
313 # Probe the stream client via Bootstrap.
314 bootstrap = libs.logdog.bootstrap.probe()
315 self._client = bootstrap.stream_client()
316
317 annotation_stream_name = self._name_base.append(self._annotation_name)
318 self._annotation_stream = self._client.open_datagram(
319 str(annotation_stream_name),
320 content_type=ANNOTATION_CONTENT_TYPE)
321
322 # Kick off our annotation transmission thread.
323 self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)
martiniss 2016/09/01 21:59:47 I don't understand why this says "thread", when we
dnj 2016/09/07 17:54:57 Old comment, and kind of. The annotation monitor u
324
325 # Initialize our open streams list.
326 self._streams.clear()
327
328 # Initialize our annotation state.
329 self._astate = _AnnotationState.create(self._name_base, env=self._env)
330 self._astate.base.status = pb.RUNNING
331 self._set_timestamp(self._astate.base.started)
332 self._check()
333
334 def close(self):
335 assert self._astate is not None, 'StreamEngine is not open.'
336
337 # Shut down any outstanding streams that may not have been closed for
338 # whatever reason.
339 for s in reversed(self._streams.values()):
martiniss 2016/09/01 21:59:47 why reversed?
dnj 2016/09/07 17:54:57 Streams created later should be closed before stre
340 s.close()
341
342 # Close out our root Step. Manually check annotation state afterwards.
343 self._close_step(self._astate.base)
344 self._check()
345
346 # Shut down our annotation monitor and close our annotation stream.
347 self._annotation_monitor.flush_and_join()
348 self._annotation_stream.close()
349
350 # Clear our client and state. We are now closed.
351 self._streams.clear()
352 self._client = None
353 self._astate = None
354
355 def _check(self):
356 if self._astate is None:
357 return
358
359 step_data = self._astate.check()
360 if step_data is not None:
361 self._annotation_monitor.signal_update(step_data)
362
363 def _set_timestamp(self, dst):
364 """Returns (timestamp_pb2.Timestamp): Containing the time from dt.
365
366 Args:
367 dt (datetime.datetime): the time to load into the timestamp.
368 """
369 dt = self._env.now
370
371 # Convert to milliseconds from epoch.
372 v = (dt - _EPOCH).total_seconds()
373
374 dst.seconds = int(v)
375 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos.
376
377 def _close_step(self, step):
378 """Closes a step, and any open substeps, propagating status.
379
380 If all of the substeps are already closed, this will do nothing. However, if
381 any are open, it will close them with an infra failure state.
382
383 If any substeps failed, the failure will be propagated to step.
384
385 Args:
386 step (pb.Step): The Step message to close.
387 """
388 # Close any open substeps, in case some of them didn't close.
389 failed = []
390 incomplete = []
391 for sub in step.substep:
392 if not sub.HasField('step'):
393 # Not an embedded substep.
394 continue
395
396 # Did this step actually complete? It should have, by now, so if it didn't
397 # we'll be reporting an infra failure in "step".
398 if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
399 incomplete.append(sub.step)
400
401 # Close this substep. This may be a no-op, if the substep is already
402 # closed.
403 self._close_step(sub.step)
404
405 # If a substep failed, propagate its failure status to "step".
406 if sub.step.status == pb.FAILURE:
407 failed.append(sub.step)
408
409 # If we had any incomplete steps, mark that we failed.
410 if incomplete:
411 step.status = pb.FAILURE
412 if step.failure_details is None:
413 step.failure_details = pb.FailureDetails(
414 type=pb.FailureDetails.INFRA,
415 text='Some substeps did not complete: %s' % (
416 ', '.join(s.name for s in incomplete)),
417 )
418 elif failed:
419 step.status = pb.FAILURE
420 if step.failure_details is None:
421 # This step didn't successfully close, so propagate an infra failure.
422 step.failure_details = pb.FailureDetails(
423 type=pb.FailureDetails.GENERAL,
424 text='Some substeps failed: %s' % (
425 ', '.join(s.name for s in failed)),
426 )
427
428 # Now close "step". If it's still RUNNING, assume that it was successful.
429 if step.status == pb.RUNNING:
430 step.status = pb.SUCCESS
431 if not step.HasField('ended'):
432 self._set_timestamp(step.ended)
433
434
435
436 class _AnnotationMonitor(object):
martiniss 2016/09/01 21:59:47 Did you write this out of necessity, or before you
dnj 2016/09/07 17:54:57 Necessity. Sending a full annotation dump every it
437 """The owner of the annotation datagram stream, sending annotation updates in
438 a controlled manner and buffering them when the content hasn't changed.
439
440 By default, since annotation state can change rapidly, minor annotation
441 changes are throttled such that they are only actually sent periodically.
442
443 New annotation state updates can be installed by calling `signal_update`.
444 After being started, the _AnnotationMonitor thread must be shut down by
445 calling its `flush_and_join` method.
446 """
447
448 # Flush interval for non-structural events.
449 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30)
450
451 def __init__(self, fd, flush_period=None):
452 self._fd = fd
453 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD
454
455 # The following group of variables is protected by "_lock".
456 self._lock = threading.Lock()
457 self._current_data = None
458 self._flush_timer = None
459 self._last_flush_time = None
460 self._last_flush_data = None
461
462 def signal_update(self, step_data, structural=False):
463 """Updates the annotation state with new step data.
464
465 This updates our state to include new step data. The annotation monitor
466 thread will pick this up and dispatch it, either:
467 - Eventually, when the flush period completes, or
468 - Immediately, if this is a structural change.
469
470 Args:
471 step_data (str): The updated binary annotation protobuf step data.
472 structural (bool): If True, this is a structural update and should be
473 pushed immediately.
474 """
475 with self._lock:
476 # Did our data actually change?
477 if step_data == self._last_flush_data:
478 # Nope, leave things as-is.
479 return
480
481 # This is new data. Is it structural? If so, flush immediately.
482 # If not, make sure our timer is running so it will eventually be flushed.
483 # Note that the timer may also suggest that we flush immediately.
484 now = datetime.datetime.now()
485 self._current_data = step_data
486 if structural or self._set_flush_timer_locked(now):
487 # We should flush immediately.
488 self._flush_now_locked(now)
489
490 def flush_and_join(self):
491 """Flushes any remaining updates and blocks until the monitor is complete.
492 """
493 # Mark that we're finished and signal our event.
494 with self._lock:
495 self._flush_now_locked(datetime.datetime.now())
496
497 @property
498 def latest(self):
499 with self._lock:
500 return self._last_flush_data
501
502 def _flush_now_locked(self, now):
503 # Clear any current flush timer.
504 self._clear_flush_timer_locked()
505
506 # Record this flush.
507 #
508 # We set the last flush time to now because even if we don't actually send
509 # data, we have responded to the flush edge.
510 flush_data, self._current_data = self._current_data, None
511 self._last_flush_time = now
512
513 # If the data hasn't changed since the last flush, then don't actually
514 # do anything.
515 if flush_data is None or flush_data == self._last_flush_data:
516 return
517
518 self._last_flush_data = flush_data
519 self._fd.send(flush_data)
520
521 def _clear_flush_timer_locked(self):
522 if self._flush_timer is not None:
523 self._flush_timer.cancel()
524 self._flush_timer = None
525
526 def _set_flush_timer_locked(self, now):
527 if self._flush_timer is not None:
528 # Our flush timer is already running.
529 return False
530
531 if self._last_flush_time is None:
532 # We have never flushed before, so flush immediately.
533 return True
534
535 deadline = self._last_flush_time + self._flush_period
536 if deadline <= now:
537 # We're past our flush deadline, and should flush immediately.
538 return True
539
540 # Start our flush timer.
541 self._flush_timer = threading.Timer((deadline - now).total_seconds(),
542 self._flush_timer_expired)
543 self._flush_timer.daemon = True
544 self._flush_timer.start()
545
546 def _flush_timer_expired(self):
547 with self._lock:
548 self._flush_now_locked(datetime.datetime.now())
549
550
class _AnnotationState(object):
  """Manages an outer Milo annotation protobuf Step."""

  # A step record: the pb.Step message, the step's base stream name, and a
  # map used to disambiguate colliding substream names.
  Step = collections.namedtuple('Step', (
      'msg', 'stream_name_base', 'substream_name_index'))

  def __init__(self, base_step, stream_name_base):
    """
    Args:
      base_step (pb.Step): The root Step message.
      stream_name_base (_StreamName): The base stream name; all generated
          step stream names are relative to this.
    """
    self._base = self.Step(
        msg=base_step,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._check_snapshot = None

    # The current step stack. This is built by updating state after new steps'
    # nesting levels.
    self._nest_stack = [self._base]

    # Index initial properties.
    self._properties = dict((p.name, p) for p in self._base.msg.property)

  @classmethod
  def create(cls, annotation_name, env=None, properties=None):
    """Returns (_AnnotationState): A new state rooted at a base "steps" Step.

    Args:
      annotation_name (_StreamName): The base stream name for generated step
          streams.
      env (_Environment or None): If supplied, its argv/cwd/environ are
          recorded on the base step's command.
      properties (dict or None): Initial build properties.
    """
    base = pb.Step()
    base.name = 'steps'
    base.status = pb.PENDING
    if env:
      if env.argv:
        base.command.command_line.extend(env.argv)
      if env.cwd:
        base.command.cwd = env.cwd
      if env.environ:
        base.command.environ.update(env.environ)
    if properties:
      for k, v in sorted(properties.iteritems()):
        base.property.add(name=k, value=v)
    return cls(base, annotation_name)

  @property
  def base(self):
    """pb.Step: The root Step message."""
    return self._base.msg

  def check(self):
    """Checks if the annotation state has been updated and, if so, returns it.

    After check returns, the latest annotation state will be used as the
    current snapshot for future checks.

    Returns (str/None): A serialized binary Step protobuf if modified, None
        otherwise.
    """
    if self._check_snapshot is None or self._check_snapshot != self.base:
      self._check_snapshot = copy.deepcopy(self.base)
      return self._check_snapshot.SerializeToString()
    return None

  def create_step(self, step_config):
    """Returns (Step): A new step record added under the appropriate parent.

    Raises:
      AssertionError: if step_config's nest level exceeds the current stack.
    """
    # Identify our parent Step by examining the nesting level. The first step
    # in the nest stack will always be the base (nesting level "-1", since
    # it's the parent of level 0). Since the step's "nest_level" is one more
    # than the parent, and we need to offset by 1 to reach the stack index,
    # they cancel each other out, so the nest level is the same as the
    # parent's stack index.
    assert step_config.nest_level < len(self._nest_stack), (
        'Invalid nest level %d (highest is %d)' % (
            step_config.nest_level, len(self._nest_stack)-1))

    # Clear any items in the nest stack that are deeper than the current
    # element.
    del self._nest_stack[step_config.nest_level+1:]
    parent = self._nest_stack[-1]

    # Create a stream name for this step. Even though step names are unique,
    # the normalized LogDog step name may overlap with a different step name.
    # We keep track of the step names we've issued to this step space and
    # add indexes if a conflict is identified.
    stream_name_base = parent.stream_name_base.append('steps',
                                                      step_config.base_name)
    index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
    parent.substream_name_index[str(stream_name_base)] += 1
    if index > 0:
      stream_name_base += '_%d' % (index,)

    # Create and populate our new step.
    msg = parent.msg.substep.add().step
    msg.name = step_config.base_name
    msg.status = pb.PENDING
    if step_config.cmd:
      msg.command.command_line.extend(step_config.cmd)
    if step_config.cwd:
      msg.command.cwd = step_config.cwd
    if step_config.env:
      # BUGFIX: protobuf map fields cannot be assigned with "="; merge the
      # entries instead (matching how "create" records env.environ).
      msg.command.environ.update(step_config.env)

    step = self.Step(
        msg=msg,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._nest_stack.append(step)
    return step

  def update_properties(self, **kwargs):
    """Updates the base step's property values to incorporate kwargs."""
    for k, v in sorted(kwargs.iteritems()):
      cur = self._properties.get(k)
      if cur is None:
        cur = self.base.property.add(name=k, value=str(v))
        self._properties[k] = cur
        continue

      # A Property message already exists for this key, so update its value.
      # Compare against the stringified value, since Property.value is a
      # string field and "v" may not be a string.
      if cur.value != str(v):
        cur.value = str(v)
662
663
class _StreamName(object):
  """An immutable validated wrapper for a LogDog stream name."""

  def __init__(self, base):
    # A None base is permitted (the empty stream name); anything else must
    # be a valid LogDog stream name.
    if base is not None:
      libs.logdog.streamname.validate_stream_name(base)
    self._base = base

  def append(self, *components):
    """Returns (_StreamName): A new _StreamName instance with components added.

    Each component in "components" will become a new normalized stream name
    component. Consequently, any separators (/) in the components will be
    replaced with underscores.

    Args:
      components: the path components to append to this _StreamName.
    """
    if not components:
      return self

    parts = [self._base] if self._base else []
    parts.extend(self._normalize(self._flatten(c)) for c in components)
    return type(self)('/'.join(parts))

  def augment(self, val):
    """Returns (_StreamName): A new _StreamName with "val" appended.

    This generates a new, normalized _StreamName with the contents of "val"
    appended to the end. For example:

      Original: "foo/bar"
      Append "baz qux": "foo/barbaz_qux"
    """
    if not val:
      return self
    combined = self._flatten(val)
    if self._base:
      combined = self._base + combined
    return type(self)(self._normalize(combined))

  def __iadd__(self, val):
    return self.augment(val)

  @staticmethod
  def _flatten(v):
    # Separators inside a single component become underscores.
    return v.replace('/', '_')

  @staticmethod
  def _normalize(v):
    return libs.logdog.streamname.normalize(v, prefix='s_')

  def __str__(self):
    if not self._base:
      raise ValueError('Cannot generate string from empty StreamName.')
    return self._base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698