Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: third_party/grpc/src/core/iomgr/udp_server.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
35 #ifndef _GNU_SOURCE
36 #define _GNU_SOURCE
37 #endif
38
39 #include <grpc/support/port_platform.h>
40
41 #ifdef GRPC_NEED_UDP
42 #ifdef GPR_POSIX_SOCKET
43
44 #include "src/core/iomgr/udp_server.h"
45
46 #include <errno.h>
47 #include <fcntl.h>
48 #include <limits.h>
49 #include <netinet/in.h>
50 #include <netinet/tcp.h>
51 #include <string.h>
52 #include <sys/socket.h>
53 #include <sys/stat.h>
54 #include <sys/types.h>
55 #include <sys/un.h>
56 #include <unistd.h>
57
58 #include "src/core/iomgr/fd_posix.h"
59 #include "src/core/iomgr/pollset_posix.h"
60 #include "src/core/iomgr/resolve_address.h"
61 #include "src/core/iomgr/sockaddr_utils.h"
62 #include "src/core/iomgr/socket_utils_posix.h"
63 #include "src/core/support/string.h"
64 #include <grpc/support/alloc.h>
65 #include <grpc/support/log.h>
66 #include <grpc/support/sync.h>
67 #include <grpc/support/string_util.h>
68 #include <grpc/support/time.h>
69
70 #define INIT_PORT_CAP 2
71
72 /* one listening port */
73 typedef struct {
74 int fd;
75 grpc_fd *emfd;
76 grpc_udp_server *server;
77 union {
78 uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE];
79 struct sockaddr sockaddr;
80 struct sockaddr_un un;
81 } addr;
82 size_t addr_len;
83 grpc_closure read_closure;
84 grpc_closure destroyed_closure;
85 grpc_udp_server_read_cb read_cb;
86 } server_port;
87
88 static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
89 struct stat st;
90
91 if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
92 unlink(un->sun_path);
93 }
94 }
95
96 /* the overall server */
97 struct grpc_udp_server {
98 gpr_mu mu;
99 gpr_cv cv;
100
101 /* active port count: how many ports are actually still listening */
102 size_t active_ports;
103 /* destroyed port count: how many ports are completely destroyed */
104 size_t destroyed_ports;
105
106 /* is this server shutting down? (boolean) */
107 int shutdown;
108
109 /* all listening ports */
110 server_port *ports;
111 size_t nports;
112 size_t port_capacity;
113
114 /* shutdown callback */
115 grpc_closure *shutdown_complete;
116
117 /* all pollsets interested in new connections */
118 grpc_pollset **pollsets;
119 /* number of pollsets in the pollsets array */
120 size_t pollset_count;
121 /* The parent grpc server */
122 grpc_server *grpc_server;
123 };
124
125 grpc_udp_server *grpc_udp_server_create(void) {
126 grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
127 gpr_mu_init(&s->mu);
128 gpr_cv_init(&s->cv);
129 s->active_ports = 0;
130 s->destroyed_ports = 0;
131 s->shutdown = 0;
132 s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
133 s->nports = 0;
134 s->port_capacity = INIT_PORT_CAP;
135
136 return s;
137 }
138
139 static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
140 grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1, NULL);
141
142 gpr_mu_destroy(&s->mu);
143 gpr_cv_destroy(&s->cv);
144
145 gpr_free(s->ports);
146 gpr_free(s);
147 }
148
149 static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
150 bool success) {
151 grpc_udp_server *s = server;
152 gpr_mu_lock(&s->mu);
153 s->destroyed_ports++;
154 if (s->destroyed_ports == s->nports) {
155 gpr_mu_unlock(&s->mu);
156 finish_shutdown(exec_ctx, s);
157 } else {
158 gpr_mu_unlock(&s->mu);
159 }
160 }
161
162 /* called when all listening endpoints have been shutdown, so no further
163 events will be received on them - at this point it's safe to destroy
164 things */
165 static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
166 size_t i;
167
168 /* delete ALL the things */
169 gpr_mu_lock(&s->mu);
170
171 if (!s->shutdown) {
172 gpr_mu_unlock(&s->mu);
173 return;
174 }
175
176 if (s->nports) {
177 for (i = 0; i < s->nports; i++) {
178 server_port *sp = &s->ports[i];
179 if (sp->addr.sockaddr.sa_family == AF_UNIX) {
180 unlink_if_unix_domain_socket(&sp->addr.un);
181 }
182 sp->destroyed_closure.cb = destroyed_port;
183 sp->destroyed_closure.cb_arg = s;
184 grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
185 "udp_listener_shutdown");
186 }
187 gpr_mu_unlock(&s->mu);
188 } else {
189 gpr_mu_unlock(&s->mu);
190 finish_shutdown(exec_ctx, s);
191 }
192 }
193
194 void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
195 grpc_closure *on_done) {
196 size_t i;
197 gpr_mu_lock(&s->mu);
198
199 GPR_ASSERT(!s->shutdown);
200 s->shutdown = 1;
201
202 s->shutdown_complete = on_done;
203
204 /* shutdown all fd's */
205 if (s->active_ports) {
206 for (i = 0; i < s->nports; i++) {
207 grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
208 }
209 gpr_mu_unlock(&s->mu);
210 } else {
211 gpr_mu_unlock(&s->mu);
212 deactivated_all_ports(exec_ctx, s);
213 }
214 }
215
216 /* Prepare a recently-created socket for listening. */
217 static int prepare_socket(int fd, const struct sockaddr *addr,
218 size_t addr_len) {
219 struct sockaddr_storage sockname_temp;
220 socklen_t sockname_len;
221 int get_local_ip;
222 int rc;
223
224 if (fd < 0) {
225 goto error;
226 }
227
228 if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) {
229 gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
230 strerror(errno));
231 }
232
233 get_local_ip = 1;
234 rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
235 sizeof(get_local_ip));
236 if (rc == 0 && addr->sa_family == AF_INET6) {
237 #if !defined(__APPLE__)
238 rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
239 sizeof(get_local_ip));
240 #endif
241 }
242
243 GPR_ASSERT(addr_len < ~(socklen_t)0);
244 if (bind(fd, addr, (socklen_t)addr_len) < 0) {
245 char *addr_str;
246 grpc_sockaddr_to_string(&addr_str, addr, 0);
247 gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
248 gpr_free(addr_str);
249 goto error;
250 }
251
252 sockname_len = sizeof(sockname_temp);
253 if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
254 goto error;
255 }
256
257 return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
258
259 error:
260 if (fd >= 0) {
261 close(fd);
262 }
263 return -1;
264 }
265
266 /* event manager callback when reads are ready */
267 static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
268 server_port *sp = arg;
269
270 if (!success) {
271 gpr_mu_lock(&sp->server->mu);
272 if (0 == --sp->server->active_ports) {
273 gpr_mu_unlock(&sp->server->mu);
274 deactivated_all_ports(exec_ctx, sp->server);
275 } else {
276 gpr_mu_unlock(&sp->server->mu);
277 }
278 return;
279 }
280
281 /* Tell the registered callback that data is available to read. */
282 GPR_ASSERT(sp->read_cb);
283 sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
284
285 /* Re-arm the notification event so we get another chance to read. */
286 grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
287 }
288
289 static int add_socket_to_server(grpc_udp_server *s, int fd,
290 const struct sockaddr *addr, size_t addr_len,
291 grpc_udp_server_read_cb read_cb) {
292 server_port *sp;
293 int port;
294 char *addr_str;
295 char *name;
296
297 port = prepare_socket(fd, addr, addr_len);
298 if (port >= 0) {
299 grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
300 gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
301 gpr_free(addr_str);
302 gpr_mu_lock(&s->mu);
303 /* append it to the list under a lock */
304 if (s->nports == s->port_capacity) {
305 s->port_capacity *= 2;
306 s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
307 }
308 sp = &s->ports[s->nports++];
309 sp->server = s;
310 sp->fd = fd;
311 sp->emfd = grpc_fd_create(fd, name);
312 memcpy(sp->addr.untyped, addr, addr_len);
313 sp->addr_len = addr_len;
314 sp->read_cb = read_cb;
315 GPR_ASSERT(sp->emfd);
316 gpr_mu_unlock(&s->mu);
317 gpr_free(name);
318 }
319
320 return port;
321 }
322
323 int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
324 size_t addr_len, grpc_udp_server_read_cb read_cb) {
325 int allocated_port1 = -1;
326 int allocated_port2 = -1;
327 unsigned i;
328 int fd;
329 grpc_dualstack_mode dsmode;
330 struct sockaddr_in6 addr6_v4mapped;
331 struct sockaddr_in wild4;
332 struct sockaddr_in6 wild6;
333 struct sockaddr_in addr4_copy;
334 struct sockaddr *allocated_addr = NULL;
335 struct sockaddr_storage sockname_temp;
336 socklen_t sockname_len;
337 int port;
338
339 if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
340 unlink_if_unix_domain_socket(addr);
341 }
342
343 /* Check if this is a wildcard port, and if so, try to keep the port the same
344 as some previously created listener. */
345 if (grpc_sockaddr_get_port(addr) == 0) {
346 for (i = 0; i < s->nports; i++) {
347 sockname_len = sizeof(sockname_temp);
348 if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
349 &sockname_len)) {
350 port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
351 if (port > 0) {
352 allocated_addr = malloc(addr_len);
353 memcpy(allocated_addr, addr, addr_len);
354 grpc_sockaddr_set_port(allocated_addr, port);
355 addr = allocated_addr;
356 break;
357 }
358 }
359 }
360 }
361
362 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
363 addr = (const struct sockaddr *)&addr6_v4mapped;
364 addr_len = sizeof(addr6_v4mapped);
365 }
366
367 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
368 if (grpc_sockaddr_is_wildcard(addr, &port)) {
369 grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
370
371 /* Try listening on IPv6 first. */
372 addr = (struct sockaddr *)&wild6;
373 addr_len = sizeof(wild6);
374 fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
375 allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
376 if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
377 goto done;
378 }
379
380 /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
381 if (port == 0 && allocated_port1 > 0) {
382 grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
383 }
384 addr = (struct sockaddr *)&wild4;
385 addr_len = sizeof(wild4);
386 }
387
388 fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
389 if (fd < 0) {
390 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
391 }
392 if (dsmode == GRPC_DSMODE_IPV4 &&
393 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
394 addr = (struct sockaddr *)&addr4_copy;
395 addr_len = sizeof(addr4_copy);
396 }
397 allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
398
399 done:
400 gpr_free(allocated_addr);
401 return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
402 }
403
404 int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
405 return (port_index < s->nports) ? s->ports[port_index].fd : -1;
406 }
407
408 void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
409 grpc_pollset **pollsets, size_t pollset_count,
410 grpc_server *server) {
411 size_t i, j;
412 gpr_mu_lock(&s->mu);
413 GPR_ASSERT(s->active_ports == 0);
414 s->pollsets = pollsets;
415 s->grpc_server = server;
416 for (i = 0; i < s->nports; i++) {
417 for (j = 0; j < pollset_count; j++) {
418 grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
419 }
420 s->ports[i].read_closure.cb = on_read;
421 s->ports[i].read_closure.cb_arg = &s->ports[i];
422 grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
423 &s->ports[i].read_closure);
424 s->active_ports++;
425 }
426 gpr_mu_unlock(&s->mu);
427 }
428
429 #endif
430 #endif
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/iomgr/udp_server.h ('k') | third_party/grpc/src/core/iomgr/wakeup_fd_eventfd.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698