Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Side by Side Diff: third_party/grpc/test/cpp/end2end/async_end2end_test.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <memory>
35 #include <thread>
36
37 #include <grpc++/channel.h>
38 #include <grpc++/client_context.h>
39 #include <grpc++/create_channel.h>
40 #include <grpc++/server.h>
41 #include <grpc++/server_builder.h>
42 #include <grpc++/server_context.h>
43 #include <grpc/grpc.h>
44 #include <grpc/support/thd.h>
45 #include <grpc/support/time.h>
46 #include <grpc/support/tls.h>
47 #include <gtest/gtest.h>
48
49 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50 #include "src/proto/grpc/testing/echo.grpc.pb.h"
51 #include "test/core/util/port.h"
52 #include "test/core/util/test_config.h"
53 #include "test/cpp/util/string_ref_helper.h"
54
55 #ifdef GPR_POSIX_SOCKET
56 #include "src/core/iomgr/pollset_posix.h"
57 #endif
58
59 using grpc::testing::EchoRequest;
60 using grpc::testing::EchoResponse;
61 using std::chrono::system_clock;
62
63 GPR_TLS_DECL(g_is_async_end2end_test);
64
65 namespace grpc {
66 namespace testing {
67
68 namespace {
69
70 void* tag(int i) { return (void*)(intptr_t)i; }
71
72 #ifdef GPR_POSIX_SOCKET
73 static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
74 int timeout) {
75 if (gpr_tls_get(&g_is_async_end2end_test)) {
76 GPR_ASSERT(timeout == 0);
77 }
78 return poll(pfds, nfds, timeout);
79 }
80
81 class PollOverride {
82 public:
83 PollOverride(grpc_poll_function_type f) {
84 prev_ = grpc_poll_function;
85 grpc_poll_function = f;
86 }
87
88 ~PollOverride() { grpc_poll_function = prev_; }
89
90 private:
91 grpc_poll_function_type prev_;
92 };
93
94 class PollingOverrider : public PollOverride {
95 public:
96 explicit PollingOverrider(bool allow_blocking)
97 : PollOverride(allow_blocking ? poll : maybe_assert_non_blocking_poll) {}
98 };
99 #else
100 class PollingOverrider {
101 public:
102 explicit PollingOverrider(bool allow_blocking) {}
103 };
104 #endif
105
106 class Verifier {
107 public:
108 explicit Verifier(bool spin) : spin_(spin) {}
109 Verifier& Expect(int i, bool expect_ok) {
110 expectations_[tag(i)] = expect_ok;
111 return *this;
112 }
113
114 void Verify(CompletionQueue* cq) { Verify(cq, false); }
115
116 void Verify(CompletionQueue* cq, bool ignore_ok) {
117 GPR_ASSERT(!expectations_.empty());
118 while (!expectations_.empty()) {
119 bool ok;
120 void* got_tag;
121 if (spin_) {
122 for (;;) {
123 auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
124 if (r == CompletionQueue::TIMEOUT) continue;
125 if (r == CompletionQueue::GOT_EVENT) break;
126 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
127 abort();
128 }
129 } else {
130 EXPECT_TRUE(cq->Next(&got_tag, &ok));
131 }
132 auto it = expectations_.find(got_tag);
133 EXPECT_TRUE(it != expectations_.end());
134 if (!ignore_ok) {
135 EXPECT_EQ(it->second, ok);
136 }
137 expectations_.erase(it);
138 }
139 }
140 void Verify(CompletionQueue* cq,
141 std::chrono::system_clock::time_point deadline) {
142 if (expectations_.empty()) {
143 bool ok;
144 void* got_tag;
145 if (spin_) {
146 while (std::chrono::system_clock::now() < deadline) {
147 EXPECT_EQ(
148 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
149 CompletionQueue::TIMEOUT);
150 }
151 } else {
152 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
153 CompletionQueue::TIMEOUT);
154 }
155 } else {
156 while (!expectations_.empty()) {
157 bool ok;
158 void* got_tag;
159 if (spin_) {
160 for (;;) {
161 GPR_ASSERT(std::chrono::system_clock::now() < deadline);
162 auto r =
163 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
164 if (r == CompletionQueue::TIMEOUT) continue;
165 if (r == CompletionQueue::GOT_EVENT) break;
166 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
167 abort();
168 }
169 } else {
170 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
171 CompletionQueue::GOT_EVENT);
172 }
173 auto it = expectations_.find(got_tag);
174 EXPECT_TRUE(it != expectations_.end());
175 EXPECT_EQ(it->second, ok);
176 expectations_.erase(it);
177 }
178 }
179 }
180
181 private:
182 std::map<void*, bool> expectations_;
183 bool spin_;
184 };
185
186 class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
187 protected:
188 AsyncEnd2endTest() {}
189
190 void SetUp() GRPC_OVERRIDE {
191 poll_overrider_.reset(new PollingOverrider(!GetParam()));
192
193 int port = grpc_pick_unused_port_or_die();
194 server_address_ << "localhost:" << port;
195
196 // Setup server
197 ServerBuilder builder;
198 builder.AddListeningPort(server_address_.str(),
199 grpc::InsecureServerCredentials());
200 builder.RegisterService(&service_);
201 cq_ = builder.AddCompletionQueue();
202 server_ = builder.BuildAndStart();
203
204 gpr_tls_set(&g_is_async_end2end_test, 1);
205 }
206
207 void TearDown() GRPC_OVERRIDE {
208 server_->Shutdown();
209 void* ignored_tag;
210 bool ignored_ok;
211 cq_->Shutdown();
212 while (cq_->Next(&ignored_tag, &ignored_ok))
213 ;
214 poll_overrider_.reset();
215 gpr_tls_set(&g_is_async_end2end_test, 0);
216 }
217
218 void ResetStub() {
219 std::shared_ptr<Channel> channel =
220 CreateChannel(server_address_.str(), InsecureChannelCredentials());
221 stub_ = grpc::testing::EchoTestService::NewStub(channel);
222 }
223
224 void SendRpc(int num_rpcs) {
225 for (int i = 0; i < num_rpcs; i++) {
226 EchoRequest send_request;
227 EchoRequest recv_request;
228 EchoResponse send_response;
229 EchoResponse recv_response;
230 Status recv_status;
231
232 ClientContext cli_ctx;
233 ServerContext srv_ctx;
234 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
235
236 send_request.set_message("Hello");
237 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
238 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
239
240 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
241 cq_.get(), tag(2));
242
243 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
244 EXPECT_EQ(send_request.message(), recv_request.message());
245
246 send_response.set_message(recv_request.message());
247 response_writer.Finish(send_response, Status::OK, tag(3));
248 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
249
250 response_reader->Finish(&recv_response, &recv_status, tag(4));
251 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
252
253 EXPECT_EQ(send_response.message(), recv_response.message());
254 EXPECT_TRUE(recv_status.ok());
255 }
256 }
257
258 std::unique_ptr<ServerCompletionQueue> cq_;
259 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
260 std::unique_ptr<Server> server_;
261 grpc::testing::EchoTestService::AsyncService service_;
262 std::ostringstream server_address_;
263
264 std::unique_ptr<PollingOverrider> poll_overrider_;
265 };
266
267 TEST_P(AsyncEnd2endTest, SimpleRpc) {
268 ResetStub();
269 SendRpc(1);
270 }
271
272 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
273 ResetStub();
274 SendRpc(10);
275 }
276
277 // Test a simple RPC using the async version of Next
278 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
279 ResetStub();
280
281 EchoRequest send_request;
282 EchoRequest recv_request;
283 EchoResponse send_response;
284 EchoResponse recv_response;
285 Status recv_status;
286
287 ClientContext cli_ctx;
288 ServerContext srv_ctx;
289 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
290
291 send_request.set_message("Hello");
292 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
293 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
294
295 std::chrono::system_clock::time_point time_now(
296 std::chrono::system_clock::now());
297 std::chrono::system_clock::time_point time_limit(
298 std::chrono::system_clock::now() + std::chrono::seconds(10));
299 Verifier(GetParam()).Verify(cq_.get(), time_now);
300 Verifier(GetParam()).Verify(cq_.get(), time_now);
301
302 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
303 cq_.get(), tag(2));
304
305 Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
306 EXPECT_EQ(send_request.message(), recv_request.message());
307
308 send_response.set_message(recv_request.message());
309 response_writer.Finish(send_response, Status::OK, tag(3));
310 Verifier(GetParam())
311 .Expect(3, true)
312 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
313
314 response_reader->Finish(&recv_response, &recv_status, tag(4));
315 Verifier(GetParam())
316 .Expect(4, true)
317 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
318
319 EXPECT_EQ(send_response.message(), recv_response.message());
320 EXPECT_TRUE(recv_status.ok());
321 }
322
323 // Two pings and a final pong.
324 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
325 ResetStub();
326
327 EchoRequest send_request;
328 EchoRequest recv_request;
329 EchoResponse send_response;
330 EchoResponse recv_response;
331 Status recv_status;
332 ClientContext cli_ctx;
333 ServerContext srv_ctx;
334 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
335
336 send_request.set_message("Hello");
337 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
338 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
339
340 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
341 tag(2));
342
343 Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
344
345 cli_stream->Write(send_request, tag(3));
346 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
347
348 srv_stream.Read(&recv_request, tag(4));
349 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
350 EXPECT_EQ(send_request.message(), recv_request.message());
351
352 cli_stream->Write(send_request, tag(5));
353 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
354
355 srv_stream.Read(&recv_request, tag(6));
356 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
357
358 EXPECT_EQ(send_request.message(), recv_request.message());
359 cli_stream->WritesDone(tag(7));
360 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
361
362 srv_stream.Read(&recv_request, tag(8));
363 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
364
365 send_response.set_message(recv_request.message());
366 srv_stream.Finish(send_response, Status::OK, tag(9));
367 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
368
369 cli_stream->Finish(&recv_status, tag(10));
370 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
371
372 EXPECT_EQ(send_response.message(), recv_response.message());
373 EXPECT_TRUE(recv_status.ok());
374 }
375
376 // One ping, two pongs.
377 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
378 ResetStub();
379
380 EchoRequest send_request;
381 EchoRequest recv_request;
382 EchoResponse send_response;
383 EchoResponse recv_response;
384 Status recv_status;
385 ClientContext cli_ctx;
386 ServerContext srv_ctx;
387 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
388
389 send_request.set_message("Hello");
390 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
391 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
392
393 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
394 cq_.get(), cq_.get(), tag(2));
395
396 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
397 EXPECT_EQ(send_request.message(), recv_request.message());
398
399 send_response.set_message(recv_request.message());
400 srv_stream.Write(send_response, tag(3));
401 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
402
403 cli_stream->Read(&recv_response, tag(4));
404 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
405 EXPECT_EQ(send_response.message(), recv_response.message());
406
407 srv_stream.Write(send_response, tag(5));
408 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
409
410 cli_stream->Read(&recv_response, tag(6));
411 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
412 EXPECT_EQ(send_response.message(), recv_response.message());
413
414 srv_stream.Finish(Status::OK, tag(7));
415 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
416
417 cli_stream->Read(&recv_response, tag(8));
418 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
419
420 cli_stream->Finish(&recv_status, tag(9));
421 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
422
423 EXPECT_TRUE(recv_status.ok());
424 }
425
426 // One ping, one pong.
427 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
428 ResetStub();
429
430 EchoRequest send_request;
431 EchoRequest recv_request;
432 EchoResponse send_response;
433 EchoResponse recv_response;
434 Status recv_status;
435 ClientContext cli_ctx;
436 ServerContext srv_ctx;
437 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
438
439 send_request.set_message("Hello");
440 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
441 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
442
443 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
444 tag(2));
445
446 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
447
448 cli_stream->Write(send_request, tag(3));
449 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
450
451 srv_stream.Read(&recv_request, tag(4));
452 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
453 EXPECT_EQ(send_request.message(), recv_request.message());
454
455 send_response.set_message(recv_request.message());
456 srv_stream.Write(send_response, tag(5));
457 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
458
459 cli_stream->Read(&recv_response, tag(6));
460 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
461 EXPECT_EQ(send_response.message(), recv_response.message());
462
463 cli_stream->WritesDone(tag(7));
464 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
465
466 srv_stream.Read(&recv_request, tag(8));
467 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
468
469 srv_stream.Finish(Status::OK, tag(9));
470 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
471
472 cli_stream->Finish(&recv_status, tag(10));
473 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
474
475 EXPECT_TRUE(recv_status.ok());
476 }
477
478 // Metadata tests
479 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
480 ResetStub();
481
482 EchoRequest send_request;
483 EchoRequest recv_request;
484 EchoResponse send_response;
485 EchoResponse recv_response;
486 Status recv_status;
487
488 ClientContext cli_ctx;
489 ServerContext srv_ctx;
490 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
491
492 send_request.set_message("Hello");
493 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
494 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
495 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
496 cli_ctx.AddMetadata(meta1.first, meta1.second);
497 cli_ctx.AddMetadata(meta2.first, meta2.second);
498 cli_ctx.AddMetadata(meta3.first, meta3.second);
499
500 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
501 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
502
503 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
504 cq_.get(), tag(2));
505 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
506 EXPECT_EQ(send_request.message(), recv_request.message());
507 auto client_initial_metadata = srv_ctx.client_metadata();
508 EXPECT_EQ(meta1.second,
509 ToString(client_initial_metadata.find(meta1.first)->second));
510 EXPECT_EQ(meta2.second,
511 ToString(client_initial_metadata.find(meta2.first)->second));
512 EXPECT_EQ(meta3.second,
513 ToString(client_initial_metadata.find(meta3.first)->second));
514 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
515
516 send_response.set_message(recv_request.message());
517 response_writer.Finish(send_response, Status::OK, tag(3));
518
519 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
520
521 response_reader->Finish(&recv_response, &recv_status, tag(4));
522 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
523
524 EXPECT_EQ(send_response.message(), recv_response.message());
525 EXPECT_TRUE(recv_status.ok());
526 }
527
528 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
529 ResetStub();
530
531 EchoRequest send_request;
532 EchoRequest recv_request;
533 EchoResponse send_response;
534 EchoResponse recv_response;
535 Status recv_status;
536
537 ClientContext cli_ctx;
538 ServerContext srv_ctx;
539 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
540
541 send_request.set_message("Hello");
542 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
543 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
544
545 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
546 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
547
548 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
549 cq_.get(), tag(2));
550 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
551 EXPECT_EQ(send_request.message(), recv_request.message());
552 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
553 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
554 response_writer.SendInitialMetadata(tag(3));
555 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
556
557 response_reader->ReadInitialMetadata(tag(4));
558 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
559 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
560 EXPECT_EQ(meta1.second,
561 ToString(server_initial_metadata.find(meta1.first)->second));
562 EXPECT_EQ(meta2.second,
563 ToString(server_initial_metadata.find(meta2.first)->second));
564 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
565
566 send_response.set_message(recv_request.message());
567 response_writer.Finish(send_response, Status::OK, tag(5));
568 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
569
570 response_reader->Finish(&recv_response, &recv_status, tag(6));
571 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
572
573 EXPECT_EQ(send_response.message(), recv_response.message());
574 EXPECT_TRUE(recv_status.ok());
575 }
576
577 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
578 ResetStub();
579
580 EchoRequest send_request;
581 EchoRequest recv_request;
582 EchoResponse send_response;
583 EchoResponse recv_response;
584 Status recv_status;
585
586 ClientContext cli_ctx;
587 ServerContext srv_ctx;
588 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
589
590 send_request.set_message("Hello");
591 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
592 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
593
594 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
595 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
596
597 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
598 cq_.get(), tag(2));
599 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
600 EXPECT_EQ(send_request.message(), recv_request.message());
601 response_writer.SendInitialMetadata(tag(3));
602 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
603
604 send_response.set_message(recv_request.message());
605 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
606 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
607 response_writer.Finish(send_response, Status::OK, tag(4));
608
609 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
610
611 response_reader->Finish(&recv_response, &recv_status, tag(5));
612 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
613 EXPECT_EQ(send_response.message(), recv_response.message());
614 EXPECT_TRUE(recv_status.ok());
615 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
616 EXPECT_EQ(meta1.second,
617 ToString(server_trailing_metadata.find(meta1.first)->second));
618 EXPECT_EQ(meta2.second,
619 ToString(server_trailing_metadata.find(meta2.first)->second));
620 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
621 }
622
623 TEST_P(AsyncEnd2endTest, MetadataRpc) {
624 ResetStub();
625
626 EchoRequest send_request;
627 EchoRequest recv_request;
628 EchoResponse send_response;
629 EchoResponse recv_response;
630 Status recv_status;
631
632 ClientContext cli_ctx;
633 ServerContext srv_ctx;
634 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
635
636 send_request.set_message("Hello");
637 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
638 std::pair<grpc::string, grpc::string> meta2(
639 "key2-bin",
640 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
641 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
642 std::pair<grpc::string, grpc::string> meta6(
643 "key4-bin",
644 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
645 14));
646 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
647 std::pair<grpc::string, grpc::string> meta4(
648 "key6-bin",
649 grpc::string(
650 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
651
652 cli_ctx.AddMetadata(meta1.first, meta1.second);
653 cli_ctx.AddMetadata(meta2.first, meta2.second);
654
655 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
656 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
657
658 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
659 cq_.get(), tag(2));
660 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
661 EXPECT_EQ(send_request.message(), recv_request.message());
662 auto client_initial_metadata = srv_ctx.client_metadata();
663 EXPECT_EQ(meta1.second,
664 ToString(client_initial_metadata.find(meta1.first)->second));
665 EXPECT_EQ(meta2.second,
666 ToString(client_initial_metadata.find(meta2.first)->second));
667 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
668
669 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
670 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
671 response_writer.SendInitialMetadata(tag(3));
672 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
673 response_reader->ReadInitialMetadata(tag(4));
674 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
675 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
676 EXPECT_EQ(meta3.second,
677 ToString(server_initial_metadata.find(meta3.first)->second));
678 EXPECT_EQ(meta4.second,
679 ToString(server_initial_metadata.find(meta4.first)->second));
680 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
681
682 send_response.set_message(recv_request.message());
683 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
684 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
685 response_writer.Finish(send_response, Status::OK, tag(5));
686
687 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
688
689 response_reader->Finish(&recv_response, &recv_status, tag(6));
690 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
691 EXPECT_EQ(send_response.message(), recv_response.message());
692 EXPECT_TRUE(recv_status.ok());
693 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
694 EXPECT_EQ(meta5.second,
695 ToString(server_trailing_metadata.find(meta5.first)->second));
696 EXPECT_EQ(meta6.second,
697 ToString(server_trailing_metadata.find(meta6.first)->second));
698 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
699 }
700
701 // Server uses AsyncNotifyWhenDone API to check for cancellation
702 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
703 ResetStub();
704
705 EchoRequest send_request;
706 EchoRequest recv_request;
707 EchoResponse send_response;
708 EchoResponse recv_response;
709 Status recv_status;
710
711 ClientContext cli_ctx;
712 ServerContext srv_ctx;
713 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
714
715 send_request.set_message("Hello");
716 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
717 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
718
719 srv_ctx.AsyncNotifyWhenDone(tag(5));
720 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
721 cq_.get(), tag(2));
722
723 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
724 EXPECT_EQ(send_request.message(), recv_request.message());
725
726 cli_ctx.TryCancel();
727 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
728 EXPECT_TRUE(srv_ctx.IsCancelled());
729
730 response_reader->Finish(&recv_response, &recv_status, tag(4));
731 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
732
733 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
734 }
735
736 // Server uses AsyncNotifyWhenDone API to check for normal finish
737 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
738 ResetStub();
739
740 EchoRequest send_request;
741 EchoRequest recv_request;
742 EchoResponse send_response;
743 EchoResponse recv_response;
744 Status recv_status;
745
746 ClientContext cli_ctx;
747 ServerContext srv_ctx;
748 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
749
750 send_request.set_message("Hello");
751 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
752 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
753
754 srv_ctx.AsyncNotifyWhenDone(tag(5));
755 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
756 cq_.get(), tag(2));
757
758 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
759 EXPECT_EQ(send_request.message(), recv_request.message());
760
761 send_response.set_message(recv_request.message());
762 response_writer.Finish(send_response, Status::OK, tag(3));
763 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
764 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
765 EXPECT_FALSE(srv_ctx.IsCancelled());
766
767 response_reader->Finish(&recv_response, &recv_status, tag(4));
768 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
769
770 EXPECT_EQ(send_response.message(), recv_response.message());
771 EXPECT_TRUE(recv_status.ok());
772 }
773
774 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
775 std::shared_ptr<Channel> channel =
776 CreateChannel(server_address_.str(), InsecureChannelCredentials());
777 std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
778 stub = grpc::testing::UnimplementedService::NewStub(channel);
779 EchoRequest send_request;
780 EchoResponse recv_response;
781 Status recv_status;
782
783 ClientContext cli_ctx;
784 send_request.set_message("Hello");
785 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
786 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
787
788 response_reader->Finish(&recv_response, &recv_status, tag(4));
789 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
790
791 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
792 EXPECT_EQ("", recv_status.error_message());
793 }
794
795 // This class is for testing scenarios where RPCs are cancelled on the server
796 // by calling ServerContext::TryCancel()
797 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
798 protected:
799 typedef enum {
800 DO_NOT_CANCEL = 0,
801 CANCEL_BEFORE_PROCESSING,
802 CANCEL_DURING_PROCESSING,
803 CANCEL_AFTER_PROCESSING
804 } ServerTryCancelRequestPhase;
805
806 void ServerTryCancel(ServerContext* context) {
807 EXPECT_FALSE(context->IsCancelled());
808 context->TryCancel();
809 gpr_log(GPR_INFO, "Server called TryCancel()");
810 EXPECT_TRUE(context->IsCancelled());
811 }
812
813 // Helper for testing client-streaming RPCs which are cancelled on the server.
814 // Depending on the value of server_try_cancel parameter, this will test one
815 // of the following three scenarios:
816 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
817 // any messages from the client
818 //
819 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
820 // messages from the client
821 //
822 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
823 // messages from the client (but before sending any status back to the
824 // client)
825 void TestClientStreamingServerCancel(
826 ServerTryCancelRequestPhase server_try_cancel) {
827 ResetStub();
828
829 EchoRequest send_request;
830 EchoRequest recv_request;
831 EchoResponse send_response;
832 EchoResponse recv_response;
833 Status recv_status;
834
835 ClientContext cli_ctx;
836 ServerContext srv_ctx;
837 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
838
839 // Initiate the 'RequestStream' call on client
840 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
841 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
842 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
843
844 // On the server, request to be notified of 'RequestStream' calls
845 // and receive the 'RequestStream' call just made by the client
846 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
847 tag(2));
848 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
849
850 // Client sends 3 messages (tags 3, 4 and 5)
851 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
852 send_request.set_message("Ping " + std::to_string(tag_idx));
853 cli_stream->Write(send_request, tag(tag_idx));
854 Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
855 }
856 cli_stream->WritesDone(tag(6));
857 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
858
859 bool expected_server_cq_result = true;
860 bool ignore_cq_result = false;
861
862 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
863 ServerTryCancel(&srv_ctx);
864
865 // Since cancellation is done before server reads any results, we know
866 // for sure that all cq results will return false from this point forward
867 expected_server_cq_result = false;
868 }
869
870 std::thread* server_try_cancel_thd = NULL;
871 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
872 server_try_cancel_thd = new std::thread(
873 &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
874 // Server will cancel the RPC in a parallel thread while reading the
875 // requests from the client. Since the cancellation can happen at anytime,
876 // some of the cq results (i.e those until cancellation) might be true but
877 // its non deterministic. So better to ignore the cq results
878 ignore_cq_result = true;
879 }
880
881 // Server reads 3 messages (tags 6, 7 and 8)
882 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
883 srv_stream.Read(&recv_request, tag(tag_idx));
884 Verifier(GetParam())
885 .Expect(tag_idx, expected_server_cq_result)
886 .Verify(cq_.get(), ignore_cq_result);
887 }
888
889 if (server_try_cancel_thd != NULL) {
890 server_try_cancel_thd->join();
891 delete server_try_cancel_thd;
892 }
893
894 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
895 ServerTryCancel(&srv_ctx);
896 }
897
898 // The RPC has been cancelled at this point for sure (i.e irrespective of
899 // the value of `server_try_cancel` is). So, from this point forward, we
900 // know that cq results are supposed to return false on server.
901
902 // Server sends the final message and cancelled status (but the RPC is
903 // already cancelled at this point. So we expect the operation to fail)
904 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
905 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
906
907 // Client will see the cancellation
908 cli_stream->Finish(&recv_status, tag(10));
909 // TODO(sreek): The expectation here should be true. This is a bug (github
910 // issue #4972)
911 Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
912 EXPECT_FALSE(recv_status.ok());
913 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
914 }
915
916 // Helper for testing server-streaming RPCs which are cancelled on the server.
917 // Depending on the value of server_try_cancel parameter, this will test one
918 // of the following three scenarios:
919 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
920 // any messages to the client
921 //
922 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
923 // messages to the client
924 //
925 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
926 // messages to the client (but before sending any status back to the
927 // client)
928 void TestServerStreamingServerCancel(
929 ServerTryCancelRequestPhase server_try_cancel) {
930 ResetStub();
931
932 EchoRequest send_request;
933 EchoRequest recv_request;
934 EchoResponse send_response;
935 EchoResponse recv_response;
936 Status recv_status;
937 ClientContext cli_ctx;
938 ServerContext srv_ctx;
939 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
940
941 send_request.set_message("Ping");
942 // Initiate the 'ResponseStream' call on the client
943 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
944 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
945 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
946 // On the server, request to be notified of 'ResponseStream' calls and
947 // receive the call just made by the client
948 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
949 cq_.get(), cq_.get(), tag(2));
950 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
951 EXPECT_EQ(send_request.message(), recv_request.message());
952
953 bool expected_cq_result = true;
954 bool ignore_cq_result = false;
955
956 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
957 ServerTryCancel(&srv_ctx);
958
959 // We know for sure that all cq results will be false from this point
960 // since the server cancelled the RPC
961 expected_cq_result = false;
962 }
963
964 std::thread* server_try_cancel_thd = NULL;
965 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
966 server_try_cancel_thd = new std::thread(
967 &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
968
969 // Server will cancel the RPC in a parallel thread while writing responses
970 // to the client. Since the cancellation can happen at anytime, some of
971 // the cq results (i.e those until cancellation) might be true but it is
972 // non deterministic. So better to ignore the cq results
973 ignore_cq_result = true;
974 }
975
976 // Server sends three messages (tags 3, 4 and 5)
977 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
978 send_response.set_message("Pong " + std::to_string(tag_idx));
979 srv_stream.Write(send_response, tag(tag_idx));
980 Verifier(GetParam())
981 .Expect(tag_idx, expected_cq_result)
982 .Verify(cq_.get(), ignore_cq_result);
983 }
984
985 if (server_try_cancel_thd != NULL) {
986 server_try_cancel_thd->join();
987 delete server_try_cancel_thd;
988 }
989
990 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
991 ServerTryCancel(&srv_ctx);
992
993 // Client reads may fail bacause it is notified that the stream is
994 // cancelled.
995 ignore_cq_result = true;
996 }
997
998 // Client attemts to read the three messages from the server
999 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1000 cli_stream->Read(&recv_response, tag(tag_idx));
1001 Verifier(GetParam())
1002 .Expect(tag_idx, expected_cq_result)
1003 .Verify(cq_.get(), ignore_cq_result);
1004 }
1005
1006 // The RPC has been cancelled at this point for sure (i.e irrespective of
1007 // the value of `server_try_cancel` is). So, from this point forward, we
1008 // know that cq results are supposed to return false on server.
1009
1010 // Server finishes the stream (but the RPC is already cancelled)
1011 srv_stream.Finish(Status::CANCELLED, tag(9));
1012 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1013
1014 // Client will see the cancellation
1015 cli_stream->Finish(&recv_status, tag(10));
1016 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
1017 EXPECT_FALSE(recv_status.ok());
1018 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1019 }
1020
1021 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1022 // server.
1023 //
1024 // Depending on the value of server_try_cancel parameter, this will
1025 // test one of the following three scenarios:
1026 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1027 // writing any messages from/to the client
1028 //
1029 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1030 // messages from the client
1031 //
1032 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1033 // messages from the client (but before sending any status back to the
1034 // client)
1035 void TestBidiStreamingServerCancel(
1036 ServerTryCancelRequestPhase server_try_cancel) {
1037 ResetStub();
1038
1039 EchoRequest send_request;
1040 EchoRequest recv_request;
1041 EchoResponse send_response;
1042 EchoResponse recv_response;
1043 Status recv_status;
1044 ClientContext cli_ctx;
1045 ServerContext srv_ctx;
1046 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1047
1048 // Initiate the call from the client side
1049 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1050 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1051 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
1052
1053 // On the server, request to be notified of the 'BidiStream' call and
1054 // receive the call just made by the client
1055 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1056 tag(2));
1057 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
1058
1059 // Client sends the first and the only message
1060 send_request.set_message("Ping");
1061 cli_stream->Write(send_request, tag(3));
1062 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
1063
1064 bool expected_cq_result = true;
1065 bool ignore_cq_result = false;
1066
1067 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1068 ServerTryCancel(&srv_ctx);
1069
1070 // We know for sure that all cq results will be false from this point
1071 // since the server cancelled the RPC
1072 expected_cq_result = false;
1073 }
1074
1075 std::thread* server_try_cancel_thd = NULL;
1076 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1077 server_try_cancel_thd = new std::thread(
1078 &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
1079
1080 // Since server is going to cancel the RPC in a parallel thread, some of
1081 // the cq results (i.e those until the cancellation) might be true. Since
1082 // that number is non-deterministic, it is better to ignore the cq results
1083 ignore_cq_result = true;
1084 }
1085
1086 srv_stream.Read(&recv_request, tag(4));
1087 Verifier(GetParam())
1088 .Expect(4, expected_cq_result)
1089 .Verify(cq_.get(), ignore_cq_result);
1090
1091 send_response.set_message("Pong");
1092 srv_stream.Write(send_response, tag(5));
1093 Verifier(GetParam())
1094 .Expect(5, expected_cq_result)
1095 .Verify(cq_.get(), ignore_cq_result);
1096
1097 cli_stream->Read(&recv_response, tag(6));
1098 Verifier(GetParam())
1099 .Expect(6, expected_cq_result)
1100 .Verify(cq_.get(), ignore_cq_result);
1101
1102 // This is expected to succeed in all cases
1103 cli_stream->WritesDone(tag(7));
1104 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
1105
1106 // This is expected to fail in all cases i.e for all values of
1107 // server_try_cancel. This is because at this point, either there are no
1108 // more msgs from the client (because client called WritesDone) or the RPC
1109 // is cancelled on the server
1110 srv_stream.Read(&recv_request, tag(8));
1111 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
1112
1113 if (server_try_cancel_thd != NULL) {
1114 server_try_cancel_thd->join();
1115 delete server_try_cancel_thd;
1116 }
1117
1118 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1119 ServerTryCancel(&srv_ctx);
1120 }
1121
1122 // The RPC has been cancelled at this point for sure (i.e irrespective of
1123 // the value of `server_try_cancel` is). So, from this point forward, we
1124 // know that cq results are supposed to return false on server.
1125
1126 srv_stream.Finish(Status::CANCELLED, tag(9));
1127 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1128
1129 cli_stream->Finish(&recv_status, tag(10));
1130 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
1131 EXPECT_FALSE(recv_status.ok());
1132 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1133 }
1134 };
1135
1136 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1137 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1138 }
1139
1140 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1141 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1142 }
1143
1144 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1145 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1146 }
1147
1148 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1149 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1150 }
1151
1152 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1153 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1154 }
1155
1156 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1157 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1158 }
1159
1160 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1161 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1162 }
1163
1164 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1165 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1166 }
1167
1168 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1169 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1170 }
1171
1172 INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
1173 ::testing::Values(false, true));
1174 INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1175 AsyncEnd2endServerTryCancelTest,
1176 ::testing::Values(false));
1177
1178 } // namespace
1179 } // namespace testing
1180 } // namespace grpc
1181
1182 int main(int argc, char** argv) {
1183 grpc_test_init(argc, argv);
1184 gpr_tls_init(&g_is_async_end2end_test);
1185 ::testing::InitGoogleTest(&argc, argv);
1186 int ret = RUN_ALL_TESTS();
1187 gpr_tls_destroy(&g_is_async_end2end_test);
1188 return ret;
1189 }
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/common/secure_auth_context_test.cc ('k') | third_party/grpc/test/cpp/end2end/client_crash_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698