OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <grpc/support/port_platform.h> |
| 35 |
| 36 #ifdef GPR_POSIX_SOCKET |
| 37 |
| 38 #include "src/core/iomgr/pollset_posix.h" |
| 39 |
| 40 #include <errno.h> |
| 41 #include <stdlib.h> |
| 42 #include <string.h> |
| 43 #include <unistd.h> |
| 44 |
| 45 #include <grpc/support/alloc.h> |
| 46 #include <grpc/support/log.h> |
| 47 #include <grpc/support/thd.h> |
| 48 #include <grpc/support/tls.h> |
| 49 #include <grpc/support/useful.h> |
| 50 #include "src/core/iomgr/fd_posix.h" |
| 51 #include "src/core/iomgr/iomgr_internal.h" |
| 52 #include "src/core/iomgr/socket_utils_posix.h" |
| 53 #include "src/core/profiling/timers.h" |
| 54 #include "src/core/support/block_annotate.h" |
| 55 |
| 56 GPR_TLS_DECL(g_current_thread_poller); |
| 57 GPR_TLS_DECL(g_current_thread_worker); |
| 58 |
| 59 /** Default poll() function - a pointer so that it can be overridden by some |
| 60 * tests */ |
| 61 grpc_poll_function_type grpc_poll_function = poll; |
| 62 |
| 63 /** The alarm system needs to be able to wakeup 'some poller' sometimes |
| 64 * (specifically when a new alarm needs to be triggered earlier than the next |
| 65 * alarm 'epoch'). |
| 66 * This wakeup_fd gives us something to alert on when such a case occurs. */ |
| 67 grpc_wakeup_fd grpc_global_wakeup_fd; |
| 68 |
| 69 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { |
| 70 worker->prev->next = worker->next; |
| 71 worker->next->prev = worker->prev; |
| 72 } |
| 73 |
| 74 int grpc_pollset_has_workers(grpc_pollset *p) { |
| 75 return p->root_worker.next != &p->root_worker; |
| 76 } |
| 77 |
| 78 static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { |
| 79 if (grpc_pollset_has_workers(p)) { |
| 80 grpc_pollset_worker *w = p->root_worker.next; |
| 81 remove_worker(p, w); |
| 82 return w; |
| 83 } else { |
| 84 return NULL; |
| 85 } |
| 86 } |
| 87 |
| 88 static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { |
| 89 worker->next = &p->root_worker; |
| 90 worker->prev = worker->next->prev; |
| 91 worker->prev->next = worker->next->prev = worker; |
| 92 } |
| 93 |
| 94 static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { |
| 95 worker->prev = &p->root_worker; |
| 96 worker->next = worker->prev->next; |
| 97 worker->prev->next = worker->next->prev = worker; |
| 98 } |
| 99 |
| 100 size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } |
| 101 |
| 102 void grpc_pollset_kick_ext(grpc_pollset *p, |
| 103 grpc_pollset_worker *specific_worker, |
| 104 uint32_t flags) { |
| 105 GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); |
| 106 |
| 107 /* pollset->mu already held */ |
| 108 if (specific_worker != NULL) { |
| 109 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { |
| 110 GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); |
| 111 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); |
| 112 for (specific_worker = p->root_worker.next; |
| 113 specific_worker != &p->root_worker; |
| 114 specific_worker = specific_worker->next) { |
| 115 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); |
| 116 } |
| 117 p->kicked_without_pollers = 1; |
| 118 GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); |
| 119 } else if (gpr_tls_get(&g_current_thread_worker) != |
| 120 (intptr_t)specific_worker) { |
| 121 GPR_TIMER_MARK("different_thread_worker", 0); |
| 122 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { |
| 123 specific_worker->reevaluate_polling_on_wakeup = 1; |
| 124 } |
| 125 specific_worker->kicked_specifically = 1; |
| 126 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); |
| 127 } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { |
| 128 GPR_TIMER_MARK("kick_yoself", 0); |
| 129 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { |
| 130 specific_worker->reevaluate_polling_on_wakeup = 1; |
| 131 } |
| 132 specific_worker->kicked_specifically = 1; |
| 133 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); |
| 134 } |
| 135 } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { |
| 136 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); |
| 137 GPR_TIMER_MARK("kick_anonymous", 0); |
| 138 specific_worker = pop_front_worker(p); |
| 139 if (specific_worker != NULL) { |
| 140 if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { |
| 141 GPR_TIMER_MARK("kick_anonymous_not_self", 0); |
| 142 push_back_worker(p, specific_worker); |
| 143 specific_worker = pop_front_worker(p); |
| 144 if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && |
| 145 gpr_tls_get(&g_current_thread_worker) == |
| 146 (intptr_t)specific_worker) { |
| 147 push_back_worker(p, specific_worker); |
| 148 specific_worker = NULL; |
| 149 } |
| 150 } |
| 151 if (specific_worker != NULL) { |
| 152 GPR_TIMER_MARK("finally_kick", 0); |
| 153 push_back_worker(p, specific_worker); |
| 154 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); |
| 155 } |
| 156 } else { |
| 157 GPR_TIMER_MARK("kicked_no_pollers", 0); |
| 158 p->kicked_without_pollers = 1; |
| 159 } |
| 160 } |
| 161 |
| 162 GPR_TIMER_END("grpc_pollset_kick_ext", 0); |
| 163 } |
| 164 |
| 165 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { |
| 166 grpc_pollset_kick_ext(p, specific_worker, 0); |
| 167 } |
| 168 |
| 169 /* global state management */ |
| 170 |
| 171 void grpc_pollset_global_init(void) { |
| 172 gpr_tls_init(&g_current_thread_poller); |
| 173 gpr_tls_init(&g_current_thread_worker); |
| 174 grpc_wakeup_fd_global_init(); |
| 175 grpc_wakeup_fd_init(&grpc_global_wakeup_fd); |
| 176 } |
| 177 |
| 178 void grpc_pollset_global_shutdown(void) { |
| 179 grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); |
| 180 gpr_tls_destroy(&g_current_thread_poller); |
| 181 gpr_tls_destroy(&g_current_thread_worker); |
| 182 grpc_wakeup_fd_global_destroy(); |
| 183 } |
| 184 |
| 185 void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } |
| 186 |
| 187 /* main interface */ |
| 188 |
| 189 static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); |
| 190 |
| 191 void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
| 192 gpr_mu_init(&pollset->mu); |
| 193 *mu = &pollset->mu; |
| 194 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; |
| 195 pollset->in_flight_cbs = 0; |
| 196 pollset->shutting_down = 0; |
| 197 pollset->called_shutdown = 0; |
| 198 pollset->kicked_without_pollers = 0; |
| 199 pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; |
| 200 pollset->local_wakeup_cache = NULL; |
| 201 pollset->kicked_without_pollers = 0; |
| 202 become_basic_pollset(pollset, NULL); |
| 203 } |
| 204 |
| 205 void grpc_pollset_destroy(grpc_pollset *pollset) { |
| 206 GPR_ASSERT(pollset->in_flight_cbs == 0); |
| 207 GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
| 208 GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); |
| 209 pollset->vtable->destroy(pollset); |
| 210 while (pollset->local_wakeup_cache) { |
| 211 grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; |
| 212 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); |
| 213 gpr_free(pollset->local_wakeup_cache); |
| 214 pollset->local_wakeup_cache = next; |
| 215 } |
| 216 } |
| 217 |
| 218 void grpc_pollset_reset(grpc_pollset *pollset) { |
| 219 GPR_ASSERT(pollset->shutting_down); |
| 220 GPR_ASSERT(pollset->in_flight_cbs == 0); |
| 221 GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
| 222 GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); |
| 223 pollset->vtable->destroy(pollset); |
| 224 pollset->shutting_down = 0; |
| 225 pollset->called_shutdown = 0; |
| 226 pollset->kicked_without_pollers = 0; |
| 227 become_basic_pollset(pollset, NULL); |
| 228 } |
| 229 |
| 230 void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
| 231 grpc_fd *fd) { |
| 232 gpr_mu_lock(&pollset->mu); |
| 233 pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); |
| 234 /* the following (enabled only in debug) will reacquire and then release |
| 235 our lock - meaning that if the unlocking flag passed to add_fd above is |
| 236 not respected, the code will deadlock (in a way that we have a chance of |
| 237 debugging) */ |
| 238 #ifndef NDEBUG |
| 239 gpr_mu_lock(&pollset->mu); |
| 240 gpr_mu_unlock(&pollset->mu); |
| 241 #endif |
| 242 } |
| 243 |
| 244 static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
| 245 GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); |
| 246 pollset->vtable->finish_shutdown(pollset); |
| 247 grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); |
| 248 } |
| 249 |
| 250 void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
| 251 grpc_pollset_worker **worker_hdl, gpr_timespec now, |
| 252 gpr_timespec deadline) { |
| 253 grpc_pollset_worker worker; |
| 254 *worker_hdl = &worker; |
| 255 |
| 256 /* pollset->mu already held */ |
| 257 int added_worker = 0; |
| 258 int locked = 1; |
| 259 int queued_work = 0; |
| 260 int keep_polling = 0; |
| 261 GPR_TIMER_BEGIN("grpc_pollset_work", 0); |
| 262 /* this must happen before we (potentially) drop pollset->mu */ |
| 263 worker.next = worker.prev = NULL; |
| 264 worker.reevaluate_polling_on_wakeup = 0; |
| 265 if (pollset->local_wakeup_cache != NULL) { |
| 266 worker.wakeup_fd = pollset->local_wakeup_cache; |
| 267 pollset->local_wakeup_cache = worker.wakeup_fd->next; |
| 268 } else { |
| 269 worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); |
| 270 grpc_wakeup_fd_init(&worker.wakeup_fd->fd); |
| 271 } |
| 272 worker.kicked_specifically = 0; |
| 273 /* If there's work waiting for the pollset to be idle, and the |
| 274 pollset is idle, then do that work */ |
| 275 if (!grpc_pollset_has_workers(pollset) && |
| 276 !grpc_closure_list_empty(pollset->idle_jobs)) { |
| 277 GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); |
| 278 grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); |
| 279 goto done; |
| 280 } |
| 281 /* If we're shutting down then we don't execute any extended work */ |
| 282 if (pollset->shutting_down) { |
| 283 GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); |
| 284 goto done; |
| 285 } |
| 286 /* Give do_promote priority so we don't starve it out */ |
| 287 if (pollset->in_flight_cbs) { |
| 288 GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); |
| 289 gpr_mu_unlock(&pollset->mu); |
| 290 locked = 0; |
| 291 goto done; |
| 292 } |
| 293 /* Start polling, and keep doing so while we're being asked to |
| 294 re-evaluate our pollers (this allows poll() based pollers to |
| 295 ensure they don't miss wakeups) */ |
| 296 keep_polling = 1; |
| 297 while (keep_polling) { |
| 298 keep_polling = 0; |
| 299 if (!pollset->kicked_without_pollers) { |
| 300 if (!added_worker) { |
| 301 push_front_worker(pollset, &worker); |
| 302 added_worker = 1; |
| 303 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); |
| 304 } |
| 305 gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); |
| 306 GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); |
| 307 pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, |
| 308 deadline, now); |
| 309 GPR_TIMER_END("maybe_work_and_unlock", 0); |
| 310 locked = 0; |
| 311 gpr_tls_set(&g_current_thread_poller, 0); |
| 312 } else { |
| 313 GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0); |
| 314 pollset->kicked_without_pollers = 0; |
| 315 } |
| 316 /* Finished execution - start cleaning up. |
| 317 Note that we may arrive here from outside the enclosing while() loop. |
| 318 In that case we won't loop though as we haven't added worker to the |
| 319 worker list, which means nobody could ask us to re-evaluate polling). */ |
| 320 done: |
| 321 if (!locked) { |
| 322 queued_work |= grpc_exec_ctx_flush(exec_ctx); |
| 323 gpr_mu_lock(&pollset->mu); |
| 324 locked = 1; |
| 325 } |
| 326 /* If we're forced to re-evaluate polling (via grpc_pollset_kick with |
| 327 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force |
| 328 a loop */ |
| 329 if (worker.reevaluate_polling_on_wakeup) { |
| 330 worker.reevaluate_polling_on_wakeup = 0; |
| 331 pollset->kicked_without_pollers = 0; |
| 332 if (queued_work || worker.kicked_specifically) { |
| 333 /* If there's queued work on the list, then set the deadline to be |
| 334 immediate so we get back out of the polling loop quickly */ |
| 335 deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); |
| 336 } |
| 337 keep_polling = 1; |
| 338 } |
| 339 } |
| 340 if (added_worker) { |
| 341 remove_worker(pollset, &worker); |
| 342 gpr_tls_set(&g_current_thread_worker, 0); |
| 343 } |
| 344 /* release wakeup fd to the local pool */ |
| 345 worker.wakeup_fd->next = pollset->local_wakeup_cache; |
| 346 pollset->local_wakeup_cache = worker.wakeup_fd; |
| 347 /* check shutdown conditions */ |
| 348 if (pollset->shutting_down) { |
| 349 if (grpc_pollset_has_workers(pollset)) { |
| 350 grpc_pollset_kick(pollset, NULL); |
| 351 } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { |
| 352 pollset->called_shutdown = 1; |
| 353 gpr_mu_unlock(&pollset->mu); |
| 354 finish_shutdown(exec_ctx, pollset); |
| 355 grpc_exec_ctx_flush(exec_ctx); |
| 356 /* Continuing to access pollset here is safe -- it is the caller's |
| 357 * responsibility to not destroy when it has outstanding calls to |
| 358 * grpc_pollset_work. |
| 359 * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ |
| 360 gpr_mu_lock(&pollset->mu); |
| 361 } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { |
| 362 grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); |
| 363 gpr_mu_unlock(&pollset->mu); |
| 364 grpc_exec_ctx_flush(exec_ctx); |
| 365 gpr_mu_lock(&pollset->mu); |
| 366 } |
| 367 } |
| 368 *worker_hdl = NULL; |
| 369 GPR_TIMER_END("grpc_pollset_work", 0); |
| 370 } |
| 371 |
| 372 void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
| 373 grpc_closure *closure) { |
| 374 GPR_ASSERT(!pollset->shutting_down); |
| 375 pollset->shutting_down = 1; |
| 376 pollset->shutdown_done = closure; |
| 377 grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
| 378 if (!grpc_pollset_has_workers(pollset)) { |
| 379 grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); |
| 380 } |
| 381 if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && |
| 382 !grpc_pollset_has_workers(pollset)) { |
| 383 pollset->called_shutdown = 1; |
| 384 finish_shutdown(exec_ctx, pollset); |
| 385 } |
| 386 } |
| 387 |
| 388 int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, |
| 389 gpr_timespec now) { |
| 390 gpr_timespec timeout; |
| 391 static const int64_t max_spin_polling_us = 10; |
| 392 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { |
| 393 return -1; |
| 394 } |
| 395 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( |
| 396 max_spin_polling_us, |
| 397 GPR_TIMESPAN))) <= 0) { |
| 398 return 0; |
| 399 } |
| 400 timeout = gpr_time_sub(deadline, now); |
| 401 return gpr_time_to_millis(gpr_time_add( |
| 402 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); |
| 403 } |
| 404 |
| 405 /* |
| 406 * basic_pollset - a vtable that provides polling for zero or one file |
| 407 * descriptor via poll() |
| 408 */ |
| 409 |
| 410 typedef struct grpc_unary_promote_args { |
| 411 const grpc_pollset_vtable *original_vtable; |
| 412 grpc_pollset *pollset; |
| 413 grpc_fd *fd; |
| 414 grpc_closure promotion_closure; |
| 415 } grpc_unary_promote_args; |
| 416 |
| 417 static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, |
| 418 bool success) { |
| 419 grpc_unary_promote_args *up_args = args; |
| 420 const grpc_pollset_vtable *original_vtable = up_args->original_vtable; |
| 421 grpc_pollset *pollset = up_args->pollset; |
| 422 grpc_fd *fd = up_args->fd; |
| 423 |
| 424 /* |
| 425 * This is quite tricky. There are a number of cases to keep in mind here: |
| 426 * 1. fd may have been orphaned |
| 427 * 2. The pollset may no longer be a unary poller (and we can't let case #1 |
| 428 * leak to other pollset types!) |
| 429 * 3. pollset's fd (which may have changed) may have been orphaned |
| 430 * 4. The pollset may be shutting down. |
| 431 */ |
| 432 |
| 433 gpr_mu_lock(&pollset->mu); |
| 434 /* First we need to ensure that nobody is polling concurrently */ |
| 435 GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
| 436 |
| 437 gpr_free(up_args); |
| 438 /* At this point the pollset may no longer be a unary poller. In that case |
| 439 * we should just call the right add function and be done. */ |
| 440 /* TODO(klempner): If we're not careful this could cause infinite recursion. |
| 441 * That's not a problem for now because empty_pollset has a trivial poller |
| 442 * and we don't have any mechanism to unbecome multipoller. */ |
| 443 pollset->in_flight_cbs--; |
| 444 if (pollset->shutting_down) { |
| 445 /* We don't care about this pollset anymore. */ |
| 446 if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { |
| 447 pollset->called_shutdown = 1; |
| 448 finish_shutdown(exec_ctx, pollset); |
| 449 } |
| 450 } else if (grpc_fd_is_orphaned(fd)) { |
| 451 /* Don't try to add it to anything, we'll drop our ref on it below */ |
| 452 } else if (pollset->vtable != original_vtable) { |
| 453 pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); |
| 454 } else if (fd != pollset->data.ptr) { |
| 455 grpc_fd *fds[2]; |
| 456 fds[0] = pollset->data.ptr; |
| 457 fds[1] = fd; |
| 458 |
| 459 if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { |
| 460 grpc_platform_become_multipoller(exec_ctx, pollset, fds, |
| 461 GPR_ARRAY_SIZE(fds)); |
| 462 GRPC_FD_UNREF(fds[0], "basicpoll"); |
| 463 } else { |
| 464 /* old fd is orphaned and we haven't cleaned it up until now, so remain a |
| 465 * unary poller */ |
| 466 /* Note that it is possible that fds[1] is also orphaned at this point. |
| 467 * That's okay, we'll correct it at the next add or poll. */ |
| 468 if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); |
| 469 pollset->data.ptr = fd; |
| 470 GRPC_FD_REF(fd, "basicpoll"); |
| 471 } |
| 472 } |
| 473 |
| 474 gpr_mu_unlock(&pollset->mu); |
| 475 |
| 476 /* Matching ref in basic_pollset_add_fd */ |
| 477 GRPC_FD_UNREF(fd, "basicpoll_add"); |
| 478 } |
| 479 |
| 480 static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
| 481 grpc_fd *fd, int and_unlock_pollset) { |
| 482 grpc_unary_promote_args *up_args; |
| 483 GPR_ASSERT(fd); |
| 484 if (fd == pollset->data.ptr) goto exit; |
| 485 |
| 486 if (!grpc_pollset_has_workers(pollset)) { |
| 487 /* Fast path -- no in flight cbs */ |
| 488 /* TODO(klempner): Comment this out and fix any test failures or establish |
| 489 * they are due to timing issues */ |
| 490 grpc_fd *fds[2]; |
| 491 fds[0] = pollset->data.ptr; |
| 492 fds[1] = fd; |
| 493 |
| 494 if (fds[0] == NULL) { |
| 495 pollset->data.ptr = fd; |
| 496 GRPC_FD_REF(fd, "basicpoll"); |
| 497 } else if (!grpc_fd_is_orphaned(fds[0])) { |
| 498 grpc_platform_become_multipoller(exec_ctx, pollset, fds, |
| 499 GPR_ARRAY_SIZE(fds)); |
| 500 GRPC_FD_UNREF(fds[0], "basicpoll"); |
| 501 } else { |
| 502 /* old fd is orphaned and we haven't cleaned it up until now, so remain a |
| 503 * unary poller */ |
| 504 GRPC_FD_UNREF(fds[0], "basicpoll"); |
| 505 pollset->data.ptr = fd; |
| 506 GRPC_FD_REF(fd, "basicpoll"); |
| 507 } |
| 508 goto exit; |
| 509 } |
| 510 |
| 511 /* Now we need to promote. This needs to happen when we're not polling. Since |
| 512 * this may be called from poll, the wait needs to happen asynchronously. */ |
| 513 GRPC_FD_REF(fd, "basicpoll_add"); |
| 514 pollset->in_flight_cbs++; |
| 515 up_args = gpr_malloc(sizeof(*up_args)); |
| 516 up_args->fd = fd; |
| 517 up_args->original_vtable = pollset->vtable; |
| 518 up_args->pollset = pollset; |
| 519 up_args->promotion_closure.cb = basic_do_promote; |
| 520 up_args->promotion_closure.cb_arg = up_args; |
| 521 |
| 522 grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); |
| 523 grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
| 524 |
| 525 exit: |
| 526 if (and_unlock_pollset) { |
| 527 gpr_mu_unlock(&pollset->mu); |
| 528 } |
| 529 } |
| 530 |
| 531 static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, |
| 532 grpc_pollset *pollset, |
| 533 grpc_pollset_worker *worker, |
| 534 gpr_timespec deadline, |
| 535 gpr_timespec now) { |
| 536 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) |
| 537 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) |
| 538 |
| 539 struct pollfd pfd[3]; |
| 540 grpc_fd *fd; |
| 541 grpc_fd_watcher fd_watcher; |
| 542 int timeout; |
| 543 int r; |
| 544 nfds_t nfds; |
| 545 |
| 546 fd = pollset->data.ptr; |
| 547 if (fd && grpc_fd_is_orphaned(fd)) { |
| 548 GRPC_FD_UNREF(fd, "basicpoll"); |
| 549 fd = pollset->data.ptr = NULL; |
| 550 } |
| 551 timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); |
| 552 pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); |
| 553 pfd[0].events = POLLIN; |
| 554 pfd[0].revents = 0; |
| 555 pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); |
| 556 pfd[1].events = POLLIN; |
| 557 pfd[1].revents = 0; |
| 558 nfds = 2; |
| 559 if (fd) { |
| 560 pfd[2].fd = fd->fd; |
| 561 pfd[2].revents = 0; |
| 562 GRPC_FD_REF(fd, "basicpoll_begin"); |
| 563 gpr_mu_unlock(&pollset->mu); |
| 564 pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, |
| 565 POLLOUT, &fd_watcher); |
| 566 if (pfd[2].events != 0) { |
| 567 nfds++; |
| 568 } |
| 569 } else { |
| 570 gpr_mu_unlock(&pollset->mu); |
| 571 } |
| 572 |
| 573 /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid |
| 574 even going into the blocking annotation if possible */ |
| 575 /* poll fd count (argument 2) is shortened by one if we have no events |
| 576 to poll on - such that it only includes the kicker */ |
| 577 GPR_TIMER_BEGIN("poll", 0); |
| 578 GRPC_SCHEDULING_START_BLOCKING_REGION; |
| 579 r = grpc_poll_function(pfd, nfds, timeout); |
| 580 GRPC_SCHEDULING_END_BLOCKING_REGION; |
| 581 GPR_TIMER_END("poll", 0); |
| 582 |
| 583 if (r < 0) { |
| 584 if (errno != EINTR) { |
| 585 gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); |
| 586 } |
| 587 if (fd) { |
| 588 grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
| 589 } |
| 590 } else if (r == 0) { |
| 591 if (fd) { |
| 592 grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
| 593 } |
| 594 } else { |
| 595 if (pfd[0].revents & POLLIN_CHECK) { |
| 596 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); |
| 597 } |
| 598 if (pfd[1].revents & POLLIN_CHECK) { |
| 599 grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); |
| 600 } |
| 601 if (nfds > 2) { |
| 602 grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, |
| 603 pfd[2].revents & POLLOUT_CHECK); |
| 604 } else if (fd) { |
| 605 grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
| 606 } |
| 607 } |
| 608 |
| 609 if (fd) { |
| 610 GRPC_FD_UNREF(fd, "basicpoll_begin"); |
| 611 } |
| 612 } |
| 613 |
| 614 static void basic_pollset_destroy(grpc_pollset *pollset) { |
| 615 if (pollset->data.ptr != NULL) { |
| 616 GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); |
| 617 pollset->data.ptr = NULL; |
| 618 } |
| 619 } |
| 620 |
| 621 static const grpc_pollset_vtable basic_pollset = { |
| 622 basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, |
| 623 basic_pollset_destroy, basic_pollset_destroy}; |
| 624 |
| 625 static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { |
| 626 pollset->vtable = &basic_pollset; |
| 627 pollset->data.ptr = fd_or_null; |
| 628 if (fd_or_null != NULL) { |
| 629 GRPC_FD_REF(fd_or_null, "basicpoll"); |
| 630 } |
| 631 } |
| 632 |
| 633 #endif /* GPR_POSIX_POLLSET */ |
OLD | NEW |