Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 #!/usr/bin/env python | |
| 2 # Copyright 2015 The LUCI Authors. All rights reserved. | |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 4 # that can be found in the LICENSE file. | |
| 5 | |
| 6 import collections | |
| 7 import contextlib | |
| 8 import datetime | |
| 9 import json | |
| 10 import threading | |
| 11 import time | |
| 12 import unittest | |
| 13 import StringIO | |
| 14 | |
| 15 import test_env | |
| 16 | |
| 17 import libs.logdog.stream | |
| 18 import libs.logdog.varint | |
| 19 from google.protobuf import json_format as jsonpb | |
| 20 from recipe_engine import recipe_api | |
| 21 from recipe_engine import stream | |
| 22 from recipe_engine import stream_logdog | |
| 23 | |
| 24 | |
| 25 import annotations_pb2 as pb | |
| 26 | |
| 27 | |
| 28 def _translate_annotation_datagram(dg): | |
| 29 """Translate annotation datagram binary data into a Python dict modeled after | |
| 30 the JSONPB projection of the datagram. | |
| 31 | |
| 32 This is chosen because it allows for easy idiomatic equality assertions in | |
| 33 test cases. | |
| 34 | |
| 35 Args: | |
| 36 dg (str): The serialized annotation pb.Step datagram. | |
| 37 """ | |
| 38 msg = pb.Step() | |
| 39 msg.ParseFromString(dg) | |
| 40 return json.loads(jsonpb.MessageToJson(msg)) | |
| 41 | |
| 42 | |
| 43 class _TestStreamClient(libs.logdog.stream.StreamClient): | |
| 44 """A testing StreamClient that retains all data written to it.""" | |
| 45 | |
| 46 class Stream(object): | |
| 47 """A file-like object that is explicitly aware of LogDog stream protocol.""" | |
| 48 | |
| 49 def __init__(self, stream_client): | |
| 50 self._client = stream_client | |
| 51 self._buf = StringIO.StringIO() | |
| 52 self._header = None | |
| 53 self._final_data = None | |
| 54 self._data_offset = None | |
| 55 | |
| 56 def write(self, data): | |
| 57 self._buf.write(data) | |
| 58 self._attempt_registration() | |
| 59 | |
| 60 def close(self): | |
| 61 # If we never parsed our header, register that we are incomplete. | |
| 62 if self._header is None: | |
| 63 self._client._register_incomplete(self) | |
| 64 | |
| 65 self._final_data = self.data | |
| 66 self._buf.close() | |
| 67 | |
| 68 @contextlib.contextmanager | |
| 69 def _read_from(self, offset): | |
| 70 # Seek to the specified offset. | |
| 71 self._buf.seek(offset, mode=0) | |
| 72 try: | |
| 73 yield self._buf | |
| 74 finally: | |
| 75 # Seek back to he end of the stream so future writes will append. | |
|
martiniss
2016/09/01 21:59:48
typo
| |
| 76 self._buf.seek(0, mode=2) | |
| 77 | |
| 78 def _attempt_registration(self): | |
| 79 # Only need to register once. | |
| 80 if self._header is not None: | |
| 81 return | |
| 82 | |
| 83 # Can we parse a full LogDog stream header? | |
|
martiniss
2016/09/01 21:59:47
What do you mean can we? Try to?
| |
| 84 # | |
| 85 # This means pulling: | |
| 86 # - The LogDog Butler header. | |
| 87 # - The header size varint. | |
| 88 # - The header JSON blob, which needs to be decoded. | |
| 89 with self._read_from(0) as fd: | |
| 90 # Read 'result' bytes. | |
| 91 magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) | |
| 92 if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): | |
| 93 # Incomplete magic number, cannot complete registration. | |
|
martiniss
2016/09/01 21:59:48
throw an exception? here and other places where yo
| |
| 94 return | |
| 95 count = len(magic_data) | |
| 96 | |
| 97 try: | |
| 98 size, varint_count = libs.logdog.varint.read_uvarint(fd) | |
| 99 except ValueError: | |
| 100 # Incomplete varint, cannot complete registration. | |
| 101 return | |
| 102 count += varint_count | |
| 103 | |
| 104 header_data = fd.read(size) | |
| 105 if len(header_data) != size: | |
| 106 # Incomplete header, cannot complete registration. | |
| 107 return | |
| 108 count += size | |
| 109 | |
| 110 # Parse the header as JSON. | |
| 111 self._header = json.loads(header_data) | |
| 112 self._data_offset = count # (varint + header size) | |
| 113 self._client._register_stream(self, self._header) | |
| 114 | |
| 115 @property | |
| 116 def data(self): | |
| 117 # If we have already cached our data (on close), return it directly. | |
| 118 if self._final_data is not None: | |
| 119 return self._final_data | |
| 120 | |
| 121 # Load our data from our live buffer. | |
| 122 if self._data_offset is None: | |
| 123 # No header has been read, so there is no data. | |
| 124 return None | |
| 125 with self._read_from(self._data_offset) as fd: | |
| 126 return fd.read() | |
| 127 | |
| 128 | |
| 129 _StreamEntry = collections.namedtuple('_StreamEntry', ( | |
| 130 's', 'type', 'content_type')) | |
| 131 | |
| 132 _DATAGRAM_CONTENT_TRANSLATE = { | |
| 133 stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, | |
| 134 } | |
| 135 | |
| 136 | |
| 137 def __init__(self): | |
| 138 super(_TestStreamClient, self).__init__() | |
| 139 self.streams = {} | |
| 140 self.incomplete = [] | |
| 141 self.unregistered = {} | |
| 142 | |
| 143 @classmethod | |
| 144 def _create(cls, value): | |
| 145 raise NotImplementedError('Instances must be created manually.') | |
| 146 | |
| 147 def _connect_raw(self): | |
| 148 s = self.Stream(self) | |
| 149 self.unregistered[id(s)] = s | |
| 150 return s | |
| 151 | |
| 152 def get(self, name): | |
| 153 se = self.streams[name] | |
| 154 data = se.s.data | |
| 155 | |
| 156 if se.type == libs.logdog.stream.StreamParams.TEXT: | |
| 157 # Return text stream data as a list of lines. We use unicode because it | |
| 158 # fits in with the JSON dump from 'all_streams'. | |
| 159 return [unicode(l) for l in data.splitlines()] | |
| 160 elif se.type == libs.logdog.stream.StreamParams.BINARY: | |
| 161 raise NotImplementedError('No support for fetching binary stream data.') | |
| 162 elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: | |
| 163 # Return datagram stream data as a list of datagrams. | |
| 164 sio = StringIO.StringIO(data) | |
| 165 datagrams = [] | |
| 166 while sio.tell() < sio.len: | |
| 167 size, _ = libs.logdog.varint.read_uvarint(sio) | |
| 168 dg = sio.read(size) | |
| 169 if len(dg) != size: | |
| 170 raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) | |
| 171 | |
| 172 # If this datagram is a known type (e.g., protobuf), transform it into | |
| 173 # JSONPB. | |
| 174 translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) | |
| 175 if translator is not None: | |
| 176 dg = translator(dg) | |
| 177 datagrams.append(dg) | |
| 178 | |
| 179 sio.close() | |
| 180 return dg | |
| 181 else: | |
| 182 raise ValueError('Unknown stream type [%s]' % (se.type,)) | |
| 183 | |
| 184 def all_streams(self): | |
| 185 return dict((name, self.get(name)) for name in self.streams.iterkeys()) | |
|
martiniss
2016/09/01 21:59:48
why not just self.streams.items() ?
| |
| 186 | |
| 187 @property | |
| 188 def stream_names(self): | |
| 189 return set(self.streams.iterkeys()) | |
| 190 | |
| 191 def _remove_from_unregistered(self, s): | |
| 192 if id(s) not in self.unregistered: | |
| 193 raise KeyError('Stream is not known to be unregistered.') | |
| 194 del(self.unregistered[id(s)]) | |
|
martiniss
2016/09/01 21:59:48
Stylistically, are you supposed to do "del(x[a])",
| |
| 195 | |
| 196 def _register_stream(self, s, header): | |
| 197 name = header.get('name') | |
| 198 if name in self.streams: | |
| 199 raise KeyError('Duplicate stream [%s]' % (name,)) | |
| 200 | |
| 201 self._remove_from_unregistered(s) | |
| 202 self.streams[name] = self._StreamEntry( | |
| 203 s=s, | |
| 204 type=header['type'], | |
| 205 content_type=header.get('contentType'), | |
| 206 ) | |
| 207 | |
| 208 def _register_incomplete(self, s): | |
| 209 self._remove_from_unregistered(s) | |
| 210 self.incomplete.append(s) | |
| 211 | |
| 212 | |
| 213 class EnvironmentTest(unittest.TestCase): | |
| 214 """Simple test to assert that _Environment, which is stubbed during our tests, | |
| 215 actually works.""" | |
| 216 | |
| 217 def testEnvironmentProbes(self): | |
| 218 stream_logdog._Environment.probe() | |
| 219 | |
| 220 | |
| 221 class StreamEngineTest(unittest.TestCase): | |
| 222 | |
| 223 def setUp(self): | |
| 224 self.client = _TestStreamClient() | |
| 225 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) | |
| 226 self.env = stream_logdog._Environment( | |
| 227 now_fn=lambda: self.now, | |
| 228 argv=[], | |
| 229 environ={}, | |
| 230 cwd=None, | |
| 231 ) | |
| 232 self.maxDiff = 1024*1024 | |
| 233 | |
| 234 | |
|
martiniss
2016/09/01 21:59:48
nit: spaces
| |
| 235 @contextlib.contextmanager | |
| 236 def _new_stream_engine(self, **kwargs): | |
| 237 kwargs.setdefault('client', self.client) | |
| 238 kwargs.setdefault('env', self.env) | |
| 239 | |
| 240 # Initialize and open a StreamEngine. | |
| 241 se = stream_logdog.StreamEngine(**kwargs) | |
| 242 se.open() | |
| 243 yield se | |
| 244 | |
| 245 # Close the StreamEngine after we're done with it. | |
|
martiniss
2016/09/01 21:59:48
comment doesn't match the line below it.
| |
| 246 self._advance_time() | |
|
martiniss
2016/09/01 21:59:48
Personally, I'd not have something like this in he
| |
| 247 se.close() | |
| 248 | |
| 249 @contextlib.contextmanager | |
| 250 def _step_stream(self, se, **kwargs): | |
| 251 # Initialize and yield a new step stream. | |
| 252 self._advance_time() | |
| 253 step_stream = se.new_step_stream(recipe_api._make_step_config(**kwargs)) | |
| 254 yield step_stream | |
| 255 | |
| 256 # Close the step stream when we're done with it. | |
|
martiniss
2016/09/01 21:59:48
same as above.
| |
| 257 self._advance_time() | |
|
martiniss
2016/09/01 21:59:47
same as above.
| |
| 258 step_stream.close() | |
| 259 | |
| 260 @contextlib.contextmanager | |
| 261 def _log_stream(self, step_stream, name): | |
| 262 # Initialize and yield a new log stream. | |
| 263 log_stream = step_stream.new_log_stream(name) | |
| 264 yield log_stream | |
| 265 | |
| 266 # Close the log stream when we're done with it. | |
| 267 log_stream.close() | |
| 268 | |
| 269 def _advance_time(self): | |
| 270 self.now += datetime.timedelta(seconds=1) | |
| 271 | |
| 272 def testEmptyStreamEngine(self): | |
| 273 self.env.argv = ['fake_program', 'arg0', 'arg1'] | |
| 274 self.env.environ['foo'] = 'bar' | |
| 275 self.env.cwd = 'CWD' | |
| 276 | |
| 277 with self._new_stream_engine() as se: | |
|
martiniss
2016/09/01 21:59:48
this line doesn't seem to do anything... comment?
| |
| 278 pass | |
| 279 | |
| 280 self.assertEqual(self.client.all_streams(), { | |
| 281 u'annotations': { | |
| 282 u'name': u'steps', | |
| 283 u'status': u'SUCCESS', | |
| 284 u'started': u'2106-06-12T01:02:03Z', | |
| 285 u'ended': u'2106-06-12T01:02:04Z', | |
| 286 u'command': { | |
| 287 u'commandLine': [u'fake_program', u'arg0', u'arg1'], | |
| 288 u'cwd': u'CWD', | |
| 289 u'environ': {u'foo': u'bar'}, | |
| 290 }, | |
| 291 }, | |
| 292 }) | |
| 293 | |
| 294 def testBasicStream(self): | |
| 295 self.env.argv = ['fake_program', 'arg0', 'arg1'] | |
| 296 self.env.environ['foo'] = 'bar' | |
| 297 self.env.cwd = 'CWD' | |
| 298 | |
| 299 with self._new_stream_engine(name_base='test/base') as se: | |
| 300 with self._step_stream(se, | |
| 301 name='first step', | |
| 302 cmd=['first', 'step'], | |
| 303 cwd='FIRST_CWD') as step: | |
| 304 step.add_step_text('Sup') | |
| 305 step.add_step_text('Dawg?') | |
| 306 step.write_line('STDOUT for first step.') | |
| 307 step.write_line('(Another line)') | |
| 308 step.add_step_summary_text('Everything is great.') | |
| 309 step.add_step_link('example 1', 'http://example.com/1') | |
| 310 step.add_step_link('example 2', 'http://example.com/2') | |
| 311 step.set_step_status('SUCCESS') | |
| 312 | |
| 313 with self._step_stream(se, name='second step') as step: | |
| 314 step.set_step_status('SUCCESS') | |
| 315 step.write_split('multiple\nlines\nof\ntext') | |
| 316 | |
| 317 # Create two log streams with the same name to test indexing. | |
| 318 # | |
| 319 # Note that "log stream" is an invalid LogDog stream name, so this | |
| 320 # will also test normalization. | |
| 321 with self._log_stream(step, 'log stream') as ls: | |
| 322 ls.write_split('foo\nbar\nbaz\n') | |
| 323 with self._log_stream(step, 'log stream') as ls: | |
| 324 ls.write_split('qux\nquux\n') | |
| 325 | |
| 326 # This is a different stream name, but will normalize to the same log | |
| 327 # stream name as 'second/step', so this will test that we disambiguate | |
| 328 # the log stream names. | |
| 329 with self._step_stream(se, name='second/step') as step: | |
| 330 pass | |
| 331 | |
| 332 self.assertEqual(self.client.all_streams(), { | |
| 333 u'test/base/annotations': { | |
| 334 u'name': u'steps', | |
| 335 u'status': u'SUCCESS', | |
| 336 u'started': u'2106-06-12T01:02:03Z', | |
| 337 u'ended': u'2106-06-12T01:02:10Z', | |
|
martiniss
2016/09/01 21:59:48
The 7 seconds here are a bit hard to see that they
| |
| 338 u'command': { | |
| 339 u'commandLine': [u'fake_program', u'arg0', u'arg1'], | |
| 340 u'cwd': u'CWD', | |
| 341 u'environ': {u'foo': u'bar'}, | |
| 342 }, | |
| 343 u'substep': [ | |
| 344 | |
| 345 {u'step': { | |
| 346 u'name': u'first step', | |
| 347 u'status': u'SUCCESS', | |
| 348 u'started': u'2106-06-12T01:02:04Z', | |
| 349 u'ended': u'2106-06-12T01:02:05Z', | |
| 350 u'command': { | |
| 351 u'commandLine': [u'first', u'step'], | |
| 352 u'cwd': u'FIRST_CWD', | |
| 353 }, | |
| 354 u'stdoutStream': { | |
| 355 u'name': u'test/base/steps/first_step/stdout', | |
| 356 }, | |
| 357 u'text': [u'Everything is great.', u'Sup', u'Dawg?'], | |
| 358 u'otherLinks': [ | |
| 359 { | |
| 360 u'label': u'example 1', | |
| 361 u'url': u'http://example.com/1', | |
| 362 }, | |
| 363 { | |
| 364 u'label': u'example 2', | |
| 365 u'url': u'http://example.com/2', | |
| 366 }, | |
| 367 ], | |
| 368 }}, | |
| 369 | |
| 370 {u'step': { | |
| 371 u'name': u'second step', | |
| 372 u'status': u'SUCCESS', | |
| 373 u'started': u'2106-06-12T01:02:06Z', | |
| 374 u'ended': u'2106-06-12T01:02:07Z', | |
| 375 u'stdoutStream': { | |
| 376 u'name': u'test/base/steps/second_step/stdout', | |
| 377 }, | |
| 378 u'otherLinks': [ | |
| 379 { | |
| 380 u'label': u'log stream', | |
| 381 u'logdogStream': { | |
| 382 u'name': u'test/base/steps/second_step/logs/log_stream/0', | |
| 383 }, | |
| 384 }, | |
| 385 { | |
| 386 u'label': u'log stream', | |
| 387 u'logdogStream': { | |
| 388 u'name': u'test/base/steps/second_step/logs/log_stream/1', | |
| 389 }, | |
| 390 }, | |
| 391 ], | |
| 392 }}, | |
| 393 | |
| 394 {u'step': { | |
| 395 u'name': u'second/step', | |
| 396 u'status': u'SUCCESS', | |
| 397 u'started': u'2106-06-12T01:02:08Z', | |
| 398 u'ended': u'2106-06-12T01:02:09Z', | |
| 399 }}, | |
| 400 ], | |
| 401 }, | |
| 402 | |
| 403 u'test/base/steps/first_step/stdout': [ | |
| 404 u'STDOUT for first step.', | |
| 405 u'(Another line)', | |
| 406 ], | |
| 407 | |
| 408 u'test/base/steps/second_step/stdout': [ | |
| 409 u'multiple', | |
| 410 u'lines', | |
| 411 u'of', | |
| 412 u'text', | |
| 413 ], | |
| 414 | |
| 415 u'test/base/steps/second_step/logs/log_stream/0': [ | |
| 416 u'foo', | |
| 417 u'bar', | |
| 418 u'baz', | |
| 419 ], | |
| 420 | |
| 421 u'test/base/steps/second_step/logs/log_stream/1': [ | |
| 422 u'qux', | |
| 423 u'quux', | |
| 424 ], | |
| 425 }) | |
| 426 | |
| 427 def testWarningBasicStream(self): | |
| 428 with self._new_stream_engine() as se: | |
| 429 with self._step_stream(se, name='isuck') as step: | |
| 430 step.add_step_summary_text('Something went wrong.') | |
| 431 step.set_step_status('WARNING') | |
| 432 | |
| 433 self.assertEqual(self.client.all_streams(), { | |
| 434 u'annotations': { | |
| 435 u'name': u'steps', | |
| 436 u'status': u'SUCCESS', | |
| 437 u'started': u'2106-06-12T01:02:03Z', | |
| 438 u'ended': u'2106-06-12T01:02:06Z', | |
| 439 u'substep': [ | |
| 440 | |
| 441 {u'step': { | |
| 442 u'name': u'isuck', | |
| 443 u'status': u'SUCCESS', | |
|
martiniss
2016/09/01 21:59:48
Wait, shouldn't this be WARNING ??
| |
| 444 u'failureDetails': { | |
| 445 u'text': u'Something went wrong.', | |
| 446 }, | |
| 447 u'started': u'2106-06-12T01:02:04Z', | |
| 448 u'ended': u'2106-06-12T01:02:05Z', | |
| 449 u'text': [u'Something went wrong.'], | |
| 450 }}, | |
| 451 ], | |
| 452 }, | |
| 453 }) | |
| 454 | |
| 455 def testFailedBasicStream(self): | |
| 456 with self._new_stream_engine() as se: | |
| 457 with self._step_stream(se, name='isuck') as step: | |
| 458 step.add_step_summary_text('Oops I failed.') | |
| 459 step.set_step_status('FAILURE') | |
| 460 | |
| 461 with self._step_stream(se, name='irock') as step: | |
| 462 pass | |
| 463 | |
| 464 self.assertEqual(self.client.all_streams(), { | |
| 465 u'annotations': { | |
| 466 u'name': u'steps', | |
| 467 u'status': u'FAILURE', | |
| 468 u'started': u'2106-06-12T01:02:03Z', | |
| 469 u'ended': u'2106-06-12T01:02:08Z', | |
| 470 u'substep': [ | |
| 471 | |
| 472 {u'step': { | |
| 473 u'name': u'isuck', | |
| 474 u'status': u'FAILURE', | |
| 475 u'failureDetails': { | |
| 476 u'text': u'Oops I failed.', | |
| 477 }, | |
| 478 u'started': u'2106-06-12T01:02:04Z', | |
| 479 u'ended': u'2106-06-12T01:02:05Z', | |
| 480 u'text': [u'Oops I failed.'], | |
| 481 }}, | |
| 482 | |
| 483 {u'step': { | |
| 484 u'name': u'irock', | |
| 485 u'status': u'SUCCESS', | |
| 486 u'started': u'2106-06-12T01:02:06Z', | |
| 487 u'ended': u'2106-06-12T01:02:07Z', | |
| 488 }}, | |
| 489 ], | |
| 490 }, | |
| 491 }) | |
| 492 | |
| 493 def testNestedStream(self): | |
| 494 with self._new_stream_engine() as se: | |
| 495 # parent | |
| 496 with self._step_stream(se, name='parent') as step: | |
| 497 step.write_line('I am the parent.') | |
| 498 | |
| 499 # parent."child 1" | |
| 500 with self._step_stream(se, | |
| 501 name='child 1', | |
| 502 step_nest_level=1) as step: | |
| 503 step.write_line('I am child #1.') | |
| 504 | |
| 505 # parent."child 1"."grandchild" | |
| 506 with self._step_stream(se, | |
| 507 name='grandchild', | |
| 508 step_nest_level=2) as step: | |
| 509 step.write_line("I am child #1's child.") | |
| 510 | |
| 511 # parent."child 2". Mark this child as failed. This should not propagate | |
|
martiniss
2016/09/01 21:59:48
where do you mark this child as failed?
| |
| 512 # to the parent, since it has an explicit status. | |
| 513 with self._step_stream(se, | |
| 514 name='child 2', | |
| 515 step_nest_level=1) as step: | |
| 516 step.write_line('I am child #2.') | |
| 517 | |
| 518 # parent."child 2". Mark this child as failed. This should not propagate | |
| 519 # to the parent, since it has an explicit status. | |
| 520 with self._step_stream(se, name='friend') as step: | |
| 521 step.write_line("I am the parent's friend.") | |
| 522 | |
| 523 self.assertEqual(self.client.all_streams(), { | |
| 524 u'annotations': { | |
| 525 u'name': u'steps', | |
| 526 u'status': u'SUCCESS', | |
| 527 u'started': u'2106-06-12T01:02:03Z', | |
| 528 u'ended': u'2106-06-12T01:02:14Z', | |
| 529 u'substep': [ | |
| 530 | |
| 531 {u'step': { | |
| 532 u'name': u'parent', | |
| 533 u'status': u'SUCCESS', | |
| 534 u'started': u'2106-06-12T01:02:04Z', | |
| 535 u'ended': u'2106-06-12T01:02:05Z', | |
| 536 u'stdoutStream': { | |
| 537 u'name': u'steps/parent/stdout', | |
| 538 }, | |
| 539 u'substep': [ | |
| 540 | |
| 541 {u'step': { | |
| 542 u'name': u'child 1', | |
| 543 u'status': u'SUCCESS', | |
| 544 u'started': u'2106-06-12T01:02:06Z', | |
| 545 u'ended': u'2106-06-12T01:02:07Z', | |
| 546 u'stdoutStream': { | |
| 547 u'name': u'steps/parent/steps/child_1/stdout', | |
| 548 }, | |
| 549 u'substep': [ | |
| 550 | |
| 551 {u'step': { | |
| 552 u'name': u'grandchild', | |
| 553 u'status': u'SUCCESS', | |
| 554 u'started': u'2106-06-12T01:02:08Z', | |
| 555 u'ended': u'2106-06-12T01:02:09Z', | |
| 556 u'stdoutStream': { | |
| 557 u'name': u'steps/parent/steps/child_1/' | |
| 558 'steps/grandchild/stdout', | |
| 559 }, | |
| 560 }}, | |
| 561 ], | |
| 562 }}, | |
| 563 | |
| 564 {u'step': { | |
| 565 u'name': u'child 2', | |
| 566 u'status': u'SUCCESS', | |
| 567 u'started': u'2106-06-12T01:02:10Z', | |
| 568 u'ended': u'2106-06-12T01:02:11Z', | |
| 569 u'stdoutStream': { | |
| 570 u'name': u'steps/parent/steps/child_2/stdout', | |
| 571 }, | |
| 572 }}, | |
| 573 ], | |
| 574 }}, | |
| 575 | |
| 576 {u'step': { | |
| 577 u'name': u'friend', | |
| 578 u'status': u'SUCCESS', | |
| 579 u'started': u'2106-06-12T01:02:12Z', | |
| 580 u'ended': u'2106-06-12T01:02:13Z', | |
| 581 u'stdoutStream': { | |
| 582 u'name': u'steps/friend/stdout', | |
| 583 }, | |
| 584 }}, | |
| 585 ], | |
| 586 }, | |
| 587 | |
| 588 u'steps/parent/stdout': [u'I am the parent.'], | |
| 589 u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], | |
| 590 u'steps/parent/steps/child_1/steps/grandchild/stdout': [ | |
| 591 u"I am child #1's child."], | |
| 592 u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], | |
| 593 u'steps/friend/stdout': [u"I am the parent's friend."], | |
| 594 }) | |
| 595 | |
| 596 def testTriggersRaiseException(self): | |
| 597 with self._new_stream_engine() as se: | |
| 598 with self._step_stream(se, name='trigger') as step: | |
| 599 with self.assertRaises(NotImplementedError): | |
| 600 step.trigger('trigger spec') | |
| 601 | |
| 602 def testTriggersIgnored(self): | |
| 603 with self._new_stream_engine(ignore_triggers=True) as se: | |
| 604 with self._step_stream(se, name='trigger') as step: | |
| 605 step.trigger('trigger spec') | |
| 606 | |
| 607 def testNoSubannotations(self): | |
| 608 with self._new_stream_engine(ignore_triggers=True) as se: | |
| 609 with self.assertRaises(NotImplementedError): | |
| 610 se.new_step_stream(recipe_api._make_step_config( | |
| 611 name='uses subannotations', | |
| 612 allow_subannotations=True, | |
| 613 )) | |
| 614 | |
| 615 def testInvalidStepStatusRaisesValueError(self): | |
| 616 with self._new_stream_engine() as se: | |
| 617 with self._step_stream(se, name='trigger') as step: | |
| 618 with self.assertRaises(ValueError): | |
| 619 step.set_step_status('OHAI') | |
| 620 | |
| 621 | |
| 622 class AnnotationMonitorTest(unittest.TestCase): | |
| 623 """Tests the stream_logdog._AnnotationMonitor directly.""" | |
| 624 | |
| 625 # A small timedelta, sufficient to block but fast enough to not make the | |
| 626 # test slow. | |
| 627 _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) | |
|
martiniss
2016/09/01 21:59:48
How hard would it be to actually mock time? This m
| |
| 628 | |
| 629 class _DatagramBuffer(object): | |
| 630 | |
| 631 def __init__(self): | |
| 632 self.datagrams = [] | |
| 633 self.data_event = threading.Event() | |
| 634 | |
| 635 def send(self, dg): | |
| 636 self.datagrams.append(dg) | |
| 637 self.data_event.set() | |
| 638 | |
| 639 def __len__(self): | |
| 640 return len(self.datagrams) | |
| 641 | |
| 642 @property | |
| 643 def latest(self): | |
| 644 if self.datagrams: | |
| 645 return self.datagrams[-1] | |
| 646 return None | |
| 647 | |
| 648 def wait_for_data(self): | |
| 649 self.data_event.wait() | |
| 650 self.data_event.clear() | |
| 651 return self.latest | |
| 652 | |
| 653 | |
| 654 @contextlib.contextmanager | |
| 655 def _annotation_monitor(self, **kwargs): | |
| 656 # Default to a really high flush period. This should never naturally trigger | |
| 657 # during a test case. | |
| 658 kwargs.setdefault('flush_period', datetime.timedelta(hours=1)) | |
| 659 | |
| 660 am = stream_logdog._AnnotationMonitor(self.db, **kwargs) | |
| 661 try: | |
| 662 yield am | |
| 663 finally: | |
| 664 am.flush_and_join() | |
| 665 | |
| 666 with am._lock: | |
| 667 # Assert that our timer has been shut down. | |
| 668 self.assertIsNone(am._flush_timer) | |
| 669 # Assert that there is no buffered data. | |
| 670 self.assertIsNone(am._current_data) | |
| 671 | |
| 672 def setUp(self): | |
| 673 self.db = self._DatagramBuffer() | |
| 674 | |
| 675 def testMonitorStartsAndJoinsWithNoData(self): | |
| 676 with self._annotation_monitor() as am: | |
| 677 pass | |
| 678 | |
| 679 # No datagrams should have been sent. | |
| 680 self.assertIsNone(self.db.latest) | |
| 681 self.assertEqual(len(self.db.datagrams), 0) | |
| 682 | |
| 683 def testMonitorBuffersAndSendsData(self): | |
| 684 with self._annotation_monitor() as am: | |
| 685 # The first piece of data should have been immediately sent. | |
| 686 am.signal_update('initial') | |
| 687 self.assertEqual(self.db.wait_for_data(), 'initial') | |
| 688 | |
| 689 # Subsequent iterations should not send data, but should start the flush | |
| 690 # timer and buffer the latest data. | |
| 691 with am._lock: | |
| 692 self.assertIsNone(am._flush_timer) | |
| 693 for i in xrange(10): | |
| 694 am.signal_update('test%d' % (i,)) | |
| 695 time.sleep(self._SMALL_TIME_DELTA.total_seconds()) | |
| 696 with am._lock: | |
| 697 self.assertEqual(am._current_data, 'test9') | |
| 698 self.assertIsNotNone(am._flush_timer) | |
| 699 | |
| 700 # Pretend the timer triggered. We should receive the latest buffered data. | |
| 701 am._flush_timer_expired() | |
| 702 self.assertEqual(self.db.wait_for_data(), 'test9') | |
| 703 with am._lock: | |
| 704 # No more timer or buffered data. | |
| 705 self.assertIsNone(am._flush_timer) | |
| 706 self.assertIsNone(am._current_data) | |
| 707 | |
| 708 # Send one last chunk of data, but don't let the timer expire. This will | |
| 709 # be sent on final flush. | |
| 710 am.signal_update('final') | |
| 711 with am._lock: | |
| 712 self.assertIsNotNone(am._flush_timer) | |
| 713 | |
| 714 # Assert that the final chunk of data was sent. | |
| 715 self.assertEqual(self.db.latest, 'final') | |
| 716 | |
| 717 # Only three datagrams should have been sent. | |
| 718 self.assertEqual(len(self.db.datagrams), 3) | |
| 719 | |
| 720 def testMonitorIgnoresDuplicateData(self): | |
| 721 with self._annotation_monitor() as am: | |
| 722 # Get initial data out of the way. | |
| 723 am.signal_update('initial') | |
| 724 self.assertEqual(self.db.wait_for_data(), 'initial') | |
| 725 | |
| 726 # Send the same thing. It should not be buffered. | |
| 727 am.signal_update('initial') | |
| 728 with am._lock: | |
| 729 self.assertIsNone(am._flush_timer) | |
| 730 self.assertIsNone(am._current_data) | |
| 731 | |
| 732 # Only one datagrams should have been sent. | |
| 733 self.assertEqual(len(self.db.datagrams), 1) | |
| 734 | |
| 735 def testStructuralUpdateSendsImmediately(self): | |
| 736 with self._annotation_monitor() as am: | |
| 737 # Get initial data out of the way. | |
| 738 am.signal_update('initial') | |
| 739 self.assertEqual(self.db.wait_for_data(), 'initial') | |
| 740 | |
| 741 # Send a structural update. It should send immediately. | |
| 742 am.signal_update('test', structural=True) | |
| 743 self.assertEqual(self.db.wait_for_data(), 'test') | |
| 744 | |
| 745 # Send a duplicate structural update. It should be ignored. | |
| 746 am.signal_update('test', structural=True) | |
| 747 with am._lock: | |
| 748 self.assertIsNone(am._flush_timer) | |
| 749 self.assertIsNone(am._current_data) | |
| 750 | |
| 751 # Only two datagrams should have been sent. | |
| 752 self.assertEqual(len(self.db.datagrams), 2) | |
| 753 | |
| 754 def testFlushesPeriodically(self): | |
| 755 with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: | |
| 756 # Get initial data out of the way. | |
| 757 am.signal_update('initial') | |
| 758 self.assertEqual(self.db.wait_for_data(), 'initial') | |
| 759 | |
| 760 # Send a structural update. It should send immediately. | |
| 761 am.signal_update('test') | |
| 762 self.assertEqual(self.db.wait_for_data(), 'test') | |
| 763 | |
| 764 # Only two datagrams should have been sent. | |
| 765 self.assertEqual(len(self.db.datagrams), 2) | |
| 766 | |
| 767 | |
| 768 class AnnotationStateTest(unittest.TestCase): | |
| 769 """Tests the stream_logdog._AnnotationState directly.""" | |
| 770 | |
| 771 def setUp(self): | |
| 772 self.env = stream_logdog._Environment( | |
| 773 None, | |
| 774 argv=['command', 'arg0', 'arg1'], | |
| 775 cwd='path/to/cwd', | |
| 776 environ={ | |
| 777 'foo': 'bar', | |
| 778 'FOO': 'baz', | |
| 779 }, | |
| 780 ) | |
| 781 self.astate = stream_logdog._AnnotationState.create( | |
| 782 stream_logdog._StreamName('strean/name'), | |
| 783 env=self.env, | |
| 784 properties={'foo': 'bar'}, | |
| 785 ) | |
| 786 | |
| 787 def testFirstCheckReturnsData(self): | |
| 788 # The first check should return data. | |
| 789 self.assertIsNotNone(self.astate.check()) | |
| 790 # The second will, since nothing has changed. | |
| 791 self.assertIsNone(self.astate.check()) | |
| 792 | |
| 793 def testCanCreateAndGetStep(self): | |
| 794 # Root step. | |
| 795 base = self.astate.base | |
| 796 self.astate.create_step(recipe_api._make_step_config(name='first')) | |
| 797 self.assertEqual(len(base.substep), 1) | |
| 798 self.assertEqual(base.substep[0].step.name, 'first') | |
| 799 self.assertIsNotNone(self.astate.check()) | |
| 800 | |
| 801 # Child step. | |
| 802 self.astate.create_step(recipe_api._make_step_config( | |
| 803 name='first child', | |
| 804 step_nest_level=1)) | |
| 805 self.assertEqual(len(base.substep), 1) | |
| 806 self.assertEqual(len(base.substep[0].step.substep), 1) | |
| 807 self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') | |
| 808 self.assertIsNotNone(self.astate.check()) | |
| 809 | |
| 810 # Sibling step to 'first'. | |
| 811 self.astate.create_step(recipe_api._make_step_config(name='second')) | |
| 812 self.assertEqual(len(base.substep), 2) | |
| 813 self.assertEqual(base.substep[1].step.name, 'second') | |
| 814 self.assertIsNotNone(self.astate.check()) | |
| 815 | |
| 816 def testCanUpdateProperties(self): | |
| 817 self.astate.update_properties(foo='baz', qux='quux') | |
| 818 self.assertEqual(list(self.astate.base.property), [ | |
| 819 pb.Step.Property(name='foo', value='baz'), | |
| 820 pb.Step.Property(name='qux', value='quux'), | |
| 821 ]) | |
| 822 | |
| 823 | |
| 824 class StreamNameTest(unittest.TestCase): | |
| 825 """Tests the stream_logdog._StreamName directly.""" | |
| 826 | |
| 827 def testEmptyStreamNameRaisesValueError(self): | |
| 828 sn = stream_logdog._StreamName(None) | |
| 829 with self.assertRaises(ValueError): | |
| 830 str(sn) | |
| 831 | |
| 832 def testInvalidBaseRaisesValueError(self): | |
| 833 with self.assertRaises(ValueError): | |
| 834 stream_logdog._StreamName('!!! invalid !!!') | |
| 835 | |
| 836 def testAppendComponents(self): | |
| 837 sn = stream_logdog._StreamName('base') | |
| 838 self.assertEqual(str(sn.append()), 'base') | |
| 839 self.assertEqual(str(sn.append('foo')), 'base/foo') | |
| 840 self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') | |
| 841 self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') | |
| 842 | |
| 843 def testAugment(self): | |
| 844 sn = stream_logdog._StreamName('base') | |
| 845 self.assertEqual(str(sn.augment('')), 'base') | |
| 846 self.assertEqual(str(sn.augment('foo')), 'basefoo') | |
| 847 self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') | |
| 848 | |
| 849 def testAppendInvalidStreamNameNormalizes(self): | |
| 850 sn = stream_logdog._StreamName('base') | |
| 851 sn = sn.append('#!!! stream name !!!') | |
| 852 self.assertEqual(str(sn), 'base/s______stream_name____') | |
| 853 | |
| 854 def testAugmentInvalidStreamNameNormalizes(self): | |
| 855 sn = stream_logdog._StreamName('base') | |
| 856 self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') | |
| 857 | |
| 858 | |
| 859 if __name__ == '__main__': | |
| 860 unittest.main() | |
| OLD | NEW |