OLD | NEW |
---|---|
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
6 | 6 |
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
10 a way to distinguish. | 10 a way to distinguish. |
11 | 11 |
12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
15 | 15 |
16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
17 can just write to without worrying. | 17 can just write to without worrying. |
18 """ | 18 """ |
19 | 19 |
20 import json | 20 import json |
21 import time | 21 import time |
22 | 22 |
23 from . import env | 23 from . import env |
24 from . import recipe_api | 24 from . import recipe_api |
25 from . import util | |
26 | |
27 import libs.logdog.bootstrap | |
28 import libs.logdog.stream | |
29 import annotations_pb2 as miloproto | |
martiniss
2016/09/01 21:59:47
Unneeded.
dnj
2016/09/07 17:54:57
Done.
| |
25 | 30 |
26 | 31 |
27 class StreamEngine(object): | 32 class StreamEngine(object): |
28 class Stream(object): | 33 class Stream(object): |
29 def write_line(self, line): | 34 def write_line(self, line): |
30 raise NotImplementedError() | 35 raise NotImplementedError() |
31 | 36 |
32 def write_split(self, string): | 37 def write_split(self, string): |
33 """Write a string (which may contain newlines) to the stream. It will | 38 """Write a string (which may contain newlines) to the stream. It will |
34 be terminated by a newline.""" | 39 be terminated by a newline.""" |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
101 return self | 106 return self |
102 | 107 |
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 108 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
104 self.close() | 109 self.close() |
105 | 110 |
106 | 111 |
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | 112 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
108 # form products. This code is entirely mechanical from the types (if we | 113 # form products. This code is entirely mechanical from the types (if we |
109 # had them formalized...). | 114 # had them formalized...). |
110 class ProductStreamEngine(StreamEngine): | 115 class ProductStreamEngine(StreamEngine): |
111 def __init__(self, engine_a, engine_b): | 116 def __init__(self, base, *engines): |
112 assert engine_a and engine_b | 117 self._engines = [base] + list(engines) |
113 self._engine_a = engine_a | 118 assert all(self._engines) |
114 self._engine_b = engine_b | |
115 | 119 |
116 class Stream(StreamEngine.Stream): | 120 class Stream(StreamEngine.Stream): |
117 def __init__(self, stream_a, stream_b): | 121 def __init__(self, *streams): |
118 assert stream_a and stream_b | 122 assert all(streams) |
119 self._stream_a = stream_a | 123 self._streams = streams |
120 self._stream_b = stream_b | |
121 | 124 |
122 def write_line(self, line): | 125 def write_line(self, line): |
123 self._stream_a.write_line(line) | 126 for s in self._streams: |
124 self._stream_b.write_line(line) | 127 s.write_line(line) |
125 | 128 |
126 def close(self): | 129 def close(self): |
127 self._stream_a.close() | 130 for s in util.defer_exceptions_for(self._streams): |
128 self._stream_b.close() | 131 s.close() |
129 | 132 |
130 class StepStream(Stream): | 133 class StepStream(Stream): |
131 def _void_product(method_name): | 134 def _void_product(method_name): |
132 def inner(self, *args): | 135 def inner(self, *args): |
133 getattr(self._stream_a, method_name)(*args) | 136 for s in self._streams: |
134 getattr(self._stream_b, method_name)(*args) | 137 getattr(s, method_name)(*args) |
135 return inner | 138 return inner |
136 | 139 |
137 def new_log_stream(self, log_name): | 140 def new_log_stream(self, log_name): |
138 return ProductStreamEngine.Stream( | 141 return ProductStreamEngine.Stream( |
139 self._stream_a.new_log_stream(log_name), | 142 *(se.new_log_stream(log_name) for se in self._streams)) |
140 self._stream_b.new_log_stream(log_name)) | |
141 | 143 |
142 add_step_text = _void_product('add_step_text') | 144 add_step_text = _void_product('add_step_text') |
143 add_step_summary_text = _void_product('add_step_summary_text') | 145 add_step_summary_text = _void_product('add_step_summary_text') |
144 add_step_link = _void_product('add_step_link') | 146 add_step_link = _void_product('add_step_link') |
145 reset_subannotation_state = _void_product('reset_subannotation_state') | 147 reset_subannotation_state = _void_product('reset_subannotation_state') |
146 set_step_status = _void_product('set_step_status') | 148 set_step_status = _void_product('set_step_status') |
147 set_build_property = _void_product('set_build_property') | 149 set_build_property = _void_product('set_build_property') |
148 trigger = _void_product('trigger') | 150 trigger = _void_product('trigger') |
149 | 151 |
150 def new_step_stream(self, step_config): | 152 def new_step_stream(self, step_config): |
151 return self.StepStream( | 153 return self.StepStream( |
152 self._engine_a.new_step_stream(step_config), | 154 *list(se.new_step_stream(step_config) |
martiniss
2016/09/01 21:59:47
why the "*list"? Why not *(
dnj
2016/09/07 17:54:57
Done.
| |
153 self._engine_b.new_step_stream(step_config)) | 155 for se in self._engines)) |
154 | 156 |
155 def open(self): | 157 def open(self): |
156 self._engine_a.open() | 158 for se in self._engines: |
157 self._engine_b.open() | 159 se.open() |
158 | 160 |
159 def close(self): | 161 def close(self): |
160 self._engine_a.close() | 162 for se in util.defer_exceptions_for(self._engines): |
161 self._engine_b.close() | 163 se.close() |
164 | |
165 def append_stream_engine(self, se): | |
166 assert isinstance(se, StreamEngine) | |
167 self._engines.append(se) | |
162 | 168 |
163 | 169 |
164 def _noop(*args, **kwargs): | 170 def _noop(*args, **kwargs): |
165 pass | 171 pass |
166 | 172 |
167 class NoopStreamEngine(StreamEngine): | 173 class NoopStreamEngine(StreamEngine): |
168 class Stream(StreamEngine.Stream): | 174 class Stream(StreamEngine.Stream): |
169 write_line = _noop | 175 write_line = _noop |
170 close = _noop | 176 close = _noop |
171 | 177 |
(...skipping 247 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
419 string (which means it might not be valid ascii), we decode the string with | 425 string (which means it might not be valid ascii), we decode the string with |
420 the 'replace' error mode, which replaces invalid characters with a suitable | 426 the 'replace' error mode, which replaces invalid characters with a suitable |
421 replacement character. | 427 replacement character. |
422 """ | 428 """ |
423 try: | 429 try: |
424 return str(s) | 430 return str(s) |
425 except UnicodeEncodeError: | 431 except UnicodeEncodeError: |
426 return s.encode('utf-8', 'replace') | 432 return s.encode('utf-8', 'replace') |
427 except UnicodeDecodeError: | 433 except UnicodeDecodeError: |
428 return s.decode('utf-8', 'replace') | 434 return s.decode('utf-8', 'replace') |
OLD | NEW |