Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: third_party/grpc/src/core/iomgr/fd_posix.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <grpc/support/port_platform.h>
35
36 #ifdef GPR_POSIX_SOCKET
37
38 #include "src/core/iomgr/fd_posix.h"
39
40 #include <assert.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <grpc/support/alloc.h>
45 #include <grpc/support/log.h>
46 #include <grpc/support/string_util.h>
47 #include <grpc/support/useful.h>
48
49 #include "src/core/iomgr/pollset_posix.h"
50
51 #define CLOSURE_NOT_READY ((grpc_closure *)0)
52 #define CLOSURE_READY ((grpc_closure *)1)
53
54 /* We need to keep a freelist not because of any concerns of malloc performance
55 * but instead so that implementations with multiple threads in (for example)
56 * epoll_wait deal with the race between pollset removal and incoming poll
57 * notifications.
58 *
59 * The problem is that the poller ultimately holds a reference to this
60 * object, so it is very difficult to know when is safe to free it, at least
61 * without some expensive synchronization.
62 *
63 * If we keep the object freelisted, in the worst case losing this race just
64 * becomes a spurious read notification on a reused fd.
65 */
66 /* TODO(klempner): We could use some form of polling generation count to know
67 * when these are safe to free. */
68 /* TODO(klempner): Consider disabling freelisting if we don't have multiple
69 * threads in poll on the same fd */
70 /* TODO(klempner): Batch these allocations to reduce fragmentation */
71 static grpc_fd *fd_freelist = NULL;
72 static gpr_mu fd_freelist_mu;
73
74 static void freelist_fd(grpc_fd *fd) {
75 gpr_mu_lock(&fd_freelist_mu);
76 fd->freelist_next = fd_freelist;
77 fd_freelist = fd;
78 grpc_iomgr_unregister_object(&fd->iomgr_object);
79 gpr_mu_unlock(&fd_freelist_mu);
80 }
81
82 static grpc_fd *alloc_fd(int fd) {
83 grpc_fd *r = NULL;
84 gpr_mu_lock(&fd_freelist_mu);
85 if (fd_freelist != NULL) {
86 r = fd_freelist;
87 fd_freelist = fd_freelist->freelist_next;
88 }
89 gpr_mu_unlock(&fd_freelist_mu);
90 if (r == NULL) {
91 r = gpr_malloc(sizeof(grpc_fd));
92 gpr_mu_init(&r->mu);
93 }
94
95 gpr_atm_rel_store(&r->refst, 1);
96 r->shutdown = 0;
97 r->read_closure = CLOSURE_NOT_READY;
98 r->write_closure = CLOSURE_NOT_READY;
99 r->fd = fd;
100 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
101 &r->inactive_watcher_root;
102 r->freelist_next = NULL;
103 r->read_watcher = r->write_watcher = NULL;
104 r->on_done_closure = NULL;
105 r->closed = 0;
106 r->released = 0;
107 return r;
108 }
109
110 static void destroy(grpc_fd *fd) {
111 gpr_mu_destroy(&fd->mu);
112 gpr_free(fd);
113 }
114
115 #ifdef GRPC_FD_REF_COUNT_DEBUG
116 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
117 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
118 static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
119 int line) {
120 gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
121 gpr_atm_no_barrier_load(&fd->refst),
122 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
123 #else
124 #define REF_BY(fd, n, reason) ref_by(fd, n)
125 #define UNREF_BY(fd, n, reason) unref_by(fd, n)
126 static void ref_by(grpc_fd *fd, int n) {
127 #endif
128 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
129 }
130
131 #ifdef GRPC_FD_REF_COUNT_DEBUG
132 static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
133 int line) {
134 gpr_atm old;
135 gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
136 gpr_atm_no_barrier_load(&fd->refst),
137 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
138 #else
139 static void unref_by(grpc_fd *fd, int n) {
140 gpr_atm old;
141 #endif
142 old = gpr_atm_full_fetch_add(&fd->refst, -n);
143 if (old == n) {
144 freelist_fd(fd);
145 } else {
146 GPR_ASSERT(old > n);
147 }
148 }
149
150 void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
151
152 void grpc_fd_global_shutdown(void) {
153 gpr_mu_lock(&fd_freelist_mu);
154 gpr_mu_unlock(&fd_freelist_mu);
155 while (fd_freelist != NULL) {
156 grpc_fd *fd = fd_freelist;
157 fd_freelist = fd_freelist->freelist_next;
158 destroy(fd);
159 }
160 gpr_mu_destroy(&fd_freelist_mu);
161 }
162
163 grpc_fd *grpc_fd_create(int fd, const char *name) {
164 grpc_fd *r = alloc_fd(fd);
165 char *name2;
166 gpr_asprintf(&name2, "%s fd=%d", name, fd);
167 grpc_iomgr_register_object(&r->iomgr_object, name2);
168 gpr_free(name2);
169 #ifdef GRPC_FD_REF_COUNT_DEBUG
170 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
171 #endif
172 return r;
173 }
174
175 int grpc_fd_is_orphaned(grpc_fd *fd) {
176 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
177 }
178
179 static void pollset_kick_locked(grpc_fd_watcher *watcher) {
180 gpr_mu_lock(&watcher->pollset->mu);
181 GPR_ASSERT(watcher->worker);
182 grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
183 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
184 gpr_mu_unlock(&watcher->pollset->mu);
185 }
186
187 static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
188 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
189 pollset_kick_locked(fd->inactive_watcher_root.next);
190 } else if (fd->read_watcher) {
191 pollset_kick_locked(fd->read_watcher);
192 } else if (fd->write_watcher) {
193 pollset_kick_locked(fd->write_watcher);
194 }
195 }
196
197 static void wake_all_watchers_locked(grpc_fd *fd) {
198 grpc_fd_watcher *watcher;
199 for (watcher = fd->inactive_watcher_root.next;
200 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
201 pollset_kick_locked(watcher);
202 }
203 if (fd->read_watcher) {
204 pollset_kick_locked(fd->read_watcher);
205 }
206 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
207 pollset_kick_locked(fd->write_watcher);
208 }
209 }
210
211 static int has_watchers(grpc_fd *fd) {
212 return fd->read_watcher != NULL || fd->write_watcher != NULL ||
213 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
214 }
215
216 static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
217 fd->closed = 1;
218 if (!fd->released) {
219 close(fd->fd);
220 } else {
221 grpc_remove_fd_from_all_epoll_sets(fd->fd);
222 }
223 grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
224 }
225
226 int grpc_fd_wrapped_fd(grpc_fd *fd) {
227 if (fd->released || fd->closed) {
228 return -1;
229 } else {
230 return fd->fd;
231 }
232 }
233
234 void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
235 int *release_fd, const char *reason) {
236 fd->on_done_closure = on_done;
237 fd->released = release_fd != NULL;
238 if (!fd->released) {
239 shutdown(fd->fd, SHUT_RDWR);
240 } else {
241 *release_fd = fd->fd;
242 }
243 gpr_mu_lock(&fd->mu);
244 REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
245 if (!has_watchers(fd)) {
246 close_fd_locked(exec_ctx, fd);
247 } else {
248 wake_all_watchers_locked(fd);
249 }
250 gpr_mu_unlock(&fd->mu);
251 UNREF_BY(fd, 2, reason); /* drop the reference */
252 }
253
254 /* increment refcount by two to avoid changing the orphan bit */
255 #ifdef GRPC_FD_REF_COUNT_DEBUG
256 void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) {
257 ref_by(fd, 2, reason, file, line);
258 }
259
260 void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file,
261 int line) {
262 unref_by(fd, 2, reason, file, line);
263 }
264 #else
265 void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
266
267 void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
268 #endif
269
270 static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
271 grpc_closure **st, grpc_closure *closure) {
272 if (*st == CLOSURE_NOT_READY) {
273 /* not ready ==> switch to a waiting state by setting the closure */
274 *st = closure;
275 } else if (*st == CLOSURE_READY) {
276 /* already ready ==> queue the closure to run immediately */
277 *st = CLOSURE_NOT_READY;
278 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
279 maybe_wake_one_watcher_locked(fd);
280 } else {
281 /* upcallptr was set to a different closure. This is an error! */
282 gpr_log(GPR_ERROR,
283 "User called a notify_on function with a previous callback still "
284 "pending");
285 abort();
286 }
287 }
288
289 /* returns 1 if state becomes not ready */
290 static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
291 grpc_closure **st) {
292 if (*st == CLOSURE_READY) {
293 /* duplicate ready ==> ignore */
294 return 0;
295 } else if (*st == CLOSURE_NOT_READY) {
296 /* not ready, and not waiting ==> flag ready */
297 *st = CLOSURE_READY;
298 return 0;
299 } else {
300 /* waiting ==> queue closure */
301 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
302 *st = CLOSURE_NOT_READY;
303 return 1;
304 }
305 }
306
307 static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
308 /* only one set_ready can be active at once (but there may be a racing
309 notify_on) */
310 gpr_mu_lock(&fd->mu);
311 set_ready_locked(exec_ctx, fd, st);
312 gpr_mu_unlock(&fd->mu);
313 }
314
315 void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
316 gpr_mu_lock(&fd->mu);
317 GPR_ASSERT(!fd->shutdown);
318 fd->shutdown = 1;
319 set_ready_locked(exec_ctx, fd, &fd->read_closure);
320 set_ready_locked(exec_ctx, fd, &fd->write_closure);
321 gpr_mu_unlock(&fd->mu);
322 }
323
324 void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
325 grpc_closure *closure) {
326 gpr_mu_lock(&fd->mu);
327 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
328 gpr_mu_unlock(&fd->mu);
329 }
330
331 void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
332 grpc_closure *closure) {
333 gpr_mu_lock(&fd->mu);
334 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
335 gpr_mu_unlock(&fd->mu);
336 }
337
338 uint32_t grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
339 grpc_pollset_worker *worker, uint32_t read_mask,
340 uint32_t write_mask, grpc_fd_watcher *watcher) {
341 uint32_t mask = 0;
342 grpc_closure *cur;
343 int requested;
344 /* keep track of pollers that have requested our events, in case they change
345 */
346 GRPC_FD_REF(fd, "poll");
347
348 gpr_mu_lock(&fd->mu);
349
350 /* if we are shutdown, then don't add to the watcher set */
351 if (fd->shutdown) {
352 watcher->fd = NULL;
353 watcher->pollset = NULL;
354 watcher->worker = NULL;
355 gpr_mu_unlock(&fd->mu);
356 GRPC_FD_UNREF(fd, "poll");
357 return 0;
358 }
359
360 /* if there is nobody polling for read, but we need to, then start doing so */
361 cur = fd->read_closure;
362 requested = cur != CLOSURE_READY;
363 if (read_mask && fd->read_watcher == NULL && requested) {
364 fd->read_watcher = watcher;
365 mask |= read_mask;
366 }
367 /* if there is nobody polling for write, but we need to, then start doing so
368 */
369 cur = fd->write_closure;
370 requested = cur != CLOSURE_READY;
371 if (write_mask && fd->write_watcher == NULL && requested) {
372 fd->write_watcher = watcher;
373 mask |= write_mask;
374 }
375 /* if not polling, remember this watcher in case we need someone to later */
376 if (mask == 0 && worker != NULL) {
377 watcher->next = &fd->inactive_watcher_root;
378 watcher->prev = watcher->next->prev;
379 watcher->next->prev = watcher->prev->next = watcher;
380 }
381 watcher->pollset = pollset;
382 watcher->worker = worker;
383 watcher->fd = fd;
384 gpr_mu_unlock(&fd->mu);
385
386 return mask;
387 }
388
389 void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
390 int got_read, int got_write) {
391 int was_polling = 0;
392 int kick = 0;
393 grpc_fd *fd = watcher->fd;
394
395 if (fd == NULL) {
396 return;
397 }
398
399 gpr_mu_lock(&fd->mu);
400
401 if (watcher == fd->read_watcher) {
402 /* remove read watcher, kick if we still need a read */
403 was_polling = 1;
404 if (!got_read) {
405 kick = 1;
406 }
407 fd->read_watcher = NULL;
408 }
409 if (watcher == fd->write_watcher) {
410 /* remove write watcher, kick if we still need a write */
411 was_polling = 1;
412 if (!got_write) {
413 kick = 1;
414 }
415 fd->write_watcher = NULL;
416 }
417 if (!was_polling && watcher->worker != NULL) {
418 /* remove from inactive list */
419 watcher->next->prev = watcher->prev;
420 watcher->prev->next = watcher->next;
421 }
422 if (got_read) {
423 if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
424 kick = 1;
425 }
426 }
427 if (got_write) {
428 if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
429 kick = 1;
430 }
431 }
432 if (kick) {
433 maybe_wake_one_watcher_locked(fd);
434 }
435 if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
436 close_fd_locked(exec_ctx, fd);
437 }
438 gpr_mu_unlock(&fd->mu);
439
440 GRPC_FD_UNREF(fd, "poll");
441 }
442
443 void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
444 set_ready(exec_ctx, fd, &fd->read_closure);
445 }
446
447 void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
448 set_ready(exec_ctx, fd, &fd->write_closure);
449 }
450
451 #endif
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/iomgr/fd_posix.h ('k') | third_party/grpc/src/core/iomgr/iocp_windows.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698