OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <grpc++/server.h> |
| 35 |
| 36 #include <utility> |
| 37 |
| 38 #include <grpc++/completion_queue.h> |
| 39 #include <grpc++/generic/async_generic_service.h> |
| 40 #include <grpc++/impl/codegen/completion_queue_tag.h> |
| 41 #include <grpc++/impl/grpc_library.h> |
| 42 #include <grpc++/impl/method_handler_impl.h> |
| 43 #include <grpc++/impl/rpc_service_method.h> |
| 44 #include <grpc++/impl/service_type.h> |
| 45 #include <grpc++/security/server_credentials.h> |
| 46 #include <grpc++/server_context.h> |
| 47 #include <grpc++/support/time.h> |
| 48 #include <grpc/grpc.h> |
| 49 #include <grpc/support/alloc.h> |
| 50 #include <grpc/support/log.h> |
| 51 |
| 52 #include "src/core/profiling/timers.h" |
| 53 #include "src/cpp/server/thread_pool_interface.h" |
| 54 |
| 55 namespace grpc { |
| 56 |
| 57 class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks { |
| 58 public: |
| 59 ~DefaultGlobalCallbacks() GRPC_OVERRIDE {} |
| 60 void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} |
| 61 void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} |
| 62 }; |
| 63 |
| 64 static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; |
| 65 static gpr_once g_once_init_callbacks = GPR_ONCE_INIT; |
| 66 |
| 67 static void InitGlobalCallbacks() { |
| 68 if (g_callbacks == nullptr) { |
| 69 g_callbacks.reset(new DefaultGlobalCallbacks()); |
| 70 } |
| 71 } |
| 72 |
| 73 class Server::UnimplementedAsyncRequestContext { |
| 74 protected: |
| 75 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} |
| 76 |
| 77 GenericServerContext server_context_; |
| 78 GenericServerAsyncReaderWriter generic_stream_; |
| 79 }; |
| 80 |
| 81 class Server::UnimplementedAsyncRequest GRPC_FINAL |
| 82 : public UnimplementedAsyncRequestContext, |
| 83 public GenericAsyncRequest { |
| 84 public: |
| 85 UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) |
| 86 : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, |
| 87 NULL, false), |
| 88 server_(server), |
| 89 cq_(cq) {} |
| 90 |
| 91 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; |
| 92 |
| 93 ServerContext* context() { return &server_context_; } |
| 94 GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } |
| 95 |
| 96 private: |
| 97 Server* const server_; |
| 98 ServerCompletionQueue* const cq_; |
| 99 }; |
| 100 |
| 101 typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> |
| 102 UnimplementedAsyncResponseOp; |
| 103 class Server::UnimplementedAsyncResponse GRPC_FINAL |
| 104 : public UnimplementedAsyncResponseOp { |
| 105 public: |
| 106 UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); |
| 107 ~UnimplementedAsyncResponse() { delete request_; } |
| 108 |
| 109 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { |
| 110 bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); |
| 111 delete this; |
| 112 return r; |
| 113 } |
| 114 |
| 115 private: |
| 116 UnimplementedAsyncRequest* const request_; |
| 117 }; |
| 118 |
| 119 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { |
| 120 public: |
| 121 bool FinalizeResult(void** tag, bool* status) { |
| 122 delete this; |
| 123 return false; |
| 124 } |
| 125 }; |
| 126 |
| 127 class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { |
| 128 public: |
| 129 SyncRequest(RpcServiceMethod* method, void* tag) |
| 130 : method_(method), |
| 131 tag_(tag), |
| 132 in_flight_(false), |
| 133 has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || |
| 134 method->method_type() == |
| 135 RpcMethod::SERVER_STREAMING), |
| 136 call_details_(nullptr), |
| 137 cq_(nullptr) { |
| 138 grpc_metadata_array_init(&request_metadata_); |
| 139 } |
| 140 |
| 141 ~SyncRequest() { |
| 142 if (call_details_) { |
| 143 delete call_details_; |
| 144 } |
| 145 grpc_metadata_array_destroy(&request_metadata_); |
| 146 } |
| 147 |
| 148 static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { |
| 149 void* tag = nullptr; |
| 150 *ok = false; |
| 151 if (!cq->Next(&tag, ok)) { |
| 152 return nullptr; |
| 153 } |
| 154 auto* mrd = static_cast<SyncRequest*>(tag); |
| 155 GPR_ASSERT(mrd->in_flight_); |
| 156 return mrd; |
| 157 } |
| 158 |
| 159 static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, |
| 160 gpr_timespec deadline) { |
| 161 void* tag = nullptr; |
| 162 *ok = false; |
| 163 switch (cq->AsyncNext(&tag, ok, deadline)) { |
| 164 case CompletionQueue::TIMEOUT: |
| 165 *req = nullptr; |
| 166 return true; |
| 167 case CompletionQueue::SHUTDOWN: |
| 168 *req = nullptr; |
| 169 return false; |
| 170 case CompletionQueue::GOT_EVENT: |
| 171 *req = static_cast<SyncRequest*>(tag); |
| 172 GPR_ASSERT((*req)->in_flight_); |
| 173 return true; |
| 174 } |
| 175 GPR_UNREACHABLE_CODE(return false); |
| 176 } |
| 177 |
| 178 void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } |
| 179 |
| 180 void TeardownRequest() { |
| 181 grpc_completion_queue_destroy(cq_); |
| 182 cq_ = nullptr; |
| 183 } |
| 184 |
| 185 void Request(grpc_server* server, grpc_completion_queue* notify_cq) { |
| 186 GPR_ASSERT(cq_ && !in_flight_); |
| 187 in_flight_ = true; |
| 188 if (tag_) { |
| 189 GPR_ASSERT(GRPC_CALL_OK == |
| 190 grpc_server_request_registered_call( |
| 191 server, tag_, &call_, &deadline_, &request_metadata_, |
| 192 has_request_payload_ ? &request_payload_ : nullptr, cq_, |
| 193 notify_cq, this)); |
| 194 } else { |
| 195 if (!call_details_) { |
| 196 call_details_ = new grpc_call_details; |
| 197 grpc_call_details_init(call_details_); |
| 198 } |
| 199 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( |
| 200 server, &call_, call_details_, |
| 201 &request_metadata_, cq_, notify_cq, this)); |
| 202 } |
| 203 } |
| 204 |
| 205 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { |
| 206 if (!*status) { |
| 207 grpc_completion_queue_destroy(cq_); |
| 208 } |
| 209 if (call_details_) { |
| 210 deadline_ = call_details_->deadline; |
| 211 grpc_call_details_destroy(call_details_); |
| 212 grpc_call_details_init(call_details_); |
| 213 } |
| 214 return true; |
| 215 } |
| 216 |
| 217 class CallData GRPC_FINAL { |
| 218 public: |
| 219 explicit CallData(Server* server, SyncRequest* mrd) |
| 220 : cq_(mrd->cq_), |
| 221 call_(mrd->call_, server, &cq_, server->max_message_size_), |
| 222 ctx_(mrd->deadline_, mrd->request_metadata_.metadata, |
| 223 mrd->request_metadata_.count), |
| 224 has_request_payload_(mrd->has_request_payload_), |
| 225 request_payload_(mrd->request_payload_), |
| 226 method_(mrd->method_) { |
| 227 ctx_.set_call(mrd->call_); |
| 228 ctx_.cq_ = &cq_; |
| 229 GPR_ASSERT(mrd->in_flight_); |
| 230 mrd->in_flight_ = false; |
| 231 mrd->request_metadata_.count = 0; |
| 232 } |
| 233 |
| 234 ~CallData() { |
| 235 if (has_request_payload_ && request_payload_) { |
| 236 grpc_byte_buffer_destroy(request_payload_); |
| 237 } |
| 238 } |
| 239 |
| 240 void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { |
| 241 ctx_.BeginCompletionOp(&call_); |
| 242 global_callbacks->PreSynchronousRequest(&ctx_); |
| 243 method_->handler()->RunHandler(MethodHandler::HandlerParameter( |
| 244 &call_, &ctx_, request_payload_, call_.max_message_size())); |
| 245 global_callbacks->PostSynchronousRequest(&ctx_); |
| 246 request_payload_ = nullptr; |
| 247 void* ignored_tag; |
| 248 bool ignored_ok; |
| 249 cq_.Shutdown(); |
| 250 GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false); |
| 251 } |
| 252 |
| 253 private: |
| 254 CompletionQueue cq_; |
| 255 Call call_; |
| 256 ServerContext ctx_; |
| 257 const bool has_request_payload_; |
| 258 grpc_byte_buffer* request_payload_; |
| 259 RpcServiceMethod* const method_; |
| 260 }; |
| 261 |
| 262 private: |
| 263 RpcServiceMethod* const method_; |
| 264 void* const tag_; |
| 265 bool in_flight_; |
| 266 const bool has_request_payload_; |
| 267 grpc_call* call_; |
| 268 grpc_call_details* call_details_; |
| 269 gpr_timespec deadline_; |
| 270 grpc_metadata_array request_metadata_; |
| 271 grpc_byte_buffer* request_payload_; |
| 272 grpc_completion_queue* cq_; |
| 273 }; |
| 274 |
| 275 static internal::GrpcLibraryInitializer g_gli_initializer; |
| 276 Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, |
| 277 int max_message_size, ChannelArguments* args) |
| 278 : max_message_size_(max_message_size), |
| 279 started_(false), |
| 280 shutdown_(false), |
| 281 num_running_cb_(0), |
| 282 sync_methods_(new std::list<SyncRequest>), |
| 283 has_generic_service_(false), |
| 284 server_(nullptr), |
| 285 thread_pool_(thread_pool), |
| 286 thread_pool_owned_(thread_pool_owned) { |
| 287 g_gli_initializer.summon(); |
| 288 gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); |
| 289 global_callbacks_ = g_callbacks; |
| 290 global_callbacks_->UpdateArguments(args); |
| 291 grpc_channel_args channel_args; |
| 292 args->SetChannelArgs(&channel_args); |
| 293 server_ = grpc_server_create(&channel_args, nullptr); |
| 294 grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); |
| 295 } |
| 296 |
| 297 Server::~Server() { |
| 298 { |
| 299 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 300 if (started_ && !shutdown_) { |
| 301 lock.unlock(); |
| 302 Shutdown(); |
| 303 } else if (!started_) { |
| 304 cq_.Shutdown(); |
| 305 } |
| 306 } |
| 307 void* got_tag; |
| 308 bool ok; |
| 309 GPR_ASSERT(!cq_.Next(&got_tag, &ok)); |
| 310 grpc_server_destroy(server_); |
| 311 if (thread_pool_owned_) { |
| 312 delete thread_pool_; |
| 313 } |
| 314 delete sync_methods_; |
| 315 } |
| 316 |
| 317 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { |
| 318 GPR_ASSERT(g_callbacks == nullptr); |
| 319 GPR_ASSERT(callbacks != nullptr); |
| 320 g_callbacks.reset(callbacks); |
| 321 } |
| 322 |
| 323 bool Server::RegisterService(const grpc::string* host, Service* service) { |
| 324 bool has_async_methods = service->has_async_methods(); |
| 325 if (has_async_methods) { |
| 326 GPR_ASSERT(service->server_ == nullptr && |
| 327 "Can only register an asynchronous service against one server."); |
| 328 service->server_ = this; |
| 329 } |
| 330 for (auto it = service->methods_.begin(); it != service->methods_.end(); |
| 331 ++it) { |
| 332 if (it->get() == nullptr) { // Handled by generic service if any. |
| 333 continue; |
| 334 } |
| 335 RpcServiceMethod* method = it->get(); |
| 336 void* tag = grpc_server_register_method(server_, method->name(), |
| 337 host ? host->c_str() : nullptr); |
| 338 if (tag == nullptr) { |
| 339 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", |
| 340 method->name()); |
| 341 return false; |
| 342 } |
| 343 if (method->handler() == nullptr) { |
| 344 method->set_server_tag(tag); |
| 345 } else { |
| 346 sync_methods_->emplace_back(method, tag); |
| 347 } |
| 348 } |
| 349 return true; |
| 350 } |
| 351 |
| 352 void Server::RegisterAsyncGenericService(AsyncGenericService* service) { |
| 353 GPR_ASSERT(service->server_ == nullptr && |
| 354 "Can only register an async generic service against one server."); |
| 355 service->server_ = this; |
| 356 has_generic_service_ = true; |
| 357 } |
| 358 |
| 359 int Server::AddListeningPort(const grpc::string& addr, |
| 360 ServerCredentials* creds) { |
| 361 GPR_ASSERT(!started_); |
| 362 return creds->AddPortToServer(addr, server_); |
| 363 } |
| 364 |
| 365 bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { |
| 366 GPR_ASSERT(!started_); |
| 367 started_ = true; |
| 368 grpc_server_start(server_); |
| 369 |
| 370 if (!has_generic_service_) { |
| 371 if (!sync_methods_->empty()) { |
| 372 unknown_method_.reset(new RpcServiceMethod( |
| 373 "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); |
| 374 // Use of emplace_back with just constructor arguments is not accepted |
| 375 // here by gcc-4.4 because it can't match the anonymous nullptr with a |
| 376 // proper constructor implicitly. Construct the object and use push_back. |
| 377 sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); |
| 378 } |
| 379 for (size_t i = 0; i < num_cqs; i++) { |
| 380 new UnimplementedAsyncRequest(this, cqs[i]); |
| 381 } |
| 382 } |
| 383 // Start processing rpcs. |
| 384 if (!sync_methods_->empty()) { |
| 385 for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { |
| 386 m->SetupRequest(); |
| 387 m->Request(server_, cq_.cq()); |
| 388 } |
| 389 |
| 390 ScheduleCallback(); |
| 391 } |
| 392 |
| 393 return true; |
| 394 } |
| 395 |
| 396 void Server::ShutdownInternal(gpr_timespec deadline) { |
| 397 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 398 if (started_ && !shutdown_) { |
| 399 shutdown_ = true; |
| 400 grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); |
| 401 cq_.Shutdown(); |
| 402 lock.unlock(); |
| 403 // Spin, eating requests until the completion queue is completely shutdown. |
| 404 // If the deadline expires then cancel anything that's pending and keep |
| 405 // spinning forever until the work is actually drained. |
| 406 // Since nothing else needs to touch state guarded by mu_, holding it |
| 407 // through this loop is fine. |
| 408 SyncRequest* request; |
| 409 bool ok; |
| 410 while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { |
| 411 if (request == NULL) { // deadline expired |
| 412 grpc_server_cancel_all_calls(server_); |
| 413 deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); |
| 414 } else if (ok) { |
| 415 SyncRequest::CallData call_data(this, request); |
| 416 } |
| 417 } |
| 418 lock.lock(); |
| 419 |
| 420 // Wait for running callbacks to finish. |
| 421 while (num_running_cb_ != 0) { |
| 422 callback_cv_.wait(lock); |
| 423 } |
| 424 } |
| 425 } |
| 426 |
| 427 void Server::Wait() { |
| 428 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 429 while (num_running_cb_ != 0) { |
| 430 callback_cv_.wait(lock); |
| 431 } |
| 432 } |
| 433 |
| 434 void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { |
| 435 static const size_t MAX_OPS = 8; |
| 436 size_t nops = 0; |
| 437 grpc_op cops[MAX_OPS]; |
| 438 ops->FillOps(cops, &nops); |
| 439 auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); |
| 440 GPR_ASSERT(GRPC_CALL_OK == result); |
| 441 } |
| 442 |
| 443 ServerInterface::BaseAsyncRequest::BaseAsyncRequest( |
| 444 ServerInterface* server, ServerContext* context, |
| 445 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, |
| 446 bool delete_on_finalize) |
| 447 : server_(server), |
| 448 context_(context), |
| 449 stream_(stream), |
| 450 call_cq_(call_cq), |
| 451 tag_(tag), |
| 452 delete_on_finalize_(delete_on_finalize), |
| 453 call_(nullptr) { |
| 454 memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); |
| 455 } |
| 456 |
| 457 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, |
| 458 bool* status) { |
| 459 if (*status) { |
| 460 for (size_t i = 0; i < initial_metadata_array_.count; i++) { |
| 461 context_->client_metadata_.insert( |
| 462 std::pair<grpc::string_ref, grpc::string_ref>( |
| 463 initial_metadata_array_.metadata[i].key, |
| 464 grpc::string_ref( |
| 465 initial_metadata_array_.metadata[i].value, |
| 466 initial_metadata_array_.metadata[i].value_length))); |
| 467 } |
| 468 } |
| 469 grpc_metadata_array_destroy(&initial_metadata_array_); |
| 470 context_->set_call(call_); |
| 471 context_->cq_ = call_cq_; |
| 472 Call call(call_, server_, call_cq_, server_->max_message_size()); |
| 473 if (*status && call_) { |
| 474 context_->BeginCompletionOp(&call); |
| 475 } |
| 476 // just the pointers inside call are copied here |
| 477 stream_->BindCall(&call); |
| 478 *tag = tag_; |
| 479 if (delete_on_finalize_) { |
| 480 delete this; |
| 481 } |
| 482 return true; |
| 483 } |
| 484 |
| 485 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( |
| 486 ServerInterface* server, ServerContext* context, |
| 487 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) |
| 488 : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} |
| 489 |
| 490 void ServerInterface::RegisteredAsyncRequest::IssueRequest( |
| 491 void* registered_method, grpc_byte_buffer** payload, |
| 492 ServerCompletionQueue* notification_cq) { |
| 493 grpc_server_request_registered_call( |
| 494 server_->server(), registered_method, &call_, &context_->deadline_, |
| 495 &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), |
| 496 this); |
| 497 } |
| 498 |
| 499 ServerInterface::GenericAsyncRequest::GenericAsyncRequest( |
| 500 ServerInterface* server, GenericServerContext* context, |
| 501 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, |
| 502 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) |
| 503 : BaseAsyncRequest(server, context, stream, call_cq, tag, |
| 504 delete_on_finalize) { |
| 505 grpc_call_details_init(&call_details_); |
| 506 GPR_ASSERT(notification_cq); |
| 507 GPR_ASSERT(call_cq); |
| 508 grpc_server_request_call(server->server(), &call_, &call_details_, |
| 509 &initial_metadata_array_, call_cq->cq(), |
| 510 notification_cq->cq(), this); |
| 511 } |
| 512 |
| 513 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, |
| 514 bool* status) { |
| 515 // TODO(yangg) remove the copy here. |
| 516 if (*status) { |
| 517 static_cast<GenericServerContext*>(context_)->method_ = |
| 518 call_details_.method; |
| 519 static_cast<GenericServerContext*>(context_)->host_ = call_details_.host; |
| 520 } |
| 521 gpr_free(call_details_.method); |
| 522 gpr_free(call_details_.host); |
| 523 return BaseAsyncRequest::FinalizeResult(tag, status); |
| 524 } |
| 525 |
| 526 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, |
| 527 bool* status) { |
| 528 if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { |
| 529 new UnimplementedAsyncRequest(server_, cq_); |
| 530 new UnimplementedAsyncResponse(this); |
| 531 } else { |
| 532 delete this; |
| 533 } |
| 534 return false; |
| 535 } |
| 536 |
| 537 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( |
| 538 UnimplementedAsyncRequest* request) |
| 539 : request_(request) { |
| 540 Status status(StatusCode::UNIMPLEMENTED, ""); |
| 541 UnknownMethodHandler::FillOps(request_->context(), this); |
| 542 request_->stream()->call_.PerformOps(this); |
| 543 } |
| 544 |
| 545 void Server::ScheduleCallback() { |
| 546 { |
| 547 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 548 num_running_cb_++; |
| 549 } |
| 550 thread_pool_->Add(std::bind(&Server::RunRpc, this)); |
| 551 } |
| 552 |
| 553 void Server::RunRpc() { |
| 554 // Wait for one more incoming rpc. |
| 555 bool ok; |
| 556 GPR_TIMER_SCOPE("Server::RunRpc", 0); |
| 557 auto* mrd = SyncRequest::Wait(&cq_, &ok); |
| 558 if (mrd) { |
| 559 ScheduleCallback(); |
| 560 if (ok) { |
| 561 SyncRequest::CallData cd(this, mrd); |
| 562 { |
| 563 mrd->SetupRequest(); |
| 564 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 565 if (!shutdown_) { |
| 566 mrd->Request(server_, cq_.cq()); |
| 567 } else { |
| 568 // destroy the structure that was created |
| 569 mrd->TeardownRequest(); |
| 570 } |
| 571 } |
| 572 GPR_TIMER_SCOPE("cd.Run()", 0); |
| 573 cd.Run(global_callbacks_); |
| 574 } |
| 575 } |
| 576 |
| 577 { |
| 578 grpc::unique_lock<grpc::mutex> lock(mu_); |
| 579 num_running_cb_--; |
| 580 if (shutdown_) { |
| 581 callback_cv_.notify_all(); |
| 582 } |
| 583 } |
| 584 } |
| 585 |
| 586 } // namespace grpc |
OLD | NEW |