| Index: recipe_engine/unittests/stream_logdog_test.py
|
| diff --git a/recipe_engine/unittests/stream_logdog_test.py b/recipe_engine/unittests/stream_logdog_test.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..228e1bdb8361dab60c365552d959b962ebe9e227
|
| --- /dev/null
|
| +++ b/recipe_engine/unittests/stream_logdog_test.py
|
| @@ -0,0 +1,860 @@
|
| +#!/usr/bin/env python
|
| +# Copyright 2015 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed under the Apache License, Version 2.0
|
| +# that can be found in the LICENSE file.
|
| +
|
| +import collections
|
| +import contextlib
|
| +import datetime
|
| +import json
|
| +import threading
|
| +import time
|
| +import unittest
|
| +import StringIO
|
| +
|
| +import test_env
|
| +
|
| +import libs.logdog.stream
|
| +import libs.logdog.varint
|
| +from google.protobuf import json_format as jsonpb
|
| +from recipe_engine import recipe_api
|
| +from recipe_engine import stream
|
| +from recipe_engine import stream_logdog
|
| +
|
| +
|
| +import annotations_pb2 as pb
|
| +
|
| +
|
| +def _translate_annotation_datagram(dg):
|
| + """Translate annotation datagram binary data into a Python dict modeled after
|
| + the JSONPB projection of the datagram.
|
| +
|
| + This is chosen because it allows for easy idiomatic equality assertions in
|
| + test cases.
|
| +
|
| + Args:
|
| + dg (str): The serialized annotation pb.Step datagram.
|
| + """
|
| + msg = pb.Step()
|
| + msg.ParseFromString(dg)
|
| + return json.loads(jsonpb.MessageToJson(msg))
|
| +
|
| +
|
| +class _TestStreamClient(libs.logdog.stream.StreamClient):
|
| + """A testing StreamClient that retains all data written to it."""
|
| +
|
| + class Stream(object):
|
| + """A file-like object that is explicitly aware of LogDog stream protocol."""
|
| +
|
| + def __init__(self, stream_client):
|
| + self._client = stream_client
|
| + self._buf = StringIO.StringIO()
|
| + self._header = None
|
| + self._final_data = None
|
| + self._data_offset = None
|
| +
|
| + def write(self, data):
|
| + self._buf.write(data)
|
| + self._attempt_registration()
|
| +
|
| + def close(self):
|
| + # If we never parsed our header, register that we are incomplete.
|
| + if self._header is None:
|
| + self._client._register_incomplete(self)
|
| +
|
| + self._final_data = self.data
|
| + self._buf.close()
|
| +
|
| + @contextlib.contextmanager
|
| + def _read_from(self, offset):
|
| + # Seek to the specified offset.
|
| + self._buf.seek(offset, mode=0)
|
| + try:
|
| + yield self._buf
|
| + finally:
|
| + # Seek back to he end of the stream so future writes will append.
|
| + self._buf.seek(0, mode=2)
|
| +
|
| + def _attempt_registration(self):
|
| + # Only need to register once.
|
| + if self._header is not None:
|
| + return
|
| +
|
| + # Can we parse a full LogDog stream header?
|
| + #
|
| + # This means pulling:
|
| + # - The LogDog Butler header.
|
| + # - The header size varint.
|
| + # - The header JSON blob, which needs to be decoded.
|
| + with self._read_from(0) as fd:
|
| + # Read 'result' bytes.
|
| + magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC))
|
| + if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC):
|
| + # Incomplete magic number, cannot complete registration.
|
| + return
|
| + count = len(magic_data)
|
| +
|
| + try:
|
| + size, varint_count = libs.logdog.varint.read_uvarint(fd)
|
| + except ValueError:
|
| + # Incomplete varint, cannot complete registration.
|
| + return
|
| + count += varint_count
|
| +
|
| + header_data = fd.read(size)
|
| + if len(header_data) != size:
|
| + # Incomplete header, cannot complete registration.
|
| + return
|
| + count += size
|
| +
|
| + # Parse the header as JSON.
|
| + self._header = json.loads(header_data)
|
| + self._data_offset = count # (varint + header size)
|
| + self._client._register_stream(self, self._header)
|
| +
|
| + @property
|
| + def data(self):
|
| + # If we have already cached our data (on close), return it directly.
|
| + if self._final_data is not None:
|
| + return self._final_data
|
| +
|
| + # Load our data from our live buffer.
|
| + if self._data_offset is None:
|
| + # No header has been read, so there is no data.
|
| + return None
|
| + with self._read_from(self._data_offset) as fd:
|
| + return fd.read()
|
| +
|
| +
|
| + _StreamEntry = collections.namedtuple('_StreamEntry', (
|
| + 's', 'type', 'content_type'))
|
| +
|
| + _DATAGRAM_CONTENT_TRANSLATE = {
|
| + stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram,
|
| + }
|
| +
|
| +
|
| + def __init__(self):
|
| + super(_TestStreamClient, self).__init__()
|
| + self.streams = {}
|
| + self.incomplete = []
|
| + self.unregistered = {}
|
| +
|
| + @classmethod
|
| + def _create(cls, value):
|
| + raise NotImplementedError('Instances must be created manually.')
|
| +
|
| + def _connect_raw(self):
|
| + s = self.Stream(self)
|
| + self.unregistered[id(s)] = s
|
| + return s
|
| +
|
| + def get(self, name):
|
| + se = self.streams[name]
|
| + data = se.s.data
|
| +
|
| + if se.type == libs.logdog.stream.StreamParams.TEXT:
|
| + # Return text stream data as a list of lines. We use unicode because it
|
| + # fits in with the JSON dump from 'all_streams'.
|
| + return [unicode(l) for l in data.splitlines()]
|
| + elif se.type == libs.logdog.stream.StreamParams.BINARY:
|
| + raise NotImplementedError('No support for fetching binary stream data.')
|
| + elif se.type == libs.logdog.stream.StreamParams.DATAGRAM:
|
| + # Return datagram stream data as a list of datagrams.
|
| + sio = StringIO.StringIO(data)
|
| + datagrams = []
|
| + while sio.tell() < sio.len:
|
| + size, _ = libs.logdog.varint.read_uvarint(sio)
|
| + dg = sio.read(size)
|
| + if len(dg) != size:
|
| + raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size))
|
| +
|
| + # If this datagram is a known type (e.g., protobuf), transform it into
|
| + # JSONPB.
|
| + translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type)
|
| + if translator is not None:
|
| + dg = translator(dg)
|
| + datagrams.append(dg)
|
| +
|
| + sio.close()
|
| + return dg
|
| + else:
|
| + raise ValueError('Unknown stream type [%s]' % (se.type,))
|
| +
|
| + def all_streams(self):
|
| + return dict((name, self.get(name)) for name in self.streams.iterkeys())
|
| +
|
| + @property
|
| + def stream_names(self):
|
| + return set(self.streams.iterkeys())
|
| +
|
| + def _remove_from_unregistered(self, s):
|
| + if id(s) not in self.unregistered:
|
| + raise KeyError('Stream is not known to be unregistered.')
|
| + del(self.unregistered[id(s)])
|
| +
|
| + def _register_stream(self, s, header):
|
| + name = header.get('name')
|
| + if name in self.streams:
|
| + raise KeyError('Duplicate stream [%s]' % (name,))
|
| +
|
| + self._remove_from_unregistered(s)
|
| + self.streams[name] = self._StreamEntry(
|
| + s=s,
|
| + type=header['type'],
|
| + content_type=header.get('contentType'),
|
| + )
|
| +
|
| + def _register_incomplete(self, s):
|
| + self._remove_from_unregistered(s)
|
| + self.incomplete.append(s)
|
| +
|
| +
|
| +class EnvironmentTest(unittest.TestCase):
|
| + """Simple test to assert that _Environment, which is stubbed during our tests,
|
| + actually works."""
|
| +
|
| + def testEnvironmentProbes(self):
|
| + stream_logdog._Environment.probe()
|
| +
|
| +
|
| +class StreamEngineTest(unittest.TestCase):
|
| +
|
| + def setUp(self):
|
| + self.client = _TestStreamClient()
|
| + self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
|
| + self.env = stream_logdog._Environment(
|
| + now_fn=lambda: self.now,
|
| + argv=[],
|
| + environ={},
|
| + cwd=None,
|
| + )
|
| + self.maxDiff = 1024*1024
|
| +
|
| +
|
| + @contextlib.contextmanager
|
| + def _new_stream_engine(self, **kwargs):
|
| + kwargs.setdefault('client', self.client)
|
| + kwargs.setdefault('env', self.env)
|
| +
|
| + # Initialize and open a StreamEngine.
|
| + se = stream_logdog.StreamEngine(**kwargs)
|
| + se.open()
|
| + yield se
|
| +
|
| + # Close the StreamEngine after we're done with it.
|
| + self._advance_time()
|
| + se.close()
|
| +
|
| + @contextlib.contextmanager
|
| + def _step_stream(self, se, **kwargs):
|
| + # Initialize and yield a new step stream.
|
| + self._advance_time()
|
| + step_stream = se.new_step_stream(recipe_api._make_step_config(**kwargs))
|
| + yield step_stream
|
| +
|
| + # Close the step stream when we're done with it.
|
| + self._advance_time()
|
| + step_stream.close()
|
| +
|
| + @contextlib.contextmanager
|
| + def _log_stream(self, step_stream, name):
|
| + # Initialize and yield a new log stream.
|
| + log_stream = step_stream.new_log_stream(name)
|
| + yield log_stream
|
| +
|
| + # Close the log stream when we're done with it.
|
| + log_stream.close()
|
| +
|
| + def _advance_time(self):
|
| + self.now += datetime.timedelta(seconds=1)
|
| +
|
| + def testEmptyStreamEngine(self):
|
| + self.env.argv = ['fake_program', 'arg0', 'arg1']
|
| + self.env.environ['foo'] = 'bar'
|
| + self.env.cwd = 'CWD'
|
| +
|
| + with self._new_stream_engine() as se:
|
| + pass
|
| +
|
| + self.assertEqual(self.client.all_streams(), {
|
| + u'annotations': {
|
| + u'name': u'steps',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:03Z',
|
| + u'ended': u'2106-06-12T01:02:04Z',
|
| + u'command': {
|
| + u'commandLine': [u'fake_program', u'arg0', u'arg1'],
|
| + u'cwd': u'CWD',
|
| + u'environ': {u'foo': u'bar'},
|
| + },
|
| + },
|
| + })
|
| +
|
| + def testBasicStream(self):
|
| + self.env.argv = ['fake_program', 'arg0', 'arg1']
|
| + self.env.environ['foo'] = 'bar'
|
| + self.env.cwd = 'CWD'
|
| +
|
| + with self._new_stream_engine(name_base='test/base') as se:
|
| + with self._step_stream(se,
|
| + name='first step',
|
| + cmd=['first', 'step'],
|
| + cwd='FIRST_CWD') as step:
|
| + step.add_step_text('Sup')
|
| + step.add_step_text('Dawg?')
|
| + step.write_line('STDOUT for first step.')
|
| + step.write_line('(Another line)')
|
| + step.add_step_summary_text('Everything is great.')
|
| + step.add_step_link('example 1', 'http://example.com/1')
|
| + step.add_step_link('example 2', 'http://example.com/2')
|
| + step.set_step_status('SUCCESS')
|
| +
|
| + with self._step_stream(se, name='second step') as step:
|
| + step.set_step_status('SUCCESS')
|
| + step.write_split('multiple\nlines\nof\ntext')
|
| +
|
| + # Create two log streams with the same name to test indexing.
|
| + #
|
| + # Note that "log stream" is an invalid LogDog stream name, so this
|
| + # will also test normalization.
|
| + with self._log_stream(step, 'log stream') as ls:
|
| + ls.write_split('foo\nbar\nbaz\n')
|
| + with self._log_stream(step, 'log stream') as ls:
|
| + ls.write_split('qux\nquux\n')
|
| +
|
| + # This is a different stream name, but will normalize to the same log
|
| + # stream name as 'second/step', so this will test that we disambiguate
|
| + # the log stream names.
|
| + with self._step_stream(se, name='second/step') as step:
|
| + pass
|
| +
|
| + self.assertEqual(self.client.all_streams(), {
|
| + u'test/base/annotations': {
|
| + u'name': u'steps',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:03Z',
|
| + u'ended': u'2106-06-12T01:02:10Z',
|
| + u'command': {
|
| + u'commandLine': [u'fake_program', u'arg0', u'arg1'],
|
| + u'cwd': u'CWD',
|
| + u'environ': {u'foo': u'bar'},
|
| + },
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'first step',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:04Z',
|
| + u'ended': u'2106-06-12T01:02:05Z',
|
| + u'command': {
|
| + u'commandLine': [u'first', u'step'],
|
| + u'cwd': u'FIRST_CWD',
|
| + },
|
| + u'stdoutStream': {
|
| + u'name': u'test/base/steps/first_step/stdout',
|
| + },
|
| + u'text': [u'Everything is great.', u'Sup', u'Dawg?'],
|
| + u'otherLinks': [
|
| + {
|
| + u'label': u'example 1',
|
| + u'url': u'http://example.com/1',
|
| + },
|
| + {
|
| + u'label': u'example 2',
|
| + u'url': u'http://example.com/2',
|
| + },
|
| + ],
|
| + }},
|
| +
|
| + {u'step': {
|
| + u'name': u'second step',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:06Z',
|
| + u'ended': u'2106-06-12T01:02:07Z',
|
| + u'stdoutStream': {
|
| + u'name': u'test/base/steps/second_step/stdout',
|
| + },
|
| + u'otherLinks': [
|
| + {
|
| + u'label': u'log stream',
|
| + u'logdogStream': {
|
| + u'name': u'test/base/steps/second_step/logs/log_stream/0',
|
| + },
|
| + },
|
| + {
|
| + u'label': u'log stream',
|
| + u'logdogStream': {
|
| + u'name': u'test/base/steps/second_step/logs/log_stream/1',
|
| + },
|
| + },
|
| + ],
|
| + }},
|
| +
|
| + {u'step': {
|
| + u'name': u'second/step',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:08Z',
|
| + u'ended': u'2106-06-12T01:02:09Z',
|
| + }},
|
| + ],
|
| + },
|
| +
|
| + u'test/base/steps/first_step/stdout': [
|
| + u'STDOUT for first step.',
|
| + u'(Another line)',
|
| + ],
|
| +
|
| + u'test/base/steps/second_step/stdout': [
|
| + u'multiple',
|
| + u'lines',
|
| + u'of',
|
| + u'text',
|
| + ],
|
| +
|
| + u'test/base/steps/second_step/logs/log_stream/0': [
|
| + u'foo',
|
| + u'bar',
|
| + u'baz',
|
| + ],
|
| +
|
| + u'test/base/steps/second_step/logs/log_stream/1': [
|
| + u'qux',
|
| + u'quux',
|
| + ],
|
| + })
|
| +
|
| + def testWarningBasicStream(self):
|
| + with self._new_stream_engine() as se:
|
| + with self._step_stream(se, name='isuck') as step:
|
| + step.add_step_summary_text('Something went wrong.')
|
| + step.set_step_status('WARNING')
|
| +
|
| + self.assertEqual(self.client.all_streams(), {
|
| + u'annotations': {
|
| + u'name': u'steps',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:03Z',
|
| + u'ended': u'2106-06-12T01:02:06Z',
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'isuck',
|
| + u'status': u'SUCCESS',
|
| + u'failureDetails': {
|
| + u'text': u'Something went wrong.',
|
| + },
|
| + u'started': u'2106-06-12T01:02:04Z',
|
| + u'ended': u'2106-06-12T01:02:05Z',
|
| + u'text': [u'Something went wrong.'],
|
| + }},
|
| + ],
|
| + },
|
| + })
|
| +
|
| + def testFailedBasicStream(self):
|
| + with self._new_stream_engine() as se:
|
| + with self._step_stream(se, name='isuck') as step:
|
| + step.add_step_summary_text('Oops I failed.')
|
| + step.set_step_status('FAILURE')
|
| +
|
| + with self._step_stream(se, name='irock') as step:
|
| + pass
|
| +
|
| + self.assertEqual(self.client.all_streams(), {
|
| + u'annotations': {
|
| + u'name': u'steps',
|
| + u'status': u'FAILURE',
|
| + u'started': u'2106-06-12T01:02:03Z',
|
| + u'ended': u'2106-06-12T01:02:08Z',
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'isuck',
|
| + u'status': u'FAILURE',
|
| + u'failureDetails': {
|
| + u'text': u'Oops I failed.',
|
| + },
|
| + u'started': u'2106-06-12T01:02:04Z',
|
| + u'ended': u'2106-06-12T01:02:05Z',
|
| + u'text': [u'Oops I failed.'],
|
| + }},
|
| +
|
| + {u'step': {
|
| + u'name': u'irock',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:06Z',
|
| + u'ended': u'2106-06-12T01:02:07Z',
|
| + }},
|
| + ],
|
| + },
|
| + })
|
| +
|
| + def testNestedStream(self):
|
| + with self._new_stream_engine() as se:
|
| + # parent
|
| + with self._step_stream(se, name='parent') as step:
|
| + step.write_line('I am the parent.')
|
| +
|
| + # parent."child 1"
|
| + with self._step_stream(se,
|
| + name='child 1',
|
| + step_nest_level=1) as step:
|
| + step.write_line('I am child #1.')
|
| +
|
| + # parent."child 1"."grandchild"
|
| + with self._step_stream(se,
|
| + name='grandchild',
|
| + step_nest_level=2) as step:
|
| + step.write_line("I am child #1's child.")
|
| +
|
| + # parent."child 2". Mark this child as failed. This should not propagate
|
| + # to the parent, since it has an explicit status.
|
| + with self._step_stream(se,
|
| + name='child 2',
|
| + step_nest_level=1) as step:
|
| + step.write_line('I am child #2.')
|
| +
|
| + # parent."child 2". Mark this child as failed. This should not propagate
|
| + # to the parent, since it has an explicit status.
|
| + with self._step_stream(se, name='friend') as step:
|
| + step.write_line("I am the parent's friend.")
|
| +
|
| + self.assertEqual(self.client.all_streams(), {
|
| + u'annotations': {
|
| + u'name': u'steps',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:03Z',
|
| + u'ended': u'2106-06-12T01:02:14Z',
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'parent',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:04Z',
|
| + u'ended': u'2106-06-12T01:02:05Z',
|
| + u'stdoutStream': {
|
| + u'name': u'steps/parent/stdout',
|
| + },
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'child 1',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:06Z',
|
| + u'ended': u'2106-06-12T01:02:07Z',
|
| + u'stdoutStream': {
|
| + u'name': u'steps/parent/steps/child_1/stdout',
|
| + },
|
| + u'substep': [
|
| +
|
| + {u'step': {
|
| + u'name': u'grandchild',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:08Z',
|
| + u'ended': u'2106-06-12T01:02:09Z',
|
| + u'stdoutStream': {
|
| + u'name': u'steps/parent/steps/child_1/'
|
| + 'steps/grandchild/stdout',
|
| + },
|
| + }},
|
| + ],
|
| + }},
|
| +
|
| + {u'step': {
|
| + u'name': u'child 2',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:10Z',
|
| + u'ended': u'2106-06-12T01:02:11Z',
|
| + u'stdoutStream': {
|
| + u'name': u'steps/parent/steps/child_2/stdout',
|
| + },
|
| + }},
|
| + ],
|
| + }},
|
| +
|
| + {u'step': {
|
| + u'name': u'friend',
|
| + u'status': u'SUCCESS',
|
| + u'started': u'2106-06-12T01:02:12Z',
|
| + u'ended': u'2106-06-12T01:02:13Z',
|
| + u'stdoutStream': {
|
| + u'name': u'steps/friend/stdout',
|
| + },
|
| + }},
|
| + ],
|
| + },
|
| +
|
| + u'steps/parent/stdout': [u'I am the parent.'],
|
| + u'steps/parent/steps/child_1/stdout': [u'I am child #1.'],
|
| + u'steps/parent/steps/child_1/steps/grandchild/stdout': [
|
| + u"I am child #1's child."],
|
| + u'steps/parent/steps/child_2/stdout': [u'I am child #2.'],
|
| + u'steps/friend/stdout': [u"I am the parent's friend."],
|
| + })
|
| +
|
| + def testTriggersRaiseException(self):
|
| + with self._new_stream_engine() as se:
|
| + with self._step_stream(se, name='trigger') as step:
|
| + with self.assertRaises(NotImplementedError):
|
| + step.trigger('trigger spec')
|
| +
|
| + def testTriggersIgnored(self):
|
| + with self._new_stream_engine(ignore_triggers=True) as se:
|
| + with self._step_stream(se, name='trigger') as step:
|
| + step.trigger('trigger spec')
|
| +
|
| + def testNoSubannotations(self):
|
| + with self._new_stream_engine(ignore_triggers=True) as se:
|
| + with self.assertRaises(NotImplementedError):
|
| + se.new_step_stream(recipe_api._make_step_config(
|
| + name='uses subannotations',
|
| + allow_subannotations=True,
|
| + ))
|
| +
|
| + def testInvalidStepStatusRaisesValueError(self):
|
| + with self._new_stream_engine() as se:
|
| + with self._step_stream(se, name='trigger') as step:
|
| + with self.assertRaises(ValueError):
|
| + step.set_step_status('OHAI')
|
| +
|
| +
|
| +class AnnotationMonitorTest(unittest.TestCase):
|
| + """Tests the stream_logdog._AnnotationMonitor directly."""
|
| +
|
| + # A small timedelta, sufficient to block but fast enough to not make the
|
| + # test slow.
|
| + _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5)
|
| +
|
| + class _DatagramBuffer(object):
|
| +
|
| + def __init__(self):
|
| + self.datagrams = []
|
| + self.data_event = threading.Event()
|
| +
|
| + def send(self, dg):
|
| + self.datagrams.append(dg)
|
| + self.data_event.set()
|
| +
|
| + def __len__(self):
|
| + return len(self.datagrams)
|
| +
|
| + @property
|
| + def latest(self):
|
| + if self.datagrams:
|
| + return self.datagrams[-1]
|
| + return None
|
| +
|
| + def wait_for_data(self):
|
| + self.data_event.wait()
|
| + self.data_event.clear()
|
| + return self.latest
|
| +
|
| +
|
| + @contextlib.contextmanager
|
| + def _annotation_monitor(self, **kwargs):
|
| + # Default to a really high flush period. This should never naturally trigger
|
| + # during a test case.
|
| + kwargs.setdefault('flush_period', datetime.timedelta(hours=1))
|
| +
|
| + am = stream_logdog._AnnotationMonitor(self.db, **kwargs)
|
| + try:
|
| + yield am
|
| + finally:
|
| + am.flush_and_join()
|
| +
|
| + with am._lock:
|
| + # Assert that our timer has been shut down.
|
| + self.assertIsNone(am._flush_timer)
|
| + # Assert that there is no buffered data.
|
| + self.assertIsNone(am._current_data)
|
| +
|
| + def setUp(self):
|
| + self.db = self._DatagramBuffer()
|
| +
|
| + def testMonitorStartsAndJoinsWithNoData(self):
|
| + with self._annotation_monitor() as am:
|
| + pass
|
| +
|
| + # No datagrams should have been sent.
|
| + self.assertIsNone(self.db.latest)
|
| + self.assertEqual(len(self.db.datagrams), 0)
|
| +
|
| + def testMonitorBuffersAndSendsData(self):
|
| + with self._annotation_monitor() as am:
|
| + # The first piece of data should have been immediately sent.
|
| + am.signal_update('initial')
|
| + self.assertEqual(self.db.wait_for_data(), 'initial')
|
| +
|
| + # Subsequent iterations should not send data, but should start the flush
|
| + # timer and buffer the latest data.
|
| + with am._lock:
|
| + self.assertIsNone(am._flush_timer)
|
| + for i in xrange(10):
|
| + am.signal_update('test%d' % (i,))
|
| + time.sleep(self._SMALL_TIME_DELTA.total_seconds())
|
| + with am._lock:
|
| + self.assertEqual(am._current_data, 'test9')
|
| + self.assertIsNotNone(am._flush_timer)
|
| +
|
| + # Pretend the timer triggered. We should receive the latest buffered data.
|
| + am._flush_timer_expired()
|
| + self.assertEqual(self.db.wait_for_data(), 'test9')
|
| + with am._lock:
|
| + # No more timer or buffered data.
|
| + self.assertIsNone(am._flush_timer)
|
| + self.assertIsNone(am._current_data)
|
| +
|
| + # Send one last chunk of data, but don't let the timer expire. This will
|
| + # be sent on final flush.
|
| + am.signal_update('final')
|
| + with am._lock:
|
| + self.assertIsNotNone(am._flush_timer)
|
| +
|
| + # Assert that the final chunk of data was sent.
|
| + self.assertEqual(self.db.latest, 'final')
|
| +
|
| + # Only three datagrams should have been sent.
|
| + self.assertEqual(len(self.db.datagrams), 3)
|
| +
|
| + def testMonitorIgnoresDuplicateData(self):
|
| + with self._annotation_monitor() as am:
|
| + # Get initial data out of the way.
|
| + am.signal_update('initial')
|
| + self.assertEqual(self.db.wait_for_data(), 'initial')
|
| +
|
| + # Send the same thing. It should not be buffered.
|
| + am.signal_update('initial')
|
| + with am._lock:
|
| + self.assertIsNone(am._flush_timer)
|
| + self.assertIsNone(am._current_data)
|
| +
|
| + # Only one datagrams should have been sent.
|
| + self.assertEqual(len(self.db.datagrams), 1)
|
| +
|
| + def testStructuralUpdateSendsImmediately(self):
|
| + with self._annotation_monitor() as am:
|
| + # Get initial data out of the way.
|
| + am.signal_update('initial')
|
| + self.assertEqual(self.db.wait_for_data(), 'initial')
|
| +
|
| + # Send a structural update. It should send immediately.
|
| + am.signal_update('test', structural=True)
|
| + self.assertEqual(self.db.wait_for_data(), 'test')
|
| +
|
| + # Send a duplicate structural update. It should be ignored.
|
| + am.signal_update('test', structural=True)
|
| + with am._lock:
|
| + self.assertIsNone(am._flush_timer)
|
| + self.assertIsNone(am._current_data)
|
| +
|
| + # Only two datagrams should have been sent.
|
| + self.assertEqual(len(self.db.datagrams), 2)
|
| +
|
| + def testFlushesPeriodically(self):
|
| + with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am:
|
| + # Get initial data out of the way.
|
| + am.signal_update('initial')
|
| + self.assertEqual(self.db.wait_for_data(), 'initial')
|
| +
|
| + # Send a structural update. It should send immediately.
|
| + am.signal_update('test')
|
| + self.assertEqual(self.db.wait_for_data(), 'test')
|
| +
|
| + # Only two datagrams should have been sent.
|
| + self.assertEqual(len(self.db.datagrams), 2)
|
| +
|
| +
|
| +class AnnotationStateTest(unittest.TestCase):
|
| + """Tests the stream_logdog._AnnotationState directly."""
|
| +
|
| + def setUp(self):
|
| + self.env = stream_logdog._Environment(
|
| + None,
|
| + argv=['command', 'arg0', 'arg1'],
|
| + cwd='path/to/cwd',
|
| + environ={
|
| + 'foo': 'bar',
|
| + 'FOO': 'baz',
|
| + },
|
| + )
|
| + self.astate = stream_logdog._AnnotationState.create(
|
| + stream_logdog._StreamName('strean/name'),
|
| + env=self.env,
|
| + properties={'foo': 'bar'},
|
| + )
|
| +
|
| + def testFirstCheckReturnsData(self):
|
| + # The first check should return data.
|
| + self.assertIsNotNone(self.astate.check())
|
| + # The second will, since nothing has changed.
|
| + self.assertIsNone(self.astate.check())
|
| +
|
| + def testCanCreateAndGetStep(self):
|
| + # Root step.
|
| + base = self.astate.base
|
| + self.astate.create_step(recipe_api._make_step_config(name='first'))
|
| + self.assertEqual(len(base.substep), 1)
|
| + self.assertEqual(base.substep[0].step.name, 'first')
|
| + self.assertIsNotNone(self.astate.check())
|
| +
|
| + # Child step.
|
| + self.astate.create_step(recipe_api._make_step_config(
|
| + name='first child',
|
| + step_nest_level=1))
|
| + self.assertEqual(len(base.substep), 1)
|
| + self.assertEqual(len(base.substep[0].step.substep), 1)
|
| + self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child')
|
| + self.assertIsNotNone(self.astate.check())
|
| +
|
| + # Sibling step to 'first'.
|
| + self.astate.create_step(recipe_api._make_step_config(name='second'))
|
| + self.assertEqual(len(base.substep), 2)
|
| + self.assertEqual(base.substep[1].step.name, 'second')
|
| + self.assertIsNotNone(self.astate.check())
|
| +
|
| + def testCanUpdateProperties(self):
|
| + self.astate.update_properties(foo='baz', qux='quux')
|
| + self.assertEqual(list(self.astate.base.property), [
|
| + pb.Step.Property(name='foo', value='baz'),
|
| + pb.Step.Property(name='qux', value='quux'),
|
| + ])
|
| +
|
| +
|
| +class StreamNameTest(unittest.TestCase):
|
| + """Tests the stream_logdog._StreamName directly."""
|
| +
|
| + def testEmptyStreamNameRaisesValueError(self):
|
| + sn = stream_logdog._StreamName(None)
|
| + with self.assertRaises(ValueError):
|
| + str(sn)
|
| +
|
| + def testInvalidBaseRaisesValueError(self):
|
| + with self.assertRaises(ValueError):
|
| + stream_logdog._StreamName('!!! invalid !!!')
|
| +
|
| + def testAppendComponents(self):
|
| + sn = stream_logdog._StreamName('base')
|
| + self.assertEqual(str(sn.append()), 'base')
|
| + self.assertEqual(str(sn.append('foo')), 'base/foo')
|
| + self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar')
|
| + self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz')
|
| +
|
| + def testAugment(self):
|
| + sn = stream_logdog._StreamName('base')
|
| + self.assertEqual(str(sn.augment('')), 'base')
|
| + self.assertEqual(str(sn.augment('foo')), 'basefoo')
|
| + self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz')
|
| +
|
| + def testAppendInvalidStreamNameNormalizes(self):
|
| + sn = stream_logdog._StreamName('base')
|
| + sn = sn.append('#!!! stream name !!!')
|
| + self.assertEqual(str(sn), 'base/s______stream_name____')
|
| +
|
| + def testAugmentInvalidStreamNameNormalizes(self):
|
| + sn = stream_logdog._StreamName('base')
|
| + self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____')
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|