Index: third_party/grpc/src/python/grpcio/tests/interop/methods.py |
diff --git a/third_party/grpc/src/python/grpcio/tests/interop/methods.py b/third_party/grpc/src/python/grpcio/tests/interop/methods.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b3591aef7bcab8e3e84e22486c8db78155ab0dc3 |
--- /dev/null |
+++ b/third_party/grpc/src/python/grpcio/tests/interop/methods.py |
@@ -0,0 +1,341 @@ |
+# Copyright 2015, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+"""Implementations of interoperability test methods.""" |
+ |
+import enum |
+import json |
+import os |
+import threading |
+import time |
+ |
+from oauth2client import client as oauth2client_client |
+ |
+from grpc.framework.common import cardinality |
+from grpc.framework.interfaces.face import face |
+ |
+from tests.interop import empty_pb2 |
+from tests.interop import messages_pb2 |
+from tests.interop import test_pb2 |
+ |
+_TIMEOUT = 7 |
+ |
+ |
+class TestService(test_pb2.BetaTestServiceServicer): |
+ |
+ def EmptyCall(self, request, context): |
+ return empty_pb2.Empty() |
+ |
+ def UnaryCall(self, request, context): |
+ return messages_pb2.SimpleResponse( |
+ payload=messages_pb2.Payload( |
+ type=messages_pb2.COMPRESSABLE, |
+ body=b'\x00' * request.response_size)) |
+ |
+ def StreamingOutputCall(self, request, context): |
+ for response_parameters in request.response_parameters: |
+ yield messages_pb2.StreamingOutputCallResponse( |
+ payload=messages_pb2.Payload( |
+ type=request.response_type, |
+ body=b'\x00' * response_parameters.size)) |
+ |
+ def StreamingInputCall(self, request_iterator, context): |
+ aggregate_size = 0 |
+ for request in request_iterator: |
+ if request.payload and request.payload.body: |
+ aggregate_size += len(request.payload.body) |
+ return messages_pb2.StreamingInputCallResponse( |
+ aggregated_payload_size=aggregate_size) |
+ |
+ def FullDuplexCall(self, request_iterator, context): |
+ for request in request_iterator: |
+ yield messages_pb2.StreamingOutputCallResponse( |
+ payload=messages_pb2.Payload( |
+ type=request.payload.type, |
+ body=b'\x00' * request.response_parameters[0].size)) |
+ |
+ # NOTE(nathaniel): Apparently this is the same as the full-duplex call? |
+ # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... |
+ def HalfDuplexCall(self, request_iterator, context): |
+ return self.FullDuplexCall(request_iterator, context) |
+ |
+ |
+def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope): |
+ with stub: |
+ request = messages_pb2.SimpleRequest( |
+ response_type=messages_pb2.COMPRESSABLE, response_size=314159, |
+ payload=messages_pb2.Payload(body=b'\x00' * 271828), |
+ fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) |
+ response_future = stub.UnaryCall.future(request, _TIMEOUT) |
+ response = response_future.result() |
+ if response.payload.type is not messages_pb2.COMPRESSABLE: |
+ raise ValueError( |
+ 'response payload type is "%s"!' % type(response.payload.type)) |
+ if len(response.payload.body) != 314159: |
+ raise ValueError( |
+ 'response body of incorrect size %d!' % len(response.payload.body)) |
+ return response |
+ |
+ |
+def _empty_unary(stub): |
+ with stub: |
+ response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT) |
+ if not isinstance(response, empty_pb2.Empty): |
+ raise TypeError( |
+ 'response is of type "%s", not empty_pb2.Empty!', type(response)) |
+ |
+ |
+def _large_unary(stub): |
+ _large_unary_common_behavior(stub, False, False) |
+ |
+ |
+def _client_streaming(stub): |
+ with stub: |
+ payload_body_sizes = (27182, 8, 1828, 45904) |
+ payloads = ( |
+ messages_pb2.Payload(body=b'\x00' * size) |
+ for size in payload_body_sizes) |
+ requests = ( |
+ messages_pb2.StreamingInputCallRequest(payload=payload) |
+ for payload in payloads) |
+ response = stub.StreamingInputCall(requests, _TIMEOUT) |
+ if response.aggregated_payload_size != 74922: |
+ raise ValueError( |
+ 'incorrect size %d!' % response.aggregated_payload_size) |
+ |
+ |
+def _server_streaming(stub): |
+ sizes = (31415, 9, 2653, 58979) |
+ |
+ with stub: |
+ request = messages_pb2.StreamingOutputCallRequest( |
+ response_type=messages_pb2.COMPRESSABLE, |
+ response_parameters=( |
+ messages_pb2.ResponseParameters(size=sizes[0]), |
+ messages_pb2.ResponseParameters(size=sizes[1]), |
+ messages_pb2.ResponseParameters(size=sizes[2]), |
+ messages_pb2.ResponseParameters(size=sizes[3]), |
+ )) |
+ response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) |
+ for index, response in enumerate(response_iterator): |
+ if response.payload.type != messages_pb2.COMPRESSABLE: |
+ raise ValueError( |
+ 'response body of invalid type %s!' % response.payload.type) |
+ if len(response.payload.body) != sizes[index]: |
+ raise ValueError( |
+ 'response body of invalid size %d!' % len(response.payload.body)) |
+ |
+def _cancel_after_begin(stub): |
+ with stub: |
+ sizes = (27182, 8, 1828, 45904) |
+ payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] |
+ requests = [messages_pb2.StreamingInputCallRequest(payload=payload) |
+ for payload in payloads] |
+ responses = stub.StreamingInputCall.future(requests, _TIMEOUT) |
+ responses.cancel() |
+ if not responses.cancelled(): |
+ raise ValueError('expected call to be cancelled') |
+ |
+ |
+class _Pipe(object): |
+ |
+ def __init__(self): |
+ self._condition = threading.Condition() |
+ self._values = [] |
+ self._open = True |
+ |
+ def __iter__(self): |
+ return self |
+ |
+ def next(self): |
+ with self._condition: |
+ while not self._values and self._open: |
+ self._condition.wait() |
+ if self._values: |
+ return self._values.pop(0) |
+ else: |
+ raise StopIteration() |
+ |
+ def add(self, value): |
+ with self._condition: |
+ self._values.append(value) |
+ self._condition.notify() |
+ |
+ def close(self): |
+ with self._condition: |
+ self._open = False |
+ self._condition.notify() |
+ |
+ def __enter__(self): |
+ return self |
+ |
+ def __exit__(self, type, value, traceback): |
+ self.close() |
+ |
+ |
+def _ping_pong(stub): |
+ request_response_sizes = (31415, 9, 2653, 58979) |
+ request_payload_sizes = (27182, 8, 1828, 45904) |
+ |
+ with stub, _Pipe() as pipe: |
+ response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
+ print 'Starting ping-pong with response iterator %s' % response_iterator |
+ for response_size, payload_size in zip( |
+ request_response_sizes, request_payload_sizes): |
+ request = messages_pb2.StreamingOutputCallRequest( |
+ response_type=messages_pb2.COMPRESSABLE, |
+ response_parameters=(messages_pb2.ResponseParameters( |
+ size=response_size),), |
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
+ pipe.add(request) |
+ response = next(response_iterator) |
+ if response.payload.type != messages_pb2.COMPRESSABLE: |
+ raise ValueError( |
+ 'response body of invalid type %s!' % response.payload.type) |
+ if len(response.payload.body) != response_size: |
+ raise ValueError( |
+ 'response body of invalid size %d!' % len(response.payload.body)) |
+ |
+ |
+def _cancel_after_first_response(stub): |
+ request_response_sizes = (31415, 9, 2653, 58979) |
+ request_payload_sizes = (27182, 8, 1828, 45904) |
+ with stub, _Pipe() as pipe: |
+ response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
+ |
+ response_size = request_response_sizes[0] |
+ payload_size = request_payload_sizes[0] |
+ request = messages_pb2.StreamingOutputCallRequest( |
+ response_type=messages_pb2.COMPRESSABLE, |
+ response_parameters=(messages_pb2.ResponseParameters( |
+ size=response_size),), |
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
+ pipe.add(request) |
+ response = next(response_iterator) |
+ # We test the contents of `response` in the Ping Pong test - don't check |
+ # them here. |
+ response_iterator.cancel() |
+ |
+ try: |
+ next(response_iterator) |
+ except Exception: |
+ pass |
+ else: |
+ raise ValueError('expected call to be cancelled') |
+ |
+ |
+def _timeout_on_sleeping_server(stub): |
+ request_payload_size = 27182 |
+ with stub, _Pipe() as pipe: |
+ response_iterator = stub.FullDuplexCall(pipe, 0.001) |
+ |
+ request = messages_pb2.StreamingOutputCallRequest( |
+ response_type=messages_pb2.COMPRESSABLE, |
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) |
+ pipe.add(request) |
+ time.sleep(0.1) |
+ try: |
+ next(response_iterator) |
+ except face.ExpirationError: |
+ pass |
+ else: |
+ raise ValueError('expected call to exceed deadline') |
+ |
+ |
+def _empty_stream(stub): |
+ with stub, _Pipe() as pipe: |
+ response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
+ pipe.close() |
+ try: |
+ next(response_iterator) |
+ raise ValueError('expected exactly 0 responses') |
+ except StopIteration: |
+ pass |
+ |
+ |
+def _compute_engine_creds(stub, args): |
+ response = _large_unary_common_behavior(stub, True, True) |
+ if args.default_service_account != response.username: |
+ raise ValueError( |
+ 'expected username %s, got %s' % (args.default_service_account, |
+ response.username)) |
+ |
+ |
+def _oauth2_auth_token(stub, args): |
+ json_key_filename = os.environ[ |
+ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] |
+ wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] |
+ response = _large_unary_common_behavior(stub, True, True) |
+ if wanted_email != response.username: |
+ raise ValueError( |
+ 'expected username %s, got %s' % (wanted_email, response.username)) |
+ if args.oauth_scope.find(response.oauth_scope) == -1: |
+ raise ValueError( |
+ 'expected to find oauth scope "%s" in received "%s"' % |
+ (response.oauth_scope, args.oauth_scope)) |
+ |
+@enum.unique |
+class TestCase(enum.Enum): |
+ EMPTY_UNARY = 'empty_unary' |
+ LARGE_UNARY = 'large_unary' |
+ SERVER_STREAMING = 'server_streaming' |
+ CLIENT_STREAMING = 'client_streaming' |
+ PING_PONG = 'ping_pong' |
+ CANCEL_AFTER_BEGIN = 'cancel_after_begin' |
+ CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' |
+ EMPTY_STREAM = 'empty_stream' |
+ COMPUTE_ENGINE_CREDS = 'compute_engine_creds' |
+ OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' |
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' |
+ |
+ def test_interoperability(self, stub, args): |
+ if self is TestCase.EMPTY_UNARY: |
+ _empty_unary(stub) |
+ elif self is TestCase.LARGE_UNARY: |
+ _large_unary(stub) |
+ elif self is TestCase.SERVER_STREAMING: |
+ _server_streaming(stub) |
+ elif self is TestCase.CLIENT_STREAMING: |
+ _client_streaming(stub) |
+ elif self is TestCase.PING_PONG: |
+ _ping_pong(stub) |
+ elif self is TestCase.CANCEL_AFTER_BEGIN: |
+ _cancel_after_begin(stub) |
+ elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: |
+ _cancel_after_first_response(stub) |
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: |
+ _timeout_on_sleeping_server(stub) |
+ elif self is TestCase.EMPTY_STREAM: |
+ _empty_stream(stub) |
+ elif self is TestCase.COMPUTE_ENGINE_CREDS: |
+ _compute_engine_creds(stub, args) |
+ elif self is TestCase.OAUTH2_AUTH_TOKEN: |
+ _oauth2_auth_token(stub, args) |
+ else: |
+ raise NotImplementedError('Test case "%s" not implemented!' % self.name) |