OLD | NEW |
---|---|
(Empty) | |
1 #!/usr/bin/env python | |
2 # Copyright 2015 The LUCI Authors. All rights reserved. | |
3 # Use of this source code is governed under the Apache License, Version 2.0 | |
4 # that can be found in the LICENSE file. | |
5 | |
6 import collections | |
7 import contextlib | |
8 import datetime | |
9 import json | |
10 import threading | |
11 import time | |
12 import unittest | |
13 import StringIO | |
14 | |
15 import test_env | |
16 | |
17 import libs.logdog.stream | |
18 import libs.logdog.varint | |
19 from google.protobuf import json_format as jsonpb | |
20 from recipe_engine import recipe_api | |
21 from recipe_engine import stream | |
22 from recipe_engine import stream_logdog | |
23 | |
24 | |
25 import annotations_pb2 as pb | |
26 | |
27 | |
def _translate_annotation_datagram(dg):
  """Converts a serialized annotation datagram into a JSONPB-style dict.

  Projecting the datagram through JSONPB yields plain Python dicts, which
  makes equality assertions in test cases simple and idiomatic.

  Args:
    dg (str): The serialized annotation pb.Step datagram.
  """
  step = pb.Step()
  step.ParseFromString(dg)
  return json.loads(jsonpb.MessageToJson(step))
41 | |
42 | |
43 class _TestStreamClient(libs.logdog.stream.StreamClient): | |
44 """A testing StreamClient that retains all data written to it.""" | |
45 | |
46 class Stream(object): | |
47 """A file-like object that is explicitly aware of LogDog stream protocol.""" | |
48 | |
49 def __init__(self, stream_client): | |
50 self._client = stream_client | |
51 self._buf = StringIO.StringIO() | |
52 self._header = None | |
53 self._final_data = None | |
54 self._data_offset = None | |
55 | |
56 def write(self, data): | |
57 self._buf.write(data) | |
58 self._attempt_registration() | |
59 | |
60 def close(self): | |
61 # If we never parsed our header, register that we are incomplete. | |
62 if self._header is None: | |
63 self._client._register_incomplete(self) | |
64 | |
65 self._final_data = self.data | |
66 self._buf.close() | |
67 | |
68 @contextlib.contextmanager | |
69 def _read_from(self, offset): | |
70 # Seek to the specified offset. | |
71 self._buf.seek(offset, mode=0) | |
72 try: | |
73 yield self._buf | |
74 finally: | |
 75 # Seek back to the end of the stream so future writes will append. | |
martiniss
2016/09/01 21:59:48
typo
| |
76 self._buf.seek(0, mode=2) | |
77 | |
78 def _attempt_registration(self): | |
79 # Only need to register once. | |
80 if self._header is not None: | |
81 return | |
82 | |
 83 # Try to parse a full LogDog stream header. | |
martiniss
2016/09/01 21:59:47
What do you mean can we? Try to?
| |
84 # | |
85 # This means pulling: | |
86 # - The LogDog Butler header. | |
87 # - The header size varint. | |
88 # - The header JSON blob, which needs to be decoded. | |
89 with self._read_from(0) as fd: | |
 90 # Read the Butler magic number bytes. | |
91 magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) | |
92 if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): | |
93 # Incomplete magic number, cannot complete registration. | |
martiniss
2016/09/01 21:59:48
throw an exception? here and other places where yo
| |
94 return | |
95 count = len(magic_data) | |
96 | |
97 try: | |
98 size, varint_count = libs.logdog.varint.read_uvarint(fd) | |
99 except ValueError: | |
100 # Incomplete varint, cannot complete registration. | |
101 return | |
102 count += varint_count | |
103 | |
104 header_data = fd.read(size) | |
105 if len(header_data) != size: | |
106 # Incomplete header, cannot complete registration. | |
107 return | |
108 count += size | |
109 | |
110 # Parse the header as JSON. | |
111 self._header = json.loads(header_data) | |
112 self._data_offset = count # (varint + header size) | |
113 self._client._register_stream(self, self._header) | |
114 | |
115 @property | |
116 def data(self): | |
117 # If we have already cached our data (on close), return it directly. | |
118 if self._final_data is not None: | |
119 return self._final_data | |
120 | |
121 # Load our data from our live buffer. | |
122 if self._data_offset is None: | |
123 # No header has been read, so there is no data. | |
124 return None | |
125 with self._read_from(self._data_offset) as fd: | |
126 return fd.read() | |
127 | |
128 | |
129 _StreamEntry = collections.namedtuple('_StreamEntry', ( | |
130 's', 'type', 'content_type')) | |
131 | |
132 _DATAGRAM_CONTENT_TRANSLATE = { | |
133 stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, | |
134 } | |
135 | |
136 | |
137 def __init__(self): | |
138 super(_TestStreamClient, self).__init__() | |
139 self.streams = {} | |
140 self.incomplete = [] | |
141 self.unregistered = {} | |
142 | |
143 @classmethod | |
144 def _create(cls, value): | |
145 raise NotImplementedError('Instances must be created manually.') | |
146 | |
147 def _connect_raw(self): | |
148 s = self.Stream(self) | |
149 self.unregistered[id(s)] = s | |
150 return s | |
151 | |
152 def get(self, name): | |
153 se = self.streams[name] | |
154 data = se.s.data | |
155 | |
156 if se.type == libs.logdog.stream.StreamParams.TEXT: | |
157 # Return text stream data as a list of lines. We use unicode because it | |
158 # fits in with the JSON dump from 'all_streams'. | |
159 return [unicode(l) for l in data.splitlines()] | |
160 elif se.type == libs.logdog.stream.StreamParams.BINARY: | |
161 raise NotImplementedError('No support for fetching binary stream data.') | |
162 elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: | |
163 # Return datagram stream data as a list of datagrams. | |
164 sio = StringIO.StringIO(data) | |
165 datagrams = [] | |
166 while sio.tell() < sio.len: | |
167 size, _ = libs.logdog.varint.read_uvarint(sio) | |
168 dg = sio.read(size) | |
169 if len(dg) != size: | |
170 raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) | |
171 | |
172 # If this datagram is a known type (e.g., protobuf), transform it into | |
173 # JSONPB. | |
174 translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) | |
175 if translator is not None: | |
176 dg = translator(dg) | |
177 datagrams.append(dg) | |
178 | |
179 sio.close() | |
180 return dg | |
181 else: | |
182 raise ValueError('Unknown stream type [%s]' % (se.type,)) | |
183 | |
184 def all_streams(self): | |
185 return dict((name, self.get(name)) for name in self.streams.iterkeys()) | |
martiniss
2016/09/01 21:59:48
why not just self.streams.items() ?
| |
186 | |
187 @property | |
188 def stream_names(self): | |
189 return set(self.streams.iterkeys()) | |
190 | |
191 def _remove_from_unregistered(self, s): | |
192 if id(s) not in self.unregistered: | |
193 raise KeyError('Stream is not known to be unregistered.') | |
194 del(self.unregistered[id(s)]) | |
martiniss
2016/09/01 21:59:48
Stylistically, are you supposed to do "del(x[a])",
| |
195 | |
196 def _register_stream(self, s, header): | |
197 name = header.get('name') | |
198 if name in self.streams: | |
199 raise KeyError('Duplicate stream [%s]' % (name,)) | |
200 | |
201 self._remove_from_unregistered(s) | |
202 self.streams[name] = self._StreamEntry( | |
203 s=s, | |
204 type=header['type'], | |
205 content_type=header.get('contentType'), | |
206 ) | |
207 | |
208 def _register_incomplete(self, s): | |
209 self._remove_from_unregistered(s) | |
210 self.incomplete.append(s) | |
211 | |
212 | |
213 class EnvironmentTest(unittest.TestCase): | |
214 """Simple test to assert that _Environment, which is stubbed during our tests, | |
215 actually works.""" | |
216 | |
217 def testEnvironmentProbes(self): | |
218 stream_logdog._Environment.probe() | |
219 | |
220 | |
221 class StreamEngineTest(unittest.TestCase): | |
222 | |
223 def setUp(self): | |
224 self.client = _TestStreamClient() | |
225 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) | |
226 self.env = stream_logdog._Environment( | |
227 now_fn=lambda: self.now, | |
228 argv=[], | |
229 environ={}, | |
230 cwd=None, | |
231 ) | |
232 self.maxDiff = 1024*1024 | |
233 | |
234 | |
martiniss
2016/09/01 21:59:48
nit: spaces
| |
235 @contextlib.contextmanager | |
236 def _new_stream_engine(self, **kwargs): | |
237 kwargs.setdefault('client', self.client) | |
238 kwargs.setdefault('env', self.env) | |
239 | |
240 # Initialize and open a StreamEngine. | |
241 se = stream_logdog.StreamEngine(**kwargs) | |
242 se.open() | |
243 yield se | |
244 | |
 245 # Advance the clock, then close the StreamEngine now that we're done | |
 # with it. | |
martiniss
2016/09/01 21:59:48
comment doesn't match the line below it.
| |
246 self._advance_time() | |
martiniss
2016/09/01 21:59:48
Personally, I'd not have something like this in he
| |
247 se.close() | |
248 | |
249 @contextlib.contextmanager | |
250 def _step_stream(self, se, **kwargs): | |
251 # Initialize and yield a new step stream. | |
252 self._advance_time() | |
253 step_stream = se.new_step_stream(recipe_api._make_step_config(**kwargs)) | |
254 yield step_stream | |
255 | |
 256 # Advance the clock, then close the step stream when we're done with it. | |
martiniss
2016/09/01 21:59:48
same as above.
| |
257 self._advance_time() | |
martiniss
2016/09/01 21:59:47
same as above.
| |
258 step_stream.close() | |
259 | |
260 @contextlib.contextmanager | |
261 def _log_stream(self, step_stream, name): | |
262 # Initialize and yield a new log stream. | |
263 log_stream = step_stream.new_log_stream(name) | |
264 yield log_stream | |
265 | |
266 # Close the log stream when we're done with it. | |
267 log_stream.close() | |
268 | |
269 def _advance_time(self): | |
270 self.now += datetime.timedelta(seconds=1) | |
271 | |
272 def testEmptyStreamEngine(self): | |
273 self.env.argv = ['fake_program', 'arg0', 'arg1'] | |
274 self.env.environ['foo'] = 'bar' | |
275 self.env.cwd = 'CWD' | |
276 | |
277 with self._new_stream_engine() as se: | |
martiniss
2016/09/01 21:59:48
this line doesn't seem to do anything... comment?
| |
278 pass | |
279 | |
280 self.assertEqual(self.client.all_streams(), { | |
281 u'annotations': { | |
282 u'name': u'steps', | |
283 u'status': u'SUCCESS', | |
284 u'started': u'2106-06-12T01:02:03Z', | |
285 u'ended': u'2106-06-12T01:02:04Z', | |
286 u'command': { | |
287 u'commandLine': [u'fake_program', u'arg0', u'arg1'], | |
288 u'cwd': u'CWD', | |
289 u'environ': {u'foo': u'bar'}, | |
290 }, | |
291 }, | |
292 }) | |
293 | |
294 def testBasicStream(self): | |
295 self.env.argv = ['fake_program', 'arg0', 'arg1'] | |
296 self.env.environ['foo'] = 'bar' | |
297 self.env.cwd = 'CWD' | |
298 | |
299 with self._new_stream_engine(name_base='test/base') as se: | |
300 with self._step_stream(se, | |
301 name='first step', | |
302 cmd=['first', 'step'], | |
303 cwd='FIRST_CWD') as step: | |
304 step.add_step_text('Sup') | |
305 step.add_step_text('Dawg?') | |
306 step.write_line('STDOUT for first step.') | |
307 step.write_line('(Another line)') | |
308 step.add_step_summary_text('Everything is great.') | |
309 step.add_step_link('example 1', 'http://example.com/1') | |
310 step.add_step_link('example 2', 'http://example.com/2') | |
311 step.set_step_status('SUCCESS') | |
312 | |
313 with self._step_stream(se, name='second step') as step: | |
314 step.set_step_status('SUCCESS') | |
315 step.write_split('multiple\nlines\nof\ntext') | |
316 | |
317 # Create two log streams with the same name to test indexing. | |
318 # | |
319 # Note that "log stream" is an invalid LogDog stream name, so this | |
320 # will also test normalization. | |
321 with self._log_stream(step, 'log stream') as ls: | |
322 ls.write_split('foo\nbar\nbaz\n') | |
323 with self._log_stream(step, 'log stream') as ls: | |
324 ls.write_split('qux\nquux\n') | |
325 | |
326 # This is a different stream name, but will normalize to the same log | |
327 # stream name as 'second/step', so this will test that we disambiguate | |
328 # the log stream names. | |
329 with self._step_stream(se, name='second/step') as step: | |
330 pass | |
331 | |
332 self.assertEqual(self.client.all_streams(), { | |
333 u'test/base/annotations': { | |
334 u'name': u'steps', | |
335 u'status': u'SUCCESS', | |
336 u'started': u'2106-06-12T01:02:03Z', | |
337 u'ended': u'2106-06-12T01:02:10Z', | |
martiniss
2016/09/01 21:59:48
The 7 seconds here are a bit hard to see that they
| |
338 u'command': { | |
339 u'commandLine': [u'fake_program', u'arg0', u'arg1'], | |
340 u'cwd': u'CWD', | |
341 u'environ': {u'foo': u'bar'}, | |
342 }, | |
343 u'substep': [ | |
344 | |
345 {u'step': { | |
346 u'name': u'first step', | |
347 u'status': u'SUCCESS', | |
348 u'started': u'2106-06-12T01:02:04Z', | |
349 u'ended': u'2106-06-12T01:02:05Z', | |
350 u'command': { | |
351 u'commandLine': [u'first', u'step'], | |
352 u'cwd': u'FIRST_CWD', | |
353 }, | |
354 u'stdoutStream': { | |
355 u'name': u'test/base/steps/first_step/stdout', | |
356 }, | |
357 u'text': [u'Everything is great.', u'Sup', u'Dawg?'], | |
358 u'otherLinks': [ | |
359 { | |
360 u'label': u'example 1', | |
361 u'url': u'http://example.com/1', | |
362 }, | |
363 { | |
364 u'label': u'example 2', | |
365 u'url': u'http://example.com/2', | |
366 }, | |
367 ], | |
368 }}, | |
369 | |
370 {u'step': { | |
371 u'name': u'second step', | |
372 u'status': u'SUCCESS', | |
373 u'started': u'2106-06-12T01:02:06Z', | |
374 u'ended': u'2106-06-12T01:02:07Z', | |
375 u'stdoutStream': { | |
376 u'name': u'test/base/steps/second_step/stdout', | |
377 }, | |
378 u'otherLinks': [ | |
379 { | |
380 u'label': u'log stream', | |
381 u'logdogStream': { | |
382 u'name': u'test/base/steps/second_step/logs/log_stream/0', | |
383 }, | |
384 }, | |
385 { | |
386 u'label': u'log stream', | |
387 u'logdogStream': { | |
388 u'name': u'test/base/steps/second_step/logs/log_stream/1', | |
389 }, | |
390 }, | |
391 ], | |
392 }}, | |
393 | |
394 {u'step': { | |
395 u'name': u'second/step', | |
396 u'status': u'SUCCESS', | |
397 u'started': u'2106-06-12T01:02:08Z', | |
398 u'ended': u'2106-06-12T01:02:09Z', | |
399 }}, | |
400 ], | |
401 }, | |
402 | |
403 u'test/base/steps/first_step/stdout': [ | |
404 u'STDOUT for first step.', | |
405 u'(Another line)', | |
406 ], | |
407 | |
408 u'test/base/steps/second_step/stdout': [ | |
409 u'multiple', | |
410 u'lines', | |
411 u'of', | |
412 u'text', | |
413 ], | |
414 | |
415 u'test/base/steps/second_step/logs/log_stream/0': [ | |
416 u'foo', | |
417 u'bar', | |
418 u'baz', | |
419 ], | |
420 | |
421 u'test/base/steps/second_step/logs/log_stream/1': [ | |
422 u'qux', | |
423 u'quux', | |
424 ], | |
425 }) | |
426 | |
427 def testWarningBasicStream(self): | |
428 with self._new_stream_engine() as se: | |
429 with self._step_stream(se, name='isuck') as step: | |
430 step.add_step_summary_text('Something went wrong.') | |
431 step.set_step_status('WARNING') | |
432 | |
433 self.assertEqual(self.client.all_streams(), { | |
434 u'annotations': { | |
435 u'name': u'steps', | |
436 u'status': u'SUCCESS', | |
437 u'started': u'2106-06-12T01:02:03Z', | |
438 u'ended': u'2106-06-12T01:02:06Z', | |
439 u'substep': [ | |
440 | |
441 {u'step': { | |
442 u'name': u'isuck', | |
443 u'status': u'SUCCESS', | |
martiniss
2016/09/01 21:59:48
Wait, shouldn't this be WARNING ??
| |
444 u'failureDetails': { | |
445 u'text': u'Something went wrong.', | |
446 }, | |
447 u'started': u'2106-06-12T01:02:04Z', | |
448 u'ended': u'2106-06-12T01:02:05Z', | |
449 u'text': [u'Something went wrong.'], | |
450 }}, | |
451 ], | |
452 }, | |
453 }) | |
454 | |
455 def testFailedBasicStream(self): | |
456 with self._new_stream_engine() as se: | |
457 with self._step_stream(se, name='isuck') as step: | |
458 step.add_step_summary_text('Oops I failed.') | |
459 step.set_step_status('FAILURE') | |
460 | |
461 with self._step_stream(se, name='irock') as step: | |
462 pass | |
463 | |
464 self.assertEqual(self.client.all_streams(), { | |
465 u'annotations': { | |
466 u'name': u'steps', | |
467 u'status': u'FAILURE', | |
468 u'started': u'2106-06-12T01:02:03Z', | |
469 u'ended': u'2106-06-12T01:02:08Z', | |
470 u'substep': [ | |
471 | |
472 {u'step': { | |
473 u'name': u'isuck', | |
474 u'status': u'FAILURE', | |
475 u'failureDetails': { | |
476 u'text': u'Oops I failed.', | |
477 }, | |
478 u'started': u'2106-06-12T01:02:04Z', | |
479 u'ended': u'2106-06-12T01:02:05Z', | |
480 u'text': [u'Oops I failed.'], | |
481 }}, | |
482 | |
483 {u'step': { | |
484 u'name': u'irock', | |
485 u'status': u'SUCCESS', | |
486 u'started': u'2106-06-12T01:02:06Z', | |
487 u'ended': u'2106-06-12T01:02:07Z', | |
488 }}, | |
489 ], | |
490 }, | |
491 }) | |
492 | |
493 def testNestedStream(self): | |
494 with self._new_stream_engine() as se: | |
495 # parent | |
496 with self._step_stream(se, name='parent') as step: | |
497 step.write_line('I am the parent.') | |
498 | |
499 # parent."child 1" | |
500 with self._step_stream(se, | |
501 name='child 1', | |
502 step_nest_level=1) as step: | |
503 step.write_line('I am child #1.') | |
504 | |
505 # parent."child 1"."grandchild" | |
506 with self._step_stream(se, | |
507 name='grandchild', | |
508 step_nest_level=2) as step: | |
509 step.write_line("I am child #1's child.") | |
510 | |
 511 # parent."child 2". No explicit status is set here, so the step keeps | |
martiniss
2016/09/01 21:59:48
where do you mark this child as failed?
 512 # its default (successful) status. | |
513 with self._step_stream(se, | |
514 name='child 2', | |
515 step_nest_level=1) as step: | |
516 step.write_line('I am child #2.') | |
517 | |
 518 # "friend" is a new top-level step (nest level 0), a sibling of | |
 519 # "parent", not one of its children. | |
520 with self._step_stream(se, name='friend') as step: | |
521 step.write_line("I am the parent's friend.") | |
522 | |
523 self.assertEqual(self.client.all_streams(), { | |
524 u'annotations': { | |
525 u'name': u'steps', | |
526 u'status': u'SUCCESS', | |
527 u'started': u'2106-06-12T01:02:03Z', | |
528 u'ended': u'2106-06-12T01:02:14Z', | |
529 u'substep': [ | |
530 | |
531 {u'step': { | |
532 u'name': u'parent', | |
533 u'status': u'SUCCESS', | |
534 u'started': u'2106-06-12T01:02:04Z', | |
535 u'ended': u'2106-06-12T01:02:05Z', | |
536 u'stdoutStream': { | |
537 u'name': u'steps/parent/stdout', | |
538 }, | |
539 u'substep': [ | |
540 | |
541 {u'step': { | |
542 u'name': u'child 1', | |
543 u'status': u'SUCCESS', | |
544 u'started': u'2106-06-12T01:02:06Z', | |
545 u'ended': u'2106-06-12T01:02:07Z', | |
546 u'stdoutStream': { | |
547 u'name': u'steps/parent/steps/child_1/stdout', | |
548 }, | |
549 u'substep': [ | |
550 | |
551 {u'step': { | |
552 u'name': u'grandchild', | |
553 u'status': u'SUCCESS', | |
554 u'started': u'2106-06-12T01:02:08Z', | |
555 u'ended': u'2106-06-12T01:02:09Z', | |
556 u'stdoutStream': { | |
557 u'name': u'steps/parent/steps/child_1/' | |
558 'steps/grandchild/stdout', | |
559 }, | |
560 }}, | |
561 ], | |
562 }}, | |
563 | |
564 {u'step': { | |
565 u'name': u'child 2', | |
566 u'status': u'SUCCESS', | |
567 u'started': u'2106-06-12T01:02:10Z', | |
568 u'ended': u'2106-06-12T01:02:11Z', | |
569 u'stdoutStream': { | |
570 u'name': u'steps/parent/steps/child_2/stdout', | |
571 }, | |
572 }}, | |
573 ], | |
574 }}, | |
575 | |
576 {u'step': { | |
577 u'name': u'friend', | |
578 u'status': u'SUCCESS', | |
579 u'started': u'2106-06-12T01:02:12Z', | |
580 u'ended': u'2106-06-12T01:02:13Z', | |
581 u'stdoutStream': { | |
582 u'name': u'steps/friend/stdout', | |
583 }, | |
584 }}, | |
585 ], | |
586 }, | |
587 | |
588 u'steps/parent/stdout': [u'I am the parent.'], | |
589 u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], | |
590 u'steps/parent/steps/child_1/steps/grandchild/stdout': [ | |
591 u"I am child #1's child."], | |
592 u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], | |
593 u'steps/friend/stdout': [u"I am the parent's friend."], | |
594 }) | |
595 | |
596 def testTriggersRaiseException(self): | |
597 with self._new_stream_engine() as se: | |
598 with self._step_stream(se, name='trigger') as step: | |
599 with self.assertRaises(NotImplementedError): | |
600 step.trigger('trigger spec') | |
601 | |
602 def testTriggersIgnored(self): | |
603 with self._new_stream_engine(ignore_triggers=True) as se: | |
604 with self._step_stream(se, name='trigger') as step: | |
605 step.trigger('trigger spec') | |
606 | |
607 def testNoSubannotations(self): | |
608 with self._new_stream_engine(ignore_triggers=True) as se: | |
609 with self.assertRaises(NotImplementedError): | |
610 se.new_step_stream(recipe_api._make_step_config( | |
611 name='uses subannotations', | |
612 allow_subannotations=True, | |
613 )) | |
614 | |
615 def testInvalidStepStatusRaisesValueError(self): | |
616 with self._new_stream_engine() as se: | |
617 with self._step_stream(se, name='trigger') as step: | |
618 with self.assertRaises(ValueError): | |
619 step.set_step_status('OHAI') | |
620 | |
621 | |
622 class AnnotationMonitorTest(unittest.TestCase): | |
623 """Tests the stream_logdog._AnnotationMonitor directly.""" | |
624 | |
625 # A small timedelta, sufficient to block but fast enough to not make the | |
626 # test slow. | |
627 _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) | |
martiniss
2016/09/01 21:59:48
How hard would it be to actually mock time? This m
| |
628 | |
629 class _DatagramBuffer(object): | |
630 | |
631 def __init__(self): | |
632 self.datagrams = [] | |
633 self.data_event = threading.Event() | |
634 | |
635 def send(self, dg): | |
636 self.datagrams.append(dg) | |
637 self.data_event.set() | |
638 | |
639 def __len__(self): | |
640 return len(self.datagrams) | |
641 | |
642 @property | |
643 def latest(self): | |
644 if self.datagrams: | |
645 return self.datagrams[-1] | |
646 return None | |
647 | |
648 def wait_for_data(self): | |
649 self.data_event.wait() | |
650 self.data_event.clear() | |
651 return self.latest | |
652 | |
653 | |
654 @contextlib.contextmanager | |
655 def _annotation_monitor(self, **kwargs): | |
656 # Default to a really high flush period. This should never naturally trigger | |
657 # during a test case. | |
658 kwargs.setdefault('flush_period', datetime.timedelta(hours=1)) | |
659 | |
660 am = stream_logdog._AnnotationMonitor(self.db, **kwargs) | |
661 try: | |
662 yield am | |
663 finally: | |
664 am.flush_and_join() | |
665 | |
666 with am._lock: | |
667 # Assert that our timer has been shut down. | |
668 self.assertIsNone(am._flush_timer) | |
669 # Assert that there is no buffered data. | |
670 self.assertIsNone(am._current_data) | |
671 | |
672 def setUp(self): | |
673 self.db = self._DatagramBuffer() | |
674 | |
675 def testMonitorStartsAndJoinsWithNoData(self): | |
676 with self._annotation_monitor() as am: | |
677 pass | |
678 | |
679 # No datagrams should have been sent. | |
680 self.assertIsNone(self.db.latest) | |
681 self.assertEqual(len(self.db.datagrams), 0) | |
682 | |
683 def testMonitorBuffersAndSendsData(self): | |
684 with self._annotation_monitor() as am: | |
685 # The first piece of data should have been immediately sent. | |
686 am.signal_update('initial') | |
687 self.assertEqual(self.db.wait_for_data(), 'initial') | |
688 | |
689 # Subsequent iterations should not send data, but should start the flush | |
690 # timer and buffer the latest data. | |
691 with am._lock: | |
692 self.assertIsNone(am._flush_timer) | |
693 for i in xrange(10): | |
694 am.signal_update('test%d' % (i,)) | |
695 time.sleep(self._SMALL_TIME_DELTA.total_seconds()) | |
696 with am._lock: | |
697 self.assertEqual(am._current_data, 'test9') | |
698 self.assertIsNotNone(am._flush_timer) | |
699 | |
700 # Pretend the timer triggered. We should receive the latest buffered data. | |
701 am._flush_timer_expired() | |
702 self.assertEqual(self.db.wait_for_data(), 'test9') | |
703 with am._lock: | |
704 # No more timer or buffered data. | |
705 self.assertIsNone(am._flush_timer) | |
706 self.assertIsNone(am._current_data) | |
707 | |
708 # Send one last chunk of data, but don't let the timer expire. This will | |
709 # be sent on final flush. | |
710 am.signal_update('final') | |
711 with am._lock: | |
712 self.assertIsNotNone(am._flush_timer) | |
713 | |
714 # Assert that the final chunk of data was sent. | |
715 self.assertEqual(self.db.latest, 'final') | |
716 | |
717 # Only three datagrams should have been sent. | |
718 self.assertEqual(len(self.db.datagrams), 3) | |
719 | |
720 def testMonitorIgnoresDuplicateData(self): | |
721 with self._annotation_monitor() as am: | |
722 # Get initial data out of the way. | |
723 am.signal_update('initial') | |
724 self.assertEqual(self.db.wait_for_data(), 'initial') | |
725 | |
726 # Send the same thing. It should not be buffered. | |
727 am.signal_update('initial') | |
728 with am._lock: | |
729 self.assertIsNone(am._flush_timer) | |
730 self.assertIsNone(am._current_data) | |
731 | |
732 # Only one datagrams should have been sent. | |
733 self.assertEqual(len(self.db.datagrams), 1) | |
734 | |
735 def testStructuralUpdateSendsImmediately(self): | |
736 with self._annotation_monitor() as am: | |
737 # Get initial data out of the way. | |
738 am.signal_update('initial') | |
739 self.assertEqual(self.db.wait_for_data(), 'initial') | |
740 | |
741 # Send a structural update. It should send immediately. | |
742 am.signal_update('test', structural=True) | |
743 self.assertEqual(self.db.wait_for_data(), 'test') | |
744 | |
745 # Send a duplicate structural update. It should be ignored. | |
746 am.signal_update('test', structural=True) | |
747 with am._lock: | |
748 self.assertIsNone(am._flush_timer) | |
749 self.assertIsNone(am._current_data) | |
750 | |
751 # Only two datagrams should have been sent. | |
752 self.assertEqual(len(self.db.datagrams), 2) | |
753 | |
754 def testFlushesPeriodically(self): | |
755 with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: | |
756 # Get initial data out of the way. | |
757 am.signal_update('initial') | |
758 self.assertEqual(self.db.wait_for_data(), 'initial') | |
759 | |
760 # Send a structural update. It should send immediately. | |
761 am.signal_update('test') | |
762 self.assertEqual(self.db.wait_for_data(), 'test') | |
763 | |
764 # Only two datagrams should have been sent. | |
765 self.assertEqual(len(self.db.datagrams), 2) | |
766 | |
767 | |
768 class AnnotationStateTest(unittest.TestCase): | |
769 """Tests the stream_logdog._AnnotationState directly.""" | |
770 | |
771 def setUp(self): | |
772 self.env = stream_logdog._Environment( | |
773 None, | |
774 argv=['command', 'arg0', 'arg1'], | |
775 cwd='path/to/cwd', | |
776 environ={ | |
777 'foo': 'bar', | |
778 'FOO': 'baz', | |
779 }, | |
780 ) | |
781 self.astate = stream_logdog._AnnotationState.create( | |
782 stream_logdog._StreamName('strean/name'), | |
783 env=self.env, | |
784 properties={'foo': 'bar'}, | |
785 ) | |
786 | |
787 def testFirstCheckReturnsData(self): | |
788 # The first check should return data. | |
789 self.assertIsNotNone(self.astate.check()) | |
 790 # The second will not, since nothing has changed. | |
791 self.assertIsNone(self.astate.check()) | |
792 | |
793 def testCanCreateAndGetStep(self): | |
794 # Root step. | |
795 base = self.astate.base | |
796 self.astate.create_step(recipe_api._make_step_config(name='first')) | |
797 self.assertEqual(len(base.substep), 1) | |
798 self.assertEqual(base.substep[0].step.name, 'first') | |
799 self.assertIsNotNone(self.astate.check()) | |
800 | |
801 # Child step. | |
802 self.astate.create_step(recipe_api._make_step_config( | |
803 name='first child', | |
804 step_nest_level=1)) | |
805 self.assertEqual(len(base.substep), 1) | |
806 self.assertEqual(len(base.substep[0].step.substep), 1) | |
807 self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') | |
808 self.assertIsNotNone(self.astate.check()) | |
809 | |
810 # Sibling step to 'first'. | |
811 self.astate.create_step(recipe_api._make_step_config(name='second')) | |
812 self.assertEqual(len(base.substep), 2) | |
813 self.assertEqual(base.substep[1].step.name, 'second') | |
814 self.assertIsNotNone(self.astate.check()) | |
815 | |
816 def testCanUpdateProperties(self): | |
817 self.astate.update_properties(foo='baz', qux='quux') | |
818 self.assertEqual(list(self.astate.base.property), [ | |
819 pb.Step.Property(name='foo', value='baz'), | |
820 pb.Step.Property(name='qux', value='quux'), | |
821 ]) | |
822 | |
823 | |
824 class StreamNameTest(unittest.TestCase): | |
825 """Tests the stream_logdog._StreamName directly.""" | |
826 | |
827 def testEmptyStreamNameRaisesValueError(self): | |
828 sn = stream_logdog._StreamName(None) | |
829 with self.assertRaises(ValueError): | |
830 str(sn) | |
831 | |
832 def testInvalidBaseRaisesValueError(self): | |
833 with self.assertRaises(ValueError): | |
834 stream_logdog._StreamName('!!! invalid !!!') | |
835 | |
836 def testAppendComponents(self): | |
837 sn = stream_logdog._StreamName('base') | |
838 self.assertEqual(str(sn.append()), 'base') | |
839 self.assertEqual(str(sn.append('foo')), 'base/foo') | |
840 self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') | |
841 self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') | |
842 | |
843 def testAugment(self): | |
844 sn = stream_logdog._StreamName('base') | |
845 self.assertEqual(str(sn.augment('')), 'base') | |
846 self.assertEqual(str(sn.augment('foo')), 'basefoo') | |
847 self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') | |
848 | |
849 def testAppendInvalidStreamNameNormalizes(self): | |
850 sn = stream_logdog._StreamName('base') | |
851 sn = sn.append('#!!! stream name !!!') | |
852 self.assertEqual(str(sn), 'base/s______stream_name____') | |
853 | |
854 def testAugmentInvalidStreamNameNormalizes(self): | |
855 sn = stream_logdog._StreamName('base') | |
856 self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') | |
857 | |
858 | |
859 if __name__ == '__main__': | |
860 unittest.main() | |
OLD | NEW |