| Index: third_party/grpc/src/cpp/server/server.cc
|
| diff --git a/third_party/grpc/src/cpp/server/server.cc b/third_party/grpc/src/cpp/server/server.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6d31a608c803ded3b473e1a9ba8e9fa60e277bf1
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/cpp/server/server.cc
|
| @@ -0,0 +1,586 @@
|
| +/*
|
| + *
|
| + * Copyright 2015-2016, Google Inc.
|
| + * All rights reserved.
|
| + *
|
| + * Redistribution and use in source and binary forms, with or without
|
| + * modification, are permitted provided that the following conditions are
|
| + * met:
|
| + *
|
| + * * Redistributions of source code must retain the above copyright
|
| + * notice, this list of conditions and the following disclaimer.
|
| + * * Redistributions in binary form must reproduce the above
|
| + * copyright notice, this list of conditions and the following disclaimer
|
| + * in the documentation and/or other materials provided with the
|
| + * distribution.
|
| + * * Neither the name of Google Inc. nor the names of its
|
| + * contributors may be used to endorse or promote products derived from
|
| + * this software without specific prior written permission.
|
| + *
|
| + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| + *
|
| + */
|
| +
|
| +#include <grpc++/server.h>
|
| +
|
| +#include <utility>
|
| +
|
| +#include <grpc++/completion_queue.h>
|
| +#include <grpc++/generic/async_generic_service.h>
|
| +#include <grpc++/impl/codegen/completion_queue_tag.h>
|
| +#include <grpc++/impl/grpc_library.h>
|
| +#include <grpc++/impl/method_handler_impl.h>
|
| +#include <grpc++/impl/rpc_service_method.h>
|
| +#include <grpc++/impl/service_type.h>
|
| +#include <grpc++/security/server_credentials.h>
|
| +#include <grpc++/server_context.h>
|
| +#include <grpc++/support/time.h>
|
| +#include <grpc/grpc.h>
|
| +#include <grpc/support/alloc.h>
|
| +#include <grpc/support/log.h>
|
| +
|
| +#include "src/core/profiling/timers.h"
|
| +#include "src/cpp/server/thread_pool_interface.h"
|
| +
|
| +namespace grpc {
|
| +
|
| +class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
|
| + public:
|
| + ~DefaultGlobalCallbacks() GRPC_OVERRIDE {}
|
| + void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
|
| + void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
|
| +};
|
| +
|
| +static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
|
| +static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
|
| +
|
| +static void InitGlobalCallbacks() {
|
| + if (g_callbacks == nullptr) {
|
| + g_callbacks.reset(new DefaultGlobalCallbacks());
|
| + }
|
| +}
|
| +
|
| +class Server::UnimplementedAsyncRequestContext {
|
| + protected:
|
| + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
|
| +
|
| + GenericServerContext server_context_;
|
| + GenericServerAsyncReaderWriter generic_stream_;
|
| +};
|
| +
|
| +class Server::UnimplementedAsyncRequest GRPC_FINAL
|
| + : public UnimplementedAsyncRequestContext,
|
| + public GenericAsyncRequest {
|
| + public:
|
| + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
|
| + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
|
| + NULL, false),
|
| + server_(server),
|
| + cq_(cq) {}
|
| +
|
| + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
|
| +
|
| + ServerContext* context() { return &server_context_; }
|
| + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
|
| +
|
| + private:
|
| + Server* const server_;
|
| + ServerCompletionQueue* const cq_;
|
| +};
|
| +
|
| +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
|
| + UnimplementedAsyncResponseOp;
|
| +class Server::UnimplementedAsyncResponse GRPC_FINAL
|
| + : public UnimplementedAsyncResponseOp {
|
| + public:
|
| + UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
|
| + ~UnimplementedAsyncResponse() { delete request_; }
|
| +
|
| + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
|
| + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
|
| + delete this;
|
| + return r;
|
| + }
|
| +
|
| + private:
|
| + UnimplementedAsyncRequest* const request_;
|
| +};
|
| +
|
| +class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
|
| + public:
|
| + bool FinalizeResult(void** tag, bool* status) {
|
| + delete this;
|
| + return false;
|
| + }
|
| +};
|
| +
|
| +class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
|
| + public:
|
| + SyncRequest(RpcServiceMethod* method, void* tag)
|
| + : method_(method),
|
| + tag_(tag),
|
| + in_flight_(false),
|
| + has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
|
| + method->method_type() ==
|
| + RpcMethod::SERVER_STREAMING),
|
| + call_details_(nullptr),
|
| + cq_(nullptr) {
|
| + grpc_metadata_array_init(&request_metadata_);
|
| + }
|
| +
|
| + ~SyncRequest() {
|
| + if (call_details_) {
|
| + delete call_details_;
|
| + }
|
| + grpc_metadata_array_destroy(&request_metadata_);
|
| + }
|
| +
|
| + static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
|
| + void* tag = nullptr;
|
| + *ok = false;
|
| + if (!cq->Next(&tag, ok)) {
|
| + return nullptr;
|
| + }
|
| + auto* mrd = static_cast<SyncRequest*>(tag);
|
| + GPR_ASSERT(mrd->in_flight_);
|
| + return mrd;
|
| + }
|
| +
|
| + static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
|
| + gpr_timespec deadline) {
|
| + void* tag = nullptr;
|
| + *ok = false;
|
| + switch (cq->AsyncNext(&tag, ok, deadline)) {
|
| + case CompletionQueue::TIMEOUT:
|
| + *req = nullptr;
|
| + return true;
|
| + case CompletionQueue::SHUTDOWN:
|
| + *req = nullptr;
|
| + return false;
|
| + case CompletionQueue::GOT_EVENT:
|
| + *req = static_cast<SyncRequest*>(tag);
|
| + GPR_ASSERT((*req)->in_flight_);
|
| + return true;
|
| + }
|
| + GPR_UNREACHABLE_CODE(return false);
|
| + }
|
| +
|
| + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
|
| +
|
| + void TeardownRequest() {
|
| + grpc_completion_queue_destroy(cq_);
|
| + cq_ = nullptr;
|
| + }
|
| +
|
| + void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
|
| + GPR_ASSERT(cq_ && !in_flight_);
|
| + in_flight_ = true;
|
| + if (tag_) {
|
| + GPR_ASSERT(GRPC_CALL_OK ==
|
| + grpc_server_request_registered_call(
|
| + server, tag_, &call_, &deadline_, &request_metadata_,
|
| + has_request_payload_ ? &request_payload_ : nullptr, cq_,
|
| + notify_cq, this));
|
| + } else {
|
| + if (!call_details_) {
|
| + call_details_ = new grpc_call_details;
|
| + grpc_call_details_init(call_details_);
|
| + }
|
| + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
|
| + server, &call_, call_details_,
|
| + &request_metadata_, cq_, notify_cq, this));
|
| + }
|
| + }
|
| +
|
| + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
|
| + if (!*status) {
|
| + grpc_completion_queue_destroy(cq_);
|
| + }
|
| + if (call_details_) {
|
| + deadline_ = call_details_->deadline;
|
| + grpc_call_details_destroy(call_details_);
|
| + grpc_call_details_init(call_details_);
|
| + }
|
| + return true;
|
| + }
|
| +
|
| + class CallData GRPC_FINAL {
|
| + public:
|
| + explicit CallData(Server* server, SyncRequest* mrd)
|
| + : cq_(mrd->cq_),
|
| + call_(mrd->call_, server, &cq_, server->max_message_size_),
|
| + ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
|
| + mrd->request_metadata_.count),
|
| + has_request_payload_(mrd->has_request_payload_),
|
| + request_payload_(mrd->request_payload_),
|
| + method_(mrd->method_) {
|
| + ctx_.set_call(mrd->call_);
|
| + ctx_.cq_ = &cq_;
|
| + GPR_ASSERT(mrd->in_flight_);
|
| + mrd->in_flight_ = false;
|
| + mrd->request_metadata_.count = 0;
|
| + }
|
| +
|
| + ~CallData() {
|
| + if (has_request_payload_ && request_payload_) {
|
| + grpc_byte_buffer_destroy(request_payload_);
|
| + }
|
| + }
|
| +
|
| + void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
|
| + ctx_.BeginCompletionOp(&call_);
|
| + global_callbacks->PreSynchronousRequest(&ctx_);
|
| + method_->handler()->RunHandler(MethodHandler::HandlerParameter(
|
| + &call_, &ctx_, request_payload_, call_.max_message_size()));
|
| + global_callbacks->PostSynchronousRequest(&ctx_);
|
| + request_payload_ = nullptr;
|
| + void* ignored_tag;
|
| + bool ignored_ok;
|
| + cq_.Shutdown();
|
| + GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
|
| + }
|
| +
|
| + private:
|
| + CompletionQueue cq_;
|
| + Call call_;
|
| + ServerContext ctx_;
|
| + const bool has_request_payload_;
|
| + grpc_byte_buffer* request_payload_;
|
| + RpcServiceMethod* const method_;
|
| + };
|
| +
|
| + private:
|
| + RpcServiceMethod* const method_;
|
| + void* const tag_;
|
| + bool in_flight_;
|
| + const bool has_request_payload_;
|
| + grpc_call* call_;
|
| + grpc_call_details* call_details_;
|
| + gpr_timespec deadline_;
|
| + grpc_metadata_array request_metadata_;
|
| + grpc_byte_buffer* request_payload_;
|
| + grpc_completion_queue* cq_;
|
| +};
|
| +
|
| +static internal::GrpcLibraryInitializer g_gli_initializer;
|
| +Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
|
| + int max_message_size, ChannelArguments* args)
|
| + : max_message_size_(max_message_size),
|
| + started_(false),
|
| + shutdown_(false),
|
| + num_running_cb_(0),
|
| + sync_methods_(new std::list<SyncRequest>),
|
| + has_generic_service_(false),
|
| + server_(nullptr),
|
| + thread_pool_(thread_pool),
|
| + thread_pool_owned_(thread_pool_owned) {
|
| + g_gli_initializer.summon();
|
| + gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
|
| + global_callbacks_ = g_callbacks;
|
| + global_callbacks_->UpdateArguments(args);
|
| + grpc_channel_args channel_args;
|
| + args->SetChannelArgs(&channel_args);
|
| + server_ = grpc_server_create(&channel_args, nullptr);
|
| + grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
|
| +}
|
| +
|
| +Server::~Server() {
|
| + {
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + if (started_ && !shutdown_) {
|
| + lock.unlock();
|
| + Shutdown();
|
| + } else if (!started_) {
|
| + cq_.Shutdown();
|
| + }
|
| + }
|
| + void* got_tag;
|
| + bool ok;
|
| + GPR_ASSERT(!cq_.Next(&got_tag, &ok));
|
| + grpc_server_destroy(server_);
|
| + if (thread_pool_owned_) {
|
| + delete thread_pool_;
|
| + }
|
| + delete sync_methods_;
|
| +}
|
| +
|
| +void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
|
| + GPR_ASSERT(g_callbacks == nullptr);
|
| + GPR_ASSERT(callbacks != nullptr);
|
| + g_callbacks.reset(callbacks);
|
| +}
|
| +
|
| +bool Server::RegisterService(const grpc::string* host, Service* service) {
|
| + bool has_async_methods = service->has_async_methods();
|
| + if (has_async_methods) {
|
| + GPR_ASSERT(service->server_ == nullptr &&
|
| + "Can only register an asynchronous service against one server.");
|
| + service->server_ = this;
|
| + }
|
| + for (auto it = service->methods_.begin(); it != service->methods_.end();
|
| + ++it) {
|
| + if (it->get() == nullptr) { // Handled by generic service if any.
|
| + continue;
|
| + }
|
| + RpcServiceMethod* method = it->get();
|
| + void* tag = grpc_server_register_method(server_, method->name(),
|
| + host ? host->c_str() : nullptr);
|
| + if (tag == nullptr) {
|
| + gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
|
| + method->name());
|
| + return false;
|
| + }
|
| + if (method->handler() == nullptr) {
|
| + method->set_server_tag(tag);
|
| + } else {
|
| + sync_methods_->emplace_back(method, tag);
|
| + }
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
|
| + GPR_ASSERT(service->server_ == nullptr &&
|
| + "Can only register an async generic service against one server.");
|
| + service->server_ = this;
|
| + has_generic_service_ = true;
|
| +}
|
| +
|
| +int Server::AddListeningPort(const grpc::string& addr,
|
| + ServerCredentials* creds) {
|
| + GPR_ASSERT(!started_);
|
| + return creds->AddPortToServer(addr, server_);
|
| +}
|
| +
|
| +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
|
| + GPR_ASSERT(!started_);
|
| + started_ = true;
|
| + grpc_server_start(server_);
|
| +
|
| + if (!has_generic_service_) {
|
| + if (!sync_methods_->empty()) {
|
| + unknown_method_.reset(new RpcServiceMethod(
|
| + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
|
| + // Use of emplace_back with just constructor arguments is not accepted
|
| + // here by gcc-4.4 because it can't match the anonymous nullptr with a
|
| + // proper constructor implicitly. Construct the object and use push_back.
|
| + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
|
| + }
|
| + for (size_t i = 0; i < num_cqs; i++) {
|
| + new UnimplementedAsyncRequest(this, cqs[i]);
|
| + }
|
| + }
|
| + // Start processing rpcs.
|
| + if (!sync_methods_->empty()) {
|
| + for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
|
| + m->SetupRequest();
|
| + m->Request(server_, cq_.cq());
|
| + }
|
| +
|
| + ScheduleCallback();
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +void Server::ShutdownInternal(gpr_timespec deadline) {
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + if (started_ && !shutdown_) {
|
| + shutdown_ = true;
|
| + grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
|
| + cq_.Shutdown();
|
| + lock.unlock();
|
| + // Spin, eating requests until the completion queue is completely shutdown.
|
| + // If the deadline expires then cancel anything that's pending and keep
|
| + // spinning forever until the work is actually drained.
|
| + // Since nothing else needs to touch state guarded by mu_, holding it
|
| + // through this loop is fine.
|
| + SyncRequest* request;
|
| + bool ok;
|
| + while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
|
| + if (request == NULL) { // deadline expired
|
| + grpc_server_cancel_all_calls(server_);
|
| + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
|
| + } else if (ok) {
|
| + SyncRequest::CallData call_data(this, request);
|
| + }
|
| + }
|
| + lock.lock();
|
| +
|
| + // Wait for running callbacks to finish.
|
| + while (num_running_cb_ != 0) {
|
| + callback_cv_.wait(lock);
|
| + }
|
| + }
|
| +}
|
| +
|
| +void Server::Wait() {
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + while (num_running_cb_ != 0) {
|
| + callback_cv_.wait(lock);
|
| + }
|
| +}
|
| +
|
| +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
|
| + static const size_t MAX_OPS = 8;
|
| + size_t nops = 0;
|
| + grpc_op cops[MAX_OPS];
|
| + ops->FillOps(cops, &nops);
|
| + auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
|
| + GPR_ASSERT(GRPC_CALL_OK == result);
|
| +}
|
| +
|
| +ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
|
| + ServerInterface* server, ServerContext* context,
|
| + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
|
| + bool delete_on_finalize)
|
| + : server_(server),
|
| + context_(context),
|
| + stream_(stream),
|
| + call_cq_(call_cq),
|
| + tag_(tag),
|
| + delete_on_finalize_(delete_on_finalize),
|
| + call_(nullptr) {
|
| + memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
|
| +}
|
| +
|
| +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
|
| + bool* status) {
|
| + if (*status) {
|
| + for (size_t i = 0; i < initial_metadata_array_.count; i++) {
|
| + context_->client_metadata_.insert(
|
| + std::pair<grpc::string_ref, grpc::string_ref>(
|
| + initial_metadata_array_.metadata[i].key,
|
| + grpc::string_ref(
|
| + initial_metadata_array_.metadata[i].value,
|
| + initial_metadata_array_.metadata[i].value_length)));
|
| + }
|
| + }
|
| + grpc_metadata_array_destroy(&initial_metadata_array_);
|
| + context_->set_call(call_);
|
| + context_->cq_ = call_cq_;
|
| + Call call(call_, server_, call_cq_, server_->max_message_size());
|
| + if (*status && call_) {
|
| + context_->BeginCompletionOp(&call);
|
| + }
|
| + // just the pointers inside call are copied here
|
| + stream_->BindCall(&call);
|
| + *tag = tag_;
|
| + if (delete_on_finalize_) {
|
| + delete this;
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
|
| + ServerInterface* server, ServerContext* context,
|
| + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
|
| + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
|
| +
|
| +void ServerInterface::RegisteredAsyncRequest::IssueRequest(
|
| + void* registered_method, grpc_byte_buffer** payload,
|
| + ServerCompletionQueue* notification_cq) {
|
| + grpc_server_request_registered_call(
|
| + server_->server(), registered_method, &call_, &context_->deadline_,
|
| + &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
|
| + this);
|
| +}
|
| +
|
| +ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
|
| + ServerInterface* server, GenericServerContext* context,
|
| + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
|
| + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
|
| + : BaseAsyncRequest(server, context, stream, call_cq, tag,
|
| + delete_on_finalize) {
|
| + grpc_call_details_init(&call_details_);
|
| + GPR_ASSERT(notification_cq);
|
| + GPR_ASSERT(call_cq);
|
| + grpc_server_request_call(server->server(), &call_, &call_details_,
|
| + &initial_metadata_array_, call_cq->cq(),
|
| + notification_cq->cq(), this);
|
| +}
|
| +
|
| +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
|
| + bool* status) {
|
| + // TODO(yangg) remove the copy here.
|
| + if (*status) {
|
| + static_cast<GenericServerContext*>(context_)->method_ =
|
| + call_details_.method;
|
| + static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
|
| + }
|
| + gpr_free(call_details_.method);
|
| + gpr_free(call_details_.host);
|
| + return BaseAsyncRequest::FinalizeResult(tag, status);
|
| +}
|
| +
|
| +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
|
| + bool* status) {
|
| + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
|
| + new UnimplementedAsyncRequest(server_, cq_);
|
| + new UnimplementedAsyncResponse(this);
|
| + } else {
|
| + delete this;
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
|
| + UnimplementedAsyncRequest* request)
|
| + : request_(request) {
|
| + Status status(StatusCode::UNIMPLEMENTED, "");
|
| + UnknownMethodHandler::FillOps(request_->context(), this);
|
| + request_->stream()->call_.PerformOps(this);
|
| +}
|
| +
|
| +void Server::ScheduleCallback() {
|
| + {
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + num_running_cb_++;
|
| + }
|
| + thread_pool_->Add(std::bind(&Server::RunRpc, this));
|
| +}
|
| +
|
| +void Server::RunRpc() {
|
| + // Wait for one more incoming rpc.
|
| + bool ok;
|
| + GPR_TIMER_SCOPE("Server::RunRpc", 0);
|
| + auto* mrd = SyncRequest::Wait(&cq_, &ok);
|
| + if (mrd) {
|
| + ScheduleCallback();
|
| + if (ok) {
|
| + SyncRequest::CallData cd(this, mrd);
|
| + {
|
| + mrd->SetupRequest();
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + if (!shutdown_) {
|
| + mrd->Request(server_, cq_.cq());
|
| + } else {
|
| + // destroy the structure that was created
|
| + mrd->TeardownRequest();
|
| + }
|
| + }
|
| + GPR_TIMER_SCOPE("cd.Run()", 0);
|
| + cd.Run(global_callbacks_);
|
| + }
|
| + }
|
| +
|
| + {
|
| + grpc::unique_lock<grpc::mutex> lock(mu_);
|
| + num_running_cb_--;
|
| + if (shutdown_) {
|
| + callback_cv_.notify_all();
|
| + }
|
| + }
|
| +}
|
| +
|
| +} // namespace grpc
|
|
|