Index: recipe_engine/unittests/stream_logdog_test.py |
diff --git a/recipe_engine/unittests/stream_logdog_test.py b/recipe_engine/unittests/stream_logdog_test.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..af8cb39f8ba1c70da87077bb56ac79b17d5db39f |
--- /dev/null |
+++ b/recipe_engine/unittests/stream_logdog_test.py |
@@ -0,0 +1,860 @@ |
+#!/usr/bin/env python |
+# Copyright 2015 The LUCI Authors. All rights reserved. |
+# Use of this source code is governed under the Apache License, Version 2.0 |
+# that can be found in the LICENSE file. |
+ |
+import StringIO |
+import collections |
+import contextlib |
+import datetime |
+import json |
+import threading |
+import time |
+import unittest |
+ |
+import test_env |
+ |
+import libs.logdog.stream |
+import libs.logdog.varint |
+from google.protobuf import json_format as jsonpb |
+from recipe_engine import recipe_api |
+from recipe_engine import stream |
+from recipe_engine import stream_logdog |
+ |
+ |
+import annotations_pb2 as pb |
+ |
+ |
+def _translate_annotation_datagram(dg): |
+ """Translate annotation datagram binary data into a Python dict modeled after |
+ the JSONPB projection of the datagram. |
+ |
+  This projection is chosen because it allows for easy, idiomatic equality |
+  assertions in test cases. |
+ |
+ Args: |
+ dg (str): The serialized annotation pb.Step datagram. |
+ """ |
+ msg = pb.Step() |
+ msg.ParseFromString(dg) |
+ return json.loads(jsonpb.MessageToJson(msg)) |
+ |
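+# A rough illustration of the translation (field values hypothetical; a real |
+# annotation Step carries many more fields): |
+# |
+#   msg = pb.Step(name='steps') |
+#   _translate_annotation_datagram(msg.SerializeToString()) |
+#   ==> {u'name': u'steps'} |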
+ |
+class _TestStreamClient(libs.logdog.stream.StreamClient): |
+ """A testing StreamClient that retains all data written to it.""" |
+ |
+ class Stream(object): |
+ """A file-like object that is explicitly aware of LogDog stream protocol.""" |
+ |
+ def __init__(self, stream_client): |
+ self._client = stream_client |
+ self._buf = StringIO.StringIO() |
+ self._header = None |
+ self._final_data = None |
+ self._data_offset = None |
+ |
+ def write(self, data): |
+ self._buf.write(data) |
+ self._attempt_registration() |
+ |
+ def close(self): |
+ # If we never parsed our header, register that we are incomplete. |
+ if self._header is None: |
+ self._client._register_incomplete(self) |
+ |
+ self._final_data = self.data |
+ self._buf.close() |
+ |
+ @contextlib.contextmanager |
+ def _read_from(self, offset): |
+ # Seek to the specified offset. |
+ self._buf.seek(offset, mode=0) |
+ try: |
+ yield self._buf |
+ finally: |
+        # Seek back to the end of the stream so future writes will append. |
+ self._buf.seek(0, mode=2) |
+ |
+ def _attempt_registration(self): |
+ # Only need to register once. |
+ if self._header is not None: |
+ return |
+ |
+ # Can we parse a full LogDog stream header? |
+ # |
+ # This means pulling: |
+ # - The LogDog Butler header. |
+ # - The header size varint. |
+ # - The header JSON blob, which needs to be decoded. |
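+      # |
+      # For example, a well-formed prefix would look roughly like (sizes and |
+      # header contents hypothetical): |
+      # |
+      #   BUTLER_MAGIC + uvarint(len(header_json)) + header_json + data... |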
+ with self._read_from(0) as fd: |
+        # Read the Butler magic number bytes. |
+ magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) |
+ if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): |
+ # Incomplete magic number, cannot complete registration. |
+ return |
+ count = len(magic_data) |
+ |
+ try: |
+ size, varint_count = libs.logdog.varint.read_uvarint(fd) |
+ except ValueError: |
+ # Incomplete varint, cannot complete registration. |
+ return |
+ count += varint_count |
+ |
+ header_data = fd.read(size) |
+ if len(header_data) != size: |
+ # Incomplete header, cannot complete registration. |
+ return |
+ count += size |
+ |
+ # Parse the header as JSON. |
+ self._header = json.loads(header_data) |
+        self._data_offset = count  # (magic + varint + header size) |
+ self._client._register_stream(self, self._header) |
+ |
+ @property |
+ def data(self): |
+ # If we have already cached our data (on close), return it directly. |
+ if self._final_data is not None: |
+ return self._final_data |
+ |
+ # Load our data from our live buffer. |
+ if self._data_offset is None: |
+ # No header has been read, so there is no data. |
+ return None |
+ with self._read_from(self._data_offset) as fd: |
+ return fd.read() |
+ |
+ |
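+  # Bookkeeping for a registered stream: the Stream object itself, its |
+  # LogDog stream type, and its declared content type (may be None). |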
+ _StreamEntry = collections.namedtuple('_StreamEntry', ( |
+ 's', 'type', 'content_type')) |
+ |
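+  # Datagram content types that get() knows how to decode into Python |
+  # structures for assertion-friendly comparisons. |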
+ _DATAGRAM_CONTENT_TRANSLATE = { |
+ stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, |
+ } |
+ |
+ |
+ def __init__(self): |
+ super(_TestStreamClient, self).__init__() |
+ self.streams = {} |
+ self.incomplete = [] |
+ self.unregistered = {} |
+ |
+ @classmethod |
+ def _create(cls, value): |
+ raise NotImplementedError('Instances must be created manually.') |
+ |
+ def _connect_raw(self): |
+ s = self.Stream(self) |
+ self.unregistered[id(s)] = s |
+ return s |
+ |
+ def get(self, name): |
+ se = self.streams[name] |
+ data = se.s.data |
+ |
+ if se.type == libs.logdog.stream.StreamParams.TEXT: |
+ # Return text stream data as a list of lines. We use unicode because it |
+ # fits in with the JSON dump from 'all_streams'. |
+ return [unicode(l) for l in data.splitlines()] |
+ elif se.type == libs.logdog.stream.StreamParams.BINARY: |
+ raise NotImplementedError('No support for fetching binary stream data.') |
+ elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: |
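+      # Datagram streams are framed as repeated |
+      #   uvarint(len(payload)) + payload |
+      # records; read size-prefixed payloads until the buffer is exhausted. |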
+      # Return the last datagram in the stream. Annotation datagrams are |
+      # cumulative snapshots, so the tests only assert on the final one. |
+      sio = StringIO.StringIO(data) |
+      dg = None |
+      while sio.tell() < sio.len: |
+        size, _ = libs.logdog.varint.read_uvarint(sio) |
+        dg = sio.read(size) |
+        if len(dg) != size: |
+          raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) |
+ |
+        # If this datagram is a known type (e.g., protobuf), transform it |
+        # into JSONPB. |
+        translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) |
+        if translator is not None: |
+          dg = translator(dg) |
+ |
+      sio.close() |
+      return dg |
+ else: |
+ raise ValueError('Unknown stream type [%s]' % (se.type,)) |
+ |
+ def all_streams(self): |
+ return dict((name, self.get(name)) for name in self.streams.iterkeys()) |
+ |
+ @property |
+ def stream_names(self): |
+ return set(self.streams.iterkeys()) |
+ |
+ def _remove_from_unregistered(self, s): |
+ if id(s) not in self.unregistered: |
+ raise KeyError('Stream is not known to be unregistered.') |
+    del self.unregistered[id(s)] |
+ |
+ def _register_stream(self, s, header): |
+ name = header.get('name') |
+ if name in self.streams: |
+ raise KeyError('Duplicate stream [%s]' % (name,)) |
+ |
+ self._remove_from_unregistered(s) |
+ self.streams[name] = self._StreamEntry( |
+ s=s, |
+ type=header['type'], |
+ content_type=header.get('contentType'), |
+ ) |
+ |
+ def _register_incomplete(self, s): |
+ self._remove_from_unregistered(s) |
+ self.incomplete.append(s) |
+ |
+ |
+class EnvironmentTest(unittest.TestCase): |
+ """Simple test to assert that _Environment, which is stubbed during our tests, |
+ actually works.""" |
+ |
+ def testEnvironmentProbes(self): |
+ stream_logdog._Environment.probe() |
+ |
+ |
+class StreamEngineTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.client = _TestStreamClient() |
+ self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) |
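+    # Use a fixed, far-future base time so the emitted timestamps are |
+    # deterministic; argv, environ, and cwd feed the annotation proto's |
+    # 'command' section. |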
+ self.env = stream_logdog._Environment( |
+ now_fn=lambda: self.now, |
+ argv=[], |
+ environ={}, |
+ cwd=None, |
+ ) |
+ self.maxDiff = 1024*1024 |
+ |
+ |
+ @contextlib.contextmanager |
+ def _new_stream_engine(self, **kwargs): |
+ kwargs.setdefault('client', self.client) |
+ kwargs.setdefault('environment', self.env) |
+ |
+ # Initialize and open a StreamEngine. |
+ se = stream_logdog.StreamEngine(**kwargs) |
+ se.open() |
+ yield se |
+ |
+ # Close the StreamEngine after we're done with it. |
+ self._advance_time() |
+ se.close() |
+ |
+ @contextlib.contextmanager |
+ def _step_stream(self, se, **kwargs): |
+ # Initialize and yield a new step stream. |
+ self._advance_time() |
+ step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs)) |
+ yield step_stream |
+ |
+ # Close the step stream when we're done with it. |
+ self._advance_time() |
+ step_stream.close() |
+ |
+ @contextlib.contextmanager |
+ def _log_stream(self, step_stream, name): |
+ # Initialize and yield a new log stream. |
+ log_stream = step_stream.new_log_stream(name) |
+ yield log_stream |
+ |
+ # Close the log stream when we're done with it. |
+ log_stream.close() |
+ |
+ def _advance_time(self): |
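+    # Each simulated event advances the fake clock by one second; the |
+    # expected 'started'/'ended' timestamps in these tests rely on this. |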
+ self.now += datetime.timedelta(seconds=1) |
+ |
+ def testEmptyStreamEngine(self): |
+ self.env.argv = ['fake_program', 'arg0', 'arg1'] |
+ self.env.environ['foo'] = 'bar' |
+ self.env.cwd = 'CWD' |
+ |
+ with self._new_stream_engine() as se: |
+ pass |
+ |
+ self.assertEqual(self.client.all_streams(), { |
+ u'annotations': { |
+ u'name': u'steps', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:03Z', |
+ u'ended': u'2106-06-12T01:02:04Z', |
+ u'command': { |
+ u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
+ u'cwd': u'CWD', |
+ u'environ': {u'foo': u'bar'}, |
+ }, |
+ }, |
+ }) |
+ |
+ def testBasicStream(self): |
+ self.env.argv = ['fake_program', 'arg0', 'arg1'] |
+ self.env.environ['foo'] = 'bar' |
+ self.env.cwd = 'CWD' |
+ |
+ with self._new_stream_engine(name_base='test/base') as se: |
+ with self._step_stream(se, |
+ name='first step', |
+ cmd=['first', 'step'], |
+ cwd='FIRST_CWD') as step: |
+ step.add_step_text('Sup') |
+ step.add_step_text('Dawg?') |
+ step.write_line('STDOUT for first step.') |
+ step.write_line('(Another line)') |
+ step.add_step_summary_text('Everything is great.') |
+ step.add_step_link('example 1', 'http://example.com/1') |
+ step.add_step_link('example 2', 'http://example.com/2') |
+ step.set_step_status('SUCCESS') |
+ |
+ with self._step_stream(se, name='second step') as step: |
+ step.set_step_status('SUCCESS') |
+ step.write_split('multiple\nlines\nof\ntext') |
+ |
+ # Create two log streams with the same name to test indexing. |
+ # |
+ # Note that "log stream" is an invalid LogDog stream name, so this |
+ # will also test normalization. |
+ with self._log_stream(step, 'log stream') as ls: |
+ ls.write_split('foo\nbar\nbaz\n') |
+ with self._log_stream(step, 'log stream') as ls: |
+ ls.write_split('qux\nquux\n') |
+ |
+      # This is a different step name, but it normalizes to the same log |
+      # stream name as 'second step', so this will test that we disambiguate |
+      # the log stream names. |
+ with self._step_stream(se, name='second/step') as step: |
+ pass |
+ |
+ self.assertEqual(self.client.all_streams(), { |
+ u'test/base/annotations': { |
+ u'name': u'steps', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:03Z', |
+ u'ended': u'2106-06-12T01:02:10Z', |
+ u'command': { |
+ u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
+ u'cwd': u'CWD', |
+ u'environ': {u'foo': u'bar'}, |
+ }, |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'first step', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:04Z', |
+ u'ended': u'2106-06-12T01:02:05Z', |
+ u'command': { |
+ u'commandLine': [u'first', u'step'], |
+ u'cwd': u'FIRST_CWD', |
+ }, |
+ u'stdoutStream': { |
+ u'name': u'test/base/steps/first_step/stdout', |
+ }, |
+ u'text': [u'Everything is great.', u'Sup', u'Dawg?'], |
+ u'otherLinks': [ |
+ { |
+ u'label': u'example 1', |
+ u'url': u'http://example.com/1', |
+ }, |
+ { |
+ u'label': u'example 2', |
+ u'url': u'http://example.com/2', |
+ }, |
+ ], |
+ }}, |
+ |
+ {u'step': { |
+ u'name': u'second step', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:06Z', |
+ u'ended': u'2106-06-12T01:02:07Z', |
+ u'stdoutStream': { |
+ u'name': u'test/base/steps/second_step/stdout', |
+ }, |
+ u'otherLinks': [ |
+ { |
+ u'label': u'log stream', |
+ u'logdogStream': { |
+ u'name': u'test/base/steps/second_step/logs/log_stream/0', |
+ }, |
+ }, |
+ { |
+ u'label': u'log stream', |
+ u'logdogStream': { |
+ u'name': u'test/base/steps/second_step/logs/log_stream/1', |
+ }, |
+ }, |
+ ], |
+ }}, |
+ |
+ {u'step': { |
+ u'name': u'second/step', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:08Z', |
+ u'ended': u'2106-06-12T01:02:09Z', |
+ }}, |
+ ], |
+ }, |
+ |
+ u'test/base/steps/first_step/stdout': [ |
+ u'STDOUT for first step.', |
+ u'(Another line)', |
+ ], |
+ |
+ u'test/base/steps/second_step/stdout': [ |
+ u'multiple', |
+ u'lines', |
+ u'of', |
+ u'text', |
+ ], |
+ |
+ u'test/base/steps/second_step/logs/log_stream/0': [ |
+ u'foo', |
+ u'bar', |
+ u'baz', |
+ ], |
+ |
+ u'test/base/steps/second_step/logs/log_stream/1': [ |
+ u'qux', |
+ u'quux', |
+ ], |
+ }) |
+ |
+ def testWarningBasicStream(self): |
+ with self._new_stream_engine() as se: |
+ with self._step_stream(se, name='isuck') as step: |
+ step.add_step_summary_text('Something went wrong.') |
+ step.set_step_status('WARNING') |
+ |
+ self.assertEqual(self.client.all_streams(), { |
+ u'annotations': { |
+ u'name': u'steps', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:03Z', |
+ u'ended': u'2106-06-12T01:02:06Z', |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'isuck', |
+ u'status': u'SUCCESS', |
+ u'failureDetails': { |
+ u'text': u'Something went wrong.', |
+ }, |
+ u'started': u'2106-06-12T01:02:04Z', |
+ u'ended': u'2106-06-12T01:02:05Z', |
+ u'text': [u'Something went wrong.'], |
+ }}, |
+ ], |
+ }, |
+ }) |
+ |
+ def testFailedBasicStream(self): |
+ with self._new_stream_engine() as se: |
+ with self._step_stream(se, name='isuck') as step: |
+ step.add_step_summary_text('Oops I failed.') |
+ step.set_step_status('FAILURE') |
+ |
+ with self._step_stream(se, name='irock') as step: |
+ pass |
+ |
+ self.assertEqual(self.client.all_streams(), { |
+ u'annotations': { |
+ u'name': u'steps', |
+ u'status': u'FAILURE', |
+ u'started': u'2106-06-12T01:02:03Z', |
+ u'ended': u'2106-06-12T01:02:08Z', |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'isuck', |
+ u'status': u'FAILURE', |
+ u'failureDetails': { |
+ u'text': u'Oops I failed.', |
+ }, |
+ u'started': u'2106-06-12T01:02:04Z', |
+ u'ended': u'2106-06-12T01:02:05Z', |
+ u'text': [u'Oops I failed.'], |
+ }}, |
+ |
+ {u'step': { |
+ u'name': u'irock', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:06Z', |
+ u'ended': u'2106-06-12T01:02:07Z', |
+ }}, |
+ ], |
+ }, |
+ }) |
+ |
+ def testNestedStream(self): |
+ with self._new_stream_engine() as se: |
+ # parent |
+ with self._step_stream(se, name='parent') as step: |
+ step.write_line('I am the parent.') |
+ |
+ # parent."child 1" |
+ with self._step_stream(se, |
+ name='child 1', |
+ step_nest_level=1) as step: |
+ step.write_line('I am child #1.') |
+ |
+ # parent."child 1"."grandchild" |
+ with self._step_stream(se, |
+ name='grandchild', |
+ step_nest_level=2) as step: |
+ step.write_line("I am child #1's child.") |
+ |
+ # parent."child 2". Mark this child as failed. This should not propagate |
+ # to the parent, since it has an explicit status. |
+ with self._step_stream(se, |
+ name='child 2', |
+ step_nest_level=1) as step: |
+ step.write_line('I am child #2.') |
+ |
+ # parent."child 2". Mark this child as failed. This should not propagate |
+ # to the parent, since it has an explicit status. |
+ with self._step_stream(se, name='friend') as step: |
+ step.write_line("I am the parent's friend.") |
+ |
+ self.assertEqual(self.client.all_streams(), { |
+ u'annotations': { |
+ u'name': u'steps', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:03Z', |
+ u'ended': u'2106-06-12T01:02:14Z', |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'parent', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:04Z', |
+ u'ended': u'2106-06-12T01:02:05Z', |
+ u'stdoutStream': { |
+ u'name': u'steps/parent/stdout', |
+ }, |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'child 1', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:06Z', |
+ u'ended': u'2106-06-12T01:02:07Z', |
+ u'stdoutStream': { |
+ u'name': u'steps/parent/steps/child_1/stdout', |
+ }, |
+ u'substep': [ |
+ |
+ {u'step': { |
+ u'name': u'grandchild', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:08Z', |
+ u'ended': u'2106-06-12T01:02:09Z', |
+ u'stdoutStream': { |
+ u'name': u'steps/parent/steps/child_1/' |
+ 'steps/grandchild/stdout', |
+ }, |
+ }}, |
+ ], |
+ }}, |
+ |
+ {u'step': { |
+ u'name': u'child 2', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:10Z', |
+ u'ended': u'2106-06-12T01:02:11Z', |
+ u'stdoutStream': { |
+ u'name': u'steps/parent/steps/child_2/stdout', |
+ }, |
+ }}, |
+ ], |
+ }}, |
+ |
+ {u'step': { |
+ u'name': u'friend', |
+ u'status': u'SUCCESS', |
+ u'started': u'2106-06-12T01:02:12Z', |
+ u'ended': u'2106-06-12T01:02:13Z', |
+ u'stdoutStream': { |
+ u'name': u'steps/friend/stdout', |
+ }, |
+ }}, |
+ ], |
+ }, |
+ |
+ u'steps/parent/stdout': [u'I am the parent.'], |
+ u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], |
+ u'steps/parent/steps/child_1/steps/grandchild/stdout': [ |
+ u"I am child #1's child."], |
+ u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], |
+ u'steps/friend/stdout': [u"I am the parent's friend."], |
+ }) |
+ |
+ def testTriggersRaiseException(self): |
+ with self._new_stream_engine() as se: |
+ with self._step_stream(se, name='trigger') as step: |
+ with self.assertRaises(NotImplementedError): |
+ step.trigger('trigger spec') |
+ |
+ def testTriggersIgnored(self): |
+ with self._new_stream_engine(ignore_triggers=True) as se: |
+ with self._step_stream(se, name='trigger') as step: |
+ step.trigger('trigger spec') |
+ |
+ def testNoSubannotations(self): |
+ with self._new_stream_engine(ignore_triggers=True) as se: |
+ with self.assertRaises(NotImplementedError): |
+ se.new_step_stream(recipe_api.StepConfig.create( |
+ name='uses subannotations', |
+ allow_subannotations=True, |
+ )) |
+ |
+ def testInvalidStepStatusRaisesValueError(self): |
+ with self._new_stream_engine() as se: |
+ with self._step_stream(se, name='trigger') as step: |
+ with self.assertRaises(ValueError): |
+ step.set_step_status('OHAI') |
+ |
+ |
+class AnnotationMonitorTest(unittest.TestCase): |
+ """Tests the stream_logdog._AnnotationMonitor directly.""" |
+ |
+  # A small timedelta: long enough to exercise blocking, short enough to |
+  # keep the test fast. |
+ _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) |
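+ |
+  # Expected monitor behavior, as exercised below: the first update after a |
+  # quiet period is sent immediately; later non-structural updates are |
+  # coalesced until the flush timer fires (only the latest value is sent); |
+  # structural updates flush immediately; duplicate data is dropped. |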
+ |
+ class _DatagramBuffer(object): |
+ |
+ def __init__(self): |
+ self.datagrams = [] |
+ self.data_event = threading.Event() |
+ |
+ def send(self, dg): |
+ self.datagrams.append(dg) |
+ self.data_event.set() |
+ |
+ def __len__(self): |
+ return len(self.datagrams) |
+ |
+ @property |
+ def latest(self): |
+ if self.datagrams: |
+ return self.datagrams[-1] |
+ return None |
+ |
+ def wait_for_data(self): |
+ self.data_event.wait() |
+ self.data_event.clear() |
+ return self.latest |
+ |
+ |
+ @contextlib.contextmanager |
+ def _annotation_monitor(self, **kwargs): |
+ # Default to a really high flush period. This should never naturally trigger |
+ # during a test case. |
+ kwargs.setdefault('flush_period', datetime.timedelta(hours=1)) |
+ |
+ am = stream_logdog._AnnotationMonitor(self.db, **kwargs) |
+ try: |
+ yield am |
+ finally: |
+ am.flush_and_join() |
+ |
+ with am._lock: |
+ # Assert that our timer has been shut down. |
+ self.assertIsNone(am._flush_timer) |
+ # Assert that there is no buffered data. |
+ self.assertIsNone(am._current_data) |
+ |
+ def setUp(self): |
+ self.db = self._DatagramBuffer() |
+ |
+ def testMonitorStartsAndJoinsWithNoData(self): |
+ with self._annotation_monitor() as am: |
+ pass |
+ |
+ # No datagrams should have been sent. |
+ self.assertIsNone(self.db.latest) |
+ self.assertEqual(len(self.db.datagrams), 0) |
+ |
+ def testMonitorBuffersAndSendsData(self): |
+ with self._annotation_monitor() as am: |
+      # The first piece of data should be sent immediately. |
+ am.signal_update('initial') |
+ self.assertEqual(self.db.wait_for_data(), 'initial') |
+ |
+ # Subsequent iterations should not send data, but should start the flush |
+ # timer and buffer the latest data. |
+ with am._lock: |
+ self.assertIsNone(am._flush_timer) |
+ for i in xrange(10): |
+ am.signal_update('test%d' % (i,)) |
+ time.sleep(self._SMALL_TIME_DELTA.total_seconds()) |
+ with am._lock: |
+ self.assertEqual(am._current_data, 'test9') |
+ self.assertIsNotNone(am._flush_timer) |
+ |
+ # Pretend the timer triggered. We should receive the latest buffered data. |
+ am._flush_timer_expired() |
+ self.assertEqual(self.db.wait_for_data(), 'test9') |
+ with am._lock: |
+ # No more timer or buffered data. |
+ self.assertIsNone(am._flush_timer) |
+ self.assertIsNone(am._current_data) |
+ |
+ # Send one last chunk of data, but don't let the timer expire. This will |
+ # be sent on final flush. |
+ am.signal_update('final') |
+ with am._lock: |
+ self.assertIsNotNone(am._flush_timer) |
+ |
+ # Assert that the final chunk of data was sent. |
+ self.assertEqual(self.db.latest, 'final') |
+ |
+ # Only three datagrams should have been sent. |
+ self.assertEqual(len(self.db.datagrams), 3) |
+ |
+ def testMonitorIgnoresDuplicateData(self): |
+ with self._annotation_monitor() as am: |
+ # Get initial data out of the way. |
+ am.signal_update('initial') |
+ self.assertEqual(self.db.wait_for_data(), 'initial') |
+ |
+ # Send the same thing. It should not be buffered. |
+ am.signal_update('initial') |
+ with am._lock: |
+ self.assertIsNone(am._flush_timer) |
+ self.assertIsNone(am._current_data) |
+ |
+    # Only one datagram should have been sent. |
+ self.assertEqual(len(self.db.datagrams), 1) |
+ |
+ def testStructuralUpdateSendsImmediately(self): |
+ with self._annotation_monitor() as am: |
+ # Get initial data out of the way. |
+ am.signal_update('initial') |
+ self.assertEqual(self.db.wait_for_data(), 'initial') |
+ |
+ # Send a structural update. It should send immediately. |
+ am.signal_update('test', structural=True) |
+ self.assertEqual(self.db.wait_for_data(), 'test') |
+ |
+ # Send a duplicate structural update. It should be ignored. |
+ am.signal_update('test', structural=True) |
+ with am._lock: |
+ self.assertIsNone(am._flush_timer) |
+ self.assertIsNone(am._current_data) |
+ |
+ # Only two datagrams should have been sent. |
+ self.assertEqual(len(self.db.datagrams), 2) |
+ |
+ def testFlushesPeriodically(self): |
+ with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: |
+ # Get initial data out of the way. |
+ am.signal_update('initial') |
+ self.assertEqual(self.db.wait_for_data(), 'initial') |
+ |
+      # Send a non-structural update. The short flush period should cause |
+      # it to be sent promptly. |
+ am.signal_update('test') |
+ self.assertEqual(self.db.wait_for_data(), 'test') |
+ |
+ # Only two datagrams should have been sent. |
+ self.assertEqual(len(self.db.datagrams), 2) |
+ |
+ |
+class AnnotationStateTest(unittest.TestCase): |
+ """Tests the stream_logdog._AnnotationState directly.""" |
+ |
+ def setUp(self): |
+ self.env = stream_logdog._Environment( |
+ None, |
+ argv=['command', 'arg0', 'arg1'], |
+ cwd='path/to/cwd', |
+ environ={ |
+ 'foo': 'bar', |
+ 'FOO': 'baz', |
+ }, |
+ ) |
+ self.astate = stream_logdog._AnnotationState.create( |
+        stream_logdog._StreamName('stream/name'), |
+ environment=self.env, |
+ properties={'foo': 'bar'}, |
+ ) |
+ |
+ def testFirstCheckReturnsData(self): |
+ # The first check should return data. |
+ self.assertIsNotNone(self.astate.check()) |
+    # The second check will not, since nothing has changed. |
+ self.assertIsNone(self.astate.check()) |
+ |
+ def testCanCreateAndGetStep(self): |
+ # Root step. |
+ base = self.astate.base |
+ self.astate.create_step(recipe_api.StepConfig.create(name='first')) |
+ self.assertEqual(len(base.substep), 1) |
+ self.assertEqual(base.substep[0].step.name, 'first') |
+ self.assertIsNotNone(self.astate.check()) |
+ |
+ # Child step. |
+ self.astate.create_step(recipe_api.StepConfig.create( |
+ name='first child', |
+ step_nest_level=1)) |
+ self.assertEqual(len(base.substep), 1) |
+ self.assertEqual(len(base.substep[0].step.substep), 1) |
+ self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') |
+ self.assertIsNotNone(self.astate.check()) |
+ |
+ # Sibling step to 'first'. |
+ self.astate.create_step(recipe_api.StepConfig.create(name='second')) |
+ self.assertEqual(len(base.substep), 2) |
+ self.assertEqual(base.substep[1].step.name, 'second') |
+ self.assertIsNotNone(self.astate.check()) |
+ |
+ def testCanUpdateProperties(self): |
+ self.astate.update_properties(foo='baz', qux='quux') |
+ self.assertEqual(list(self.astate.base.property), [ |
+ pb.Step.Property(name='foo', value='baz'), |
+ pb.Step.Property(name='qux', value='quux'), |
+ ]) |
+ |
+ |
+class StreamNameTest(unittest.TestCase): |
+ """Tests the stream_logdog._StreamName directly.""" |
+ |
+ def testEmptyStreamNameRaisesValueError(self): |
+ sn = stream_logdog._StreamName(None) |
+ with self.assertRaises(ValueError): |
+ str(sn) |
+ |
+ def testInvalidBaseRaisesValueError(self): |
+ with self.assertRaises(ValueError): |
+ stream_logdog._StreamName('!!! invalid !!!') |
+ |
+ def testAppendComponents(self): |
+ sn = stream_logdog._StreamName('base') |
+ self.assertEqual(str(sn.append()), 'base') |
+ self.assertEqual(str(sn.append('foo')), 'base/foo') |
+ self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') |
+ self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') |
+ |
+ def testAugment(self): |
+ sn = stream_logdog._StreamName('base') |
+ self.assertEqual(str(sn.augment('')), 'base') |
+ self.assertEqual(str(sn.augment('foo')), 'basefoo') |
+ self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') |
+ |
+ def testAppendInvalidStreamNameNormalizes(self): |
+ sn = stream_logdog._StreamName('base') |
+ sn = sn.append('#!!! stream name !!!') |
+ self.assertEqual(str(sn), 'base/s______stream_name____') |
+ |
+ def testAugmentInvalidStreamNameNormalizes(self): |
+ sn = stream_logdog._StreamName('base') |
+ self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') |
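+ |
+  # Taken together, these cases exercise the normalization rule: characters |
+  # that are invalid in a LogDog stream name are replaced with '_', and a |
+  # name that begins with an invalid character is prefixed (here with 's') |
+  # so it remains a valid stream name. |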
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |