| Index: client/libs/logdog/tests/stream_test.py
|
| diff --git a/client/libs/logdog/tests/stream_test.py b/client/libs/logdog/tests/stream_test.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..b308f2eafb26c439097a5f6236bfa5feec8e0248
|
| --- /dev/null
|
| +++ b/client/libs/logdog/tests/stream_test.py
|
| @@ -0,0 +1,203 @@
|
| +#!/usr/bin/env python
|
| +# Copyright 2016 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed by the Apache v2.0 license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import json
|
| +import os
|
| +import sys
|
| +import unittest
|
| +import StringIO
|
| +
|
| +ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join(
|
| + __file__, os.pardir, os.pardir, os.pardir)))
|
| +sys.path.insert(0, ROOT_DIR)
|
| +
|
| +from libs.logdog import stream, varint
|
| +
|
| +
|
| +class StreamParamsTestCase(unittest.TestCase):
|
| +
|
| + def setUp(self):
|
| + self.params = stream.StreamParams(
|
| + 'name',
|
| + type=stream.StreamParams.TEXT,
|
| + content_type='content-type',
|
| + tags={
|
| + 'foo': 'bar',
|
| + 'baz': 'qux',
|
| + },
|
| + tee=stream.StreamParams.TEE_STDOUT,
|
| + binary_file_extension='ext')
|
| +
|
| + def testParamsToJson(self):
|
| + self.assertEqual(self.params.to_json(),
|
| + ('{"binaryFileExtension": "ext", "contentType": "content-type", '
|
| + '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
|
| + '"tee": "stdout", "type": "text"}'))
|
| +
|
| + def testParamsToJsonWithEmpties(self):
|
| + params = self.params._replace(
|
| + content_type=None,
|
| + tags=None,
|
| + tee=None,
|
| + binary_file_extension=None,
|
| + )
|
| + self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
|
| +
|
| + def testParamsWithInvalidTypeRaisesValueError(self):
|
| + params = self.params._replace(type=None)
|
| + self.assertRaises(ValueError, params.to_json)
|
| +
|
| + def testParamsWithInvalidTeeTypeRaisesValueError(self):
|
| + params = self.params._replace(tee='somewhere')
|
| + self.assertRaises(ValueError, params.to_json)
|
| +
|
| + def testParamsWithInvalidTagRaisesValueError(self):
|
| + params = self.params._replace(tags='foo')
|
| + self.assertRaises(ValueError, params.to_json)
|
| +
|
| + params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'})
|
| + self.assertRaises(ValueError, params.to_json)
|
| +
|
| +
|
| +class StreamClientTestCase(unittest.TestCase):
|
| +
|
| + class _TestStreamClientConnection(object):
|
| +
|
| + def __init__(self):
|
| + self.buffer = StringIO.StringIO()
|
| + self.closed = False
|
| +
|
| + def _assert_not_closed(self):
|
| + if self.closed:
|
| + raise Exception('Connection is closed.')
|
| +
|
| + def write(self, v):
|
| + self._assert_not_closed()
|
| + self.buffer.write(v)
|
| +
|
| + def close(self):
|
| + self._assert_not_closed()
|
| + self.closed = True
|
| +
|
| + def interpret(self):
|
| + data = StringIO.StringIO(self.buffer.getvalue())
|
| + length, _ = varint.read_uvarint(data)
|
| + header = data.read(length)
|
| + return json.loads(header), data.read()
|
| +
|
| + class _TestStreamClient(stream.StreamClient):
|
| + def __init__(self, value):
|
| + super(StreamClientTestCase._TestStreamClient, self).__init__()
|
| + self.value = value
|
| + self.last_conn = None
|
| +
|
| + @classmethod
|
| + def _create(cls, value):
|
| + return cls(value)
|
| +
|
| + def _connect_raw(self):
|
| + conn = StreamClientTestCase._TestStreamClientConnection()
|
| + self.last_conn = conn
|
| + return conn
|
| +
|
| + def setUp(self):
|
| + self._registry = stream.StreamProtocolRegistry()
|
| + self._registry.register_protocol('test', self._TestStreamClient)
|
| +
|
| + @staticmethod
|
| + def _split_datagrams(value):
|
| + sio = StringIO.StringIO(value)
|
| + while sio.pos < sio.len:
|
| + size_prefix, _ = varint.read_uvarint(sio)
|
| + data = sio.read(size_prefix)
|
| + if len(data) != size_prefix:
|
| + raise ValueError('Expected %d bytes, but only got %d' % (
|
| + size_prefix, len(data)))
|
| + yield data
|
| +
|
| + def testClientInstantiation(self):
|
| + client = self._registry.create('test:value')
|
| + self.assertIsInstance(client, self._TestStreamClient)
|
| + self.assertEqual(client.value, 'value')
|
| +
|
| + def testTextStream(self):
|
| + client = self._registry.create('test:value')
|
| + with client.text('mystream') as fd:
|
| + fd.write('text\nstream\nlines')
|
| +
|
| + conn = client.last_conn
|
| + self.assertTrue(conn.closed)
|
| +
|
| + header, data = conn.interpret()
|
| + self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
|
| + self.assertEqual(data, 'text\nstream\nlines')
|
| +
|
| + def testTextStreamWithParams(self):
|
| + client = self._registry.create('test:value')
|
| + with client.text('mystream', content_type='foo/bar',
|
| + tee=stream.StreamParams.TEE_STDOUT,
|
| + tags={'foo': 'bar', 'baz': 'qux'}) as fd:
|
| + fd.write('text!')
|
| +
|
| + conn = client.last_conn
|
| + self.assertTrue(conn.closed)
|
| +
|
| + header, data = conn.interpret()
|
| + self.assertEqual(header, {
|
| + 'name': 'mystream',
|
| + 'type': 'text',
|
| + 'contentType': 'foo/bar',
|
| + 'tee': 'stdout',
|
| + 'tags': {'foo': 'bar', 'baz': 'qux'},
|
| + })
|
| + self.assertEqual(data, 'text!')
|
| +
|
| + def testBinaryStream(self):
|
| + client = self._registry.create('test:value')
|
| + with client.binary('mystream') as fd:
|
| + fd.write('\x60\x0d\xd0\x65')
|
| +
|
| + conn = client.last_conn
|
| + self.assertTrue(conn.closed)
|
| +
|
| + header, data = conn.interpret()
|
| + self.assertEqual(header, {'name': 'mystream', 'type': 'binary'})
|
| + self.assertEqual(data, '\x60\x0d\xd0\x65')
|
| +
|
| + def testDatagramStream(self):
|
| + client = self._registry.create('test:value')
|
| + with client.datagram('mystream') as fd:
|
| + fd.send('datagram0')
|
| + fd.send('dg1')
|
| + fd.send('')
|
| + fd.send('dg3')
|
| +
|
| + conn = client.last_conn
|
| + self.assertTrue(conn.closed)
|
| +
|
| + header, data = conn.interpret()
|
| + self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'})
|
| + self.assertEqual(list(self._split_datagrams(data)),
|
| + ['datagram0', 'dg1', '', 'dg3'])
|
| +
|
| + def testCreatingDuplicateStreamNameRaisesValueError(self):
|
| + client = self._registry.create('test:value')
|
| + with client.text('mystream') as fd:
|
| + fd.write('Using a text stream.')
|
| +
|
| + with self.assertRaises(ValueError):
|
| + with client.text('mystream') as fd:
|
| + fd.write('Should not work.')
|
| +
|
| + conn = client.last_conn
|
| + self.assertTrue(conn.closed)
|
| +
|
| + header, data = conn.interpret()
|
| + self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
|
| + self.assertEqual(data, 'Using a text stream.')
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|