Index: third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py |
diff --git a/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py b/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4f8e26c9a26c986ffacf436228dde928b3407d21 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py |
@@ -0,0 +1,277 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""Tests of the base interface of RPC Framework.""" |
+ |
+import logging |
+import random |
+import threading |
+import time |
+import unittest |
+ |
+from grpc.framework.foundation import logging_pool |
+from grpc.framework.interfaces.base import base |
+from grpc.framework.interfaces.base import utilities |
+from tests.unit.framework.common import test_constants |
+from tests.unit.framework.interfaces.base import _control |
+from tests.unit.framework.interfaces.base import test_interfaces |
+ |
+_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True)) |
+ |
+_EMPTY_OUTCOME_KIND_DICT = { |
+ outcome_kind: 0 for outcome_kind in base.Outcome.Kind} |
+ |
+ |
+class _Serialization(test_interfaces.Serialization): |
+ |
+ def serialize_request(self, request): |
+ return request + request |
+ |
+ def deserialize_request(self, serialized_request): |
+ return serialized_request[:len(serialized_request) / 2] |
+ |
+ def serialize_response(self, response): |
+ return response * 3 |
+ |
+ def deserialize_response(self, serialized_response): |
+ return serialized_response[2 * len(serialized_response) / 3:] |
+ |
+ |
+def _advance(quadruples, operator, controller): |
+ try: |
+ for quadruple in quadruples: |
+ operator.advance( |
+ initial_metadata=quadruple[0], payload=quadruple[1], |
+ completion=quadruple[2], allowance=quadruple[3]) |
+ except Exception as e: # pylint: disable=broad-except |
+ controller.failed('Exception on advance: %e' % e) |
+ |
+ |
+class _Operator(base.Operator): |
+ |
+ def __init__(self, controller, on_advance, pool, operator_under_test): |
+ self._condition = threading.Condition() |
+ self._controller = controller |
+ self._on_advance = on_advance |
+ self._pool = pool |
+ self._operator_under_test = operator_under_test |
+ self._pending_advances = [] |
+ |
+ def set_operator_under_test(self, operator_under_test): |
+ with self._condition: |
+ self._operator_under_test = operator_under_test |
+ pent_advances = self._pending_advances |
+ self._pending_advances = [] |
+ pool = self._pool |
+ controller = self._controller |
+ |
+ if pool is None: |
+ _advance(pent_advances, operator_under_test, controller) |
+ else: |
+ pool.submit(_advance, pent_advances, operator_under_test, controller) |
+ |
+ def advance( |
+ self, initial_metadata=None, payload=None, completion=None, |
+ allowance=None): |
+ on_advance = self._on_advance( |
+ initial_metadata, payload, completion, allowance) |
+ if on_advance.kind is _control.OnAdvance.Kind.ADVANCE: |
+ with self._condition: |
+ pool = self._pool |
+ operator_under_test = self._operator_under_test |
+ controller = self._controller |
+ |
+ quadruple = ( |
+ on_advance.initial_metadata, on_advance.payload, |
+ on_advance.completion, on_advance.allowance) |
+ if pool is None: |
+ _advance((quadruple,), operator_under_test, controller) |
+ else: |
+ pool.submit(_advance, (quadruple,), operator_under_test, controller) |
+ elif on_advance.kind is _control.OnAdvance.Kind.DEFECT: |
+ raise ValueError( |
+ 'Deliberately raised exception from Operator.advance (in a test)!') |
+ |
+ |
+class _ProtocolReceiver(base.ProtocolReceiver): |
+ |
+ def __init__(self): |
+ self._condition = threading.Condition() |
+ self._contexts = [] |
+ |
+ def context(self, protocol_context): |
+ with self._condition: |
+ self._contexts.append(protocol_context) |
+ |
+ |
+class _Servicer(base.Servicer): |
+ """A base.Servicer with instrumented for testing.""" |
+ |
+ def __init__(self, group, method, controllers, pool): |
+ self._condition = threading.Condition() |
+ self._group = group |
+ self._method = method |
+ self._pool = pool |
+ self._controllers = list(controllers) |
+ |
+ def service(self, group, method, context, output_operator): |
+ with self._condition: |
+ controller = self._controllers.pop(0) |
+ if group != self._group or method != self._method: |
+ controller.fail( |
+ '%s != %s or %s != %s' % (group, self._group, method, self._method)) |
+ raise base.NoSuchMethodError(None, None) |
+ else: |
+ operator = _Operator( |
+ controller, controller.on_service_advance, self._pool, |
+ output_operator) |
+ outcome = context.add_termination_callback( |
+ controller.service_on_termination) |
+ if outcome is not None: |
+ controller.service_on_termination(outcome) |
+ return utilities.full_subscription(operator, _ProtocolReceiver()) |
+ |
+ |
+class _OperationTest(unittest.TestCase): |
+ |
+ def setUp(self): |
+ if self._synchronicity_variation: |
+ self._pool = logging_pool.pool(test_constants.POOL_SIZE) |
+ else: |
+ self._pool = None |
+ self._controller = self._controller_creator.controller( |
+ self._implementation, self._randomness) |
+ |
+ def tearDown(self): |
+ if self._synchronicity_variation: |
+ self._pool.shutdown(wait=True) |
+ else: |
+ self._pool = None |
+ |
+ def test_operation(self): |
+ invocation = self._controller.invocation() |
+ if invocation.subscription_kind is base.Subscription.Kind.FULL: |
+ test_operator = _Operator( |
+ self._controller, self._controller.on_invocation_advance, |
+ self._pool, None) |
+ subscription = utilities.full_subscription( |
+ test_operator, _ProtocolReceiver()) |
+ else: |
+ # TODO(nathaniel): support and test other subscription kinds. |
+ self.fail('Non-full subscriptions not yet supported!') |
+ |
+ servicer = _Servicer( |
+ invocation.group, invocation.method, (self._controller,), self._pool) |
+ |
+ invocation_end, service_end, memo = self._implementation.instantiate( |
+ {(invocation.group, invocation.method): _Serialization()}, servicer) |
+ |
+ try: |
+ invocation_end.start() |
+ service_end.start() |
+ operation_context, operator_under_test = invocation_end.operate( |
+ invocation.group, invocation.method, subscription, invocation.timeout, |
+ initial_metadata=invocation.initial_metadata, payload=invocation.payload, |
+ completion=invocation.completion) |
+ test_operator.set_operator_under_test(operator_under_test) |
+ outcome = operation_context.add_termination_callback( |
+ self._controller.invocation_on_termination) |
+ if outcome is not None: |
+ self._controller.invocation_on_termination(outcome) |
+ except Exception as e: # pylint: disable=broad-except |
+ self._controller.failed('Exception on invocation: %s' % e) |
+ self.fail(e) |
+ |
+ while True: |
+ instruction = self._controller.poll() |
+ if instruction.kind is _control.Instruction.Kind.ADVANCE: |
+ try: |
+ test_operator.advance( |
+ *instruction.advance_args, **instruction.advance_kwargs) |
+ except Exception as e: # pylint: disable=broad-except |
+ self._controller.failed('Exception on instructed advance: %s' % e) |
+ elif instruction.kind is _control.Instruction.Kind.CANCEL: |
+ try: |
+ operation_context.cancel() |
+ except Exception as e: # pylint: disable=broad-except |
+ self._controller.failed('Exception on cancel: %s' % e) |
+ elif instruction.kind is _control.Instruction.Kind.CONCLUDE: |
+ break |
+ |
+ invocation_stop_event = invocation_end.stop(0) |
+ service_stop_event = service_end.stop(0) |
+ invocation_stop_event.wait() |
+ service_stop_event.wait() |
+ invocation_stats = invocation_end.operation_stats() |
+ service_stats = service_end.operation_stats() |
+ |
+ self._implementation.destantiate(memo) |
+ |
+ self.assertTrue( |
+ instruction.conclude_success, msg=instruction.conclude_message) |
+ |
+ expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT) |
+ expected_invocation_stats[ |
+ instruction.conclude_invocation_outcome_kind] += 1 |
+ self.assertDictEqual(expected_invocation_stats, invocation_stats) |
+ expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT) |
+ expected_service_stats[instruction.conclude_service_outcome_kind] += 1 |
+ self.assertDictEqual(expected_service_stats, service_stats) |
+ |
+ |
+def test_cases(implementation): |
+ """Creates unittest.TestCase classes for a given Base implementation. |
+ |
+ Args: |
+ implementation: A test_interfaces.Implementation specifying creation and |
+ destruction of the Base implementation under test. |
+ |
+ Returns: |
+ A sequence of subclasses of unittest.TestCase defining tests of the |
+ specified Base layer implementation. |
+ """ |
+ random_seed = hash(time.time()) |
+ logging.warning('Random seed for this execution: %s', random_seed) |
+ randomness = random.Random(x=random_seed) |
+ |
+ test_case_classes = [] |
+ for synchronicity_variation in _SYNCHRONICITY_VARIATION: |
+ for controller_creator in _control.CONTROLLER_CREATORS: |
+ name = ''.join( |
+ (synchronicity_variation[0], controller_creator.name(), 'Test',)) |
+ test_case_classes.append( |
+ type(name, (_OperationTest,), |
+ {'_implementation': implementation, |
+ '_randomness': randomness, |
+ '_synchronicity_variation': synchronicity_variation[1], |
+ '_controller_creator': controller_creator, |
+ '__module__': implementation.__module__, |
+ })) |
+ |
+ return test_case_classes |