| Index: third_party/grpc/src/python/grpcio/tests/unit/beta/_beta_features_test.py
|
| diff --git a/third_party/grpc/src/python/grpcio/tests/unit/beta/_beta_features_test.py b/third_party/grpc/src/python/grpcio/tests/unit/beta/_beta_features_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ea44177b499d4d5aaa0aff826b3bc9cd7133c3f6
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/tests/unit/beta/_beta_features_test.py
|
| @@ -0,0 +1,343 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""Tests Face interface compliance of the gRPC Python Beta API."""
|
| +
|
| +import threading
|
| +import unittest
|
| +
|
| +from grpc.beta import implementations
|
| +from grpc.beta import interfaces
|
| +from grpc.framework.common import cardinality
|
| +from grpc.framework.interfaces.face import utilities
|
| +from tests.unit import resources
|
| +from tests.unit.beta import test_utilities
|
| +from tests.unit.framework.common import test_constants
|
| +
|
| +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
|
| +
|
| +_PER_RPC_CREDENTIALS_METADATA_KEY = 'my-call-credentials-metadata-key'
|
| +_PER_RPC_CREDENTIALS_METADATA_VALUE = 'my-call-credentials-metadata-value'
|
| +
|
| +_GROUP = 'group'
|
| +_UNARY_UNARY = 'unary-unary'
|
| +_UNARY_STREAM = 'unary-stream'
|
| +_STREAM_UNARY = 'stream-unary'
|
| +_STREAM_STREAM = 'stream-stream'
|
| +
|
| +_REQUEST = b'abc'
|
| +_RESPONSE = b'123'
|
| +
|
| +
|
| +class _Servicer(object):
|
| +
|
| + def __init__(self):
|
| + self._condition = threading.Condition()
|
| + self._peer = None
|
| + self._serviced = False
|
| +
|
| + def unary_unary(self, request, context):
|
| + with self._condition:
|
| + self._request = request
|
| + self._peer = context.protocol_context().peer()
|
| + self._invocation_metadata = context.invocation_metadata()
|
| + context.protocol_context().disable_next_response_compression()
|
| + self._serviced = True
|
| + self._condition.notify_all()
|
| + return _RESPONSE
|
| +
|
| + def unary_stream(self, request, context):
|
| + with self._condition:
|
| + self._request = request
|
| + self._peer = context.protocol_context().peer()
|
| + self._invocation_metadata = context.invocation_metadata()
|
| + context.protocol_context().disable_next_response_compression()
|
| + self._serviced = True
|
| + self._condition.notify_all()
|
| + return
|
| + yield
|
| +
|
| + def stream_unary(self, request_iterator, context):
|
| + for request in request_iterator:
|
| + self._request = request
|
| + with self._condition:
|
| + self._peer = context.protocol_context().peer()
|
| + self._invocation_metadata = context.invocation_metadata()
|
| + context.protocol_context().disable_next_response_compression()
|
| + self._serviced = True
|
| + self._condition.notify_all()
|
| + return _RESPONSE
|
| +
|
| + def stream_stream(self, request_iterator, context):
|
| + for request in request_iterator:
|
| + with self._condition:
|
| + self._peer = context.protocol_context().peer()
|
| + context.protocol_context().disable_next_response_compression()
|
| + yield _RESPONSE
|
| + with self._condition:
|
| + self._invocation_metadata = context.invocation_metadata()
|
| + self._serviced = True
|
| + self._condition.notify_all()
|
| +
|
| + def peer(self):
|
| + with self._condition:
|
| + return self._peer
|
| +
|
| + def block_until_serviced(self):
|
| + with self._condition:
|
| + while not self._serviced:
|
| + self._condition.wait()
|
| +
|
| +
|
| +class _BlockingIterator(object):
|
| +
|
| + def __init__(self, upstream):
|
| + self._condition = threading.Condition()
|
| + self._upstream = upstream
|
| + self._allowed = []
|
| +
|
| + def __iter__(self):
|
| + return self
|
| +
|
| + def next(self):
|
| + with self._condition:
|
| + while True:
|
| + if self._allowed is None:
|
| + raise StopIteration()
|
| + elif self._allowed:
|
| + return self._allowed.pop(0)
|
| + else:
|
| + self._condition.wait()
|
| +
|
| + def allow(self):
|
| + with self._condition:
|
| + try:
|
| + self._allowed.append(next(self._upstream))
|
| + except StopIteration:
|
| + self._allowed = None
|
| + self._condition.notify_all()
|
| +
|
| +
|
| +def _metadata_plugin(context, callback):
|
| + callback([(_PER_RPC_CREDENTIALS_METADATA_KEY,
|
| + _PER_RPC_CREDENTIALS_METADATA_VALUE)], None)
|
| +
|
| +
|
| +class BetaFeaturesTest(unittest.TestCase):
|
| +
|
| + def setUp(self):
|
| + self._servicer = _Servicer()
|
| + method_implementations = {
|
| + (_GROUP, _UNARY_UNARY):
|
| + utilities.unary_unary_inline(self._servicer.unary_unary),
|
| + (_GROUP, _UNARY_STREAM):
|
| + utilities.unary_stream_inline(self._servicer.unary_stream),
|
| + (_GROUP, _STREAM_UNARY):
|
| + utilities.stream_unary_inline(self._servicer.stream_unary),
|
| + (_GROUP, _STREAM_STREAM):
|
| + utilities.stream_stream_inline(self._servicer.stream_stream),
|
| + }
|
| +
|
| + cardinalities = {
|
| + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
|
| + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
|
| + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
|
| + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
|
| + }
|
| +
|
| + server_options = implementations.server_options(
|
| + thread_pool_size=test_constants.POOL_SIZE)
|
| + self._server = implementations.server(
|
| + method_implementations, options=server_options)
|
| + server_credentials = implementations.ssl_server_credentials(
|
| + [(resources.private_key(), resources.certificate_chain(),),])
|
| + port = self._server.add_secure_port('[::]:0', server_credentials)
|
| + self._server.start()
|
| + self._channel_credentials = implementations.ssl_channel_credentials(
|
| + resources.test_root_certificates(), None, None)
|
| + self._call_credentials = implementations.metadata_call_credentials(
|
| + _metadata_plugin)
|
| + channel = test_utilities.not_really_secure_channel(
|
| + 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
|
| + stub_options = implementations.stub_options(
|
| + thread_pool_size=test_constants.POOL_SIZE)
|
| + self._dynamic_stub = implementations.dynamic_stub(
|
| + channel, _GROUP, cardinalities, options=stub_options)
|
| +
|
| + def tearDown(self):
|
| + self._dynamic_stub = None
|
| + self._server.stop(test_constants.SHORT_TIMEOUT).wait()
|
| +
|
| + def test_unary_unary(self):
|
| + call_options = interfaces.grpc_call_options(
|
| + disable_compression=True, credentials=self._call_credentials)
|
| + response = getattr(self._dynamic_stub, _UNARY_UNARY)(
|
| + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
|
| + self.assertEqual(_RESPONSE, response)
|
| + self.assertIsNotNone(self._servicer.peer())
|
| + invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
|
| + self._servicer._invocation_metadata]
|
| + self.assertIn(
|
| + (_PER_RPC_CREDENTIALS_METADATA_KEY,
|
| + _PER_RPC_CREDENTIALS_METADATA_VALUE),
|
| + invocation_metadata)
|
| +
|
| + def test_unary_stream(self):
|
| + call_options = interfaces.grpc_call_options(
|
| + disable_compression=True, credentials=self._call_credentials)
|
| + response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
|
| + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
|
| + self._servicer.block_until_serviced()
|
| + self.assertIsNotNone(self._servicer.peer())
|
| + invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
|
| + self._servicer._invocation_metadata]
|
| + self.assertIn(
|
| + (_PER_RPC_CREDENTIALS_METADATA_KEY,
|
| + _PER_RPC_CREDENTIALS_METADATA_VALUE),
|
| + invocation_metadata)
|
| +
|
| + def test_stream_unary(self):
|
| + call_options = interfaces.grpc_call_options(
|
| + credentials=self._call_credentials)
|
| + request_iterator = _BlockingIterator(iter((_REQUEST,)))
|
| + response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
|
| + request_iterator, test_constants.LONG_TIMEOUT,
|
| + protocol_options=call_options)
|
| + response_future.protocol_context().disable_next_request_compression()
|
| + request_iterator.allow()
|
| + response_future.protocol_context().disable_next_request_compression()
|
| + request_iterator.allow()
|
| + self._servicer.block_until_serviced()
|
| + self.assertIsNotNone(self._servicer.peer())
|
| + self.assertEqual(_RESPONSE, response_future.result())
|
| + invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
|
| + self._servicer._invocation_metadata]
|
| + self.assertIn(
|
| + (_PER_RPC_CREDENTIALS_METADATA_KEY,
|
| + _PER_RPC_CREDENTIALS_METADATA_VALUE),
|
| + invocation_metadata)
|
| +
|
| + def test_stream_stream(self):
|
| + call_options = interfaces.grpc_call_options(
|
| + credentials=self._call_credentials)
|
| + request_iterator = _BlockingIterator(iter((_REQUEST,)))
|
| + response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
|
| + request_iterator, test_constants.SHORT_TIMEOUT,
|
| + protocol_options=call_options)
|
| + response_iterator.protocol_context().disable_next_request_compression()
|
| + request_iterator.allow()
|
| + response = next(response_iterator)
|
| + response_iterator.protocol_context().disable_next_request_compression()
|
| + request_iterator.allow()
|
| + self._servicer.block_until_serviced()
|
| + self.assertIsNotNone(self._servicer.peer())
|
| + self.assertEqual(_RESPONSE, response)
|
| + invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
|
| + self._servicer._invocation_metadata]
|
| + self.assertIn(
|
| + (_PER_RPC_CREDENTIALS_METADATA_KEY,
|
| + _PER_RPC_CREDENTIALS_METADATA_VALUE),
|
| + invocation_metadata)
|
| +
|
| +
|
| +class ContextManagementAndLifecycleTest(unittest.TestCase):
|
| +
|
| + def setUp(self):
|
| + self._servicer = _Servicer()
|
| + self._method_implementations = {
|
| + (_GROUP, _UNARY_UNARY):
|
| + utilities.unary_unary_inline(self._servicer.unary_unary),
|
| + (_GROUP, _UNARY_STREAM):
|
| + utilities.unary_stream_inline(self._servicer.unary_stream),
|
| + (_GROUP, _STREAM_UNARY):
|
| + utilities.stream_unary_inline(self._servicer.stream_unary),
|
| + (_GROUP, _STREAM_STREAM):
|
| + utilities.stream_stream_inline(self._servicer.stream_stream),
|
| + }
|
| +
|
| + self._cardinalities = {
|
| + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
|
| + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
|
| + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
|
| + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
|
| + }
|
| +
|
| + self._server_options = implementations.server_options(
|
| + thread_pool_size=test_constants.POOL_SIZE)
|
| + self._server_credentials = implementations.ssl_server_credentials(
|
| + [(resources.private_key(), resources.certificate_chain(),),])
|
| + self._channel_credentials = implementations.ssl_channel_credentials(
|
| + resources.test_root_certificates(), None, None)
|
| + self._stub_options = implementations.stub_options(
|
| + thread_pool_size=test_constants.POOL_SIZE)
|
| +
|
| + def test_stub_context(self):
|
| + server = implementations.server(
|
| + self._method_implementations, options=self._server_options)
|
| + port = server.add_secure_port('[::]:0', self._server_credentials)
|
| + server.start()
|
| +
|
| + channel = test_utilities.not_really_secure_channel(
|
| + 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
|
| + dynamic_stub = implementations.dynamic_stub(
|
| + channel, _GROUP, self._cardinalities, options=self._stub_options)
|
| + for _ in range(100):
|
| + with dynamic_stub:
|
| + pass
|
| + for _ in range(10):
|
| + with dynamic_stub:
|
| + call_options = interfaces.grpc_call_options(
|
| + disable_compression=True)
|
| + response = getattr(dynamic_stub, _UNARY_UNARY)(
|
| + _REQUEST, test_constants.LONG_TIMEOUT,
|
| + protocol_options=call_options)
|
| + self.assertEqual(_RESPONSE, response)
|
| + self.assertIsNotNone(self._servicer.peer())
|
| +
|
| + server.stop(test_constants.SHORT_TIMEOUT).wait()
|
| +
|
| + def test_server_lifecycle(self):
|
| + for _ in range(100):
|
| + server = implementations.server(
|
| + self._method_implementations, options=self._server_options)
|
| + port = server.add_secure_port('[::]:0', self._server_credentials)
|
| + server.start()
|
| + server.stop(test_constants.SHORT_TIMEOUT).wait()
|
| + for _ in range(100):
|
| + server = implementations.server(
|
| + self._method_implementations, options=self._server_options)
|
| + server.add_secure_port('[::]:0', self._server_credentials)
|
| + server.add_insecure_port('[::]:0')
|
| + with server:
|
| + server.stop(test_constants.SHORT_TIMEOUT)
|
| + server.stop(test_constants.SHORT_TIMEOUT)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main(verbosity=2)
|
|
|