Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(636)

Side by Side Diff: third_party/grpc/src/cpp/server/server.cc

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <grpc++/server.h>
35
36 #include <utility>
37
38 #include <grpc++/completion_queue.h>
39 #include <grpc++/generic/async_generic_service.h>
40 #include <grpc++/impl/codegen/completion_queue_tag.h>
41 #include <grpc++/impl/grpc_library.h>
42 #include <grpc++/impl/method_handler_impl.h>
43 #include <grpc++/impl/rpc_service_method.h>
44 #include <grpc++/impl/service_type.h>
45 #include <grpc++/security/server_credentials.h>
46 #include <grpc++/server_context.h>
47 #include <grpc++/support/time.h>
48 #include <grpc/grpc.h>
49 #include <grpc/support/alloc.h>
50 #include <grpc/support/log.h>
51
52 #include "src/core/profiling/timers.h"
53 #include "src/cpp/server/thread_pool_interface.h"
54
55 namespace grpc {
56
57 class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
58 public:
59 ~DefaultGlobalCallbacks() GRPC_OVERRIDE {}
60 void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
61 void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
62 };
63
64 static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
65 static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
66
67 static void InitGlobalCallbacks() {
68 if (g_callbacks == nullptr) {
69 g_callbacks.reset(new DefaultGlobalCallbacks());
70 }
71 }
72
73 class Server::UnimplementedAsyncRequestContext {
74 protected:
75 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
76
77 GenericServerContext server_context_;
78 GenericServerAsyncReaderWriter generic_stream_;
79 };
80
81 class Server::UnimplementedAsyncRequest GRPC_FINAL
82 : public UnimplementedAsyncRequestContext,
83 public GenericAsyncRequest {
84 public:
85 UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
86 : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
87 NULL, false),
88 server_(server),
89 cq_(cq) {}
90
91 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
92
93 ServerContext* context() { return &server_context_; }
94 GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
95
96 private:
97 Server* const server_;
98 ServerCompletionQueue* const cq_;
99 };
100
101 typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
102 UnimplementedAsyncResponseOp;
103 class Server::UnimplementedAsyncResponse GRPC_FINAL
104 : public UnimplementedAsyncResponseOp {
105 public:
106 UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
107 ~UnimplementedAsyncResponse() { delete request_; }
108
109 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
110 bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
111 delete this;
112 return r;
113 }
114
115 private:
116 UnimplementedAsyncRequest* const request_;
117 };
118
119 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
120 public:
121 bool FinalizeResult(void** tag, bool* status) {
122 delete this;
123 return false;
124 }
125 };
126
127 class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
128 public:
129 SyncRequest(RpcServiceMethod* method, void* tag)
130 : method_(method),
131 tag_(tag),
132 in_flight_(false),
133 has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
134 method->method_type() ==
135 RpcMethod::SERVER_STREAMING),
136 call_details_(nullptr),
137 cq_(nullptr) {
138 grpc_metadata_array_init(&request_metadata_);
139 }
140
141 ~SyncRequest() {
142 if (call_details_) {
143 delete call_details_;
144 }
145 grpc_metadata_array_destroy(&request_metadata_);
146 }
147
148 static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
149 void* tag = nullptr;
150 *ok = false;
151 if (!cq->Next(&tag, ok)) {
152 return nullptr;
153 }
154 auto* mrd = static_cast<SyncRequest*>(tag);
155 GPR_ASSERT(mrd->in_flight_);
156 return mrd;
157 }
158
159 static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
160 gpr_timespec deadline) {
161 void* tag = nullptr;
162 *ok = false;
163 switch (cq->AsyncNext(&tag, ok, deadline)) {
164 case CompletionQueue::TIMEOUT:
165 *req = nullptr;
166 return true;
167 case CompletionQueue::SHUTDOWN:
168 *req = nullptr;
169 return false;
170 case CompletionQueue::GOT_EVENT:
171 *req = static_cast<SyncRequest*>(tag);
172 GPR_ASSERT((*req)->in_flight_);
173 return true;
174 }
175 GPR_UNREACHABLE_CODE(return false);
176 }
177
178 void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
179
180 void TeardownRequest() {
181 grpc_completion_queue_destroy(cq_);
182 cq_ = nullptr;
183 }
184
185 void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
186 GPR_ASSERT(cq_ && !in_flight_);
187 in_flight_ = true;
188 if (tag_) {
189 GPR_ASSERT(GRPC_CALL_OK ==
190 grpc_server_request_registered_call(
191 server, tag_, &call_, &deadline_, &request_metadata_,
192 has_request_payload_ ? &request_payload_ : nullptr, cq_,
193 notify_cq, this));
194 } else {
195 if (!call_details_) {
196 call_details_ = new grpc_call_details;
197 grpc_call_details_init(call_details_);
198 }
199 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
200 server, &call_, call_details_,
201 &request_metadata_, cq_, notify_cq, this));
202 }
203 }
204
205 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
206 if (!*status) {
207 grpc_completion_queue_destroy(cq_);
208 }
209 if (call_details_) {
210 deadline_ = call_details_->deadline;
211 grpc_call_details_destroy(call_details_);
212 grpc_call_details_init(call_details_);
213 }
214 return true;
215 }
216
217 class CallData GRPC_FINAL {
218 public:
219 explicit CallData(Server* server, SyncRequest* mrd)
220 : cq_(mrd->cq_),
221 call_(mrd->call_, server, &cq_, server->max_message_size_),
222 ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
223 mrd->request_metadata_.count),
224 has_request_payload_(mrd->has_request_payload_),
225 request_payload_(mrd->request_payload_),
226 method_(mrd->method_) {
227 ctx_.set_call(mrd->call_);
228 ctx_.cq_ = &cq_;
229 GPR_ASSERT(mrd->in_flight_);
230 mrd->in_flight_ = false;
231 mrd->request_metadata_.count = 0;
232 }
233
234 ~CallData() {
235 if (has_request_payload_ && request_payload_) {
236 grpc_byte_buffer_destroy(request_payload_);
237 }
238 }
239
240 void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
241 ctx_.BeginCompletionOp(&call_);
242 global_callbacks->PreSynchronousRequest(&ctx_);
243 method_->handler()->RunHandler(MethodHandler::HandlerParameter(
244 &call_, &ctx_, request_payload_, call_.max_message_size()));
245 global_callbacks->PostSynchronousRequest(&ctx_);
246 request_payload_ = nullptr;
247 void* ignored_tag;
248 bool ignored_ok;
249 cq_.Shutdown();
250 GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
251 }
252
253 private:
254 CompletionQueue cq_;
255 Call call_;
256 ServerContext ctx_;
257 const bool has_request_payload_;
258 grpc_byte_buffer* request_payload_;
259 RpcServiceMethod* const method_;
260 };
261
262 private:
263 RpcServiceMethod* const method_;
264 void* const tag_;
265 bool in_flight_;
266 const bool has_request_payload_;
267 grpc_call* call_;
268 grpc_call_details* call_details_;
269 gpr_timespec deadline_;
270 grpc_metadata_array request_metadata_;
271 grpc_byte_buffer* request_payload_;
272 grpc_completion_queue* cq_;
273 };
274
275 static internal::GrpcLibraryInitializer g_gli_initializer;
276 Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
277 int max_message_size, ChannelArguments* args)
278 : max_message_size_(max_message_size),
279 started_(false),
280 shutdown_(false),
281 num_running_cb_(0),
282 sync_methods_(new std::list<SyncRequest>),
283 has_generic_service_(false),
284 server_(nullptr),
285 thread_pool_(thread_pool),
286 thread_pool_owned_(thread_pool_owned) {
287 g_gli_initializer.summon();
288 gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
289 global_callbacks_ = g_callbacks;
290 global_callbacks_->UpdateArguments(args);
291 grpc_channel_args channel_args;
292 args->SetChannelArgs(&channel_args);
293 server_ = grpc_server_create(&channel_args, nullptr);
294 grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
295 }
296
297 Server::~Server() {
298 {
299 grpc::unique_lock<grpc::mutex> lock(mu_);
300 if (started_ && !shutdown_) {
301 lock.unlock();
302 Shutdown();
303 } else if (!started_) {
304 cq_.Shutdown();
305 }
306 }
307 void* got_tag;
308 bool ok;
309 GPR_ASSERT(!cq_.Next(&got_tag, &ok));
310 grpc_server_destroy(server_);
311 if (thread_pool_owned_) {
312 delete thread_pool_;
313 }
314 delete sync_methods_;
315 }
316
317 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
318 GPR_ASSERT(g_callbacks == nullptr);
319 GPR_ASSERT(callbacks != nullptr);
320 g_callbacks.reset(callbacks);
321 }
322
323 bool Server::RegisterService(const grpc::string* host, Service* service) {
324 bool has_async_methods = service->has_async_methods();
325 if (has_async_methods) {
326 GPR_ASSERT(service->server_ == nullptr &&
327 "Can only register an asynchronous service against one server.");
328 service->server_ = this;
329 }
330 for (auto it = service->methods_.begin(); it != service->methods_.end();
331 ++it) {
332 if (it->get() == nullptr) { // Handled by generic service if any.
333 continue;
334 }
335 RpcServiceMethod* method = it->get();
336 void* tag = grpc_server_register_method(server_, method->name(),
337 host ? host->c_str() : nullptr);
338 if (tag == nullptr) {
339 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
340 method->name());
341 return false;
342 }
343 if (method->handler() == nullptr) {
344 method->set_server_tag(tag);
345 } else {
346 sync_methods_->emplace_back(method, tag);
347 }
348 }
349 return true;
350 }
351
352 void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
353 GPR_ASSERT(service->server_ == nullptr &&
354 "Can only register an async generic service against one server.");
355 service->server_ = this;
356 has_generic_service_ = true;
357 }
358
359 int Server::AddListeningPort(const grpc::string& addr,
360 ServerCredentials* creds) {
361 GPR_ASSERT(!started_);
362 return creds->AddPortToServer(addr, server_);
363 }
364
365 bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
366 GPR_ASSERT(!started_);
367 started_ = true;
368 grpc_server_start(server_);
369
370 if (!has_generic_service_) {
371 if (!sync_methods_->empty()) {
372 unknown_method_.reset(new RpcServiceMethod(
373 "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
374 // Use of emplace_back with just constructor arguments is not accepted
375 // here by gcc-4.4 because it can't match the anonymous nullptr with a
376 // proper constructor implicitly. Construct the object and use push_back.
377 sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
378 }
379 for (size_t i = 0; i < num_cqs; i++) {
380 new UnimplementedAsyncRequest(this, cqs[i]);
381 }
382 }
383 // Start processing rpcs.
384 if (!sync_methods_->empty()) {
385 for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
386 m->SetupRequest();
387 m->Request(server_, cq_.cq());
388 }
389
390 ScheduleCallback();
391 }
392
393 return true;
394 }
395
396 void Server::ShutdownInternal(gpr_timespec deadline) {
397 grpc::unique_lock<grpc::mutex> lock(mu_);
398 if (started_ && !shutdown_) {
399 shutdown_ = true;
400 grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
401 cq_.Shutdown();
402 lock.unlock();
403 // Spin, eating requests until the completion queue is completely shutdown.
404 // If the deadline expires then cancel anything that's pending and keep
405 // spinning forever until the work is actually drained.
406 // Since nothing else needs to touch state guarded by mu_, holding it
407 // through this loop is fine.
408 SyncRequest* request;
409 bool ok;
410 while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
411 if (request == NULL) { // deadline expired
412 grpc_server_cancel_all_calls(server_);
413 deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
414 } else if (ok) {
415 SyncRequest::CallData call_data(this, request);
416 }
417 }
418 lock.lock();
419
420 // Wait for running callbacks to finish.
421 while (num_running_cb_ != 0) {
422 callback_cv_.wait(lock);
423 }
424 }
425 }
426
427 void Server::Wait() {
428 grpc::unique_lock<grpc::mutex> lock(mu_);
429 while (num_running_cb_ != 0) {
430 callback_cv_.wait(lock);
431 }
432 }
433
434 void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
435 static const size_t MAX_OPS = 8;
436 size_t nops = 0;
437 grpc_op cops[MAX_OPS];
438 ops->FillOps(cops, &nops);
439 auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
440 GPR_ASSERT(GRPC_CALL_OK == result);
441 }
442
443 ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
444 ServerInterface* server, ServerContext* context,
445 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
446 bool delete_on_finalize)
447 : server_(server),
448 context_(context),
449 stream_(stream),
450 call_cq_(call_cq),
451 tag_(tag),
452 delete_on_finalize_(delete_on_finalize),
453 call_(nullptr) {
454 memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
455 }
456
457 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
458 bool* status) {
459 if (*status) {
460 for (size_t i = 0; i < initial_metadata_array_.count; i++) {
461 context_->client_metadata_.insert(
462 std::pair<grpc::string_ref, grpc::string_ref>(
463 initial_metadata_array_.metadata[i].key,
464 grpc::string_ref(
465 initial_metadata_array_.metadata[i].value,
466 initial_metadata_array_.metadata[i].value_length)));
467 }
468 }
469 grpc_metadata_array_destroy(&initial_metadata_array_);
470 context_->set_call(call_);
471 context_->cq_ = call_cq_;
472 Call call(call_, server_, call_cq_, server_->max_message_size());
473 if (*status && call_) {
474 context_->BeginCompletionOp(&call);
475 }
476 // just the pointers inside call are copied here
477 stream_->BindCall(&call);
478 *tag = tag_;
479 if (delete_on_finalize_) {
480 delete this;
481 }
482 return true;
483 }
484
485 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
486 ServerInterface* server, ServerContext* context,
487 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
488 : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
489
490 void ServerInterface::RegisteredAsyncRequest::IssueRequest(
491 void* registered_method, grpc_byte_buffer** payload,
492 ServerCompletionQueue* notification_cq) {
493 grpc_server_request_registered_call(
494 server_->server(), registered_method, &call_, &context_->deadline_,
495 &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
496 this);
497 }
498
499 ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
500 ServerInterface* server, GenericServerContext* context,
501 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
502 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
503 : BaseAsyncRequest(server, context, stream, call_cq, tag,
504 delete_on_finalize) {
505 grpc_call_details_init(&call_details_);
506 GPR_ASSERT(notification_cq);
507 GPR_ASSERT(call_cq);
508 grpc_server_request_call(server->server(), &call_, &call_details_,
509 &initial_metadata_array_, call_cq->cq(),
510 notification_cq->cq(), this);
511 }
512
513 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
514 bool* status) {
515 // TODO(yangg) remove the copy here.
516 if (*status) {
517 static_cast<GenericServerContext*>(context_)->method_ =
518 call_details_.method;
519 static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
520 }
521 gpr_free(call_details_.method);
522 gpr_free(call_details_.host);
523 return BaseAsyncRequest::FinalizeResult(tag, status);
524 }
525
526 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
527 bool* status) {
528 if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
529 new UnimplementedAsyncRequest(server_, cq_);
530 new UnimplementedAsyncResponse(this);
531 } else {
532 delete this;
533 }
534 return false;
535 }
536
537 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
538 UnimplementedAsyncRequest* request)
539 : request_(request) {
540 Status status(StatusCode::UNIMPLEMENTED, "");
541 UnknownMethodHandler::FillOps(request_->context(), this);
542 request_->stream()->call_.PerformOps(this);
543 }
544
545 void Server::ScheduleCallback() {
546 {
547 grpc::unique_lock<grpc::mutex> lock(mu_);
548 num_running_cb_++;
549 }
550 thread_pool_->Add(std::bind(&Server::RunRpc, this));
551 }
552
553 void Server::RunRpc() {
554 // Wait for one more incoming rpc.
555 bool ok;
556 GPR_TIMER_SCOPE("Server::RunRpc", 0);
557 auto* mrd = SyncRequest::Wait(&cq_, &ok);
558 if (mrd) {
559 ScheduleCallback();
560 if (ok) {
561 SyncRequest::CallData cd(this, mrd);
562 {
563 mrd->SetupRequest();
564 grpc::unique_lock<grpc::mutex> lock(mu_);
565 if (!shutdown_) {
566 mrd->Request(server_, cq_.cq());
567 } else {
568 // destroy the structure that was created
569 mrd->TeardownRequest();
570 }
571 }
572 GPR_TIMER_SCOPE("cd.Run()", 0);
573 cd.Run(global_callbacks_);
574 }
575 }
576
577 {
578 grpc::unique_lock<grpc::mutex> lock(mu_);
579 num_running_cb_--;
580 if (shutdown_) {
581 callback_cv_.notify_all();
582 }
583 }
584 }
585
586 } // namespace grpc
OLDNEW
« no previous file with comments | « third_party/grpc/src/cpp/server/secure_server_credentials.cc ('k') | third_party/grpc/src/cpp/server/server_builder.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698