OLD | NEW |
(Empty) | |
| 1 # Copyright 2015, Google Inc. |
| 2 # All rights reserved. |
| 3 # |
| 4 # Redistribution and use in source and binary forms, with or without |
| 5 # modification, are permitted provided that the following conditions are |
| 6 # met: |
| 7 # |
| 8 # * Redistributions of source code must retain the above copyright |
| 9 # notice, this list of conditions and the following disclaimer. |
| 10 # * Redistributions in binary form must reproduce the above |
| 11 # copyright notice, this list of conditions and the following disclaimer |
| 12 # in the documentation and/or other materials provided with the |
| 13 # distribution. |
| 14 # * Neither the name of Google Inc. nor the names of its |
| 15 # contributors may be used to endorse or promote products derived from |
| 16 # this software without specific prior written permission. |
| 17 # |
| 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 29 |
| 30 """Implementations of interoperability test methods.""" |
| 31 |
| 32 import enum |
| 33 import json |
| 34 import os |
| 35 import threading |
| 36 import time |
| 37 |
| 38 from oauth2client import client as oauth2client_client |
| 39 |
| 40 from grpc.framework.common import cardinality |
| 41 from grpc.framework.interfaces.face import face |
| 42 |
| 43 from tests.interop import empty_pb2 |
| 44 from tests.interop import messages_pb2 |
| 45 from tests.interop import test_pb2 |
| 46 |
| 47 _TIMEOUT = 7 |
| 48 |
| 49 |
| 50 class TestService(test_pb2.BetaTestServiceServicer): |
| 51 |
| 52 def EmptyCall(self, request, context): |
| 53 return empty_pb2.Empty() |
| 54 |
| 55 def UnaryCall(self, request, context): |
| 56 return messages_pb2.SimpleResponse( |
| 57 payload=messages_pb2.Payload( |
| 58 type=messages_pb2.COMPRESSABLE, |
| 59 body=b'\x00' * request.response_size)) |
| 60 |
| 61 def StreamingOutputCall(self, request, context): |
| 62 for response_parameters in request.response_parameters: |
| 63 yield messages_pb2.StreamingOutputCallResponse( |
| 64 payload=messages_pb2.Payload( |
| 65 type=request.response_type, |
| 66 body=b'\x00' * response_parameters.size)) |
| 67 |
| 68 def StreamingInputCall(self, request_iterator, context): |
| 69 aggregate_size = 0 |
| 70 for request in request_iterator: |
| 71 if request.payload and request.payload.body: |
| 72 aggregate_size += len(request.payload.body) |
| 73 return messages_pb2.StreamingInputCallResponse( |
| 74 aggregated_payload_size=aggregate_size) |
| 75 |
| 76 def FullDuplexCall(self, request_iterator, context): |
| 77 for request in request_iterator: |
| 78 yield messages_pb2.StreamingOutputCallResponse( |
| 79 payload=messages_pb2.Payload( |
| 80 type=request.payload.type, |
| 81 body=b'\x00' * request.response_parameters[0].size)) |
| 82 |
| 83 # NOTE(nathaniel): Apparently this is the same as the full-duplex call? |
| 84 # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... |
| 85 def HalfDuplexCall(self, request_iterator, context): |
| 86 return self.FullDuplexCall(request_iterator, context) |
| 87 |
| 88 |
| 89 def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope): |
| 90 with stub: |
| 91 request = messages_pb2.SimpleRequest( |
| 92 response_type=messages_pb2.COMPRESSABLE, response_size=314159, |
| 93 payload=messages_pb2.Payload(body=b'\x00' * 271828), |
| 94 fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) |
| 95 response_future = stub.UnaryCall.future(request, _TIMEOUT) |
| 96 response = response_future.result() |
| 97 if response.payload.type is not messages_pb2.COMPRESSABLE: |
| 98 raise ValueError( |
| 99 'response payload type is "%s"!' % type(response.payload.type)) |
| 100 if len(response.payload.body) != 314159: |
| 101 raise ValueError( |
| 102 'response body of incorrect size %d!' % len(response.payload.body)) |
| 103 return response |
| 104 |
| 105 |
| 106 def _empty_unary(stub): |
| 107 with stub: |
| 108 response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT) |
| 109 if not isinstance(response, empty_pb2.Empty): |
| 110 raise TypeError( |
| 111 'response is of type "%s", not empty_pb2.Empty!', type(response)) |
| 112 |
| 113 |
| 114 def _large_unary(stub): |
| 115 _large_unary_common_behavior(stub, False, False) |
| 116 |
| 117 |
| 118 def _client_streaming(stub): |
| 119 with stub: |
| 120 payload_body_sizes = (27182, 8, 1828, 45904) |
| 121 payloads = ( |
| 122 messages_pb2.Payload(body=b'\x00' * size) |
| 123 for size in payload_body_sizes) |
| 124 requests = ( |
| 125 messages_pb2.StreamingInputCallRequest(payload=payload) |
| 126 for payload in payloads) |
| 127 response = stub.StreamingInputCall(requests, _TIMEOUT) |
| 128 if response.aggregated_payload_size != 74922: |
| 129 raise ValueError( |
| 130 'incorrect size %d!' % response.aggregated_payload_size) |
| 131 |
| 132 |
| 133 def _server_streaming(stub): |
| 134 sizes = (31415, 9, 2653, 58979) |
| 135 |
| 136 with stub: |
| 137 request = messages_pb2.StreamingOutputCallRequest( |
| 138 response_type=messages_pb2.COMPRESSABLE, |
| 139 response_parameters=( |
| 140 messages_pb2.ResponseParameters(size=sizes[0]), |
| 141 messages_pb2.ResponseParameters(size=sizes[1]), |
| 142 messages_pb2.ResponseParameters(size=sizes[2]), |
| 143 messages_pb2.ResponseParameters(size=sizes[3]), |
| 144 )) |
| 145 response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) |
| 146 for index, response in enumerate(response_iterator): |
| 147 if response.payload.type != messages_pb2.COMPRESSABLE: |
| 148 raise ValueError( |
| 149 'response body of invalid type %s!' % response.payload.type) |
| 150 if len(response.payload.body) != sizes[index]: |
| 151 raise ValueError( |
| 152 'response body of invalid size %d!' % len(response.payload.body)) |
| 153 |
| 154 def _cancel_after_begin(stub): |
| 155 with stub: |
| 156 sizes = (27182, 8, 1828, 45904) |
| 157 payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] |
| 158 requests = [messages_pb2.StreamingInputCallRequest(payload=payload) |
| 159 for payload in payloads] |
| 160 responses = stub.StreamingInputCall.future(requests, _TIMEOUT) |
| 161 responses.cancel() |
| 162 if not responses.cancelled(): |
| 163 raise ValueError('expected call to be cancelled') |
| 164 |
| 165 |
| 166 class _Pipe(object): |
| 167 |
| 168 def __init__(self): |
| 169 self._condition = threading.Condition() |
| 170 self._values = [] |
| 171 self._open = True |
| 172 |
| 173 def __iter__(self): |
| 174 return self |
| 175 |
| 176 def next(self): |
| 177 with self._condition: |
| 178 while not self._values and self._open: |
| 179 self._condition.wait() |
| 180 if self._values: |
| 181 return self._values.pop(0) |
| 182 else: |
| 183 raise StopIteration() |
| 184 |
| 185 def add(self, value): |
| 186 with self._condition: |
| 187 self._values.append(value) |
| 188 self._condition.notify() |
| 189 |
| 190 def close(self): |
| 191 with self._condition: |
| 192 self._open = False |
| 193 self._condition.notify() |
| 194 |
| 195 def __enter__(self): |
| 196 return self |
| 197 |
| 198 def __exit__(self, type, value, traceback): |
| 199 self.close() |
| 200 |
| 201 |
| 202 def _ping_pong(stub): |
| 203 request_response_sizes = (31415, 9, 2653, 58979) |
| 204 request_payload_sizes = (27182, 8, 1828, 45904) |
| 205 |
| 206 with stub, _Pipe() as pipe: |
| 207 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
| 208 print 'Starting ping-pong with response iterator %s' % response_iterator |
| 209 for response_size, payload_size in zip( |
| 210 request_response_sizes, request_payload_sizes): |
| 211 request = messages_pb2.StreamingOutputCallRequest( |
| 212 response_type=messages_pb2.COMPRESSABLE, |
| 213 response_parameters=(messages_pb2.ResponseParameters( |
| 214 size=response_size),), |
| 215 payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
| 216 pipe.add(request) |
| 217 response = next(response_iterator) |
| 218 if response.payload.type != messages_pb2.COMPRESSABLE: |
| 219 raise ValueError( |
| 220 'response body of invalid type %s!' % response.payload.type) |
| 221 if len(response.payload.body) != response_size: |
| 222 raise ValueError( |
| 223 'response body of invalid size %d!' % len(response.payload.body)) |
| 224 |
| 225 |
| 226 def _cancel_after_first_response(stub): |
| 227 request_response_sizes = (31415, 9, 2653, 58979) |
| 228 request_payload_sizes = (27182, 8, 1828, 45904) |
| 229 with stub, _Pipe() as pipe: |
| 230 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
| 231 |
| 232 response_size = request_response_sizes[0] |
| 233 payload_size = request_payload_sizes[0] |
| 234 request = messages_pb2.StreamingOutputCallRequest( |
| 235 response_type=messages_pb2.COMPRESSABLE, |
| 236 response_parameters=(messages_pb2.ResponseParameters( |
| 237 size=response_size),), |
| 238 payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
| 239 pipe.add(request) |
| 240 response = next(response_iterator) |
| 241 # We test the contents of `response` in the Ping Pong test - don't check |
| 242 # them here. |
| 243 response_iterator.cancel() |
| 244 |
| 245 try: |
| 246 next(response_iterator) |
| 247 except Exception: |
| 248 pass |
| 249 else: |
| 250 raise ValueError('expected call to be cancelled') |
| 251 |
| 252 |
| 253 def _timeout_on_sleeping_server(stub): |
| 254 request_payload_size = 27182 |
| 255 with stub, _Pipe() as pipe: |
| 256 response_iterator = stub.FullDuplexCall(pipe, 0.001) |
| 257 |
| 258 request = messages_pb2.StreamingOutputCallRequest( |
| 259 response_type=messages_pb2.COMPRESSABLE, |
| 260 payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) |
| 261 pipe.add(request) |
| 262 time.sleep(0.1) |
| 263 try: |
| 264 next(response_iterator) |
| 265 except face.ExpirationError: |
| 266 pass |
| 267 else: |
| 268 raise ValueError('expected call to exceed deadline') |
| 269 |
| 270 |
| 271 def _empty_stream(stub): |
| 272 with stub, _Pipe() as pipe: |
| 273 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) |
| 274 pipe.close() |
| 275 try: |
| 276 next(response_iterator) |
| 277 raise ValueError('expected exactly 0 responses') |
| 278 except StopIteration: |
| 279 pass |
| 280 |
| 281 |
| 282 def _compute_engine_creds(stub, args): |
| 283 response = _large_unary_common_behavior(stub, True, True) |
| 284 if args.default_service_account != response.username: |
| 285 raise ValueError( |
| 286 'expected username %s, got %s' % (args.default_service_account, |
| 287 response.username)) |
| 288 |
| 289 |
| 290 def _oauth2_auth_token(stub, args): |
| 291 json_key_filename = os.environ[ |
| 292 oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] |
| 293 wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] |
| 294 response = _large_unary_common_behavior(stub, True, True) |
| 295 if wanted_email != response.username: |
| 296 raise ValueError( |
| 297 'expected username %s, got %s' % (wanted_email, response.username)) |
| 298 if args.oauth_scope.find(response.oauth_scope) == -1: |
| 299 raise ValueError( |
| 300 'expected to find oauth scope "%s" in received "%s"' % |
| 301 (response.oauth_scope, args.oauth_scope)) |
| 302 |
| 303 @enum.unique |
| 304 class TestCase(enum.Enum): |
| 305 EMPTY_UNARY = 'empty_unary' |
| 306 LARGE_UNARY = 'large_unary' |
| 307 SERVER_STREAMING = 'server_streaming' |
| 308 CLIENT_STREAMING = 'client_streaming' |
| 309 PING_PONG = 'ping_pong' |
| 310 CANCEL_AFTER_BEGIN = 'cancel_after_begin' |
| 311 CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' |
| 312 EMPTY_STREAM = 'empty_stream' |
| 313 COMPUTE_ENGINE_CREDS = 'compute_engine_creds' |
| 314 OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' |
| 315 TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' |
| 316 |
| 317 def test_interoperability(self, stub, args): |
| 318 if self is TestCase.EMPTY_UNARY: |
| 319 _empty_unary(stub) |
| 320 elif self is TestCase.LARGE_UNARY: |
| 321 _large_unary(stub) |
| 322 elif self is TestCase.SERVER_STREAMING: |
| 323 _server_streaming(stub) |
| 324 elif self is TestCase.CLIENT_STREAMING: |
| 325 _client_streaming(stub) |
| 326 elif self is TestCase.PING_PONG: |
| 327 _ping_pong(stub) |
| 328 elif self is TestCase.CANCEL_AFTER_BEGIN: |
| 329 _cancel_after_begin(stub) |
| 330 elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: |
| 331 _cancel_after_first_response(stub) |
| 332 elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: |
| 333 _timeout_on_sleeping_server(stub) |
| 334 elif self is TestCase.EMPTY_STREAM: |
| 335 _empty_stream(stub) |
| 336 elif self is TestCase.COMPUTE_ENGINE_CREDS: |
| 337 _compute_engine_creds(stub, args) |
| 338 elif self is TestCase.OAUTH2_AUTH_TOKEN: |
| 339 _oauth2_auth_token(stub, args) |
| 340 else: |
| 341 raise NotImplementedError('Test case "%s" not implemented!' % self.name) |
OLD | NEW |