Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Side by Side Diff: recipe_engine/stream.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: pylint, fix comments Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
11 11
12 StreamEngine will coordinate the multiplexing of streams. In the case of 12 StreamEngine will coordinate the multiplexing of streams. In the case of
13 annotations, this involves keeping track of the STEP_CURSOR and setting it 13 annotations, this involves keeping track of the STEP_CURSOR and setting it
14 accordingly, as well as filtering @@@ lines. 14 accordingly, as well as filtering @@@ lines.
15 15
16 Stream is a virtual well-behaved stream (associated with an Engine) which you 16 Stream is a virtual well-behaved stream (associated with an Engine) which you
17 can just write to without worrying. 17 can just write to without worrying.
18 """ 18 """
19 19
20 import json 20 import json
21 import time 21 import time
22 22
23 from . import env 23 from . import env
24 from . import recipe_api 24 from . import recipe_api
25 from . import util
25 26
26 27
27 class StreamEngine(object): 28 class StreamEngine(object):
28 class Stream(object): 29 class Stream(object):
29 def write_line(self, line): 30 def write_line(self, line):
30 raise NotImplementedError() 31 raise NotImplementedError()
31 32
32 def write_split(self, string): 33 def write_split(self, string):
33 """Write a string (which may contain newlines) to the stream. It will 34 """Write a string (which may contain newlines) to the stream. It will
34 be terminated by a newline.""" 35 be terminated by a newline."""
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 return self 102 return self
102 103
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): 104 def __exit__(self, _exc_type, _exc_val, _exc_tb):
104 self.close() 105 self.close()
105 106
106 107
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 108 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
108 # form products. This code is entirely mechanical from the types (if we 109 # form products. This code is entirely mechanical from the types (if we
109 # had them formalized...). 110 # had them formalized...).
110 class ProductStreamEngine(StreamEngine): 111 class ProductStreamEngine(StreamEngine):
111 def __init__(self, engine_a, engine_b): 112 def __init__(self, base, *engines):
112 assert engine_a and engine_b 113 self._engines = [base] + list(engines)
113 self._engine_a = engine_a 114 assert all(self._engines)
114 self._engine_b = engine_b
115 115
116 class Stream(StreamEngine.Stream): 116 class Stream(StreamEngine.Stream):
117 def __init__(self, stream_a, stream_b): 117 def __init__(self, *streams):
118 assert stream_a and stream_b 118 assert all(streams)
119 self._stream_a = stream_a 119 self._streams = streams
120 self._stream_b = stream_b
121 120
122 def write_line(self, line): 121 def write_line(self, line):
123 self._stream_a.write_line(line) 122 for s in self._streams:
124 self._stream_b.write_line(line) 123 s.write_line(line)
125 124
126 def close(self): 125 def close(self):
127 self._stream_a.close() 126 util.defer_exceptions_for(self._streams, lambda s: s.close())
128 self._stream_b.close()
129 127
130 class StepStream(Stream): 128 class StepStream(Stream):
131 def _void_product(method_name): 129 def _void_product(method_name):
132 def inner(self, *args): 130 def inner(self, *args):
133 getattr(self._stream_a, method_name)(*args) 131 for s in self._streams:
134 getattr(self._stream_b, method_name)(*args) 132 getattr(s, method_name)(*args)
135 return inner 133 return inner
136 134
137 def new_log_stream(self, log_name): 135 def new_log_stream(self, log_name):
138 return ProductStreamEngine.Stream( 136 return ProductStreamEngine.Stream(
139 self._stream_a.new_log_stream(log_name), 137 *(se.new_log_stream(log_name) for se in self._streams))
140 self._stream_b.new_log_stream(log_name))
141 138
142 add_step_text = _void_product('add_step_text') 139 add_step_text = _void_product('add_step_text')
143 add_step_summary_text = _void_product('add_step_summary_text') 140 add_step_summary_text = _void_product('add_step_summary_text')
144 add_step_link = _void_product('add_step_link') 141 add_step_link = _void_product('add_step_link')
145 reset_subannotation_state = _void_product('reset_subannotation_state') 142 reset_subannotation_state = _void_product('reset_subannotation_state')
146 set_step_status = _void_product('set_step_status') 143 set_step_status = _void_product('set_step_status')
147 set_build_property = _void_product('set_build_property') 144 set_build_property = _void_product('set_build_property')
148 trigger = _void_product('trigger') 145 trigger = _void_product('trigger')
149 146
150 def new_step_stream(self, step_config): 147 def new_step_stream(self, step_config):
151 return self.StepStream( 148 return self.StepStream(
152 self._engine_a.new_step_stream(step_config), 149 *(se.new_step_stream(step_config)
153 self._engine_b.new_step_stream(step_config)) 150 for se in self._engines))
154 151
155 def open(self): 152 def open(self):
156 self._engine_a.open() 153 for se in self._engines:
157 self._engine_b.open() 154 se.open()
158 155
159 def close(self): 156 def close(self):
160 self._engine_a.close() 157 util.defer_exceptions_for(self._engines, lambda se: se.close())
161 self._engine_b.close() 158
159 def append_stream_engine(self, se):
160 assert isinstance(se, StreamEngine)
161 self._engines.append(se)
162 162
163 163
164 def _noop(*args, **kwargs): 164 def _noop(*args, **kwargs):
165 pass 165 pass
166 166
167 class NoopStreamEngine(StreamEngine): 167 class NoopStreamEngine(StreamEngine):
168 class Stream(StreamEngine.Stream): 168 class Stream(StreamEngine.Stream):
169 write_line = _noop 169 write_line = _noop
170 close = _noop 170 close = _noop
171 171
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after
418 string (which means it might not be valid ascii), we decode the string with 418 string (which means it might not be valid ascii), we decode the string with
419 the 'replace' error mode, which replaces invalid characters with a suitable 419 the 'replace' error mode, which replaces invalid characters with a suitable
420 replacement character. 420 replacement character.
421 """ 421 """
422 try: 422 try:
423 return str(s) 423 return str(s)
424 except UnicodeEncodeError: 424 except UnicodeEncodeError:
425 return s.encode('utf-8', 'replace') 425 return s.encode('utf-8', 'replace')
426 except UnicodeDecodeError: 426 except UnicodeDecodeError:
427 return s.decode('utf-8', 'replace') 427 return s.decode('utf-8', 'replace')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698