Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(484)

Side by Side Diff: recipe_engine/stream.py

Issue 2052543003: Emit CURRENT_TIMESTAMP annotation (Closed) Base URL: git@github.com:luci/recipes-py.git@master
Patch Set: nit Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
11 11
12 StreamEngine will coordinate the multiplexing of streams. In the case of 12 StreamEngine will coordinate the multiplexing of streams. In the case of
13 annotations, this involves keeping track of the STEP_CURSOR and setting it 13 annotations, this involves keeping track of the STEP_CURSOR and setting it
14 accordingly, as well as filtering @@@ lines. 14 accordingly, as well as filtering @@@ lines.
15 15
16 Stream is a virtual well-behaved stream (associated with an Engine) which you 16 Stream is a virtual well-behaved stream (associated with an Engine) which you
17 can just write to without worrying. 17 can just write to without worrying.
18 """ 18 """
19 19
20 import json 20 import json
21 import time
21 22
22 class StreamEngine(object): 23 class StreamEngine(object):
23 class Stream(object): 24 class Stream(object):
24 def write_line(self, line): 25 def write_line(self, line):
25 raise NotImplementedError() 26 raise NotImplementedError()
26 27
27 def write_split(self, string): 28 def write_split(self, string):
28 """Write a string (which may contain newlines) to the stream. It will 29 """Write a string (which may contain newlines) to the stream. It will
29 be terminated by a newline.""" 30 be terminated by a newline."""
30 for actual_line in string.splitlines() or ['']: # preserve empty lines 31 for actual_line in string.splitlines() or ['']: # preserve empty lines
(...skipping 30 matching lines...) Expand all
61 def set_nest_level(self, nest_level): 62 def set_nest_level(self, nest_level):
62 raise NotImplementedError() 63 raise NotImplementedError()
63 64
64 def set_build_property(self, key, value): 65 def set_build_property(self, key, value):
65 raise NotImplementedError() 66 raise NotImplementedError()
66 67
67 def trigger(self, trigger_spec): 68 def trigger(self, trigger_spec):
68 raise NotImplementedError() 69 raise NotImplementedError()
69 70
70 def new_step_stream(self, step_name, allow_subannotations=False): 71 def new_step_stream(self, step_name, allow_subannotations=False):
71 """Craete a new StepStream in this engine. 72 """Creates a new StepStream in this engine.
72 73
73 The step will be considered started at the moment this method is called. 74 The step will be considered started at the moment this method is called.
74 75
75 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow 76 TODO(luqui): allow_subannotations is a bit of a hack, whether to allow
76 annotations that this step emits through to the annotator (True), or 77 annotations that this step emits through to the annotator (True), or
77 guard them by prefixing them with ! (False). The proper way to do this 78 guard them by prefixing them with ! (False). The proper way to do this
78 is to implement an annotations parser that converts to StreamEngine calls; 79 is to implement an annotations parser that converts to StreamEngine calls;
79 i.e. parse -> re-emit. 80 i.e. parse -> re-emit.
80 """ 81 """
81 82
82 raise NotImplementedError() 83 raise NotImplementedError()
83 84
85 def open(self):
86 pass
87
88 def close(self):
89 pass
90
91 def __enter__(self):
92 self.open()
93 return self
94
95 def __exit__(self, _exc_type, _exc_val, _exc_tb):
96 self.close()
97
84 98
85 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 99 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
86 # form products. This code is entirely mechanical from the types (if we 100 # form products. This code is entirely mechanical from the types (if we
87 # had them formalized...). 101 # had them formalized...).
88 class ProductStreamEngine(StreamEngine): 102 class ProductStreamEngine(StreamEngine):
89 def __init__(self, engine_a, engine_b): 103 def __init__(self, engine_a, engine_b):
90 self._engine_a = engine_a 104 self._engine_a = engine_a
91 self._engine_b = engine_b 105 self._engine_b = engine_b
92 106
93 class Stream(StreamEngine.Stream): 107 class Stream(StreamEngine.Stream):
(...skipping 28 matching lines...) Expand all
122 set_step_status = _void_product('set_step_status') 136 set_step_status = _void_product('set_step_status')
123 set_nest_level = _void_product('set_nest_level') 137 set_nest_level = _void_product('set_nest_level')
124 set_build_property = _void_product('set_build_property') 138 set_build_property = _void_product('set_build_property')
125 trigger = _void_product('trigger') 139 trigger = _void_product('trigger')
126 140
127 def new_step_stream(self, step_name, allow_subannotations=False): 141 def new_step_stream(self, step_name, allow_subannotations=False):
128 return self.StepStream( 142 return self.StepStream(
129 self._engine_a.new_step_stream(step_name, allow_subannotations), 143 self._engine_a.new_step_stream(step_name, allow_subannotations),
130 self._engine_b.new_step_stream(step_name, allow_subannotations)) 144 self._engine_b.new_step_stream(step_name, allow_subannotations))
131 145
146 def open(self):
147 self._engine_a.open()
148 self._engine_b.open()
149
150 def close(self):
151 self._engine_a.close()
152 self._engine_b.close()
153
132 154
133 def _noop(*args, **kwargs): 155 def _noop(*args, **kwargs):
134 pass 156 pass
135 157
136 class NoopStreamEngine(StreamEngine): 158 class NoopStreamEngine(StreamEngine):
137 class Stream(StreamEngine.Stream): 159 class Stream(StreamEngine.Stream):
138 write_line = _noop 160 write_line = _noop
139 close = _noop 161 close = _noop
140 162
141 class StepStream(Stream): 163 class StepStream(Stream):
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
234 assert self._open 256 assert self._open
235 self._open = False 257 self._open = False
236 258
237 def new_step_stream(self, step_name, allow_subannotations=False): 259 def new_step_stream(self, step_name, allow_subannotations=False):
238 assert step_name not in self._streams, 'Step %s already exists' % step_name 260 assert step_name not in self._streams, 'Step %s already exists' % step_name
239 self._streams.add(step_name) 261 self._streams.add(step_name)
240 return self.StepStream(self, step_name) 262 return self.StepStream(self, step_name)
241 263
242 264
243 class AnnotationStepStream(StreamEngine.StepStream): 265 class AnnotationStepStream(StreamEngine.StepStream):
266 def __init__(self, emit_timestamps=False, time_fn=None):
267 self.emit_timestamps = emit_timestamps
268 self.time_fn = time_fn or time.time
269
244 def basic_write(self, line): 270 def basic_write(self, line):
245 raise NotImplementedError() 271 raise NotImplementedError()
246 272
247 def output_annotation(self, *args): 273 def output_annotation(self, *args):
248 self.basic_write('@@@' + '@'.join(args) + '@@@\n') 274 self.basic_write('@@@' + '@'.join(map(str, args)) + '@@@\n')
249 275
250 def write_line(self, line): 276 def write_line(self, line):
251 if line.startswith('@@@'): 277 if line.startswith('@@@'):
252 self.basic_write('!' + line + '\n') 278 self.basic_write('!' + line + '\n')
253 else: 279 else:
254 self.basic_write(line + '\n') 280 self.basic_write(line + '\n')
255 281
256 def close(self): 282 def close(self):
283 if self.emit_timestamps:
284 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
257 self.output_annotation('STEP_CLOSED') 285 self.output_annotation('STEP_CLOSED')
258 286
259 def new_log_stream(self, log_name): 287 def new_log_stream(self, log_name):
260 return self.StepLogStream(self, log_name) 288 return self.StepLogStream(self, log_name)
261 289
262 def add_step_text(self, text): 290 def add_step_text(self, text):
263 self.output_annotation('STEP_TEXT', text) 291 self.output_annotation('STEP_TEXT', text)
264 292
265 def add_step_summary_text(self, text): 293 def add_step_summary_text(self, text):
266 self.output_annotation('STEP_SUMMARY_TEXT', text) 294 self.output_annotation('STEP_SUMMARY_TEXT', text)
(...skipping 28 matching lines...) Expand all
295 self._log_name = log_name.replace('/', '/') 323 self._log_name = log_name.replace('/', '/')
296 324
297 def write_line(self, line): 325 def write_line(self, line):
298 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line) 326 self._step_stream.output_annotation('STEP_LOG_LINE', self._log_name, line)
299 327
300 def close(self): 328 def close(self):
301 self._step_stream.output_annotation('STEP_LOG_END', self._log_name) 329 self._step_stream.output_annotation('STEP_LOG_END', self._log_name)
302 330
303 331
304 class AnnotatorStreamEngine(StreamEngine): 332 class AnnotatorStreamEngine(StreamEngine):
305 def __init__(self, outstream): 333 def __init__(self, outstream, emit_timestamps=False, time_fn=None):
306 self._current_step = None 334 self._current_step = None
307 self._opened = set() 335 self._opened = set()
308 self._outstream = outstream 336 self._outstream = outstream
337 self.emit_timestamps = emit_timestamps
338 self.time_fn = time_fn or time.time
339
340 def open(self):
341 super(AnnotatorStreamEngine, self).open()
342 if self.emit_timestamps:
343 self.output_current_time()
309 self.output_annotation('HONOR_ZERO_RETURN_CODE') 344 self.output_annotation('HONOR_ZERO_RETURN_CODE')
310 345
346 def close(self):
347 super(AnnotatorStreamEngine, self).close()
348 if self.emit_timestamps:
349 self.output_current_time()
350
351 def output_current_time(self):
352 """Prints CURRENT_TIMESTAMP annotation with current time."""
353 self.output_annotation('CURRENT_TIMESTAMP', self.time_fn())
354
311 def output_annotation(self, *args): 355 def output_annotation(self, *args):
312 # Flush the stream before & after engine annotations, because they can 356 # Flush the stream before & after engine annotations, because they can
313 # change which step we are talking about and this matters to buildbot. 357 # change which step we are talking about and this matters to buildbot.
314 self._outstream.flush() 358 self._outstream.flush()
315 self._outstream.write('@@@' + '@'.join(args) + '@@@\n') 359 self._outstream.write('@@@' + '@'.join(map(str, args)) + '@@@\n')
316 self._outstream.flush() 360 self._outstream.flush()
317 361
318 def _step_cursor(self, name): 362 def _step_cursor(self, name):
319 if self._current_step != name: 363 if self._current_step != name:
320 self.output_annotation('STEP_CURSOR', name) 364 self.output_annotation('STEP_CURSOR', name)
321 self._current_step = name 365 self._current_step = name
322 if name not in self._opened: 366 if name not in self._opened:
367 if self.emit_timestamps:
368 self.output_current_time()
323 self.output_annotation('STEP_STARTED') 369 self.output_annotation('STEP_STARTED')
324 self._opened.add(name) 370 self._opened.add(name)
325 371
326 class StepStream(AnnotationStepStream): 372 class StepStream(AnnotationStepStream):
327 def __init__(self, engine, step_name): 373 def __init__(self, engine, step_name):
374 AnnotationStepStream.__init__(
375 self, emit_timestamps=engine.emit_timestamps, time_fn=engine.time_fn)
328 self._engine = engine 376 self._engine = engine
329 self._step_name = step_name 377 self._step_name = step_name
330 378
331 def basic_write(self, line): 379 def basic_write(self, line):
332 self._engine._step_cursor(self._step_name) 380 self._engine._step_cursor(self._step_name)
333 self._engine._outstream.write(line) 381 self._engine._outstream.write(line)
334 382
335 class AllowSubannotationsStepStream(StepStream): 383 class AllowSubannotationsStepStream(StepStream):
336 def write_line(self, line): 384 def write_line(self, line):
337 self.basic_write(line + '\n') 385 self.basic_write(line + '\n')
(...skipping 17 matching lines...) Expand all
355 """A StepStream that is not tied to any engine, and emits assuming it has the 403 """A StepStream that is not tied to any engine, and emits assuming it has the
356 cursor. 404 cursor.
357 405
358 This is used for capturing the annotations in the engine. 406 This is used for capturing the annotations in the engine.
359 """ 407 """
360 def __init__(self, outstream): 408 def __init__(self, outstream):
361 self._outstream = outstream 409 self._outstream = outstream
362 410
363 def basic_write(self, line): 411 def basic_write(self, line):
364 self._outstream.write(line) 412 self._outstream.write(line)
365
OLDNEW
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/unittests/run_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698