Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/grpc/test/cpp/qps/client_async.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/grpc/test/cpp/qps/client.h ('k') | third_party/grpc/test/cpp/qps/client_sync.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <cassert>
35 #include <forward_list>
36 #include <functional>
37 #include <list>
38 #include <memory>
39 #include <mutex>
40 #include <sstream>
41 #include <string>
42 #include <thread>
43 #include <vector>
44
45 #include <gflags/gflags.h>
46 #include <grpc++/alarm.h>
47 #include <grpc++/channel.h>
48 #include <grpc++/client_context.h>
49 #include <grpc++/generic/generic_stub.h>
50 #include <grpc/grpc.h>
51 #include <grpc/support/cpu.h>
52 #include <grpc/support/histogram.h>
53 #include <grpc/support/log.h>
54
55 #include "src/proto/grpc/testing/services.grpc.pb.h"
56 #include "test/cpp/qps/client.h"
57 #include "test/cpp/qps/usage_timer.h"
58 #include "test/cpp/util/create_test_channel.h"
59
60 namespace grpc {
61 namespace testing {
62
63 class ClientRpcContext {
64 public:
65 ClientRpcContext() {}
66 virtual ~ClientRpcContext() {}
67 // next state, return false if done. Collect stats when appropriate
68 virtual bool RunNextState(bool, Histogram* hist) = 0;
69 virtual ClientRpcContext* StartNewClone() = 0;
70 static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
71 static ClientRpcContext* detag(void* t) {
72 return reinterpret_cast<ClientRpcContext*>(t);
73 }
74
75 virtual void Start(CompletionQueue* cq) = 0;
76 };
77
78 template <class RequestType, class ResponseType>
79 class ClientRpcContextUnaryImpl : public ClientRpcContext {
80 public:
81 ClientRpcContextUnaryImpl(
82 BenchmarkService::Stub* stub, const RequestType& req,
83 std::function<gpr_timespec()> next_issue,
84 std::function<
85 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
86 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
87 CompletionQueue*)> start_req,
88 std::function<void(grpc::Status, ResponseType*)> on_done)
89 : context_(),
90 stub_(stub),
91 cq_(nullptr),
92 req_(req),
93 response_(),
94 next_state_(State::READY),
95 callback_(on_done),
96 next_issue_(next_issue),
97 start_req_(start_req) {}
98 ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
99 void Start(CompletionQueue* cq) GRPC_OVERRIDE {
100 cq_ = cq;
101 if (!next_issue_) { // ready to issue
102 RunNextState(true, nullptr);
103 } else { // wait for the issue time
104 alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
105 }
106 }
107 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
108 switch (next_state_) {
109 case State::READY:
110 start_ = UsageTimer::Now();
111 response_reader_ = start_req_(stub_, &context_, req_, cq_);
112 response_reader_->Finish(&response_, &status_,
113 ClientRpcContext::tag(this));
114 next_state_ = State::RESP_DONE;
115 return true;
116 case State::RESP_DONE:
117 hist->Add((UsageTimer::Now() - start_) * 1e9);
118 callback_(status_, &response_);
119 next_state_ = State::INVALID;
120 return false;
121 default:
122 GPR_ASSERT(false);
123 return false;
124 }
125 }
126 ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
127 return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
128 callback_);
129 }
130
131 private:
132 grpc::ClientContext context_;
133 BenchmarkService::Stub* stub_;
134 CompletionQueue* cq_;
135 std::unique_ptr<Alarm> alarm_;
136 RequestType req_;
137 ResponseType response_;
138 enum State { INVALID, READY, RESP_DONE };
139 State next_state_;
140 std::function<void(grpc::Status, ResponseType*)> callback_;
141 std::function<gpr_timespec()> next_issue_;
142 std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
143 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
144 CompletionQueue*)> start_req_;
145 grpc::Status status_;
146 double start_;
147 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
148 response_reader_;
149 };
150
151 typedef std::forward_list<ClientRpcContext*> context_list;
152
153 template <class StubType, class RequestType>
154 class AsyncClient : public ClientImpl<StubType, RequestType> {
155 // Specify which protected members we are using since there is no
156 // member name resolution until the template types are fully resolved
157 public:
158 using Client::SetupLoadTest;
159 using Client::closed_loop_;
160 using Client::NextIssuer;
161 using ClientImpl<StubType, RequestType>::cores_;
162 using ClientImpl<StubType, RequestType>::channels_;
163 using ClientImpl<StubType, RequestType>::request_;
164 AsyncClient(const ClientConfig& config,
165 std::function<ClientRpcContext*(
166 StubType*, std::function<gpr_timespec()> next_issue,
167 const RequestType&)> setup_ctx,
168 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
169 create_stub)
170 : ClientImpl<StubType, RequestType>(config, create_stub),
171 num_async_threads_(NumThreads(config)) {
172 SetupLoadTest(config, num_async_threads_);
173
174 for (int i = 0; i < num_async_threads_; i++) {
175 cli_cqs_.emplace_back(new CompletionQueue);
176 next_issuers_.emplace_back(NextIssuer(i));
177 }
178
179 using namespace std::placeholders;
180 int t = 0;
181 for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
182 for (int ch = 0; ch < config.client_channels(); ch++) {
183 auto* cq = cli_cqs_[t].get();
184 auto ctx =
185 setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
186 ctx->Start(cq);
187 t = (t + 1) % cli_cqs_.size();
188 }
189 }
190 }
191 virtual ~AsyncClient() {
192 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
193 (*cq)->Shutdown();
194 void* got_tag;
195 bool ok;
196 while ((*cq)->Next(&got_tag, &ok)) {
197 delete ClientRpcContext::detag(got_tag);
198 }
199 }
200 }
201
202 bool ThreadFunc(Histogram* histogram,
203 size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
204 void* got_tag;
205 bool ok;
206
207 if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
208 // Got a regular event, so process it
209 ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
210 if (!ctx->RunNextState(ok, histogram)) {
211 // The RPC and callback are done, so clone the ctx
212 // and kickstart the new one
213 auto clone = ctx->StartNewClone();
214 clone->Start(cli_cqs_[thread_idx].get());
215 // delete the old version
216 delete ctx;
217 }
218 return true;
219 } else { // queue is shutting down
220 return false;
221 }
222 }
223
224 protected:
225 const int num_async_threads_;
226
227 private:
228 int NumThreads(const ClientConfig& config) {
229 int num_threads = config.async_client_threads();
230 if (num_threads <= 0) { // Use dynamic sizing
231 num_threads = cores_;
232 gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
233 }
234 return num_threads;
235 }
236
237 std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
238 std::vector<std::function<gpr_timespec()>> next_issuers_;
239 };
240
241 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
242 std::shared_ptr<Channel> ch) {
243 return BenchmarkService::NewStub(ch);
244 }
245
246 class AsyncUnaryClient GRPC_FINAL
247 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
248 public:
249 explicit AsyncUnaryClient(const ClientConfig& config)
250 : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
251 StartThreads(num_async_threads_);
252 }
253 ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
254
255 private:
256 static void CheckDone(grpc::Status s, SimpleResponse* response) {}
257 static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
258 StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
259 const SimpleRequest& request, CompletionQueue* cq) {
260 return stub->AsyncUnaryCall(ctx, request, cq);
261 };
262 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
263 std::function<gpr_timespec()> next_issue,
264 const SimpleRequest& req) {
265 return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
266 stub, req, next_issue, AsyncUnaryClient::StartReq,
267 AsyncUnaryClient::CheckDone);
268 }
269 };
270
271 template <class RequestType, class ResponseType>
272 class ClientRpcContextStreamingImpl : public ClientRpcContext {
273 public:
274 ClientRpcContextStreamingImpl(
275 BenchmarkService::Stub* stub, const RequestType& req,
276 std::function<gpr_timespec()> next_issue,
277 std::function<std::unique_ptr<
278 grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
279 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
280 void*)> start_req,
281 std::function<void(grpc::Status, ResponseType*)> on_done)
282 : context_(),
283 stub_(stub),
284 cq_(nullptr),
285 req_(req),
286 response_(),
287 next_state_(State::INVALID),
288 callback_(on_done),
289 next_issue_(next_issue),
290 start_req_(start_req) {}
291 ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
292 void Start(CompletionQueue* cq) GRPC_OVERRIDE {
293 cq_ = cq;
294 stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
295 next_state_ = State::STREAM_IDLE;
296 }
297 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
298 while (true) {
299 switch (next_state_) {
300 case State::STREAM_IDLE:
301 if (!next_issue_) { // ready to issue
302 next_state_ = State::READY_TO_WRITE;
303 } else {
304 next_state_ = State::WAIT;
305 }
306 break; // loop around, don't return
307 case State::WAIT:
308 alarm_.reset(
309 new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
310 next_state_ = State::READY_TO_WRITE;
311 return true;
312 case State::READY_TO_WRITE:
313 if (!ok) {
314 return false;
315 }
316 start_ = UsageTimer::Now();
317 next_state_ = State::WRITE_DONE;
318 stream_->Write(req_, ClientRpcContext::tag(this));
319 return true;
320 case State::WRITE_DONE:
321 if (!ok) {
322 return false;
323 }
324 next_state_ = State::READ_DONE;
325 stream_->Read(&response_, ClientRpcContext::tag(this));
326 return true;
327 break;
328 case State::READ_DONE:
329 hist->Add((UsageTimer::Now() - start_) * 1e9);
330 callback_(status_, &response_);
331 next_state_ = State::STREAM_IDLE;
332 break; // loop around
333 default:
334 GPR_ASSERT(false);
335 return false;
336 }
337 }
338 }
339 ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
340 return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
341 start_req_, callback_);
342 }
343
344 private:
345 grpc::ClientContext context_;
346 BenchmarkService::Stub* stub_;
347 CompletionQueue* cq_;
348 std::unique_ptr<Alarm> alarm_;
349 RequestType req_;
350 ResponseType response_;
351 enum State {
352 INVALID,
353 STREAM_IDLE,
354 WAIT,
355 READY_TO_WRITE,
356 WRITE_DONE,
357 READ_DONE
358 };
359 State next_state_;
360 std::function<void(grpc::Status, ResponseType*)> callback_;
361 std::function<gpr_timespec()> next_issue_;
362 std::function<
363 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
364 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
365 void*)> start_req_;
366 grpc::Status status_;
367 double start_;
368 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
369 stream_;
370 };
371
372 class AsyncStreamingClient GRPC_FINAL
373 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
374 public:
375 explicit AsyncStreamingClient(const ClientConfig& config)
376 : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
377 StartThreads(num_async_threads_);
378 }
379
380 ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
381
382 private:
383 static void CheckDone(grpc::Status s, SimpleResponse* response) {}
384 static std::unique_ptr<
385 grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
386 StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
387 CompletionQueue* cq, void* tag) {
388 auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
389 return stream;
390 };
391 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
392 std::function<gpr_timespec()> next_issue,
393 const SimpleRequest& req) {
394 return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
395 stub, req, next_issue, AsyncStreamingClient::StartReq,
396 AsyncStreamingClient::CheckDone);
397 }
398 };
399
400 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
401 public:
402 ClientRpcContextGenericStreamingImpl(
403 grpc::GenericStub* stub, const ByteBuffer& req,
404 std::function<gpr_timespec()> next_issue,
405 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
406 grpc::GenericStub*, grpc::ClientContext*,
407 const grpc::string& method_name, CompletionQueue*, void*)> start_req,
408 std::function<void(grpc::Status, ByteBuffer*)> on_done)
409 : context_(),
410 stub_(stub),
411 cq_(nullptr),
412 req_(req),
413 response_(),
414 next_state_(State::INVALID),
415 callback_(on_done),
416 next_issue_(next_issue),
417 start_req_(start_req) {}
418 ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
419 void Start(CompletionQueue* cq) GRPC_OVERRIDE {
420 cq_ = cq;
421 const grpc::string kMethodName(
422 "/grpc.testing.BenchmarkService/StreamingCall");
423 stream_ = start_req_(stub_, &context_, kMethodName, cq,
424 ClientRpcContext::tag(this));
425 next_state_ = State::STREAM_IDLE;
426 }
427 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
428 while (true) {
429 switch (next_state_) {
430 case State::STREAM_IDLE:
431 if (!next_issue_) { // ready to issue
432 next_state_ = State::READY_TO_WRITE;
433 } else {
434 next_state_ = State::WAIT;
435 }
436 break; // loop around, don't return
437 case State::WAIT:
438 alarm_.reset(
439 new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
440 next_state_ = State::READY_TO_WRITE;
441 return true;
442 case State::READY_TO_WRITE:
443 if (!ok) {
444 return false;
445 }
446 start_ = UsageTimer::Now();
447 next_state_ = State::WRITE_DONE;
448 stream_->Write(req_, ClientRpcContext::tag(this));
449 return true;
450 case State::WRITE_DONE:
451 if (!ok) {
452 return false;
453 }
454 next_state_ = State::READ_DONE;
455 stream_->Read(&response_, ClientRpcContext::tag(this));
456 return true;
457 break;
458 case State::READ_DONE:
459 hist->Add((UsageTimer::Now() - start_) * 1e9);
460 callback_(status_, &response_);
461 next_state_ = State::STREAM_IDLE;
462 break; // loop around
463 default:
464 GPR_ASSERT(false);
465 return false;
466 }
467 }
468 }
469 ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
470 return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
471 start_req_, callback_);
472 }
473
474 private:
475 grpc::ClientContext context_;
476 grpc::GenericStub* stub_;
477 CompletionQueue* cq_;
478 std::unique_ptr<Alarm> alarm_;
479 ByteBuffer req_;
480 ByteBuffer response_;
481 enum State {
482 INVALID,
483 STREAM_IDLE,
484 WAIT,
485 READY_TO_WRITE,
486 WRITE_DONE,
487 READ_DONE
488 };
489 State next_state_;
490 std::function<void(grpc::Status, ByteBuffer*)> callback_;
491 std::function<gpr_timespec()> next_issue_;
492 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
493 grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
494 CompletionQueue*, void*)> start_req_;
495 grpc::Status status_;
496 double start_;
497 std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
498 };
499
500 static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
501 std::shared_ptr<Channel> ch) {
502 return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
503 }
504
505 class GenericAsyncStreamingClient GRPC_FINAL
506 : public AsyncClient<grpc::GenericStub, ByteBuffer> {
507 public:
508 explicit GenericAsyncStreamingClient(const ClientConfig& config)
509 : AsyncClient(config, SetupCtx, GenericStubCreator) {
510 StartThreads(num_async_threads_);
511 }
512
513 ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
514
515 private:
516 static void CheckDone(grpc::Status s, ByteBuffer* response) {}
517 static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq(
518 grpc::GenericStub* stub, grpc::ClientContext* ctx,
519 const grpc::string& method_name, CompletionQueue* cq, void* tag) {
520 auto stream = stub->Call(ctx, method_name, cq, tag);
521 return stream;
522 };
523 static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
524 std::function<gpr_timespec()> next_issue,
525 const ByteBuffer& req) {
526 return new ClientRpcContextGenericStreamingImpl(
527 stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
528 GenericAsyncStreamingClient::CheckDone);
529 }
530 };
531
532 std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
533 return std::unique_ptr<Client>(new AsyncUnaryClient(args));
534 }
535 std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
536 return std::unique_ptr<Client>(new AsyncStreamingClient(args));
537 }
538 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
539 const ClientConfig& args) {
540 return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
541 }
542
543 } // namespace testing
544 } // namespace grpc
OLDNEW
« no previous file with comments | « third_party/grpc/test/cpp/qps/client.h ('k') | third_party/grpc/test/cpp/qps/client_sync.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698