Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(126)

Side by Side Diff: recipe_engine/stream.py

Issue 2245113002: Track step nesting in StreamEngine. (Closed) Base URL: https://github.com/luci/recipes-py@emit-initial-properties
Patch Set: Rebase, comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 55
56 def add_step_link(self, name, url): 56 def add_step_link(self, name, url):
57 raise NotImplementedError() 57 raise NotImplementedError()
58 58
59 def reset_subannotation_state(self): 59 def reset_subannotation_state(self):
60 pass 60 pass
61 61
62 def set_step_status(self, status): 62 def set_step_status(self, status):
63 raise NotImplementedError() 63 raise NotImplementedError()
64 64
65 def set_nest_level(self, nest_level):
66 raise NotImplementedError()
67
68 def set_build_property(self, key, value): 65 def set_build_property(self, key, value):
69 raise NotImplementedError() 66 raise NotImplementedError()
70 67
71 def trigger(self, trigger_spec): 68 def trigger(self, trigger_spec):
72 raise NotImplementedError() 69 raise NotImplementedError()
73 70
74 def new_step_stream(self, step_name, allow_subannotations=False): 71 def new_step_stream(self, step_name, allow_subannotations=False,
72 nest_level=None):
75 """Creates a new StepStream in this engine. 73 """Creates a new StepStream in this engine.
76 74
77 The step will be considered started at the moment this method is called. 75 The step will be considered started at the moment this method is called.
78 76
79 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow 77 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow
80 annotations that this step emits through to the annotator (True), or 78 annotations that this step emits through to the annotator (True), or
81 guard them by prefixing them with ! (False). The proper way to do this 79 guard them by prefixing them with ! (False). The proper way to do this
82 is to implement an annotations parser that converts to StreamEngine calls; 80 is to implement an annotations parser that converts to StreamEngine calls;
83 i.e. parse -> re-emit. 81 i.e. parse -> re-emit.
82
83 Args:
84 nest_level (int): The nest level of the step. None/0 are top-level.
84 """ 85 """
86 return self._new_step_stream(step_name, allow_subannotations, nest_level)
85 87
88 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
89 """ABC overridable function for "new_step_stream" with no defaults."""
86 raise NotImplementedError() 90 raise NotImplementedError()
87 91
88 def open(self): 92 def open(self):
89 pass 93 pass
90 94
91 def close(self): 95 def close(self):
92 pass 96 pass
93 97
94 def __enter__(self): 98 def __enter__(self):
95 self.open() 99 self.open()
96 return self 100 return self
97 101
98 def __exit__(self, _exc_type, _exc_val, _exc_tb): 102 def __exit__(self, _exc_type, _exc_val, _exc_tb):
99 self.close() 103 self.close()
100 104
101 105
102 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 106 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
103 # form products. This code is entirely mechanical from the types (if we 107 # form products. This code is entirely mechanical from the types (if we
104 # had them formalized...). 108 # had them formalized...).
105 class ProductStreamEngine(StreamEngine): 109 class ProductStreamEngine(StreamEngine):
106 def __init__(self, engine_a, engine_b): 110 def __init__(self, engine_a, engine_b):
111 assert engine_a and engine_b
107 self._engine_a = engine_a 112 self._engine_a = engine_a
108 self._engine_b = engine_b 113 self._engine_b = engine_b
109 114
110 class Stream(StreamEngine.Stream): 115 class Stream(StreamEngine.Stream):
111 def __init__(self, stream_a, stream_b): 116 def __init__(self, stream_a, stream_b):
117 assert stream_a and stream_b
112 self._stream_a = stream_a 118 self._stream_a = stream_a
113 self._stream_b = stream_b 119 self._stream_b = stream_b
114 120
115 def write_line(self, line): 121 def write_line(self, line):
116 self._stream_a.write_line(line) 122 self._stream_a.write_line(line)
117 self._stream_b.write_line(line) 123 self._stream_b.write_line(line)
118 124
119 def close(self): 125 def close(self):
120 self._stream_a.close() 126 self._stream_a.close()
121 self._stream_b.close() 127 self._stream_b.close()
122 128
123 class StepStream(Stream): 129 class StepStream(Stream):
124 def _void_product(method_name): 130 def _void_product(method_name):
125 def inner(self, *args): 131 def inner(self, *args):
126 getattr(self._stream_a, method_name)(*args) 132 getattr(self._stream_a, method_name)(*args)
127 getattr(self._stream_b, method_name)(*args) 133 getattr(self._stream_b, method_name)(*args)
128 return inner 134 return inner
129 135
130 def new_log_stream(self, log_name): 136 def new_log_stream(self, log_name):
131 return ProductStreamEngine.Stream( 137 return ProductStreamEngine.Stream(
132 self._stream_a.new_log_stream(log_name), 138 self._stream_a.new_log_stream(log_name),
133 self._stream_b.new_log_stream(log_name)) 139 self._stream_b.new_log_stream(log_name))
134 140
135 add_step_text = _void_product('add_step_text') 141 add_step_text = _void_product('add_step_text')
136 add_step_summary_text = _void_product('add_step_summary_text') 142 add_step_summary_text = _void_product('add_step_summary_text')
137 add_step_link = _void_product('add_step_link') 143 add_step_link = _void_product('add_step_link')
138 reset_subannotation_state = _void_product('reset_subannotation_state') 144 reset_subannotation_state = _void_product('reset_subannotation_state')
139 set_step_status = _void_product('set_step_status') 145 set_step_status = _void_product('set_step_status')
140 set_nest_level = _void_product('set_nest_level')
141 set_build_property = _void_product('set_build_property') 146 set_build_property = _void_product('set_build_property')
142 trigger = _void_product('trigger') 147 trigger = _void_product('trigger')
143 148
144 def new_step_stream(self, step_name, allow_subannotations=False): 149 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
145 return self.StepStream( 150 return self.StepStream(
146 self._engine_a.new_step_stream(step_name, allow_subannotations), 151 self._engine_a._new_step_stream(
147 self._engine_b.new_step_stream(step_name, allow_subannotations)) 152 step_name, allow_subannotations, nest_level),
153 self._engine_b._new_step_stream(
154 step_name, allow_subannotations, nest_level))
148 155
149 def open(self): 156 def open(self):
150 self._engine_a.open() 157 self._engine_a.open()
151 self._engine_b.open() 158 self._engine_b.open()
152 159
153 def close(self): 160 def close(self):
154 self._engine_a.close() 161 self._engine_a.close()
155 self._engine_b.close() 162 self._engine_b.close()
156 163
157 164
158 def _noop(*args, **kwargs): 165 def _noop(*args, **kwargs):
159 pass 166 pass
160 167
161 class NoopStreamEngine(StreamEngine): 168 class NoopStreamEngine(StreamEngine):
162 class Stream(StreamEngine.Stream): 169 class Stream(StreamEngine.Stream):
163 write_line = _noop 170 write_line = _noop
164 close = _noop 171 close = _noop
165 172
166 class StepStream(Stream): 173 class StepStream(Stream):
167 def new_log_stream(self, log_name): 174 def new_log_stream(self, log_name):
168 return NoopStreamEngine.Stream() 175 return NoopStreamEngine.Stream()
169 add_step_text = _noop 176 add_step_text = _noop
170 add_step_summary_text = _noop 177 add_step_summary_text = _noop
171 add_step_link = _noop 178 add_step_link = _noop
172 reset_subannotation_state = _noop 179 reset_subannotation_state = _noop
173 set_step_status = _noop 180 set_step_status = _noop
174 set_nest_level = _noop
175 set_build_property = _noop 181 set_build_property = _noop
176 trigger = _noop 182 trigger = _noop
177 183
178 def new_step_stream(self, step_name, allow_subannotations=False): 184 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
179 return self.StepStream() 185 return self.StepStream()
180 186
181 187
182 class StreamEngineInvariants(StreamEngine): 188 class StreamEngineInvariants(StreamEngine):
183 """Checks that the users are using a StreamEngine hygenically. 189 """Checks that the users are using a StreamEngine hygenically.
184 190
185 Multiply with actually functional StreamEngines so you don't have to check 191 Multiply with actually functional StreamEngines so you don't have to check
186 these all over the place. 192 these all over the place.
187 """ 193 """
188 def __init__(self): 194 def __init__(self):
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 assert isinstance(url, basestring), 'Link url %s is not a string' % url 232 assert isinstance(url, basestring), 'Link url %s is not a string' % url
227 233
228 def set_step_status(self, status): 234 def set_step_status(self, status):
229 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION') 235 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION')
230 if status == 'SUCCESS': 236 if status == 'SUCCESS':
231 # A constraint imposed by the annotations implementation 237 # A constraint imposed by the annotations implementation
232 assert self._status == 'SUCCESS', ( 238 assert self._status == 'SUCCESS', (
233 'Cannot set successful status after status is %s' % self._status) 239 'Cannot set successful status after status is %s' % self._status)
234 self._status = status 240 self._status = status
235 241
236 def set_nest_level(self, nest_level):
237 assert isinstance(nest_level, int)
238
239 def set_build_property(self, key, value): 242 def set_build_property(self, key, value):
240 pass 243 pass
241 244
242 def trigger(self, spec): 245 def trigger(self, spec):
243 assert '\n' not in spec # Spec must fit on one line. 246 assert '\n' not in spec # Spec must fit on one line.
244 json.loads(spec) # Spec must be a valid json object. 247 json.loads(spec) # Spec must be a valid json object.
245 248
246 class LogStream(StreamEngine.Stream): 249 class LogStream(StreamEngine.Stream):
247 def __init__(self, step_stream, log_name): 250 def __init__(self, step_stream, log_name):
248 self._step_stream = step_stream 251 self._step_stream = step_stream
249 self._log_name = log_name 252 self._log_name = log_name
250 self._open = True 253 self._open = True
251 254
252 def write_line(self, line): 255 def write_line(self, line):
253 assert '\n' not in line 256 assert '\n' not in line
254 assert self._step_stream._open 257 assert self._step_stream._open
255 assert self._open 258 assert self._open
256 259
257 def close(self): 260 def close(self):
258 assert self._step_stream._open 261 assert self._step_stream._open
259 assert self._open 262 assert self._open
260 self._open = False 263 self._open = False
261 264
262 def new_step_stream(self, step_name, allow_subannotations=False): 265 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
263 assert step_name not in self._streams, 'Step %s already exists' % step_name 266 assert step_name not in self._streams, 'Step %s already exists' % step_name
264 self._streams.add(step_name) 267 self._streams.add(step_name)
265 return self.StepStream(self, step_name) 268 return self.StepStream(self, step_name)
266 269
267 270
268 class AnnotationStepStream(StreamEngine.StepStream): 271 class AnnotatorStreamEngine(StreamEngine):
269 def __init__(self, emit_timestamps=False, time_fn=None): 272 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
273 self._current_step = None
274 self._opened = set()
275 self._outstream = outstream
270 self.emit_timestamps = emit_timestamps 276 self.emit_timestamps = emit_timestamps
271 self.time_fn = time_fn or time.time 277 self.time_fn = time_fn or time.time
272 278
273 def basic_write(self, line): 279 def open(self):
274 raise NotImplementedError() 280 super(AnnotatorStreamEngine, self).open()
275 281 self.output_current_time()
276 def output_annotation(self, *args): 282 self.output_root_annotation('HONOR_ZERO_RETURN_CODE')
277 self.basic_write(
278 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
279
280 def write_line(self, line):
281 if line.startswith('@@@'):
282 self.basic_write('!' + line + '\n')
283 else:
284 self.basic_write(line + '\n')
285 283
286 def close(self): 284 def close(self):
285 super(AnnotatorStreamEngine, self).close()
286 self.output_current_time()
287
288 def output_current_time(self, step=None):
289 """Prints CURRENT_TIMESTAMP annotation with current time."""
290 if step:
291 self._step_cursor(step)
287 if self.emit_timestamps: 292 if self.emit_timestamps:
288 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn()) 293 self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn())
289 self.output_annotation('STEP_CLOSED')
290 294
291 def new_log_stream(self, log_name): 295 @staticmethod
292 return self.StepLogStream(self, log_name) 296 def write_annotation(outstream, *args):
297 # Flush the stream before & after engine annotations, because they can
298 # change which step we are talking about and this matters to buildbot.
299 outstream.flush()
300 outstream.write(
301 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
302 outstream.flush()
293 303
294 def add_step_text(self, text): 304 def output_root_annotation(self, *args):
295 self.output_annotation('STEP_TEXT', text) 305 self.write_annotation(self._outstream, *args)
296 306
297 def add_step_summary_text(self, text): 307 def _step_cursor(self, step_name):
298 self.output_annotation('STEP_SUMMARY_TEXT', text) 308 if self._current_step != step_name:
309 self.output_root_annotation('STEP_CURSOR', step_name)
310 self._current_step = step_name
311 if step_name not in self._opened:
312 self.output_current_time()
313 self.output_root_annotation('STEP_STARTED')
314 self._opened.add(step_name)
299 315
300 def add_step_link(self, name, url): 316 class StepStream(StreamEngine.StepStream):
301 self.output_annotation('STEP_LINK', name, url) 317 def __init__(self, engine, outstream, step_name):
318 super(StreamEngine.StepStream, self).__init__()
302 319
303 def set_step_status(self, status): 320 self._engine = engine
304 if status == 'SUCCESS': 321 self._outstream = outstream
305 pass 322 self._step_name = step_name
306 elif status == 'WARNING':
307 self.output_annotation('STEP_WARNINGS')
308 elif status == 'FAILURE':
309 self.output_annotation('STEP_FAILURE')
310 elif status == 'EXCEPTION':
311 self.output_annotation('STEP_EXCEPTION')
312 else:
313 raise Exception('Impossible status %s' % status)
314 323
315 def set_nest_level(self, nest_level): 324 def basic_write(self, line):
316 self.output_annotation('STEP_NEST_LEVEL', str(nest_level)) 325 self._engine._step_cursor(self._step_name)
326 self._outstream.write(line)
317 327
318 def set_build_property(self, key, value): 328 def close(self):
319 self.output_annotation('SET_BUILD_PROPERTY', key, value) 329 self._engine.output_current_time(step=self._step_name)
330 self.output_annotation('STEP_CLOSED')
320 331
321 def trigger(self, spec): 332 def output_annotation(self, *args):
322 self.output_annotation('STEP_TRIGGER', spec) 333 self._engine._step_cursor(self._step_name)
334 self._engine.write_annotation(self._outstream, *args)
335
336 def write_line(self, line):
337 if line.startswith('@@@'):
338 self.basic_write('!' + line + '\n')
339 else:
340 self.basic_write(line + '\n')
341
342 def new_log_stream(self, log_name):
343 return self._engine.StepLogStream(self, log_name)
344
345 def add_step_text(self, text):
346 self.output_annotation('STEP_TEXT', text)
347
348 def add_step_summary_text(self, text):
349 self.output_annotation('STEP_SUMMARY_TEXT', text)
350
351 def add_step_link(self, name, url):
352 self.output_annotation('STEP_LINK', name, url)
353
354 def set_step_status(self, status):
355 if status == 'SUCCESS':
356 pass
357 elif status == 'WARNING':
358 self.output_annotation('STEP_WARNINGS')
359 elif status == 'FAILURE':
360 self.output_annotation('STEP_FAILURE')
361 elif status == 'EXCEPTION':
362 self.output_annotation('STEP_EXCEPTION')
363 else:
364 raise Exception('Impossible status %s' % status)
365
366 def set_build_property(self, key, value):
367 self.output_annotation('SET_BUILD_PROPERTY', key, value)
368
369 def trigger(self, spec):
370 self.output_annotation('STEP_TRIGGER', spec)
371
323 372
324 class StepLogStream(StreamEngine.Stream): 373 class StepLogStream(StreamEngine.Stream):
325 def __init__(self, step_stream, log_name): 374 def __init__(self, step_stream, log_name):
326 self._step_stream = step_stream 375 self._step_stream = step_stream
327 self._log_name = log_name.replace('/', '/') 376 self._log_name = log_name.replace('/', '/')
328 377
329 def write_line(self, line): 378 def write_line(self, line):
330 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) 379 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
331 380
332 def close(self): 381 def close(self):
333 self._step_stream.output_annotation('STEP_LOG_END', self._log_name) 382 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
334 383
335 384
336 class AnnotatorStreamEngine(StreamEngine):
337 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
338 self._current_step = None
339 self._opened = set()
340 self._outstream = outstream
341 self.emit_timestamps = emit_timestamps
342 self.time_fn = time_fn or time.time
343
344 def open(self):
345 super(AnnotatorStreamEngine, self).open()
346 if self.emit_timestamps:
347 self.output_current_time()
348 self.output_annotation('HONOR_ZERO_RETURN_CODE')
349
350 def close(self):
351 super(AnnotatorStreamEngine, self).close()
352 if self.emit_timestamps:
353 self.output_current_time()
354
355 def output_current_time(self):
356 """Prints CURRENT_TIMESTAMP annotation with current time."""
357 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
358
359 def output_annotation(self, *args):
360 # Flush the stream before & after engine annotations, because they can
361 # change which step we are talking about and this matters to buildbot.
362 self._outstream.flush()
363 self._outstream.write(
364 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
365 self._outstream.flush()
366
367 def _step_cursor(self, name):
368 if self._current_step != name:
369 self.output_annotation('STEP_CURSOR', name)
370 self._current_step = name
371 if name not in self._opened:
372 if self.emit_timestamps:
373 self.output_current_time()
374 self.output_annotation('STEP_STARTED')
375 self._opened.add(name)
376
377 class StepStream(AnnotationStepStream):
378 def __init__(self, engine, step_name):
379 AnnotationStepStream.__init__(
380 self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
381 self._engine = engine
382 self._step_name = step_name
383
384 def basic_write(self, line):
385 self._engine._step_cursor(self._step_name)
386 self._engine._outstream.write(line)
387
388 class AllowSubannotationsStepStream(StepStream): 385 class AllowSubannotationsStepStream(StepStream):
389 def write_line(self, line): 386 def write_line(self, line):
390 self.basic_write(line + '\n') 387 self.basic_write(line + '\n')
391 388
392 # HACK(luqui): If the subannotator script changes the active step, we need 389 # HACK(luqui): If the subannotator script changes the active step, we need
393 # a way to get back to the real step that spawned the script. The right 390 # a way to get back to the real step that spawned the script. The right
394 # way to do that is to parse the annotation stream and re-emit it. But for 391 # way to do that is to parse the annotation stream and re-emit it. But for
395 # now we just provide this method. 392 # now we just provide this method.
396 def reset_subannotation_state(self): 393 def reset_subannotation_state(self):
397 self._engine._current_step = None 394 self._engine._current_step = None
398 395
399 def new_step_stream(self, step_name, allow_subannotations=False): 396
400 self.output_annotation('SEED_STEP', step_name) 397 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
398 return self._create_step_stream(step_name, self._outstream,
399 allow_subannotations, nest_level)
400
401 def _create_step_stream(self, step_name, outstream, allow_subannotations,
402 nest_level):
403 self.output_root_annotation('SEED_STEP', step_name)
401 if allow_subannotations: 404 if allow_subannotations:
402 return self.AllowSubannotationsStepStream(self, step_name) 405 stream = self.AllowSubannotationsStepStream(self, outstream, step_name)
403 else: 406 else:
404 return self.StepStream(self, step_name) 407 stream = self.StepStream(self, outstream, step_name)
405 408
406 409 if nest_level > 0:
407 class BareAnnotationStepStream(AnnotationStepStream): 410 # Emit our current nest level, if we are nested.
408 """A StepStream that is not tied to any engine, and emits assuming it has the 411 stream.output_annotation('STEP_NEST_LEVEL', str(nest_level))
409 cursor. 412 return stream
410
411 This is used for capturing the annotations in the engine.
412 """
413 def __init__(self, outstream):
414 self._outstream = outstream
415
416 def basic_write(self, line):
417 self._outstream.write(encode_str(line))
418 413
419 414
420 def encode_str(s): 415 def encode_str(s):
421 """Tries to encode a string into a python str type. 416 """Tries to encode a string into a python str type.
422 417
423 Currently buildbot only supports ascii. If we have an error decoding the 418 Currently buildbot only supports ascii. If we have an error decoding the
424 string (which means it might not be valid ascii), we decode the string with 419 string (which means it might not be valid ascii), we decode the string with
425 the 'replace' error mode, which replaces invalid characters with a suitable 420 the 'replace' error mode, which replaces invalid characters with a suitable
426 replacement character. 421 replacement character.
427 """ 422 """
428 try: 423 try:
429 return str(s) 424 return str(s)
430 except UnicodeEncodeError: 425 except UnicodeEncodeError:
431 return s.encode('utf-8', 'replace') 426 return s.encode('utf-8', 'replace')
432 except UnicodeDecodeError: 427 except UnicodeDecodeError:
433 return s.decode('utf-8', 'replace') 428 return s.decode('utf-8', 'replace')
OLDNEW
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698