| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. | 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 import json | 6 import json |
| 7 import os | 7 import os |
| 8 import sys | 8 import sys |
| 9 import unittest | 9 import unittest |
| 10 import StringIO | 10 import StringIO |
| 11 | 11 |
| 12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join( | 12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join( |
| 13 __file__.decode(sys.getfilesystemencoding()), | 13 __file__.decode(sys.getfilesystemencoding()), |
| 14 os.pardir, os.pardir, os.pardir))) | 14 os.pardir, os.pardir, os.pardir))) |
| 15 sys.path.insert(0, ROOT_DIR) | 15 sys.path.insert(0, ROOT_DIR) |
| 16 | 16 |
| 17 from libs.logdog import stream, varint | 17 from libs.logdog import stream, streamname, varint |
| 18 | 18 |
| 19 | 19 |
| 20 class StreamParamsTestCase(unittest.TestCase): | 20 class StreamParamsTestCase(unittest.TestCase): |
| 21 | 21 |
| 22 def setUp(self): | 22 def setUp(self): |
| 23 self.params = stream.StreamParams( | 23 self.params = stream.StreamParams( |
| 24 'name', | 24 'name', |
| 25 type=stream.StreamParams.TEXT, | 25 type=stream.StreamParams.TEXT, |
| 26 content_type='content-type', | 26 content_type='content-type', |
| 27 tags={ | 27 tags={ |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 86 data = StringIO.StringIO(self.buffer.getvalue()) | 86 data = StringIO.StringIO(self.buffer.getvalue()) |
| 87 magic = data.read(len(stream.BUTLER_MAGIC)) | 87 magic = data.read(len(stream.BUTLER_MAGIC)) |
| 88 if magic != stream.BUTLER_MAGIC: | 88 if magic != stream.BUTLER_MAGIC: |
| 89 raise ValueError('Invalid magic value ([%s] != [%s])' % ( | 89 raise ValueError('Invalid magic value ([%s] != [%s])' % ( |
| 90 magic, stream.BUTLER_MAGIC)) | 90 magic, stream.BUTLER_MAGIC)) |
| 91 length, _ = varint.read_uvarint(data) | 91 length, _ = varint.read_uvarint(data) |
| 92 header = data.read(length) | 92 header = data.read(length) |
| 93 return json.loads(header), data.read() | 93 return json.loads(header), data.read() |
| 94 | 94 |
| 95 class _TestStreamClient(stream.StreamClient): | 95 class _TestStreamClient(stream.StreamClient): |
| 96 def __init__(self, value): | 96 def __init__(self, value, **kwargs): |
| 97 super(StreamClientTestCase._TestStreamClient, self).__init__() | 97 super(StreamClientTestCase._TestStreamClient, self).__init__(**kwargs) |
| 98 self.value = value | 98 self.value = value |
| 99 self.last_conn = None | 99 self.last_conn = None |
| 100 | 100 |
| 101 @classmethod | 101 @classmethod |
| 102 def _create(cls, value): | 102 def _create(cls, value, **kwargs): |
| 103 return cls(value) | 103 return cls(value, **kwargs) |
| 104 | 104 |
| 105 def _connect_raw(self): | 105 def _connect_raw(self): |
| 106 conn = StreamClientTestCase._TestStreamClientConnection() | 106 conn = StreamClientTestCase._TestStreamClientConnection() |
| 107 self.last_conn = conn | 107 self.last_conn = conn |
| 108 return conn | 108 return conn |
| 109 | 109 |
| 110 def setUp(self): | 110 def setUp(self): |
| 111 self._registry = stream.StreamProtocolRegistry() | 111 self._registry = stream.StreamProtocolRegistry() |
| 112 self._registry.register_protocol('test', self._TestStreamClient) | 112 self._registry.register_protocol('test', self._TestStreamClient) |
| 113 | 113 |
| 114 @staticmethod | 114 @staticmethod |
| 115 def _split_datagrams(value): | 115 def _split_datagrams(value): |
| 116 sio = StringIO.StringIO(value) | 116 sio = StringIO.StringIO(value) |
| 117 while sio.pos < sio.len: | 117 while sio.pos < sio.len: |
| 118 size_prefix, _ = varint.read_uvarint(sio) | 118 size_prefix, _ = varint.read_uvarint(sio) |
| 119 data = sio.read(size_prefix) | 119 data = sio.read(size_prefix) |
| 120 if len(data) != size_prefix: | 120 if len(data) != size_prefix: |
| 121 raise ValueError('Expected %d bytes, but only got %d' % ( | 121 raise ValueError('Expected %d bytes, but only got %d' % ( |
| 122 size_prefix, len(data))) | 122 size_prefix, len(data))) |
| 123 yield data | 123 yield data |
| 124 | 124 |
| 125 def testClientInstantiation(self): | 125 def testClientInstantiation(self): |
| 126 client = self._registry.create('test:value') | 126 client = self._registry.create('test:value') |
| 127 self.assertIsInstance(client, self._TestStreamClient) | 127 self.assertIsInstance(client, self._TestStreamClient) |
| 128 self.assertEqual(client.value, 'value') | 128 self.assertEqual(client.value, 'value') |
| 129 | 129 |
| 130 def testTextStream(self): | 130 def testTextStream(self): |
| 131 client = self._registry.create('test:value') | 131 client = self._registry.create('test:value', |
| 132 project='test', |
| 133 prefix='foo/bar', |
| 134 coordinator_host='example.appspot.com') |
| 132 with client.text('mystream') as fd: | 135 with client.text('mystream') as fd: |
| 136 self.assertEqual( |
| 137 fd.path, |
| 138 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 139 self.assertEqual( |
| 140 fd.get_viewer_url(), |
| 141 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
| 133 fd.write('text\nstream\nlines') | 142 fd.write('text\nstream\nlines') |
| 134 | 143 |
| 135 conn = client.last_conn | 144 conn = client.last_conn |
| 136 self.assertTrue(conn.closed) | 145 self.assertTrue(conn.closed) |
| 137 | 146 |
| 138 header, data = conn.interpret() | 147 header, data = conn.interpret() |
| 139 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) | 148 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 140 self.assertEqual(data, 'text\nstream\nlines') | 149 self.assertEqual(data, 'text\nstream\nlines') |
| 141 | 150 |
| 142 def testTextStreamWithParams(self): | 151 def testTextStreamWithParams(self): |
| 143 client = self._registry.create('test:value') | 152 client = self._registry.create('test:value') |
| 144 with client.text('mystream', content_type='foo/bar', | 153 with client.text('mystream', content_type='foo/bar', |
| 145 tee=stream.StreamParams.TEE_STDOUT, | 154 tee=stream.StreamParams.TEE_STDOUT, |
| 146 tags={'foo': 'bar', 'baz': 'qux'}) as fd: | 155 tags={'foo': 'bar', 'baz': 'qux'}) as fd: |
| 156 self.assertEqual( |
| 157 fd.params, |
| 158 stream.StreamParams.make( |
| 159 name='mystream', |
| 160 type=stream.StreamParams.TEXT, |
| 161 content_type='foo/bar', |
| 162 tee=stream.StreamParams.TEE_STDOUT, |
| 163 tags={'foo': 'bar', 'baz': 'qux'})) |
| 147 fd.write('text!') | 164 fd.write('text!') |
| 148 | 165 |
| 149 conn = client.last_conn | 166 conn = client.last_conn |
| 150 self.assertTrue(conn.closed) | 167 self.assertTrue(conn.closed) |
| 151 | 168 |
| 152 header, data = conn.interpret() | 169 header, data = conn.interpret() |
| 153 self.assertEqual(header, { | 170 self.assertEqual(header, { |
| 154 'name': 'mystream', | 171 'name': 'mystream', |
| 155 'type': 'text', | 172 'type': 'text', |
| 156 'contentType': 'foo/bar', | 173 'contentType': 'foo/bar', |
| 157 'tee': 'stdout', | 174 'tee': 'stdout', |
| 158 'tags': {'foo': 'bar', 'baz': 'qux'}, | 175 'tags': {'foo': 'bar', 'baz': 'qux'}, |
| 159 }) | 176 }) |
| 160 self.assertEqual(data, 'text!') | 177 self.assertEqual(data, 'text!') |
| 161 | 178 |
| 162 def testBinaryStream(self): | 179 def testBinaryStream(self): |
| 163 client = self._registry.create('test:value') | 180 client = self._registry.create('test:value', |
| 181 project='test', |
| 182 prefix='foo/bar', |
| 183 coordinator_host='example.appspot.com') |
| 164 with client.binary('mystream') as fd: | 184 with client.binary('mystream') as fd: |
| 185 self.assertEqual( |
| 186 fd.path, |
| 187 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 188 self.assertEqual( |
| 189 fd.get_viewer_url(), |
| 190 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
| 165 fd.write('\x60\x0d\xd0\x65') | 191 fd.write('\x60\x0d\xd0\x65') |
| 166 | 192 |
| 167 conn = client.last_conn | 193 conn = client.last_conn |
| 168 self.assertTrue(conn.closed) | 194 self.assertTrue(conn.closed) |
| 169 | 195 |
| 170 header, data = conn.interpret() | 196 header, data = conn.interpret() |
| 171 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) | 197 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) |
| 172 self.assertEqual(data, '\x60\x0d\xd0\x65') | 198 self.assertEqual(data, '\x60\x0d\xd0\x65') |
| 173 | 199 |
| 174 def testDatagramStream(self): | 200 def testDatagramStream(self): |
| 175 client = self._registry.create('test:value') | 201 client = self._registry.create('test:value', |
| 202 project='test', |
| 203 prefix='foo/bar', |
| 204 coordinator_host='example.appspot.com') |
| 176 with client.datagram('mystream') as fd: | 205 with client.datagram('mystream') as fd: |
| 206 self.assertEqual( |
| 207 fd.path, |
| 208 streamname.StreamPath(prefix='foo/bar', name='mystream')) |
| 209 self.assertEqual( |
| 210 fd.get_viewer_url(), |
| 211 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream') |
| 177 fd.send('datagram0') | 212 fd.send('datagram0') |
| 178 fd.send('dg1') | 213 fd.send('dg1') |
| 179 fd.send('') | 214 fd.send('') |
| 180 fd.send('dg3') | 215 fd.send('dg3') |
| 181 | 216 |
| 182 conn = client.last_conn | 217 conn = client.last_conn |
| 183 self.assertTrue(conn.closed) | 218 self.assertTrue(conn.closed) |
| 184 | 219 |
| 185 header, data = conn.interpret() | 220 header, data = conn.interpret() |
| 186 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) | 221 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) |
| 187 self.assertEqual(list(self._split_datagrams(data)), | 222 self.assertEqual(list(self._split_datagrams(data)), |
| 188 ['datagram0', 'dg1', '', 'dg3']) | 223 ['datagram0', 'dg1', '', 'dg3']) |
| 189 | 224 |
| 225 def testStreamWithoutPrefixCannotGenerateUrls(self): |
| 226 client = self._registry.create('test:value', |
| 227 coordinator_host='example.appspot.com') |
| 228 with client.text('mystream') as fd: |
| 229 self.assertRaises(KeyError, fd.get_viewer_url) |
| 230 |
| 231 def testStreamWithoutInvalidPrefixCannotGenerateUrls(self): |
| 232 client = self._registry.create('test:value', |
| 233 project='test', |
| 234 prefix='!!! invalid !!!', |
| 235 coordinator_host='example.appspot.com') |
| 236 with client.text('mystream') as fd: |
| 237 self.assertRaises(ValueError, fd.get_viewer_url) |
| 238 |
| 239 def testStreamWithoutProjectCannotGenerateUrls(self): |
| 240 client = self._registry.create('test:value', |
| 241 prefix='foo/bar', |
| 242 coordinator_host='example.appspot.com') |
| 243 with client.text('mystream') as fd: |
| 244 self.assertRaises(KeyError, fd.get_viewer_url) |
| 245 |
| 246 def testStreamWithoutCoordinatorHostCannotGenerateUrls(self): |
| 247 client = self._registry.create('test:value', |
| 248 project='test', |
| 249 prefix='foo/bar') |
| 250 with client.text('mystream') as fd: |
| 251 self.assertRaises(KeyError, fd.get_viewer_url) |
| 252 |
| 253 |
| 190 def testCreatingDuplicateStreamNameRaisesValueError(self): | 254 def testCreatingDuplicateStreamNameRaisesValueError(self): |
| 191 client = self._registry.create('test:value') | 255 client = self._registry.create('test:value') |
| 192 with client.text('mystream') as fd: | 256 with client.text('mystream') as fd: |
| 193 fd.write('Using a text stream.') | 257 fd.write('Using a text stream.') |
| 194 | 258 |
| 195 with self.assertRaises(ValueError): | 259 with self.assertRaises(ValueError): |
| 196 with client.text('mystream') as fd: | 260 with client.text('mystream') as fd: |
| 197 fd.write('Should not work.') | 261 fd.write('Should not work.') |
| 198 | 262 |
| 199 conn = client.last_conn | 263 conn = client.last_conn |
| 200 self.assertTrue(conn.closed) | 264 self.assertTrue(conn.closed) |
| 201 | 265 |
| 202 header, data = conn.interpret() | 266 header, data = conn.interpret() |
| 203 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) | 267 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 204 self.assertEqual(data, 'Using a text stream.') | 268 self.assertEqual(data, 'Using a text stream.') |
| 205 | 269 |
| 206 | 270 |
| 207 if __name__ == '__main__': | 271 if __name__ == '__main__': |
| 208 unittest.main() | 272 unittest.main() |
| OLD | NEW |