Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: recipe_engine/stream.py

Issue 2320223002: Revert of Track step nesting in StreamEngine. (Closed) Base URL: https://github.com/luci/recipes-py@emit-initial-properties
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 55
56 def add_step_link(self, name, url): 56 def add_step_link(self, name, url):
57 raise NotImplementedError() 57 raise NotImplementedError()
58 58
59 def reset_subannotation_state(self): 59 def reset_subannotation_state(self):
60 pass 60 pass
61 61
62 def set_step_status(self, status): 62 def set_step_status(self, status):
63 raise NotImplementedError() 63 raise NotImplementedError()
64 64
65 def set_nest_level(self, nest_level):
66 raise NotImplementedError()
67
65 def set_build_property(self, key, value): 68 def set_build_property(self, key, value):
66 raise NotImplementedError() 69 raise NotImplementedError()
67 70
68 def trigger(self, trigger_spec): 71 def trigger(self, trigger_spec):
69 raise NotImplementedError() 72 raise NotImplementedError()
70 73
71 def new_step_stream(self, step_name, allow_subannotations=False, 74 def new_step_stream(self, step_name, allow_subannotations=False):
72 nest_level=None):
73 """Creates a new StepStream in this engine. 75 """Creates a new StepStream in this engine.
74 76
75 The step will be considered started at the moment this method is called. 77 The step will be considered started at the moment this method is called.
76 78
77 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow 79 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow
78 annotations that this step emits through to the annotator (True), or 80 annotations that this step emits through to the annotator (True), or
79 guard them by prefixing them with ! (False). The proper way to do this 81 guard them by prefixing them with ! (False). The proper way to do this
80 is to implement an annotations parser that converts to StreamEngine calls; 82 is to implement an annotations parser that converts to StreamEngine calls;
81 i.e. parse -> re-emit. 83 i.e. parse -> re-emit.
84 """
82 85
83 Args:
84 nest_level (int): The nest level of the step. None/0 are top-level.
85 """
86 return self._new_step_stream(step_name, allow_subannotations, nest_level)
87
88 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
89 """ABC overridable function for "new_step_stream" with no defaults."""
90 raise NotImplementedError() 86 raise NotImplementedError()
91 87
92 def open(self): 88 def open(self):
93 pass 89 pass
94 90
95 def close(self): 91 def close(self):
96 pass 92 pass
97 93
98 def __enter__(self): 94 def __enter__(self):
99 self.open() 95 self.open()
100 return self 96 return self
101 97
102 def __exit__(self, _exc_type, _exc_val, _exc_tb): 98 def __exit__(self, _exc_type, _exc_val, _exc_tb):
103 self.close() 99 self.close()
104 100
105 101
106 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 102 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
107 # form products. This code is entirely mechanical from the types (if we 103 # form products. This code is entirely mechanical from the types (if we
108 # had them formalized...). 104 # had them formalized...).
109 class ProductStreamEngine(StreamEngine): 105 class ProductStreamEngine(StreamEngine):
110 def __init__(self, engine_a, engine_b): 106 def __init__(self, engine_a, engine_b):
111 assert engine_a and engine_b
112 self._engine_a = engine_a 107 self._engine_a = engine_a
113 self._engine_b = engine_b 108 self._engine_b = engine_b
114 109
115 class Stream(StreamEngine.Stream): 110 class Stream(StreamEngine.Stream):
116 def __init__(self, stream_a, stream_b): 111 def __init__(self, stream_a, stream_b):
117 assert stream_a and stream_b
118 self._stream_a = stream_a 112 self._stream_a = stream_a
119 self._stream_b = stream_b 113 self._stream_b = stream_b
120 114
121 def write_line(self, line): 115 def write_line(self, line):
122 self._stream_a.write_line(line) 116 self._stream_a.write_line(line)
123 self._stream_b.write_line(line) 117 self._stream_b.write_line(line)
124 118
125 def close(self): 119 def close(self):
126 self._stream_a.close() 120 self._stream_a.close()
127 self._stream_b.close() 121 self._stream_b.close()
128 122
129 class StepStream(Stream): 123 class StepStream(Stream):
130 def _void_product(method_name): 124 def _void_product(method_name):
131 def inner(self, *args): 125 def inner(self, *args):
132 getattr(self._stream_a, method_name)(*args) 126 getattr(self._stream_a, method_name)(*args)
133 getattr(self._stream_b, method_name)(*args) 127 getattr(self._stream_b, method_name)(*args)
134 return inner 128 return inner
135 129
136 def new_log_stream(self, log_name): 130 def new_log_stream(self, log_name):
137 return ProductStreamEngine.Stream( 131 return ProductStreamEngine.Stream(
138 self._stream_a.new_log_stream(log_name), 132 self._stream_a.new_log_stream(log_name),
139 self._stream_b.new_log_stream(log_name)) 133 self._stream_b.new_log_stream(log_name))
140 134
141 add_step_text = _void_product('add_step_text') 135 add_step_text = _void_product('add_step_text')
142 add_step_summary_text = _void_product('add_step_summary_text') 136 add_step_summary_text = _void_product('add_step_summary_text')
143 add_step_link = _void_product('add_step_link') 137 add_step_link = _void_product('add_step_link')
144 reset_subannotation_state = _void_product('reset_subannotation_state') 138 reset_subannotation_state = _void_product('reset_subannotation_state')
145 set_step_status = _void_product('set_step_status') 139 set_step_status = _void_product('set_step_status')
140 set_nest_level = _void_product('set_nest_level')
146 set_build_property = _void_product('set_build_property') 141 set_build_property = _void_product('set_build_property')
147 trigger = _void_product('trigger') 142 trigger = _void_product('trigger')
148 143
149 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 144 def new_step_stream(self, step_name, allow_subannotations=False):
150 return self.StepStream( 145 return self.StepStream(
151 self._engine_a._new_step_stream( 146 self._engine_a.new_step_stream(step_name, allow_subannotations),
152 step_name, allow_subannotations, nest_level), 147 self._engine_b.new_step_stream(step_name, allow_subannotations))
153 self._engine_b._new_step_stream(
154 step_name, allow_subannotations, nest_level))
155 148
156 def open(self): 149 def open(self):
157 self._engine_a.open() 150 self._engine_a.open()
158 self._engine_b.open() 151 self._engine_b.open()
159 152
160 def close(self): 153 def close(self):
161 self._engine_a.close() 154 self._engine_a.close()
162 self._engine_b.close() 155 self._engine_b.close()
163 156
164 157
165 def _noop(*args, **kwargs): 158 def _noop(*args, **kwargs):
166 pass 159 pass
167 160
168 class NoopStreamEngine(StreamEngine): 161 class NoopStreamEngine(StreamEngine):
169 class Stream(StreamEngine.Stream): 162 class Stream(StreamEngine.Stream):
170 write_line = _noop 163 write_line = _noop
171 close = _noop 164 close = _noop
172 165
173 class StepStream(Stream): 166 class StepStream(Stream):
174 def new_log_stream(self, log_name): 167 def new_log_stream(self, log_name):
175 return NoopStreamEngine.Stream() 168 return NoopStreamEngine.Stream()
176 add_step_text = _noop 169 add_step_text = _noop
177 add_step_summary_text = _noop 170 add_step_summary_text = _noop
178 add_step_link = _noop 171 add_step_link = _noop
179 reset_subannotation_state = _noop 172 reset_subannotation_state = _noop
180 set_step_status = _noop 173 set_step_status = _noop
174 set_nest_level = _noop
181 set_build_property = _noop 175 set_build_property = _noop
182 trigger = _noop 176 trigger = _noop
183 177
184 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 178 def new_step_stream(self, step_name, allow_subannotations=False):
185 return self.StepStream() 179 return self.StepStream()
186 180
187 181
188 class StreamEngineInvariants(StreamEngine): 182 class StreamEngineInvariants(StreamEngine):
189 """Checks that the users are using a StreamEngine hygenically. 183 """Checks that the users are using a StreamEngine hygenically.
190 184
191 Multiply with actually functional StreamEngines so you don't have to check 185 Multiply with actually functional StreamEngines so you don't have to check
192 these all over the place. 186 these all over the place.
193 """ 187 """
194 def __init__(self): 188 def __init__(self):
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 assert isinstance(url, basestring), 'Link url %s is not a string' % url 226 assert isinstance(url, basestring), 'Link url %s is not a string' % url
233 227
234 def set_step_status(self, status): 228 def set_step_status(self, status):
235 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION') 229 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION')
236 if status == 'SUCCESS': 230 if status == 'SUCCESS':
237 # A constraint imposed by the annotations implementation 231 # A constraint imposed by the annotations implementation
238 assert self._status == 'SUCCESS', ( 232 assert self._status == 'SUCCESS', (
239 'Cannot set successful status after status is %s' % self._status) 233 'Cannot set successful status after status is %s' % self._status)
240 self._status = status 234 self._status = status
241 235
236 def set_nest_level(self, nest_level):
237 assert isinstance(nest_level, int)
238
242 def set_build_property(self, key, value): 239 def set_build_property(self, key, value):
243 pass 240 pass
244 241
245 def trigger(self, spec): 242 def trigger(self, spec):
246 assert '\n' not in spec # Spec must fit on one line. 243 assert '\n' not in spec # Spec must fit on one line.
247 json.loads(spec) # Spec must be a valid json object. 244 json.loads(spec) # Spec must be a valid json object.
248 245
249 class LogStream(StreamEngine.Stream): 246 class LogStream(StreamEngine.Stream):
250 def __init__(self, step_stream, log_name): 247 def __init__(self, step_stream, log_name):
251 self._step_stream = step_stream 248 self._step_stream = step_stream
252 self._log_name = log_name 249 self._log_name = log_name
253 self._open = True 250 self._open = True
254 251
255 def write_line(self, line): 252 def write_line(self, line):
256 assert '\n' not in line 253 assert '\n' not in line
257 assert self._step_stream._open 254 assert self._step_stream._open
258 assert self._open 255 assert self._open
259 256
260 def close(self): 257 def close(self):
261 assert self._step_stream._open 258 assert self._step_stream._open
262 assert self._open 259 assert self._open
263 self._open = False 260 self._open = False
264 261
265 def _new_step_stream(self, step_name, allow_subannotations, nest_level): 262 def new_step_stream(self, step_name, allow_subannotations=False):
266 assert step_name not in self._streams, 'Step %s already exists' % step_name 263 assert step_name not in self._streams, 'Step %s already exists' % step_name
267 self._streams.add(step_name) 264 self._streams.add(step_name)
268 return self.StepStream(self, step_name) 265 return self.StepStream(self, step_name)
269 266
270 267
268 class AnnotationStepStream(StreamEngine.StepStream):
269 def __init__(self, emit_timestamps=False, time_fn=None):
270 self.emit_timestamps = emit_timestamps
271 self.time_fn = time_fn or time.time
272
273 def basic_write(self, line):
274 raise NotImplementedError()
275
276 def output_annotation(self, *args):
277 self.basic_write(
278 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
279
280 def write_line(self, line):
281 if line.startswith('@@@'):
282 self.basic_write('!' + line + '\n')
283 else:
284 self.basic_write(line + '\n')
285
286 def close(self):
287 if self.emit_timestamps:
288 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
289 self.output_annotation('STEP_CLOSED')
290
291 def new_log_stream(self, log_name):
292 return self.StepLogStream(self, log_name)
293
294 def add_step_text(self, text):
295 self.output_annotation('STEP_TEXT', text)
296
297 def add_step_summary_text(self, text):
298 self.output_annotation('STEP_SUMMARY_TEXT', text)
299
300 def add_step_link(self, name, url):
301 self.output_annotation('STEP_LINK', name, url)
302
303 def set_step_status(self, status):
304 if status == 'SUCCESS':
305 pass
306 elif status == 'WARNING':
307 self.output_annotation('STEP_WARNINGS')
308 elif status == 'FAILURE':
309 self.output_annotation('STEP_FAILURE')
310 elif status == 'EXCEPTION':
311 self.output_annotation('STEP_EXCEPTION')
312 else:
313 raise Exception('Impossible status %s' % status)
314
315 def set_nest_level(self, nest_level):
316 self.output_annotation('STEP_NEST_LEVEL', str(nest_level))
317
318 def set_build_property(self, key, value):
319 self.output_annotation('SET_BUILD_PROPERTY', key, value)
320
321 def trigger(self, spec):
322 self.output_annotation('STEP_TRIGGER', spec)
323
324 class StepLogStream(StreamEngine.Stream):
325 def __init__(self, step_stream, log_name):
326 self._step_stream = step_stream
327 self._log_name = log_name.replace('/', '/')
328
329 def write_line(self, line):
330 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
331
332 def close(self):
333 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
334
335
271 class AnnotatorStreamEngine(StreamEngine): 336 class AnnotatorStreamEngine(StreamEngine):
272 def __init__(self, outstream, emit_timestamps=False, time_fn=None): 337 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
273 self._current_step = None 338 self._current_step = None
274 self._opened = set() 339 self._opened = set()
275 self._outstream = outstream 340 self._outstream = outstream
276 self.emit_timestamps = emit_timestamps 341 self.emit_timestamps = emit_timestamps
277 self.time_fn = time_fn or time.time 342 self.time_fn = time_fn or time.time
278 343
279 def open(self): 344 def open(self):
280 super(AnnotatorStreamEngine, self).open() 345 super(AnnotatorStreamEngine, self).open()
281 self.output_current_time() 346 if self.emit_timestamps:
282 self.output_root_annotation('HONOR_ZERO_RETURN_CODE') 347 self.output_current_time()
348 self.output_annotation('HONOR_ZERO_RETURN_CODE')
283 349
284 def close(self): 350 def close(self):
285 super(AnnotatorStreamEngine, self).close() 351 super(AnnotatorStreamEngine, self).close()
286 self.output_current_time() 352 if self.emit_timestamps:
353 self.output_current_time()
287 354
288 def output_current_time(self, step=None): 355 def output_current_time(self):
289 """Prints CURRENT_TIMESTAMP annotation with current time.""" 356 """Prints CURRENT_TIMESTAMP annotation with current time."""
290 if step: 357 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
291 self._step_cursor(step)
292 if self.emit_timestamps:
293 self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn())
294 358
295 @staticmethod 359 def output_annotation(self, *args):
296 def write_annotation(outstream, *args):
297 # Flush the stream before & after engine annotations, because they can 360 # Flush the stream before & after engine annotations, because they can
298 # change which step we are talking about and this matters to buildbot. 361 # change which step we are talking about and this matters to buildbot.
299 outstream.flush() 362 self._outstream.flush()
300 outstream.write( 363 self._outstream.write(
301 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') 364 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
302 outstream.flush() 365 self._outstream.flush()
303 366
304 def output_root_annotation(self, *args): 367 def _step_cursor(self, name):
305 self.write_annotation(self._outstream, *args) 368 if self._current_step != name:
369 self.output_annotation('STEP_CURSOR', name)
370 self._current_step = name
371 if name not in self._opened:
372 if self.emit_timestamps:
373 self.output_current_time()
374 self.output_annotation('STEP_STARTED')
375 self._opened.add(name)
306 376
307 def _step_cursor(self, step_name): 377 class StepStream(AnnotationStepStream):
308 if self._current_step != step_name: 378 def __init__(self, engine, step_name):
309 self.output_root_annotation('STEP_CURSOR', step_name) 379 AnnotationStepStream.__init__(
310 self._current_step = step_name 380 self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
311 if step_name not in self._opened:
312 self.output_current_time()
313 self.output_root_annotation('STEP_STARTED')
314 self._opened.add(step_name)
315
316 class StepStream(StreamEngine.StepStream):
317 def __init__(self, engine, outstream, step_name):
318 super(StreamEngine.StepStream, self).__init__()
319
320 self._engine = engine 381 self._engine = engine
321 self._outstream = outstream
322 self._step_name = step_name 382 self._step_name = step_name
323 383
324 def basic_write(self, line): 384 def basic_write(self, line):
325 self._engine._step_cursor(self._step_name) 385 self._engine._step_cursor(self._step_name)
326 self._outstream.write(line) 386 self._engine._outstream.write(line)
327
328 def close(self):
329 self._engine.output_current_time(step=self._step_name)
330 self.output_annotation('STEP_CLOSED')
331
332 def output_annotation(self, *args):
333 self._engine._step_cursor(self._step_name)
334 self._engine.write_annotation(self._outstream, *args)
335
336 def write_line(self, line):
337 if line.startswith('@@@'):
338 self.basic_write('!' + line + '\n')
339 else:
340 self.basic_write(line + '\n')
341
342 def new_log_stream(self, log_name):
343 return self._engine.StepLogStream(self, log_name)
344
345 def add_step_text(self, text):
346 self.output_annotation('STEP_TEXT', text)
347
348 def add_step_summary_text(self, text):
349 self.output_annotation('STEP_SUMMARY_TEXT', text)
350
351 def add_step_link(self, name, url):
352 self.output_annotation('STEP_LINK', name, url)
353
354 def set_step_status(self, status):
355 if status == 'SUCCESS':
356 pass
357 elif status == 'WARNING':
358 self.output_annotation('STEP_WARNINGS')
359 elif status == 'FAILURE':
360 self.output_annotation('STEP_FAILURE')
361 elif status == 'EXCEPTION':
362 self.output_annotation('STEP_EXCEPTION')
363 else:
364 raise Exception('Impossible status %s' % status)
365
366 def set_build_property(self, key, value):
367 self.output_annotation('SET_BUILD_PROPERTY', key, value)
368
369 def trigger(self, spec):
370 self.output_annotation('STEP_TRIGGER', spec)
371
372
373 class StepLogStream(StreamEngine.Stream):
374 def __init__(self, step_stream, log_name):
375 self._step_stream = step_stream
376 self._log_name = log_name.replace('/', '/')
377
378 def write_line(self, line):
379 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
380
381 def close(self):
382 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
383
384 387
385 class AllowSubannotationsStepStream(StepStream): 388 class AllowSubannotationsStepStream(StepStream):
386 def write_line(self, line): 389 def write_line(self, line):
387 self.basic_write(line + '\n') 390 self.basic_write(line + '\n')
388 391
389 # HACK(luqui): If the subannotator script changes the active step, we need 392 # HACK(luqui): If the subannotator script changes the active step, we need
390 # a way to get back to the real step that spawned the script. The right 393 # a way to get back to the real step that spawned the script. The right
391 # way to do that is to parse the annotation stream and re-emit it. But for 394 # way to do that is to parse the annotation stream and re-emit it. But for
392 # now we just provide this method. 395 # now we just provide this method.
393 def reset_subannotation_state(self): 396 def reset_subannotation_state(self):
394 self._engine._current_step = None 397 self._engine._current_step = None
395 398
399 def new_step_stream(self, step_name, allow_subannotations=False):
400 self.output_annotation('SEED_STEP', step_name)
401 if allow_subannotations:
402 return self.AllowSubannotationsStepStream(self, step_name)
403 else:
404 return self.StepStream(self, step_name)
396 405
397 def _new_step_stream(self, step_name, allow_subannotations, nest_level):
398 return self._create_step_stream(step_name, self._outstream,
399 allow_subannotations, nest_level)
400 406
401 def _create_step_stream(self, step_name, outstream, allow_subannotations, 407 class BareAnnotationStepStream(AnnotationStepStream):
402 nest_level): 408 """A StepStream that is not tied to any engine, and emits assuming it has the
403 self.output_root_annotation('SEED_STEP', step_name) 409 cursor.
404 if allow_subannotations:
405 stream = self.AllowSubannotationsStepStream(self, outstream, step_name)
406 else:
407 stream = self.StepStream(self, outstream, step_name)
408 410
409 if nest_level > 0: 411 This is used for capturing the annotations in the engine.
410 # Emit our current nest level, if we are nested. 412 """
411 stream.output_annotation('STEP_NEST_LEVEL', str(nest_level)) 413 def __init__(self, outstream):
412 return stream 414 self._outstream = outstream
415
416 def basic_write(self, line):
417 self._outstream.write(encode_str(line))
413 418
414 419
415 def encode_str(s): 420 def encode_str(s):
416 """Tries to encode a string into a python str type. 421 """Tries to encode a string into a python str type.
417 422
418 Currently buildbot only supports ascii. If we have an error decoding the 423 Currently buildbot only supports ascii. If we have an error decoding the
419 string (which means it might not be valid ascii), we decode the string with 424 string (which means it might not be valid ascii), we decode the string with
420 the 'replace' error mode, which replaces invalid characters with a suitable 425 the 'replace' error mode, which replaces invalid characters with a suitable
421 replacement character. 426 replacement character.
422 """ 427 """
423 try: 428 try:
424 return str(s) 429 return str(s)
425 except UnicodeEncodeError: 430 except UnicodeEncodeError:
426 return s.encode('utf-8', 'replace') 431 return s.encode('utf-8', 'replace')
427 except UnicodeDecodeError: 432 except UnicodeDecodeError:
428 return s.decode('utf-8', 'replace') 433 return s.decode('utf-8', 'replace')
OLDNEW
« no previous file with comments | « recipe_engine/step_runner.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698