| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed by the Apache v2.0 license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import unittest |
| 6 import itertools |
| 7 import json |
| 8 import StringIO |
| 9 |
| 10 from client.libs.logdog import stream, varint |
| 11 |
| 12 |
| 13 class StreamParamsTestCase(unittest.TestCase): |
| 14 |
| 15 def setUp(self): |
| 16 self.params = stream.StreamParams( |
| 17 'name', |
| 18 type=stream.StreamParams.TEXT, |
| 19 content_type='content-type', |
| 20 tags={ |
| 21 'foo': 'bar', |
| 22 'baz': 'qux', |
| 23 }, |
| 24 tee=stream.StreamParams.TEE_STDOUT, |
| 25 binary_file_extension='ext') |
| 26 |
| 27 def testParamsToJson(self): |
| 28 self.assertEqual(self.params.to_json(), |
| 29 ('{"binaryFileExtension": "ext", "contentType": "content-type", ' |
| 30 '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, ' |
| 31 '"tee": "stdout", "type": "text"}')) |
| 32 |
| 33 def testParamsToJsonWithEmpties(self): |
| 34 params = self.params._replace( |
| 35 content_type=None, |
| 36 tags=None, |
| 37 tee=None, |
| 38 binary_file_extension=None, |
| 39 ) |
| 40 self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}') |
| 41 |
| 42 def testParamsWithInvalidTypeRaisesValueError(self): |
| 43 params = self.params._replace(type=None) |
| 44 self.assertRaises(ValueError, params.to_json) |
| 45 |
| 46 def testParamsWithInvalidTeeTypeRaisesValueError(self): |
| 47 params = self.params._replace(tee='somewhere') |
| 48 self.assertRaises(ValueError, params.to_json) |
| 49 |
| 50 def testParamsWithInvalidTagRaisesValueError(self): |
| 51 params = self.params._replace(tags='foo') |
| 52 self.assertRaises(ValueError, params.to_json) |
| 53 |
| 54 params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'}) |
| 55 self.assertRaises(ValueError, params.to_json) |
| 56 |
| 57 |
| 58 class StreamClientTestCase(unittest.TestCase): |
| 59 |
| 60 class _TestStreamClientConnection(object): |
| 61 |
| 62 def __init__(self): |
| 63 self.buffer = StringIO.StringIO() |
| 64 self.closed = False |
| 65 |
| 66 def _assert_not_closed(self): |
| 67 if self.closed: |
| 68 raise Exception('Connection is closed.') |
| 69 |
| 70 def write(self, v): |
| 71 self._assert_not_closed() |
| 72 self.buffer.write(v) |
| 73 |
| 74 def close(self): |
| 75 self._assert_not_closed() |
| 76 self.closed = True |
| 77 |
| 78 def interpret(self): |
| 79 data = StringIO.StringIO(self.buffer.getvalue()) |
| 80 length, _ = varint.read_uvarint(data) |
| 81 header = data.read(length) |
| 82 return json.loads(header), data.read() |
| 83 |
| 84 class _TestStreamClient(stream.StreamClient): |
| 85 def __init__(self, value): |
| 86 super(StreamClientTestCase._TestStreamClient, self).__init__() |
| 87 self.value = value |
| 88 self.last_conn = None |
| 89 |
| 90 @classmethod |
| 91 def _create(cls, value): |
| 92 return cls(value) |
| 93 |
| 94 def _connect_raw(self): |
| 95 conn = StreamClientTestCase._TestStreamClientConnection() |
| 96 self.last_conn = conn |
| 97 return conn |
| 98 |
| 99 def setUp(self): |
| 100 self._registry = stream.StreamProtocolRegistry() |
| 101 self._registry.register_protocol('test', self._TestStreamClient) |
| 102 |
| 103 @staticmethod |
| 104 def _split_datagrams(value): |
| 105 sio = StringIO.StringIO(value) |
| 106 while sio.pos < sio.len: |
| 107 size_prefix, _ = varint.read_uvarint(sio) |
| 108 data = sio.read(size_prefix) |
| 109 if len(data) != size_prefix: |
| 110 raise ValueError('Expected %d bytes, but only got %d' % ( |
| 111 size_prefix, len(data))) |
| 112 yield data |
| 113 |
| 114 def testClientInstantiation(self): |
| 115 client = self._registry.create('test:value') |
| 116 self.assertIsInstance(client, self._TestStreamClient) |
| 117 self.assertEqual(client.value, 'value') |
| 118 |
| 119 def testTextStream(self): |
| 120 client = self._registry.create('test:value') |
| 121 with client.text('mystream') as fd: |
| 122 fd.write('text\nstream\nlines') |
| 123 |
| 124 conn = client.last_conn |
| 125 self.assertTrue(conn.closed) |
| 126 |
| 127 header, data = conn.interpret() |
| 128 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 129 self.assertEqual(data, 'text\nstream\nlines') |
| 130 |
| 131 def testTextStreamWithParams(self): |
| 132 client = self._registry.create('test:value') |
| 133 with client.text('mystream', content_type='foo/bar', |
| 134 tee=stream.StreamParams.TEE_STDOUT, |
| 135 tags={'foo': 'bar', 'baz': 'qux'}) as fd: |
| 136 fd.write('text!') |
| 137 |
| 138 conn = client.last_conn |
| 139 self.assertTrue(conn.closed) |
| 140 |
| 141 header, data = conn.interpret() |
| 142 self.assertEqual(header, { |
| 143 'name': 'mystream', |
| 144 'type': 'text', |
| 145 'contentType': 'foo/bar', |
| 146 'tee': 'stdout', |
| 147 'tags': {'foo': 'bar', 'baz': 'qux'}, |
| 148 }) |
| 149 self.assertEqual(data, 'text!') |
| 150 |
| 151 def testBinaryStream(self): |
| 152 client = self._registry.create('test:value') |
| 153 with client.binary('mystream') as fd: |
| 154 fd.write('\x60\x0d\xd0\x65') |
| 155 |
| 156 conn = client.last_conn |
| 157 self.assertTrue(conn.closed) |
| 158 |
| 159 header, data = conn.interpret() |
| 160 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) |
| 161 self.assertEqual(data, '\x60\x0d\xd0\x65') |
| 162 |
| 163 def testDatagramStream(self): |
| 164 client = self._registry.create('test:value') |
| 165 with client.datagram('mystream') as fd: |
| 166 fd.send('datagram0') |
| 167 fd.send('dg1') |
| 168 fd.send('') |
| 169 fd.send('dg3') |
| 170 |
| 171 conn = client.last_conn |
| 172 self.assertTrue(conn.closed) |
| 173 |
| 174 header, data = conn.interpret() |
| 175 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) |
| 176 self.assertEqual(list(self._split_datagrams(data)), |
| 177 ['datagram0', 'dg1', '', 'dg3']) |
| 178 |
| 179 def testCreatingDuplicateStreamNameRaisesValueError(self): |
| 180 client = self._registry.create('test:value') |
| 181 with client.text('mystream') as fd: |
| 182 fd.write('Using a text stream.') |
| 183 |
| 184 with self.assertRaises(ValueError): |
| 185 with client.text('mystream') as fd: |
| 186 fd.write('Should not work.') |
| 187 |
| 188 conn = client.last_conn |
| 189 self.assertTrue(conn.closed) |
| 190 |
| 191 header, data = conn.interpret() |
| 192 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 193 self.assertEqual(data, 'Using a text stream.') |
| 194 |
| 195 |
| 196 if __name__ == '__main__': |
| 197 unittest.main() |
| OLD | NEW |