Index: third_party/grpc/src/python/grpcio/tests/unit/_cython/cygrpc_test.py |
diff --git a/third_party/grpc/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/third_party/grpc/src/python/grpcio/tests/unit/_cython/cygrpc_test.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..876da88de952bd91a1050467516ecb7016c9b25a |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/tests/unit/_cython/cygrpc_test.py |
@@ -0,0 +1,452 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+import time |
+import threading |
+import unittest |
+ |
+from grpc._cython import cygrpc |
+from tests.unit._cython import test_utilities |
+from tests.unit import test_common |
+from tests.unit import resources |
+ |
+ |
+_SSL_HOST_OVERRIDE = 'foo.test.google.fr' |
+_CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key' |
+_CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' |
+ |
+def _metadata_plugin_callback(context, callback): |
+ callback(cygrpc.Metadata( |
+ [cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY, |
+ _CALL_CREDENTIALS_METADATA_VALUE)]), |
+ cygrpc.StatusCode.ok, '') |
+ |
+ |
+class TypeSmokeTest(unittest.TestCase): |
+ |
+ def testStringsInUtilitiesUpDown(self): |
+ self.assertEqual(0, cygrpc.StatusCode.ok) |
+ metadatum = cygrpc.Metadatum('a', 'b') |
+ self.assertEqual('a'.encode(), metadatum.key) |
+ self.assertEqual('b'.encode(), metadatum.value) |
+ metadata = cygrpc.Metadata([metadatum]) |
+ self.assertEqual(1, len(metadata)) |
+ self.assertEqual(metadatum.key, metadata[0].key) |
+ |
+ def testMetadataIteration(self): |
+ metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum('a', 'b'), cygrpc.Metadatum('c', 'd')]) |
+ iterator = iter(metadata) |
+ metadatum = next(iterator) |
+ self.assertIsInstance(metadatum, cygrpc.Metadatum) |
+ self.assertEqual(metadatum.key, 'a'.encode()) |
+ self.assertEqual(metadatum.value, 'b'.encode()) |
+ metadatum = next(iterator) |
+ self.assertIsInstance(metadatum, cygrpc.Metadatum) |
+ self.assertEqual(metadatum.key, 'c'.encode()) |
+ self.assertEqual(metadatum.value, 'd'.encode()) |
+ with self.assertRaises(StopIteration): |
+ next(iterator) |
+ |
+ def testOperationsIteration(self): |
+ operations = cygrpc.Operations([ |
+ cygrpc.operation_send_message('asdf')]) |
+ iterator = iter(operations) |
+ operation = next(iterator) |
+ self.assertIsInstance(operation, cygrpc.Operation) |
+ # `Operation`s are write-only structures; can't directly debug anything out |
+ # of them. Just check that we stop iterating. |
+ with self.assertRaises(StopIteration): |
+ next(iterator) |
+ |
+ def testTimespec(self): |
+ now = time.time() |
+ timespec = cygrpc.Timespec(now) |
+ self.assertAlmostEqual(now, float(timespec), places=8) |
+ |
+ def testCompletionQueueUpDown(self): |
+ completion_queue = cygrpc.CompletionQueue() |
+ del completion_queue |
+ |
+ def testServerUpDown(self): |
+ server = cygrpc.Server(cygrpc.ChannelArgs([])) |
+ del server |
+ |
+ def testChannelUpDown(self): |
+ channel = cygrpc.Channel('[::]:0', cygrpc.ChannelArgs([])) |
+ del channel |
+ |
+ def testCredentialsMetadataPluginUpDown(self): |
+ plugin = cygrpc.CredentialsMetadataPlugin( |
+ lambda ignored_a, ignored_b: None, '') |
+ del plugin |
+ |
+ def testCallCredentialsFromPluginUpDown(self): |
+ plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, '') |
+ call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) |
+ del plugin |
+ del call_credentials |
+ |
+ def testServerStartNoExplicitShutdown(self): |
+ server = cygrpc.Server() |
+ completion_queue = cygrpc.CompletionQueue() |
+ server.register_completion_queue(completion_queue) |
+ port = server.add_http2_port('[::]:0') |
+ self.assertIsInstance(port, int) |
+ server.start() |
+ del server |
+ |
+ def testServerStartShutdown(self): |
+ completion_queue = cygrpc.CompletionQueue() |
+ server = cygrpc.Server() |
+ server.add_http2_port('[::]:0') |
+ server.register_completion_queue(completion_queue) |
+ server.start() |
+ shutdown_tag = object() |
+ server.shutdown(completion_queue, shutdown_tag) |
+ event = completion_queue.poll() |
+ self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) |
+ self.assertIs(shutdown_tag, event.tag) |
+ del server |
+ del completion_queue |
+ |
+ |
+class InsecureServerInsecureClient(unittest.TestCase): |
+ |
+ def setUp(self): |
+ self.server_completion_queue = cygrpc.CompletionQueue() |
+ self.server = cygrpc.Server() |
+ self.server.register_completion_queue(self.server_completion_queue) |
+ self.port = self.server.add_http2_port('[::]:0') |
+ self.server.start() |
+ self.client_completion_queue = cygrpc.CompletionQueue() |
+ self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port)) |
+ |
+ def tearDown(self): |
+ del self.server |
+ del self.client_completion_queue |
+ del self.server_completion_queue |
+ |
+ def testEcho(self): |
+ DEADLINE = time.time()+5 |
+ DEADLINE_TOLERANCE = 0.25 |
+ CLIENT_METADATA_ASCII_KEY = b'key' |
+ CLIENT_METADATA_ASCII_VALUE = b'val' |
+ CLIENT_METADATA_BIN_KEY = b'key-bin' |
+ CLIENT_METADATA_BIN_VALUE = b'\0'*1000 |
+ SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' |
+ SERVER_INITIAL_METADATA_VALUE = b'whodawha?' |
+ SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' |
+ SERVER_TRAILING_METADATA_VALUE = b'zomg it is' |
+ SERVER_STATUS_CODE = cygrpc.StatusCode.ok |
+ SERVER_STATUS_DETAILS = b'our work is never over' |
+ REQUEST = b'in death a member of project mayhem has a name' |
+ RESPONSE = b'his name is robert paulson' |
+ METHOD = b'twinkies' |
+ HOST = b'hostess' |
+ |
+ cygrpc_deadline = cygrpc.Timespec(DEADLINE) |
+ |
+ server_request_tag = object() |
+ request_call_result = self.server.request_call( |
+ self.server_completion_queue, self.server_completion_queue, |
+ server_request_tag) |
+ |
+ self.assertEqual(cygrpc.CallError.ok, request_call_result) |
+ |
+ client_call_tag = object() |
+ client_call = self.client_channel.create_call( |
+ None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline) |
+ client_initial_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, |
+ CLIENT_METADATA_ASCII_VALUE), |
+ cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) |
+ client_start_batch_result = client_call.start_batch(cygrpc.Operations([ |
+ cygrpc.operation_send_initial_metadata(client_initial_metadata), |
+ cygrpc.operation_send_message(REQUEST), |
+ cygrpc.operation_send_close_from_client(), |
+ cygrpc.operation_receive_initial_metadata(), |
+ cygrpc.operation_receive_message(), |
+ cygrpc.operation_receive_status_on_client() |
+ ]), client_call_tag) |
+ self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) |
+ client_event_future = test_utilities.CompletionQueuePollFuture( |
+ self.client_completion_queue, cygrpc_deadline) |
+ |
+ request_event = self.server_completion_queue.poll(cygrpc_deadline) |
+ self.assertEqual(cygrpc.CompletionType.operation_complete, |
+ request_event.type) |
+ self.assertIsInstance(request_event.operation_call, cygrpc.Call) |
+ self.assertIs(server_request_tag, request_event.tag) |
+ self.assertEqual(0, len(request_event.batch_operations)) |
+ self.assertTrue( |
+ test_common.metadata_transmitted(client_initial_metadata, |
+ request_event.request_metadata)) |
+ self.assertEqual(METHOD, request_event.request_call_details.method) |
+ self.assertEqual(HOST, request_event.request_call_details.host) |
+ self.assertLess( |
+ abs(DEADLINE - float(request_event.request_call_details.deadline)), |
+ DEADLINE_TOLERANCE) |
+ |
+ server_call_tag = object() |
+ server_call = request_event.operation_call |
+ server_initial_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY, |
+ SERVER_INITIAL_METADATA_VALUE)]) |
+ server_trailing_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, |
+ SERVER_TRAILING_METADATA_VALUE)]) |
+ server_start_batch_result = server_call.start_batch([ |
+ cygrpc.operation_send_initial_metadata(server_initial_metadata), |
+ cygrpc.operation_receive_message(), |
+ cygrpc.operation_send_message(RESPONSE), |
+ cygrpc.operation_receive_close_on_server(), |
+ cygrpc.operation_send_status_from_server( |
+ server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) |
+ ], server_call_tag) |
+ self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) |
+ |
+ client_event = client_event_future.result() |
+ server_event = self.server_completion_queue.poll(cygrpc_deadline) |
+ |
+ self.assertEqual(6, len(client_event.batch_operations)) |
+ found_client_op_types = set() |
+ for client_result in client_event.batch_operations: |
+ # we expect each op type to be unique |
+ self.assertNotIn(client_result.type, found_client_op_types) |
+ found_client_op_types.add(client_result.type) |
+ if client_result.type == cygrpc.OperationType.receive_initial_metadata: |
+ self.assertTrue( |
+ test_common.metadata_transmitted(server_initial_metadata, |
+ client_result.received_metadata)) |
+ elif client_result.type == cygrpc.OperationType.receive_message: |
+ self.assertEqual(RESPONSE, client_result.received_message.bytes()) |
+ elif client_result.type == cygrpc.OperationType.receive_status_on_client: |
+ self.assertTrue( |
+ test_common.metadata_transmitted(server_trailing_metadata, |
+ client_result.received_metadata)) |
+ self.assertEqual(SERVER_STATUS_DETAILS, |
+ client_result.received_status_details) |
+ self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code) |
+ self.assertEqual(set([ |
+ cygrpc.OperationType.send_initial_metadata, |
+ cygrpc.OperationType.send_message, |
+ cygrpc.OperationType.send_close_from_client, |
+ cygrpc.OperationType.receive_initial_metadata, |
+ cygrpc.OperationType.receive_message, |
+ cygrpc.OperationType.receive_status_on_client |
+ ]), found_client_op_types) |
+ |
+ self.assertEqual(5, len(server_event.batch_operations)) |
+ found_server_op_types = set() |
+ for server_result in server_event.batch_operations: |
+ self.assertNotIn(client_result.type, found_server_op_types) |
+ found_server_op_types.add(server_result.type) |
+ if server_result.type == cygrpc.OperationType.receive_message: |
+ self.assertEqual(REQUEST, server_result.received_message.bytes()) |
+ elif server_result.type == cygrpc.OperationType.receive_close_on_server: |
+ self.assertFalse(server_result.received_cancelled) |
+ self.assertEqual(set([ |
+ cygrpc.OperationType.send_initial_metadata, |
+ cygrpc.OperationType.receive_message, |
+ cygrpc.OperationType.send_message, |
+ cygrpc.OperationType.receive_close_on_server, |
+ cygrpc.OperationType.send_status_from_server |
+ ]), found_server_op_types) |
+ |
+ del client_call |
+ del server_call |
+ |
+ |
+class SecureServerSecureClient(unittest.TestCase): |
+ |
+ def setUp(self): |
+ server_credentials = cygrpc.server_credentials_ssl( |
+ None, [cygrpc.SslPemKeyCertPair(resources.private_key(), |
+ resources.certificate_chain())], False) |
+ channel_credentials = cygrpc.channel_credentials_ssl( |
+ resources.test_root_certificates(), None) |
+ self.server_completion_queue = cygrpc.CompletionQueue() |
+ self.server = cygrpc.Server() |
+ self.server.register_completion_queue(self.server_completion_queue) |
+ self.port = self.server.add_http2_port('[::]:0', server_credentials) |
+ self.server.start() |
+ self.client_completion_queue = cygrpc.CompletionQueue() |
+ client_channel_arguments = cygrpc.ChannelArgs([ |
+ cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override, |
+ _SSL_HOST_OVERRIDE)]) |
+ self.client_channel = cygrpc.Channel( |
+ 'localhost:{}'.format(self.port), client_channel_arguments, |
+ channel_credentials) |
+ |
+ def tearDown(self): |
+ del self.server |
+ del self.client_completion_queue |
+ del self.server_completion_queue |
+ |
+ def testEcho(self): |
+ DEADLINE = time.time()+5 |
+ DEADLINE_TOLERANCE = 0.25 |
+ CLIENT_METADATA_ASCII_KEY = b'key' |
+ CLIENT_METADATA_ASCII_VALUE = b'val' |
+ CLIENT_METADATA_BIN_KEY = b'key-bin' |
+ CLIENT_METADATA_BIN_VALUE = b'\0'*1000 |
+ SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' |
+ SERVER_INITIAL_METADATA_VALUE = b'whodawha?' |
+ SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' |
+ SERVER_TRAILING_METADATA_VALUE = b'zomg it is' |
+ SERVER_STATUS_CODE = cygrpc.StatusCode.ok |
+ SERVER_STATUS_DETAILS = b'our work is never over' |
+ REQUEST = b'in death a member of project mayhem has a name' |
+ RESPONSE = b'his name is robert paulson' |
+ METHOD = b'/twinkies' |
+ HOST = None # Default host |
+ |
+ cygrpc_deadline = cygrpc.Timespec(DEADLINE) |
+ |
+ server_request_tag = object() |
+ request_call_result = self.server.request_call( |
+ self.server_completion_queue, self.server_completion_queue, |
+ server_request_tag) |
+ |
+ self.assertEqual(cygrpc.CallError.ok, request_call_result) |
+ |
+ plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, '') |
+ call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) |
+ |
+ client_call_tag = object() |
+ client_call = self.client_channel.create_call( |
+ None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline) |
+ client_call.set_credentials(call_credentials) |
+ client_initial_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, |
+ CLIENT_METADATA_ASCII_VALUE), |
+ cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) |
+ client_start_batch_result = client_call.start_batch(cygrpc.Operations([ |
+ cygrpc.operation_send_initial_metadata(client_initial_metadata), |
+ cygrpc.operation_send_message(REQUEST), |
+ cygrpc.operation_send_close_from_client(), |
+ cygrpc.operation_receive_initial_metadata(), |
+ cygrpc.operation_receive_message(), |
+ cygrpc.operation_receive_status_on_client() |
+ ]), client_call_tag) |
+ self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) |
+ client_event_future = test_utilities.CompletionQueuePollFuture( |
+ self.client_completion_queue, cygrpc_deadline) |
+ |
+ request_event = self.server_completion_queue.poll(cygrpc_deadline) |
+ self.assertEqual(cygrpc.CompletionType.operation_complete, |
+ request_event.type) |
+ self.assertIsInstance(request_event.operation_call, cygrpc.Call) |
+ self.assertIs(server_request_tag, request_event.tag) |
+ self.assertEqual(0, len(request_event.batch_operations)) |
+ client_metadata_with_credentials = list(client_initial_metadata) + [ |
+ (_CALL_CREDENTIALS_METADATA_KEY, _CALL_CREDENTIALS_METADATA_VALUE)] |
+ self.assertTrue( |
+ test_common.metadata_transmitted(client_metadata_with_credentials, |
+ request_event.request_metadata)) |
+ self.assertEqual(METHOD, request_event.request_call_details.method) |
+ self.assertEqual(_SSL_HOST_OVERRIDE, |
+ request_event.request_call_details.host) |
+ self.assertLess( |
+ abs(DEADLINE - float(request_event.request_call_details.deadline)), |
+ DEADLINE_TOLERANCE) |
+ |
+ server_call_tag = object() |
+ server_call = request_event.operation_call |
+ server_initial_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY, |
+ SERVER_INITIAL_METADATA_VALUE)]) |
+ server_trailing_metadata = cygrpc.Metadata([ |
+ cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, |
+ SERVER_TRAILING_METADATA_VALUE)]) |
+ server_start_batch_result = server_call.start_batch([ |
+ cygrpc.operation_send_initial_metadata(server_initial_metadata), |
+ cygrpc.operation_receive_message(), |
+ cygrpc.operation_send_message(RESPONSE), |
+ cygrpc.operation_receive_close_on_server(), |
+ cygrpc.operation_send_status_from_server( |
+ server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) |
+ ], server_call_tag) |
+ self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) |
+ |
+ client_event = client_event_future.result() |
+ server_event = self.server_completion_queue.poll(cygrpc_deadline) |
+ |
+ self.assertEqual(6, len(client_event.batch_operations)) |
+ found_client_op_types = set() |
+ for client_result in client_event.batch_operations: |
+ # we expect each op type to be unique |
+ self.assertNotIn(client_result.type, found_client_op_types) |
+ found_client_op_types.add(client_result.type) |
+ if client_result.type == cygrpc.OperationType.receive_initial_metadata: |
+ self.assertTrue( |
+ test_common.metadata_transmitted(server_initial_metadata, |
+ client_result.received_metadata)) |
+ elif client_result.type == cygrpc.OperationType.receive_message: |
+ self.assertEqual(RESPONSE, client_result.received_message.bytes()) |
+ elif client_result.type == cygrpc.OperationType.receive_status_on_client: |
+ self.assertTrue( |
+ test_common.metadata_transmitted(server_trailing_metadata, |
+ client_result.received_metadata)) |
+ self.assertEqual(SERVER_STATUS_DETAILS, |
+ client_result.received_status_details) |
+ self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code) |
+ self.assertEqual(set([ |
+ cygrpc.OperationType.send_initial_metadata, |
+ cygrpc.OperationType.send_message, |
+ cygrpc.OperationType.send_close_from_client, |
+ cygrpc.OperationType.receive_initial_metadata, |
+ cygrpc.OperationType.receive_message, |
+ cygrpc.OperationType.receive_status_on_client |
+ ]), found_client_op_types) |
+ |
+ self.assertEqual(5, len(server_event.batch_operations)) |
+ found_server_op_types = set() |
+ for server_result in server_event.batch_operations: |
+ self.assertNotIn(client_result.type, found_server_op_types) |
+ found_server_op_types.add(server_result.type) |
+ if server_result.type == cygrpc.OperationType.receive_message: |
+ self.assertEqual(REQUEST, server_result.received_message.bytes()) |
+ elif server_result.type == cygrpc.OperationType.receive_close_on_server: |
+ self.assertFalse(server_result.received_cancelled) |
+ self.assertEqual(set([ |
+ cygrpc.OperationType.send_initial_metadata, |
+ cygrpc.OperationType.receive_message, |
+ cygrpc.OperationType.send_message, |
+ cygrpc.OperationType.receive_close_on_server, |
+ cygrpc.OperationType.send_status_from_server |
+ ]), found_server_op_types) |
+ |
+ del client_call |
+ del server_call |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main(verbosity=2) |