Chromium Code Reviews

Side by Side Diff: third_party/grpc/src/core/iomgr/pollset_posix.c

Issue 1932353002: Initial checkin of gRPC to third_party/
Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
/*
 *
 * Copyright 2015-2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/pollset_posix.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"

GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

/** Default poll() function - a pointer so that it can be overridden by some
 *  tests */
grpc_poll_function_type grpc_poll_function = poll;
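
/* For illustration only (hypothetical test code, not part of this file): a
   test could substitute its own poller and delegate to the real poll().
   `flaky_poll` and `g_fail_next_poll` are invented names for this sketch.

     static int g_fail_next_poll = 0;

     static int flaky_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
       if (g_fail_next_poll) {            // simulate one interrupted syscall
         g_fail_next_poll = 0;
         errno = EINTR;
         return -1;
       }
       return poll(fds, nfds, timeout);   // otherwise defer to the real poll
     }

     grpc_poll_function = flaky_poll;     // install the override
*/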

/** The alarm system needs to be able to wake up 'some poller' sometimes
 *  (specifically when a new alarm needs to be triggered earlier than the next
 *  alarm 'epoch').
 *  This wakeup_fd gives us something to alert on when such a case occurs. */
grpc_wakeup_fd grpc_global_wakeup_fd;

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

int grpc_pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (grpc_pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
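
/* Note: the worker list is an intrusive, circular, doubly-linked list rooted
   at pollset->root_worker. An empty list is the root pointing at itself
   (established in grpc_pollset_init below), which is exactly what
   grpc_pollset_has_workers() tests:

     p->root_worker.next == &p->root_worker   =>  no workers present
*/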

size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }

void grpc_pollset_kick_ext(grpc_pollset *p,
                           grpc_pollset_worker *specific_worker,
                           uint32_t flags) {
  GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("grpc_pollset_kick_ext", 0);
}
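
/* Usage sketch (hypothetical call sites; the sentinel and flags are the ones
   used elsewhere in this file): to wake every worker on a pollset, as
   grpc_pollset_shutdown does below:

     grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

   or to wake one specific worker and ask it to re-evaluate what it polls on:

     grpc_pollset_kick_ext(pollset, worker,
                           GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
*/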

void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
  grpc_pollset_kick_ext(p, specific_worker, 0);
}

/* global state management */

void grpc_pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_global_init();
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

void grpc_pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
  grpc_wakeup_fd_global_destroy();
}

void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }

/* main interface */

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);

void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->in_flight_cbs = 0;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  become_basic_pollset(pollset, NULL);
}

void grpc_pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!grpc_pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
}

void grpc_pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!grpc_pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  become_basic_pollset(pollset, NULL);
}

void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
   our lock - meaning that if the unlocking flag passed to add_fd above is
   not respected, the code will deadlock (in a way that we have a chance of
   debugging) */
#ifndef NDEBUG
  gpr_mu_lock(&pollset->mu);
  gpr_mu_unlock(&pollset->mu);
#endif
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  pollset->vtable->finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker **worker_hdl, gpr_timespec now,
                       gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("grpc_pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!grpc_pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0);
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0);
    goto done;
  }
  /* Give do_promote priority so we don't starve it out */
  if (pollset->in_flight_cbs) {
    GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0);
    gpr_mu_unlock(&pollset->mu);
    locked = 0;
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
      pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
                                             deadline, now);
      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
    } else {
      GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop, though, as we haven't added the worker to
       the worker list, which means nobody could ask us to re-evaluate
       polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (grpc_pollset_has_workers(pollset)) {
      grpc_pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * grpc_pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("grpc_pollset_work", 0);
}
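
/* Usage sketch (hypothetical caller): drive a pollset until some condition
   holds. As noted above, pollset->mu must be held on entry; grpc_pollset_work
   may drop and reacquire it, and holds it again on return.

     gpr_mu *mu;                      // obtained from grpc_pollset_init
     gpr_mu_lock(mu);
     while (!done) {
       grpc_pollset_worker *worker;
       grpc_pollset_work(&exec_ctx, pollset, &worker,
                         gpr_now(GPR_CLOCK_MONOTONIC), deadline);
     }
     gpr_mu_unlock(mu);
*/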

void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!grpc_pollset_has_workers(pollset)) {
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
  }
  if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
      !grpc_pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                         gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
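
/* Worked example: with deadline = now + 2.3ms, timeout is 2.3ms and the
   (GPR_NS_PER_MS - 1)-nanosecond addend rounds it up to 3ms, so poll() cannot
   wake before the deadline. A deadline within max_spin_polling_us (10us) of
   now yields 0 (a non-blocking poll), and an infinite deadline yields -1
   (block indefinitely). */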

/*
 * basic_pollset - a vtable that provides polling for zero or one file
 * descriptor via poll()
 */

typedef struct grpc_unary_promote_args {
  const grpc_pollset_vtable *original_vtable;
  grpc_pollset *pollset;
  grpc_fd *fd;
  grpc_closure promotion_closure;
} grpc_unary_promote_args;

static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
                             bool success) {
  grpc_unary_promote_args *up_args = args;
  const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
  grpc_pollset *pollset = up_args->pollset;
  grpc_fd *fd = up_args->fd;

  /*
   * This is quite tricky. There are a number of cases to keep in mind here:
   * 1. fd may have been orphaned
   * 2. The pollset may no longer be a unary poller (and we can't let case #1
   *    leak to other pollset types!)
   * 3. pollset's fd (which may have changed) may have been orphaned
   * 4. The pollset may be shutting down.
   */
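  /* These cases map onto the branches below: case 4 is the shutting_down
     check, case 1 the grpc_fd_is_orphaned(fd) check, case 2 the
     vtable-changed check, and case 3 the orphan check on the pollset's
     current fd (fds[0]). */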

  gpr_mu_lock(&pollset->mu);
  /* First we need to ensure that nobody is polling concurrently */
  GPR_ASSERT(!grpc_pollset_has_workers(pollset));

  gpr_free(up_args);
  /* At this point the pollset may no longer be a unary poller. In that case
   * we should just call the right add function and be done. */
  /* TODO(klempner): If we're not careful this could cause infinite recursion.
   * That's not a problem for now because empty_pollset has a trivial poller
   * and we don't have any mechanism to unbecome multipoller. */
  pollset->in_flight_cbs--;
  if (pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      finish_shutdown(exec_ctx, pollset);
    }
  } else if (grpc_fd_is_orphaned(fd)) {
    /* Don't try to add it to anything, we'll drop our ref on it below */
  } else if (pollset->vtable != original_vtable) {
    pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
  } else if (fd != pollset->data.ptr) {
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
      grpc_platform_become_multipoller(exec_ctx, pollset, fds,
                                       GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      /* Note that it is possible that fds[1] is also orphaned at this point.
       * That's okay, we'll correct it at the next add or poll. */
      if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
  }

  gpr_mu_unlock(&pollset->mu);

  /* Matching ref in basic_pollset_add_fd */
  GRPC_FD_UNREF(fd, "basicpoll_add");
}

static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 grpc_fd *fd, int and_unlock_pollset) {
  grpc_unary_promote_args *up_args;
  GPR_ASSERT(fd);
  if (fd == pollset->data.ptr) goto exit;

  if (!grpc_pollset_has_workers(pollset)) {
    /* Fast path -- no in flight cbs */
    /* TODO(klempner): Comment this out and fix any test failures or establish
     * they are due to timing issues */
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] == NULL) {
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    } else if (!grpc_fd_is_orphaned(fds[0])) {
      grpc_platform_become_multipoller(exec_ctx, pollset, fds,
                                       GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
    goto exit;
  }

  /* Now we need to promote. This needs to happen when we're not polling. Since
   * this may be called from poll, the wait needs to happen asynchronously. */
  GRPC_FD_REF(fd, "basicpoll_add");
  pollset->in_flight_cbs++;
  up_args = gpr_malloc(sizeof(*up_args));
  up_args->fd = fd;
  up_args->original_vtable = pollset->vtable;
  up_args->pollset = pollset;
  up_args->promotion_closure.cb = basic_do_promote;
  up_args->promotion_closure.cb_arg = up_args;

  grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

exit:
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
  }
}

static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_pollset_worker *worker,
                                                gpr_timespec deadline,
                                                gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

  struct pollfd pfd[3];
  grpc_fd *fd;
  grpc_fd_watcher fd_watcher;
  int timeout;
  int r;
  nfds_t nfds;

  fd = pollset->data.ptr;
  if (fd && grpc_fd_is_orphaned(fd)) {
    GRPC_FD_UNREF(fd, "basicpoll");
    fd = pollset->data.ptr = NULL;
  }
  timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
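  /* Poll on up to three fds: pfd[0] is the process-wide wakeup fd, pfd[1] is
     this worker's wakeup fd, and pfd[2] (added only if this pollset is
     watching an fd) is that fd. */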
  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
  pfd[0].events = POLLIN;
  pfd[0].revents = 0;
  pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfd[1].events = POLLIN;
  pfd[1].revents = 0;
  nfds = 2;
  if (fd) {
    pfd[2].fd = fd->fd;
    pfd[2].revents = 0;
    GRPC_FD_REF(fd, "basicpoll_begin");
    gpr_mu_unlock(&pollset->mu);
    pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
                                              POLLOUT, &fd_watcher);
    if (pfd[2].events != 0) {
      nfds++;
    }
  } else {
    gpr_mu_unlock(&pollset->mu);
  }

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  /* poll fd count (argument 2) is shortened by one if we have no events
     to poll on - such that it only includes the kicker */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  r = grpc_poll_function(pfd, nfds, timeout);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (r < 0) {
    if (errno != EINTR) {
      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
    }
    if (fd) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else if (r == 0) {
    if (fd) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else {
    if (pfd[0].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
    }
    if (pfd[1].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
    }
    if (nfds > 2) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
                       pfd[2].revents & POLLOUT_CHECK);
    } else if (fd) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  }

  if (fd) {
    GRPC_FD_UNREF(fd, "basicpoll_begin");
  }
}

static void basic_pollset_destroy(grpc_pollset *pollset) {
  if (pollset->data.ptr != NULL) {
    GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
    pollset->data.ptr = NULL;
  }
}

static const grpc_pollset_vtable basic_pollset = {
    basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock,
    basic_pollset_destroy, basic_pollset_destroy};

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
  pollset->vtable = &basic_pollset;
  pollset->data.ptr = fd_or_null;
  if (fd_or_null != NULL) {
    GRPC_FD_REF(fd_or_null, "basicpoll");
  }
}

#endif /* GPR_POSIX_SOCKET */
