Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(204)

Side by Side Diff: third_party/grpc/src/python/grpcio/tests/interop/methods.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 # * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Implementations of interoperability test methods."""
31
32 import enum
33 import json
34 import os
35 import threading
36 import time
37
38 from oauth2client import client as oauth2client_client
39
40 from grpc.framework.common import cardinality
41 from grpc.framework.interfaces.face import face
42
43 from tests.interop import empty_pb2
44 from tests.interop import messages_pb2
45 from tests.interop import test_pb2
46
47 _TIMEOUT = 7
48
49
50 class TestService(test_pb2.BetaTestServiceServicer):
51
52 def EmptyCall(self, request, context):
53 return empty_pb2.Empty()
54
55 def UnaryCall(self, request, context):
56 return messages_pb2.SimpleResponse(
57 payload=messages_pb2.Payload(
58 type=messages_pb2.COMPRESSABLE,
59 body=b'\x00' * request.response_size))
60
61 def StreamingOutputCall(self, request, context):
62 for response_parameters in request.response_parameters:
63 yield messages_pb2.StreamingOutputCallResponse(
64 payload=messages_pb2.Payload(
65 type=request.response_type,
66 body=b'\x00' * response_parameters.size))
67
68 def StreamingInputCall(self, request_iterator, context):
69 aggregate_size = 0
70 for request in request_iterator:
71 if request.payload and request.payload.body:
72 aggregate_size += len(request.payload.body)
73 return messages_pb2.StreamingInputCallResponse(
74 aggregated_payload_size=aggregate_size)
75
76 def FullDuplexCall(self, request_iterator, context):
77 for request in request_iterator:
78 yield messages_pb2.StreamingOutputCallResponse(
79 payload=messages_pb2.Payload(
80 type=request.payload.type,
81 body=b'\x00' * request.response_parameters[0].size))
82
83 # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
84 # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
85 def HalfDuplexCall(self, request_iterator, context):
86 return self.FullDuplexCall(request_iterator, context)
87
88
89 def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope):
90 with stub:
91 request = messages_pb2.SimpleRequest(
92 response_type=messages_pb2.COMPRESSABLE, response_size=314159,
93 payload=messages_pb2.Payload(body=b'\x00' * 271828),
94 fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
95 response_future = stub.UnaryCall.future(request, _TIMEOUT)
96 response = response_future.result()
97 if response.payload.type is not messages_pb2.COMPRESSABLE:
98 raise ValueError(
99 'response payload type is "%s"!' % type(response.payload.type))
100 if len(response.payload.body) != 314159:
101 raise ValueError(
102 'response body of incorrect size %d!' % len(response.payload.body))
103 return response
104
105
106 def _empty_unary(stub):
107 with stub:
108 response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
109 if not isinstance(response, empty_pb2.Empty):
110 raise TypeError(
111 'response is of type "%s", not empty_pb2.Empty!', type(response))
112
113
114 def _large_unary(stub):
115 _large_unary_common_behavior(stub, False, False)
116
117
118 def _client_streaming(stub):
119 with stub:
120 payload_body_sizes = (27182, 8, 1828, 45904)
121 payloads = (
122 messages_pb2.Payload(body=b'\x00' * size)
123 for size in payload_body_sizes)
124 requests = (
125 messages_pb2.StreamingInputCallRequest(payload=payload)
126 for payload in payloads)
127 response = stub.StreamingInputCall(requests, _TIMEOUT)
128 if response.aggregated_payload_size != 74922:
129 raise ValueError(
130 'incorrect size %d!' % response.aggregated_payload_size)
131
132
133 def _server_streaming(stub):
134 sizes = (31415, 9, 2653, 58979)
135
136 with stub:
137 request = messages_pb2.StreamingOutputCallRequest(
138 response_type=messages_pb2.COMPRESSABLE,
139 response_parameters=(
140 messages_pb2.ResponseParameters(size=sizes[0]),
141 messages_pb2.ResponseParameters(size=sizes[1]),
142 messages_pb2.ResponseParameters(size=sizes[2]),
143 messages_pb2.ResponseParameters(size=sizes[3]),
144 ))
145 response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
146 for index, response in enumerate(response_iterator):
147 if response.payload.type != messages_pb2.COMPRESSABLE:
148 raise ValueError(
149 'response body of invalid type %s!' % response.payload.type)
150 if len(response.payload.body) != sizes[index]:
151 raise ValueError(
152 'response body of invalid size %d!' % len(response.payload.body))
153
154 def _cancel_after_begin(stub):
155 with stub:
156 sizes = (27182, 8, 1828, 45904)
157 payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
158 requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
159 for payload in payloads]
160 responses = stub.StreamingInputCall.future(requests, _TIMEOUT)
161 responses.cancel()
162 if not responses.cancelled():
163 raise ValueError('expected call to be cancelled')
164
165
166 class _Pipe(object):
167
168 def __init__(self):
169 self._condition = threading.Condition()
170 self._values = []
171 self._open = True
172
173 def __iter__(self):
174 return self
175
176 def next(self):
177 with self._condition:
178 while not self._values and self._open:
179 self._condition.wait()
180 if self._values:
181 return self._values.pop(0)
182 else:
183 raise StopIteration()
184
185 def add(self, value):
186 with self._condition:
187 self._values.append(value)
188 self._condition.notify()
189
190 def close(self):
191 with self._condition:
192 self._open = False
193 self._condition.notify()
194
195 def __enter__(self):
196 return self
197
198 def __exit__(self, type, value, traceback):
199 self.close()
200
201
202 def _ping_pong(stub):
203 request_response_sizes = (31415, 9, 2653, 58979)
204 request_payload_sizes = (27182, 8, 1828, 45904)
205
206 with stub, _Pipe() as pipe:
207 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
208 print 'Starting ping-pong with response iterator %s' % response_iterator
209 for response_size, payload_size in zip(
210 request_response_sizes, request_payload_sizes):
211 request = messages_pb2.StreamingOutputCallRequest(
212 response_type=messages_pb2.COMPRESSABLE,
213 response_parameters=(messages_pb2.ResponseParameters(
214 size=response_size),),
215 payload=messages_pb2.Payload(body=b'\x00' * payload_size))
216 pipe.add(request)
217 response = next(response_iterator)
218 if response.payload.type != messages_pb2.COMPRESSABLE:
219 raise ValueError(
220 'response body of invalid type %s!' % response.payload.type)
221 if len(response.payload.body) != response_size:
222 raise ValueError(
223 'response body of invalid size %d!' % len(response.payload.body))
224
225
226 def _cancel_after_first_response(stub):
227 request_response_sizes = (31415, 9, 2653, 58979)
228 request_payload_sizes = (27182, 8, 1828, 45904)
229 with stub, _Pipe() as pipe:
230 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
231
232 response_size = request_response_sizes[0]
233 payload_size = request_payload_sizes[0]
234 request = messages_pb2.StreamingOutputCallRequest(
235 response_type=messages_pb2.COMPRESSABLE,
236 response_parameters=(messages_pb2.ResponseParameters(
237 size=response_size),),
238 payload=messages_pb2.Payload(body=b'\x00' * payload_size))
239 pipe.add(request)
240 response = next(response_iterator)
241 # We test the contents of `response` in the Ping Pong test - don't check
242 # them here.
243 response_iterator.cancel()
244
245 try:
246 next(response_iterator)
247 except Exception:
248 pass
249 else:
250 raise ValueError('expected call to be cancelled')
251
252
253 def _timeout_on_sleeping_server(stub):
254 request_payload_size = 27182
255 with stub, _Pipe() as pipe:
256 response_iterator = stub.FullDuplexCall(pipe, 0.001)
257
258 request = messages_pb2.StreamingOutputCallRequest(
259 response_type=messages_pb2.COMPRESSABLE,
260 payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
261 pipe.add(request)
262 time.sleep(0.1)
263 try:
264 next(response_iterator)
265 except face.ExpirationError:
266 pass
267 else:
268 raise ValueError('expected call to exceed deadline')
269
270
271 def _empty_stream(stub):
272 with stub, _Pipe() as pipe:
273 response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
274 pipe.close()
275 try:
276 next(response_iterator)
277 raise ValueError('expected exactly 0 responses')
278 except StopIteration:
279 pass
280
281
282 def _compute_engine_creds(stub, args):
283 response = _large_unary_common_behavior(stub, True, True)
284 if args.default_service_account != response.username:
285 raise ValueError(
286 'expected username %s, got %s' % (args.default_service_account,
287 response.username))
288
289
290 def _oauth2_auth_token(stub, args):
291 json_key_filename = os.environ[
292 oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
293 wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
294 response = _large_unary_common_behavior(stub, True, True)
295 if wanted_email != response.username:
296 raise ValueError(
297 'expected username %s, got %s' % (wanted_email, response.username))
298 if args.oauth_scope.find(response.oauth_scope) == -1:
299 raise ValueError(
300 'expected to find oauth scope "%s" in received "%s"' %
301 (response.oauth_scope, args.oauth_scope))
302
303 @enum.unique
304 class TestCase(enum.Enum):
305 EMPTY_UNARY = 'empty_unary'
306 LARGE_UNARY = 'large_unary'
307 SERVER_STREAMING = 'server_streaming'
308 CLIENT_STREAMING = 'client_streaming'
309 PING_PONG = 'ping_pong'
310 CANCEL_AFTER_BEGIN = 'cancel_after_begin'
311 CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
312 EMPTY_STREAM = 'empty_stream'
313 COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
314 OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
315 TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
316
317 def test_interoperability(self, stub, args):
318 if self is TestCase.EMPTY_UNARY:
319 _empty_unary(stub)
320 elif self is TestCase.LARGE_UNARY:
321 _large_unary(stub)
322 elif self is TestCase.SERVER_STREAMING:
323 _server_streaming(stub)
324 elif self is TestCase.CLIENT_STREAMING:
325 _client_streaming(stub)
326 elif self is TestCase.PING_PONG:
327 _ping_pong(stub)
328 elif self is TestCase.CANCEL_AFTER_BEGIN:
329 _cancel_after_begin(stub)
330 elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
331 _cancel_after_first_response(stub)
332 elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
333 _timeout_on_sleeping_server(stub)
334 elif self is TestCase.EMPTY_STREAM:
335 _empty_stream(stub)
336 elif self is TestCase.COMPUTE_ENGINE_CREDS:
337 _compute_engine_creds(stub, args)
338 elif self is TestCase.OAUTH2_AUTH_TOKEN:
339 _oauth2_auth_token(stub, args)
340 else:
341 raise NotImplementedError('Test case "%s" not implemented!' % self.name)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698