Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(816)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Updated with comments, added test, proto 3.0.2. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/stream.py ('k') | recipe_engine/third_party/client-py/libs/logdog/stream.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf.
7 """
8
9 import collections
10 import contextlib
11 import copy
12 import datetime
13 import os
14 import threading
15 import sys
16
17 from . import env
18 from . import stream
19 from . import util
20
21 import google.protobuf.message
22 import google.protobuf.timestamp_pb2 as timestamp_pb2
23 import libs.logdog.bootstrap
24 import libs.logdog.stream
25 import libs.logdog.streamname
26 import annotations_pb2 as pb
27
28
29 # The datetime for the epoch.
30 _EPOCH = datetime.datetime.utcfromtimestamp(0)
31
32 # The annotation stream ContentType.
33 #
34 # This must match the ContentType for the annotation binary protobuf, which
35 # is specified in "<luci-go>/common/proto/milo/util.go".
36 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
37
38
39 class _Environment(object):
40 """Simulated system environment. The StreamEngine uses this to probe for
41 system parameters. By default, the environment will be derived from the
42 actual system.
43 """
44
45 def __init__(self, now_fn, argv, environ, cwd):
46 self._now_fn = now_fn
47 self.argv = argv
48 self.environ = environ
49 self.cwd = cwd
50
51 @property
52 def now(self):
53 return self._now_fn()
54
55 @classmethod
56 def probe(cls):
57 return cls(
58 now_fn=datetime.datetime.now,
59 argv=sys.argv[:],
60 environ=dict(os.environ),
61 cwd=os.getcwd(),
62 )
63
64
65 class StreamEngine(stream.StreamEngine):
66 """A stream.StreamEngine implementation that uses Logdog streams and Milo
67 annotation protobufs.
68
69 The generated LogDog streams will be (relative to "name_base"):
70 /annotations
71 The base annotations stream.
72
73 /steps/<step_name>/
74 Base stream name for a given step. Note that if multiple steps normalize
75 to the same <step_name> value, an index will be appended, such as
76 <step_name>_0. This can happen because the stream name component of a
77 step is normalized, so two validly-independent steps ("My Thing" and
78 "My_Thing") will both normalize to "My_Thing". In this case, the second
79 one would have the stream name component, "My_Thing_1".
80
81 /steps/<step_name>/stdout
82 STDOUT stream for step "<step_name>".
83 /steps/<step_name>/stderr
84 STDOUT stream for step "<step_name>".
85
86 /steps/<step_name>/logs/<log_name>/<log_name_index>
87 Stream name for a given step's logs. <log_name_index> is the index of the
88 log with the given normalized name. This is similar to <step_name>, only
89 the index is added as a separate stream name component.
90 """
91
92 # The name of the annotation stream.
93 ANNOTATION_NAME = 'annotations'
94
95
96 class TextStream(stream.StreamEngine.Stream):
97
98 def __init__(self, fd):
99 super(StreamEngine.TextStream, self).__init__()
100 self._fd = fd
101
102 ##
103 # Implement stream.StreamEngine.Stream
104 ##
105
106 def write_line(self, line):
107 self._fd.write(line)
108 self._fd.write('\n')
109
110 def write_split(self, string):
111 self._fd.write(string)
112 if not string.endswith('\n'):
113 self._fd.write('\n')
114
115 def close(self):
116 self._fd.close()
117
118
119 class StepStream(stream.StreamEngine.StepStream):
120 """An individual step stream."""
121
122 def __init__(self, engine, step):
123 # We will lazily create the STDOUT stream when the first data is written.
124 super(StreamEngine.StepStream, self).__init__()
125
126 self._engine = engine
127 self._step = step
128
129 # We keep track of the log streams associated with this step.
130 self._log_stream_index = {}
131
132 # We will lazily instantiate our stdout stream when content is actually
133 # written to it.
134 self._stdout_stream = None
135
136 # The retained step summary text. When generating failure details, this
137 # will be consumed to populate their text field.
138 self._summary_text = None
139
140 @classmethod
141 def create(cls, engine, step):
142 stream = cls(engine, step)
143
144 # Start our step.
145 stream._step.msg.status = pb.RUNNING
146 engine._set_timestamp(stream._step.msg.started)
147
148 return stream
149
150 def _check_annotation_changed(fn):
martiniss 2016/09/27 19:24:48 Doesn't pylint complain about this?
dnj 2016/09/27 23:13:58 Hmm, it does. I'll move it out. I have run pylint
martiniss 2016/09/30 20:41:44 Yeah, it isn't :(
151 """Decorator that can be applied to a StepStream instance method to call
152 our bound Engine's _check after the function is finished.
153
154 This should decorate any method that modifies annotation state.
155 """
156 def check_after(inner, *args, **kwargs):
157 v = fn(inner, *args, **kwargs)
158 inner._engine._check()
159 return v
160 return check_after
161
162 @_check_annotation_changed
163 def _get_stdout(self):
164 if self._stdout_stream is None:
165 # Create a new STDOUT text stream.
166 stream_name = self._step.stream_name_base.append('stdout')
167 self._stdout_stream = self._engine._client.open_text(str(stream_name))
168
169 self._step.msg.stdout_stream.name = str(stream_name)
170 return self._stdout_stream
171
172 ##
173 # Implement stream.StreamEngine.Stream
174 ##
175
176 def write_line(self, line):
177 stdout = self._get_stdout()
178 stdout.write(line)
179 stdout.write('\n')
180
181 def write_split(self, string):
182 stdout = self._get_stdout()
183 stdout.write(string)
184 if not string.endswith('\n'):
185 stdout.write('\n')
186
187 @_check_annotation_changed
188 def close(self):
189 if self._stdout_stream is not None:
190 self._stdout_stream.close()
191
192 # If we still have retained summary text, a failure, and no failure detail
193 # text, copy it there.
194 if self._summary_text is not None:
195 if (self._step.msg.HasField('failure_details') and
196 not self._step.msg.failure_details.text):
197 self._step.msg.failure_details.text = self._summary_text
198
199 # Close our Step.
200 self._engine._close_step(self._step.msg)
201
202 ##
203 # Implement stream.StreamEngine.StepStream
204 ##
205
206 @_check_annotation_changed
207 def new_log_stream(self, log_name):
208 # Generate the base normalized log stream name for this log.
209 stream_name = self._step.stream_name_base.append('logs', log_name)
210
211 # Add the log stream index to the end of the stream name.
212 index = self._log_stream_index.setdefault(str(stream_name), 0)
213 self._log_stream_index[str(stream_name)] = index + 1
214 stream_name = stream_name.append(str(index))
215
216 # Create a new log stream for this name.
217 fd = self._engine._client.open_text(str(stream_name))
218
219 # Update our step to include the log stream.
220 link = self._step.msg.other_links.add(label=log_name)
221 link.logdog_stream.name = str(stream_name)
222
223 return self._engine.TextStream(fd)
224
225 @_check_annotation_changed
226 def add_step_text(self, text):
227 self._step.msg.text.append(text)
228
229 @_check_annotation_changed
230 def add_step_summary_text(self, text):
231 self._step.msg.text.insert(0, text)
232 self._summary_text = text
233
234 @_check_annotation_changed
235 def add_step_link(self, name, url):
236 self._step.msg.other_links.add(label=name, url=url)
237
238 def reset_subannotation_state(self):
239 pass
240
241 @_check_annotation_changed
242 def set_step_status(self, status):
243 if status == 'SUCCESS':
244 self._step.msg.status = pb.SUCCESS
245 elif status == 'WARNING':
246 self._step.msg.status = pb.SUCCESS
247 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
248 elif status == 'FAILURE':
249 self._step.msg.status = pb.FAILURE
250 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL
251 elif status == 'EXCEPTION':
252 self._step.msg.status = pb.FAILURE
253 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
254 else:
255 raise ValueError('Unknown status [%s]' % (status,))
256
257 @_check_annotation_changed
258 def set_build_property(self, key, value):
259 self._engine._anno.update_properties(key=value)
260
261 def trigger(self, trigger_spec):
262 if self._engine._ignore_triggers:
263 return
264 raise NotImplementedError(
265 'Stream-based triggering is not supported for LogDog. Please use '
266 'a recipe module (e.g., buildbucket) directly for build scheduling.')
267
268
269 def __init__(self, client=None, streamserver_uri=None, name_base=None,
270 ignore_triggers=False, env=None):
271 """Initializes a new LogDog/Annotation StreamEngine.
272
273 Args:
274 client (libs.logdog.stream.StreamClient or None): the LogDog stream client
275 to use. If this is None, a new StreamClient will be instantiated when
276 this StreamEngine is opened.
277 streamserver_uri (str or None): The LogDog Butler stream server URI. See
278 LogDog client library docs for details on supported protocols and
279 format. This will only be used when "client" is None. If this is also
280 None, a StreamClient will be created through probing.
281 name_base (str or None): The default stream name prefix that will be added
282 to generated LogDog stream names. If None, no prefix will be applied.
283 ignore_triggers (bool): Triggers are not supported in LogDog annotation
284 streams. If True, attempts to trigger will be silently ignored. If
285 False, they will cause a NotImplementedError to be raised.
286 env (_Environment or None): The _Environment instance to use for
287 operations. This will be None at production, but can be overridden
288 here for testing.
289 """
290
291 self._client = client
292 self._streamserver_uri = streamserver_uri
293 self._name_base = _StreamName(name_base)
294 self._ignore_triggers = ignore_triggers
295 self._env = env or _Environment.probe()
296
297 self._astate = None
298
299 self._streams = collections.OrderedDict()
300
301 def new_step_stream(self, step_config):
302 # TODO(dnj): In the current iteration, subannotations are NOT supported.
martiniss 2016/09/27 19:24:48 The subannotation feature is used by chromium I th
dnj 2016/09/27 23:13:58 Used by CrOS too.
303 # In order to support them, they would have to be parsed out of the stream
304 # and converted into Milo Annotation protobuf. This is a non-trivial effort
305 # and may be a waste of time, as in a LogDog-enabled world, the component
306 # emitting sub-annotations would actually just create its own annotation
307 # stream and emit its own Milo protobuf.
308 #
309 # Components that emit subannotations and don't want to be converted to use
310 # LogDog streams could bootstrap themselves through Annotee and let it do
311 # the work.
312 #
313 # For now, though, we explicitly do NOT support LogDog running with
314 # subannotations enabled.
315 if step_config.allow_subannotations:
316 raise NotImplementedError('Subannotations are not supported with LogDog '
317 'output.')
318
319 stream = self.StepStream.create(self, self._astate.create_step(step_config))
320 self._check()
321 return stream
322
323 def open(self):
324 # Initialize our client, if one is not provided.
325 if self._client is None:
326 if self._streamserver_uri:
327 self._client = libs.logdog.stream.create(self._streamserver_uri)
328 else:
329 # Probe the stream client via Bootstrap.
330 bootstrap = libs.logdog.bootstrap.probe()
331 self._client = bootstrap.stream_client()
332
333 annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
334 self._annotation_stream = self._client.open_datagram(
335 str(annotation_stream_name),
336 content_type=ANNOTATION_CONTENT_TYPE)
337
338 self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)
339
340 # Initialize our open streams list.
341 self._streams.clear()
342
343 # Initialize our annotation state.
344 self._astate = _AnnotationState.create(self._name_base, env=self._env)
345 self._astate.base.status = pb.RUNNING
346 self._set_timestamp(self._astate.base.started)
347 self._check()
348
349 def close(self):
350 assert self._astate is not None, 'StreamEngine is not open.'
351
352 # Shut down any outstanding streams that may not have been closed for
353 # whatever reason.
354 for s in reversed(self._streams.values()):
355 s.close()
356
357 # Close out our root Step. Manually check annotation state afterwards.
358 self._close_step(self._astate.base)
359 self._check()
360
361 # Shut down our annotation monitor and close our annotation stream.
362 self._annotation_monitor.flush_and_join()
363 self._annotation_stream.close()
364
365 # Clear our client and state. We are now closed.
366 self._streams.clear()
367 self._client = None
368 self._astate = None
369
370 def _check(self):
371 if self._astate is None:
372 return
373
374 step_data = self._astate.check()
375 if step_data is not None:
376 self._annotation_monitor.signal_update(step_data)
377
378 def _set_timestamp(self, dst):
martiniss 2016/09/27 19:24:48 I don't understand this function. What's the point
dnj 2016/09/27 23:13:58 It loads data into the protobuf. I've updated the
379 """Returns (timestamp_pb2.Timestamp): Containing the time from dt.
380
381 Args:
382 dt (datetime.datetime): the time to load into the timestamp.
martiniss 2016/09/27 19:24:48 Args are wrong.
dnj 2016/09/27 23:13:58 Acknowledged.
383 """
384 dt = self._env.now
385
386 # Convert to milliseconds from epoch.
387 v = (dt - _EPOCH).total_seconds()
388
389 dst.seconds = int(v)
390 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos.
391
392 def _close_step(self, step):
393 """Closes a step, and any open substeps, propagating status.
394
395 If all of the substeps are already closed, this will do nothing. However, if
396 any are open, it will close them with an infra failure state.
397
398 If any substeps failed, the failure will be propagated to step.
399
400 Args:
401 step (pb.Step): The Step message to close.
402 """
403 # Close any open substeps, in case some of them didn't close.
404 failed = []
405 incomplete = []
406 for sub in step.substep:
407 if not sub.HasField('step'):
408 # Not an embedded substep.
409 continue
410
411 # Did this step actually complete? It should have, by now, so if it didn't
412 # we'll be reporting an infra failure in "step".
413 if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
414 incomplete.append(sub.step)
415
416 # Close this substep. This may be a no-op, if the substep is already
417 # closed.
418 self._close_step(sub.step)
419
420 # If a substep failed, propagate its failure status to "step".
421 if sub.step.status == pb.FAILURE:
422 failed.append(sub.step)
423
424 # If we had any incomplete steps, mark that we failed.
425 if incomplete:
426 step.status = pb.FAILURE
427 if step.failure_details is None:
428 step.failure_details = pb.FailureDetails(
429 type=pb.FailureDetails.INFRA,
430 text='Some substeps did not complete: %s' % (
431 ', '.join(s.name for s in incomplete)),
432 )
433 elif failed:
434 step.status = pb.FAILURE
435 if step.failure_details is None:
436 # This step didn't successfully close, so propagate an infra failure.
437 step.failure_details = pb.FailureDetails(
438 type=pb.FailureDetails.GENERAL,
439 text='Some substeps failed: %s' % (
440 ', '.join(s.name for s in failed)),
441 )
442
443 # Now close "step". If it's still RUNNING, assume that it was successful.
444 if step.status == pb.RUNNING:
445 step.status = pb.SUCCESS
446 if not step.HasField('ended'):
447 self._set_timestamp(step.ended)
448
449
450
451 class _AnnotationMonitor(object):
452 """The owner of the annotation datagram stream, sending annotation updates in
453 a controlled manner and buffering them when the content hasn't changed.
454
455 By default, since annotation state can change rapidly, minor annotation
456 changes are throttled such that they are only actually sent periodically.
457
458 New annotation state updates can be installed by calling `signal_update`.
459 After being started, the _AnnotationMonitor thread must be shut down by
460 calling its `flush_and_join` method.
461 """
462
463 # Flush interval for non-structural events.
464 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30)
465
466 def __init__(self, fd, flush_period=None):
467 self._fd = fd
468 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD
469
470 # The following group of variables is protected by "_lock".
471 self._lock = threading.Lock()
472 self._current_data = None
473 self._flush_timer = None
474 self._last_flush_time = None
475 self._last_flush_data = None
476
477 def signal_update(self, step_data, structural=False):
478 """Updates the annotation state with new step data.
479
480 This updates our state to include new step data. The annotation monitor
481 thread will pick this up and dispatch it, either:
482 - Eventually, when the flush period completes, or
483 - Immediately, if this is a structural change.
484
485 Args:
486 step_data (str): The updated binary annotation protobuf step data.
487 structural (bool): If True, this is a structural update and should be
488 pushed immediately.
489 """
490 with self._lock:
491 # Did our data actually change?
492 if step_data == self._last_flush_data:
493 # Nope, leave things as-is.
494 return
495
496 # This is new data. Is it structural? If so, flush immediately.
497 # If not, make sure our timer is running so it will eventually be flushed.
498 # Note that the timer may also suggest that we flush immediately.
499 now = datetime.datetime.now()
500 self._current_data = step_data
501 if structural or self._set_flush_timer_locked(now):
502 # We should flush immediately.
503 self._flush_now_locked(now)
504
505 def flush_and_join(self):
506 """Flushes any remaining updates and blocks until the monitor is complete.
507 """
508 # Mark that we're finished and signal our event.
509 with self._lock:
510 self._flush_now_locked(datetime.datetime.now())
511
512 @property
513 def latest(self):
514 with self._lock:
515 return self._last_flush_data
516
517 def _flush_now_locked(self, now):
518 # Clear any current flush timer.
519 self._clear_flush_timer_locked()
520
521 # Record this flush.
522 #
523 # We set the last flush time to now because even if we don't actually send
524 # data, we have responded to the flush edge.
525 flush_data, self._current_data = self._current_data, None
526 self._last_flush_time = now
527
528 # If the data hasn't changed since the last flush, then don't actually
529 # do anything.
530 if flush_data is None or flush_data == self._last_flush_data:
martiniss 2016/09/27 19:24:48 This means that duplicate data entries get ignored
dnj 2016/09/27 23:13:58 Probably not frequently, but no harm in checking.
531 return
532
533 self._last_flush_data = flush_data
534 self._fd.send(flush_data)
535
536 def _clear_flush_timer_locked(self):
537 if self._flush_timer is not None:
538 self._flush_timer.cancel()
539 self._flush_timer = None
540
541 def _set_flush_timer_locked(self, now):
542 if self._flush_timer is not None:
543 # Our flush timer is already running.
544 return False
545
546 if self._last_flush_time is None:
547 # We have never flushed before, so flush immediately.
548 return True
549
550 deadline = self._last_flush_time + self._flush_period
551 if deadline <= now:
552 # We're past our flush deadline, and should flush immediately.
553 return True
554
555 # Start our flush timer.
556 self._flush_timer = threading.Timer((deadline - now).total_seconds(),
557 self._flush_timer_expired)
558 self._flush_timer.daemon = True
559 self._flush_timer.start()
560
561 def _flush_timer_expired(self):
562 with self._lock:
563 self._flush_now_locked(datetime.datetime.now())
564
565
566 class _AnnotationState(object):
567 """Manages an outer Milo annotation protobuf Step."""
568
569 Step = collections.namedtuple('Step', (
570 'msg', 'stream_name_base', 'substream_name_index'))
571
572 def __init__(self, base_step, stream_name_base):
573 self._base = self.Step(
574 msg=base_step,
575 stream_name_base=stream_name_base,
576 substream_name_index={})
577 self._check_snapshot = None
578
579 # The current step stack. This is built by updating state after new steps'
580 # nesting levels.
581 self._nest_stack = [self._base]
582
583 # Index initial properties.
584 self._properties = dict((p.name, p) for p in self._base.msg.property)
585
586 @classmethod
587 def create(cls, stream_name_base, env=None, properties=None):
588 base = pb.Step()
589 base.name = 'steps'
590 base.status = pb.PENDING
591 if env:
592 if env.argv:
593 base.command.command_line.extend(env.argv)
594 if env.cwd:
595 base.command.cwd = env.cwd
596 if env.environ:
597 base.command.environ.update(env.environ)
598 if properties:
599 for k, v in sorted(properties.iteritems()):
600 base.property.add(name=k, value=v)
601 return cls(base, stream_name_base)
602
603 @property
604 def base(self):
605 return self._base.msg
606
607 def check(self):
608 """Checks if the annotation state has been updated and, if so, returns it.
609
610 After check returns, the latest annotation state will be used as the current
611 snapshot for future checks.
612
613 Returns (str/None): A serialized binary Step protobuf if modified, None
614 otherwise.
615 """
616 if self._check_snapshot is None or self._check_snapshot != self.base:
617 self._check_snapshot = copy.deepcopy(self.base)
618 return self._check_snapshot.SerializeToString()
619 return None
620
621 def create_step(self, step_config):
622 # Identify our parent Step by examining the nesting level. The first step
623 # in the nest stack will always be the base (nesting level "-1", since it's
624 # the parent of level 0). Since the step's "nest_level" is one more than the
625 # parent, and we need to offset by 1 to reach the stack index, they cancel
626 # each other out, so the nest level is the same as the parent's stack index.
627 assert step_config.nest_level < len(self._nest_stack), (
628 'Invalid nest level %d (highest is %d)' % (
629 step_config.nest_level, len(self._nest_stack)-1))
630
631 # Clear any items in the nest stack that are deeper than the current
632 # element.
633 del(self._nest_stack[step_config.nest_level+1:])
634 parent = self._nest_stack[-1]
635
636 # Create a stream name for this step. Even though step names are unique,
637 # the normalized LogDog step name may overlap with a different step name.
638 # We keep track of the step names we've issued to this step space and
639 # add indexes if a conflict is identified.
640 stream_name_base = parent.stream_name_base.append('steps',
641 step_config.base_name)
642 index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
643 parent.substream_name_index[str(stream_name_base)] += 1
644 if index > 0:
645 stream_name_base += '_%d' % (index,)
646
647 # Create and populate our new step.
648 msg = parent.msg.substep.add().step
649 msg.name = step_config.base_name
650 msg.status = pb.PENDING
651 if step_config.cmd:
652 msg.command.command_line.extend(step_config.cmd)
653 if step_config.cwd:
654 msg.command.cwd = step_config.cwd
655 if step_config.env:
656 msg.command.environ = step_config.env
657
658 step = self.Step(
659 msg=msg,
660 stream_name_base=stream_name_base,
661 substream_name_index={})
662 self._nest_stack.append(step)
663 return step
664
665 def update_properties(self, **kwargs):
666 """Updates a set's property values to incorporate kwargs."""
667 for k, v in sorted(kwargs.iteritems()):
668 cur = self._properties.get(k)
669 if cur is None:
670 cur = self.base.property.add(name=k, value=str(v))
671 self._properties[k] = cur
672 continue
673
674 # A Property message already exists for this key, so update its value.
675 if cur.value != v:
676 cur.value = str(v)
677
678
679 class _StreamName(object):
680 """An immutable validated wrapper for a LogDog stream name."""
681
682 def __init__(self, base):
683 if base is not None:
684 libs.logdog.streamname.validate_stream_name(base)
685 self._base = base
686
687 def append(self, *components):
688 """Returns (_StreamName): A new _StreamName instance with components added.
689
690 Each component in "components" will become a new normalized stream name
691 component. Conseqeuntly, any separators (/) in the components will be
692 replaced with underscores.
693
694 Args:
695 components: the path components to append to this _StreamName.
696 """
697 if len(components) == 0:
698 return self
699
700 components = [self._normalize(self._flatten(p))
701 for p in reversed(components)]
702 if self._base:
703 components.append(self._base)
704 return type(self)('/'.join(reversed(components)))
705
706 def augment(self, val):
707 """Returns (_StreamName): A new _StreamName with "val" appended.
708
709 This generates a new, normalized _StreamName with the contents of "val"
710 appended to the end. For example:
711
712 Original: "foo/bar"
713 Append "baz qux": "foo/barbaz_qux"
714 """
715 if not val:
716 return self
717 val = self._flatten(val)
718 if self._base:
719 val = self._base + val
720 return type(self)(self._normalize(val))
721
722 def __iadd__(self, val):
723 return self.augment(val)
724
725 @staticmethod
726 def _flatten(v):
727 return v.replace('/', '_')
728
729 @staticmethod
730 def _normalize(v):
731 return libs.logdog.streamname.normalize(v, prefix='s_')
732
733 def __str__(self):
734 if not self._base:
735 raise ValueError('Cannot generate string from empty StreamName.')
736 return self._base
OLDNEW
« no previous file with comments | « recipe_engine/stream.py ('k') | recipe_engine/third_party/client-py/libs/logdog/stream.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698