OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
5 | 5 |
6 import json | 6 import json |
7 import os | 7 import os |
8 import sys | 8 import sys |
9 import unittest | 9 import unittest |
10 import StringIO | 10 import StringIO |
11 | 11 |
12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join( | 12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join( |
13 __file__.decode(sys.getfilesystemencoding()), | 13 __file__.decode(sys.getfilesystemencoding()), |
14 os.pardir, os.pardir, os.pardir))) | 14 os.pardir, os.pardir, os.pardir))) |
15 sys.path.insert(0, ROOT_DIR) | 15 sys.path.insert(0, ROOT_DIR) |
16 | 16 |
17 from libs.logdog import stream, varint | 17 from libs.logdog import stream, streamname, varint |
18 | 18 |
19 | 19 |
20 class StreamParamsTestCase(unittest.TestCase): | 20 class StreamParamsTestCase(unittest.TestCase): |
21 | 21 |
22 def setUp(self): | 22 def setUp(self): |
23 self.params = stream.StreamParams( | 23 self.params = stream.StreamParams( |
24 'name', | 24 'name', |
25 type=stream.StreamParams.TEXT, | 25 type=stream.StreamParams.TEXT, |
26 content_type='content-type', | 26 content_type='content-type', |
27 tags={ | 27 tags={ |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 data = StringIO.StringIO(self.buffer.getvalue()) | 86 data = StringIO.StringIO(self.buffer.getvalue()) |
87 magic = data.read(len(stream.BUTLER_MAGIC)) | 87 magic = data.read(len(stream.BUTLER_MAGIC)) |
88 if magic != stream.BUTLER_MAGIC: | 88 if magic != stream.BUTLER_MAGIC: |
89 raise ValueError('Invalid magic value ([%s] != [%s])' % ( | 89 raise ValueError('Invalid magic value ([%s] != [%s])' % ( |
90 magic, stream.BUTLER_MAGIC)) | 90 magic, stream.BUTLER_MAGIC)) |
91 length, _ = varint.read_uvarint(data) | 91 length, _ = varint.read_uvarint(data) |
92 header = data.read(length) | 92 header = data.read(length) |
93 return json.loads(header), data.read() | 93 return json.loads(header), data.read() |
94 | 94 |
95 class _TestStreamClient(stream.StreamClient): | 95 class _TestStreamClient(stream.StreamClient): |
96 def __init__(self, value): | 96 def __init__(self, value, **kwargs): |
97 super(StreamClientTestCase._TestStreamClient, self).__init__() | 97 super(StreamClientTestCase._TestStreamClient, self).__init__(**kwargs) |
98 self.value = value | 98 self.value = value |
99 self.last_conn = None | 99 self.last_conn = None |
100 | 100 |
101 @classmethod | 101 @classmethod |
102 def _create(cls, value): | 102 def _create(cls, value, **kwargs): |
103 return cls(value) | 103 return cls(value, **kwargs) |
104 | 104 |
105 def _connect_raw(self): | 105 def _connect_raw(self): |
106 conn = StreamClientTestCase._TestStreamClientConnection() | 106 conn = StreamClientTestCase._TestStreamClientConnection() |
107 self.last_conn = conn | 107 self.last_conn = conn |
108 return conn | 108 return conn |
109 | 109 |
110 def setUp(self): | 110 def setUp(self): |
111 self._registry = stream.StreamProtocolRegistry() | 111 self._registry = stream.StreamProtocolRegistry() |
112 self._registry.register_protocol('test', self._TestStreamClient) | 112 self._registry.register_protocol('test', self._TestStreamClient) |
113 | 113 |
114 @staticmethod | 114 @staticmethod |
115 def _split_datagrams(value): | 115 def _split_datagrams(value): |
116 sio = StringIO.StringIO(value) | 116 sio = StringIO.StringIO(value) |
117 while sio.pos < sio.len: | 117 while sio.pos < sio.len: |
118 size_prefix, _ = varint.read_uvarint(sio) | 118 size_prefix, _ = varint.read_uvarint(sio) |
119 data = sio.read(size_prefix) | 119 data = sio.read(size_prefix) |
120 if len(data) != size_prefix: | 120 if len(data) != size_prefix: |
121 raise ValueError('Expected %d bytes, but only got %d' % ( | 121 raise ValueError('Expected %d bytes, but only got %d' % ( |
122 size_prefix, len(data))) | 122 size_prefix, len(data))) |
123 yield data | 123 yield data |
124 | 124 |
125 def testClientInstantiation(self): | 125 def testClientInstantiation(self): |
126 client = self._registry.create('test:value') | 126 client = self._registry.create('test:value') |
127 self.assertIsInstance(client, self._TestStreamClient) | 127 self.assertIsInstance(client, self._TestStreamClient) |
128 self.assertEqual(client.value, 'value') | 128 self.assertEqual(client.value, 'value') |
129 | 129 |
130 def testTextStream(self): | 130 def testTextStream(self): |
131 client = self._registry.create('test:value') | 131 client = self._registry.create('test:value', |
| 132 project='test', |
| 133 prefix='foo/bar', |
| 134 coordinator_host='example.appspot.com') |
132 with client.text('mystream') as fd: | 135 with client.text('mystream') as fd: |
| 136 self.assertEqual( |
| 137 fd.path, |
| 138 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 139 self.assertEqual( |
| 140 fd.get_viewer_url(), |
| 141 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
133 fd.write('text\nstream\nlines') | 142 fd.write('text\nstream\nlines') |
134 | 143 |
135 conn = client.last_conn | 144 conn = client.last_conn |
136 self.assertTrue(conn.closed) | 145 self.assertTrue(conn.closed) |
137 | 146 |
138 header, data = conn.interpret() | 147 header, data = conn.interpret() |
139 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) | 148 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
140 self.assertEqual(data, 'text\nstream\nlines') | 149 self.assertEqual(data, 'text\nstream\nlines') |
141 | 150 |
142 def testTextStreamWithParams(self): | 151 def testTextStreamWithParams(self): |
143 client = self._registry.create('test:value') | 152 client = self._registry.create('test:value') |
144 with client.text('mystream', content_type='foo/bar', | 153 with client.text('mystream', content_type='foo/bar', |
145 tee=stream.StreamParams.TEE_STDOUT, | 154 tee=stream.StreamParams.TEE_STDOUT, |
146 tags={'foo': 'bar', 'baz': 'qux'}) as fd: | 155 tags={'foo': 'bar', 'baz': 'qux'}) as fd: |
| 156 self.assertEqual( |
| 157 fd.params, |
| 158 stream.StreamParams.make( |
| 159 name='mystream', |
| 160 type=stream.StreamParams.TEXT, |
| 161 content_type='foo/bar', |
| 162 tee=stream.StreamParams.TEE_STDOUT, |
| 163 tags={'foo': 'bar', 'baz': 'qux'})) |
147 fd.write('text!') | 164 fd.write('text!') |
148 | 165 |
149 conn = client.last_conn | 166 conn = client.last_conn |
150 self.assertTrue(conn.closed) | 167 self.assertTrue(conn.closed) |
151 | 168 |
152 header, data = conn.interpret() | 169 header, data = conn.interpret() |
153 self.assertEqual(header, { | 170 self.assertEqual(header, { |
154 'name': 'mystream', | 171 'name': 'mystream', |
155 'type': 'text', | 172 'type': 'text', |
156 'contentType': 'foo/bar', | 173 'contentType': 'foo/bar', |
157 'tee': 'stdout', | 174 'tee': 'stdout', |
158 'tags': {'foo': 'bar', 'baz': 'qux'}, | 175 'tags': {'foo': 'bar', 'baz': 'qux'}, |
159 }) | 176 }) |
160 self.assertEqual(data, 'text!') | 177 self.assertEqual(data, 'text!') |
161 | 178 |
162 def testBinaryStream(self): | 179 def testBinaryStream(self): |
163 client = self._registry.create('test:value') | 180 client = self._registry.create('test:value', |
| 181 project='test', |
| 182 prefix='foo/bar', |
| 183 coordinator_host='example.appspot.com') |
164 with client.binary('mystream') as fd: | 184 with client.binary('mystream') as fd: |
| 185 self.assertEqual( |
| 186 fd.path, |
| 187 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 188 self.assertEqual( |
| 189 fd.get_viewer_url(), |
| 190 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
165 fd.write('\x60\x0d\xd0\x65') | 191 fd.write('\x60\x0d\xd0\x65') |
166 | 192 |
167 conn = client.last_conn | 193 conn = client.last_conn |
168 self.assertTrue(conn.closed) | 194 self.assertTrue(conn.closed) |
169 | 195 |
170 header, data = conn.interpret() | 196 header, data = conn.interpret() |
171 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) | 197 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) |
172 self.assertEqual(data, '\x60\x0d\xd0\x65') | 198 self.assertEqual(data, '\x60\x0d\xd0\x65') |
173 | 199 |
174 def testDatagramStream(self): | 200 def testDatagramStream(self): |
175 client = self._registry.create('test:value') | 201 client = self._registry.create('test:value', |
| 202 project='test', |
| 203 prefix='foo/bar', |
| 204 coordinator_host='example.appspot.com') |
176 with client.datagram('mystream') as fd: | 205 with client.datagram('mystream') as fd: |
| 206 self.assertEqual( |
| 207 fd.path, |
| 208 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 209 self.assertEqual( |
| 210 fd.get_viewer_url(), |
| 211 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
177 fd.send('datagram0') | 212 fd.send('datagram0') |
178 fd.send('dg1') | 213 fd.send('dg1') |
179 fd.send('') | 214 fd.send('') |
180 fd.send('dg3') | 215 fd.send('dg3') |
181 | 216 |
182 conn = client.last_conn | 217 conn = client.last_conn |
183 self.assertTrue(conn.closed) | 218 self.assertTrue(conn.closed) |
184 | 219 |
185 header, data = conn.interpret() | 220 header, data = conn.interpret() |
186 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) | 221 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) |
187 self.assertEqual(list(self._split_datagrams(data)), | 222 self.assertEqual(list(self._split_datagrams(data)), |
188 ['datagram0', 'dg1', '', 'dg3']) | 223 ['datagram0', 'dg1', '', 'dg3']) |
189 | 224 |
| 225 def testStreamWithoutPrefixCannotGenerateUrls(self): |
| 226 client = self._registry.create('test:value', |
| 227 coordinator_host='example.appspot.com') |
| 228 with client.text('mystream') as fd: |
| 229 self.assertRaises(KeyError, fd.get_viewer_url) |
| 230 |
| 231 def testStreamWithoutInvalidPrefixCannotGenerateUrls(self): |
| 232 client = self._registry.create('test:value', |
| 233 project='test', |
| 234 prefix='!!! invalid !!!', |
| 235 coordinator_host='example.appspot.com') |
| 236 with client.text('mystream') as fd: |
| 237 self.assertRaises(ValueError, fd.get_viewer_url) |
| 238 |
| 239 def testStreamWithoutProjectCannotGenerateUrls(self): |
| 240 client = self._registry.create('test:value', |
| 241 prefix='foo/bar', |
| 242 coordinator_host='example.appspot.com') |
| 243 with client.text('mystream') as fd: |
| 244 self.assertRaises(KeyError, fd.get_viewer_url) |
| 245 |
| 246 def testStreamWithoutCoordinatorHostCannotGenerateUrls(self): |
| 247 client = self._registry.create('test:value', |
| 248 project='test', |
| 249 prefix='foo/bar') |
| 250 with client.text('mystream') as fd: |
| 251 self.assertRaises(KeyError, fd.get_viewer_url) |
| 252 |
| 253 |
190 def testCreatingDuplicateStreamNameRaisesValueError(self): | 254 def testCreatingDuplicateStreamNameRaisesValueError(self): |
191 client = self._registry.create('test:value') | 255 client = self._registry.create('test:value') |
192 with client.text('mystream') as fd: | 256 with client.text('mystream') as fd: |
193 fd.write('Using a text stream.') | 257 fd.write('Using a text stream.') |
194 | 258 |
195 with self.assertRaises(ValueError): | 259 with self.assertRaises(ValueError): |
196 with client.text('mystream') as fd: | 260 with client.text('mystream') as fd: |
197 fd.write('Should not work.') | 261 fd.write('Should not work.') |
198 | 262 |
199 conn = client.last_conn | 263 conn = client.last_conn |
200 self.assertTrue(conn.closed) | 264 self.assertTrue(conn.closed) |
201 | 265 |
202 header, data = conn.interpret() | 266 header, data = conn.interpret() |
203 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) | 267 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
204 self.assertEqual(data, 'Using a text stream.') | 268 self.assertEqual(data, 'Using a text stream.') |
205 | 269 |
206 | 270 |
207 if __name__ == '__main__': | 271 if __name__ == '__main__': |
208 unittest.main() | 272 unittest.main() |
OLD | NEW |