OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <cassert> |
| 35 #include <forward_list> |
| 36 #include <functional> |
| 37 #include <list> |
| 38 #include <memory> |
| 39 #include <mutex> |
| 40 #include <sstream> |
| 41 #include <string> |
| 42 #include <thread> |
| 43 #include <vector> |
| 44 |
| 45 #include <gflags/gflags.h> |
| 46 #include <grpc++/alarm.h> |
| 47 #include <grpc++/channel.h> |
| 48 #include <grpc++/client_context.h> |
| 49 #include <grpc++/generic/generic_stub.h> |
| 50 #include <grpc/grpc.h> |
| 51 #include <grpc/support/cpu.h> |
| 52 #include <grpc/support/histogram.h> |
| 53 #include <grpc/support/log.h> |
| 54 |
| 55 #include "src/proto/grpc/testing/services.grpc.pb.h" |
| 56 #include "test/cpp/qps/client.h" |
| 57 #include "test/cpp/qps/usage_timer.h" |
| 58 #include "test/cpp/util/create_test_channel.h" |
| 59 |
| 60 namespace grpc { |
| 61 namespace testing { |
| 62 |
| 63 class ClientRpcContext { |
| 64 public: |
| 65 ClientRpcContext() {} |
| 66 virtual ~ClientRpcContext() {} |
| 67 // next state, return false if done. Collect stats when appropriate |
| 68 virtual bool RunNextState(bool, Histogram* hist) = 0; |
| 69 virtual ClientRpcContext* StartNewClone() = 0; |
| 70 static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } |
| 71 static ClientRpcContext* detag(void* t) { |
| 72 return reinterpret_cast<ClientRpcContext*>(t); |
| 73 } |
| 74 |
| 75 virtual void Start(CompletionQueue* cq) = 0; |
| 76 }; |
| 77 |
| 78 template <class RequestType, class ResponseType> |
| 79 class ClientRpcContextUnaryImpl : public ClientRpcContext { |
| 80 public: |
| 81 ClientRpcContextUnaryImpl( |
| 82 BenchmarkService::Stub* stub, const RequestType& req, |
| 83 std::function<gpr_timespec()> next_issue, |
| 84 std::function< |
| 85 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( |
| 86 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, |
| 87 CompletionQueue*)> start_req, |
| 88 std::function<void(grpc::Status, ResponseType*)> on_done) |
| 89 : context_(), |
| 90 stub_(stub), |
| 91 cq_(nullptr), |
| 92 req_(req), |
| 93 response_(), |
| 94 next_state_(State::READY), |
| 95 callback_(on_done), |
| 96 next_issue_(next_issue), |
| 97 start_req_(start_req) {} |
| 98 ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} |
| 99 void Start(CompletionQueue* cq) GRPC_OVERRIDE { |
| 100 cq_ = cq; |
| 101 if (!next_issue_) { // ready to issue |
| 102 RunNextState(true, nullptr); |
| 103 } else { // wait for the issue time |
| 104 alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); |
| 105 } |
| 106 } |
| 107 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
| 108 switch (next_state_) { |
| 109 case State::READY: |
| 110 start_ = UsageTimer::Now(); |
| 111 response_reader_ = start_req_(stub_, &context_, req_, cq_); |
| 112 response_reader_->Finish(&response_, &status_, |
| 113 ClientRpcContext::tag(this)); |
| 114 next_state_ = State::RESP_DONE; |
| 115 return true; |
| 116 case State::RESP_DONE: |
| 117 hist->Add((UsageTimer::Now() - start_) * 1e9); |
| 118 callback_(status_, &response_); |
| 119 next_state_ = State::INVALID; |
| 120 return false; |
| 121 default: |
| 122 GPR_ASSERT(false); |
| 123 return false; |
| 124 } |
| 125 } |
| 126 ClientRpcContext* StartNewClone() GRPC_OVERRIDE { |
| 127 return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, |
| 128 callback_); |
| 129 } |
| 130 |
| 131 private: |
| 132 grpc::ClientContext context_; |
| 133 BenchmarkService::Stub* stub_; |
| 134 CompletionQueue* cq_; |
| 135 std::unique_ptr<Alarm> alarm_; |
| 136 RequestType req_; |
| 137 ResponseType response_; |
| 138 enum State { INVALID, READY, RESP_DONE }; |
| 139 State next_state_; |
| 140 std::function<void(grpc::Status, ResponseType*)> callback_; |
| 141 std::function<gpr_timespec()> next_issue_; |
| 142 std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( |
| 143 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, |
| 144 CompletionQueue*)> start_req_; |
| 145 grpc::Status status_; |
| 146 double start_; |
| 147 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> |
| 148 response_reader_; |
| 149 }; |
| 150 |
| 151 typedef std::forward_list<ClientRpcContext*> context_list; |
| 152 |
| 153 template <class StubType, class RequestType> |
| 154 class AsyncClient : public ClientImpl<StubType, RequestType> { |
| 155 // Specify which protected members we are using since there is no |
| 156 // member name resolution until the template types are fully resolved |
| 157 public: |
| 158 using Client::SetupLoadTest; |
| 159 using Client::closed_loop_; |
| 160 using Client::NextIssuer; |
| 161 using ClientImpl<StubType, RequestType>::cores_; |
| 162 using ClientImpl<StubType, RequestType>::channels_; |
| 163 using ClientImpl<StubType, RequestType>::request_; |
| 164 AsyncClient(const ClientConfig& config, |
| 165 std::function<ClientRpcContext*( |
| 166 StubType*, std::function<gpr_timespec()> next_issue, |
| 167 const RequestType&)> setup_ctx, |
| 168 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> |
| 169 create_stub) |
| 170 : ClientImpl<StubType, RequestType>(config, create_stub), |
| 171 num_async_threads_(NumThreads(config)) { |
| 172 SetupLoadTest(config, num_async_threads_); |
| 173 |
| 174 for (int i = 0; i < num_async_threads_; i++) { |
| 175 cli_cqs_.emplace_back(new CompletionQueue); |
| 176 next_issuers_.emplace_back(NextIssuer(i)); |
| 177 } |
| 178 |
| 179 using namespace std::placeholders; |
| 180 int t = 0; |
| 181 for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { |
| 182 for (int ch = 0; ch < config.client_channels(); ch++) { |
| 183 auto* cq = cli_cqs_[t].get(); |
| 184 auto ctx = |
| 185 setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_); |
| 186 ctx->Start(cq); |
| 187 t = (t + 1) % cli_cqs_.size(); |
| 188 } |
| 189 } |
| 190 } |
| 191 virtual ~AsyncClient() { |
| 192 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { |
| 193 (*cq)->Shutdown(); |
| 194 void* got_tag; |
| 195 bool ok; |
| 196 while ((*cq)->Next(&got_tag, &ok)) { |
| 197 delete ClientRpcContext::detag(got_tag); |
| 198 } |
| 199 } |
| 200 } |
| 201 |
| 202 bool ThreadFunc(Histogram* histogram, |
| 203 size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { |
| 204 void* got_tag; |
| 205 bool ok; |
| 206 |
| 207 if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { |
| 208 // Got a regular event, so process it |
| 209 ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
| 210 if (!ctx->RunNextState(ok, histogram)) { |
| 211 // The RPC and callback are done, so clone the ctx |
| 212 // and kickstart the new one |
| 213 auto clone = ctx->StartNewClone(); |
| 214 clone->Start(cli_cqs_[thread_idx].get()); |
| 215 // delete the old version |
| 216 delete ctx; |
| 217 } |
| 218 return true; |
| 219 } else { // queue is shutting down |
| 220 return false; |
| 221 } |
| 222 } |
| 223 |
| 224 protected: |
| 225 const int num_async_threads_; |
| 226 |
| 227 private: |
| 228 int NumThreads(const ClientConfig& config) { |
| 229 int num_threads = config.async_client_threads(); |
| 230 if (num_threads <= 0) { // Use dynamic sizing |
| 231 num_threads = cores_; |
| 232 gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); |
| 233 } |
| 234 return num_threads; |
| 235 } |
| 236 |
| 237 std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; |
| 238 std::vector<std::function<gpr_timespec()>> next_issuers_; |
| 239 }; |
| 240 |
| 241 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( |
| 242 std::shared_ptr<Channel> ch) { |
| 243 return BenchmarkService::NewStub(ch); |
| 244 } |
| 245 |
| 246 class AsyncUnaryClient GRPC_FINAL |
| 247 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { |
| 248 public: |
| 249 explicit AsyncUnaryClient(const ClientConfig& config) |
| 250 : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { |
| 251 StartThreads(num_async_threads_); |
| 252 } |
| 253 ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } |
| 254 |
| 255 private: |
| 256 static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
| 257 static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> |
| 258 StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, |
| 259 const SimpleRequest& request, CompletionQueue* cq) { |
| 260 return stub->AsyncUnaryCall(ctx, request, cq); |
| 261 }; |
| 262 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, |
| 263 std::function<gpr_timespec()> next_issue, |
| 264 const SimpleRequest& req) { |
| 265 return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( |
| 266 stub, req, next_issue, AsyncUnaryClient::StartReq, |
| 267 AsyncUnaryClient::CheckDone); |
| 268 } |
| 269 }; |
| 270 |
| 271 template <class RequestType, class ResponseType> |
| 272 class ClientRpcContextStreamingImpl : public ClientRpcContext { |
| 273 public: |
| 274 ClientRpcContextStreamingImpl( |
| 275 BenchmarkService::Stub* stub, const RequestType& req, |
| 276 std::function<gpr_timespec()> next_issue, |
| 277 std::function<std::unique_ptr< |
| 278 grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( |
| 279 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, |
| 280 void*)> start_req, |
| 281 std::function<void(grpc::Status, ResponseType*)> on_done) |
| 282 : context_(), |
| 283 stub_(stub), |
| 284 cq_(nullptr), |
| 285 req_(req), |
| 286 response_(), |
| 287 next_state_(State::INVALID), |
| 288 callback_(on_done), |
| 289 next_issue_(next_issue), |
| 290 start_req_(start_req) {} |
| 291 ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} |
| 292 void Start(CompletionQueue* cq) GRPC_OVERRIDE { |
| 293 cq_ = cq; |
| 294 stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); |
| 295 next_state_ = State::STREAM_IDLE; |
| 296 } |
| 297 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
| 298 while (true) { |
| 299 switch (next_state_) { |
| 300 case State::STREAM_IDLE: |
| 301 if (!next_issue_) { // ready to issue |
| 302 next_state_ = State::READY_TO_WRITE; |
| 303 } else { |
| 304 next_state_ = State::WAIT; |
| 305 } |
| 306 break; // loop around, don't return |
| 307 case State::WAIT: |
| 308 alarm_.reset( |
| 309 new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); |
| 310 next_state_ = State::READY_TO_WRITE; |
| 311 return true; |
| 312 case State::READY_TO_WRITE: |
| 313 if (!ok) { |
| 314 return false; |
| 315 } |
| 316 start_ = UsageTimer::Now(); |
| 317 next_state_ = State::WRITE_DONE; |
| 318 stream_->Write(req_, ClientRpcContext::tag(this)); |
| 319 return true; |
| 320 case State::WRITE_DONE: |
| 321 if (!ok) { |
| 322 return false; |
| 323 } |
| 324 next_state_ = State::READ_DONE; |
| 325 stream_->Read(&response_, ClientRpcContext::tag(this)); |
| 326 return true; |
| 327 break; |
| 328 case State::READ_DONE: |
| 329 hist->Add((UsageTimer::Now() - start_) * 1e9); |
| 330 callback_(status_, &response_); |
| 331 next_state_ = State::STREAM_IDLE; |
| 332 break; // loop around |
| 333 default: |
| 334 GPR_ASSERT(false); |
| 335 return false; |
| 336 } |
| 337 } |
| 338 } |
| 339 ClientRpcContext* StartNewClone() GRPC_OVERRIDE { |
| 340 return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, |
| 341 start_req_, callback_); |
| 342 } |
| 343 |
| 344 private: |
| 345 grpc::ClientContext context_; |
| 346 BenchmarkService::Stub* stub_; |
| 347 CompletionQueue* cq_; |
| 348 std::unique_ptr<Alarm> alarm_; |
| 349 RequestType req_; |
| 350 ResponseType response_; |
| 351 enum State { |
| 352 INVALID, |
| 353 STREAM_IDLE, |
| 354 WAIT, |
| 355 READY_TO_WRITE, |
| 356 WRITE_DONE, |
| 357 READ_DONE |
| 358 }; |
| 359 State next_state_; |
| 360 std::function<void(grpc::Status, ResponseType*)> callback_; |
| 361 std::function<gpr_timespec()> next_issue_; |
| 362 std::function< |
| 363 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( |
| 364 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, |
| 365 void*)> start_req_; |
| 366 grpc::Status status_; |
| 367 double start_; |
| 368 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> |
| 369 stream_; |
| 370 }; |
| 371 |
| 372 class AsyncStreamingClient GRPC_FINAL |
| 373 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { |
| 374 public: |
| 375 explicit AsyncStreamingClient(const ClientConfig& config) |
| 376 : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { |
| 377 StartThreads(num_async_threads_); |
| 378 } |
| 379 |
| 380 ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } |
| 381 |
| 382 private: |
| 383 static void CheckDone(grpc::Status s, SimpleResponse* response) {} |
| 384 static std::unique_ptr< |
| 385 grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> |
| 386 StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, |
| 387 CompletionQueue* cq, void* tag) { |
| 388 auto stream = stub->AsyncStreamingCall(ctx, cq, tag); |
| 389 return stream; |
| 390 }; |
| 391 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, |
| 392 std::function<gpr_timespec()> next_issue, |
| 393 const SimpleRequest& req) { |
| 394 return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( |
| 395 stub, req, next_issue, AsyncStreamingClient::StartReq, |
| 396 AsyncStreamingClient::CheckDone); |
| 397 } |
| 398 }; |
| 399 |
| 400 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { |
| 401 public: |
| 402 ClientRpcContextGenericStreamingImpl( |
| 403 grpc::GenericStub* stub, const ByteBuffer& req, |
| 404 std::function<gpr_timespec()> next_issue, |
| 405 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( |
| 406 grpc::GenericStub*, grpc::ClientContext*, |
| 407 const grpc::string& method_name, CompletionQueue*, void*)> start_req, |
| 408 std::function<void(grpc::Status, ByteBuffer*)> on_done) |
| 409 : context_(), |
| 410 stub_(stub), |
| 411 cq_(nullptr), |
| 412 req_(req), |
| 413 response_(), |
| 414 next_state_(State::INVALID), |
| 415 callback_(on_done), |
| 416 next_issue_(next_issue), |
| 417 start_req_(start_req) {} |
| 418 ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} |
| 419 void Start(CompletionQueue* cq) GRPC_OVERRIDE { |
| 420 cq_ = cq; |
| 421 const grpc::string kMethodName( |
| 422 "/grpc.testing.BenchmarkService/StreamingCall"); |
| 423 stream_ = start_req_(stub_, &context_, kMethodName, cq, |
| 424 ClientRpcContext::tag(this)); |
| 425 next_state_ = State::STREAM_IDLE; |
| 426 } |
| 427 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { |
| 428 while (true) { |
| 429 switch (next_state_) { |
| 430 case State::STREAM_IDLE: |
| 431 if (!next_issue_) { // ready to issue |
| 432 next_state_ = State::READY_TO_WRITE; |
| 433 } else { |
| 434 next_state_ = State::WAIT; |
| 435 } |
| 436 break; // loop around, don't return |
| 437 case State::WAIT: |
| 438 alarm_.reset( |
| 439 new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); |
| 440 next_state_ = State::READY_TO_WRITE; |
| 441 return true; |
| 442 case State::READY_TO_WRITE: |
| 443 if (!ok) { |
| 444 return false; |
| 445 } |
| 446 start_ = UsageTimer::Now(); |
| 447 next_state_ = State::WRITE_DONE; |
| 448 stream_->Write(req_, ClientRpcContext::tag(this)); |
| 449 return true; |
| 450 case State::WRITE_DONE: |
| 451 if (!ok) { |
| 452 return false; |
| 453 } |
| 454 next_state_ = State::READ_DONE; |
| 455 stream_->Read(&response_, ClientRpcContext::tag(this)); |
| 456 return true; |
| 457 break; |
| 458 case State::READ_DONE: |
| 459 hist->Add((UsageTimer::Now() - start_) * 1e9); |
| 460 callback_(status_, &response_); |
| 461 next_state_ = State::STREAM_IDLE; |
| 462 break; // loop around |
| 463 default: |
| 464 GPR_ASSERT(false); |
| 465 return false; |
| 466 } |
| 467 } |
| 468 } |
| 469 ClientRpcContext* StartNewClone() GRPC_OVERRIDE { |
| 470 return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_, |
| 471 start_req_, callback_); |
| 472 } |
| 473 |
| 474 private: |
| 475 grpc::ClientContext context_; |
| 476 grpc::GenericStub* stub_; |
| 477 CompletionQueue* cq_; |
| 478 std::unique_ptr<Alarm> alarm_; |
| 479 ByteBuffer req_; |
| 480 ByteBuffer response_; |
| 481 enum State { |
| 482 INVALID, |
| 483 STREAM_IDLE, |
| 484 WAIT, |
| 485 READY_TO_WRITE, |
| 486 WRITE_DONE, |
| 487 READ_DONE |
| 488 }; |
| 489 State next_state_; |
| 490 std::function<void(grpc::Status, ByteBuffer*)> callback_; |
| 491 std::function<gpr_timespec()> next_issue_; |
| 492 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( |
| 493 grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, |
| 494 CompletionQueue*, void*)> start_req_; |
| 495 grpc::Status status_; |
| 496 double start_; |
| 497 std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; |
| 498 }; |
| 499 |
| 500 static std::unique_ptr<grpc::GenericStub> GenericStubCreator( |
| 501 std::shared_ptr<Channel> ch) { |
| 502 return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch)); |
| 503 } |
| 504 |
| 505 class GenericAsyncStreamingClient GRPC_FINAL |
| 506 : public AsyncClient<grpc::GenericStub, ByteBuffer> { |
| 507 public: |
| 508 explicit GenericAsyncStreamingClient(const ClientConfig& config) |
| 509 : AsyncClient(config, SetupCtx, GenericStubCreator) { |
| 510 StartThreads(num_async_threads_); |
| 511 } |
| 512 |
| 513 ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } |
| 514 |
| 515 private: |
| 516 static void CheckDone(grpc::Status s, ByteBuffer* response) {} |
| 517 static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( |
| 518 grpc::GenericStub* stub, grpc::ClientContext* ctx, |
| 519 const grpc::string& method_name, CompletionQueue* cq, void* tag) { |
| 520 auto stream = stub->Call(ctx, method_name, cq, tag); |
| 521 return stream; |
| 522 }; |
| 523 static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, |
| 524 std::function<gpr_timespec()> next_issue, |
| 525 const ByteBuffer& req) { |
| 526 return new ClientRpcContextGenericStreamingImpl( |
| 527 stub, req, next_issue, GenericAsyncStreamingClient::StartReq, |
| 528 GenericAsyncStreamingClient::CheckDone); |
| 529 } |
| 530 }; |
| 531 |
| 532 std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { |
| 533 return std::unique_ptr<Client>(new AsyncUnaryClient(args)); |
| 534 } |
| 535 std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) { |
| 536 return std::unique_ptr<Client>(new AsyncStreamingClient(args)); |
| 537 } |
| 538 std::unique_ptr<Client> CreateGenericAsyncStreamingClient( |
| 539 const ClientConfig& args) { |
| 540 return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args)); |
| 541 } |
| 542 |
| 543 } // namespace testing |
| 544 } // namespace grpc |
OLD | NEW |