| OLD | NEW |
| (Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed by the Apache v2.0 license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 import json |
| 7 import os |
| 8 import sys |
| 9 import unittest |
| 10 import StringIO |
| 11 |
| 12 ROOT_DIR = os.path.dirname(os.path.abspath(os.path.join( |
| 13 __file__, os.pardir, os.pardir, os.pardir))) |
| 14 sys.path.insert(0, ROOT_DIR) |
| 15 |
| 16 from libs.logdog import stream, varint |
| 17 |
| 18 |
| 19 class StreamParamsTestCase(unittest.TestCase): |
| 20 |
| 21 def setUp(self): |
| 22 self.params = stream.StreamParams( |
| 23 'name', |
| 24 type=stream.StreamParams.TEXT, |
| 25 content_type='content-type', |
| 26 tags={ |
| 27 'foo': 'bar', |
| 28 'baz': 'qux', |
| 29 }, |
| 30 tee=stream.StreamParams.TEE_STDOUT, |
| 31 binary_file_extension='ext') |
| 32 |
| 33 def testParamsToJson(self): |
| 34 self.assertEqual(self.params.to_json(), |
| 35 ('{"binaryFileExtension": "ext", "contentType": "content-type", ' |
| 36 '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, ' |
| 37 '"tee": "stdout", "type": "text"}')) |
| 38 |
| 39 def testParamsToJsonWithEmpties(self): |
| 40 params = self.params._replace( |
| 41 content_type=None, |
| 42 tags=None, |
| 43 tee=None, |
| 44 binary_file_extension=None, |
| 45 ) |
| 46 self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}') |
| 47 |
| 48 def testParamsWithInvalidTypeRaisesValueError(self): |
| 49 params = self.params._replace(type=None) |
| 50 self.assertRaises(ValueError, params.to_json) |
| 51 |
| 52 def testParamsWithInvalidTeeTypeRaisesValueError(self): |
| 53 params = self.params._replace(tee='somewhere') |
| 54 self.assertRaises(ValueError, params.to_json) |
| 55 |
| 56 def testParamsWithInvalidTagRaisesValueError(self): |
| 57 params = self.params._replace(tags='foo') |
| 58 self.assertRaises(ValueError, params.to_json) |
| 59 |
| 60 params = self.params._replace(tags={'!!! invalid tag key !!!': 'bar'}) |
| 61 self.assertRaises(ValueError, params.to_json) |
| 62 |
| 63 |
| 64 class StreamClientTestCase(unittest.TestCase): |
| 65 |
| 66 class _TestStreamClientConnection(object): |
| 67 |
| 68 def __init__(self): |
| 69 self.buffer = StringIO.StringIO() |
| 70 self.closed = False |
| 71 |
| 72 def _assert_not_closed(self): |
| 73 if self.closed: |
| 74 raise Exception('Connection is closed.') |
| 75 |
| 76 def write(self, v): |
| 77 self._assert_not_closed() |
| 78 self.buffer.write(v) |
| 79 |
| 80 def close(self): |
| 81 self._assert_not_closed() |
| 82 self.closed = True |
| 83 |
| 84 def interpret(self): |
| 85 data = StringIO.StringIO(self.buffer.getvalue()) |
| 86 length, _ = varint.read_uvarint(data) |
| 87 header = data.read(length) |
| 88 return json.loads(header), data.read() |
| 89 |
| 90 class _TestStreamClient(stream.StreamClient): |
| 91 def __init__(self, value): |
| 92 super(StreamClientTestCase._TestStreamClient, self).__init__() |
| 93 self.value = value |
| 94 self.last_conn = None |
| 95 |
| 96 @classmethod |
| 97 def _create(cls, value): |
| 98 return cls(value) |
| 99 |
| 100 def _connect_raw(self): |
| 101 conn = StreamClientTestCase._TestStreamClientConnection() |
| 102 self.last_conn = conn |
| 103 return conn |
| 104 |
| 105 def setUp(self): |
| 106 self._registry = stream.StreamProtocolRegistry() |
| 107 self._registry.register_protocol('test', self._TestStreamClient) |
| 108 |
| 109 @staticmethod |
| 110 def _split_datagrams(value): |
| 111 sio = StringIO.StringIO(value) |
| 112 while sio.pos < sio.len: |
| 113 size_prefix, _ = varint.read_uvarint(sio) |
| 114 data = sio.read(size_prefix) |
| 115 if len(data) != size_prefix: |
| 116 raise ValueError('Expected %d bytes, but only got %d' % ( |
| 117 size_prefix, len(data))) |
| 118 yield data |
| 119 |
| 120 def testClientInstantiation(self): |
| 121 client = self._registry.create('test:value') |
| 122 self.assertIsInstance(client, self._TestStreamClient) |
| 123 self.assertEqual(client.value, 'value') |
| 124 |
| 125 def testTextStream(self): |
| 126 client = self._registry.create('test:value') |
| 127 with client.text('mystream') as fd: |
| 128 fd.write('text\nstream\nlines') |
| 129 |
| 130 conn = client.last_conn |
| 131 self.assertTrue(conn.closed) |
| 132 |
| 133 header, data = conn.interpret() |
| 134 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 135 self.assertEqual(data, 'text\nstream\nlines') |
| 136 |
| 137 def testTextStreamWithParams(self): |
| 138 client = self._registry.create('test:value') |
| 139 with client.text('mystream', content_type='foo/bar', |
| 140 tee=stream.StreamParams.TEE_STDOUT, |
| 141 tags={'foo': 'bar', 'baz': 'qux'}) as fd: |
| 142 fd.write('text!') |
| 143 |
| 144 conn = client.last_conn |
| 145 self.assertTrue(conn.closed) |
| 146 |
| 147 header, data = conn.interpret() |
| 148 self.assertEqual(header, { |
| 149 'name': 'mystream', |
| 150 'type': 'text', |
| 151 'contentType': 'foo/bar', |
| 152 'tee': 'stdout', |
| 153 'tags': {'foo': 'bar', 'baz': 'qux'}, |
| 154 }) |
| 155 self.assertEqual(data, 'text!') |
| 156 |
| 157 def testBinaryStream(self): |
| 158 client = self._registry.create('test:value') |
| 159 with client.binary('mystream') as fd: |
| 160 fd.write('\x60\x0d\xd0\x65') |
| 161 |
| 162 conn = client.last_conn |
| 163 self.assertTrue(conn.closed) |
| 164 |
| 165 header, data = conn.interpret() |
| 166 self.assertEqual(header, {'name': 'mystream', 'type': 'binary'}) |
| 167 self.assertEqual(data, '\x60\x0d\xd0\x65') |
| 168 |
| 169 def testDatagramStream(self): |
| 170 client = self._registry.create('test:value') |
| 171 with client.datagram('mystream') as fd: |
| 172 fd.send('datagram0') |
| 173 fd.send('dg1') |
| 174 fd.send('') |
| 175 fd.send('dg3') |
| 176 |
| 177 conn = client.last_conn |
| 178 self.assertTrue(conn.closed) |
| 179 |
| 180 header, data = conn.interpret() |
| 181 self.assertEqual(header, {'name': 'mystream', 'type': 'datagram'}) |
| 182 self.assertEqual(list(self._split_datagrams(data)), |
| 183 ['datagram0', 'dg1', '', 'dg3']) |
| 184 |
| 185 def testCreatingDuplicateStreamNameRaisesValueError(self): |
| 186 client = self._registry.create('test:value') |
| 187 with client.text('mystream') as fd: |
| 188 fd.write('Using a text stream.') |
| 189 |
| 190 with self.assertRaises(ValueError): |
| 191 with client.text('mystream') as fd: |
| 192 fd.write('Should not work.') |
| 193 |
| 194 conn = client.last_conn |
| 195 self.assertTrue(conn.closed) |
| 196 |
| 197 header, data = conn.interpret() |
| 198 self.assertEqual(header, {'name': 'mystream', 'type': 'text'}) |
| 199 self.assertEqual(data, 'Using a text stream.') |
| 200 |
| 201 |
| 202 if __name__ == '__main__': |
| 203 unittest.main() |
| OLD | NEW |