Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1419)

Side by Side Diff: recipe_engine/stream.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The LUCI Authors. All rights reserved. 1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Abstract stream interface for representing recipe runs. 5 """Abstract stream interface for representing recipe runs.
6 6
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. 7 We need to create streams for steps (and substeps) and also LOG_LINE steps.
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but
9 annotations will implement them differently from normal logs, so we need 9 annotations will implement them differently from normal logs, so we need
10 a way to distinguish. 10 a way to distinguish.
11 11
12 StreamEngine will coordinate the multiplexing of streams. In the case of 12 StreamEngine will coordinate the multiplexing of streams. In the case of
13 annotations, this involves keeping track of the STEP_CURSOR and setting it 13 annotations, this involves keeping track of the STEP_CURSOR and setting it
14 accordingly, as well as filtering @@@ lines. 14 accordingly, as well as filtering @@@ lines.
15 15
16 Stream is a virtual well-behaved stream (associated with an Engine) which you 16 Stream is a virtual well-behaved stream (associated with an Engine) which you
17 can just write to without worrying. 17 can just write to without worrying.
18 """ 18 """
19 19
20 import json 20 import json
21 import time 21 import time
22 22
23 from . import env 23 from . import env
24 from . import recipe_api 24 from . import recipe_api
25 from . import util
26
27 import libs.logdog.bootstrap
28 import libs.logdog.stream
29 import annotations_pb2 as miloproto
martiniss 2016/09/01 21:59:47 Unneeded.
dnj 2016/09/07 17:54:57 Done.
25 30
26 31
27 class StreamEngine(object): 32 class StreamEngine(object):
28 class Stream(object): 33 class Stream(object):
29 def write_line(self, line): 34 def write_line(self, line):
30 raise NotImplementedError() 35 raise NotImplementedError()
31 36
32 def write_split(self, string): 37 def write_split(self, string):
33 """Write a string (which may contain newlines) to the stream. It will 38 """Write a string (which may contain newlines) to the stream. It will
34 be terminated by a newline.""" 39 be terminated by a newline."""
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 return self 106 return self
102 107
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): 108 def __exit__(self, _exc_type, _exc_val, _exc_tb):
104 self.close() 109 self.close()
105 110
106 111
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can 112 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
108 # form products. This code is entirely mechanical from the types (if we 113 # form products. This code is entirely mechanical from the types (if we
109 # had them formalized...). 114 # had them formalized...).
110 class ProductStreamEngine(StreamEngine): 115 class ProductStreamEngine(StreamEngine):
111 def __init__(self, engine_a, engine_b): 116 def __init__(self, base, *engines):
112 assert engine_a and engine_b 117 self._engines = [base] + list(engines)
113 self._engine_a = engine_a 118 assert all(self._engines)
114 self._engine_b = engine_b
115 119
116 class Stream(StreamEngine.Stream): 120 class Stream(StreamEngine.Stream):
117 def __init__(self, stream_a, stream_b): 121 def __init__(self, *streams):
118 assert stream_a and stream_b 122 assert all(streams)
119 self._stream_a = stream_a 123 self._streams = streams
120 self._stream_b = stream_b
121 124
122 def write_line(self, line): 125 def write_line(self, line):
123 self._stream_a.write_line(line) 126 for s in self._streams:
124 self._stream_b.write_line(line) 127 s.write_line(line)
125 128
126 def close(self): 129 def close(self):
127 self._stream_a.close() 130 for s in util.defer_exceptions_for(self._streams):
128 self._stream_b.close() 131 s.close()
129 132
130 class StepStream(Stream): 133 class StepStream(Stream):
131 def _void_product(method_name): 134 def _void_product(method_name):
132 def inner(self, *args): 135 def inner(self, *args):
133 getattr(self._stream_a, method_name)(*args) 136 for s in self._streams:
134 getattr(self._stream_b, method_name)(*args) 137 getattr(s, method_name)(*args)
135 return inner 138 return inner
136 139
137 def new_log_stream(self, log_name): 140 def new_log_stream(self, log_name):
138 return ProductStreamEngine.Stream( 141 return ProductStreamEngine.Stream(
139 self._stream_a.new_log_stream(log_name), 142 *(se.new_log_stream(log_name) for se in self._streams))
140 self._stream_b.new_log_stream(log_name))
141 143
142 add_step_text = _void_product('add_step_text') 144 add_step_text = _void_product('add_step_text')
143 add_step_summary_text = _void_product('add_step_summary_text') 145 add_step_summary_text = _void_product('add_step_summary_text')
144 add_step_link = _void_product('add_step_link') 146 add_step_link = _void_product('add_step_link')
145 reset_subannotation_state = _void_product('reset_subannotation_state') 147 reset_subannotation_state = _void_product('reset_subannotation_state')
146 set_step_status = _void_product('set_step_status') 148 set_step_status = _void_product('set_step_status')
147 set_build_property = _void_product('set_build_property') 149 set_build_property = _void_product('set_build_property')
148 trigger = _void_product('trigger') 150 trigger = _void_product('trigger')
149 151
150 def new_step_stream(self, step_config): 152 def new_step_stream(self, step_config):
151 return self.StepStream( 153 return self.StepStream(
152 self._engine_a.new_step_stream(step_config), 154 *list(se.new_step_stream(step_config)
martiniss 2016/09/01 21:59:47 why the "*list"? Why not *(
dnj 2016/09/07 17:54:57 Done.
153 self._engine_b.new_step_stream(step_config)) 155 for se in self._engines))
154 156
155 def open(self): 157 def open(self):
156 self._engine_a.open() 158 for se in self._engines:
157 self._engine_b.open() 159 se.open()
158 160
159 def close(self): 161 def close(self):
160 self._engine_a.close() 162 for se in util.defer_exceptions_for(self._engines):
161 self._engine_b.close() 163 se.close()
164
165 def append_stream_engine(self, se):
166 assert isinstance(se, StreamEngine)
167 self._engines.append(se)
162 168
163 169
164 def _noop(*args, **kwargs): 170 def _noop(*args, **kwargs):
165 pass 171 pass
166 172
167 class NoopStreamEngine(StreamEngine): 173 class NoopStreamEngine(StreamEngine):
168 class Stream(StreamEngine.Stream): 174 class Stream(StreamEngine.Stream):
169 write_line = _noop 175 write_line = _noop
170 close = _noop 176 close = _noop
171 177
(...skipping 247 matching lines...) Expand 10 before | Expand all | Expand 10 after
419 string (which means it might not be valid ascii), we decode the string with 425 string (which means it might not be valid ascii), we decode the string with
420 the 'replace' error mode, which replaces invalid characters with a suitable 426 the 'replace' error mode, which replaces invalid characters with a suitable
421 replacement character. 427 replacement character.
422 """ 428 """
423 try: 429 try:
424 return str(s) 430 return str(s)
425 except UnicodeEncodeError: 431 except UnicodeEncodeError:
426 return s.encode('utf-8', 'replace') 432 return s.encode('utf-8', 'replace')
427 except UnicodeDecodeError: 433 except UnicodeDecodeError:
428 return s.decode('utf-8', 'replace') 434 return s.decode('utf-8', 'replace')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698