Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: third_party/grpc/src/core/surface/completion_queue.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "src/core/surface/completion_queue.h"
35
36 #include <stdio.h>
37 #include <string.h>
38
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/atm.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/time.h>
43
44 #include "src/core/iomgr/pollset.h"
45 #include "src/core/iomgr/timer.h"
46 #include "src/core/profiling/timers.h"
47 #include "src/core/support/string.h"
48 #include "src/core/surface/api_trace.h"
49 #include "src/core/surface/call.h"
50 #include "src/core/surface/event_string.h"
51 #include "src/core/surface/surface_trace.h"
52
/* A waiter blocked in grpc_completion_queue_pluck(): records the tag it is
   waiting for and the location of its pollset worker handle so that
   grpc_cq_end_op can kick exactly the right waiter. */
53 typedef struct {
54 grpc_pollset_worker **worker;
55 void *tag;
56 } plucker;
57
58 /* Completion queue structure */
59 struct grpc_completion_queue {
60 /** owned by pollset */
61 gpr_mu *mu;
62 /** completed events */
63 grpc_cq_completion completed_head;
64 grpc_cq_completion *completed_tail;
65 /** Number of pending events (+1 if we're not shutdown) */
66 gpr_refcount pending_events;
67 /** Once owning_refs drops to zero, we will destroy the cq */
68 gpr_refcount owning_refs;
69 /** 0 initially, 1 once we've begun shutting down */
70 int shutdown;
/** set by grpc_completion_queue_shutdown(); makes shutdown idempotent */
71 int shutdown_called;
/** flag set via grpc_cq_mark_server_cq() */
72 int is_server_cq;
/** number of live entries in pluckers[] */
73 int num_pluckers;
/** waiters currently blocked in grpc_completion_queue_pluck() */
74 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
/** closure run when the embedded pollset finishes shutting down */
75 grpc_closure pollset_shutdown_done;
76
77 #ifndef NDEBUG
/** debug-only set of tags passed to grpc_cq_begin_op but not yet ended;
    grows by doubling (see grpc_cq_begin_op) */
78 void **outstanding_tags;
79 size_t outstanding_tag_count;
80 size_t outstanding_tag_capacity;
81 #endif
82
/** intrusive link for the global freelist (guarded by g_freelist_mu) */
83 grpc_completion_queue *next_free;
84 };
85
/* The pollset is allocated in the same gpr_malloc block, immediately after
   the grpc_completion_queue struct (see grpc_completion_queue_create). */
86 #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
87
/* Global freelist of retired completion queues; both fields are guarded by
   g_freelist_mu (initialized in grpc_cq_global_init). */
88 static gpr_mu g_freelist_mu;
89 grpc_completion_queue *g_freelist;
90
91 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
92 bool success);
93
94 void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
95
96 void grpc_cq_global_shutdown(void) {
97 gpr_mu_destroy(&g_freelist_mu);
98 while (g_freelist) {
99 grpc_completion_queue *next = g_freelist->next_free;
100 grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
101 #ifndef NDEBUG
102 gpr_free(g_freelist->outstanding_tags);
103 #endif
104 gpr_free(g_freelist);
105 g_freelist = next;
106 }
107 }
108
/* Bundles a timer with the completion it will post when it fires.
   NOTE(review): no function in this file references this struct — presumably
   it is used by the alarm surface elsewhere; confirm before removing. */
109 struct grpc_cq_alarm {
110 grpc_timer alarm;
111 grpc_cq_completion completion;
112 /** completion queue where events about this alarm will be posted */
113 grpc_completion_queue *cq;
114 /** user supplied tag */
115 void *tag;
116 };
117
/* Creates (or recycles from the freelist) a completion queue.
   The cq and its pollset share one allocation (see POLLSET_FROM_CQ).
   Ownership protocol: pending_events starts at 1 (dropped by
   grpc_completion_queue_shutdown); owning_refs starts at 2 (one dropped by
   grpc_completion_queue_destroy, one by on_pollset_shutdown_done). */
118 grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
119 grpc_completion_queue *cc;
120 GPR_ASSERT(!reserved);
121
122 GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
123
124 GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
125
126 gpr_mu_lock(&g_freelist_mu);
127 if (g_freelist == NULL) {
/* Nothing to recycle: allocate cq + pollset in one block. */
128 gpr_mu_unlock(&g_freelist_mu);
129
130 cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
131 grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
132 #ifndef NDEBUG
133 cc->outstanding_tags = NULL;
134 cc->outstanding_tag_capacity = 0;
135 #endif
136 } else {
/* Reuse a retired cq; its pollset (and, in debug builds, its
   outstanding_tags buffer) survive recycling. */
137 cc = g_freelist;
138 g_freelist = g_freelist->next_free;
139 gpr_mu_unlock(&g_freelist_mu);
140 /* pollset already initialized */
141 }
142
143 /* Initial ref is dropped by grpc_completion_queue_shutdown */
144 gpr_ref_init(&cc->pending_events, 1);
145 /* One for destroy(), one for pollset_shutdown */
146 gpr_ref_init(&cc->owning_refs, 2);
/* Empty completed-event list: the sentinel head links to itself. */
147 cc->completed_tail = &cc->completed_head;
148 cc->completed_head.next = (uintptr_t)cc->completed_tail;
149 cc->shutdown = 0;
150 cc->shutdown_called = 0;
151 cc->is_server_cq = 0;
152 cc->num_pluckers = 0;
153 #ifndef NDEBUG
154 cc->outstanding_tag_count = 0;
155 #endif
156 grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
157
158 GPR_TIMER_END("grpc_completion_queue_create", 0);
159
160 return cc;
161 }
162
/* Takes an owning ref on the cq. The #ifdef splices one of two alternative
   signatures onto the shared body below: the debug variant additionally logs
   the ref transition with the caller-supplied reason/file/line. */
163 #ifdef GRPC_CQ_REF_COUNT_DEBUG
164 void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
165 const char *file, int line) {
166 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
167 (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
168 #else
169 void grpc_cq_internal_ref(grpc_completion_queue *cc) {
170 #endif
171 gpr_ref(&cc->owning_refs);
172 }
173
174 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
175 bool success) {
176 grpc_completion_queue *cc = arg;
177 GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
178 }
179
/* Drops an owning ref. On the final unref the completed-event list must be
   empty (sentinel self-linked); the cq is then parked on the global freelist
   for reuse rather than freed (final disposal happens in
   grpc_cq_global_shutdown). Same #ifdef signature-splice as internal_ref. */
180 #ifdef GRPC_CQ_REF_COUNT_DEBUG
181 void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
182 const char *file, int line) {
183 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
184 (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
185 #else
186 void grpc_cq_internal_unref(grpc_completion_queue *cc) {
187 #endif
188 if (gpr_unref(&cc->owning_refs)) {
189 GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
190 grpc_pollset_reset(POLLSET_FROM_CQ(cc));
191 gpr_mu_lock(&g_freelist_mu);
192 cc->next_free = g_freelist;
193 g_freelist = cc;
194 gpr_mu_unlock(&g_freelist_mu);
195 }
196 }
197
/* Registers the start of an operation that will later post a completion for
   `tag` via grpc_cq_end_op. Must not be called after shutdown has begun.
   In debug builds the tag is recorded so end_op can assert pairing. */
198 void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
199 #ifndef NDEBUG
200 gpr_mu_lock(cc->mu);
201 GPR_ASSERT(!cc->shutdown_called);
/* Grow the tag array by doubling (minimum capacity 4).
   NOTE(review): gpr_realloc's result is used unchecked — presumably the
   gpr allocator aborts on OOM; confirm against gpr/alloc semantics. */
202 if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
203 cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
204 cc->outstanding_tags =
205 gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) *
206 cc->outstanding_tag_capacity);
207 }
208 cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
209 gpr_mu_unlock(cc->mu);
210 #endif
/* +1 pending event; the matching unref happens in grpc_cq_end_op. */
211 gpr_ref(&cc->pending_events);
212 }
213
214 /* Signal the end of an operation - if this is the last waiting-to-be-queued
215 event, then enter shutdown mode */
216 /* Queue a GRPC_OP_COMPLETED operation */
/* The completed list stores the success flag in the low bit of each `next`
   pointer (grpc_cq_completion is assumed to be at least 2-byte aligned so
   the bit is free). `done` is invoked later, when a consumer dequeues the
   completion in next()/pluck(). */
217 void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
218 void *tag, int success,
219 void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
220 grpc_cq_completion *storage),
221 void *done_arg, grpc_cq_completion *storage) {
222 int shutdown;
223 int i;
224 grpc_pollset_worker *pluck_worker;
225 #ifndef NDEBUG
226 int found = 0;
227 #endif
228
229 GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
230
/* Fill in the completion; next points back at the sentinel head with the
   success flag packed into bit 0. */
231 storage->tag = tag;
232 storage->done = done;
233 storage->done_arg = done_arg;
234 storage->next =
235 ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0));
236
237 gpr_mu_lock(cc->mu);
238 #ifndef NDEBUG
/* Debug pairing check: the tag must have been registered by
   grpc_cq_begin_op; remove it by swap-with-last. */
239 for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
240 if (cc->outstanding_tags[i] == tag) {
241 cc->outstanding_tag_count--;
242 GPR_SWAP(void *, cc->outstanding_tags[i],
243 cc->outstanding_tags[cc->outstanding_tag_count]);
244 found = 1;
245 break;
246 }
247 }
248 GPR_ASSERT(found);
249 #endif
/* Drop the pending-event ref taken in begin_op; a true return means this
   was the last pending event after shutdown was requested. */
250 shutdown = gpr_unref(&cc->pending_events);
251 if (!shutdown) {
/* Append to the tail, preserving the tail's existing success bit. */
252 cc->completed_tail->next =
253 ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
254 cc->completed_tail = storage;
/* If some plucker is waiting specifically for this tag, kick its
   worker; otherwise kick a generic poller (NULL). */
255 pluck_worker = NULL;
256 for (i = 0; i < cc->num_pluckers; i++) {
257 if (cc->pluckers[i].tag == tag) {
258 pluck_worker = *cc->pluckers[i].worker;
259 break;
260 }
261 }
262 grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
263 gpr_mu_unlock(cc->mu);
264 } else {
/* Last event after shutdown was requested: enqueue it, flip to shutdown
   state, and begin pollset shutdown (completes via
   on_pollset_shutdown_done). */
265 cc->completed_tail->next =
266 ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
267 cc->completed_tail = storage;
268 GPR_ASSERT(!cc->shutdown);
269 GPR_ASSERT(cc->shutdown_called);
270 cc->shutdown = 1;
271 grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
272 &cc->pollset_shutdown_done);
273 gpr_mu_unlock(cc->mu);
274 }
275
276 GPR_TIMER_END("grpc_cq_end_op", 0);
277 }
278
/* Blocks until the next completion is available, the deadline expires, or
   the queue is shut down. Deadline is converted to MONOTONIC up front.
   Lock discipline: cc->mu is held while inspecting queue state and released
   before invoking the completion's done callback or returning. */
279 grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
280 gpr_timespec deadline, void *reserved) {
281 grpc_event ret;
282 grpc_pollset_worker *worker = NULL;
283 int first_loop = 1;
284 gpr_timespec now;
285 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
286
287 GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
288
289 GRPC_API_TRACE(
290 "grpc_completion_queue_next("
291 "cc=%p, "
292 "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
293 "reserved=%p)",
294 5, (cc, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
295 (int)deadline.clock_type, reserved));
296 GPR_ASSERT(!reserved);
297
298 deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
299
/* Hold an owning ref across the wait so the cq cannot be recycled. */
300 GRPC_CQ_INTERNAL_REF(cc, "next");
301 gpr_mu_lock(cc->mu);
302 for (;;) {
/* Non-empty queue: pop the head completion. Bit 0 of `next` carries
   the success flag (see grpc_cq_end_op). */
303 if (cc->completed_tail != &cc->completed_head) {
304 grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
305 cc->completed_head.next = c->next & ~(uintptr_t)1;
306 if (c == cc->completed_tail) {
307 cc->completed_tail = &cc->completed_head;
308 }
309 gpr_mu_unlock(cc->mu);
310 ret.type = GRPC_OP_COMPLETE;
311 ret.success = c->next & 1u;
312 ret.tag = c->tag;
/* May free/recycle `c`'s storage; called outside the lock. */
313 c->done(&exec_ctx, c->done_arg, c);
314 break;
315 }
316 if (cc->shutdown) {
317 gpr_mu_unlock(cc->mu);
318 memset(&ret, 0, sizeof(ret));
319 ret.type = GRPC_QUEUE_SHUTDOWN;
320 break;
321 }
/* Deadline is only honored from the second iteration on, so a
   zero/past deadline still performs one poll attempt. */
322 now = gpr_now(GPR_CLOCK_MONOTONIC);
323 if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
324 gpr_mu_unlock(cc->mu);
325 memset(&ret, 0, sizeof(ret));
326 ret.type = GRPC_QUEUE_TIMEOUT;
327 break;
328 }
329 first_loop = 0;
330 /* Check alarms - these are a global resource so we just ping
331 each time through on every pollset.
332 May update deadline to ensure timely wakeups.
333 TODO(ctiller): can this work be localized? */
334 gpr_timespec iteration_deadline = deadline;
335 if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
336 GPR_TIMER_MARK("alarm_triggered", 0);
337 gpr_mu_unlock(cc->mu);
338 grpc_exec_ctx_flush(&exec_ctx);
339 gpr_mu_lock(cc->mu);
340 continue;
341 } else {
/* Blocks (with cc->mu) until kicked or iteration_deadline. */
342 grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
343 iteration_deadline);
344 }
345 }
346 GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
347 GRPC_CQ_INTERNAL_UNREF(cc, "next");
348 grpc_exec_ctx_finish(&exec_ctx);
349
350 GPR_TIMER_END("grpc_completion_queue_next", 0);
351
352 return ret;
353 }
354
355 static int add_plucker(grpc_completion_queue *cc, void *tag,
356 grpc_pollset_worker **worker) {
357 if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
358 return 0;
359 }
360 cc->pluckers[cc->num_pluckers].tag = tag;
361 cc->pluckers[cc->num_pluckers].worker = worker;
362 cc->num_pluckers++;
363 return 1;
364 }
365
366 static void del_plucker(grpc_completion_queue *cc, void *tag,
367 grpc_pollset_worker **worker) {
368 int i;
369 for (i = 0; i < cc->num_pluckers; i++) {
370 if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
371 cc->num_pluckers--;
372 GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
373 return;
374 }
375 }
376 GPR_UNREACHABLE_CODE(return );
377 }
378
/* Like next(), but waits for the completion carrying a specific `tag`,
   removing it from the middle of the list if necessary. Registers itself as
   a plucker so end_op can kick precisely this waiter. Lock discipline
   mirrors next(): cc->mu released before done callbacks and returns. */
379 grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
380 gpr_timespec deadline, void *reserved) {
381 grpc_event ret;
382 grpc_cq_completion *c;
383 grpc_cq_completion *prev;
384 grpc_pollset_worker *worker = NULL;
385 gpr_timespec now;
386 int first_loop = 1;
387 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
388
389 GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
390
391 GRPC_API_TRACE(
392 "grpc_completion_queue_pluck("
393 "cc=%p, tag=%p, "
394 "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
395 "reserved=%p)",
396 6, (cc, tag, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
397 (int)deadline.clock_type, reserved));
398 GPR_ASSERT(!reserved);
399
400 deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
401
402 GRPC_CQ_INTERNAL_REF(cc, "pluck");
403 gpr_mu_lock(cc->mu);
404 for (;;) {
/* Scan the whole list for a completion with our tag; `next` pointers
   carry the success flag in bit 0, so mask it off when following them
   and preserve it when unlinking. */
405 prev = &cc->completed_head;
406 while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
407 &cc->completed_head) {
408 if (c->tag == tag) {
/* Unlink c: keep prev's success bit, splice in c's successor. */
409 prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
410 if (c == cc->completed_tail) {
411 cc->completed_tail = prev;
412 }
413 gpr_mu_unlock(cc->mu);
414 ret.type = GRPC_OP_COMPLETE;
415 ret.success = c->next & 1u;
416 ret.tag = c->tag;
417 c->done(&exec_ctx, c->done_arg, c);
418 goto done;
419 }
420 prev = c;
421 }
422 if (cc->shutdown) {
423 gpr_mu_unlock(cc->mu);
424 memset(&ret, 0, sizeof(ret));
425 ret.type = GRPC_QUEUE_SHUTDOWN;
426 break;
427 }
/* Register as a plucker so end_op(tag) kicks this worker; fails only
   when GRPC_MAX_COMPLETION_QUEUE_PLUCKERS waiters already exist. */
428 if (!add_plucker(cc, tag, &worker)) {
429 gpr_log(GPR_DEBUG,
430 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
431 "is %d",
432 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
433 gpr_mu_unlock(cc->mu);
434 memset(&ret, 0, sizeof(ret));
435 /* TODO(ctiller): should we use a different result here */
436 ret.type = GRPC_QUEUE_TIMEOUT;
437 break;
438 }
/* As in next(): the deadline is not enforced on the first pass. */
439 now = gpr_now(GPR_CLOCK_MONOTONIC);
440 if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
441 del_plucker(cc, tag, &worker);
442 gpr_mu_unlock(cc->mu);
443 memset(&ret, 0, sizeof(ret));
444 ret.type = GRPC_QUEUE_TIMEOUT;
445 break;
446 }
447 first_loop = 0;
448 /* Check alarms - these are a global resource so we just ping
449 each time through on every pollset.
450 May update deadline to ensure timely wakeups.
451 TODO(ctiller): can this work be localized? */
452 gpr_timespec iteration_deadline = deadline;
453 if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
454 GPR_TIMER_MARK("alarm_triggered", 0);
455 gpr_mu_unlock(cc->mu);
456 grpc_exec_ctx_flush(&exec_ctx);
457 gpr_mu_lock(cc->mu);
458 } else {
459 grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
460 iteration_deadline);
461 }
/* Deregister before re-scanning; re-added next iteration if needed. */
462 del_plucker(cc, tag, &worker);
463 }
464 done:
465 GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
466 GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
467 grpc_exec_ctx_finish(&exec_ctx);
468
469 GPR_TIMER_END("grpc_completion_queue_pluck", 0);
470
471 return ret;
472 }
473
474 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
475 to zero here, then enter shutdown mode and wake up any waiters */
/* Idempotent: a second call returns immediately via shutdown_called. If
   operations are still pending, the final grpc_cq_end_op performs the
   pollset shutdown instead. */
476 void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
477 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
478 GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
479 GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
480 gpr_mu_lock(cc->mu);
481 if (cc->shutdown_called) {
482 gpr_mu_unlock(cc->mu);
483 GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
484 return;
485 }
486 cc->shutdown_called = 1;
/* Drop the creation-time pending_events ref; true means no operations
   are outstanding, so we can enter shutdown state right now. */
487 if (gpr_unref(&cc->pending_events)) {
488 GPR_ASSERT(!cc->shutdown);
489 cc->shutdown = 1;
490 grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
491 &cc->pollset_shutdown_done);
492 }
493 gpr_mu_unlock(cc->mu);
494 grpc_exec_ctx_finish(&exec_ctx);
495 GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
496 }
497
498 void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
499 GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
500 GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
501 grpc_completion_queue_shutdown(cc);
502 GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
503 GPR_TIMER_END("grpc_completion_queue_destroy", 0);
504 }
505
506 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
507 return POLLSET_FROM_CQ(cc);
508 }
509
510 void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
511
512 int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/surface/completion_queue.h ('k') | third_party/grpc/src/core/surface/event_string.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698