Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(336)

Side by Side Diff: recipe_engine/stream.py

Issue 2253943003: Formally define step config, pass to stream. (Closed) Base URL: https://github.com/luci/recipes-py@nest-single-event
Patch Set: Rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
11 11
12 StreamEngine will coordinate the multiplexing of streams. In the case of 12 StreamEngine will coordinate the multiplexing of streams. In the case of
13 annotations, this involves keeping track of the STEP_CURSOR and setting it 13 annotations, this involves keeping track of the STEP_CURSOR and setting it
14 accordingly, as well as filtering @@@ lines. 14 accordingly, as well as filtering @@@ lines.
15 15
16 Stream is a virtual well-behaved stream (associated with an Engine) which you 16 Stream is a virtual well-behaved stream (associated with an Engine) which you
17 can just write to without worrying. 17 can just write to without worrying.
18 """ 18 """
19 19
20 import json 20 import json
21 import time 21 import time
22 22
23 from . import env 23 from . import env
24 from . import recipe_api
24 25
25 26
26 class StreamEngine(object): 27 class StreamEngine(object):
27 class Stream(object): 28 class Stream(object):
28 def write_line(self, line): 29 def write_line(self, line):
29 raise NotImplementedError() 30 raise NotImplementedError()
30 31
31 def write_split(self, string): 32 def write_split(self, string):
32 """Write a string (which may contain newlines) to the stream. It will 33 """Write a string (which may contain newlines) to the stream. It will
33 be terminated by a newline.""" 34 be terminated by a newline."""
(...skipping 27 matching lines...) Expand all
61 62
62 def set_step_status(self, status): 63 def set_step_status(self, status):
63 raise NotImplementedError() 64 raise NotImplementedError()
64 65
65 def set_build_property(self, key, value): 66 def set_build_property(self, key, value):
66 raise NotImplementedError() 67 raise NotImplementedError()
67 68
68 def trigger(self, trigger_spec): 69 def trigger(self, trigger_spec):
69 raise NotImplementedError() 70 raise NotImplementedError()
70 71
71 def new_step_stream(self, step_name, allow_subannotations=False, 72 def make_step_stream(self, name, **kwargs):
72 nest_level=None): 73 """Shorthand for creating a step stream from a step configuration dict."""
74 kwargs['name'] = name
75 return self.new_step_stream(recipe_api._make_step_config(**kwargs))
76
77 def new_step_stream(self, step_config):
73 """Creates a new StepStream in this engine. 78 """Creates a new StepStream in this engine.
74 79
75 The step will be considered started at the moment this method is called. 80 The step will be considered started at the moment this method is called.
76 81
77 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow 82 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow
78 annotations that this step emits through to the annotator (True), or 83 annotations that this step emits through to the annotator (True), or
79 guard them by prefixing them with ! (False). The proper way to do this 84 guard them by prefixing them with ! (False). The proper way to do this
80 is to implement an annotations parser that converts to StreamEngine calls; 85 is to implement an annotations parser that converts to StreamEngine calls;
81 i.e. parse -> re-emit. 86 i.e. parse -> re-emit.
82 87
83 Args: 88 Args:
84 nest_level (int): The nest level of the step. None/0 are top-level. 89 step_config (recipe_api.StepConfig): The step configuration.
85 """ 90 """
86 return self._new_step_stream(step_name, allow_subannotations, nest_level)
87
88 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
89 """ABC overridable function for "new_step_stream" with no defaults."""
90 raise NotImplementedError() 91 raise NotImplementedError()
91 92
92 def open(self): 93 def open(self):
93 pass 94 pass
94 95
95 def close(self): 96 def close(self):
96 pass 97 pass
97 98
98 def __enter__(self): 99 def __enter__(self):
99 self.open() 100 self.open()
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
139 self._stream_b.new_log_stream(log_name)) 140 self._stream_b.new_log_stream(log_name))
140 141
141 add_step_text = _void_product('add_step_text') 142 add_step_text = _void_product('add_step_text')
142 add_step_summary_text = _void_product('add_step_summary_text') 143 add_step_summary_text = _void_product('add_step_summary_text')
143 add_step_link = _void_product('add_step_link') 144 add_step_link = _void_product('add_step_link')
144 reset_subannotation_state = _void_product('reset_subannotation_state') 145 reset_subannotation_state = _void_product('reset_subannotation_state')
145 set_step_status = _void_product('set_step_status') 146 set_step_status = _void_product('set_step_status')
146 set_build_property = _void_product('set_build_property') 147 set_build_property = _void_product('set_build_property')
147 trigger = _void_product('trigger') 148 trigger = _void_product('trigger')
148 149
149 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 150 def new_step_stream(self, step_config):
150 return self.StepStream( 151 return self.StepStream(
151 self._engine_a._new_step_stream( 152 self._engine_a.new_step_stream(step_config),
152 step_name, allow_subannotations, nest_level), 153 self._engine_b.new_step_stream(step_config))
153 self._engine_b._new_step_stream(
154 step_name, allow_subannotations, nest_level))
155 154
156 def open(self): 155 def open(self):
157 self._engine_a.open() 156 self._engine_a.open()
158 self._engine_b.open() 157 self._engine_b.open()
159 158
160 def close(self): 159 def close(self):
161 self._engine_a.close() 160 self._engine_a.close()
162 self._engine_b.close() 161 self._engine_b.close()
163 162
164 163
165 def _noop(*args, **kwargs): 164 def _noop(*args, **kwargs):
166 pass 165 pass
167 166
168 class NoopStreamEngine(StreamEngine): 167 class NoopStreamEngine(StreamEngine):
169 class Stream(StreamEngine.Stream): 168 class Stream(StreamEngine.Stream):
170 write_line = _noop 169 write_line = _noop
171 close = _noop 170 close = _noop
172 171
173 class StepStream(Stream): 172 class StepStream(Stream):
174 def new_log_stream(self, log_name): 173 def new_log_stream(self, log_name):
175 return NoopStreamEngine.Stream() 174 return NoopStreamEngine.Stream()
176 add_step_text = _noop 175 add_step_text = _noop
177 add_step_summary_text = _noop 176 add_step_summary_text = _noop
178 add_step_link = _noop 177 add_step_link = _noop
179 reset_subannotation_state = _noop 178 reset_subannotation_state = _noop
180 set_step_status = _noop 179 set_step_status = _noop
181 set_build_property = _noop 180 set_build_property = _noop
182 trigger = _noop 181 trigger = _noop
183 182
184 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 183 def new_step_stream(self, step_config):
185 return self.StepStream() 184 return self.StepStream()
186 185
187 186
188 class StreamEngineInvariants(StreamEngine): 187 class StreamEngineInvariants(StreamEngine):
189 """Checks that the users are using a StreamEngine hygenically. 188 """Checks that the users are using a StreamEngine hygenically.
190 189
191 Multiply with actually functional StreamEngines so you don't have to check 190 Multiply with actually functional StreamEngines so you don't have to check
192 these all over the place. 191 these all over the place.
193 """ 192 """
194 def __init__(self): 193 def __init__(self):
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
255 def write_line(self, line): 254 def write_line(self, line):
256 assert '\n' not in line 255 assert '\n' not in line
257 assert self._step_stream._open 256 assert self._step_stream._open
258 assert self._open 257 assert self._open
259 258
260 def close(self): 259 def close(self):
261 assert self._step_stream._open 260 assert self._step_stream._open
262 assert self._open 261 assert self._open
263 self._open = False 262 self._open = False
264 263
265 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 264 def new_step_stream(self, step_config):
266 assert step_name not in self._streams, 'Step %s already exists' % step_name 265 assert step_config.name not in self._streams, (
267 self._streams.add(step_name) 266 'Step %s already exists' % step_config.name)
268 return self.StepStream(self, step_name) 267 self._streams.add(step_config.name)
268 return self.StepStream(self, step_config.name)
269 269
270 270
271 class AnnotatorStreamEngine(StreamEngine): 271 class AnnotatorStreamEngine(StreamEngine):
272 def __init__(self, outstream, emit_timestamps=False, time_fn=None): 272 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
273 self._current_step = None 273 self._current_step = None
274 self._opened = set() 274 self._opened = set()
275 self._outstream = outstream 275 self._outstream = outstream
276 self.emit_timestamps = emit_timestamps 276 self.emit_timestamps = emit_timestamps
277 self.time_fn = time_fn or time.time 277 self.time_fn = time_fn or time.time
278 278
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 self.basic_write(line + '\n') 387 self.basic_write(line + '\n')
388 388
389 # HACK(luqui): If the subannotator script changes the active step, we need 389 # HACK(luqui): If the subannotator script changes the active step, we need
390 # a way to get back to the real step that spawned the script. The right 390 # a way to get back to the real step that spawned the script. The right
391 # way to do that is to parse the annotation stream and re-emit it. But for 391 # way to do that is to parse the annotation stream and re-emit it. But for
392 # now we just provide this method. 392 # now we just provide this method.
393 def reset_subannotation_state(self): 393 def reset_subannotation_state(self):
394 self._engine._current_step = None 394 self._engine._current_step = None
395 395
396 396
397 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 397 def new_step_stream(self, step_config):
398 return self._create_step_stream(step_name, self._outstream, 398 self.output_root_annotation('SEED_STEP', step_config.name)
399 allow_subannotations, nest_level) 399 return self._create_step_stream(step_config, self._outstream)
400 400
401 def _create_step_stream(self, step_name, outstream, allow_subannotations, 401 def _create_step_stream(self, step_config, outstream):
402 nest_level): 402 if step_config.allow_subannotations:
403 self.output_root_annotation('SEED_STEP', step_name) 403 stream = self.AllowSubannotationsStepStream(self, outstream,
404 if allow_subannotations: 404 step_config.name)
405 stream = self.AllowSubannotationsStepStream(self, outstream, step_name)
406 else: 405 else:
407 stream = self.StepStream(self, outstream, step_name) 406 stream = self.StepStream(self, outstream, step_config.name)
408 407
409 if nest_level > 0: 408 if step_config.nest_level > 0:
410 # Emit our current nest level, if we are nested. 409 # Emit our current nest level, if we are nested.
411 stream.output_annotation('STEP_NEST_LEVEL', str(nest_level)) 410 stream.output_annotation('STEP_NEST_LEVEL', str(step_config.nest_level))
412 return stream 411 return stream
413 412
414 413
415 def encode_str(s): 414 def encode_str(s):
416 """Tries to encode a string into a python str type. 415 """Tries to encode a string into a python str type.
417 416
418 Currently buildbot only supports ascii. If we have an error decoding the 417 Currently buildbot only supports ascii. If we have an error decoding the
419 string (which means it might not be valid ascii), we decode the string with 418 string (which means it might not be valid ascii), we decode the string with
420 the 'replace' error mode, which replaces invalid characters with a suitable 419 the 'replace' error mode, which replaces invalid characters with a suitable
421 replacement character. 420 replacement character.
422 """ 421 """
423 try: 422 try:
424 return str(s) 423 return str(s)
425 except UnicodeEncodeError: 424 except UnicodeEncodeError:
426 return s.encode('utf-8', 'replace') 425 return s.encode('utf-8', 'replace')
427 except UnicodeDecodeError: 426 except UnicodeDecodeError:
428 return s.decode('utf-8', 'replace') 427 return s.decode('utf-8', 'replace')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698