Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Unified Diff: client/libs/logdog/tests/stream_test.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Remove "client." from package name, make pylint happy. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/libs/logdog/tests/bootstrap_test.py ('k') | client/libs/logdog/tests/streamname_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/libs/logdog/tests/stream_test.py
diff --git a/client/libs/logdog/tests/stream_test.py b/client/libs/logdog/tests/stream_test.py
new file mode 100755
index 0000000000000000000000000000000000000000..b308f2eafb26c439097a5f6236bfa5feec8e0248
--- /dev/null
+++ b/client/libs/logdog/tests/stream_test.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+# Copyright 2016 The LUCI Authors. All rights reserved.
+# Use of this source code is governed by the Apache v2.0 license that can be
+# found in the LICENSE file.
+
+import json
+import os
+import sys
+import unittest
+import StringIO
+
+ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join(
+ __file__, os.pardir, os.pardir, os.pardir)))
+sys.path.insert(0, ROOT_DIR)
+
+from libs.logdog import stream, varint
+
+
+class StreamParamsTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.params = stream.StreamParams(
+ 'name',
+ type=stream.StreamParams.TEXT,
+ content_type='content-type',
+ tags={
+ 'foo': 'bar',
+ 'baz': 'qux',
+ },
+ tee=stream.StreamParams.TEE_STDOUT,
+ binary_file_extension='ext')
+
+ def testParamsToJson(self):
+ self.assertEqual(self.params.to_json(),
+ ('{"binaryFileExtension": "ext", "contentType": "content-type", '
+ '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
+ '"tee": "stdout", "type": "text"}'))
+
+ def testParamsToJsonWithEmpties(self):
+ params = self.params._replace(
+ content_type=None,
+ tags=None,
+ tee=None,
+ binary_file_extension=None,
+ )
+ self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
+
+ def testParamsWithInvalidTypeRaisesValueError(self):
+ params = self.params._replace(type=None)
+ self.assertRaises(ValueError, params.to_json)
+
+ def testParamsWithInvalidTeeTypeRaisesValueError(self):
+ params = self.params._replace(tee='somewhere')
+ self.assertRaises(ValueError, params.to_json)
+
+ def testParamsWithInvalidTagRaisesValueError(self):
+ params = self.params._replace(tags='foo')
+ self.assertRaises(ValueError, params.to_json)
+
+ params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'})
+ self.assertRaises(ValueError, params.to_json)
+
+
+class StreamClientTestCase(unittest.TestCase):
+
+ class _TestStreamClientConnection(object):
+
+ def __init__(self):
+ self.buffer = StringIO.StringIO()
+ self.closed = False
+
+ def _assert_not_closed(self):
+ if self.closed:
+ raise Exception('Connection is closed.')
+
+ def write(self, v):
+ self._assert_not_closed()
+ self.buffer.write(v)
+
+ def close(self):
+ self._assert_not_closed()
+ self.closed = True
+
+ def interpret(self):
+ data = StringIO.StringIO(self.buffer.getvalue())
+ length, _ = varint.read_uvarint(data)
+ header = data.read(length)
+ return json.loads(header), data.read()
+
+ class _TestStreamClient(stream.StreamClient):
+ def __init__(self, value):
+ super(StreamClientTestCase._TestStreamClient, self).__init__()
+ self.value = value
+ self.last_conn = None
+
+ @classmethod
+ def _create(cls, value):
+ return cls(value)
+
+ def _connect_raw(self):
+ conn = StreamClientTestCase._TestStreamClientConnection()
+ self.last_conn = conn
+ return conn
+
+ def setUp(self):
+ self._registry = stream.StreamProtocolRegistry()
+ self._registry.register_protocol('test', self._TestStreamClient)
+
+ @staticmethod
+ def _split_datagrams(value):
+ sio = StringIO.StringIO(value)
+ while sio.pos < sio.len:
+ size_prefix, _ = varint.read_uvarint(sio)
+ data = sio.read(size_prefix)
+ if len(data) != size_prefix:
+ raise ValueError('Expected %d bytes, but only got %d' % (
+ size_prefix, len(data)))
+ yield data
+
+ def testClientInstantiation(self):
+ client = self._registry.create('test:value')
+ self.assertIsInstance(client, self._TestStreamClient)
+ self.assertEqual(client.value, 'value')
+
+ def testTextStream(self):
+ client = self._registry.create('test:value')
+ with client.text('mystream') as fd:
+ fd.write('text\nstream\nlines')
+
+ conn = client.last_conn
+ self.assertTrue(conn.closed)
+
+ header, data = conn.interpret()
+ self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
+ self.assertEqual(data, 'text\nstream\nlines')
+
+ def testTextStreamWithParams(self):
+ client = self._registry.create('test:value')
+ with client.text('mystream', content_type='foo/bar',
+ tee=stream.StreamParams.TEE_STDOUT,
+ tags={'foo': 'bar', 'baz': 'qux'}) as fd:
+ fd.write('text!')
+
+ conn = client.last_conn
+ self.assertTrue(conn.closed)
+
+ header, data = conn.interpret()
+ self.assertEqual(header, {
+ 'name': 'mystream',
+ 'type': 'text',
+ 'contentType': 'foo/bar',
+ 'tee': 'stdout',
+ 'tags': {'foo': 'bar', 'baz': 'qux'},
+ })
+ self.assertEqual(data, 'text!')
+
+ def testBinaryStream(self):
+ client = self._registry.create('test:value')
+ with client.binary('mystream') as fd:
+ fd.write('\x60\x0d\xd0\x65')
+
+ conn = client.last_conn
+ self.assertTrue(conn.closed)
+
+ header, data = conn.interpret()
+ self.assertEqual(header, {'name': 'mystream', 'type': 'binary'})
+ self.assertEqual(data, '\x60\x0d\xd0\x65')
+
+ def testDatagramStream(self):
+ client = self._registry.create('test:value')
+ with client.datagram('mystream') as fd:
+ fd.send('datagram0')
+ fd.send('dg1')
+ fd.send('')
+ fd.send('dg3')
+
+ conn = client.last_conn
+ self.assertTrue(conn.closed)
+
+ header, data = conn.interpret()
+ self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'})
+ self.assertEqual(list(self._split_datagrams(data)),
+ ['datagram0', 'dg1', '', 'dg3'])
+
+ def testCreatingDuplicateStreamNameRaisesValueError(self):
+ client = self._registry.create('test:value')
+ with client.text('mystream') as fd:
+ fd.write('Using a text stream.')
+
+ with self.assertRaises(ValueError):
+ with client.text('mystream') as fd:
+ fd.write('Should not work.')
+
+ conn = client.last_conn
+ self.assertTrue(conn.closed)
+
+ header, data = conn.interpret()
+ self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
+ self.assertEqual(data, 'Using a text stream.')
+
+
+if __name__ == '__main__':
+ unittest.main()
« no previous file with comments | « client/libs/logdog/tests/bootstrap_test.py ('k') | client/libs/logdog/tests/streamname_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698