Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: client/libs/logdog/tests/stream_test.py

Issue 1961603002: Add LogDog Python client library. (Closed) Base URL: https://github.com/luci/luci-py@master
Patch Set: Remove "client." from package name, make pylint happy. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/libs/logdog/tests/bootstrap_test.py ('k') | client/libs/logdog/tests/streamname_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2016 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed by the Apache v2.0 license that can be
4 # found in the LICENSE file.
5
6 import json
7 import os
8 import sys
9 import unittest
10 import StringIO
11
12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join(
13 __file__, os.pardir, os.pardir, os.pardir)))
14 sys.path.insert(0, ROOT_DIR)
15
16 from libs.logdog import stream, varint
17
18
19 class StreamParamsTestCase(unittest.TestCase):
20
21 def setUp(self):
22 self.params = stream.StreamParams(
23 'name',
24 type=stream.StreamParams.TEXT,
25 content_type='content-type',
26 tags={
27 'foo': 'bar',
28 'baz': 'qux',
29 },
30 tee=stream.StreamParams.TEE_STDOUT,
31 binary_file_extension='ext')
32
33 def testParamsToJson(self):
34 self.assertEqual(self.params.to_json(),
35 ('{"binaryFileExtension": "ext", "contentType": "content-type", '
36 '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
37 '"tee": "stdout", "type": "text"}'))
38
39 def testParamsToJsonWithEmpties(self):
40 params = self.params._replace(
41 content_type=None,
42 tags=None,
43 tee=None,
44 binary_file_extension=None,
45 )
46 self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
47
48 def testParamsWithInvalidTypeRaisesValueError(self):
49 params = self.params._replace(type=None)
50 self.assertRaises(ValueError, params.to_json)
51
52 def testParamsWithInvalidTeeTypeRaisesValueError(self):
53 params = self.params._replace(tee='somewhere')
54 self.assertRaises(ValueError, params.to_json)
55
56 def testParamsWithInvalidTagRaisesValueError(self):
57 params = self.params._replace(tags='foo')
58 self.assertRaises(ValueError, params.to_json)
59
60 params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'})
61 self.assertRaises(ValueError, params.to_json)
62
63
64 class StreamClientTestCase(unittest.TestCase):
65
66 class _TestStreamClientConnection(object):
67
68 def __init__(self):
69 self.buffer = StringIO.StringIO()
70 self.closed = False
71
72 def _assert_not_closed(self):
73 if self.closed:
74 raise Exception('Connection is closed.')
75
76 def write(self, v):
77 self._assert_not_closed()
78 self.buffer.write(v)
79
80 def close(self):
81 self._assert_not_closed()
82 self.closed = True
83
84 def interpret(self):
85 data = StringIO.StringIO(self.buffer.getvalue())
86 length, _ = varint.read_uvarint(data)
87 header = data.read(length)
88 return json.loads(header), data.read()
89
90 class _TestStreamClient(stream.StreamClient):
91 def __init__(self, value):
92 super(StreamClientTestCase._TestStreamClient, self).__init__()
93 self.value = value
94 self.last_conn = None
95
96 @classmethod
97 def _create(cls, value):
98 return cls(value)
99
100 def _connect_raw(self):
101 conn = StreamClientTestCase._TestStreamClientConnection()
102 self.last_conn = conn
103 return conn
104
105 def setUp(self):
106 self._registry = stream.StreamProtocolRegistry()
107 self._registry.register_protocol('test', self._TestStreamClient)
108
109 @staticmethod
110 def _split_datagrams(value):
111 sio = StringIO.StringIO(value)
112 while sio.pos < sio.len:
113 size_prefix, _ = varint.read_uvarint(sio)
114 data = sio.read(size_prefix)
115 if len(data) != size_prefix:
116 raise ValueError('Expected %d bytes, but only got %d' % (
117 size_prefix, len(data)))
118 yield data
119
120 def testClientInstantiation(self):
121 client = self._registry.create('test:value')
122 self.assertIsInstance(client, self._TestStreamClient)
123 self.assertEqual(client.value, 'value')
124
125 def testTextStream(self):
126 client = self._registry.create('test:value')
127 with client.text('mystream') as fd:
128 fd.write('text\nstream\nlines')
129
130 conn = client.last_conn
131 self.assertTrue(conn.closed)
132
133 header, data = conn.interpret()
134 self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
135 self.assertEqual(data, 'text\nstream\nlines')
136
137 def testTextStreamWithParams(self):
138 client = self._registry.create('test:value')
139 with client.text('mystream', content_type='foo/bar',
140 tee=stream.StreamParams.TEE_STDOUT,
141 tags={'foo': 'bar', 'baz': 'qux'}) as fd:
142 fd.write('text!')
143
144 conn = client.last_conn
145 self.assertTrue(conn.closed)
146
147 header, data = conn.interpret()
148 self.assertEqual(header, {
149 'name': 'mystream',
150 'type': 'text',
151 'contentType': 'foo/bar',
152 'tee': 'stdout',
153 'tags': {'foo': 'bar', 'baz': 'qux'},
154 })
155 self.assertEqual(data, 'text!')
156
157 def testBinaryStream(self):
158 client = self._registry.create('test:value')
159 with client.binary('mystream') as fd:
160 fd.write('\x60\x0d\xd0\x65')
161
162 conn = client.last_conn
163 self.assertTrue(conn.closed)
164
165 header, data = conn.interpret()
166 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'})
167 self.assertEqual(data, '\x60\x0d\xd0\x65')
168
169 def testDatagramStream(self):
170 client = self._registry.create('test:value')
171 with client.datagram('mystream') as fd:
172 fd.send('datagram0')
173 fd.send('dg1')
174 fd.send('')
175 fd.send('dg3')
176
177 conn = client.last_conn
178 self.assertTrue(conn.closed)
179
180 header, data = conn.interpret()
181 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'})
182 self.assertEqual(list(self._split_datagrams(data)),
183 ['datagram0', 'dg1', '', 'dg3'])
184
185 def testCreatingDuplicateStreamNameRaisesValueError(self):
186 client = self._registry.create('test:value')
187 with client.text('mystream') as fd:
188 fd.write('Using a text stream.')
189
190 with self.assertRaises(ValueError):
191 with client.text('mystream') as fd:
192 fd.write('Should not work.')
193
194 conn = client.last_conn
195 self.assertTrue(conn.closed)
196
197 header, data = conn.interpret()
198 self.assertEqual(header, {'name': 'mystream', 'type': 'text'})
199 self.assertEqual(data, 'Using a text stream.')
200
201
202 if __name__ == '__main__':
203 unittest.main()
OLDNEW
« no previous file with comments | « client/libs/logdog/tests/bootstrap_test.py ('k') | client/libs/logdog/tests/streamname_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698