| Index: third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py
|
| diff --git a/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py b/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9304b6b1db13d7ad3e77ffa5c4014eb6714be8a2
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py
|
| @@ -0,0 +1,444 @@
|
| +# Copyright 2015, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +"""Code for making a service.TestService more amenable to use in tests."""
|
| +
|
| +import collections
|
| +import threading
|
| +
|
| +# test_control, _service, and test_interfaces are referenced from specification
|
| +# in this module.
|
| +from grpc.framework.common import cardinality
|
| +from grpc.framework.common import style
|
| +from grpc.framework.foundation import stream
|
| +from grpc.framework.foundation import stream_util
|
| +from grpc.framework.interfaces.face import face
|
| +from tests.unit.framework.common import test_control # pylint: disable=unused-import
|
| +from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import
|
| +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
|
| +
|
| +_IDENTITY = lambda x: x
|
| +
|
| +
|
| +class TestServiceDigest(
|
| + collections.namedtuple(
|
| + 'TestServiceDigest',
|
| + ('methods',
|
| + 'inline_method_implementations',
|
| + 'event_method_implementations',
|
| + 'multi_method_implementation',
|
| + 'unary_unary_messages_sequences',
|
| + 'unary_stream_messages_sequences',
|
| + 'stream_unary_messages_sequences',
|
| + 'stream_stream_messages_sequences',))):
|
| + """A transformation of a service.TestService.
|
| +
|
| + Attributes:
|
| + methods: A dict from method group-name pair to test_interfaces.Method object
|
| + describing the RPC methods that may be called during the test.
|
| + inline_method_implementations: A dict from method group-name pair to
|
| + face.MethodImplementation object to be used in tests of in-line calls to
|
| + behaviors under test.
|
| + event_method_implementations: A dict from method group-name pair to
|
| + face.MethodImplementation object to be used in tests of event-driven calls
|
| + to behaviors under test.
|
| + multi_method_implementation: A face.MultiMethodImplementation to be used in
|
| + tests of generic calls to behaviors under test.
|
| + unary_unary_messages_sequences: A dict from method group-name pair to
|
| + sequence of service.UnaryUnaryTestMessages objects to be used to test the
|
| + identified method.
|
| + unary_stream_messages_sequences: A dict from method group-name pair to
|
| + sequence of service.UnaryStreamTestMessages objects to be used to test the
|
| + identified method.
|
| + stream_unary_messages_sequences: A dict from method group-name pair to
|
| + sequence of service.StreamUnaryTestMessages objects to be used to test the
|
| + identified method.
|
| + stream_stream_messages_sequences: A dict from method group-name pair to
|
| + sequence of service.StreamStreamTestMessages objects to be used to test
|
| + the identified method.
|
| + """
|
| +
|
| +
|
| +class _BufferingConsumer(stream.Consumer):
|
| + """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
|
| +
|
| + def __init__(self):
|
| + self.consumed = []
|
| + self.terminated = False
|
| +
|
| + def consume(self, value):
|
| + self.consumed.append(value)
|
| +
|
| + def terminate(self):
|
| + self.terminated = True
|
| +
|
| + def consume_and_terminate(self, value):
|
| + self.consumed.append(value)
|
| + self.terminated = True
|
| +
|
| +
|
| +class _InlineUnaryUnaryMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, unary_unary_test_method, control):
|
| + self._test_method = unary_unary_test_method
|
| + self._control = control
|
| +
|
| + self.cardinality = cardinality.Cardinality.UNARY_UNARY
|
| + self.style = style.Service.INLINE
|
| +
|
| + def unary_unary_inline(self, request, context):
|
| + response_list = []
|
| + self._test_method.service(
|
| + request, response_list.append, context, self._control)
|
| + return response_list.pop(0)
|
| +
|
| +
|
| +class _EventUnaryUnaryMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, unary_unary_test_method, control, pool):
|
| + self._test_method = unary_unary_test_method
|
| + self._control = control
|
| + self._pool = pool
|
| +
|
| + self.cardinality = cardinality.Cardinality.UNARY_UNARY
|
| + self.style = style.Service.EVENT
|
| +
|
| + def unary_unary_event(self, request, response_callback, context):
|
| + if self._pool is None:
|
| + self._test_method.service(
|
| + request, response_callback, context, self._control)
|
| + else:
|
| + self._pool.submit(
|
| + self._test_method.service, request, response_callback, context,
|
| + self._control)
|
| +
|
| +
|
| +class _InlineUnaryStreamMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, unary_stream_test_method, control):
|
| + self._test_method = unary_stream_test_method
|
| + self._control = control
|
| +
|
| + self.cardinality = cardinality.Cardinality.UNARY_STREAM
|
| + self.style = style.Service.INLINE
|
| +
|
| + def unary_stream_inline(self, request, context):
|
| + response_consumer = _BufferingConsumer()
|
| + self._test_method.service(
|
| + request, response_consumer, context, self._control)
|
| + for response in response_consumer.consumed:
|
| + yield response
|
| +
|
| +
|
| +class _EventUnaryStreamMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, unary_stream_test_method, control, pool):
|
| + self._test_method = unary_stream_test_method
|
| + self._control = control
|
| + self._pool = pool
|
| +
|
| + self.cardinality = cardinality.Cardinality.UNARY_STREAM
|
| + self.style = style.Service.EVENT
|
| +
|
| + def unary_stream_event(self, request, response_consumer, context):
|
| + if self._pool is None:
|
| + self._test_method.service(
|
| + request, response_consumer, context, self._control)
|
| + else:
|
| + self._pool.submit(
|
| + self._test_method.service, request, response_consumer, context,
|
| + self._control)
|
| +
|
| +
|
| +class _InlineStreamUnaryMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, stream_unary_test_method, control):
|
| + self._test_method = stream_unary_test_method
|
| + self._control = control
|
| +
|
| + self.cardinality = cardinality.Cardinality.STREAM_UNARY
|
| + self.style = style.Service.INLINE
|
| +
|
| + def stream_unary_inline(self, request_iterator, context):
|
| + response_list = []
|
| + request_consumer = self._test_method.service(
|
| + response_list.append, context, self._control)
|
| + for request in request_iterator:
|
| + request_consumer.consume(request)
|
| + request_consumer.terminate()
|
| + return response_list.pop(0)
|
| +
|
| +
|
| +class _EventStreamUnaryMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, stream_unary_test_method, control, pool):
|
| + self._test_method = stream_unary_test_method
|
| + self._control = control
|
| + self._pool = pool
|
| +
|
| + self.cardinality = cardinality.Cardinality.STREAM_UNARY
|
| + self.style = style.Service.EVENT
|
| +
|
| + def stream_unary_event(self, response_callback, context):
|
| + request_consumer = self._test_method.service(
|
| + response_callback, context, self._control)
|
| + if self._pool is None:
|
| + return request_consumer
|
| + else:
|
| + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
|
| +
|
| +
|
| +class _InlineStreamStreamMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, stream_stream_test_method, control):
|
| + self._test_method = stream_stream_test_method
|
| + self._control = control
|
| +
|
| + self.cardinality = cardinality.Cardinality.STREAM_STREAM
|
| + self.style = style.Service.INLINE
|
| +
|
| + def stream_stream_inline(self, request_iterator, context):
|
| + response_consumer = _BufferingConsumer()
|
| + request_consumer = self._test_method.service(
|
| + response_consumer, context, self._control)
|
| +
|
| + for request in request_iterator:
|
| + request_consumer.consume(request)
|
| + while response_consumer.consumed:
|
| + yield response_consumer.consumed.pop(0)
|
| + response_consumer.terminate()
|
| +
|
| +
|
| +class _EventStreamStreamMethod(face.MethodImplementation):
|
| +
|
| + def __init__(self, stream_stream_test_method, control, pool):
|
| + self._test_method = stream_stream_test_method
|
| + self._control = control
|
| + self._pool = pool
|
| +
|
| + self.cardinality = cardinality.Cardinality.STREAM_STREAM
|
| + self.style = style.Service.EVENT
|
| +
|
| + def stream_stream_event(self, response_consumer, context):
|
| + request_consumer = self._test_method.service(
|
| + response_consumer, context, self._control)
|
| + if self._pool is None:
|
| + return request_consumer
|
| + else:
|
| + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
|
| +
|
| +
|
| +class _UnaryConsumer(stream.Consumer):
|
| + """A Consumer that only allows consumption of exactly one value."""
|
| +
|
| + def __init__(self, action):
|
| + self._lock = threading.Lock()
|
| + self._action = action
|
| + self._consumed = False
|
| + self._terminated = False
|
| +
|
| + def consume(self, value):
|
| + with self._lock:
|
| + if self._consumed:
|
| + raise ValueError('Unary consumer already consumed!')
|
| + elif self._terminated:
|
| + raise ValueError('Unary consumer already terminated!')
|
| + else:
|
| + self._consumed = True
|
| +
|
| + self._action(value)
|
| +
|
| + def terminate(self):
|
| + with self._lock:
|
| + if not self._consumed:
|
| + raise ValueError('Unary consumer hasn\'t yet consumed!')
|
| + elif self._terminated:
|
| + raise ValueError('Unary consumer already terminated!')
|
| + else:
|
| + self._terminated = True
|
| +
|
| + def consume_and_terminate(self, value):
|
| + with self._lock:
|
| + if self._consumed:
|
| + raise ValueError('Unary consumer already consumed!')
|
| + elif self._terminated:
|
| + raise ValueError('Unary consumer already terminated!')
|
| + else:
|
| + self._consumed = True
|
| + self._terminated = True
|
| +
|
| + self._action(value)
|
| +
|
| +
|
| +class _UnaryUnaryAdaptation(object):
|
| +
|
| + def __init__(self, unary_unary_test_method):
|
| + self._method = unary_unary_test_method
|
| +
|
| + def service(self, response_consumer, context, control):
|
| + def action(request):
|
| + self._method.service(
|
| + request, response_consumer.consume_and_terminate, context, control)
|
| + return _UnaryConsumer(action)
|
| +
|
| +
|
| +class _UnaryStreamAdaptation(object):
|
| +
|
| + def __init__(self, unary_stream_test_method):
|
| + self._method = unary_stream_test_method
|
| +
|
| + def service(self, response_consumer, context, control):
|
| + def action(request):
|
| + self._method.service(request, response_consumer, context, control)
|
| + return _UnaryConsumer(action)
|
| +
|
| +
|
| +class _StreamUnaryAdaptation(object):
|
| +
|
| + def __init__(self, stream_unary_test_method):
|
| + self._method = stream_unary_test_method
|
| +
|
| + def service(self, response_consumer, context, control):
|
| + return self._method.service(
|
| + response_consumer.consume_and_terminate, context, control)
|
| +
|
| +
|
| +class _MultiMethodImplementation(face.MultiMethodImplementation):
|
| +
|
| + def __init__(self, methods, control, pool):
|
| + self._methods = methods
|
| + self._control = control
|
| + self._pool = pool
|
| +
|
| + def service(self, group, name, response_consumer, context):
|
| + method = self._methods.get(group, name, None)
|
| + if method is None:
|
| + raise face.NoSuchMethodError(group, name)
|
| + elif self._pool is None:
|
| + return method(response_consumer, context, self._control)
|
| + else:
|
| + request_consumer = method(response_consumer, context, self._control)
|
| + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
|
| +
|
| +
|
| +class _Assembly(
|
| + collections.namedtuple(
|
| + '_Assembly',
|
| + ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
|
| + """An intermediate structure created when creating a TestServiceDigest."""
|
| +
|
| +
|
| +def _assemble(
|
| + scenarios, identifiers, inline_method_constructor, event_method_constructor,
|
| + adapter, control, pool):
|
| + """Creates an _Assembly from the given scenarios."""
|
| + methods = {}
|
| + inlines = {}
|
| + events = {}
|
| + adaptations = {}
|
| + messages = {}
|
| + for identifier, scenario in scenarios.iteritems():
|
| + if identifier in identifiers:
|
| + raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
|
| +
|
| + test_method = scenario[0]
|
| + inline_method = inline_method_constructor(test_method, control)
|
| + event_method = event_method_constructor(test_method, control, pool)
|
| + adaptation = adapter(test_method)
|
| +
|
| + methods[identifier] = test_method
|
| + inlines[identifier] = inline_method
|
| + events[identifier] = event_method
|
| + adaptations[identifier] = adaptation
|
| + messages[identifier] = scenario[1]
|
| +
|
| + return _Assembly(methods, inlines, events, adaptations, messages)
|
| +
|
| +
|
| +def digest(service, control, pool):
|
| + """Creates a TestServiceDigest from a TestService.
|
| +
|
| + Args:
|
| + service: A _service.TestService.
|
| + control: A test_control.Control.
|
| + pool: If RPC methods should be serviced in a separate thread, a thread pool.
|
| + None if RPC methods should be serviced in the thread belonging to the
|
| + run-time that calls for their service.
|
| +
|
| + Returns:
|
| + A TestServiceDigest synthesized from the given service.TestService.
|
| + """
|
| + identifiers = set()
|
| +
|
| + unary_unary = _assemble(
|
| + service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod,
|
| + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
|
| + identifiers.update(unary_unary.inlines)
|
| +
|
| + unary_stream = _assemble(
|
| + service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod,
|
| + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
|
| + identifiers.update(unary_stream.inlines)
|
| +
|
| + stream_unary = _assemble(
|
| + service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod,
|
| + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
|
| + identifiers.update(stream_unary.inlines)
|
| +
|
| + stream_stream = _assemble(
|
| + service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod,
|
| + _EventStreamStreamMethod, _IDENTITY, control, pool)
|
| + identifiers.update(stream_stream.inlines)
|
| +
|
| + methods = dict(unary_unary.methods)
|
| + methods.update(unary_stream.methods)
|
| + methods.update(stream_unary.methods)
|
| + methods.update(stream_stream.methods)
|
| + adaptations = dict(unary_unary.adaptations)
|
| + adaptations.update(unary_stream.adaptations)
|
| + adaptations.update(stream_unary.adaptations)
|
| + adaptations.update(stream_stream.adaptations)
|
| + inlines = dict(unary_unary.inlines)
|
| + inlines.update(unary_stream.inlines)
|
| + inlines.update(stream_unary.inlines)
|
| + inlines.update(stream_stream.inlines)
|
| + events = dict(unary_unary.events)
|
| + events.update(unary_stream.events)
|
| + events.update(stream_unary.events)
|
| + events.update(stream_stream.events)
|
| +
|
| + return TestServiceDigest(
|
| + methods,
|
| + inlines,
|
| + events,
|
| + _MultiMethodImplementation(adaptations, control, pool),
|
| + unary_unary.messages,
|
| + unary_stream.messages,
|
| + stream_unary.messages,
|
| + stream_stream.messages)
|
|
|