Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/grpc/test/cpp/end2end/hybrid_end2end_test.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <memory>
35 #include <thread>
36
37 #include <grpc++/channel.h>
38 #include <grpc++/client_context.h>
39 #include <grpc++/create_channel.h>
40 #include <grpc++/generic/async_generic_service.h>
41 #include <grpc++/server.h>
42 #include <grpc++/server_builder.h>
43 #include <grpc++/server_context.h>
44 #include <grpc/grpc.h>
45 #include <gtest/gtest.h>
46
47 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
48 #include "src/proto/grpc/testing/echo.grpc.pb.h"
49 #include "test/core/util/port.h"
50 #include "test/core/util/test_config.h"
51 #include "test/cpp/end2end/test_service_impl.h"
52 #include "test/cpp/util/byte_buffer_proto_helper.h"
53
54 namespace grpc {
55 namespace testing {
56
57 namespace {
58
59 void* tag(int i) { return (void*)(intptr_t)i; }
60
61 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
62 void* got_tag;
63 bool ok;
64 EXPECT_TRUE(cq->Next(&got_tag, &ok));
65 EXPECT_EQ(tag(i), got_tag);
66 return ok;
67 }
68
69 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
70 EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
71 }
72
73 // Handlers to handle async request at a server. To be run in a separate thread.
74 template <class Service>
75 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
76 ServerContext srv_ctx;
77 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
78 EchoRequest recv_request;
79 EchoResponse send_response;
80 service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
81 tag(1));
82 Verify(cq, 1, true);
83 send_response.set_message(recv_request.message());
84 if (dup_service) {
85 send_response.mutable_message()->append("_dup");
86 }
87 response_writer.Finish(send_response, Status::OK, tag(2));
88 Verify(cq, 2, true);
89 }
90
91 template <class Service>
92 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
93 ServerContext srv_ctx;
94 EchoRequest recv_request;
95 EchoResponse send_response;
96 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
97 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
98 Verify(cq, 1, true);
99 int i = 1;
100 do {
101 i++;
102 send_response.mutable_message()->append(recv_request.message());
103 srv_stream.Read(&recv_request, tag(i));
104 } while (VerifyReturnSuccess(cq, i));
105 srv_stream.Finish(send_response, Status::OK, tag(100));
106 Verify(cq, 100, true);
107 }
108
109 template <class Service>
110 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
111 ServerContext srv_ctx;
112 EchoRequest recv_request;
113 EchoResponse send_response;
114 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
115 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
116 tag(1));
117 Verify(cq, 1, true);
118 send_response.set_message(recv_request.message() + "0");
119 srv_stream.Write(send_response, tag(2));
120 Verify(cq, 2, true);
121 send_response.set_message(recv_request.message() + "1");
122 srv_stream.Write(send_response, tag(3));
123 Verify(cq, 3, true);
124 send_response.set_message(recv_request.message() + "2");
125 srv_stream.Write(send_response, tag(4));
126 Verify(cq, 4, true);
127 srv_stream.Finish(Status::OK, tag(5));
128 Verify(cq, 5, true);
129 }
130
131 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
132 CompletionQueue* cq) {
133 ByteBuffer recv_buffer;
134 stream->Read(&recv_buffer, tag(2));
135 Verify(cq, 2, true);
136 EchoRequest recv_request;
137 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
138 EchoResponse send_response;
139 send_response.set_message(recv_request.message());
140 auto send_buffer = SerializeToByteBuffer(&send_response);
141 stream->Write(*send_buffer, tag(3));
142 Verify(cq, 3, true);
143 stream->Finish(Status::OK, tag(4));
144 Verify(cq, 4, true);
145 }
146
147 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
148 CompletionQueue* cq) {
149 ByteBuffer recv_buffer;
150 EchoRequest recv_request;
151 EchoResponse send_response;
152 int i = 1;
153 while (true) {
154 i++;
155 stream->Read(&recv_buffer, tag(i));
156 if (!VerifyReturnSuccess(cq, i)) {
157 break;
158 }
159 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
160 send_response.mutable_message()->append(recv_request.message());
161 }
162 auto send_buffer = SerializeToByteBuffer(&send_response);
163 stream->Write(*send_buffer, tag(99));
164 Verify(cq, 99, true);
165 stream->Finish(Status::OK, tag(100));
166 Verify(cq, 100, true);
167 }
168
169 // Request and handle one generic call.
170 void HandleGenericCall(AsyncGenericService* service,
171 ServerCompletionQueue* cq) {
172 GenericServerContext srv_ctx;
173 GenericServerAsyncReaderWriter stream(&srv_ctx);
174 service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
175 Verify(cq, 1, true);
176 if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
177 HandleGenericEcho(&stream, cq);
178 } else if (srv_ctx.method() ==
179 "/grpc.testing.EchoTestService/RequestStream") {
180 HandleGenericRequestStream(&stream, cq);
181 } else { // other methods not handled yet.
182 gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
183 GPR_ASSERT(0);
184 }
185 }
186
187 class TestServiceImplDupPkg
188 : public ::grpc::testing::duplicate::EchoTestService::Service {
189 public:
190 Status Echo(ServerContext* context, const EchoRequest* request,
191 EchoResponse* response) GRPC_OVERRIDE {
192 response->set_message(request->message() + "_dup");
193 return Status::OK;
194 }
195 };
196
197 class HybridEnd2endTest : public ::testing::Test {
198 protected:
199 HybridEnd2endTest() {}
200
201 void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
202 AsyncGenericService* generic_service) {
203 int port = grpc_pick_unused_port_or_die();
204 server_address_ << "localhost:" << port;
205
206 // Setup server
207 ServerBuilder builder;
208 builder.AddListeningPort(server_address_.str(),
209 grpc::InsecureServerCredentials());
210 builder.RegisterService(service1);
211 if (service2) {
212 builder.RegisterService(service2);
213 }
214 if (generic_service) {
215 builder.RegisterAsyncGenericService(generic_service);
216 }
217 // Create a separate cq for each potential handler.
218 for (int i = 0; i < 5; i++) {
219 cqs_.push_back(builder.AddCompletionQueue());
220 }
221 server_ = builder.BuildAndStart();
222 }
223
224 void TearDown() GRPC_OVERRIDE {
225 if (server_) {
226 server_->Shutdown();
227 }
228 void* ignored_tag;
229 bool ignored_ok;
230 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
231 (*it)->Shutdown();
232 while ((*it)->Next(&ignored_tag, &ignored_ok))
233 ;
234 }
235 }
236
237 void ResetStub() {
238 std::shared_ptr<Channel> channel =
239 CreateChannel(server_address_.str(), InsecureChannelCredentials());
240 stub_ = grpc::testing::EchoTestService::NewStub(channel);
241 }
242
243 // Test all rpc methods.
244 void TestAllMethods() {
245 SendEcho();
246 SendSimpleClientStreaming();
247 SendSimpleServerStreaming();
248 SendBidiStreaming();
249 }
250
251 void SendEcho() {
252 EchoRequest send_request;
253 EchoResponse recv_response;
254 ClientContext cli_ctx;
255 send_request.set_message("Hello");
256 Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
257 EXPECT_EQ(send_request.message(), recv_response.message());
258 EXPECT_TRUE(recv_status.ok());
259 }
260
261 void SendEchoToDupService() {
262 std::shared_ptr<Channel> channel =
263 CreateChannel(server_address_.str(), InsecureChannelCredentials());
264 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
265 EchoRequest send_request;
266 EchoResponse recv_response;
267 ClientContext cli_ctx;
268 send_request.set_message("Hello");
269 Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
270 EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
271 EXPECT_TRUE(recv_status.ok());
272 }
273
274 void SendSimpleClientStreaming() {
275 EchoRequest send_request;
276 EchoResponse recv_response;
277 grpc::string expected_message;
278 ClientContext cli_ctx;
279 send_request.set_message("Hello");
280 auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
281 for (int i = 0; i < 5; i++) {
282 EXPECT_TRUE(stream->Write(send_request));
283 expected_message.append(send_request.message());
284 }
285 stream->WritesDone();
286 Status recv_status = stream->Finish();
287 EXPECT_EQ(expected_message, recv_response.message());
288 EXPECT_TRUE(recv_status.ok());
289 }
290
291 void SendSimpleServerStreaming() {
292 EchoRequest request;
293 EchoResponse response;
294 ClientContext context;
295 request.set_message("hello");
296
297 auto stream = stub_->ResponseStream(&context, request);
298 EXPECT_TRUE(stream->Read(&response));
299 EXPECT_EQ(response.message(), request.message() + "0");
300 EXPECT_TRUE(stream->Read(&response));
301 EXPECT_EQ(response.message(), request.message() + "1");
302 EXPECT_TRUE(stream->Read(&response));
303 EXPECT_EQ(response.message(), request.message() + "2");
304 EXPECT_FALSE(stream->Read(&response));
305
306 Status s = stream->Finish();
307 EXPECT_TRUE(s.ok());
308 }
309
310 void SendBidiStreaming() {
311 EchoRequest request;
312 EchoResponse response;
313 ClientContext context;
314 grpc::string msg("hello");
315
316 auto stream = stub_->BidiStream(&context);
317
318 request.set_message(msg + "0");
319 EXPECT_TRUE(stream->Write(request));
320 EXPECT_TRUE(stream->Read(&response));
321 EXPECT_EQ(response.message(), request.message());
322
323 request.set_message(msg + "1");
324 EXPECT_TRUE(stream->Write(request));
325 EXPECT_TRUE(stream->Read(&response));
326 EXPECT_EQ(response.message(), request.message());
327
328 request.set_message(msg + "2");
329 EXPECT_TRUE(stream->Write(request));
330 EXPECT_TRUE(stream->Read(&response));
331 EXPECT_EQ(response.message(), request.message());
332
333 stream->WritesDone();
334 EXPECT_FALSE(stream->Read(&response));
335 EXPECT_FALSE(stream->Read(&response));
336
337 Status s = stream->Finish();
338 EXPECT_TRUE(s.ok());
339 }
340
341 std::vector<std::unique_ptr<ServerCompletionQueue> > cqs_;
342 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
343 std::unique_ptr<Server> server_;
344 std::ostringstream server_address_;
345 };
346
347 TEST_F(HybridEnd2endTest, AsyncEcho) {
348 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> service;
349 SetUpServer(&service, nullptr, nullptr);
350 ResetStub();
351 std::thread echo_handler_thread(
352 [this, &service] { HandleEcho(&service, cqs_[0].get(), false); });
353 TestAllMethods();
354 echo_handler_thread.join();
355 }
356
357 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
358 EchoTestService::WithAsyncMethod_RequestStream<
359 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> > service;
360 SetUpServer(&service, nullptr, nullptr);
361 ResetStub();
362 std::thread echo_handler_thread(
363 [this, &service] { HandleEcho(&service, cqs_[0].get(), false); });
364 std::thread request_stream_handler_thread(
365 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
366 TestAllMethods();
367 echo_handler_thread.join();
368 request_stream_handler_thread.join();
369 }
370
371 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
372 EchoTestService::WithAsyncMethod_RequestStream<
373 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> >
374 service;
375 SetUpServer(&service, nullptr, nullptr);
376 ResetStub();
377 std::thread response_stream_handler_thread(
378 [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); });
379 std::thread request_stream_handler_thread(
380 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
381 TestAllMethods();
382 response_stream_handler_thread.join();
383 request_stream_handler_thread.join();
384 }
385
386 // Add a second service with one sync method.
387 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
388 EchoTestService::WithAsyncMethod_RequestStream<
389 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> >
390 service;
391 TestServiceImplDupPkg dup_service;
392 SetUpServer(&service, &dup_service, nullptr);
393 ResetStub();
394 std::thread response_stream_handler_thread(
395 [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); });
396 std::thread request_stream_handler_thread(
397 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
398 TestAllMethods();
399 SendEchoToDupService();
400 response_stream_handler_thread.join();
401 request_stream_handler_thread.join();
402 }
403
404 // Add a second service with one async method.
405 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
406 EchoTestService::WithAsyncMethod_RequestStream<
407 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> >
408 service;
409 duplicate::EchoTestService::AsyncService dup_service;
410 SetUpServer(&service, &dup_service, nullptr);
411 ResetStub();
412 std::thread response_stream_handler_thread(
413 [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); });
414 std::thread request_stream_handler_thread(
415 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
416 std::thread echo_handler_thread(
417 [this, &dup_service] { HandleEcho(&dup_service, cqs_[2].get(), true); });
418 TestAllMethods();
419 SendEchoToDupService();
420 response_stream_handler_thread.join();
421 request_stream_handler_thread.join();
422 echo_handler_thread.join();
423 }
424
425 TEST_F(HybridEnd2endTest, GenericEcho) {
426 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
427 AsyncGenericService generic_service;
428 SetUpServer(&service, nullptr, &generic_service);
429 ResetStub();
430 std::thread generic_handler_thread([this, &generic_service] {
431 HandleGenericCall(&generic_service, cqs_[0].get());
432 });
433 TestAllMethods();
434 generic_handler_thread.join();
435 }
436
437 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
438 EchoTestService::WithAsyncMethod_RequestStream<
439 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > service;
440 AsyncGenericService generic_service;
441 SetUpServer(&service, nullptr, &generic_service);
442 ResetStub();
443 std::thread generic_handler_thread([this, &generic_service] {
444 HandleGenericCall(&generic_service, cqs_[0].get());
445 });
446 std::thread request_stream_handler_thread(
447 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
448 TestAllMethods();
449 generic_handler_thread.join();
450 request_stream_handler_thread.join();
451 }
452
453 // Add a second service with one sync method.
454 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
455 EchoTestService::WithAsyncMethod_RequestStream<
456 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > service;
457 AsyncGenericService generic_service;
458 TestServiceImplDupPkg dup_service;
459 SetUpServer(&service, &dup_service, &generic_service);
460 ResetStub();
461 std::thread generic_handler_thread([this, &generic_service] {
462 HandleGenericCall(&generic_service, cqs_[0].get());
463 });
464 std::thread request_stream_handler_thread(
465 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
466 TestAllMethods();
467 SendEchoToDupService();
468 generic_handler_thread.join();
469 request_stream_handler_thread.join();
470 }
471
472 // Add a second service with one async method.
473 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
474 EchoTestService::WithAsyncMethod_RequestStream<
475 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> > service;
476 AsyncGenericService generic_service;
477 duplicate::EchoTestService::AsyncService dup_service;
478 SetUpServer(&service, &dup_service, &generic_service);
479 ResetStub();
480 std::thread generic_handler_thread([this, &generic_service] {
481 HandleGenericCall(&generic_service, cqs_[0].get());
482 });
483 std::thread request_stream_handler_thread(
484 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
485 std::thread echo_handler_thread(
486 [this, &dup_service] { HandleEcho(&dup_service, cqs_[2].get(), true); });
487 TestAllMethods();
488 SendEchoToDupService();
489 generic_handler_thread.join();
490 request_stream_handler_thread.join();
491 echo_handler_thread.join();
492 }
493
494 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
495 EchoTestService::WithAsyncMethod_RequestStream<
496 EchoTestService::WithGenericMethod_Echo<
497 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > >
498 service;
499 AsyncGenericService generic_service;
500 SetUpServer(&service, nullptr, &generic_service);
501 ResetStub();
502 std::thread generic_handler_thread([this, &generic_service] {
503 HandleGenericCall(&generic_service, cqs_[0].get());
504 });
505 std::thread request_stream_handler_thread(
506 [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
507 std::thread response_stream_handler_thread(
508 [this, &service] { HandleServerStreaming(&service, cqs_[2].get()); });
509 TestAllMethods();
510 generic_handler_thread.join();
511 request_stream_handler_thread.join();
512 response_stream_handler_thread.join();
513 }
514
515 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
516 EchoTestService::WithGenericMethod_RequestStream<
517 EchoTestService::WithGenericMethod_Echo<
518 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > >
519 service;
520 AsyncGenericService generic_service;
521 SetUpServer(&service, nullptr, &generic_service);
522 ResetStub();
523 std::thread generic_handler_thread([this, &generic_service] {
524 HandleGenericCall(&generic_service, cqs_[0].get());
525 });
526 std::thread generic_handler_thread2([this, &generic_service] {
527 HandleGenericCall(&generic_service, cqs_[1].get());
528 });
529 std::thread response_stream_handler_thread(
530 [this, &service] { HandleServerStreaming(&service, cqs_[2].get()); });
531 TestAllMethods();
532 generic_handler_thread.join();
533 generic_handler_thread2.join();
534 response_stream_handler_thread.join();
535 }
536
537 // If WithGenericMethod is called and no generic service is registered, the
538 // server will fail to build.
539 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
540 EchoTestService::WithGenericMethod_RequestStream<
541 EchoTestService::WithGenericMethod_Echo<
542 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > >
543 service;
544 SetUpServer(&service, nullptr, nullptr);
545 EXPECT_EQ(nullptr, server_.get());
546 }
547
548 } // namespace
549 } // namespace testing
550 } // namespace grpc
551
552 int main(int argc, char** argv) {
553 grpc_test_init(argc, argv);
554 ::testing::InitGoogleTest(&argc, argv);
555 return RUN_ALL_TESTS();
556 }
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/end2end/generic_end2end_test.cc ('k') | third_party/grpc/test/cpp/end2end/mock_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698