OLD | NEW |
1 # Copyright 2015 The LUCI Authors. All rights reserved. | 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Abstract stream interface for representing recipe runs. | 5 """Abstract stream interface for representing recipe runs. |
6 | 6 |
7 We need to create streams for steps (and substeps) and also LOG_LINE steps. | 7 We need to create streams for steps (and substeps) and also LOG_LINE steps. |
8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but | 8 LogDog will implement LOG_LINE steps as real logs (i.e. uniformly), but |
9 annotations will implement them differently from normal logs, so we need | 9 annotations will implement them differently from normal logs, so we need |
10 a way to distinguish. | 10 a way to distinguish. |
11 | 11 |
12 StreamEngine will coordinate the multiplexing of streams. In the case of | 12 StreamEngine will coordinate the multiplexing of streams. In the case of |
13 annotations, this involves keeping track of the STEP_CURSOR and setting it | 13 annotations, this involves keeping track of the STEP_CURSOR and setting it |
14 accordingly, as well as filtering @@@ lines. | 14 accordingly, as well as filtering @@@ lines. |
15 | 15 |
16 Stream is a virtual well-behaved stream (associated with an Engine) which you | 16 Stream is a virtual well-behaved stream (associated with an Engine) which you |
17 can just write to without worrying. | 17 can just write to without worrying. |
18 """ | 18 """ |
19 | 19 |
20 import json | 20 import json |
21 import time | 21 import time |
22 | 22 |
23 from . import env | 23 from . import env |
24 from . import recipe_api | 24 from . import recipe_api |
| 25 from . import util |
25 | 26 |
26 | 27 |
27 class StreamEngine(object): | 28 class StreamEngine(object): |
28 class Stream(object): | 29 class Stream(object): |
29 def write_line(self, line): | 30 def write_line(self, line): |
30 raise NotImplementedError() | 31 raise NotImplementedError() |
31 | 32 |
32 def write_split(self, string): | 33 def write_split(self, string): |
33 """Write a string (which may contain newlines) to the stream. It will | 34 """Write a string (which may contain newlines) to the stream. It will |
34 be terminated by a newline.""" | 35 be terminated by a newline.""" |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
101 return self | 102 return self |
102 | 103 |
103 def __exit__(self, _exc_type, _exc_val, _exc_tb): | 104 def __exit__(self, _exc_type, _exc_val, _exc_tb): |
104 self.close() | 105 self.close() |
105 | 106 |
106 | 107 |
107 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can | 108 # Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
108 # form products. This code is entirely mechanical from the types (if we | 109 # form products. This code is entirely mechanical from the types (if we |
109 # had them formalized...). | 110 # had them formalized...). |
110 class ProductStreamEngine(StreamEngine): | 111 class ProductStreamEngine(StreamEngine): |
111 def __init__(self, engine_a, engine_b): | 112 def __init__(self, base, *engines): |
112 assert engine_a and engine_b | 113 self._engines = [base] + list(engines) |
113 self._engine_a = engine_a | 114 assert all(self._engines) |
114 self._engine_b = engine_b | |
115 | 115 |
116 class Stream(StreamEngine.Stream): | 116 class Stream(StreamEngine.Stream): |
117 def __init__(self, stream_a, stream_b): | 117 def __init__(self, *streams): |
118 assert stream_a and stream_b | 118 assert all(streams) |
119 self._stream_a = stream_a | 119 self._streams = streams |
120 self._stream_b = stream_b | |
121 | 120 |
122 def write_line(self, line): | 121 def write_line(self, line): |
123 self._stream_a.write_line(line) | 122 for s in self._streams: |
124 self._stream_b.write_line(line) | 123 s.write_line(line) |
125 | 124 |
126 def close(self): | 125 def close(self): |
127 self._stream_a.close() | 126 util.defer_exceptions_for(self._streams, lambda s: s.close()) |
128 self._stream_b.close() | |
129 | 127 |
130 class StepStream(Stream): | 128 class StepStream(Stream): |
131 def _void_product(method_name): | 129 def _void_product(method_name): |
132 def inner(self, *args): | 130 def inner(self, *args): |
133 getattr(self._stream_a, method_name)(*args) | 131 for s in self._streams: |
134 getattr(self._stream_b, method_name)(*args) | 132 getattr(s, method_name)(*args) |
135 return inner | 133 return inner |
136 | 134 |
137 def new_log_stream(self, log_name): | 135 def new_log_stream(self, log_name): |
138 return ProductStreamEngine.Stream( | 136 return ProductStreamEngine.Stream( |
139 self._stream_a.new_log_stream(log_name), | 137 *(se.new_log_stream(log_name) for se in self._streams)) |
140 self._stream_b.new_log_stream(log_name)) | |
141 | 138 |
142 add_step_text = _void_product('add_step_text') | 139 add_step_text = _void_product('add_step_text') |
143 add_step_summary_text = _void_product('add_step_summary_text') | 140 add_step_summary_text = _void_product('add_step_summary_text') |
144 add_step_link = _void_product('add_step_link') | 141 add_step_link = _void_product('add_step_link') |
145 reset_subannotation_state = _void_product('reset_subannotation_state') | 142 reset_subannotation_state = _void_product('reset_subannotation_state') |
146 set_step_status = _void_product('set_step_status') | 143 set_step_status = _void_product('set_step_status') |
147 set_build_property = _void_product('set_build_property') | 144 set_build_property = _void_product('set_build_property') |
148 trigger = _void_product('trigger') | 145 trigger = _void_product('trigger') |
149 | 146 |
150 def new_step_stream(self, step_config): | 147 def new_step_stream(self, step_config): |
151 return self.StepStream( | 148 return self.StepStream( |
152 self._engine_a.new_step_stream(step_config), | 149 *(se.new_step_stream(step_config) |
153 self._engine_b.new_step_stream(step_config)) | 150 for se in self._engines)) |
154 | 151 |
155 def open(self): | 152 def open(self): |
156 self._engine_a.open() | 153 for se in self._engines: |
157 self._engine_b.open() | 154 se.open() |
158 | 155 |
159 def close(self): | 156 def close(self): |
160 self._engine_a.close() | 157 util.defer_exceptions_for(self._engines, lambda se: se.close()) |
161 self._engine_b.close() | 158 |
| 159 def append_stream_engine(self, se): |
| 160 assert isinstance(se, StreamEngine) |
| 161 self._engines.append(se) |
162 | 162 |
163 | 163 |
164 def _noop(*args, **kwargs): | 164 def _noop(*args, **kwargs): |
165 pass | 165 pass |
166 | 166 |
167 class NoopStreamEngine(StreamEngine): | 167 class NoopStreamEngine(StreamEngine): |
168 class Stream(StreamEngine.Stream): | 168 class Stream(StreamEngine.Stream): |
169 write_line = _noop | 169 write_line = _noop |
170 close = _noop | 170 close = _noop |
171 | 171 |
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
418 string (which means it might not be valid ascii), we decode the string with | 418 string (which means it might not be valid ascii), we decode the string with |
419 the 'replace' error mode, which replaces invalid characters with a suitable | 419 the 'replace' error mode, which replaces invalid characters with a suitable |
420 replacement character. | 420 replacement character. |
421 """ | 421 """ |
422 try: | 422 try: |
423 return str(s) | 423 return str(s) |
424 except UnicodeEncodeError: | 424 except UnicodeEncodeError: |
425 return s.encode('utf-8', 'replace') | 425 return s.encode('utf-8', 'replace') |
426 except UnicodeDecodeError: | 426 except UnicodeDecodeError: |
427 return s.decode('utf-8', 'replace') | 427 return s.decode('utf-8', 'replace') |
OLD | NEW |