Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1964)

Unified Diff: recipe_engine/unittests/stream_logdog_test.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: pylint, fix comments Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: recipe_engine/unittests/stream_logdog_test.py
diff --git a/recipe_engine/unittests/stream_logdog_test.py b/recipe_engine/unittests/stream_logdog_test.py
new file mode 100755
index 0000000000000000000000000000000000000000..af8cb39f8ba1c70da87077bb56ac79b17d5db39f
--- /dev/null
+++ b/recipe_engine/unittests/stream_logdog_test.py
@@ -0,0 +1,860 @@
+#!/usr/bin/env python
+# Copyright 2015 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+import collections
+import contextlib
+import datetime
+import json
+import threading
+import time
+import unittest
+import StringIO
+
+import test_env
+
+import libs.logdog.stream
+import libs.logdog.varint
+from google.protobuf import json_format as jsonpb
+from recipe_engine import recipe_api
+from recipe_engine import stream
+from recipe_engine import stream_logdog
+
+
+import annotations_pb2 as pb
+
+
+def _translate_annotation_datagram(dg):
+ """Translate annotation datagram binary data into a Python dict modeled after
+ the JSONPB projection of the datagram.
+
+ This is chosen because it allows for easy idiomatic equality assertions in
+ test cases.
+
+ Args:
+ dg (str): The serialized annotation pb.Step datagram.
+ """
+ msg = pb.Step()
+ msg.ParseFromString(dg)
+ return json.loads(jsonpb.MessageToJson(msg))
+
+
+class _TestStreamClient(libs.logdog.stream.StreamClient):
+ """A testing StreamClient that retains all data written to it."""
+
+ class Stream(object):
+ """A file-like object that is explicitly aware of LogDog stream protocol."""
+
+ def __init__(self, stream_client):
+ self._client = stream_client
+ self._buf = StringIO.StringIO()
+ self._header = None
+ self._final_data = None
+ self._data_offset = None
+
+ def write(self, data):
+ self._buf.write(data)
+ self._attempt_registration()
+
+ def close(self):
+ # If we never parsed our header, register that we are incomplete.
+ if self._header is None:
+ self._client._register_incomplete(self)
+
+ self._final_data = self.data
+ self._buf.close()
+
+ @contextlib.contextmanager
+ def _read_from(self, offset):
+ # Seek to the specified offset.
+ self._buf.seek(offset, mode=0)
+ try:
+ yield self._buf
+ finally:
+ # Seek back to he end of the stream so future writes will append.
+ self._buf.seek(0, mode=2)
+
+ def _attempt_registration(self):
+ # Only need to register once.
+ if self._header is not None:
+ return
+
+ # Can we parse a full LogDog stream header?
+ #
+ # This means pulling:
+ # - The LogDog Butler header.
+ # - The header size varint.
+ # - The header JSON blob, which needs to be decoded.
+ with self._read_from(0) as fd:
+ # Read 'result' bytes.
+ magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC))
+ if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC):
+ # Incomplete magic number, cannot complete registration.
+ return
+ count = len(magic_data)
+
+ try:
+ size, varint_count = libs.logdog.varint.read_uvarint(fd)
+ except ValueError:
+ # Incomplete varint, cannot complete registration.
+ return
+ count += varint_count
+
+ header_data = fd.read(size)
+ if len(header_data) != size:
+ # Incomplete header, cannot complete registration.
+ return
+ count += size
+
+ # Parse the header as JSON.
+ self._header = json.loads(header_data)
+ self._data_offset = count # (varint + header size)
+ self._client._register_stream(self, self._header)
+
+ @property
+ def data(self):
+ # If we have already cached our data (on close), return it directly.
+ if self._final_data is not None:
+ return self._final_data
+
+ # Load our data from our live buffer.
+ if self._data_offset is None:
+ # No header has been read, so there is no data.
+ return None
+ with self._read_from(self._data_offset) as fd:
+ return fd.read()
+
+
+ _StreamEntry = collections.namedtuple('_StreamEntry', (
+ 's', 'type', 'content_type'))
+
+ _DATAGRAM_CONTENT_TRANSLATE = {
+ stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram,
+ }
+
+
+ def __init__(self):
+ super(_TestStreamClient, self).__init__()
+ self.streams = {}
+ self.incomplete = []
+ self.unregistered = {}
+
+ @classmethod
+ def _create(cls, value):
+ raise NotImplementedError('Instances must be created manually.')
+
+ def _connect_raw(self):
+ s = self.Stream(self)
+ self.unregistered[id(s)] = s
+ return s
+
+ def get(self, name):
+ se = self.streams[name]
+ data = se.s.data
+
+ if se.type == libs.logdog.stream.StreamParams.TEXT:
+ # Return text stream data as a list of lines. We use unicode because it
+ # fits in with the JSON dump from 'all_streams'.
+ return [unicode(l) for l in data.splitlines()]
+ elif se.type == libs.logdog.stream.StreamParams.BINARY:
+ raise NotImplementedError('No support for fetching binary stream data.')
+ elif se.type == libs.logdog.stream.StreamParams.DATAGRAM:
+ # Return datagram stream data as a list of datagrams.
+ sio = StringIO.StringIO(data)
+ datagrams = []
+ while sio.tell() < sio.len:
+ size, _ = libs.logdog.varint.read_uvarint(sio)
+ dg = sio.read(size)
+ if len(dg) != size:
+ raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size))
+
+ # If this datagram is a known type (e.g., protobuf), transform it into
+ # JSONPB.
+ translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type)
+ if translator is not None:
+ dg = translator(dg)
+ datagrams.append(dg)
+
+ sio.close()
+ return dg
+ else:
+ raise ValueError('Unknown stream type [%s]' % (se.type,))
+
+ def all_streams(self):
+ return dict((name, self.get(name)) for name in self.streams.iterkeys())
+
+ @property
+ def stream_names(self):
+ return set(self.streams.iterkeys())
+
+ def _remove_from_unregistered(self, s):
+ if id(s) not in self.unregistered:
+ raise KeyError('Stream is not known to be unregistered.')
+ del(self.unregistered[id(s)])
+
+ def _register_stream(self, s, header):
+ name = header.get('name')
+ if name in self.streams:
+ raise KeyError('Duplicate stream [%s]' % (name,))
+
+ self._remove_from_unregistered(s)
+ self.streams[name] = self._StreamEntry(
+ s=s,
+ type=header['type'],
+ content_type=header.get('contentType'),
+ )
+
+ def _register_incomplete(self, s):
+ self._remove_from_unregistered(s)
+ self.incomplete.append(s)
+
+
+class EnvironmentTest(unittest.TestCase):
+ """Simple test to assert that _Environment, which is stubbed during our tests,
+ actually works."""
+
+ def testEnvironmentProbes(self):
+ stream_logdog._Environment.probe()
+
+
+class StreamEngineTest(unittest.TestCase):
+
+ def setUp(self):
+ self.client = _TestStreamClient()
+ self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
+ self.env = stream_logdog._Environment(
+ now_fn=lambda: self.now,
+ argv=[],
+ environ={},
+ cwd=None,
+ )
+ self.maxDiff = 1024*1024
+
+
+ @contextlib.contextmanager
+ def _new_stream_engine(self, **kwargs):
+ kwargs.setdefault('client', self.client)
+ kwargs.setdefault('environment', self.env)
+
+ # Initialize and open a StreamEngine.
+ se = stream_logdog.StreamEngine(**kwargs)
+ se.open()
+ yield se
+
+ # Close the StreamEngine after we're done with it.
+ self._advance_time()
+ se.close()
+
+ @contextlib.contextmanager
+ def _step_stream(self, se, **kwargs):
+ # Initialize and yield a new step stream.
+ self._advance_time()
+ step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs))
+ yield step_stream
+
+ # Close the step stream when we're done with it.
+ self._advance_time()
+ step_stream.close()
+
+ @contextlib.contextmanager
+ def _log_stream(self, step_stream, name):
+ # Initialize and yield a new log stream.
+ log_stream = step_stream.new_log_stream(name)
+ yield log_stream
+
+ # Close the log stream when we're done with it.
+ log_stream.close()
+
+ def _advance_time(self):
+ self.now += datetime.timedelta(seconds=1)
+
+ def testEmptyStreamEngine(self):
+ self.env.argv = ['fake_program', 'arg0', 'arg1']
+ self.env.environ['foo'] = 'bar'
+ self.env.cwd = 'CWD'
+
+ with self._new_stream_engine() as se:
+ pass
+
+ self.assertEqual(self.client.all_streams(), {
+ u'annotations': {
+ u'name': u'steps',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:03Z',
+ u'ended': u'2106-06-12T01:02:04Z',
+ u'command': {
+ u'commandLine': [u'fake_program', u'arg0', u'arg1'],
+ u'cwd': u'CWD',
+ u'environ': {u'foo': u'bar'},
+ },
+ },
+ })
+
+ def testBasicStream(self):
+ self.env.argv = ['fake_program', 'arg0', 'arg1']
+ self.env.environ['foo'] = 'bar'
+ self.env.cwd = 'CWD'
+
+ with self._new_stream_engine(name_base='test/base') as se:
+ with self._step_stream(se,
+ name='first step',
+ cmd=['first', 'step'],
+ cwd='FIRST_CWD') as step:
+ step.add_step_text('Sup')
+ step.add_step_text('Dawg?')
+ step.write_line('STDOUT for first step.')
+ step.write_line('(Another line)')
+ step.add_step_summary_text('Everything is great.')
+ step.add_step_link('example 1', 'http://example.com/1')
+ step.add_step_link('example 2', 'http://example.com/2')
+ step.set_step_status('SUCCESS')
+
+ with self._step_stream(se, name='second step') as step:
+ step.set_step_status('SUCCESS')
+ step.write_split('multiple\nlines\nof\ntext')
+
+ # Create two log streams with the same name to test indexing.
+ #
+ # Note that "log stream" is an invalid LogDog stream name, so this
+ # will also test normalization.
+ with self._log_stream(step, 'log stream') as ls:
+ ls.write_split('foo\nbar\nbaz\n')
+ with self._log_stream(step, 'log stream') as ls:
+ ls.write_split('qux\nquux\n')
+
+ # This is a different stream name, but will normalize to the same log
+ # stream name as 'second/step', so this will test that we disambiguate
+ # the log stream names.
+ with self._step_stream(se, name='second/step') as step:
+ pass
+
+ self.assertEqual(self.client.all_streams(), {
+ u'test/base/annotations': {
+ u'name': u'steps',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:03Z',
+ u'ended': u'2106-06-12T01:02:10Z',
+ u'command': {
+ u'commandLine': [u'fake_program', u'arg0', u'arg1'],
+ u'cwd': u'CWD',
+ u'environ': {u'foo': u'bar'},
+ },
+ u'substep': [
+
+ {u'step': {
+ u'name': u'first step',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:04Z',
+ u'ended': u'2106-06-12T01:02:05Z',
+ u'command': {
+ u'commandLine': [u'first', u'step'],
+ u'cwd': u'FIRST_CWD',
+ },
+ u'stdoutStream': {
+ u'name': u'test/base/steps/first_step/stdout',
+ },
+ u'text': [u'Everything is great.', u'Sup', u'Dawg?'],
+ u'otherLinks': [
+ {
+ u'label': u'example 1',
+ u'url': u'http://example.com/1',
+ },
+ {
+ u'label': u'example 2',
+ u'url': u'http://example.com/2',
+ },
+ ],
+ }},
+
+ {u'step': {
+ u'name': u'second step',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:06Z',
+ u'ended': u'2106-06-12T01:02:07Z',
+ u'stdoutStream': {
+ u'name': u'test/base/steps/second_step/stdout',
+ },
+ u'otherLinks': [
+ {
+ u'label': u'log stream',
+ u'logdogStream': {
+ u'name': u'test/base/steps/second_step/logs/log_stream/0',
+ },
+ },
+ {
+ u'label': u'log stream',
+ u'logdogStream': {
+ u'name': u'test/base/steps/second_step/logs/log_stream/1',
+ },
+ },
+ ],
+ }},
+
+ {u'step': {
+ u'name': u'second/step',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:08Z',
+ u'ended': u'2106-06-12T01:02:09Z',
+ }},
+ ],
+ },
+
+ u'test/base/steps/first_step/stdout': [
+ u'STDOUT for first step.',
+ u'(Another line)',
+ ],
+
+ u'test/base/steps/second_step/stdout': [
+ u'multiple',
+ u'lines',
+ u'of',
+ u'text',
+ ],
+
+ u'test/base/steps/second_step/logs/log_stream/0': [
+ u'foo',
+ u'bar',
+ u'baz',
+ ],
+
+ u'test/base/steps/second_step/logs/log_stream/1': [
+ u'qux',
+ u'quux',
+ ],
+ })
+
+ def testWarningBasicStream(self):
+ with self._new_stream_engine() as se:
+ with self._step_stream(se, name='isuck') as step:
+ step.add_step_summary_text('Something went wrong.')
+ step.set_step_status('WARNING')
+
+ self.assertEqual(self.client.all_streams(), {
+ u'annotations': {
+ u'name': u'steps',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:03Z',
+ u'ended': u'2106-06-12T01:02:06Z',
+ u'substep': [
+
+ {u'step': {
+ u'name': u'isuck',
+ u'status': u'SUCCESS',
+ u'failureDetails': {
+ u'text': u'Something went wrong.',
+ },
+ u'started': u'2106-06-12T01:02:04Z',
+ u'ended': u'2106-06-12T01:02:05Z',
+ u'text': [u'Something went wrong.'],
+ }},
+ ],
+ },
+ })
+
+ def testFailedBasicStream(self):
+ with self._new_stream_engine() as se:
+ with self._step_stream(se, name='isuck') as step:
+ step.add_step_summary_text('Oops I failed.')
+ step.set_step_status('FAILURE')
+
+ with self._step_stream(se, name='irock') as step:
+ pass
+
+ self.assertEqual(self.client.all_streams(), {
+ u'annotations': {
+ u'name': u'steps',
+ u'status': u'FAILURE',
+ u'started': u'2106-06-12T01:02:03Z',
+ u'ended': u'2106-06-12T01:02:08Z',
+ u'substep': [
+
+ {u'step': {
+ u'name': u'isuck',
+ u'status': u'FAILURE',
+ u'failureDetails': {
+ u'text': u'Oops I failed.',
+ },
+ u'started': u'2106-06-12T01:02:04Z',
+ u'ended': u'2106-06-12T01:02:05Z',
+ u'text': [u'Oops I failed.'],
+ }},
+
+ {u'step': {
+ u'name': u'irock',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:06Z',
+ u'ended': u'2106-06-12T01:02:07Z',
+ }},
+ ],
+ },
+ })
+
+ def testNestedStream(self):
+ with self._new_stream_engine() as se:
+ # parent
+ with self._step_stream(se, name='parent') as step:
+ step.write_line('I am the parent.')
+
+ # parent."child 1"
+ with self._step_stream(se,
+ name='child 1',
+ step_nest_level=1) as step:
+ step.write_line('I am child #1.')
+
+ # parent."child 1"."grandchild"
+ with self._step_stream(se,
+ name='grandchild',
+ step_nest_level=2) as step:
+ step.write_line("I am child #1's child.")
+
+ # parent."child 2". Mark this child as failed. This should not propagate
+ # to the parent, since it has an explicit status.
+ with self._step_stream(se,
+ name='child 2',
+ step_nest_level=1) as step:
+ step.write_line('I am child #2.')
+
+ # parent."child 2". Mark this child as failed. This should not propagate
+ # to the parent, since it has an explicit status.
+ with self._step_stream(se, name='friend') as step:
+ step.write_line("I am the parent's friend.")
+
+ self.assertEqual(self.client.all_streams(), {
+ u'annotations': {
+ u'name': u'steps',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:03Z',
+ u'ended': u'2106-06-12T01:02:14Z',
+ u'substep': [
+
+ {u'step': {
+ u'name': u'parent',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:04Z',
+ u'ended': u'2106-06-12T01:02:05Z',
+ u'stdoutStream': {
+ u'name': u'steps/parent/stdout',
+ },
+ u'substep': [
+
+ {u'step': {
+ u'name': u'child 1',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:06Z',
+ u'ended': u'2106-06-12T01:02:07Z',
+ u'stdoutStream': {
+ u'name': u'steps/parent/steps/child_1/stdout',
+ },
+ u'substep': [
+
+ {u'step': {
+ u'name': u'grandchild',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:08Z',
+ u'ended': u'2106-06-12T01:02:09Z',
+ u'stdoutStream': {
+ u'name': u'steps/parent/steps/child_1/'
+ 'steps/grandchild/stdout',
+ },
+ }},
+ ],
+ }},
+
+ {u'step': {
+ u'name': u'child 2',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:10Z',
+ u'ended': u'2106-06-12T01:02:11Z',
+ u'stdoutStream': {
+ u'name': u'steps/parent/steps/child_2/stdout',
+ },
+ }},
+ ],
+ }},
+
+ {u'step': {
+ u'name': u'friend',
+ u'status': u'SUCCESS',
+ u'started': u'2106-06-12T01:02:12Z',
+ u'ended': u'2106-06-12T01:02:13Z',
+ u'stdoutStream': {
+ u'name': u'steps/friend/stdout',
+ },
+ }},
+ ],
+ },
+
+ u'steps/parent/stdout': [u'I am the parent.'],
+ u'steps/parent/steps/child_1/stdout': [u'I am child #1.'],
+ u'steps/parent/steps/child_1/steps/grandchild/stdout': [
+ u"I am child #1's child."],
+ u'steps/parent/steps/child_2/stdout': [u'I am child #2.'],
+ u'steps/friend/stdout': [u"I am the parent's friend."],
+ })
+
+ def testTriggersRaiseException(self):
+ with self._new_stream_engine() as se:
+ with self._step_stream(se, name='trigger') as step:
+ with self.assertRaises(NotImplementedError):
+ step.trigger('trigger spec')
+
+ def testTriggersIgnored(self):
+ with self._new_stream_engine(ignore_triggers=True) as se:
+ with self._step_stream(se, name='trigger') as step:
+ step.trigger('trigger spec')
+
+ def testNoSubannotations(self):
+ with self._new_stream_engine(ignore_triggers=True) as se:
+ with self.assertRaises(NotImplementedError):
+ se.new_step_stream(recipe_api.StepConfig.create(
+ name='uses subannotations',
+ allow_subannotations=True,
+ ))
+
+ def testInvalidStepStatusRaisesValueError(self):
+ with self._new_stream_engine() as se:
+ with self._step_stream(se, name='trigger') as step:
+ with self.assertRaises(ValueError):
+ step.set_step_status('OHAI')
+
+
+class AnnotationMonitorTest(unittest.TestCase):
+ """Tests the stream_logdog._AnnotationMonitor directly."""
+
+ # A small timedelta, sufficient to block but fast enough to not make the
+ # test slow.
+ _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5)
+
+ class _DatagramBuffer(object):
+
+ def __init__(self):
+ self.datagrams = []
+ self.data_event = threading.Event()
+
+ def send(self, dg):
+ self.datagrams.append(dg)
+ self.data_event.set()
+
+ def __len__(self):
+ return len(self.datagrams)
+
+ @property
+ def latest(self):
+ if self.datagrams:
+ return self.datagrams[-1]
+ return None
+
+ def wait_for_data(self):
+ self.data_event.wait()
+ self.data_event.clear()
+ return self.latest
+
+
+ @contextlib.contextmanager
+ def _annotation_monitor(self, **kwargs):
+ # Default to a really high flush period. This should never naturally trigger
+ # during a test case.
+ kwargs.setdefault('flush_period', datetime.timedelta(hours=1))
+
+ am = stream_logdog._AnnotationMonitor(self.db, **kwargs)
+ try:
+ yield am
+ finally:
+ am.flush_and_join()
+
+ with am._lock:
+ # Assert that our timer has been shut down.
+ self.assertIsNone(am._flush_timer)
+ # Assert that there is no buffered data.
+ self.assertIsNone(am._current_data)
+
+ def setUp(self):
+ self.db = self._DatagramBuffer()
+
+ def testMonitorStartsAndJoinsWithNoData(self):
+ with self._annotation_monitor() as am:
+ pass
+
+ # No datagrams should have been sent.
+ self.assertIsNone(self.db.latest)
+ self.assertEqual(len(self.db.datagrams), 0)
+
+ def testMonitorBuffersAndSendsData(self):
+ with self._annotation_monitor() as am:
+ # The first piece of data should have been immediately sent.
+ am.signal_update('initial')
+ self.assertEqual(self.db.wait_for_data(), 'initial')
+
+ # Subsequent iterations should not send data, but should start the flush
+ # timer and buffer the latest data.
+ with am._lock:
+ self.assertIsNone(am._flush_timer)
+ for i in xrange(10):
+ am.signal_update('test%d' % (i,))
+ time.sleep(self._SMALL_TIME_DELTA.total_seconds())
+ with am._lock:
+ self.assertEqual(am._current_data, 'test9')
+ self.assertIsNotNone(am._flush_timer)
+
+ # Pretend the timer triggered. We should receive the latest buffered data.
+ am._flush_timer_expired()
+ self.assertEqual(self.db.wait_for_data(), 'test9')
+ with am._lock:
+ # No more timer or buffered data.
+ self.assertIsNone(am._flush_timer)
+ self.assertIsNone(am._current_data)
+
+ # Send one last chunk of data, but don't let the timer expire. This will
+ # be sent on final flush.
+ am.signal_update('final')
+ with am._lock:
+ self.assertIsNotNone(am._flush_timer)
+
+ # Assert that the final chunk of data was sent.
+ self.assertEqual(self.db.latest, 'final')
+
+ # Only three datagrams should have been sent.
+ self.assertEqual(len(self.db.datagrams), 3)
+
+ def testMonitorIgnoresDuplicateData(self):
+ with self._annotation_monitor() as am:
+ # Get initial data out of the way.
+ am.signal_update('initial')
+ self.assertEqual(self.db.wait_for_data(), 'initial')
+
+ # Send the same thing. It should not be buffered.
+ am.signal_update('initial')
+ with am._lock:
+ self.assertIsNone(am._flush_timer)
+ self.assertIsNone(am._current_data)
+
+ # Only one datagrams should have been sent.
+ self.assertEqual(len(self.db.datagrams), 1)
+
+ def testStructuralUpdateSendsImmediately(self):
+ with self._annotation_monitor() as am:
+ # Get initial data out of the way.
+ am.signal_update('initial')
+ self.assertEqual(self.db.wait_for_data(), 'initial')
+
+ # Send a structural update. It should send immediately.
+ am.signal_update('test', structural=True)
+ self.assertEqual(self.db.wait_for_data(), 'test')
+
+ # Send a duplicate structural update. It should be ignored.
+ am.signal_update('test', structural=True)
+ with am._lock:
+ self.assertIsNone(am._flush_timer)
+ self.assertIsNone(am._current_data)
+
+ # Only two datagrams should have been sent.
+ self.assertEqual(len(self.db.datagrams), 2)
+
+ def testFlushesPeriodically(self):
+ with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am:
+ # Get initial data out of the way.
+ am.signal_update('initial')
+ self.assertEqual(self.db.wait_for_data(), 'initial')
+
+ # Send a structural update. It should send immediately.
+ am.signal_update('test')
+ self.assertEqual(self.db.wait_for_data(), 'test')
+
+ # Only two datagrams should have been sent.
+ self.assertEqual(len(self.db.datagrams), 2)
+
+
+class AnnotationStateTest(unittest.TestCase):
+ """Tests the stream_logdog._AnnotationState directly."""
+
+ def setUp(self):
+ self.env = stream_logdog._Environment(
+ None,
+ argv=['command', 'arg0', 'arg1'],
+ cwd='path/to/cwd',
+ environ={
+ 'foo': 'bar',
+ 'FOO': 'baz',
+ },
+ )
+ self.astate = stream_logdog._AnnotationState.create(
+ stream_logdog._StreamName('strean/name'),
+ environment=self.env,
+ properties={'foo': 'bar'},
+ )
+
+ def testFirstCheckReturnsData(self):
+ # The first check should return data.
+ self.assertIsNotNone(self.astate.check())
+ # The second will, since nothing has changed.
+ self.assertIsNone(self.astate.check())
+
+ def testCanCreateAndGetStep(self):
+ # Root step.
+ base = self.astate.base
+ self.astate.create_step(recipe_api.StepConfig.create(name='first'))
+ self.assertEqual(len(base.substep), 1)
+ self.assertEqual(base.substep[0].step.name, 'first')
+ self.assertIsNotNone(self.astate.check())
+
+ # Child step.
+ self.astate.create_step(recipe_api.StepConfig.create(
+ name='first child',
+ step_nest_level=1))
+ self.assertEqual(len(base.substep), 1)
+ self.assertEqual(len(base.substep[0].step.substep), 1)
+ self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child')
+ self.assertIsNotNone(self.astate.check())
+
+ # Sibling step to 'first'.
+ self.astate.create_step(recipe_api.StepConfig.create(name='second'))
+ self.assertEqual(len(base.substep), 2)
+ self.assertEqual(base.substep[1].step.name, 'second')
+ self.assertIsNotNone(self.astate.check())
+
+ def testCanUpdateProperties(self):
+ self.astate.update_properties(foo='baz', qux='quux')
+ self.assertEqual(list(self.astate.base.property), [
+ pb.Step.Property(name='foo', value='baz'),
+ pb.Step.Property(name='qux', value='quux'),
+ ])
+
+
+class StreamNameTest(unittest.TestCase):
+ """Tests the stream_logdog._StreamName directly."""
+
+ def testEmptyStreamNameRaisesValueError(self):
+ sn = stream_logdog._StreamName(None)
+ with self.assertRaises(ValueError):
+ str(sn)
+
+ def testInvalidBaseRaisesValueError(self):
+ with self.assertRaises(ValueError):
+ stream_logdog._StreamName('!!! invalid !!!')
+
+ def testAppendComponents(self):
+ sn = stream_logdog._StreamName('base')
+ self.assertEqual(str(sn.append()), 'base')
+ self.assertEqual(str(sn.append('foo')), 'base/foo')
+ self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar')
+ self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz')
+
+ def testAugment(self):
+ sn = stream_logdog._StreamName('base')
+ self.assertEqual(str(sn.augment('')), 'base')
+ self.assertEqual(str(sn.augment('foo')), 'basefoo')
+ self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz')
+
+ def testAppendInvalidStreamNameNormalizes(self):
+ sn = stream_logdog._StreamName('base')
+ sn = sn.append('#!!! stream name !!!')
+ self.assertEqual(str(sn), 'base/s______stream_name____')
+
+ def testAugmentInvalidStreamNameNormalizes(self):
+ sn = stream_logdog._StreamName('base')
+ self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____')
+
+
+if __name__ == '__main__':
+ unittest.main()

Powered by Google App Engine
This is Rietveld 408576698