Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: client/libs/logdog/tests/stream_test.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed by the Apache v2.0 license that can be
3 # found in the LICENSE file.
4
5 import unittest
6 import itertools
7 import json
8 import StringIO
9
10 from client.libs.logdog import stream, varint
11
12
13 class StreamParamsTestCase(unittest.TestCase):
14
15 def setUp(self):
16 self.params = stream.StreamParams(
17 'name',
18 type=stream.StreamParams.TEXT,
19 content_type='content-type',
20 tags={
21 'foo': 'bar',
22 'baz': 'qux',
23 },
24 tee=stream.StreamParams.TEE_STDOUT,
25 binary_file_extension='ext')
26
27 def testParamsToJson(self):
28 self.assertEqual(self.params.to_json(),
29 ('{"binaryFileExtension": "ext", "contentType": "content-type", '
30 '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
31 '"tee": "stdout", "type": "text"}'))
32
33 def testParamsToJsonWithEmpties(self):
34 params = self.params._replace(
35 content_type=None,
36 tags=None,
37 tee=None,
38 binary_file_extension=None,
39 )
40 self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
41
42 def testParamsWithInvalidTypeRaisesValueError(self):
43 params = self.params._replace(type=None)
44 self.assertRaises(ValueError, params.to_json)
45
46 def testParamsWithInvalidTeeTypeRaisesValueError(self):
47 params = self.params._replace(tee='somewhere')
48 self.assertRaises(ValueError, params.to_json)
49
50 def testParamsWithInvalidTagRaisesValueError(self):
51 params = self.params._replace(tags='foo')
52 self.assertRaises(ValueError, params.to_json)
53
54 params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'})
55 self.assertRaises(ValueError, params.to_json)
56
57
58 class StreamClientTestCase(unittest.TestCase):
59
60 class _TestStreamClientConnection(object):
61
62 def __init__(self):
63 self.buffer = StringIO.StringIO()
64 self.closed = False
65
66 def _assert_not_closed(self):
67 if self.closed:
68 raise Exception('Connection is closed.')
69
70 def write(self, v):
71 self._assert_not_closed()
72 self.buffer.write(v)
73
74 def close(self):
75 self._assert_not_closed()
76 self.closed = True
77
78 def interpret(self):
79 data = StringIO.StringIO(self.buffer.getvalue())
80 length, _ = varint.read_uvarint(data)
81 header = data.read(length)
82 return json.loads(header), data.read()
83
84 class _TestStreamClient(stream.StreamClient):
85 def __init__(self, value):
86 super(StreamClientTestCase._TestStreamClient, self).__init__()
87 self.value = value
88 self.last_conn = None
89
90 @classmethod
91 def _create(cls, value):
92 return cls(value)
93
94 def _connect_raw(self):
95 conn = StreamClientTestCase._TestStreamClientConnection()
96 self.last_conn = conn
97 return conn
98
99 def setUp(self):
100 self._registry = stream.StreamProtocolRegistry()
101 self._registry.register_protocol('test', self._TestStreamClient)
102
103 @staticmethod
104 def _split_datagrams(value):
105 sio = StringIO.StringIO(value)
106 while sio.pos < sio.len:
107 size_prefix, _ = varint.read_uvarint(sio)
108 data = sio.read(size_prefix)
109 if len(data) != size_prefix:
110 raise ValueError('Expected %d bytes, but only got %d' % (
111 size_prefix, len(data)))
112 yield data
113
114 def testClientInstantiation(self):
115 client = self._registry.create('test:value')
116 self.assertIsInstance(client, self._TestStreamClient)
117 self.assertEqual(client.value, 'value')
118
119 def testTextStream(self):
120 client = self._registry.create('test:value')
121 with client.text('mystream') as fd:
122 fd.write('text\nstream\nlines')
123
124 conn = client.last_conn
125 self.assertTrue(conn.closed)
126
127 header, data = conn.interpret()
128 self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
129 self.assertEqual(data, 'text\nstream\nlines')
130
131 def testTextStreamWithParams(self):
132 client = self._registry.create('test:value')
133 with client.text('mystream', content_type='foo/bar',
134 tee=stream.StreamParams.TEE_STDOUT,
135 tags={'foo': 'bar', 'baz': 'qux'}) as fd:
136 fd.write('text!')
137
138 conn = client.last_conn
139 self.assertTrue(conn.closed)
140
141 header, data = conn.interpret()
142 self.assertEqual(header, {
143 'name': 'mystream',
144 'type': 'text',
145 'contentType': 'foo/bar',
146 'tee': 'stdout',
147 'tags': {'foo': 'bar', 'baz': 'qux'},
148 })
149 self.assertEqual(data, 'text!')
150
151 def testBinaryStream(self):
152 client = self._registry.create('test:value')
153 with client.binary('mystream') as fd:
154 fd.write('\x60\x0d\xd0\x65')
155
156 conn = client.last_conn
157 self.assertTrue(conn.closed)
158
159 header, data = conn.interpret()
160 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'})
161 self.assertEqual(data, '\x60\x0d\xd0\x65')
162
163 def testDatagramStream(self):
164 client = self._registry.create('test:value')
165 with client.datagram('mystream') as fd:
166 fd.send('datagram0')
167 fd.send('dg1')
168 fd.send('')
169 fd.send('dg3')
170
171 conn = client.last_conn
172 self.assertTrue(conn.closed)
173
174 header, data = conn.interpret()
175 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'})
176 self.assertEqual(list(self._split_datagrams(data)),
177 ['datagram0', 'dg1', '', 'dg3'])
178
179 def testCreatingDuplicateStreamNameRaisesValueError(self):
180 client = self._registry.create('test:value')
181 with client.text('mystream') as fd:
182 fd.write('Using a text stream.')
183
184 with self.assertRaises(ValueError):
185 with client.text('mystream') as fd:
186 fd.write('Should not work.')
187
188 conn = client.last_conn
189 self.assertTrue(conn.closed)
190
191 header, data = conn.interpret()
192 self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
193 self.assertEqual(data, 'Using a text stream.')
194
195
196 if __name__ == '__main__':
197 unittest.main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698