Chromium Code Reviews| Index: recipe_engine/unittests/stream_logdog_test.py |
| diff --git a/recipe_engine/unittests/stream_logdog_test.py b/recipe_engine/unittests/stream_logdog_test.py |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..228e1bdb8361dab60c365552d959b962ebe9e227 |
| --- /dev/null |
| +++ b/recipe_engine/unittests/stream_logdog_test.py |
| @@ -0,0 +1,860 @@ |
| +#!/usr/bin/env python |
| +# Copyright 2015 The LUCI Authors. All rights reserved. |
| +# Use of this source code is governed under the Apache License, Version 2.0 |
| +# that can be found in the LICENSE file. |
| + |
| +import collections |
| +import contextlib |
| +import datetime |
| +import json |
| +import threading |
| +import time |
| +import unittest |
| +import StringIO |
| + |
| +import test_env |
| + |
| +import libs.logdog.stream |
| +import libs.logdog.varint |
| +from google.protobuf import json_format as jsonpb |
| +from recipe_engine import recipe_api |
| +from recipe_engine import stream |
| +from recipe_engine import stream_logdog |
| + |
| + |
| +import annotations_pb2 as pb |
| + |
| + |
| +def _translate_annotation_datagram(dg): |
| + """Translate annotation datagram binary data into a Python dict modeled after |
| + the JSONPB projection of the datagram. |
| + |
| + This is chosen because it allows for easy idiomatic equality assertions in |
| + test cases. |
| + |
| + Args: |
| + dg (str): The serialized annotation pb.Step datagram. |
| + """ |
| + msg = pb.Step() |
| + msg.ParseFromString(dg) |
| + return json.loads(jsonpb.MessageToJson(msg)) |
| + |
| + |
| +class _TestStreamClient(libs.logdog.stream.StreamClient): |
| + """A testing StreamClient that retains all data written to it.""" |
| + |
| + class Stream(object): |
| + """A file-like object that is explicitly aware of LogDog stream protocol.""" |
| + |
| + def __init__(self, stream_client): |
| + self._client = stream_client |
| + self._buf = StringIO.StringIO() |
| + self._header = None |
| + self._final_data = None |
| + self._data_offset = None |
| + |
| + def write(self, data): |
| + self._buf.write(data) |
| + self._attempt_registration() |
| + |
| + def close(self): |
| + # If we never parsed our header, register that we are incomplete. |
| + if self._header is None: |
| + self._client._register_incomplete(self) |
| + |
| + self._final_data = self.data |
| + self._buf.close() |
| + |
| + @contextlib.contextmanager |
| + def _read_from(self, offset): |
| + # Seek to the specified offset. |
| + self._buf.seek(offset, mode=0) |
| + try: |
| + yield self._buf |
| + finally: |
| + # Seek back to he end of the stream so future writes will append. |
|
martiniss
2016/09/01 21:59:48
typo
|
| + self._buf.seek(0, mode=2) |
| + |
| + def _attempt_registration(self): |
| + # Only need to register once. |
| + if self._header is not None: |
| + return |
| + |
| + # Can we parse a full LogDog stream header? |
|
martiniss
2016/09/01 21:59:47
What do you mean can we? Try to?
|
| + # |
| + # This means pulling: |
| + # - The LogDog Butler header. |
| + # - The header size varint. |
| + # - The header JSON blob, which needs to be decoded. |
| + with self._read_from(0) as fd: |
| + # Read 'result' bytes. |
| + magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) |
| + if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): |
| + # Incomplete magic number, cannot complete registration. |
|
martiniss
2016/09/01 21:59:48
throw an exception? here and other places where yo
|
| + return |
| + count = len(magic_data) |
| + |
| + try: |
| + size, varint_count = libs.logdog.varint.read_uvarint(fd) |
| + except ValueError: |
| + # Incomplete varint, cannot complete registration. |
| + return |
| + count += varint_count |
| + |
| + header_data = fd.read(size) |
| + if len(header_data) != size: |
| + # Incomplete header, cannot complete registration. |
| + return |
| + count += size |
| + |
| + # Parse the header as JSON. |
| + self._header = json.loads(header_data) |
| + self._data_offset = count # (varint + header size) |
| + self._client._register_stream(self, self._header) |
| + |
| + @property |
| + def data(self): |
| + # If we have already cached our data (on close), return it directly. |
| + if self._final_data is not None: |
| + return self._final_data |
| + |
| + # Load our data from our live buffer. |
| + if self._data_offset is None: |
| + # No header has been read, so there is no data. |
| + return None |
| + with self._read_from(self._data_offset) as fd: |
| + return fd.read() |
| + |
| + |
| + _StreamEntry = collections.namedtuple('_StreamEntry', ( |
| + 's', 'type', 'content_type')) |
| + |
| + _DATAGRAM_CONTENT_TRANSLATE = { |
| + stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, |
| + } |
| + |
| + |
| + def __init__(self): |
| + super(_TestStreamClient, self).__init__() |
| + self.streams = {} |
| + self.incomplete = [] |
| + self.unregistered = {} |
| + |
| + @classmethod |
| + def _create(cls, value): |
| + raise NotImplementedError('Instances must be created manually.') |
| + |
| + def _connect_raw(self): |
| + s = self.Stream(self) |
| + self.unregistered[id(s)] = s |
| + return s |
| + |
| + def get(self, name): |
| + se = self.streams[name] |
| + data = se.s.data |
| + |
| + if se.type == libs.logdog.stream.StreamParams.TEXT: |
| + # Return text stream data as a list of lines. We use unicode because it |
| + # fits in with the JSON dump from 'all_streams'. |
| + return [unicode(l) for l in data.splitlines()] |
| + elif se.type == libs.logdog.stream.StreamParams.BINARY: |
| + raise NotImplementedError('No support for fetching binary stream data.') |
| + elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: |
| + # Return datagram stream data as a list of datagrams. |
| + sio = StringIO.StringIO(data) |
| + datagrams = [] |
| + while sio.tell() < sio.len: |
| + size, _ = libs.logdog.varint.read_uvarint(sio) |
| + dg = sio.read(size) |
| + if len(dg) != size: |
| + raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) |
| + |
| + # If this datagram is a known type (e.g., protobuf), transform it into |
| + # JSONPB. |
| + translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) |
| + if translator is not None: |
| + dg = translator(dg) |
| + datagrams.append(dg) |
| + |
| + sio.close() |
| + return dg |
| + else: |
| + raise ValueError('Unknown stream type [%s]' % (se.type,)) |
| + |
| + def all_streams(self): |
| + return dict((name, self.get(name)) for name in self.streams.iterkeys()) |
|
martiniss
2016/09/01 21:59:48
why not just self.streams.items() ?
|
| + |
| + @property |
| + def stream_names(self): |
| + return set(self.streams.iterkeys()) |
| + |
| + def _remove_from_unregistered(self, s): |
| + if id(s) not in self.unregistered: |
| + raise KeyError('Stream is not known to be unregistered.') |
| + del(self.unregistered[id(s)]) |
|
martiniss
2016/09/01 21:59:48
Stylistically, are you supposed to do "del(x[a])",
|
| + |
| + def _register_stream(self, s, header): |
| + name = header.get('name') |
| + if name in self.streams: |
| + raise KeyError('Duplicate stream [%s]' % (name,)) |
| + |
| + self._remove_from_unregistered(s) |
| + self.streams[name] = self._StreamEntry( |
| + s=s, |
| + type=header['type'], |
| + content_type=header.get('contentType'), |
| + ) |
| + |
| + def _register_incomplete(self, s): |
| + self._remove_from_unregistered(s) |
| + self.incomplete.append(s) |
| + |
| + |
| +class EnvironmentTest(unittest.TestCase): |
| + """Simple test to assert that _Environment, which is stubbed during our tests, |
| + actually works.""" |
| + |
| + def testEnvironmentProbes(self): |
| + stream_logdog._Environment.probe() |
| + |
| + |
| +class StreamEngineTest(unittest.TestCase): |
| + |
| + def setUp(self): |
| + self.client = _TestStreamClient() |
| + self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) |
| + self.env = stream_logdog._Environment( |
| + now_fn=lambda: self.now, |
| + argv=[], |
| + environ={}, |
| + cwd=None, |
| + ) |
| + self.maxDiff = 1024*1024 |
| + |
| + |
|
martiniss
2016/09/01 21:59:48
nit: spaces
|
| + @contextlib.contextmanager |
| + def _new_stream_engine(self, **kwargs): |
| + kwargs.setdefault('client', self.client) |
| + kwargs.setdefault('env', self.env) |
| + |
| + # Initialize and open a StreamEngine. |
| + se = stream_logdog.StreamEngine(**kwargs) |
| + se.open() |
| + yield se |
| + |
| + # Close the StreamEngine after we're done with it. |
|
martiniss
2016/09/01 21:59:48
comment doesn't match the line below it.
|
| + self._advance_time() |
|
martiniss
2016/09/01 21:59:48
Personally, I'd not have something like this in he
|
| + se.close() |
| + |
| + @contextlib.contextmanager |
| + def _step_stream(self, se, **kwargs): |
| + # Initialize and yield a new step stream. |
| + self._advance_time() |
| + step_stream = se.new_step_stream(recipe_api._make_step_config(**kwargs)) |
| + yield step_stream |
| + |
| + # Close the step stream when we're done with it. |
|
martiniss
2016/09/01 21:59:48
same as above.
|
| + self._advance_time() |
|
martiniss
2016/09/01 21:59:47
same as above.
|
| + step_stream.close() |
| + |
| + @contextlib.contextmanager |
| + def _log_stream(self, step_stream, name): |
| + # Initialize and yield a new log stream. |
| + log_stream = step_stream.new_log_stream(name) |
| + yield log_stream |
| + |
| + # Close the log stream when we're done with it. |
| + log_stream.close() |
| + |
| + def _advance_time(self): |
| + self.now += datetime.timedelta(seconds=1) |
| + |
| + def testEmptyStreamEngine(self): |
| + self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| + self.env.environ['foo'] = 'bar' |
| + self.env.cwd = 'CWD' |
| + |
| + with self._new_stream_engine() as se: |
|
martiniss
2016/09/01 21:59:48
this line doesn't seem to do anything... comment?
|
| + pass |
| + |
| + self.assertEqual(self.client.all_streams(), { |
| + u'annotations': { |
| + u'name': u'steps', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:03Z', |
| + u'ended': u'2106-06-12T01:02:04Z', |
| + u'command': { |
| + u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| + u'cwd': u'CWD', |
| + u'environ': {u'foo': u'bar'}, |
| + }, |
| + }, |
| + }) |
| + |
| + def testBasicStream(self): |
| + self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| + self.env.environ['foo'] = 'bar' |
| + self.env.cwd = 'CWD' |
| + |
| + with self._new_stream_engine(name_base='test/base') as se: |
| + with self._step_stream(se, |
| + name='first step', |
| + cmd=['first', 'step'], |
| + cwd='FIRST_CWD') as step: |
| + step.add_step_text('Sup') |
| + step.add_step_text('Dawg?') |
| + step.write_line('STDOUT for first step.') |
| + step.write_line('(Another line)') |
| + step.add_step_summary_text('Everything is great.') |
| + step.add_step_link('example 1', 'http://example.com/1') |
| + step.add_step_link('example 2', 'http://example.com/2') |
| + step.set_step_status('SUCCESS') |
| + |
| + with self._step_stream(se, name='second step') as step: |
| + step.set_step_status('SUCCESS') |
| + step.write_split('multiple\nlines\nof\ntext') |
| + |
| + # Create two log streams with the same name to test indexing. |
| + # |
| + # Note that "log stream" is an invalid LogDog stream name, so this |
| + # will also test normalization. |
| + with self._log_stream(step, 'log stream') as ls: |
| + ls.write_split('foo\nbar\nbaz\n') |
| + with self._log_stream(step, 'log stream') as ls: |
| + ls.write_split('qux\nquux\n') |
| + |
| + # This is a different stream name, but will normalize to the same log |
| + # stream name as 'second/step', so this will test that we disambiguate |
| + # the log stream names. |
| + with self._step_stream(se, name='second/step') as step: |
| + pass |
| + |
| + self.assertEqual(self.client.all_streams(), { |
| + u'test/base/annotations': { |
| + u'name': u'steps', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:03Z', |
| + u'ended': u'2106-06-12T01:02:10Z', |
|
martiniss
2016/09/01 21:59:48
The 7 seconds here are a bit hard to see that they
|
| + u'command': { |
| + u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| + u'cwd': u'CWD', |
| + u'environ': {u'foo': u'bar'}, |
| + }, |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'first step', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:04Z', |
| + u'ended': u'2106-06-12T01:02:05Z', |
| + u'command': { |
| + u'commandLine': [u'first', u'step'], |
| + u'cwd': u'FIRST_CWD', |
| + }, |
| + u'stdoutStream': { |
| + u'name': u'test/base/steps/first_step/stdout', |
| + }, |
| + u'text': [u'Everything is great.', u'Sup', u'Dawg?'], |
| + u'otherLinks': [ |
| + { |
| + u'label': u'example 1', |
| + u'url': u'http://example.com/1', |
| + }, |
| + { |
| + u'label': u'example 2', |
| + u'url': u'http://example.com/2', |
| + }, |
| + ], |
| + }}, |
| + |
| + {u'step': { |
| + u'name': u'second step', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:06Z', |
| + u'ended': u'2106-06-12T01:02:07Z', |
| + u'stdoutStream': { |
| + u'name': u'test/base/steps/second_step/stdout', |
| + }, |
| + u'otherLinks': [ |
| + { |
| + u'label': u'log stream', |
| + u'logdogStream': { |
| + u'name': u'test/base/steps/second_step/logs/log_stream/0', |
| + }, |
| + }, |
| + { |
| + u'label': u'log stream', |
| + u'logdogStream': { |
| + u'name': u'test/base/steps/second_step/logs/log_stream/1', |
| + }, |
| + }, |
| + ], |
| + }}, |
| + |
| + {u'step': { |
| + u'name': u'second/step', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:08Z', |
| + u'ended': u'2106-06-12T01:02:09Z', |
| + }}, |
| + ], |
| + }, |
| + |
| + u'test/base/steps/first_step/stdout': [ |
| + u'STDOUT for first step.', |
| + u'(Another line)', |
| + ], |
| + |
| + u'test/base/steps/second_step/stdout': [ |
| + u'multiple', |
| + u'lines', |
| + u'of', |
| + u'text', |
| + ], |
| + |
| + u'test/base/steps/second_step/logs/log_stream/0': [ |
| + u'foo', |
| + u'bar', |
| + u'baz', |
| + ], |
| + |
| + u'test/base/steps/second_step/logs/log_stream/1': [ |
| + u'qux', |
| + u'quux', |
| + ], |
| + }) |
| + |
| + def testWarningBasicStream(self): |
| + with self._new_stream_engine() as se: |
| + with self._step_stream(se, name='isuck') as step: |
| + step.add_step_summary_text('Something went wrong.') |
| + step.set_step_status('WARNING') |
| + |
| + self.assertEqual(self.client.all_streams(), { |
| + u'annotations': { |
| + u'name': u'steps', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:03Z', |
| + u'ended': u'2106-06-12T01:02:06Z', |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'isuck', |
| + u'status': u'SUCCESS', |
|
martiniss
2016/09/01 21:59:48
Wait, shouldn't this be WARNING ??
|
| + u'failureDetails': { |
| + u'text': u'Something went wrong.', |
| + }, |
| + u'started': u'2106-06-12T01:02:04Z', |
| + u'ended': u'2106-06-12T01:02:05Z', |
| + u'text': [u'Something went wrong.'], |
| + }}, |
| + ], |
| + }, |
| + }) |
| + |
| + def testFailedBasicStream(self): |
| + with self._new_stream_engine() as se: |
| + with self._step_stream(se, name='isuck') as step: |
| + step.add_step_summary_text('Oops I failed.') |
| + step.set_step_status('FAILURE') |
| + |
| + with self._step_stream(se, name='irock') as step: |
| + pass |
| + |
| + self.assertEqual(self.client.all_streams(), { |
| + u'annotations': { |
| + u'name': u'steps', |
| + u'status': u'FAILURE', |
| + u'started': u'2106-06-12T01:02:03Z', |
| + u'ended': u'2106-06-12T01:02:08Z', |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'isuck', |
| + u'status': u'FAILURE', |
| + u'failureDetails': { |
| + u'text': u'Oops I failed.', |
| + }, |
| + u'started': u'2106-06-12T01:02:04Z', |
| + u'ended': u'2106-06-12T01:02:05Z', |
| + u'text': [u'Oops I failed.'], |
| + }}, |
| + |
| + {u'step': { |
| + u'name': u'irock', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:06Z', |
| + u'ended': u'2106-06-12T01:02:07Z', |
| + }}, |
| + ], |
| + }, |
| + }) |
| + |
| + def testNestedStream(self): |
| + with self._new_stream_engine() as se: |
| + # parent |
| + with self._step_stream(se, name='parent') as step: |
| + step.write_line('I am the parent.') |
| + |
| + # parent."child 1" |
| + with self._step_stream(se, |
| + name='child 1', |
| + step_nest_level=1) as step: |
| + step.write_line('I am child #1.') |
| + |
| + # parent."child 1"."grandchild" |
| + with self._step_stream(se, |
| + name='grandchild', |
| + step_nest_level=2) as step: |
| + step.write_line("I am child #1's child.") |
| + |
| + # parent."child 2". Mark this child as failed. This should not propagate |
|
martiniss
2016/09/01 21:59:48
where do you mark this child as failed?
|
| + # to the parent, since it has an explicit status. |
| + with self._step_stream(se, |
| + name='child 2', |
| + step_nest_level=1) as step: |
| + step.write_line('I am child #2.') |
| + |
| + # parent."child 2". Mark this child as failed. This should not propagate |
| + # to the parent, since it has an explicit status. |
| + with self._step_stream(se, name='friend') as step: |
| + step.write_line("I am the parent's friend.") |
| + |
| + self.assertEqual(self.client.all_streams(), { |
| + u'annotations': { |
| + u'name': u'steps', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:03Z', |
| + u'ended': u'2106-06-12T01:02:14Z', |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'parent', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:04Z', |
| + u'ended': u'2106-06-12T01:02:05Z', |
| + u'stdoutStream': { |
| + u'name': u'steps/parent/stdout', |
| + }, |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'child 1', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:06Z', |
| + u'ended': u'2106-06-12T01:02:07Z', |
| + u'stdoutStream': { |
| + u'name': u'steps/parent/steps/child_1/stdout', |
| + }, |
| + u'substep': [ |
| + |
| + {u'step': { |
| + u'name': u'grandchild', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:08Z', |
| + u'ended': u'2106-06-12T01:02:09Z', |
| + u'stdoutStream': { |
| + u'name': u'steps/parent/steps/child_1/' |
| + 'steps/grandchild/stdout', |
| + }, |
| + }}, |
| + ], |
| + }}, |
| + |
| + {u'step': { |
| + u'name': u'child 2', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:10Z', |
| + u'ended': u'2106-06-12T01:02:11Z', |
| + u'stdoutStream': { |
| + u'name': u'steps/parent/steps/child_2/stdout', |
| + }, |
| + }}, |
| + ], |
| + }}, |
| + |
| + {u'step': { |
| + u'name': u'friend', |
| + u'status': u'SUCCESS', |
| + u'started': u'2106-06-12T01:02:12Z', |
| + u'ended': u'2106-06-12T01:02:13Z', |
| + u'stdoutStream': { |
| + u'name': u'steps/friend/stdout', |
| + }, |
| + }}, |
| + ], |
| + }, |
| + |
| + u'steps/parent/stdout': [u'I am the parent.'], |
| + u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], |
| + u'steps/parent/steps/child_1/steps/grandchild/stdout': [ |
| + u"I am child #1's child."], |
| + u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], |
| + u'steps/friend/stdout': [u"I am the parent's friend."], |
| + }) |
| + |
| + def testTriggersRaiseException(self): |
| + with self._new_stream_engine() as se: |
| + with self._step_stream(se, name='trigger') as step: |
| + with self.assertRaises(NotImplementedError): |
| + step.trigger('trigger spec') |
| + |
| + def testTriggersIgnored(self): |
| + with self._new_stream_engine(ignore_triggers=True) as se: |
| + with self._step_stream(se, name='trigger') as step: |
| + step.trigger('trigger spec') |
| + |
| + def testNoSubannotations(self): |
| + with self._new_stream_engine(ignore_triggers=True) as se: |
| + with self.assertRaises(NotImplementedError): |
| + se.new_step_stream(recipe_api._make_step_config( |
| + name='uses subannotations', |
| + allow_subannotations=True, |
| + )) |
| + |
| + def testInvalidStepStatusRaisesValueError(self): |
| + with self._new_stream_engine() as se: |
| + with self._step_stream(se, name='trigger') as step: |
| + with self.assertRaises(ValueError): |
| + step.set_step_status('OHAI') |
| + |
| + |
| +class AnnotationMonitorTest(unittest.TestCase): |
| + """Tests the stream_logdog._AnnotationMonitor directly.""" |
| + |
| + # A small timedelta, sufficient to block but fast enough to not make the |
| + # test slow. |
| + _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) |
|
martiniss
2016/09/01 21:59:48
How hard would it be to actually mock time? This m
|
| + |
| + class _DatagramBuffer(object): |
| + |
| + def __init__(self): |
| + self.datagrams = [] |
| + self.data_event = threading.Event() |
| + |
| + def send(self, dg): |
| + self.datagrams.append(dg) |
| + self.data_event.set() |
| + |
| + def __len__(self): |
| + return len(self.datagrams) |
| + |
| + @property |
| + def latest(self): |
| + if self.datagrams: |
| + return self.datagrams[-1] |
| + return None |
| + |
| + def wait_for_data(self): |
| + self.data_event.wait() |
| + self.data_event.clear() |
| + return self.latest |
| + |
| + |
| + @contextlib.contextmanager |
| + def _annotation_monitor(self, **kwargs): |
| + # Default to a really high flush period. This should never naturally trigger |
| + # during a test case. |
| + kwargs.setdefault('flush_period', datetime.timedelta(hours=1)) |
| + |
| + am = stream_logdog._AnnotationMonitor(self.db, **kwargs) |
| + try: |
| + yield am |
| + finally: |
| + am.flush_and_join() |
| + |
| + with am._lock: |
| + # Assert that our timer has been shut down. |
| + self.assertIsNone(am._flush_timer) |
| + # Assert that there is no buffered data. |
| + self.assertIsNone(am._current_data) |
| + |
| + def setUp(self): |
| + self.db = self._DatagramBuffer() |
| + |
| + def testMonitorStartsAndJoinsWithNoData(self): |
| + with self._annotation_monitor() as am: |
| + pass |
| + |
| + # No datagrams should have been sent. |
| + self.assertIsNone(self.db.latest) |
| + self.assertEqual(len(self.db.datagrams), 0) |
| + |
| + def testMonitorBuffersAndSendsData(self): |
| + with self._annotation_monitor() as am: |
| + # The first piece of data should have been immediately sent. |
| + am.signal_update('initial') |
| + self.assertEqual(self.db.wait_for_data(), 'initial') |
| + |
| + # Subsequent iterations should not send data, but should start the flush |
| + # timer and buffer the latest data. |
| + with am._lock: |
| + self.assertIsNone(am._flush_timer) |
| + for i in xrange(10): |
| + am.signal_update('test%d' % (i,)) |
| + time.sleep(self._SMALL_TIME_DELTA.total_seconds()) |
| + with am._lock: |
| + self.assertEqual(am._current_data, 'test9') |
| + self.assertIsNotNone(am._flush_timer) |
| + |
| + # Pretend the timer triggered. We should receive the latest buffered data. |
| + am._flush_timer_expired() |
| + self.assertEqual(self.db.wait_for_data(), 'test9') |
| + with am._lock: |
| + # No more timer or buffered data. |
| + self.assertIsNone(am._flush_timer) |
| + self.assertIsNone(am._current_data) |
| + |
| + # Send one last chunk of data, but don't let the timer expire. This will |
| + # be sent on final flush. |
| + am.signal_update('final') |
| + with am._lock: |
| + self.assertIsNotNone(am._flush_timer) |
| + |
| + # Assert that the final chunk of data was sent. |
| + self.assertEqual(self.db.latest, 'final') |
| + |
| + # Only three datagrams should have been sent. |
| + self.assertEqual(len(self.db.datagrams), 3) |
| + |
| + def testMonitorIgnoresDuplicateData(self): |
| + with self._annotation_monitor() as am: |
| + # Get initial data out of the way. |
| + am.signal_update('initial') |
| + self.assertEqual(self.db.wait_for_data(), 'initial') |
| + |
| + # Send the same thing. It should not be buffered. |
| + am.signal_update('initial') |
| + with am._lock: |
| + self.assertIsNone(am._flush_timer) |
| + self.assertIsNone(am._current_data) |
| + |
| + # Only one datagrams should have been sent. |
| + self.assertEqual(len(self.db.datagrams), 1) |
| + |
| + def testStructuralUpdateSendsImmediately(self): |
| + with self._annotation_monitor() as am: |
| + # Get initial data out of the way. |
| + am.signal_update('initial') |
| + self.assertEqual(self.db.wait_for_data(), 'initial') |
| + |
| + # Send a structural update. It should send immediately. |
| + am.signal_update('test', structural=True) |
| + self.assertEqual(self.db.wait_for_data(), 'test') |
| + |
| + # Send a duplicate structural update. It should be ignored. |
| + am.signal_update('test', structural=True) |
| + with am._lock: |
| + self.assertIsNone(am._flush_timer) |
| + self.assertIsNone(am._current_data) |
| + |
| + # Only two datagrams should have been sent. |
| + self.assertEqual(len(self.db.datagrams), 2) |
| + |
| + def testFlushesPeriodically(self): |
| + with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: |
| + # Get initial data out of the way. |
| + am.signal_update('initial') |
| + self.assertEqual(self.db.wait_for_data(), 'initial') |
| + |
| + # Send a structural update. It should send immediately. |
| + am.signal_update('test') |
| + self.assertEqual(self.db.wait_for_data(), 'test') |
| + |
| + # Only two datagrams should have been sent. |
| + self.assertEqual(len(self.db.datagrams), 2) |
| + |
| + |
| +class AnnotationStateTest(unittest.TestCase): |
| + """Tests the stream_logdog._AnnotationState directly.""" |
| + |
| + def setUp(self): |
| + self.env = stream_logdog._Environment( |
| + None, |
| + argv=['command', 'arg0', 'arg1'], |
| + cwd='path/to/cwd', |
| + environ={ |
| + 'foo': 'bar', |
| + 'FOO': 'baz', |
| + }, |
| + ) |
| + self.astate = stream_logdog._AnnotationState.create( |
| + stream_logdog._StreamName('strean/name'), |
| + env=self.env, |
| + properties={'foo': 'bar'}, |
| + ) |
| + |
| + def testFirstCheckReturnsData(self): |
| + # The first check should return data. |
| + self.assertIsNotNone(self.astate.check()) |
| + # The second will, since nothing has changed. |
| + self.assertIsNone(self.astate.check()) |
| + |
| + def testCanCreateAndGetStep(self): |
| + # Root step. |
| + base = self.astate.base |
| + self.astate.create_step(recipe_api._make_step_config(name='first')) |
| + self.assertEqual(len(base.substep), 1) |
| + self.assertEqual(base.substep[0].step.name, 'first') |
| + self.assertIsNotNone(self.astate.check()) |
| + |
| + # Child step. |
| + self.astate.create_step(recipe_api._make_step_config( |
| + name='first child', |
| + step_nest_level=1)) |
| + self.assertEqual(len(base.substep), 1) |
| + self.assertEqual(len(base.substep[0].step.substep), 1) |
| + self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') |
| + self.assertIsNotNone(self.astate.check()) |
| + |
| + # Sibling step to 'first'. |
| + self.astate.create_step(recipe_api._make_step_config(name='second')) |
| + self.assertEqual(len(base.substep), 2) |
| + self.assertEqual(base.substep[1].step.name, 'second') |
| + self.assertIsNotNone(self.astate.check()) |
| + |
| + def testCanUpdateProperties(self): |
| + self.astate.update_properties(foo='baz', qux='quux') |
| + self.assertEqual(list(self.astate.base.property), [ |
| + pb.Step.Property(name='foo', value='baz'), |
| + pb.Step.Property(name='qux', value='quux'), |
| + ]) |
| + |
| + |
| +class StreamNameTest(unittest.TestCase): |
| + """Tests the stream_logdog._StreamName directly.""" |
| + |
| + def testEmptyStreamNameRaisesValueError(self): |
| + sn = stream_logdog._StreamName(None) |
| + with self.assertRaises(ValueError): |
| + str(sn) |
| + |
| + def testInvalidBaseRaisesValueError(self): |
| + with self.assertRaises(ValueError): |
| + stream_logdog._StreamName('!!! invalid !!!') |
| + |
| + def testAppendComponents(self): |
| + sn = stream_logdog._StreamName('base') |
| + self.assertEqual(str(sn.append()), 'base') |
| + self.assertEqual(str(sn.append('foo')), 'base/foo') |
| + self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') |
| + self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') |
| + |
| + def testAugment(self): |
| + sn = stream_logdog._StreamName('base') |
| + self.assertEqual(str(sn.augment('')), 'base') |
| + self.assertEqual(str(sn.augment('foo')), 'basefoo') |
| + self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') |
| + |
| + def testAppendInvalidStreamNameNormalizes(self): |
| + sn = stream_logdog._StreamName('base') |
| + sn = sn.append('#!!! stream name !!!') |
| + self.assertEqual(str(sn), 'base/s______stream_name____') |
| + |
| + def testAugmentInvalidStreamNameNormalizes(self): |
| + sn = stream_logdog._StreamName('base') |
| + self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') |
| + |
| + |
| +if __name__ == '__main__': |
| + unittest.main() |