Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(258)

Side by Side Diff: third_party/grpc/test/cpp/end2end/thread_stress_test.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <mutex>
35 #include <thread>
36
37 #include <grpc++/channel.h>
38 #include <grpc++/client_context.h>
39 #include <grpc++/create_channel.h>
40 #include <grpc++/server.h>
41 #include <grpc++/server_builder.h>
42 #include <grpc++/server_context.h>
43 #include <grpc/grpc.h>
44 #include <grpc/support/thd.h>
45 #include <grpc/support/time.h>
46 #include <gtest/gtest.h>
47
48 #include "src/core/surface/api_trace.h"
49 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50 #include "src/proto/grpc/testing/echo.grpc.pb.h"
51 #include "test/core/util/port.h"
52 #include "test/core/util/test_config.h"
53
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;

// Tuning knobs for the stress tests below.
const int kNumThreads = 100;  // Number of threads (synchronous test)
const int kNumAsyncSendThreads = 2;     // Threads issuing async RPCs
const int kNumAsyncReceiveThreads = 50; // Threads draining the completion queue
const int kNumRpcs = 1000;  // Number of RPCs per thread
62
63 namespace grpc {
64 namespace testing {
65
66 namespace {
67
68 // When echo_deadline is requested, deadline seen in the ServerContext is set in
69 // the response in seconds.
70 void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
71 EchoResponse* response) {
72 if (request->has_param() && request->param().echo_deadline()) {
73 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
74 if (context->deadline() != system_clock::time_point::max()) {
75 Timepoint2Timespec(context->deadline(), &deadline);
76 }
77 response->mutable_param()->set_request_deadline(deadline.tv_sec);
78 }
79 }
80
81 } // namespace
82
83 class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
84 public:
85 TestServiceImpl() : signal_client_(false) {}
86
87 Status Echo(ServerContext* context, const EchoRequest* request,
88 EchoResponse* response) GRPC_OVERRIDE {
89 response->set_message(request->message());
90 MaybeEchoDeadline(context, request, response);
91 if (request->has_param() && request->param().client_cancel_after_us()) {
92 {
93 unique_lock<mutex> lock(mu_);
94 signal_client_ = true;
95 }
96 while (!context->IsCancelled()) {
97 gpr_sleep_until(gpr_time_add(
98 gpr_now(GPR_CLOCK_REALTIME),
99 gpr_time_from_micros(request->param().client_cancel_after_us(),
100 GPR_TIMESPAN)));
101 }
102 return Status::CANCELLED;
103 } else if (request->has_param() &&
104 request->param().server_cancel_after_us()) {
105 gpr_sleep_until(gpr_time_add(
106 gpr_now(GPR_CLOCK_REALTIME),
107 gpr_time_from_micros(request->param().server_cancel_after_us(),
108 GPR_TIMESPAN)));
109 return Status::CANCELLED;
110 } else {
111 EXPECT_FALSE(context->IsCancelled());
112 }
113 return Status::OK;
114 }
115
116 // Unimplemented is left unimplemented to test the returned error.
117
118 Status RequestStream(ServerContext* context,
119 ServerReader<EchoRequest>* reader,
120 EchoResponse* response) GRPC_OVERRIDE {
121 EchoRequest request;
122 response->set_message("");
123 while (reader->Read(&request)) {
124 response->mutable_message()->append(request.message());
125 }
126 return Status::OK;
127 }
128
129 // Return 3 messages.
130 // TODO(yangg) make it generic by adding a parameter into EchoRequest
131 Status ResponseStream(ServerContext* context, const EchoRequest* request,
132 ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
133 EchoResponse response;
134 response.set_message(request->message() + "0");
135 writer->Write(response);
136 response.set_message(request->message() + "1");
137 writer->Write(response);
138 response.set_message(request->message() + "2");
139 writer->Write(response);
140
141 return Status::OK;
142 }
143
144 Status BidiStream(ServerContext* context,
145 ServerReaderWriter<EchoResponse, EchoRequest>* stream)
146 GRPC_OVERRIDE {
147 EchoRequest request;
148 EchoResponse response;
149 while (stream->Read(&request)) {
150 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
151 response.set_message(request.message());
152 stream->Write(response);
153 }
154 return Status::OK;
155 }
156
157 bool signal_client() {
158 unique_lock<mutex> lock(mu_);
159 return signal_client_;
160 }
161
162 private:
163 bool signal_client_;
164 mutex mu_;
165 };
166
167 class TestServiceImplDupPkg
168 : public ::grpc::testing::duplicate::EchoTestService::Service {
169 public:
170 Status Echo(ServerContext* context, const EchoRequest* request,
171 EchoResponse* response) GRPC_OVERRIDE {
172 response->set_message("no package");
173 return Status::OK;
174 }
175 };
176
177 class CommonStressTest {
178 public:
179 CommonStressTest() : kMaxMessageSize_(8192) {}
180 void SetUp() {
181 int port = grpc_pick_unused_port_or_die();
182 server_address_ << "localhost:" << port;
183 // Setup server
184 ServerBuilder builder;
185 builder.AddListeningPort(server_address_.str(),
186 InsecureServerCredentials());
187 builder.RegisterService(&service_);
188 builder.SetMaxMessageSize(
189 kMaxMessageSize_); // For testing max message size.
190 builder.RegisterService(&dup_pkg_service_);
191 server_ = builder.BuildAndStart();
192 }
193 void TearDown() { server_->Shutdown(); }
194 void ResetStub() {
195 std::shared_ptr<Channel> channel =
196 CreateChannel(server_address_.str(), InsecureChannelCredentials());
197 stub_ = grpc::testing::EchoTestService::NewStub(channel);
198 }
199 grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
200
201 private:
202 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
203 std::unique_ptr<Server> server_;
204 std::ostringstream server_address_;
205 const int kMaxMessageSize_;
206 TestServiceImpl service_;
207 TestServiceImplDupPkg dup_pkg_service_;
208 };
209
// Fixture for the synchronous-client stress test; delegates all setup and
// teardown to CommonStressTest.
class End2endTest : public ::testing::Test {
 protected:
  End2endTest() {}
  void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
  void TearDown() GRPC_OVERRIDE { common_.TearDown(); }
  void ResetStub() { common_.ResetStub(); }

  // Shared server/stub scaffolding (also accessed directly by the test body).
  CommonStressTest common_;
};
219
220 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
221 EchoRequest request;
222 EchoResponse response;
223 request.set_message("Hello");
224
225 for (int i = 0; i < num_rpcs; ++i) {
226 ClientContext context;
227 Status s = stub->Echo(&context, request, &response);
228 EXPECT_EQ(response.message(), request.message());
229 EXPECT_TRUE(s.ok());
230 }
231 }
232
233 TEST_F(End2endTest, ThreadStress) {
234 common_.ResetStub();
235 std::vector<std::thread*> threads;
236 for (int i = 0; i < kNumThreads; ++i) {
237 threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs));
238 }
239 for (int i = 0; i < kNumThreads; ++i) {
240 threads[i]->join();
241 delete threads[i];
242 }
243 }
244
245 class AsyncClientEnd2endTest : public ::testing::Test {
246 protected:
247 AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
248
249 void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
250 void TearDown() GRPC_OVERRIDE {
251 void* ignored_tag;
252 bool ignored_ok;
253 while (cq_.Next(&ignored_tag, &ignored_ok))
254 ;
255 common_.TearDown();
256 }
257
258 void Wait() {
259 unique_lock<mutex> l(mu_);
260 while (rpcs_outstanding_ != 0) {
261 cv_.wait(l);
262 }
263
264 cq_.Shutdown();
265 }
266
267 struct AsyncClientCall {
268 EchoResponse response;
269 ClientContext context;
270 Status status;
271 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
272 };
273
274 void AsyncSendRpc(int num_rpcs) {
275 for (int i = 0; i < num_rpcs; ++i) {
276 AsyncClientCall* call = new AsyncClientCall;
277 EchoRequest request;
278 request.set_message("Hello: " + std::to_string(i));
279 call->response_reader =
280 common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
281 call->response_reader->Finish(&call->response, &call->status,
282 (void*)call);
283
284 unique_lock<mutex> l(mu_);
285 rpcs_outstanding_++;
286 }
287 }
288
289 void AsyncCompleteRpc() {
290 while (true) {
291 void* got_tag;
292 bool ok = false;
293 if (!cq_.Next(&got_tag, &ok)) break;
294 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
295 if (!ok) {
296 gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code());
297 }
298 delete call;
299
300 bool notify;
301 {
302 unique_lock<mutex> l(mu_);
303 rpcs_outstanding_--;
304 notify = (rpcs_outstanding_ == 0);
305 }
306 if (notify) {
307 cv_.notify_all();
308 }
309 }
310 }
311
312 CommonStressTest common_;
313 CompletionQueue cq_;
314 mutex mu_;
315 condition_variable cv_;
316 int rpcs_outstanding_;
317 };
318
319 TEST_F(AsyncClientEnd2endTest, ThreadStress) {
320 common_.ResetStub();
321 std::vector<std::thread*> send_threads, completion_threads;
322 for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
323 completion_threads.push_back(new std::thread(
324 &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this));
325 }
326 for (int i = 0; i < kNumAsyncSendThreads; ++i) {
327 send_threads.push_back(
328 new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc,
329 this, kNumRpcs));
330 }
331 for (int i = 0; i < kNumAsyncSendThreads; ++i) {
332 send_threads[i]->join();
333 delete send_threads[i];
334 }
335
336 Wait();
337 for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
338 completion_threads[i]->join();
339 delete completion_threads[i];
340 }
341 }
342
343 } // namespace testing
344 } // namespace grpc
345
346 int main(int argc, char** argv) {
347 grpc_test_init(argc, argv);
348 ::testing::InitGoogleTest(&argc, argv);
349 return RUN_ALL_TESTS();
350 }
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/end2end/test_service_impl.cc ('k') | third_party/grpc/test/cpp/end2end/zookeeper_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698