# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""stream.StreamEngine implementation for LogDog, using Milo annotation
protobuf.
"""

import collections
import contextlib
import copy
import datetime
import os
import sys
import threading

from . import env
from . import stream
from . import util

import google.protobuf.message
import google.protobuf.timestamp_pb2 as timestamp_pb2
import libs.logdog.bootstrap
import libs.logdog.stream
import libs.logdog.streamname
import annotations_pb2 as pb


# The datetime for the epoch.
_EPOCH = datetime.datetime.utcfromtimestamp(0)

# The annotation stream ContentType.
#
# This must match the ContentType for the annotation binary protobuf, which
# is specified in "<luci-go>/common/proto/milo/util.go".
ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'

class _Environment(object):
  """Simulated system environment. The StreamEngine uses this to probe for
  system parameters. By default, the environment will be derived from the
  actual system.
  """

  def __init__(self, now_fn, argv, environ, cwd):
    self._now_fn = now_fn
    self.argv = argv
    self.environ = environ
    self.cwd = cwd

  @property
  def now(self):
    return self._now_fn()

  @classmethod
  def probe(cls):
    return cls(
        now_fn=datetime.datetime.now,
        argv=sys.argv[:],
        environ=dict(os.environ),
        cwd=os.getcwd(),
    )

class StreamEngine(stream.StreamEngine):
  """A stream.StreamEngine implementation that uses LogDog streams and Milo
  annotation protobufs.

  The generated LogDog streams will be (relative to "name_base"):
  /annotations
      The base annotations stream.

  /steps/<step_name>/
      Base stream name for a given step. Note that if multiple steps normalize
      to the same <step_name> value, an index will be appended, such as
      <step_name>_0. This can happen because the stream name component of a
      step is normalized, so two validly-independent steps ("My Thing" and
      "My_Thing") will both normalize to "My_Thing". In this case, the second
      one would have the stream name component "My_Thing_1".

  /steps/<step_name>/stdout
      STDOUT stream for step "<step_name>".
  /steps/<step_name>/stderr
      STDERR stream for step "<step_name>".

  /steps/<step_name>/logs/<log_name>/<log_name_index>
      Stream name for a given step's logs. <log_name_index> is the index of
      the log with the given normalized name. This is similar to <step_name>,
      only the index is added as a separate stream name component.
  """

  # The name of the annotation stream.
  ANNOTATION_NAME = 'annotations'


  class TextStream(stream.StreamEngine.Stream):

    def __init__(self, fd):
      super(StreamEngine.TextStream, self).__init__()
      self._fd = fd

    ##
    # Implement stream.StreamEngine.Stream
    ##

    def write_line(self, line):
      self._fd.write(line)
      self._fd.write('\n')

    def write_split(self, string):
      self._fd.write(string)
      if not string.endswith('\n'):
        self._fd.write('\n')

    def close(self):
      self._fd.close()


  class StepStream(stream.StreamEngine.StepStream):
    """An individual step stream."""

    def __init__(self, engine, step):
      # We will lazily create the STDOUT stream when the first data is
      # written.
      super(StreamEngine.StepStream, self).__init__()

      self._engine = engine
      self._step = step

      # We keep track of the log streams associated with this step.
      self._log_stream_index = {}

      # We will lazily instantiate our stdout stream when content is actually
      # written to it.
      self._stdout_stream = None

      # The retained step summary text. When generating failure details, this
      # will be consumed to populate their text field.
      self._summary_text = None

    @classmethod
    def create(cls, engine, step):
      stream = cls(engine, step)

      # Start our step.
      stream._step.msg.status = pb.RUNNING
      engine._set_timestamp(stream._step.msg.started)

      return stream

    def _check_annotation_changed(fn):
martiniss 2016/09/27 19:24:48: Doesn't pylint complain about this?
dnj 2016/09/27 23:13:58: Hmm, it does. I'll move it out. I have run pylint
martiniss 2016/09/30 20:41:44: Yeah, it isn't :(
      """Decorator that can be applied to a StepStream instance method to call
      our bound Engine's _check after the function is finished.

      This should decorate any method that modifies annotation state.
      """
      def check_after(inner, *args, **kwargs):
        v = fn(inner, *args, **kwargs)
        inner._engine._check()
        return v
      return check_after

    @_check_annotation_changed
    def _get_stdout(self):
      if self._stdout_stream is None:
        # Create a new STDOUT text stream.
        stream_name = self._step.stream_name_base.append('stdout')
        self._stdout_stream = self._engine._client.open_text(str(stream_name))

        self._step.msg.stdout_stream.name = str(stream_name)
      return self._stdout_stream

    ##
    # Implement stream.StreamEngine.Stream
    ##

    def write_line(self, line):
      stdout = self._get_stdout()
      stdout.write(line)
      stdout.write('\n')

    def write_split(self, string):
      stdout = self._get_stdout()
      stdout.write(string)
      if not string.endswith('\n'):
        stdout.write('\n')

    @_check_annotation_changed
    def close(self):
      if self._stdout_stream is not None:
        self._stdout_stream.close()

      # If we still have retained summary text, a failure, and no failure
      # detail text, copy it there.
      if self._summary_text is not None:
        if (self._step.msg.HasField('failure_details') and
            not self._step.msg.failure_details.text):
          self._step.msg.failure_details.text = self._summary_text

      # Close our Step.
      self._engine._close_step(self._step.msg)

    ##
    # Implement stream.StreamEngine.StepStream
    ##

    @_check_annotation_changed
    def new_log_stream(self, log_name):
      # Generate the base normalized log stream name for this log.
      stream_name = self._step.stream_name_base.append('logs', log_name)

      # Add the log stream index to the end of the stream name.
      index = self._log_stream_index.setdefault(str(stream_name), 0)
      self._log_stream_index[str(stream_name)] = index + 1
      stream_name = stream_name.append(str(index))
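      # E.g. (illustrative): two logs that both normalize to "foo" become
      # ".../logs/foo/0" and ".../logs/foo/1".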

      # Create a new log stream for this name.
      fd = self._engine._client.open_text(str(stream_name))

      # Update our step to include the log stream.
      link = self._step.msg.other_links.add(label=log_name)
      link.logdog_stream.name = str(stream_name)

      return self._engine.TextStream(fd)

    @_check_annotation_changed
    def add_step_text(self, text):
      self._step.msg.text.append(text)

    @_check_annotation_changed
    def add_step_summary_text(self, text):
      self._step.msg.text.insert(0, text)
      self._summary_text = text

    @_check_annotation_changed
    def add_step_link(self, name, url):
      self._step.msg.other_links.add(label=name, url=url)

    def reset_subannotation_state(self):
      pass

    @_check_annotation_changed
    def set_step_status(self, status):
      if status == 'SUCCESS':
        self._step.msg.status = pb.SUCCESS
      elif status == 'WARNING':
        self._step.msg.status = pb.SUCCESS
        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
      elif status == 'FAILURE':
        self._step.msg.status = pb.FAILURE
        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
      elif status == 'EXCEPTION':
        self._step.msg.status = pb.FAILURE
        self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
      else:
        raise ValueError('Unknown status [%s]' % (status,))

    @_check_annotation_changed
    def set_build_property(self, key, value):
      # "key" is a variable property name, so it must be expanded as a
      # keyword argument.
      self._engine._astate.update_properties(**{key: value})

    def trigger(self, trigger_spec):
      if self._engine._ignore_triggers:
        return
      raise NotImplementedError(
          'Stream-based triggering is not supported for LogDog. Please use '
          'a recipe module (e.g., buildbucket) directly for build scheduling.')


  def __init__(self, client=None, streamserver_uri=None, name_base=None,
               ignore_triggers=False, env=None):
    """Initializes a new LogDog/Annotation StreamEngine.

    Args:
      client (libs.logdog.stream.StreamClient or None): the LogDog stream
          client to use. If this is None, a new StreamClient will be
          instantiated when this StreamEngine is opened.
      streamserver_uri (str or None): The LogDog Butler stream server URI. See
          LogDog client library docs for details on supported protocols and
          format. This will only be used when "client" is None. If this is
          also None, a StreamClient will be created through probing.
      name_base (str or None): The default stream name prefix that will be
          added to generated LogDog stream names. If None, no prefix will be
          applied.
      ignore_triggers (bool): Triggers are not supported in LogDog annotation
          streams. If True, attempts to trigger will be silently ignored. If
          False, they will cause a NotImplementedError to be raised.
      env (_Environment or None): The _Environment instance to use for
          operations. This will be None in production, but can be overridden
          here for testing.
    """

    self._client = client
    self._streamserver_uri = streamserver_uri
    self._name_base = _StreamName(name_base)
    self._ignore_triggers = ignore_triggers
    self._env = env or _Environment.probe()

    self._astate = None

    self._streams = collections.OrderedDict()

  def new_step_stream(self, step_config):
    # TODO(dnj): In the current iteration, subannotations are NOT supported.
martiniss 2016/09/27 19:24:48: The subannotation feature is used by chromium I think.
dnj 2016/09/27 23:13:58: Used by CrOS too.
    # In order to support them, they would have to be parsed out of the stream
    # and converted into Milo Annotation protobuf. This is a non-trivial
    # effort and may be a waste of time, as in a LogDog-enabled world, the
    # component emitting sub-annotations would actually just create its own
    # annotation stream and emit its own Milo protobuf.
    #
    # Components that emit subannotations and don't want to be converted to
    # use LogDog streams could bootstrap themselves through Annotee and let it
    # do the work.
    #
    # For now, though, we explicitly do NOT support LogDog running with
    # subannotations enabled.
    if step_config.allow_subannotations:
      raise NotImplementedError('Subannotations are not supported with LogDog '
                                'output.')

    stream = self.StepStream.create(
        self, self._astate.create_step(step_config))
    self._check()
    return stream

  def open(self):
    # Initialize our client, if one is not provided.
    if self._client is None:
      if self._streamserver_uri:
        self._client = libs.logdog.stream.create(self._streamserver_uri)
      else:
        # Probe the stream client via Bootstrap.
        bootstrap = libs.logdog.bootstrap.probe()
        self._client = bootstrap.stream_client()

    annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
    self._annotation_stream = self._client.open_datagram(
        str(annotation_stream_name),
        content_type=ANNOTATION_CONTENT_TYPE)

    self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)

    # Initialize our open streams list.
    self._streams.clear()

    # Initialize our annotation state.
    self._astate = _AnnotationState.create(self._name_base, env=self._env)
    self._astate.base.status = pb.RUNNING
    self._set_timestamp(self._astate.base.started)
    self._check()

  def close(self):
    assert self._astate is not None, 'StreamEngine is not open.'

    # Shut down any outstanding streams that may not have been closed for
    # whatever reason.
    for s in reversed(self._streams.values()):
      s.close()

    # Close out our root Step. Manually check annotation state afterwards.
    self._close_step(self._astate.base)
    self._check()

    # Shut down our annotation monitor and close our annotation stream.
    self._annotation_monitor.flush_and_join()
    self._annotation_stream.close()

    # Clear our client and state. We are now closed.
    self._streams.clear()
    self._client = None
    self._astate = None

  def _check(self):
    if self._astate is None:
      return

    step_data = self._astate.check()
    if step_data is not None:
      self._annotation_monitor.signal_update(step_data)

  def _set_timestamp(self, dst):
martiniss 2016/09/27 19:24:48: I don't understand this function. What's the point?
dnj 2016/09/27 23:13:58: It loads data into the protobuf. I've updated the
    """Loads the current time (from our Environment) into a Timestamp.

    Args:
      dst (timestamp_pb2.Timestamp): the destination Timestamp message to
          populate with the current time.
    """
martiniss 2016/09/27 19:24:48: Args are wrong.
dnj 2016/09/27 23:13:58: Acknowledged.
    dt = self._env.now

    # Convert to seconds from the epoch.
    v = (dt - _EPOCH).total_seconds()

    dst.seconds = int(v)
    dst.nanos = int((v - dst.seconds) * 1000000000.0)  # Remainder as nanos.
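    # For example (illustrative values): v = 1474996438.25 yields
    # seconds=1474996438 and nanos=250000000.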

  def _close_step(self, step):
    """Closes a step, and any open substeps, propagating status.

    If all of the substeps are already closed, this will do nothing. However,
    if any are open, it will close them with an infra failure state.

    If any substeps failed, the failure will be propagated to "step".

    Args:
      step (pb.Step): The Step message to close.
    """
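    # For example: a substep left RUNNING here is closed and reported as an
    # INFRA failure on "step", while a substep that ended in pb.FAILURE marks
    # "step" as a GENERAL failure.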
    # Close any open substeps, in case some of them didn't close.
    failed = []
    incomplete = []
    for sub in step.substep:
      if not sub.HasField('step'):
        # Not an embedded substep.
        continue

      # Did this step actually complete? It should have, by now, so if it
      # didn't, we'll be reporting an infra failure in "step".
      if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
        incomplete.append(sub.step)

      # Close this substep. This may be a no-op, if the substep is already
      # closed.
      self._close_step(sub.step)

      # If a substep failed, propagate its failure status to "step".
      if sub.step.status == pb.FAILURE:
        failed.append(sub.step)

    # If we had any incomplete steps, mark that we failed. Note that a
    # message field can't be tested against None or assigned directly, so we
    # use HasField/CopyFrom.
    if incomplete:
      step.status = pb.FAILURE
      if not step.HasField('failure_details'):
        step.failure_details.CopyFrom(pb.FailureDetails(
            type=pb.FailureDetails.INFRA,
            text='Some substeps did not complete: %s' % (
                ', '.join(s.name for s in incomplete)),
        ))
    elif failed:
      step.status = pb.FAILURE
      if not step.HasField('failure_details'):
        # This step didn't close with its own failure details, so note the
        # substep failures here.
        step.failure_details.CopyFrom(pb.FailureDetails(
            type=pb.FailureDetails.GENERAL,
            text='Some substeps failed: %s' % (
                ', '.join(s.name for s in failed)),
        ))

    # Now close "step". If it's still RUNNING, assume that it was successful.
    if step.status == pb.RUNNING:
      step.status = pb.SUCCESS
    if not step.HasField('ended'):
      self._set_timestamp(step.ended)


class _AnnotationMonitor(object):
  """The owner of the annotation datagram stream, sending annotation updates
  in a controlled manner and buffering them when the content hasn't changed.

  By default, since annotation state can change rapidly, minor annotation
  changes are throttled such that they are only actually sent periodically.

  New annotation state updates can be installed by calling `signal_update`.
  After being started, the _AnnotationMonitor must be shut down by calling
  its `flush_and_join` method.
  """
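
  # Usage sketch (illustrative; "fd" here is an open LogDog datagram stream):
  #
  #   monitor = _AnnotationMonitor(fd)
  #   monitor.signal_update(step.SerializeToString())
  #   ...
  #   monitor.flush_and_join()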

  # Flush interval for non-structural events.
  _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30)

  def __init__(self, fd, flush_period=None):
    self._fd = fd
    self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD

    # The following group of variables is protected by "_lock".
    self._lock = threading.Lock()
    self._current_data = None
    self._flush_timer = None
    self._last_flush_time = None
    self._last_flush_data = None

  def signal_update(self, step_data, structural=False):
    """Updates the annotation state with new step data.

    This updates our state to include new step data. The annotation monitor
    will pick this up and dispatch it, either:
    - Eventually, when the flush period completes, or
    - Immediately, if this is a structural change.

    Args:
      step_data (str): The updated binary annotation protobuf step data.
      structural (bool): If True, this is a structural update and should be
          pushed immediately.
    """
    with self._lock:
      # Did our data actually change?
      if step_data == self._last_flush_data:
        # Nope, leave things as-is.
        return

      # This is new data. Is it structural? If so, flush immediately. If not,
      # make sure our timer is running so it will eventually be flushed. Note
      # that the timer may also suggest that we flush immediately.
      now = datetime.datetime.now()
      self._current_data = step_data
      if structural or self._set_flush_timer_locked(now):
        # We should flush immediately.
        self._flush_now_locked(now)

  def flush_and_join(self):
    """Flushes any remaining updates and blocks until the monitor is complete.
    """
    # Mark that we're finished and flush any outstanding data.
    with self._lock:
      self._flush_now_locked(datetime.datetime.now())

  @property
  def latest(self):
    with self._lock:
      return self._last_flush_data

  def _flush_now_locked(self, now):
    # Clear any current flush timer.
    self._clear_flush_timer_locked()

    # Record this flush.
    #
    # We set the last flush time to now because even if we don't actually send
    # data, we have responded to the flush edge.
    flush_data, self._current_data = self._current_data, None
    self._last_flush_time = now

    # If the data hasn't changed since the last flush, then don't actually
    # do anything.
    if flush_data is None or flush_data == self._last_flush_data:
martiniss 2016/09/27 19:24:48: This means that duplicate data entries get ignored
dnj 2016/09/27 23:13:58: Probably not frequently, but no harm in checking.
      return

    self._last_flush_data = flush_data
    self._fd.send(flush_data)

  def _clear_flush_timer_locked(self):
    if self._flush_timer is not None:
      self._flush_timer.cancel()
      self._flush_timer = None

  def _set_flush_timer_locked(self, now):
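    # Returns True if the caller should flush immediately; otherwise, arms a
    # one-shot flush timer (if one isn't already running) to fire when the
    # current flush period expires.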
    if self._flush_timer is not None:
      # Our flush timer is already running.
      return False

    if self._last_flush_time is None:
      # We have never flushed before, so flush immediately.
      return True

    deadline = self._last_flush_time + self._flush_period
    if deadline <= now:
      # We're past our flush deadline, and should flush immediately.
      return True

    # Start our flush timer.
    self._flush_timer = threading.Timer((deadline - now).total_seconds(),
                                        self._flush_timer_expired)
    self._flush_timer.daemon = True
    self._flush_timer.start()

  def _flush_timer_expired(self):
    with self._lock:
      self._flush_now_locked(datetime.datetime.now())


class _AnnotationState(object):
  """Manages an outer Milo annotation protobuf Step."""

  Step = collections.namedtuple('Step', (
      'msg', 'stream_name_base', 'substream_name_index'))

  def __init__(self, base_step, stream_name_base):
    self._base = self.Step(
        msg=base_step,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._check_snapshot = None

    # The current step stack. This is updated as new steps are created, based
    # on their nesting levels.
    self._nest_stack = [self._base]

    # Index initial properties.
    self._properties = dict((p.name, p) for p in self._base.msg.property)

  @classmethod
  def create(cls, stream_name_base, env=None, properties=None):
    base = pb.Step()
    base.name = 'steps'
    base.status = pb.PENDING
    if env:
      if env.argv:
        base.command.command_line.extend(env.argv)
      if env.cwd:
        base.command.cwd = env.cwd
      if env.environ:
        base.command.environ.update(env.environ)
    if properties:
      for k, v in sorted(properties.iteritems()):
        base.property.add(name=k, value=v)
    return cls(base, stream_name_base)

  @property
  def base(self):
    return self._base.msg

  def check(self):
    """Checks if the annotation state has been updated and, if so, returns it.

    After check returns, the latest annotation state will be used as the
    current snapshot for future checks.

    Returns (str/None): A serialized binary Step protobuf if modified, None
        otherwise.
    """
    if self._check_snapshot is None or self._check_snapshot != self.base:
      self._check_snapshot = copy.deepcopy(self.base)
      return self._check_snapshot.SerializeToString()
    return None

  def create_step(self, step_config):
    # Identify our parent Step by examining the nesting level. The first step
    # in the nest stack will always be the base (nesting level "-1", since
    # it's the parent of level 0). Since the step's "nest_level" is one more
    # than the parent's, and we need to offset by 1 to reach the stack index,
    # they cancel each other out, so the nest level is the same as the
    # parent's stack index.
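    # For example: a top-level step has nest_level 0 and its parent is
    # self._nest_stack[0] (the base); a step nested one level deeper has
    # nest_level 1 and its parent at stack index 1.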
    assert step_config.nest_level < len(self._nest_stack), (
        'Invalid nest level %d (highest is %d)' % (
            step_config.nest_level, len(self._nest_stack)-1))

    # Clear any items in the nest stack that are deeper than the current
    # element.
    del self._nest_stack[step_config.nest_level+1:]
    parent = self._nest_stack[-1]

    # Create a stream name for this step. Even though step names are unique,
    # the normalized LogDog step name may overlap with a different step name.
    # We keep track of the step names we've issued to this step space and
    # add indexes if a conflict is identified.
    stream_name_base = parent.stream_name_base.append('steps',
                                                      step_config.base_name)
    index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
    parent.substream_name_index[str(stream_name_base)] += 1
    if index > 0:
      stream_name_base += '_%d' % (index,)

    # Create and populate our new step.
    msg = parent.msg.substep.add().step
    msg.name = step_config.base_name
    msg.status = pb.PENDING
    if step_config.cmd:
      msg.command.command_line.extend(step_config.cmd)
    if step_config.cwd:
      msg.command.cwd = step_config.cwd
    if step_config.env:
      msg.command.environ.update(step_config.env)

    step = self.Step(
        msg=msg,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._nest_stack.append(step)
    return step

  def update_properties(self, **kwargs):
    """Updates the base step's property values to incorporate kwargs."""
    for k, v in sorted(kwargs.iteritems()):
      cur = self._properties.get(k)
      if cur is None:
        cur = self.base.property.add(name=k, value=str(v))
        self._properties[k] = cur
        continue

      # A Property message already exists for this key, so update its value.
      if cur.value != str(v):
        cur.value = str(v)


class _StreamName(object):
  """An immutable, validated wrapper for a LogDog stream name."""

  def __init__(self, base):
    if base is not None:
      libs.logdog.streamname.validate_stream_name(base)
    self._base = base

  def append(self, *components):
    """Returns (_StreamName): A new _StreamName instance with components added.

    Each component in "components" will become a new normalized stream name
    component. Consequently, any separators (/) in the components will be
    replaced with underscores.

    Args:
      components: the path components to append to this _StreamName.
    """
    if len(components) == 0:
      return self

    components = [self._normalize(self._flatten(p))
                  for p in reversed(components)]
    if self._base:
      components.append(self._base)
    return type(self)('/'.join(reversed(components)))
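
  # For example (illustrative): _StreamName('foo').append('My Thing') yields
  # a _StreamName for "foo/My_Thing".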

  def augment(self, val):
    """Returns (_StreamName): A new _StreamName with "val" appended.

    This generates a new, normalized _StreamName with the contents of "val"
    appended to the end. For example:

        Original: "foo/bar"
        Append "baz qux": "foo/barbaz_qux"
    """
    if not val:
      return self
    val = self._flatten(val)
    if self._base:
      val = self._base + val
    return type(self)(self._normalize(val))

  def __iadd__(self, val):
    return self.augment(val)

  @staticmethod
  def _flatten(v):
    return v.replace('/', '_')

  @staticmethod
  def _normalize(v):
    return libs.logdog.streamname.normalize(v, prefix='s_')

  def __str__(self):
    if not self._base:
      raise ValueError('Cannot generate string from empty StreamName.')
    return self._base