Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(250)

Side by Side Diff: recipe_engine/stream.py

Issue 2245113002: Track step nesting in StreamEngine. (Closed) Base URL: https://github.com/luci/recipes-py@emit-initial-properties
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 55
56 def add_step_link(self, name, url): 56 def add_step_link(self, name, url):
57 raise NotImplementedError() 57 raise NotImplementedError()
58 58
59 def reset_subannotation_state(self): 59 def reset_subannotation_state(self):
60 pass 60 pass
61 61
62 def set_step_status(self, status): 62 def set_step_status(self, status):
63 raise NotImplementedError() 63 raise NotImplementedError()
64 64
65 def set_nest_level(self, nest_level):
dnj 2016/08/15 17:33:53 diediediediediediedie
66 raise NotImplementedError()
67
68 def set_build_property(self, key, value): 65 def set_build_property(self, key, value):
69 raise NotImplementedError() 66 raise NotImplementedError()
70 67
71 def trigger(self, trigger_spec): 68 def trigger(self, trigger_spec):
72 raise NotImplementedError() 69 raise NotImplementedError()
73 70
74 def new_step_stream(self, step_name, allow_subannotations=False): 71 def new_step_stream(self, step_name, allow_subannotations=False,
72 nested=False):
75 """Creates a new StepStream in this engine. 73 """Creates a new StepStream in this engine.
76 74
77 The step will be considered started at the moment this method is called. 75 The step will be considered started at the moment this method is called.
78 76
79 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow 77 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow
80 annotations that this step emits through to the annotator (True), or 78 annotations that this step emits through to the annotator (True), or
81 guard them by prefixing them with ! (False). The proper way to do this 79 guard them by prefixing them with ! (False). The proper way to do this
82 is to implement an annotations parser that converts to StreamEngine calls; 80 is to implement an annotations parser that converts to StreamEngine calls;
83 i.e. parse -> re-emit. 81 i.e. parse -> re-emit.
82
83 Args:
84 nested (bool): If True, this is a nested step, and should be visually
85 nested under the current step.
84 """ 86 """
85
86 raise NotImplementedError() 87 raise NotImplementedError()
87 88
88 def open(self): 89 def open(self):
89 pass 90 pass
90 91
91 def close(self): 92 def close(self):
92 pass 93 pass
93 94
94 def __enter__(self): 95 def __enter__(self):
95 self.open() 96 self.open()
96 return self 97 return self
97 98
98 def __exit__(self, _exc_type, _exc_val, _exc_tb): 99 def __exit__(self, _exc_type, _exc_val, _exc_tb):
99 self.close() 100 self.close()
100 101
101 102
102 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 103 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
103 # form products. This code is entirely mechanical from the types (if we 104 # form products. This code is entirely mechanical from the types (if we
104 # had them formalized...). 105 # had them formalized...).
105 class ProductStreamEngine(StreamEngine): 106 class ProductStreamEngine(StreamEngine):
106 def __init__(self, engine_a, engine_b): 107 def __init__(self, engine_a, engine_b):
108 assert engine_a and engine_b
dnj 2016/08/15 17:33:53 Useful assertions, not relevant to CL.
107 self._engine_a = engine_a 109 self._engine_a = engine_a
108 self._engine_b = engine_b 110 self._engine_b = engine_b
109 111
110 class Stream(StreamEngine.Stream): 112 class Stream(StreamEngine.Stream):
111 def __init__(self, stream_a, stream_b): 113 def __init__(self, stream_a, stream_b):
114 assert stream_a and stream_b
112 self._stream_a = stream_a 115 self._stream_a = stream_a
113 self._stream_b = stream_b 116 self._stream_b = stream_b
114 117
115 def write_line(self, line): 118 def write_line(self, line):
116 self._stream_a.write_line(line) 119 self._stream_a.write_line(line)
117 self._stream_b.write_line(line) 120 self._stream_b.write_line(line)
118 121
119 def close(self): 122 def close(self):
120 self._stream_a.close() 123 self._stream_a.close()
121 self._stream_b.close() 124 self._stream_b.close()
122 125
123 class StepStream(Stream): 126 class StepStream(Stream):
124 def _void_product(method_name): 127 def _void_product(method_name):
125 def inner(self, *args): 128 def inner(self, *args):
126 getattr(self._stream_a, method_name)(*args) 129 getattr(self._stream_a, method_name)(*args)
127 getattr(self._stream_b, method_name)(*args) 130 getattr(self._stream_b, method_name)(*args)
128 return inner 131 return inner
129 132
130 def new_log_stream(self, log_name): 133 def new_log_stream(self, log_name):
131 return ProductStreamEngine.Stream( 134 return ProductStreamEngine.Stream(
132 self._stream_a.new_log_stream(log_name), 135 self._stream_a.new_log_stream(log_name),
133 self._stream_b.new_log_stream(log_name)) 136 self._stream_b.new_log_stream(log_name))
134 137
135 add_step_text = _void_product('add_step_text') 138 add_step_text = _void_product('add_step_text')
136 add_step_summary_text = _void_product('add_step_summary_text') 139 add_step_summary_text = _void_product('add_step_summary_text')
137 add_step_link = _void_product('add_step_link') 140 add_step_link = _void_product('add_step_link')
138 reset_subannotation_state = _void_product('reset_subannotation_state') 141 reset_subannotation_state = _void_product('reset_subannotation_state')
139 set_step_status = _void_product('set_step_status') 142 set_step_status = _void_product('set_step_status')
140 set_nest_level = _void_product('set_nest_level')
141 set_build_property = _void_product('set_build_property') 143 set_build_property = _void_product('set_build_property')
142 trigger = _void_product('trigger') 144 trigger = _void_product('trigger')
143 145
144 def new_step_stream(self, step_name, allow_subannotations=False): 146 def new_step_stream(self, step_name, **kwargs):
145 return self.StepStream( 147 return self.StepStream(
146 self._engine_a.new_step_stream(step_name, allow_subannotations), 148 self._engine_a.new_step_stream(step_name, **kwargs),
147 self._engine_b.new_step_stream(step_name, allow_subannotations)) 149 self._engine_b.new_step_stream(step_name, **kwargs))
148 150
149 def open(self): 151 def open(self):
150 self._engine_a.open() 152 self._engine_a.open()
151 self._engine_b.open() 153 self._engine_b.open()
152 154
153 def close(self): 155 def close(self):
154 self._engine_a.close() 156 self._engine_a.close()
155 self._engine_b.close() 157 self._engine_b.close()
156 158
157 159
158 def _noop(*args, **kwargs): 160 def _noop(*args, **kwargs):
159 pass 161 pass
160 162
161 class NoopStreamEngine(StreamEngine): 163 class NoopStreamEngine(StreamEngine):
162 class Stream(StreamEngine.Stream): 164 class Stream(StreamEngine.Stream):
163 write_line = _noop 165 write_line = _noop
164 close = _noop 166 close = _noop
165 167
166 class StepStream(Stream): 168 class StepStream(Stream):
167 def new_log_stream(self, log_name): 169 def new_log_stream(self, log_name):
168 return NoopStreamEngine.Stream() 170 return NoopStreamEngine.Stream()
169 add_step_text = _noop 171 add_step_text = _noop
170 add_step_summary_text = _noop 172 add_step_summary_text = _noop
171 add_step_link = _noop 173 add_step_link = _noop
172 reset_subannotation_state = _noop 174 reset_subannotation_state = _noop
173 set_step_status = _noop 175 set_step_status = _noop
174 set_nest_level = _noop
175 set_build_property = _noop 176 set_build_property = _noop
176 trigger = _noop 177 trigger = _noop
177 178
178 def new_step_stream(self, step_name, allow_subannotations=False): 179 def new_step_stream(self, step_name, **kwargs):
179 return self.StepStream() 180 return self.StepStream()
180 181
181 182
182 class StreamEngineInvariants(StreamEngine): 183 class StreamEngineInvariants(StreamEngine):
183 """Checks that the users are using a StreamEngine hygenically. 184 """Checks that the users are using a StreamEngine hygenically.
184 185
185 Multiply with actually functional StreamEngines so you don't have to check 186 Multiply with actually functional StreamEngines so you don't have to check
186 these all over the place. 187 these all over the place.
187 """ 188 """
188 def __init__(self): 189 def __init__(self):
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 assert isinstance(url, basestring), 'Link url %s is not a string' % url 227 assert isinstance(url, basestring), 'Link url %s is not a string' % url
227 228
228 def set_step_status(self, status): 229 def set_step_status(self, status):
229 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION') 230 assert status in ('SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION')
230 if status == 'SUCCESS': 231 if status == 'SUCCESS':
231 # A constraint imposed by the annotations implementation 232 # A constraint imposed by the annotations implementation
232 assert self._status == 'SUCCESS', ( 233 assert self._status == 'SUCCESS', (
233 'Cannot set successful status after status is %s' % self._status) 234 'Cannot set successful status after status is %s' % self._status)
234 self._status = status 235 self._status = status
235 236
236 def set_nest_level(self, nest_level):
237 assert isinstance(nest_level, int)
238
239 def set_build_property(self, key, value): 237 def set_build_property(self, key, value):
240 pass 238 pass
241 239
242 def trigger(self, spec): 240 def trigger(self, spec):
243 assert '\n' not in spec # Spec must fit on one line. 241 assert '\n' not in spec # Spec must fit on one line.
244 json.loads(spec) # Spec must be a valid json object. 242 json.loads(spec) # Spec must be a valid json object.
245 243
246 class LogStream(StreamEngine.Stream): 244 class LogStream(StreamEngine.Stream):
247 def __init__(self, step_stream, log_name): 245 def __init__(self, step_stream, log_name):
248 self._step_stream = step_stream 246 self._step_stream = step_stream
249 self._log_name = log_name 247 self._log_name = log_name
250 self._open = True 248 self._open = True
251 249
252 def write_line(self, line): 250 def write_line(self, line):
253 assert '\n' not in line 251 assert '\n' not in line
254 assert self._step_stream._open 252 assert self._step_stream._open
255 assert self._open 253 assert self._open
256 254
257 def close(self): 255 def close(self):
258 assert self._step_stream._open 256 assert self._step_stream._open
259 assert self._open 257 assert self._open
260 self._open = False 258 self._open = False
261 259
262 def new_step_stream(self, step_name, allow_subannotations=False): 260 def new_step_stream(self, step_name, **kwargs):
263 assert step_name not in self._streams, 'Step %s already exists' % step_name 261 assert step_name not in self._streams, 'Step %s already exists' % step_name
264 self._streams.add(step_name) 262 self._streams.add(step_name)
265 return self.StepStream(self, step_name) 263 return self.StepStream(self, step_name)
266 264
267 265
268 class AnnotationStepStream(StreamEngine.StepStream):
dnj 2016/08/15 17:33:53 Because we no longer have a separate simulation An
269 def __init__(self, emit_timestamps=False, time_fn=None):
270 self.emit_timestamps = emit_timestamps
271 self.time_fn = time_fn or time.time
272
273 def basic_write(self, line):
274 raise NotImplementedError()
275
276 def output_annotation(self, *args):
277 self.basic_write(
278 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
279
280 def write_line(self, line):
281 if line.startswith('@@@'):
282 self.basic_write('!' + line + '\n')
283 else:
284 self.basic_write(line + '\n')
285
286 def close(self):
287 if self.emit_timestamps:
288 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
289 self.output_annotation('STEP_CLOSED')
290
291 def new_log_stream(self, log_name):
292 return self.StepLogStream(self, log_name)
293
294 def add_step_text(self, text):
295 self.output_annotation('STEP_TEXT', text)
296
297 def add_step_summary_text(self, text):
298 self.output_annotation('STEP_SUMMARY_TEXT', text)
299
300 def add_step_link(self, name, url):
301 self.output_annotation('STEP_LINK', name, url)
302
303 def set_step_status(self, status):
304 if status == 'SUCCESS':
305 pass
306 elif status == 'WARNING':
307 self.output_annotation('STEP_WARNINGS')
308 elif status == 'FAILURE':
309 self.output_annotation('STEP_FAILURE')
310 elif status == 'EXCEPTION':
311 self.output_annotation('STEP_EXCEPTION')
312 else:
313 raise Exception('Impossible status %s' % status)
314
315 def set_nest_level(self, nest_level):
316 self.output_annotation('STEP_NEST_LEVEL', str(nest_level))
317
318 def set_build_property(self, key, value):
319 self.output_annotation('SET_BUILD_PROPERTY', key, value)
320
321 def trigger(self, spec):
322 self.output_annotation('STEP_TRIGGER', spec)
323
324 class StepLogStream(StreamEngine.Stream):
325 def __init__(self, step_stream, log_name):
326 self._step_stream = step_stream
327 self._log_name = log_name.replace('/', '/')
328
329 def write_line(self, line):
330 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
331
332 def close(self):
333 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
334
335
336 class AnnotatorStreamEngine(StreamEngine): 266 class AnnotatorStreamEngine(StreamEngine):
337 def __init__(self, outstream, emit_timestamps=False, time_fn=None): 267 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
338 self._current_step = None 268 self._current_step = None
269 self._current_nest_level = 0
dnj 2016/08/15 17:33:53 The AnnotatorStreamEngine now tracks the nest leve
339 self._opened = set() 270 self._opened = set()
340 self._outstream = outstream 271 self._outstream = outstream
341 self.emit_timestamps = emit_timestamps 272 self.emit_timestamps = emit_timestamps
342 self.time_fn = time_fn or time.time 273 self.time_fn = time_fn or time.time
343 274
344 def open(self): 275 def open(self):
345 super(AnnotatorStreamEngine, self).open() 276 super(AnnotatorStreamEngine, self).open()
346 if self.emit_timestamps: 277 self.output_current_time()
347 self.output_current_time() 278 self.output_root_annotation('HONOR_ZERO_RETURN_CODE')
348 self.output_annotation('HONOR_ZERO_RETURN_CODE')
349 279
350 def close(self): 280 def close(self):
351 super(AnnotatorStreamEngine, self).close() 281 super(AnnotatorStreamEngine, self).close()
282 self.output_current_time()
283
284 def output_current_time(self, step=None):
285 """Prints CURRENT_TIMESTAMP annotation with current time."""
286 if step:
287 self._step_cursor(step)
352 if self.emit_timestamps: 288 if self.emit_timestamps:
353 self.output_current_time() 289 self.output_root_annotation('CURRENT_TIMESTAMP', self.time_fn())
354 290
355 def output_current_time(self): 291 @staticmethod
356 """Prints CURRENT_TIMESTAMP annotation with current time.""" 292 def write_annotation(outstream, *args):
357 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
358
359 def output_annotation(self, *args):
360 # Flush the stream before & after engine annotations, because they can 293 # Flush the stream before & after engine annotations, because they can
361 # change which step we are talking about and this matters to buildbot. 294 # change which step we are talking about and this matters to buildbot.
362 self._outstream.flush() 295 outstream.flush()
363 self._outstream.write( 296 outstream.write(
364 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n') 297 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
365 self._outstream.flush() 298 outstream.flush()
299
300 def output_root_annotation(self, *args):
301 self.write_annotation(self._outstream, *args)
366 302
367 def _step_cursor(self, name): 303 def _step_cursor(self, name):
368 if self._current_step != name: 304 if self._current_step != name:
369 self.output_annotation('STEP_CURSOR', name) 305 self.output_root_annotation('STEP_CURSOR', name)
370 self._current_step = name 306 self._current_step = name
371 if name not in self._opened: 307 if name not in self._opened:
372 if self.emit_timestamps: 308 self.output_current_time()
373 self.output_current_time() 309 self.output_root_annotation('STEP_STARTED')
374 self.output_annotation('STEP_STARTED')
375 self._opened.add(name) 310 self._opened.add(name)
376 311
377 class StepStream(AnnotationStepStream): 312 class StepStream(StreamEngine.StepStream):
378 def __init__(self, engine, step_name): 313 def __init__(self, engine, outstream, step_name, nested):
379 AnnotationStepStream.__init__( 314 super(StreamEngine.StepStream, self).__init__()
380 self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn) 315
381 self._engine = engine 316 self._engine = engine
317 self._outstream = outstream
382 self._step_name = step_name 318 self._step_name = step_name
319 self._nested = nested
383 320
384 def basic_write(self, line): 321 def basic_write(self, line):
385 self._engine._step_cursor(self._step_name) 322 self._engine._step_cursor(self._step_name)
386 self._engine._outstream.write(line) 323 self._outstream.write(line)
324
325 def close(self):
326 self._engine._notify_step_finished(self)
dnj 2016/08/15 17:33:53 This is new, see below.
327
328 def output_annotation(self, *args):
dnj 2016/08/15 17:33:53 Ack, am going to rewrite this in terms of 'write_a
329 self.basic_write(
330 '@@@' + '@'.join(map(encode_str, args)) + '@@@\n')
331
332 def write_line(self, line):
dnj 2016/08/15 17:33:53 (All of this is copy/paste from outer class when i
333 if line.startswith('@@@'):
334 self.basic_write('!' + line + '\n')
335 else:
336 self.basic_write(line + '\n')
337
338 def new_log_stream(self, log_name):
339 return self._engine.StepLogStream(self, log_name)
340
341 def add_step_text(self, text):
342 self.output_annotation('STEP_TEXT', text)
343
344 def add_step_summary_text(self, text):
345 self.output_annotation('STEP_SUMMARY_TEXT', text)
346
347 def add_step_link(self, name, url):
348 self.output_annotation('STEP_LINK', name, url)
349
350 def set_step_status(self, status):
351 if status == 'SUCCESS':
352 pass
353 elif status == 'WARNING':
354 self.output_annotation('STEP_WARNINGS')
355 elif status == 'FAILURE':
356 self.output_annotation('STEP_FAILURE')
357 elif status == 'EXCEPTION':
358 self.output_annotation('STEP_EXCEPTION')
359 else:
360 raise Exception('Impossible status %s' % status)
361
362 def set_build_property(self, key, value):
363 self.output_annotation('SET_BUILD_PROPERTY', key, value)
364
365 def trigger(self, spec):
366 self.output_annotation('STEP_TRIGGER', spec)
367
368
369 class StepLogStream(StreamEngine.Stream):
370 def __init__(self, step_stream, log_name):
371 self._step_stream = step_stream
372 self._log_name = log_name.replace('/', '/')
373
374 def write_line(self, line):
375 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
376
377 def close(self):
378 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
379
387 380
388 class AllowSubannotationsStepStream(StepStream): 381 class AllowSubannotationsStepStream(StepStream):
389 def write_line(self, line): 382 def write_line(self, line):
390 self.basic_write(line + '\n') 383 self.basic_write(line + '\n')
391 384
392 # HACK(luqui): If the subannotator script changes the active step, we need 385 # HACK(luqui): If the subannotator script changes the active step, we need
393 # a way to get back to the real step that spawned the script. The right 386 # a way to get back to the real step that spawned the script. The right
394 # way to do that is to parse the annotation stream and re-emit it. But for 387 # way to do that is to parse the annotation stream and re-emit it. But for
395 # now we just provide this method. 388 # now we just provide this method.
396 def reset_subannotation_state(self): 389 def reset_subannotation_state(self):
397 self._engine._current_step = None 390 self._engine._current_step = None
398 391
399 def new_step_stream(self, step_name, allow_subannotations=False): 392
400 self.output_annotation('SEED_STEP', step_name) 393 def new_step_stream(self, step_name, allow_subannotations=False,
394 nested=False):
395 self.output_root_annotation('SEED_STEP', step_name)
396 return self._create_step_stream(step_name, self._outstream,
397 allow_subannotations, nested)
398
399 def _create_step_stream(self, step_name, outstream,
400 allow_subannotations, nested):
401 if allow_subannotations: 401 if allow_subannotations:
402 return self.AllowSubannotationsStepStream(self, step_name) 402 stream = self.AllowSubannotationsStepStream(self, outstream, step_name,
403 nested)
403 else: 404 else:
404 return self.StepStream(self, step_name) 405 stream = self.StepStream(self, outstream, step_name, nested)
405 406
407 if nested:
408 # Increase our current nest level and emit a nested annotation for this
409 # step. This will be decreased when the step finishes.
410 self._current_nest_level += 1
411 stream.output_annotation('STEP_NEST_LEVEL', str(self._current_nest_level))
412 return stream
406 413
407 class BareAnnotationStepStream(AnnotationStepStream): 414 def _notify_step_finished(self, step_stream):
408 """A StepStream that is not tied to any engine, and emits assuming it has the 415 if step_stream._nested:
dnj 2016/08/15 17:33:53 (Called when a step is closed. I put this here ins
409 cursor. 416 orig = self._current_nest_level
417 self._current_nest_level -= 1
418 assert self._current_nest_level >= 0
410 419
411 This is used for capturing the annotations in the engine. 420 self.output_current_time(step=step_stream._step_name)
412 """ 421 step_stream.output_annotation('STEP_CLOSED')
413 def __init__(self, outstream):
414 self._outstream = outstream
415
416 def basic_write(self, line):
417 self._outstream.write(encode_str(line))
418 422
419 423
420 def encode_str(s): 424 def encode_str(s):
421 """Tries to encode a string into a python str type. 425 """Tries to encode a string into a python str type.
422 426
423 Currently buildbot only supports ascii. If we have an error decoding the 427 Currently buildbot only supports ascii. If we have an error decoding the
424 string (which means it might not be valid ascii), we decode the string with 428 string (which means it might not be valid ascii), we decode the string with
425 the 'replace' error mode, which replaces invalid characters with a suitable 429 the 'replace' error mode, which replaces invalid characters with a suitable
426 replacement character. 430 replacement character.
427 """ 431 """
428 try: 432 try:
429 return str(s) 433 return str(s)
430 except UnicodeEncodeError: 434 except UnicodeEncodeError:
431 return s.encode('utf-8', 'replace') 435 return s.encode('utf-8', 'replace')
432 except UnicodeDecodeError: 436 except UnicodeDecodeError:
433 return s.decode('utf-8', 'replace') 437 return s.decode('utf-8', 'replace')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698