Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(518)

Side by Side Diff: recipe_engine/stream.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Stronger flush meta logic, moar test. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/stream_logdog.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
11 11
12 StreamEngine will coordinate the multiplexing of streams. In the case of 12 StreamEngine will coordinate the multiplexing of streams. In the case of
13 annotations, this involves keeping track of the STEP_CURSOR and setting it 13 annotations, this involves keeping track of the STEP_CURSOR and setting it
14 accordingly, as well as filtering @@@ lines. 14 accordingly, as well as filtering @@@ lines.
15 15
16 Stream is a virtual well-behaved stream (associated with an Engine) which you 16 Stream is a virtual well-behaved stream (associated with an Engine) which you
17 can just write to without worrying. 17 can just write to without worrying.
18 """ 18 """
19 19
20 import json 20 import json
21 import time 21 import time
22 22
23 from . import env 23 from . import env
24 from . import recipe_api 24 from . import recipe_api
25 from . import util
25 26
26 27
27 class StreamEngine(object): 28 class StreamEngine(object):
28 class Stream(object): 29 class Stream(object):
29 def write_line(self, line): 30 def write_line(self, line):
30 raise NotImplementedError() 31 raise NotImplementedError()
31 32
32 def write_split(self, string): 33 def write_split(self, string):
33 """Write a string (which may contain newlines) to the stream. It will 34 """Write a string (which may contain newlines) to the stream. It will
34 be terminated by a newline.""" 35 be terminated by a newline."""
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 pass 98 pass
98 99
99 def __enter__(self): 100 def __enter__(self):
100 self.open() 101 self.open()
101 return self 102 return self
102 103
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): 104 def __exit__(self, _exc_type, _exc_val, _exc_tb):
104 self.close() 105 self.close()
105 106
106 107
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
108 # form products. This code is entirely mechanical from the types (if we
109 # had them formalized...).
110 class ProductStreamEngine(StreamEngine): 108 class ProductStreamEngine(StreamEngine):
109 """A StreamEngine that forms the non-commutative product of two other
110 StreamEngines.
111
112 Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
113 form products. This code is entirely mechanical from the types (if we
114 had them formalized...).
115
116 This product is non-commutative, meaning order matters. Specifically, an
117 exception in "engine_a" will prevent "engine_b" from being evaluated.
118 """
119
111 def __init__(self, engine_a, engine_b): 120 def __init__(self, engine_a, engine_b):
112 assert engine_a and engine_b 121 assert engine_a and engine_b
113 self._engine_a = engine_a 122 self._engine_a = engine_a
114 self._engine_b = engine_b 123 self._engine_b = engine_b
115 124
116 class Stream(StreamEngine.Stream): 125 class Stream(StreamEngine.Stream):
117 def __init__(self, stream_a, stream_b): 126 def __init__(self, stream_a, stream_b):
118 assert stream_a and stream_b 127 assert stream_a and stream_b
119 self._stream_a = stream_a 128 self._stream_a = stream_a
120 self._stream_b = stream_b 129 self._stream_b = stream_b
121 130
122 def write_line(self, line): 131 def write_line(self, line):
123 self._stream_a.write_line(line) 132 self._stream_a.write_line(line)
124 self._stream_b.write_line(line) 133 self._stream_b.write_line(line)
125 134
126 def close(self): 135 def close(self):
127 self._stream_a.close() 136 self._stream_a.close()
128 self._stream_b.close() 137 self._stream_b.close()
129 138
130 class StepStream(Stream): 139 class StepStream(Stream):
140 # pylint: disable=no-self-argument
131 def _void_product(method_name): 141 def _void_product(method_name):
132 def inner(self, *args): 142 def inner(self, *args):
133 getattr(self._stream_a, method_name)(*args) 143 getattr(self._stream_a, method_name)(*args)
134 getattr(self._stream_b, method_name)(*args) 144 getattr(self._stream_b, method_name)(*args)
135 return inner 145 return inner
136 146
137 def new_log_stream(self, log_name): 147 def new_log_stream(self, log_name):
138 return ProductStreamEngine.Stream( 148 return ProductStreamEngine.Stream(
139 self._stream_a.new_log_stream(log_name), 149 self._stream_a.new_log_stream(log_name),
140 self._stream_b.new_log_stream(log_name)) 150 self._stream_b.new_log_stream(log_name))
(...skipping 13 matching lines...) Expand all
154 164
155 def open(self): 165 def open(self):
156 self._engine_a.open() 166 self._engine_a.open()
157 self._engine_b.open() 167 self._engine_b.open()
158 168
159 def close(self): 169 def close(self):
160 self._engine_a.close() 170 self._engine_a.close()
161 self._engine_b.close() 171 self._engine_b.close()
162 172
163 173
164 def _noop(*args, **kwargs): 174 class MultiStreamEngine(StreamEngine):
175 """A StreamEngine consisting of one or more inner StreamEngines.
176
177 A call to this StreamEngine will be distributed to the inner StreamEngines.
178 Any exceptions that are caught during an inner handler will be deferred until
179 all inner handlers have been executed.
180 """
181
182 def __init__(self, base, *engines):
183 self._engines = (base,) + engines
184 assert None not in self._engines
185
186 @classmethod
187 def create(cls, *engines):
188 assert len(engines) > 0, 'At least one engine must be provided.'
189 if len(engines) == 1:
190 return engines[0]
191 return cls(engines[0], *engines[1:])
192
193 class Stream(StreamEngine.Stream):
194 def __init__(self, *streams):
195 assert all(streams)
196 self._streams = streams
197
198 def write_line(self, line):
199 util.map_defer_exceptions(lambda s: s.write_line(line), self._streams)
200
201 def close(self):
202 util.map_defer_exceptions(lambda s: s.close(), self._streams)
203
204 class StepStream(Stream):
205 # pylint: disable=no-self-argument
206 def _multiplex(method_name):
207 def inner(self, *args):
208 util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args),
209 self._streams)
210 return inner
211
212 def new_log_stream(self, log_name):
213 log_streams = []
214 try:
215 for s in self._streams:
216 log_streams.append(s.new_log_stream(log_name))
217 return MultiStreamEngine.Stream(*log_streams)
218 except Exception:
219 # Close any opened log streams.
220 util.map_defer_exceptions(lambda ls: ls.close(), log_streams)
221 raise
222
223 add_step_text = _multiplex('add_step_text')
224 add_step_summary_text = _multiplex('add_step_summary_text')
225 add_step_link = _multiplex('add_step_link')
226 reset_subannotation_state = _multiplex('reset_subannotation_state')
227 set_step_status = _multiplex('set_step_status')
228 set_build_property = _multiplex('set_build_property')
229 trigger = _multiplex('trigger')
230
231 def new_step_stream(self, step_config):
232 return self.StepStream(
233 *(se.new_step_stream(step_config)
234 for se in self._engines))
235
236 def open(self):
237 for se in self._engines:
238 se.open()
239
240 def close(self):
241 util.map_defer_exceptions(lambda se: se.close(), self._engines)
242
243 def append_stream_engine(self, se):
244 assert isinstance(se, StreamEngine)
245 self._engines.append(se)
246
247
248 def _noop(*_args, **_kwargs):
165 pass 249 pass
166 250
167 class NoopStreamEngine(StreamEngine): 251 class NoopStreamEngine(StreamEngine):
168 class Stream(StreamEngine.Stream): 252 class Stream(StreamEngine.Stream):
169 write_line = _noop 253 write_line = _noop
170 close = _noop 254 close = _noop
171 255
172 class StepStream(Stream): 256 class StepStream(Stream):
173 def new_log_stream(self, log_name): 257 def new_log_stream(self, _log_name):
174 return NoopStreamEngine.Stream() 258 return NoopStreamEngine.Stream()
175 add_step_text = _noop 259 add_step_text = _noop
176 add_step_summary_text = _noop 260 add_step_summary_text = _noop
177 add_step_link = _noop 261 add_step_link = _noop
178 reset_subannotation_state = _noop 262 reset_subannotation_state = _noop
179 set_step_status = _noop 263 set_step_status = _noop
180 set_build_property = _noop 264 set_build_property = _noop
181 trigger = _noop 265 trigger = _noop
182 266
183 def new_step_stream(self, step_config): 267 def new_step_stream(self, step_config):
184 return self.StepStream() 268 return self.StepStream()
185 269
186 270
187 class StreamEngineInvariants(StreamEngine): 271 class StreamEngineInvariants(StreamEngine):
188 """Checks that the users are using a StreamEngine hygenically. 272 """Checks that the users are using a StreamEngine hygenically.
189 273
190 Multiply with actually functional StreamEngines so you don't have to check 274 Multiply with actually functional StreamEngines so you don't have to check
191 these all over the place. 275 these all over the place.
192 """ 276 """
193 def __init__(self): 277 def __init__(self):
194 self._streams = set() 278 self._streams = set()
195 279
280 @classmethod
281 def wrap(cls, other):
282 """Returns (ProductStreamEngine): A product applying invariants to "other".
283 """
284 return ProductStreamEngine(cls(), other)
285
196 class StepStream(StreamEngine.StepStream): 286 class StepStream(StreamEngine.StepStream):
197 def __init__(self, engine, step_name): 287 def __init__(self, engine, step_name):
288 super(StreamEngineInvariants.StepStream, self).__init__()
198 self._engine = engine 289 self._engine = engine
199 self._step_name = step_name 290 self._step_name = step_name
200 self._open = True 291 self._open = True
201 self._logs = {} 292 self._logs = {}
202 self._status = 'SUCCESS' 293 self._status = 'SUCCESS'
203 294
204 def write_line(self, line): 295 def write_line(self, line):
205 assert '\n' not in line 296 assert '\n' not in line
206 assert self._open 297 assert self._open
207 298
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
308 if self._current_step != step_name: 399 if self._current_step != step_name:
309 self.output_root_annotation('STEP_CURSOR', step_name) 400 self.output_root_annotation('STEP_CURSOR', step_name)
310 self._current_step = step_name 401 self._current_step = step_name
311 if step_name not in self._opened: 402 if step_name not in self._opened:
312 self.output_current_time() 403 self.output_current_time()
313 self.output_root_annotation('STEP_STARTED') 404 self.output_root_annotation('STEP_STARTED')
314 self._opened.add(step_name) 405 self._opened.add(step_name)
315 406
316 class StepStream(StreamEngine.StepStream): 407 class StepStream(StreamEngine.StepStream):
317 def __init__(self, engine, outstream, step_name): 408 def __init__(self, engine, outstream, step_name):
318 super(StreamEngine.StepStream, self).__init__() 409 super(AnnotatorStreamEngine.StepStream, self).__init__()
319 410
320 self._engine = engine 411 self._engine = engine
321 self._outstream = outstream 412 self._outstream = outstream
322 self._step_name = step_name 413 self._step_name = step_name
323 414
324 def basic_write(self, line): 415 def basic_write(self, line):
325 self._engine._step_cursor(self._step_name) 416 self._engine._step_cursor(self._step_name)
326 self._outstream.write(line) 417 self._outstream.write(line)
327 418
328 def close(self): 419 def close(self):
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
418 string (which means it might not be valid ascii), we decode the string with 509 string (which means it might not be valid ascii), we decode the string with
419 the 'replace' error mode, which replaces invalid characters with a suitable 510 the 'replace' error mode, which replaces invalid characters with a suitable
420 replacement character. 511 replacement character.
421 """ 512 """
422 try: 513 try:
423 return str(s) 514 return str(s)
424 except UnicodeEncodeError: 515 except UnicodeEncodeError:
425 return s.encode('utf-8', 'replace') 516 return s.encode('utf-8', 'replace')
426 except UnicodeDecodeError: 517 except UnicodeDecodeError:
427 return s.decode('utf-8', 'replace') 518 return s.decode('utf-8', 'replace')
OLDNEW
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/stream_logdog.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698