| Index: third_party/grpc/src/core/iomgr/fd_posix.c
|
| diff --git a/third_party/grpc/src/core/iomgr/fd_posix.c b/third_party/grpc/src/core/iomgr/fd_posix.c
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..4ba7c5df9435a9ea3af3c95819fc9228a287a584
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/core/iomgr/fd_posix.c
|
| @@ -0,0 +1,451 @@
|
| +/*
|
| + *
|
| + * Copyright 2015-2016, Google Inc.
|
| + * All rights reserved.
|
| + *
|
| + * Redistribution and use in source and binary forms, with or without
|
| + * modification, are permitted provided that the following conditions are
|
| + * met:
|
| + *
|
| + * * Redistributions of source code must retain the above copyright
|
| + * notice, this list of conditions and the following disclaimer.
|
| + * * Redistributions in binary form must reproduce the above
|
| + * copyright notice, this list of conditions and the following disclaimer
|
| + * in the documentation and/or other materials provided with the
|
| + * distribution.
|
| + * * Neither the name of Google Inc. nor the names of its
|
| + * contributors may be used to endorse or promote products derived from
|
| + * this software without specific prior written permission.
|
| + *
|
| + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| + *
|
| + */
|
| +
|
| +#include <grpc/support/port_platform.h>
|
| +
|
| +#ifdef GPR_POSIX_SOCKET
|
| +
|
| +#include "src/core/iomgr/fd_posix.h"
|
| +
|
| +#include <assert.h>
|
| +#include <sys/socket.h>
|
| +#include <unistd.h>
|
| +
|
| +#include <grpc/support/alloc.h>
|
| +#include <grpc/support/log.h>
|
| +#include <grpc/support/string_util.h>
|
| +#include <grpc/support/useful.h>
|
| +
|
| +#include "src/core/iomgr/pollset_posix.h"
|
| +
|
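| +/* The read/write closure fields double as readiness state: NULL means "not
| 
| + * ready", (grpc_closure *)1 means "ready", and any other value is a pending
| 
| + * closure waiting to be run when the event arrives. */
| 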
| +#define CLOSURE_NOT_READY ((grpc_closure *)0)
|
| +#define CLOSURE_READY ((grpc_closure *)1)
|
| +
|
| +/* We need to keep a freelist not because of any concern about malloc
| 
| + * performance but so that implementations with multiple threads in (for
| 
| + * example) epoll_wait can deal with the race between pollset removal and
| 
| + * incoming poll notifications.
|
| + *
|
| + * The problem is that the poller ultimately holds a reference to this
|
| + * object, so it is very difficult to know when it is safe to free it, at
| 
| + * least without some expensive synchronization.
|
| + *
|
| + * If we keep the object freelisted, in the worst case losing this race just
|
| + * becomes a spurious read notification on a reused fd.
|
| + */
|
| +/* TODO(klempner): We could use some form of polling generation count to know
|
| + * when these are safe to free. */
|
| +/* TODO(klempner): Consider disabling freelisting if we don't have multiple
|
| + * threads in poll on the same fd */
|
| +/* TODO(klempner): Batch these allocations to reduce fragmentation */
|
| +static grpc_fd *fd_freelist = NULL;
|
| +static gpr_mu fd_freelist_mu;
|
| +
|
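| +/* Return an fd wrapper to the freelist and unregister it from the iomgr;
| 
| + * called once its refcount has dropped to zero. */
| 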
| +static void freelist_fd(grpc_fd *fd) {
|
| + gpr_mu_lock(&fd_freelist_mu);
|
| + fd->freelist_next = fd_freelist;
|
| + fd_freelist = fd;
|
| + grpc_iomgr_unregister_object(&fd->iomgr_object);
|
| + gpr_mu_unlock(&fd_freelist_mu);
|
| +}
|
| +
|
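| +/* Take an fd wrapper from the freelist (or malloc a fresh one) and reset its
| 
| + * state around the new kernel fd. */
| 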
| +static grpc_fd *alloc_fd(int fd) {
|
| + grpc_fd *r = NULL;
|
| + gpr_mu_lock(&fd_freelist_mu);
|
| + if (fd_freelist != NULL) {
|
| + r = fd_freelist;
|
| + fd_freelist = fd_freelist->freelist_next;
|
| + }
|
| + gpr_mu_unlock(&fd_freelist_mu);
|
| + if (r == NULL) {
|
| + r = gpr_malloc(sizeof(grpc_fd));
|
| + gpr_mu_init(&r->mu);
|
| + }
|
| +
|
| + gpr_atm_rel_store(&r->refst, 1);
|
| + r->shutdown = 0;
|
| + r->read_closure = CLOSURE_NOT_READY;
|
| + r->write_closure = CLOSURE_NOT_READY;
|
| + r->fd = fd;
|
| + r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
|
| + &r->inactive_watcher_root;
|
| + r->freelist_next = NULL;
|
| + r->read_watcher = r->write_watcher = NULL;
|
| + r->on_done_closure = NULL;
|
| + r->closed = 0;
|
| + r->released = 0;
|
| + return r;
|
| +}
|
| +
|
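| +/* Really free an fd wrapper; only done at global shutdown, when no poller
| 
| + * can still hold a reference to it. */
| 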
| +static void destroy(grpc_fd *fd) {
|
| + gpr_mu_destroy(&fd->mu);
|
| + gpr_free(fd);
|
| +}
|
| +
|
| +#ifdef GRPC_FD_REF_COUNT_DEBUG
|
| +#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
|
| +#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
|
| +static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
|
| + int line) {
|
| + gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
|
| + gpr_atm_no_barrier_load(&fd->refst),
|
| + gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
|
| +#else
|
| +#define REF_BY(fd, n, reason) ref_by(fd, n)
|
| +#define UNREF_BY(fd, n, reason) unref_by(fd, n)
|
| +static void ref_by(grpc_fd *fd, int n) {
|
| +#endif
|
| + GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
|
| +}
|
| +
|
| +#ifdef GRPC_FD_REF_COUNT_DEBUG
|
| +static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
|
| + int line) {
|
| + gpr_atm old;
|
| + gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
|
| + gpr_atm_no_barrier_load(&fd->refst),
|
| + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
|
| +#else
|
| +static void unref_by(grpc_fd *fd, int n) {
|
| + gpr_atm old;
|
| +#endif
|
| + old = gpr_atm_full_fetch_add(&fd->refst, -n);
|
| + if (old == n) {
|
| + freelist_fd(fd);
|
| + } else {
|
| + GPR_ASSERT(old > n);
|
| + }
|
| +}
|
| +
|
| +void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
|
| +
|
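| +/* Drain and destroy the freelist. The initial lock/unlock pair ensures that
| 
| + * any freelist_fd call still in flight completes before entries are freed. */
| 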
| +void grpc_fd_global_shutdown(void) {
|
| + gpr_mu_lock(&fd_freelist_mu);
|
| + gpr_mu_unlock(&fd_freelist_mu);
|
| + while (fd_freelist != NULL) {
|
| + grpc_fd *fd = fd_freelist;
|
| + fd_freelist = fd_freelist->freelist_next;
|
| + destroy(fd);
|
| + }
|
| + gpr_mu_destroy(&fd_freelist_mu);
|
| +}
|
| +
|
| +grpc_fd *grpc_fd_create(int fd, const char *name) {
|
| + grpc_fd *r = alloc_fd(fd);
|
| + char *name2;
|
| + gpr_asprintf(&name2, "%s fd=%d", name, fd);
|
| + grpc_iomgr_register_object(&r->iomgr_object, name2);
|
| + gpr_free(name2);
|
| +#ifdef GRPC_FD_REF_COUNT_DEBUG
|
| + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
|
| +#endif
|
| + return r;
|
| +}
|
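| +/* Typical lifecycle of an fd, as a rough sketch (the caller-side names here
| 
| + * are hypothetical, not part of this file):
| 
| + *
| 
| + *   grpc_fd *fd = grpc_fd_create(sock, "tcp-client");
| 
| + *   grpc_fd_notify_on_read(exec_ctx, fd, &on_readable);
| 
| + *   ...  (a poller drives grpc_fd_begin_poll / grpc_fd_end_poll meanwhile)
| 
| + *   grpc_fd_orphan(exec_ctx, fd, &on_done, NULL, "use complete");
| 
| + */
| 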
| +
|
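| +/* The low bit of refst encodes "active": alloc_fd stores 1, ref/unref move
| 
| + * in steps of 2, and grpc_fd_orphan adds 1, so an even count means orphaned. */
| 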
| +int grpc_fd_is_orphaned(grpc_fd *fd) {
|
| + return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
|
| +}
|
| +
|
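| +/* Kick the worker that is polling via this watcher so its pollset
| 
| + * re-evaluates what it should be polling for. */
| 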
| +static void pollset_kick_locked(grpc_fd_watcher *watcher) {
|
| + gpr_mu_lock(&watcher->pollset->mu);
|
| + GPR_ASSERT(watcher->worker);
|
| + grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
|
| + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
|
| + gpr_mu_unlock(&watcher->pollset->mu);
|
| +}
|
| +
|
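| +/* Wake exactly one watcher, preferring an inactive one so that the current
| 
| + * read/write pollers are left undisturbed. */
| 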
| +static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
|
| + if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
|
| + pollset_kick_locked(fd->inactive_watcher_root.next);
|
| + } else if (fd->read_watcher) {
|
| + pollset_kick_locked(fd->read_watcher);
|
| + } else if (fd->write_watcher) {
|
| + pollset_kick_locked(fd->write_watcher);
|
| + }
|
| +}
|
| +
|
| +static void wake_all_watchers_locked(grpc_fd *fd) {
|
| + grpc_fd_watcher *watcher;
|
| + for (watcher = fd->inactive_watcher_root.next;
|
| + watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
|
| + pollset_kick_locked(watcher);
|
| + }
|
| + if (fd->read_watcher) {
|
| + pollset_kick_locked(fd->read_watcher);
|
| + }
|
| + if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
|
| + pollset_kick_locked(fd->write_watcher);
|
| + }
|
| +}
|
| +
|
| +static int has_watchers(grpc_fd *fd) {
|
| + return fd->read_watcher != NULL || fd->write_watcher != NULL ||
|
| + fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
|
| +}
|
| +
|
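| +/* Close the kernel fd (or, if it was released to the caller, just detach it
| 
| + * from the epoll sets) and schedule on_done_closure. */
| 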
| +static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
| + fd->closed = 1;
|
| + if (!fd->released) {
|
| + close(fd->fd);
|
| + } else {
|
| + grpc_remove_fd_from_all_epoll_sets(fd->fd);
|
| + }
|
| + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
|
| +}
|
| +
|
| +int grpc_fd_wrapped_fd(grpc_fd *fd) {
|
| + if (fd->released || fd->closed) {
|
| + return -1;
|
| + } else {
|
| + return fd->fd;
|
| + }
|
| +}
|
| +
|
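| +/* Drop the fd's "active" status. If release_fd is non-NULL, ownership of the
| 
| + * kernel fd passes back to the caller instead of it being closed; either way
| 
| + * the close/detach happens once no watchers remain. */
| 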
| +void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
|
| + int *release_fd, const char *reason) {
|
| + fd->on_done_closure = on_done;
|
| + fd->released = release_fd != NULL;
|
| + if (!fd->released) {
|
| + shutdown(fd->fd, SHUT_RDWR);
|
| + } else {
|
| + *release_fd = fd->fd;
|
| + }
|
| + gpr_mu_lock(&fd->mu);
|
| + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
|
| + if (!has_watchers(fd)) {
|
| + close_fd_locked(exec_ctx, fd);
|
| + } else {
|
| + wake_all_watchers_locked(fd);
|
| + }
|
| + gpr_mu_unlock(&fd->mu);
|
| + UNREF_BY(fd, 2, reason); /* drop the reference */
|
| +}
|
| +
|
| +/* increment refcount by two to avoid changing the orphan bit */
|
| +#ifdef GRPC_FD_REF_COUNT_DEBUG
|
| +void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) {
|
| + ref_by(fd, 2, reason, file, line);
|
| +}
|
| +
|
| +void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file,
|
| + int line) {
|
| + unref_by(fd, 2, reason, file, line);
|
| +}
|
| +#else
|
| +void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
|
| +
|
| +void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
|
| +#endif
|
| +
|
| +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
|
| + grpc_closure **st, grpc_closure *closure) {
|
| + if (*st == CLOSURE_NOT_READY) {
|
| + /* not ready ==> switch to a waiting state by setting the closure */
|
| + *st = closure;
|
| + } else if (*st == CLOSURE_READY) {
|
| + /* already ready ==> queue the closure to run immediately */
|
| + *st = CLOSURE_NOT_READY;
|
| + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
|
| + maybe_wake_one_watcher_locked(fd);
|
| + } else {
|
| + /* a different closure is already pending here. This is an error! */
|
| + gpr_log(GPR_ERROR,
|
| + "User called a notify_on function with a previous callback still "
|
| + "pending");
|
| + abort();
|
| + }
|
| +}
|
| +
|
| +/* returns 1 if state becomes not ready */
|
| +static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
|
| + grpc_closure **st) {
|
| + if (*st == CLOSURE_READY) {
|
| + /* duplicate ready ==> ignore */
|
| + return 0;
|
| + } else if (*st == CLOSURE_NOT_READY) {
|
| + /* not ready, and not waiting ==> flag ready */
|
| + *st = CLOSURE_READY;
|
| + return 0;
|
| + } else {
|
| + /* waiting ==> queue closure */
|
| + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
|
| + *st = CLOSURE_NOT_READY;
|
| + return 1;
|
| + }
|
| +}
|
| +
|
| +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
|
| + /* only one set_ready can be active at once (but there may be a racing
|
| + notify_on) */
|
| + gpr_mu_lock(&fd->mu);
|
| + set_ready_locked(exec_ctx, fd, st);
|
| + gpr_mu_unlock(&fd->mu);
|
| +}
|
| +
|
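| +/* Mark the fd shut down and flush any pending read/write closures; since
| 
| + * fd->shutdown is set first, they are enqueued with success == 0. */
| 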
| +void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
| + gpr_mu_lock(&fd->mu);
|
| + GPR_ASSERT(!fd->shutdown);
|
| + fd->shutdown = 1;
|
| + set_ready_locked(exec_ctx, fd, &fd->read_closure);
|
| + set_ready_locked(exec_ctx, fd, &fd->write_closure);
|
| + gpr_mu_unlock(&fd->mu);
|
| +}
|
| +
|
| +void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
|
| + grpc_closure *closure) {
|
| + gpr_mu_lock(&fd->mu);
|
| + notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
|
| + gpr_mu_unlock(&fd->mu);
|
| +}
|
| +
|
| +void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
|
| + grpc_closure *closure) {
|
| + gpr_mu_lock(&fd->mu);
|
| + notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
|
| + gpr_mu_unlock(&fd->mu);
|
| +}
|
| +
|
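| +/* Called by a poller before polling: registers the watcher against this fd
| 
| + * and returns the subset of read_mask/write_mask to poll for (0 if the fd is
| 
| + * shut down or its events are already covered by other watchers). */
| 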
| +uint32_t grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
|
| + grpc_pollset_worker *worker, uint32_t read_mask,
|
| + uint32_t write_mask, grpc_fd_watcher *watcher) {
|
| + uint32_t mask = 0;
|
| + grpc_closure *cur;
|
| + int requested;
|
| + /* keep track of pollers that have requested our events, in case they change
|
| + */
|
| + GRPC_FD_REF(fd, "poll");
|
| +
|
| + gpr_mu_lock(&fd->mu);
|
| +
|
| + /* if we are shutdown, then don't add to the watcher set */
|
| + if (fd->shutdown) {
|
| + watcher->fd = NULL;
|
| + watcher->pollset = NULL;
|
| + watcher->worker = NULL;
|
| + gpr_mu_unlock(&fd->mu);
|
| + GRPC_FD_UNREF(fd, "poll");
|
| + return 0;
|
| + }
|
| +
|
| + /* if there is nobody polling for read, but we need to, then start doing so */
|
| + cur = fd->read_closure;
|
| + requested = cur != CLOSURE_READY;
|
| + if (read_mask && fd->read_watcher == NULL && requested) {
|
| + fd->read_watcher = watcher;
|
| + mask |= read_mask;
|
| + }
|
| + /* if there is nobody polling for write, but we need to, then start doing so
|
| + */
|
| + cur = fd->write_closure;
|
| + requested = cur != CLOSURE_READY;
|
| + if (write_mask && fd->write_watcher == NULL && requested) {
|
| + fd->write_watcher = watcher;
|
| + mask |= write_mask;
|
| + }
|
| + /* if not polling, remember this watcher in case we need someone to poll it
| 
| + later */
|
| + if (mask == 0 && worker != NULL) {
|
| + watcher->next = &fd->inactive_watcher_root;
|
| + watcher->prev = watcher->next->prev;
|
| + watcher->next->prev = watcher->prev->next = watcher;
|
| + }
|
| + watcher->pollset = pollset;
|
| + watcher->worker = worker;
|
| + watcher->fd = fd;
|
| + gpr_mu_unlock(&fd->mu);
|
| +
|
| + return mask;
|
| +}
|
| +
|
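| +/* Called by a poller after polling: deregisters the watcher, records any
| 
| + * readiness that was observed, kicks a replacement if events are still
| 
| + * wanted, and finishes a deferred close once the last watcher is gone. */
| 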
| +void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
|
| + int got_read, int got_write) {
|
| + int was_polling = 0;
|
| + int kick = 0;
|
| + grpc_fd *fd = watcher->fd;
|
| +
|
| + if (fd == NULL) {
|
| + return;
|
| + }
|
| +
|
| + gpr_mu_lock(&fd->mu);
|
| +
|
| + if (watcher == fd->read_watcher) {
|
| + /* remove read watcher, kick if we still need a read */
|
| + was_polling = 1;
|
| + if (!got_read) {
|
| + kick = 1;
|
| + }
|
| + fd->read_watcher = NULL;
|
| + }
|
| + if (watcher == fd->write_watcher) {
|
| + /* remove write watcher, kick if we still need a write */
|
| + was_polling = 1;
|
| + if (!got_write) {
|
| + kick = 1;
|
| + }
|
| + fd->write_watcher = NULL;
|
| + }
|
| + if (!was_polling && watcher->worker != NULL) {
|
| + /* remove from inactive list */
|
| + watcher->next->prev = watcher->prev;
|
| + watcher->prev->next = watcher->next;
|
| + }
|
| + if (got_read) {
|
| + if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
|
| + kick = 1;
|
| + }
|
| + }
|
| + if (got_write) {
|
| + if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
|
| + kick = 1;
|
| + }
|
| + }
|
| + if (kick) {
|
| + maybe_wake_one_watcher_locked(fd);
|
| + }
|
| + if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
|
| + close_fd_locked(exec_ctx, fd);
|
| + }
|
| + gpr_mu_unlock(&fd->mu);
|
| +
|
| + GRPC_FD_UNREF(fd, "poll");
|
| +}
|
| +
|
| +void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
| + set_ready(exec_ctx, fd, &fd->read_closure);
|
| +}
|
| +
|
| +void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
| + set_ready(exec_ctx, fd, &fd->write_closure);
|
| +}
|
| +
|
| +#endif
|
|
|