Index: tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/protobuf_test.py |
diff --git a/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/protobuf_test.py b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/protobuf_test.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..9a65824f72ccefa2ada11aacc1a1883374b28039 |
--- /dev/null |
+++ b/tools/telemetry/third_party/gsutilz/third_party/protorpc/protorpc/protobuf_test.py |
@@ -0,0 +1,299 @@ |
+#!/usr/bin/env python |
+# |
+# Copyright 2010 Google Inc. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+# |
+ |
+"""Tests for protorpc.protobuf.""" |
+ |
+__author__ = 'rafek@google.com (Rafe Kaplan)' |
+ |
+ |
+import datetime |
+import unittest |
+ |
+from protorpc import message_types |
+from protorpc import messages |
+from protorpc import protobuf |
+from protorpc import protorpc_test_pb2 |
+from protorpc import test_util |
+from protorpc import util |
+ |
+# TODO: Add DateTimeFields to protorpc_test.proto when definition.py |
+# supports date time fields. |
+class HasDateTimeMessage(messages.Message): |
+ value = message_types.DateTimeField(1) |
+ |
+class NestedDateTimeMessage(messages.Message): |
+ value = messages.MessageField(message_types.DateTimeMessage, 1) |
+ |
+ |
+class ModuleInterfaceTest(test_util.ModuleInterfaceTest, |
+ test_util.TestCase): |
+ |
+ MODULE = protobuf |
+ |
+ |
+class EncodeMessageTest(test_util.TestCase, |
+ test_util.ProtoConformanceTestBase): |
+ """Test message to protocol buffer encoding.""" |
+ |
+ PROTOLIB = protobuf |
+ |
+ def assertErrorIs(self, exception, message, function, *params, **kwargs): |
+ try: |
+ function(*params, **kwargs) |
+ self.fail('Expected to raise exception %s but did not.' % exception) |
+ except exception as err: |
+ self.assertEquals(message, str(err)) |
+ |
+ @property |
+ def encoded_partial(self): |
+ proto = protorpc_test_pb2.OptionalMessage() |
+ proto.double_value = 1.23 |
+ proto.int64_value = -100000000000 |
+ proto.int32_value = 1020 |
+ proto.string_value = u'a string' |
+ proto.enum_value = protorpc_test_pb2.OptionalMessage.VAL2 |
+ |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_full(self): |
+ proto = protorpc_test_pb2.OptionalMessage() |
+ proto.double_value = 1.23 |
+ proto.float_value = -2.5 |
+ proto.int64_value = -100000000000 |
+ proto.uint64_value = 102020202020 |
+ proto.int32_value = 1020 |
+ proto.bool_value = True |
+ proto.string_value = u'a string\u044f' |
+ proto.bytes_value = b'a bytes\xff\xfe' |
+ proto.enum_value = protorpc_test_pb2.OptionalMessage.VAL2 |
+ |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_repeated(self): |
+ proto = protorpc_test_pb2.RepeatedMessage() |
+ proto.double_value.append(1.23) |
+ proto.double_value.append(2.3) |
+ proto.float_value.append(-2.5) |
+ proto.float_value.append(0.5) |
+ proto.int64_value.append(-100000000000) |
+ proto.int64_value.append(20) |
+ proto.uint64_value.append(102020202020) |
+ proto.uint64_value.append(10) |
+ proto.int32_value.append(1020) |
+ proto.int32_value.append(718) |
+ proto.bool_value.append(True) |
+ proto.bool_value.append(False) |
+ proto.string_value.append(u'a string\u044f') |
+ proto.string_value.append(u'another string') |
+ proto.bytes_value.append(b'a bytes\xff\xfe') |
+ proto.bytes_value.append(b'another bytes') |
+ proto.enum_value.append(protorpc_test_pb2.RepeatedMessage.VAL2) |
+ proto.enum_value.append(protorpc_test_pb2.RepeatedMessage.VAL1) |
+ |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_nested(self): |
+ proto = protorpc_test_pb2.HasNestedMessage() |
+ proto.nested.a_value = 'a string' |
+ |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_repeated_nested(self): |
+ proto = protorpc_test_pb2.HasNestedMessage() |
+ proto.repeated_nested.add().a_value = 'a string' |
+ proto.repeated_nested.add().a_value = 'another string' |
+ |
+ return proto.SerializeToString() |
+ |
+ unexpected_tag_message = ( |
+ chr((15 << protobuf._WIRE_TYPE_BITS) | protobuf._Encoder.NUMERIC) + |
+ chr(5)) |
+ |
+ @property |
+ def encoded_default_assigned(self): |
+ proto = protorpc_test_pb2.HasDefault() |
+ proto.a_value = test_util.HasDefault.a_value.default |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_nested_empty(self): |
+ proto = protorpc_test_pb2.HasOptionalNestedMessage() |
+ proto.nested.Clear() |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_repeated_nested_empty(self): |
+ proto = protorpc_test_pb2.HasOptionalNestedMessage() |
+ proto.repeated_nested.add() |
+ proto.repeated_nested.add() |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_extend_message(self): |
+ proto = protorpc_test_pb2.RepeatedMessage() |
+ proto.add_int64_value(400) |
+ proto.add_int64_value(50) |
+ proto.add_int64_value(6000) |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_string_types(self): |
+ proto = protorpc_test_pb2.OptionalMessage() |
+ proto.string_value = u'Latin' |
+ return proto.SerializeToString() |
+ |
+ @property |
+ def encoded_invalid_enum(self): |
+ encoder = protobuf._Encoder() |
+ field_num = test_util.OptionalMessage.enum_value.number |
+ tag = (field_num << protobuf._WIRE_TYPE_BITS) | encoder.NUMERIC |
+ encoder.putVarInt32(tag) |
+ encoder.putVarInt32(1000) |
+ return encoder.buffer().tostring() |
+ |
+ def testDecodeWrongWireFormat(self): |
+ """Test what happens when wrong wire format found in protobuf.""" |
+ class ExpectedProto(messages.Message): |
+ value = messages.StringField(1) |
+ |
+ class WrongVariant(messages.Message): |
+ value = messages.IntegerField(1) |
+ |
+ original = WrongVariant() |
+ original.value = 10 |
+ self.assertErrorIs(messages.DecodeError, |
+ 'Expected wire type STRING but found NUMERIC', |
+ protobuf.decode_message, |
+ ExpectedProto, |
+ protobuf.encode_message(original)) |
+ |
+ def testDecodeBadWireType(self): |
+ """Test what happens when non-existant wire type found in protobuf.""" |
+ # Message has tag 1, type 3 which does not exist. |
+ bad_wire_type_message = chr((1 << protobuf._WIRE_TYPE_BITS) | 3) |
+ |
+ self.assertErrorIs(messages.DecodeError, |
+ 'No such wire type 3', |
+ protobuf.decode_message, |
+ test_util.OptionalMessage, |
+ bad_wire_type_message) |
+ |
+ def testUnexpectedTagBelowOne(self): |
+ """Test that completely invalid tags generate an error.""" |
+ # Message has tag 0, type NUMERIC. |
+ invalid_tag_message = chr(protobuf._Encoder.NUMERIC) |
+ |
+ self.assertErrorIs(messages.DecodeError, |
+ 'Invalid tag value 0', |
+ protobuf.decode_message, |
+ test_util.OptionalMessage, |
+ invalid_tag_message) |
+ |
+ def testProtocolBufferDecodeError(self): |
+ """Test what happens when there a ProtocolBufferDecodeError. |
+ |
+ This is what happens when the underlying ProtocolBuffer library raises |
+ it's own decode error. |
+ """ |
+ # Message has tag 1, type DOUBLE, missing value. |
+ truncated_message = ( |
+ chr((1 << protobuf._WIRE_TYPE_BITS) | protobuf._Encoder.DOUBLE)) |
+ |
+ self.assertErrorIs(messages.DecodeError, |
+ 'Decoding error: truncated', |
+ protobuf.decode_message, |
+ test_util.OptionalMessage, |
+ truncated_message) |
+ |
+ def testProtobufUnrecognizedField(self): |
+ """Test that unrecognized fields are serialized and can be accessed.""" |
+ decoded = protobuf.decode_message(test_util.OptionalMessage, |
+ self.unexpected_tag_message) |
+ self.assertEquals(1, len(decoded.all_unrecognized_fields())) |
+ self.assertEquals(15, decoded.all_unrecognized_fields()[0]) |
+ self.assertEquals((5, messages.Variant.INT64), |
+ decoded.get_unrecognized_field_info(15)) |
+ |
+ def testUnrecognizedFieldWrongFormat(self): |
+ """Test that unrecognized fields in the wrong format are skipped.""" |
+ |
+ class SimpleMessage(messages.Message): |
+ value = messages.IntegerField(1) |
+ |
+ message = SimpleMessage(value=3) |
+ message.set_unrecognized_field('from_json', 'test', messages.Variant.STRING) |
+ |
+ encoded = protobuf.encode_message(message) |
+ expected = ( |
+ chr((1 << protobuf._WIRE_TYPE_BITS) | protobuf._Encoder.NUMERIC) + |
+ chr(3)) |
+ self.assertEquals(encoded, expected) |
+ |
+ def testProtobufDecodeDateTimeMessage(self): |
+ """Test what happens when decoding a DateTimeMessage.""" |
+ |
+ nested = NestedDateTimeMessage() |
+ nested.value = message_types.DateTimeMessage(milliseconds=2500) |
+ value = protobuf.decode_message(HasDateTimeMessage, |
+ protobuf.encode_message(nested)).value |
+ self.assertEqual(datetime.datetime(1970, 1, 1, 0, 0, 2, 500000), value) |
+ |
+ def testProtobufDecodeDateTimeMessageWithTimeZone(self): |
+ """Test what happens when decoding a DateTimeMessage with a time zone.""" |
+ nested = NestedDateTimeMessage() |
+ nested.value = message_types.DateTimeMessage(milliseconds=12345678, |
+ time_zone_offset=60) |
+ value = protobuf.decode_message(HasDateTimeMessage, |
+ protobuf.encode_message(nested)).value |
+ self.assertEqual(datetime.datetime(1970, 1, 1, 3, 25, 45, 678000, |
+ tzinfo=util.TimeZoneOffset(60)), |
+ value) |
+ |
+ def testProtobufEncodeDateTimeMessage(self): |
+ """Test what happens when encoding a DateTimeField.""" |
+ mine = HasDateTimeMessage(value=datetime.datetime(1970, 1, 1)) |
+ nested = NestedDateTimeMessage() |
+ nested.value = message_types.DateTimeMessage(milliseconds=0) |
+ |
+ my_encoded = protobuf.encode_message(mine) |
+ encoded = protobuf.encode_message(nested) |
+ self.assertEquals(my_encoded, encoded) |
+ |
+ def testProtobufEncodeDateTimeMessageWithTimeZone(self): |
+ """Test what happens when encoding a DateTimeField with a time zone.""" |
+ for tz_offset in (30, -30, 8 * 60, 0): |
+ mine = HasDateTimeMessage(value=datetime.datetime( |
+ 1970, 1, 1, tzinfo=util.TimeZoneOffset(tz_offset))) |
+ nested = NestedDateTimeMessage() |
+ nested.value = message_types.DateTimeMessage( |
+ milliseconds=0, time_zone_offset=tz_offset) |
+ |
+ my_encoded = protobuf.encode_message(mine) |
+ encoded = protobuf.encode_message(nested) |
+ self.assertEquals(my_encoded, encoded) |
+ |
+ |
+def main(): |
+ unittest.main() |
+ |
+ |
+if __name__ == '__main__': |
+ main() |