Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: third_party/grpc/src/core/surface/server.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "src/core/surface/server.h"
35
36 #include <limits.h>
37 #include <stdlib.h>
38 #include <string.h>
39
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/string_util.h>
43 #include <grpc/support/useful.h>
44
45 #include "src/core/census/grpc_filter.h"
46 #include "src/core/channel/channel_args.h"
47 #include "src/core/channel/connected_channel.h"
48 #include "src/core/iomgr/iomgr.h"
49 #include "src/core/support/stack_lockfree.h"
50 #include "src/core/support/string.h"
51 #include "src/core/surface/api_trace.h"
52 #include "src/core/surface/call.h"
53 #include "src/core/surface/channel.h"
54 #include "src/core/surface/completion_queue.h"
55 #include "src/core/surface/init.h"
56 #include "src/core/transport/metadata.h"
57 #include "src/core/transport/static_metadata.h"
58
59 typedef struct listener {
60 void *arg;
61 void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
62 grpc_pollset **pollsets, size_t pollset_count);
63 void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
64 grpc_closure *closure);
65 struct listener *next;
66 grpc_closure destroy_done;
67 } listener;
68
69 typedef struct call_data call_data;
70 typedef struct channel_data channel_data;
71 typedef struct registered_method registered_method;
72
73 typedef struct {
74 call_data *next;
75 call_data *prev;
76 } call_link;
77
78 typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
79
80 typedef struct requested_call {
81 requested_call_type type;
82 void *tag;
83 grpc_server *server;
84 grpc_completion_queue *cq_bound_to_call;
85 grpc_completion_queue *cq_for_notification;
86 grpc_call **call;
87 grpc_cq_completion completion;
88 grpc_metadata_array *initial_metadata;
89 union {
90 struct {
91 grpc_call_details *details;
92 } batch;
93 struct {
94 registered_method *registered_method;
95 gpr_timespec *deadline;
96 grpc_byte_buffer **optional_payload;
97 } registered;
98 } data;
99 grpc_closure publish;
100 } requested_call;
101
102 typedef struct channel_registered_method {
103 registered_method *server_registered_method;
104 grpc_mdstr *method;
105 grpc_mdstr *host;
106 } channel_registered_method;
107
108 struct channel_data {
109 grpc_server *server;
110 grpc_connectivity_state connectivity_state;
111 grpc_channel *channel;
112 /* linked list of all channels on a server */
113 channel_data *next;
114 channel_data *prev;
115 channel_registered_method *registered_methods;
116 uint32_t registered_method_slots;
117 uint32_t registered_method_max_probes;
118 grpc_closure finish_destroy_channel_closure;
119 grpc_closure channel_connectivity_changed;
120 };
121
122 typedef struct shutdown_tag {
123 void *tag;
124 grpc_completion_queue *cq;
125 grpc_cq_completion completion;
126 } shutdown_tag;
127
128 typedef enum {
129 /* waiting for metadata */
130 NOT_STARTED,
131 /* inital metadata read, not flow controlled in yet */
132 PENDING,
133 /* flow controlled in, on completion queue */
134 ACTIVATED,
135 /* cancelled before being queued */
136 ZOMBIED
137 } call_state;
138
139 typedef struct request_matcher request_matcher;
140
141 struct call_data {
142 grpc_call *call;
143
144 /** protects state */
145 gpr_mu mu_state;
146 /** the current state of a call - see call_state */
147 call_state state;
148
149 grpc_mdstr *path;
150 grpc_mdstr *host;
151 gpr_timespec deadline;
152
153 grpc_completion_queue *cq_new;
154
155 grpc_metadata_batch *recv_initial_metadata;
156 grpc_metadata_array initial_metadata;
157
158 grpc_closure got_initial_metadata;
159 grpc_closure server_on_recv_initial_metadata;
160 grpc_closure kill_zombie_closure;
161 grpc_closure *on_done_recv_initial_metadata;
162
163 call_data *pending_next;
164 };
165
166 struct request_matcher {
167 call_data *pending_head;
168 call_data *pending_tail;
169 gpr_stack_lockfree *requests;
170 };
171
172 struct registered_method {
173 char *method;
174 char *host;
175 request_matcher request_matcher;
176 registered_method *next;
177 };
178
179 typedef struct {
180 grpc_channel **channels;
181 size_t num_channels;
182 } channel_broadcaster;
183
184 struct grpc_server {
185 size_t channel_filter_count;
186 grpc_channel_filter const **channel_filters;
187 grpc_channel_args *channel_args;
188
189 grpc_completion_queue **cqs;
190 grpc_pollset **pollsets;
191 size_t cq_count;
192
193 /* The two following mutexes control access to server-state
194 mu_global controls access to non-call-related state (e.g., channel state)
195 mu_call controls access to call-related state (e.g., the call lists)
196
197 If they are ever required to be nested, you must lock mu_global
198 before mu_call. This is currently used in shutdown processing
199 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
200 gpr_mu mu_global; /* mutex for server and channel state */
201 gpr_mu mu_call; /* mutex for call-specific state */
202
203 registered_method *registered_methods;
204 request_matcher unregistered_request_matcher;
205 /** free list of available requested_calls indices */
206 gpr_stack_lockfree *request_freelist;
207 /** requested call backing data */
208 requested_call *requested_calls;
209 size_t max_requested_calls;
210
211 gpr_atm shutdown_flag;
212 uint8_t shutdown_published;
213 size_t num_shutdown_tags;
214 shutdown_tag *shutdown_tags;
215
216 channel_data root_channel_data;
217
218 listener *listeners;
219 int listeners_destroyed;
220 gpr_refcount internal_refcount;
221
222 /** when did we print the last shutdown progress message */
223 gpr_timespec last_shutdown_message_time;
224 };
225
226 #define SERVER_FROM_CALL_ELEM(elem) \
227 (((channel_data *)(elem)->channel_data)->server)
228
229 static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
230 call_data *calld, requested_call *rc);
231 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
232 requested_call *rc);
233 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
234 hold mu_call */
235 static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
236
237 /*
238 * channel broadcaster
239 */
240
241 /* assumes server locked */
242 static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
243 channel_data *c;
244 size_t count = 0;
245 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
246 count++;
247 }
248 cb->num_channels = count;
249 cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
250 count = 0;
251 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
252 cb->channels[count++] = c->channel;
253 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
254 }
255 }
256
257 struct shutdown_cleanup_args {
258 grpc_closure closure;
259 gpr_slice slice;
260 };
261
262 static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
263 bool iomgr_status_ignored) {
264 struct shutdown_cleanup_args *a = arg;
265 gpr_slice_unref(a->slice);
266 gpr_free(a);
267 }
268
269 static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
270 int send_goaway, int send_disconnect) {
271 grpc_transport_op op;
272 struct shutdown_cleanup_args *sc;
273 grpc_channel_element *elem;
274
275 memset(&op, 0, sizeof(op));
276 op.send_goaway = send_goaway;
277 sc = gpr_malloc(sizeof(*sc));
278 sc->slice = gpr_slice_from_copied_string("Server shutdown");
279 op.goaway_message = &sc->slice;
280 op.goaway_status = GRPC_STATUS_OK;
281 op.disconnect = send_disconnect;
282 grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
283 op.on_consumed = &sc->closure;
284
285 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
286 elem->filter->start_transport_op(exec_ctx, elem, &op);
287 }
288
289 static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
290 channel_broadcaster *cb,
291 int send_goaway,
292 int force_disconnect) {
293 size_t i;
294
295 for (i = 0; i < cb->num_channels; i++) {
296 send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect);
297 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast");
298 }
299 gpr_free(cb->channels);
300 }
301
302 /*
303 * request_matcher
304 */
305
306 static void request_matcher_init(request_matcher *rm, size_t entries) {
307 memset(rm, 0, sizeof(*rm));
308 rm->requests = gpr_stack_lockfree_create(entries);
309 }
310
311 static void request_matcher_destroy(request_matcher *rm) {
312 GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
313 gpr_stack_lockfree_destroy(rm->requests);
314 }
315
316 static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) {
317 grpc_call_destroy(grpc_call_from_top_element(elem));
318 }
319
320 static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
321 request_matcher *rm) {
322 while (rm->pending_head) {
323 call_data *calld = rm->pending_head;
324 rm->pending_head = calld->pending_next;
325 gpr_mu_lock(&calld->mu_state);
326 calld->state = ZOMBIED;
327 gpr_mu_unlock(&calld->mu_state);
328 grpc_closure_init(
329 &calld->kill_zombie_closure, kill_zombie,
330 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
331 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
332 }
333 }
334
335 static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
336 grpc_server *server,
337 request_matcher *rm) {
338 int request_id;
339 while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
340 fail_call(exec_ctx, server, &server->requested_calls[request_id]);
341 }
342 }
343
344 /*
345 * server proper
346 */
347
348 static void server_ref(grpc_server *server) {
349 gpr_ref(&server->internal_refcount);
350 }
351
352 static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
353 registered_method *rm;
354 size_t i;
355 grpc_channel_args_destroy(server->channel_args);
356 gpr_mu_destroy(&server->mu_global);
357 gpr_mu_destroy(&server->mu_call);
358 gpr_free((void *)server->channel_filters);
359 while ((rm = server->registered_methods) != NULL) {
360 server->registered_methods = rm->next;
361 request_matcher_destroy(&rm->request_matcher);
362 gpr_free(rm->method);
363 gpr_free(rm->host);
364 gpr_free(rm);
365 }
366 for (i = 0; i < server->cq_count; i++) {
367 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
368 }
369 request_matcher_destroy(&server->unregistered_request_matcher);
370 gpr_stack_lockfree_destroy(server->request_freelist);
371 gpr_free(server->cqs);
372 gpr_free(server->pollsets);
373 gpr_free(server->shutdown_tags);
374 gpr_free(server->requested_calls);
375 gpr_free(server);
376 }
377
378 static void server_unref(grpc_exec_ctx *exec_ctx, grpc_server *server) {
379 if (gpr_unref(&server->internal_refcount)) {
380 server_delete(exec_ctx, server);
381 }
382 }
383
384 static int is_channel_orphaned(channel_data *chand) {
385 return chand->next == chand;
386 }
387
388 static void orphan_channel(channel_data *chand) {
389 chand->next->prev = chand->prev;
390 chand->prev->next = chand->next;
391 chand->next = chand->prev = chand;
392 }
393
394 static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
395 bool success) {
396 channel_data *chand = cd;
397 grpc_server *server = chand->server;
398 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
399 server_unref(exec_ctx, server);
400 }
401
402 static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
403 if (is_channel_orphaned(chand)) return;
404 GPR_ASSERT(chand->server != NULL);
405 orphan_channel(chand);
406 server_ref(chand->server);
407 maybe_finish_shutdown(exec_ctx, chand->server);
408 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
409 chand->finish_destroy_channel_closure.cb_arg = chand;
410
411 grpc_transport_op op;
412 memset(&op, 0, sizeof(op));
413 op.set_accept_stream = true;
414 op.on_consumed = &chand->finish_destroy_channel_closure;
415 grpc_channel_next_op(exec_ctx,
416 grpc_channel_stack_element(
417 grpc_channel_get_channel_stack(chand->channel), 0),
418 &op);
419 }
420
421 static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
422 grpc_call_element *elem, request_matcher *rm) {
423 call_data *calld = elem->call_data;
424 int request_id;
425
426 if (gpr_atm_acq_load(&server->shutdown_flag)) {
427 gpr_mu_lock(&calld->mu_state);
428 calld->state = ZOMBIED;
429 gpr_mu_unlock(&calld->mu_state);
430 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
431 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
432 return;
433 }
434
435 request_id = gpr_stack_lockfree_pop(rm->requests);
436 if (request_id == -1) {
437 gpr_mu_lock(&server->mu_call);
438 gpr_mu_lock(&calld->mu_state);
439 calld->state = PENDING;
440 gpr_mu_unlock(&calld->mu_state);
441 if (rm->pending_head == NULL) {
442 rm->pending_tail = rm->pending_head = calld;
443 } else {
444 rm->pending_tail->pending_next = calld;
445 rm->pending_tail = calld;
446 }
447 calld->pending_next = NULL;
448 gpr_mu_unlock(&server->mu_call);
449 } else {
450 gpr_mu_lock(&calld->mu_state);
451 calld->state = ACTIVATED;
452 gpr_mu_unlock(&calld->mu_state);
453 begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
454 }
455 }
456
457 static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
458 channel_data *chand = elem->channel_data;
459 call_data *calld = elem->call_data;
460 grpc_server *server = chand->server;
461 uint32_t i;
462 uint32_t hash;
463 channel_registered_method *rm;
464
465 if (chand->registered_methods && calld->path && calld->host) {
466 /* TODO(ctiller): unify these two searches */
467 /* check for an exact match with host */
468 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
469 for (i = 0; i <= chand->registered_method_max_probes; i++) {
470 rm = &chand->registered_methods[(hash + i) %
471 chand->registered_method_slots];
472 if (!rm) break;
473 if (rm->host != calld->host) continue;
474 if (rm->method != calld->path) continue;
475 finish_start_new_rpc(exec_ctx, server, elem,
476 &rm->server_registered_method->request_matcher);
477 return;
478 }
479 /* check for a wildcard method definition (no host set) */
480 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
481 for (i = 0; i <= chand->registered_method_max_probes; i++) {
482 rm = &chand->registered_methods[(hash + i) %
483 chand->registered_method_slots];
484 if (!rm) break;
485 if (rm->host != NULL) continue;
486 if (rm->method != calld->path) continue;
487 finish_start_new_rpc(exec_ctx, server, elem,
488 &rm->server_registered_method->request_matcher);
489 return;
490 }
491 }
492 finish_start_new_rpc(exec_ctx, server, elem,
493 &server->unregistered_request_matcher);
494 }
495
496 static int num_listeners(grpc_server *server) {
497 listener *l;
498 int n = 0;
499 for (l = server->listeners; l; l = l->next) {
500 n++;
501 }
502 return n;
503 }
504
505 static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
506 grpc_cq_completion *completion) {
507 server_unref(exec_ctx, server);
508 }
509
510 static int num_channels(grpc_server *server) {
511 channel_data *chand;
512 int n = 0;
513 for (chand = server->root_channel_data.next;
514 chand != &server->root_channel_data; chand = chand->next) {
515 n++;
516 }
517 return n;
518 }
519
520 static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
521 grpc_server *server) {
522 registered_method *rm;
523 request_matcher_kill_requests(exec_ctx, server,
524 &server->unregistered_request_matcher);
525 request_matcher_zombify_all_pending_calls(
526 exec_ctx, &server->unregistered_request_matcher);
527 for (rm = server->registered_methods; rm; rm = rm->next) {
528 request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
529 request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
530 }
531 }
532
533 static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
534 grpc_server *server) {
535 size_t i;
536 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
537 return;
538 }
539
540 kill_pending_work_locked(exec_ctx, server);
541
542 if (server->root_channel_data.next != &server->root_channel_data ||
543 server->listeners_destroyed < num_listeners(server)) {
544 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
545 server->last_shutdown_message_time),
546 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
547 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
548 gpr_log(GPR_DEBUG,
549 "Waiting for %d channels and %d/%d listeners to be destroyed"
550 " before shutting down server",
551 num_channels(server),
552 num_listeners(server) - server->listeners_destroyed,
553 num_listeners(server));
554 }
555 return;
556 }
557 server->shutdown_published = 1;
558 for (i = 0; i < server->num_shutdown_tags; i++) {
559 server_ref(server);
560 grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq,
561 server->shutdown_tags[i].tag, 1, done_shutdown_event, server,
562 &server->shutdown_tags[i].completion);
563 }
564 }
565
566 static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
567 grpc_call_element *elem = user_data;
568 call_data *calld = elem->call_data;
569 if (md->key == GRPC_MDSTR_PATH) {
570 calld->path = GRPC_MDSTR_REF(md->value);
571 return NULL;
572 } else if (md->key == GRPC_MDSTR_AUTHORITY) {
573 calld->host = GRPC_MDSTR_REF(md->value);
574 return NULL;
575 }
576 return md;
577 }
578
579 static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
580 bool success) {
581 grpc_call_element *elem = ptr;
582 call_data *calld = elem->call_data;
583 gpr_timespec op_deadline;
584
585 grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem);
586 op_deadline = calld->recv_initial_metadata->deadline;
587 if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
588 calld->deadline = op_deadline;
589 }
590 if (calld->host && calld->path) {
591 /* do nothing */
592 } else {
593 success = 0;
594 }
595
596 calld->on_done_recv_initial_metadata->cb(
597 exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success);
598 }
599
600 static void server_mutate_op(grpc_call_element *elem,
601 grpc_transport_stream_op *op) {
602 call_data *calld = elem->call_data;
603
604 if (op->recv_initial_metadata != NULL) {
605 calld->recv_initial_metadata = op->recv_initial_metadata;
606 calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
607 op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
608 }
609 }
610
611 static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
612 grpc_call_element *elem,
613 grpc_transport_stream_op *op) {
614 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
615 server_mutate_op(elem, op);
616 grpc_call_next_op(exec_ctx, elem, op);
617 }
618
619 static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
620 bool success) {
621 grpc_call_element *elem = ptr;
622 call_data *calld = elem->call_data;
623 if (success) {
624 start_new_rpc(exec_ctx, elem);
625 } else {
626 gpr_mu_lock(&calld->mu_state);
627 if (calld->state == NOT_STARTED) {
628 calld->state = ZOMBIED;
629 gpr_mu_unlock(&calld->mu_state);
630 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
631 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
632 } else if (calld->state == PENDING) {
633 calld->state = ZOMBIED;
634 gpr_mu_unlock(&calld->mu_state);
635 /* zombied call will be destroyed when it's removed from the pending
636 queue... later */
637 } else {
638 gpr_mu_unlock(&calld->mu_state);
639 }
640 }
641 }
642
643 static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
644 grpc_transport *transport,
645 const void *transport_server_data) {
646 channel_data *chand = cd;
647 /* create a call */
648 grpc_call *call =
649 grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data,
650 NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
651 grpc_call_element *elem =
652 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
653 call_data *calld = elem->call_data;
654 grpc_op op;
655 memset(&op, 0, sizeof(op));
656 op.op = GRPC_OP_RECV_INITIAL_METADATA;
657 op.data.recv_initial_metadata = &calld->initial_metadata;
658 grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem);
659 grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
660 &calld->got_initial_metadata);
661 }
662
663 static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
664 bool iomgr_status_ignored) {
665 channel_data *chand = cd;
666 grpc_server *server = chand->server;
667 if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
668 grpc_transport_op op;
669 memset(&op, 0, sizeof(op));
670 op.on_connectivity_state_change = &chand->channel_connectivity_changed,
671 op.connectivity_state = &chand->connectivity_state;
672 grpc_channel_next_op(exec_ctx,
673 grpc_channel_stack_element(
674 grpc_channel_get_channel_stack(chand->channel), 0),
675 &op);
676 } else {
677 gpr_mu_lock(&server->mu_global);
678 destroy_channel(exec_ctx, chand);
679 gpr_mu_unlock(&server->mu_global);
680 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
681 }
682 }
683
684 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
685 grpc_call_element_args *args) {
686 call_data *calld = elem->call_data;
687 channel_data *chand = elem->channel_data;
688 memset(calld, 0, sizeof(call_data));
689 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
690 calld->call = grpc_call_from_top_element(elem);
691 gpr_mu_init(&calld->mu_state);
692
693 grpc_closure_init(&calld->server_on_recv_initial_metadata,
694 server_on_recv_initial_metadata, elem);
695
696 server_ref(chand->server);
697 }
698
699 static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
700 grpc_call_element *elem) {
701 channel_data *chand = elem->channel_data;
702 call_data *calld = elem->call_data;
703
704 GPR_ASSERT(calld->state != PENDING);
705
706 if (calld->host) {
707 GRPC_MDSTR_UNREF(calld->host);
708 }
709 if (calld->path) {
710 GRPC_MDSTR_UNREF(calld->path);
711 }
712 grpc_metadata_array_destroy(&calld->initial_metadata);
713
714 gpr_mu_destroy(&calld->mu_state);
715
716 server_unref(exec_ctx, chand->server);
717 }
718
719 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
720 grpc_channel_element *elem,
721 grpc_channel_element_args *args) {
722 channel_data *chand = elem->channel_data;
723 GPR_ASSERT(args->is_first);
724 GPR_ASSERT(!args->is_last);
725 chand->server = NULL;
726 chand->channel = NULL;
727 chand->next = chand->prev = chand;
728 chand->registered_methods = NULL;
729 chand->connectivity_state = GRPC_CHANNEL_IDLE;
730 grpc_closure_init(&chand->channel_connectivity_changed,
731 channel_connectivity_changed, chand);
732 }
733
734 static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
735 grpc_channel_element *elem) {
736 size_t i;
737 channel_data *chand = elem->channel_data;
738 if (chand->registered_methods) {
739 for (i = 0; i < chand->registered_method_slots; i++) {
740 if (chand->registered_methods[i].method) {
741 GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
742 }
743 if (chand->registered_methods[i].host) {
744 GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
745 }
746 }
747 gpr_free(chand->registered_methods);
748 }
749 if (chand->server) {
750 gpr_mu_lock(&chand->server->mu_global);
751 chand->next->prev = chand->prev;
752 chand->prev->next = chand->next;
753 chand->next = chand->prev = chand;
754 maybe_finish_shutdown(exec_ctx, chand->server);
755 gpr_mu_unlock(&chand->server->mu_global);
756 server_unref(exec_ctx, chand->server);
757 }
758 }
759
760 static const grpc_channel_filter server_surface_filter = {
761 server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
762 init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
763 sizeof(channel_data), init_channel_elem, destroy_channel_elem,
764 grpc_call_next_get_peer, "server",
765 };
766
767 void grpc_server_register_completion_queue(grpc_server *server,
768 grpc_completion_queue *cq,
769 void *reserved) {
770 size_t i, n;
771 GRPC_API_TRACE(
772 "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
773 (server, cq, reserved));
774 GPR_ASSERT(!reserved);
775 for (i = 0; i < server->cq_count; i++) {
776 if (server->cqs[i] == cq) return;
777 }
778 GRPC_CQ_INTERNAL_REF(cq, "server");
779 grpc_cq_mark_server_cq(cq);
780 n = server->cq_count++;
781 server->cqs = gpr_realloc(server->cqs,
782 server->cq_count * sizeof(grpc_completion_queue *));
783 server->cqs[n] = cq;
784 }
785
786 grpc_server *grpc_server_create_from_filters(
787 const grpc_channel_filter **filters, size_t filter_count,
788 const grpc_channel_args *args) {
789 size_t i;
790 int census_enabled = grpc_channel_args_is_census_enabled(args);
791
792 grpc_server *server = gpr_malloc(sizeof(grpc_server));
793
794 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
795
796 memset(server, 0, sizeof(grpc_server));
797
798 gpr_mu_init(&server->mu_global);
799 gpr_mu_init(&server->mu_call);
800
801 /* decremented by grpc_server_destroy */
802 gpr_ref_init(&server->internal_refcount, 1);
803 server->root_channel_data.next = server->root_channel_data.prev =
804 &server->root_channel_data;
805
806 /* TODO(ctiller): expose a channel_arg for this */
807 server->max_requested_calls = 32768;
808 server->request_freelist =
809 gpr_stack_lockfree_create(server->max_requested_calls);
810 for (i = 0; i < (size_t)server->max_requested_calls; i++) {
811 gpr_stack_lockfree_push(server->request_freelist, (int)i);
812 }
813 request_matcher_init(&server->unregistered_request_matcher,
814 server->max_requested_calls);
815 server->requested_calls = gpr_malloc(server->max_requested_calls *
816 sizeof(*server->requested_calls));
817
818 /* Server filter stack is:
819
820 server_surface_filter - for making surface API calls
821 grpc_server_census_filter (optional) - for stats collection and tracing
822 {passed in filter stack}
823 grpc_connected_channel_filter - for interfacing with transports */
824 server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
825 server->channel_filters =
826 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
827 server->channel_filters[0] = &server_surface_filter;
828 if (census_enabled) {
829 server->channel_filters[1] = &grpc_server_census_filter;
830 }
831 for (i = 0; i < filter_count; i++) {
832 server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
833 }
834
835 server->channel_args = grpc_channel_args_copy(args);
836
837 return server;
838 }
839
840 static int streq(const char *a, const char *b) {
841 if (a == NULL && b == NULL) return 1;
842 if (a == NULL) return 0;
843 if (b == NULL) return 0;
844 return 0 == strcmp(a, b);
845 }
846
847 void *grpc_server_register_method(grpc_server *server, const char *method,
848 const char *host) {
849 registered_method *m;
850 GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
851 3, (server, method, host));
852 if (!method) {
853 gpr_log(GPR_ERROR,
854 "grpc_server_register_method method string cannot be NULL");
855 return NULL;
856 }
857 for (m = server->registered_methods; m; m = m->next) {
858 if (streq(m->method, method) && streq(m->host, host)) {
859 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
860 host ? host : "*");
861 return NULL;
862 }
863 }
864 m = gpr_malloc(sizeof(registered_method));
865 memset(m, 0, sizeof(*m));
866 request_matcher_init(&m->request_matcher, server->max_requested_calls);
867 m->method = gpr_strdup(method);
868 m->host = gpr_strdup(host);
869 m->next = server->registered_methods;
870 server->registered_methods = m;
871 return m;
872 }
873
874 void grpc_server_start(grpc_server *server) {
875 listener *l;
876 size_t i;
877 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
878
879 GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
880
881 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
882 for (i = 0; i < server->cq_count; i++) {
883 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
884 }
885
886 for (l = server->listeners; l; l = l->next) {
887 l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count);
888 }
889
890 grpc_exec_ctx_finish(&exec_ctx);
891 }
892
893 void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
894 grpc_transport *transport,
895 grpc_channel_filter const **extra_filters,
896 size_t num_extra_filters,
897 const grpc_channel_args *args) {
898 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
899 grpc_channel_filter const **filters =
900 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
901 size_t i;
902 size_t num_registered_methods;
903 size_t alloc;
904 registered_method *rm;
905 channel_registered_method *crm;
906 grpc_channel *channel;
907 channel_data *chand;
908 grpc_mdstr *host;
909 grpc_mdstr *method;
910 uint32_t hash;
911 size_t slots;
912 uint32_t probes;
913 uint32_t max_probes = 0;
914 grpc_transport_op op;
915
916 for (i = 0; i < s->channel_filter_count; i++) {
917 filters[i] = s->channel_filters[i];
918 }
919 for (; i < s->channel_filter_count + num_extra_filters; i++) {
920 filters[i] = extra_filters[i - s->channel_filter_count];
921 }
922 filters[i] = &grpc_connected_channel_filter;
923
924 for (i = 0; i < s->cq_count; i++) {
925 memset(&op, 0, sizeof(op));
926 op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
927 grpc_transport_perform_op(exec_ctx, transport, &op);
928 }
929
930 channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
931 num_filters, args, 0);
932 chand = (channel_data *)grpc_channel_stack_element(
933 grpc_channel_get_channel_stack(channel), 0)->channel_data;
934 chand->server = s;
935 server_ref(s);
936 chand->channel = channel;
937
938 num_registered_methods = 0;
939 for (rm = s->registered_methods; rm; rm = rm->next) {
940 num_registered_methods++;
941 }
942 /* build a lookup table phrased in terms of mdstr's in this channels context
943 to quickly find registered methods */
944 if (num_registered_methods > 0) {
945 slots = 2 * num_registered_methods;
946 alloc = sizeof(channel_registered_method) * slots;
947 chand->registered_methods = gpr_malloc(alloc);
948 memset(chand->registered_methods, 0, alloc);
949 for (rm = s->registered_methods; rm; rm = rm->next) {
950 host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL;
951 method = grpc_mdstr_from_string(rm->method);
952 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
953 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
954 .server_registered_method != NULL;
955 probes++)
956 ;
957 if (probes > max_probes) max_probes = probes;
958 crm = &chand->registered_methods[(hash + probes) % slots];
959 crm->server_registered_method = rm;
960 crm->host = host;
961 crm->method = method;
962 }
963 GPR_ASSERT(slots <= UINT32_MAX);
964 chand->registered_method_slots = (uint32_t)slots;
965 chand->registered_method_max_probes = max_probes;
966 }
967
968 grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
969 transport);
970
971 gpr_mu_lock(&s->mu_global);
972 chand->next = &s->root_channel_data;
973 chand->prev = chand->next->prev;
974 chand->next->prev = chand->prev->next = chand;
975 gpr_mu_unlock(&s->mu_global);
976
977 gpr_free((void *)filters);
978
979 GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
980 memset(&op, 0, sizeof(op));
981 op.set_accept_stream = true;
982 op.set_accept_stream_fn = accept_stream;
983 op.set_accept_stream_user_data = chand;
984 op.on_connectivity_state_change = &chand->channel_connectivity_changed;
985 op.connectivity_state = &chand->connectivity_state;
986 op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
987 grpc_transport_perform_op(exec_ctx, transport, &op);
988 }
989
990 void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
991 grpc_cq_completion *storage) {
992 (void)done_arg;
993 gpr_free(storage);
994 }
995
996 static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
997 bool success) {
998 grpc_server *server = s;
999 gpr_mu_lock(&server->mu_global);
1000 server->listeners_destroyed++;
1001 maybe_finish_shutdown(exec_ctx, server);
1002 gpr_mu_unlock(&server->mu_global);
1003 }
1004
1005 void grpc_server_shutdown_and_notify(grpc_server *server,
1006 grpc_completion_queue *cq, void *tag) {
1007 listener *l;
1008 shutdown_tag *sdt;
1009 channel_broadcaster broadcaster;
1010 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1011
1012 GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1013 (server, cq, tag));
1014
1015 /* lock, and gather up some stuff to do */
1016 gpr_mu_lock(&server->mu_global);
1017 grpc_cq_begin_op(cq, tag);
1018 if (server->shutdown_published) {
1019 grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
1020 gpr_malloc(sizeof(grpc_cq_completion)));
1021 gpr_mu_unlock(&server->mu_global);
1022 goto done;
1023 }
1024 server->shutdown_tags =
1025 gpr_realloc(server->shutdown_tags,
1026 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
1027 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1028 sdt->tag = tag;
1029 sdt->cq = cq;
1030 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1031 gpr_mu_unlock(&server->mu_global);
1032 goto done;
1033 }
1034
1035 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1036
1037 channel_broadcaster_init(server, &broadcaster);
1038
1039 gpr_atm_rel_store(&server->shutdown_flag, 1);
1040
1041 /* collect all unregistered then registered calls */
1042 gpr_mu_lock(&server->mu_call);
1043 kill_pending_work_locked(&exec_ctx, server);
1044 gpr_mu_unlock(&server->mu_call);
1045
1046 maybe_finish_shutdown(&exec_ctx, server);
1047 gpr_mu_unlock(&server->mu_global);
1048
1049 /* Shutdown listeners */
1050 for (l = server->listeners; l; l = l->next) {
1051 grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
1052 l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
1053 }
1054
1055 channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0);
1056
1057 done:
1058 grpc_exec_ctx_finish(&exec_ctx);
1059 }
1060
1061 void grpc_server_cancel_all_calls(grpc_server *server) {
1062 channel_broadcaster broadcaster;
1063 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1064
1065 GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1066
1067 gpr_mu_lock(&server->mu_global);
1068 channel_broadcaster_init(server, &broadcaster);
1069 gpr_mu_unlock(&server->mu_global);
1070
1071 channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1);
1072 grpc_exec_ctx_finish(&exec_ctx);
1073 }
1074
1075 void grpc_server_destroy(grpc_server *server) {
1076 listener *l;
1077 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1078
1079 GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1080
1081 gpr_mu_lock(&server->mu_global);
1082 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1083 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1084
1085 while (server->listeners) {
1086 l = server->listeners;
1087 server->listeners = l->next;
1088 gpr_free(l);
1089 }
1090
1091 gpr_mu_unlock(&server->mu_global);
1092
1093 server_unref(&exec_ctx, server);
1094 grpc_exec_ctx_finish(&exec_ctx);
1095 }
1096
1097 void grpc_server_add_listener(
1098 grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1099 void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1100 grpc_pollset **pollsets, size_t pollset_count),
1101 void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1102 grpc_closure *on_done)) {
1103 listener *l = gpr_malloc(sizeof(listener));
1104 l->arg = arg;
1105 l->start = start;
1106 l->destroy = destroy;
1107 l->next = server->listeners;
1108 server->listeners = l;
1109 }
1110
1111 static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
1112 grpc_server *server,
1113 requested_call *rc) {
1114 call_data *calld = NULL;
1115 request_matcher *rm = NULL;
1116 int request_id;
1117 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1118 fail_call(exec_ctx, server, rc);
1119 return GRPC_CALL_OK;
1120 }
1121 request_id = gpr_stack_lockfree_pop(server->request_freelist);
1122 if (request_id == -1) {
1123 /* out of request ids: just fail this one */
1124 fail_call(exec_ctx, server, rc);
1125 return GRPC_CALL_OK;
1126 }
1127 switch (rc->type) {
1128 case BATCH_CALL:
1129 rm = &server->unregistered_request_matcher;
1130 break;
1131 case REGISTERED_CALL:
1132 rm = &rc->data.registered.registered_method->request_matcher;
1133 break;
1134 }
1135 server->requested_calls[request_id] = *rc;
1136 gpr_free(rc);
1137 if (gpr_stack_lockfree_push(rm->requests, request_id)) {
1138 /* this was the first queued request: we need to lock and start
1139 matching calls */
1140 gpr_mu_lock(&server->mu_call);
1141 while ((calld = rm->pending_head) != NULL) {
1142 request_id = gpr_stack_lockfree_pop(rm->requests);
1143 if (request_id == -1) break;
1144 rm->pending_head = calld->pending_next;
1145 gpr_mu_unlock(&server->mu_call);
1146 gpr_mu_lock(&calld->mu_state);
1147 if (calld->state == ZOMBIED) {
1148 gpr_mu_unlock(&calld->mu_state);
1149 grpc_closure_init(
1150 &calld->kill_zombie_closure, kill_zombie,
1151 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
1152 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true,
1153 NULL);
1154 } else {
1155 GPR_ASSERT(calld->state == PENDING);
1156 calld->state = ACTIVATED;
1157 gpr_mu_unlock(&calld->mu_state);
1158 begin_call(exec_ctx, server, calld,
1159 &server->requested_calls[request_id]);
1160 }
1161 gpr_mu_lock(&server->mu_call);
1162 }
1163 gpr_mu_unlock(&server->mu_call);
1164 }
1165 return GRPC_CALL_OK;
1166 }
1167
1168 grpc_call_error grpc_server_request_call(
1169 grpc_server *server, grpc_call **call, grpc_call_details *details,
1170 grpc_metadata_array *initial_metadata,
1171 grpc_completion_queue *cq_bound_to_call,
1172 grpc_completion_queue *cq_for_notification, void *tag) {
1173 grpc_call_error error;
1174 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1175 requested_call *rc = gpr_malloc(sizeof(*rc));
1176 GRPC_API_TRACE(
1177 "grpc_server_request_call("
1178 "server=%p, call=%p, details=%p, initial_metadata=%p, "
1179 "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1180 7, (server, call, details, initial_metadata, cq_bound_to_call,
1181 cq_for_notification, tag));
1182 if (!grpc_cq_is_server_cq(cq_for_notification)) {
1183 gpr_free(rc);
1184 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1185 goto done;
1186 }
1187 grpc_cq_begin_op(cq_for_notification, tag);
1188 details->reserved = NULL;
1189 rc->type = BATCH_CALL;
1190 rc->server = server;
1191 rc->tag = tag;
1192 rc->cq_bound_to_call = cq_bound_to_call;
1193 rc->cq_for_notification = cq_for_notification;
1194 rc->call = call;
1195 rc->data.batch.details = details;
1196 rc->initial_metadata = initial_metadata;
1197 error = queue_call_request(&exec_ctx, server, rc);
1198 done:
1199 grpc_exec_ctx_finish(&exec_ctx);
1200 return error;
1201 }
1202
1203 grpc_call_error grpc_server_request_registered_call(
1204 grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
1205 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1206 grpc_completion_queue *cq_bound_to_call,
1207 grpc_completion_queue *cq_for_notification, void *tag) {
1208 grpc_call_error error;
1209 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1210 requested_call *rc = gpr_malloc(sizeof(*rc));
1211 registered_method *rm = rmp;
1212 GRPC_API_TRACE(
1213 "grpc_server_request_registered_call("
1214 "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1215 "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1216 "tag=%p)",
1217 9, (server, rmp, call, deadline, initial_metadata, optional_payload,
1218 cq_bound_to_call, cq_for_notification, tag));
1219 if (!grpc_cq_is_server_cq(cq_for_notification)) {
1220 gpr_free(rc);
1221 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1222 goto done;
1223 }
1224 grpc_cq_begin_op(cq_for_notification, tag);
1225 rc->type = REGISTERED_CALL;
1226 rc->server = server;
1227 rc->tag = tag;
1228 rc->cq_bound_to_call = cq_bound_to_call;
1229 rc->cq_for_notification = cq_for_notification;
1230 rc->call = call;
1231 rc->data.registered.registered_method = rm;
1232 rc->data.registered.deadline = deadline;
1233 rc->initial_metadata = initial_metadata;
1234 rc->data.registered.optional_payload = optional_payload;
1235 error = queue_call_request(&exec_ctx, server, rc);
1236 done:
1237 grpc_exec_ctx_finish(&exec_ctx);
1238 return error;
1239 }
1240
1241 static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
1242 void *user_data, bool success);
1243
1244 static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1245 gpr_slice slice = value->slice;
1246 size_t len = GPR_SLICE_LENGTH(slice);
1247
1248 if (len + 1 > *capacity) {
1249 *capacity = GPR_MAX(len + 1, *capacity * 2);
1250 *dest = gpr_realloc(*dest, *capacity);
1251 }
1252 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1253 }
1254
1255 static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1256 call_data *calld, requested_call *rc) {
1257 grpc_op ops[1];
1258 grpc_op *op = ops;
1259
1260 memset(ops, 0, sizeof(ops));
1261
1262 /* called once initial metadata has been read by the call, but BEFORE
1263 the ioreq to fetch it out of the call has been executed.
1264 This means metadata related fields can be relied on in calld, but to
1265 fill in the metadata array passed by the client, we need to perform
1266 an ioreq op, that should complete immediately. */
1267
1268 grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
1269 grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
1270 *rc->call = calld->call;
1271 calld->cq_new = rc->cq_for_notification;
1272 GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
1273 switch (rc->type) {
1274 case BATCH_CALL:
1275 GPR_ASSERT(calld->host != NULL);
1276 GPR_ASSERT(calld->path != NULL);
1277 cpstr(&rc->data.batch.details->host,
1278 &rc->data.batch.details->host_capacity, calld->host);
1279 cpstr(&rc->data.batch.details->method,
1280 &rc->data.batch.details->method_capacity, calld->path);
1281 rc->data.batch.details->deadline = calld->deadline;
1282 break;
1283 case REGISTERED_CALL:
1284 *rc->data.registered.deadline = calld->deadline;
1285 if (rc->data.registered.optional_payload) {
1286 op->op = GRPC_OP_RECV_MESSAGE;
1287 op->data.recv_message = rc->data.registered.optional_payload;
1288 op++;
1289 }
1290 break;
1291 default:
1292 GPR_UNREACHABLE_CODE(return );
1293 }
1294
1295 GRPC_CALL_INTERNAL_REF(calld->call, "server");
1296 grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
1297 (size_t)(op - ops), &rc->publish);
1298 }
1299
1300 static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
1301 grpc_cq_completion *c) {
1302 requested_call *rc = req;
1303 grpc_server *server = rc->server;
1304
1305 if (rc >= server->requested_calls &&
1306 rc < server->requested_calls + server->max_requested_calls) {
1307 GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
1308 gpr_stack_lockfree_push(server->request_freelist,
1309 (int)(rc - server->requested_calls));
1310 } else {
1311 gpr_free(req);
1312 }
1313
1314 server_unref(exec_ctx, server);
1315 }
1316
1317 static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1318 requested_call *rc) {
1319 *rc->call = NULL;
1320 rc->initial_metadata->count = 0;
1321
1322 server_ref(server);
1323 grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
1324 done_request_event, rc, &rc->completion);
1325 }
1326
1327 static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
1328 bool success) {
1329 requested_call *rc = prc;
1330 grpc_call *call = *rc->call;
1331 grpc_call_element *elem =
1332 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1333 call_data *calld = elem->call_data;
1334 channel_data *chand = elem->channel_data;
1335 server_ref(chand->server);
1336 grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
1337 rc, &rc->completion);
1338 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
1339 }
1340
1341 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1342 return server->channel_args;
1343 }
1344
1345 int grpc_server_has_open_connections(grpc_server *server) {
1346 int r;
1347 gpr_mu_lock(&server->mu_global);
1348 r = server->root_channel_data.next != &server->root_channel_data;
1349 gpr_mu_unlock(&server->mu_global);
1350 return r;
1351 }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/surface/server.h ('k') | third_party/grpc/src/core/surface/server_chttp2.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698