Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Side by Side Diff: third_party/grpc/src/core/iomgr/tcp_posix.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <grpc/support/port_platform.h>
35
36 #ifdef GPR_POSIX_SOCKET
37
38 #include "src/core/iomgr/tcp_posix.h"
39
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #include <sys/socket.h>
44 #include <sys/types.h>
45 #include <unistd.h>
46
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/slice.h>
50 #include <grpc/support/string_util.h>
51 #include <grpc/support/sync.h>
52 #include <grpc/support/time.h>
53
54 #include "src/core/debug/trace.h"
55 #include "src/core/iomgr/pollset_posix.h"
56 #include "src/core/iomgr/pollset_set_posix.h"
57 #include "src/core/profiling/timers.h"
58 #include "src/core/support/string.h"
59
60 #ifdef GPR_HAVE_MSG_NOSIGNAL
61 #define SENDMSG_FLAGS MSG_NOSIGNAL
62 #else
63 #define SENDMSG_FLAGS 0
64 #endif
65
66 #ifdef GPR_MSG_IOVLEN_TYPE
67 typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
68 #else
69 typedef size_t msg_iovlen_type;
70 #endif
71
72 int grpc_tcp_trace = 0;
73
74 typedef struct {
75 grpc_endpoint base;
76 grpc_fd *em_fd;
77 int fd;
78 int finished_edge;
79 msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
80 size_t slice_size;
81 gpr_refcount refcount;
82
83 /* garbage after the last read */
84 gpr_slice_buffer last_read_buffer;
85
86 gpr_slice_buffer *incoming_buffer;
87 gpr_slice_buffer *outgoing_buffer;
88 /** slice within outgoing_buffer to write next */
89 size_t outgoing_slice_idx;
90 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
91 size_t outgoing_byte_idx;
92
93 grpc_closure *read_cb;
94 grpc_closure *write_cb;
95 grpc_closure *release_fd_cb;
96 int *release_fd;
97
98 grpc_closure read_closure;
99 grpc_closure write_closure;
100
101 char *peer_string;
102 } grpc_tcp;
103
104 static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
105 bool success);
106 static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
107 bool success);
108
109 static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
110 grpc_tcp *tcp = (grpc_tcp *)ep;
111 grpc_fd_shutdown(exec_ctx, tcp->em_fd);
112 }
113
114 static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
115 grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
116 "tcp_unref_orphan");
117 gpr_slice_buffer_destroy(&tcp->last_read_buffer);
118 gpr_free(tcp->peer_string);
119 gpr_free(tcp);
120 }
121
122 /*#define GRPC_TCP_REFCOUNT_DEBUG*/
123 #ifdef GRPC_TCP_REFCOUNT_DEBUG
124 #define TCP_UNREF(cl, tcp, reason) \
125 tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
126 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
127 static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
128 const char *reason, const char *file, int line) {
129 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
130 reason, tcp->refcount.count, tcp->refcount.count - 1);
131 if (gpr_unref(&tcp->refcount)) {
132 tcp_free(exec_ctx, tcp);
133 }
134 }
135
136 static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
137 int line) {
138 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
139 reason, tcp->refcount.count, tcp->refcount.count + 1);
140 gpr_ref(&tcp->refcount);
141 }
142 #else
143 #define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
144 #define TCP_REF(tcp, reason) tcp_ref((tcp))
145 static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
146 if (gpr_unref(&tcp->refcount)) {
147 tcp_free(exec_ctx, tcp);
148 }
149 }
150
151 static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
152 #endif
153
154 static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
155 grpc_tcp *tcp = (grpc_tcp *)ep;
156 TCP_UNREF(exec_ctx, tcp, "destroy");
157 }
158
159 static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
160 grpc_closure *cb = tcp->read_cb;
161
162 if (grpc_tcp_trace) {
163 size_t i;
164 gpr_log(GPR_DEBUG, "read: success=%d", success);
165 for (i = 0; i < tcp->incoming_buffer->count; i++) {
166 char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
167 GPR_DUMP_HEX | GPR_DUMP_ASCII);
168 gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
169 gpr_free(dump);
170 }
171 }
172
173 tcp->read_cb = NULL;
174 tcp->incoming_buffer = NULL;
175 cb->cb(exec_ctx, cb->cb_arg, success);
176 }
177
178 #define MAX_READ_IOVEC 4
179 static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
180 struct msghdr msg;
181 struct iovec iov[MAX_READ_IOVEC];
182 ssize_t read_bytes;
183 size_t i;
184
185 GPR_ASSERT(!tcp->finished_edge);
186 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
187 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
188 GPR_TIMER_BEGIN("tcp_continue_read", 0);
189
190 while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
191 gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
192 gpr_slice_malloc(tcp->slice_size));
193 }
194 for (i = 0; i < tcp->incoming_buffer->count; i++) {
195 iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
196 iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
197 }
198
199 msg.msg_name = NULL;
200 msg.msg_namelen = 0;
201 msg.msg_iov = iov;
202 msg.msg_iovlen = tcp->iov_size;
203 msg.msg_control = NULL;
204 msg.msg_controllen = 0;
205 msg.msg_flags = 0;
206
207 GPR_TIMER_BEGIN("recvmsg", 1);
208 do {
209 read_bytes = recvmsg(tcp->fd, &msg, 0);
210 } while (read_bytes < 0 && errno == EINTR);
211 GPR_TIMER_END("recvmsg", 0);
212
213 if (read_bytes < 0) {
214 /* NB: After calling call_read_cb a parallel call of the read handler may
215 * be running. */
216 if (errno == EAGAIN) {
217 if (tcp->iov_size > 1) {
218 tcp->iov_size /= 2;
219 }
220 /* We've consumed the edge, request a new one */
221 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
222 } else {
223 /* TODO(klempner): Log interesting errors */
224 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
225 call_read_cb(exec_ctx, tcp, 0);
226 TCP_UNREF(exec_ctx, tcp, "read");
227 }
228 } else if (read_bytes == 0) {
229 /* 0 read size ==> end of stream */
230 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
231 call_read_cb(exec_ctx, tcp, 0);
232 TCP_UNREF(exec_ctx, tcp, "read");
233 } else {
234 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
235 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
236 gpr_slice_buffer_trim_end(
237 tcp->incoming_buffer,
238 tcp->incoming_buffer->length - (size_t)read_bytes,
239 &tcp->last_read_buffer);
240 } else if (tcp->iov_size < MAX_READ_IOVEC) {
241 ++tcp->iov_size;
242 }
243 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
244 call_read_cb(exec_ctx, tcp, 1);
245 TCP_UNREF(exec_ctx, tcp, "read");
246 }
247
248 GPR_TIMER_END("tcp_continue_read", 0);
249 }
250
251 static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
252 bool success) {
253 grpc_tcp *tcp = (grpc_tcp *)arg;
254 GPR_ASSERT(!tcp->finished_edge);
255
256 if (!success) {
257 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
258 call_read_cb(exec_ctx, tcp, 0);
259 TCP_UNREF(exec_ctx, tcp, "read");
260 } else {
261 tcp_continue_read(exec_ctx, tcp);
262 }
263 }
264
265 static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
266 gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
267 grpc_tcp *tcp = (grpc_tcp *)ep;
268 GPR_ASSERT(tcp->read_cb == NULL);
269 tcp->read_cb = cb;
270 tcp->incoming_buffer = incoming_buffer;
271 gpr_slice_buffer_reset_and_unref(incoming_buffer);
272 gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
273 TCP_REF(tcp, "read");
274 if (tcp->finished_edge) {
275 tcp->finished_edge = 0;
276 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
277 } else {
278 grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
279 }
280 }
281
282 typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
283
284 #define MAX_WRITE_IOVEC 16
285 static flush_result tcp_flush(grpc_tcp *tcp) {
286 struct msghdr msg;
287 struct iovec iov[MAX_WRITE_IOVEC];
288 msg_iovlen_type iov_size;
289 ssize_t sent_length;
290 size_t sending_length;
291 size_t trailing;
292 size_t unwind_slice_idx;
293 size_t unwind_byte_idx;
294
295 for (;;) {
296 sending_length = 0;
297 unwind_slice_idx = tcp->outgoing_slice_idx;
298 unwind_byte_idx = tcp->outgoing_byte_idx;
299 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
300 iov_size != MAX_WRITE_IOVEC;
301 iov_size++) {
302 iov[iov_size].iov_base =
303 GPR_SLICE_START_PTR(
304 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
305 tcp->outgoing_byte_idx;
306 iov[iov_size].iov_len =
307 GPR_SLICE_LENGTH(
308 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
309 tcp->outgoing_byte_idx;
310 sending_length += iov[iov_size].iov_len;
311 tcp->outgoing_slice_idx++;
312 tcp->outgoing_byte_idx = 0;
313 }
314 GPR_ASSERT(iov_size > 0);
315
316 msg.msg_name = NULL;
317 msg.msg_namelen = 0;
318 msg.msg_iov = iov;
319 msg.msg_iovlen = iov_size;
320 msg.msg_control = NULL;
321 msg.msg_controllen = 0;
322 msg.msg_flags = 0;
323
324 GPR_TIMER_BEGIN("sendmsg", 1);
325 do {
326 /* TODO(klempner): Cork if this is a partial write */
327 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
328 } while (sent_length < 0 && errno == EINTR);
329 GPR_TIMER_END("sendmsg", 0);
330
331 if (sent_length < 0) {
332 if (errno == EAGAIN) {
333 tcp->outgoing_slice_idx = unwind_slice_idx;
334 tcp->outgoing_byte_idx = unwind_byte_idx;
335 return FLUSH_PENDING;
336 } else {
337 /* TODO(klempner): Log some of these */
338 return FLUSH_ERROR;
339 }
340 }
341
342 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
343 trailing = sending_length - (size_t)sent_length;
344 while (trailing > 0) {
345 size_t slice_length;
346
347 tcp->outgoing_slice_idx--;
348 slice_length = GPR_SLICE_LENGTH(
349 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
350 if (slice_length > trailing) {
351 tcp->outgoing_byte_idx = slice_length - trailing;
352 break;
353 } else {
354 trailing -= slice_length;
355 }
356 }
357
358 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
359 return FLUSH_DONE;
360 }
361 };
362 }
363
364 static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
365 bool success) {
366 grpc_tcp *tcp = (grpc_tcp *)arg;
367 flush_result status;
368 grpc_closure *cb;
369
370 if (!success) {
371 cb = tcp->write_cb;
372 tcp->write_cb = NULL;
373 cb->cb(exec_ctx, cb->cb_arg, 0);
374 TCP_UNREF(exec_ctx, tcp, "write");
375 return;
376 }
377
378 status = tcp_flush(tcp);
379 if (status == FLUSH_PENDING) {
380 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
381 } else {
382 cb = tcp->write_cb;
383 tcp->write_cb = NULL;
384 GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
385 cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
386 GPR_TIMER_END("tcp_handle_write.cb", 0);
387 TCP_UNREF(exec_ctx, tcp, "write");
388 }
389 }
390
391 static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
392 gpr_slice_buffer *buf, grpc_closure *cb) {
393 grpc_tcp *tcp = (grpc_tcp *)ep;
394 flush_result status;
395
396 if (grpc_tcp_trace) {
397 size_t i;
398
399 for (i = 0; i < buf->count; i++) {
400 char *data =
401 gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
402 gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
403 gpr_free(data);
404 }
405 }
406
407 GPR_TIMER_BEGIN("tcp_write", 0);
408 GPR_ASSERT(tcp->write_cb == NULL);
409
410 if (buf->length == 0) {
411 GPR_TIMER_END("tcp_write", 0);
412 grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
413 return;
414 }
415 tcp->outgoing_buffer = buf;
416 tcp->outgoing_slice_idx = 0;
417 tcp->outgoing_byte_idx = 0;
418
419 status = tcp_flush(tcp);
420 if (status == FLUSH_PENDING) {
421 TCP_REF(tcp, "write");
422 tcp->write_cb = cb;
423 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
424 } else {
425 grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
426 }
427
428 GPR_TIMER_END("tcp_write", 0);
429 }
430
431 static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
432 grpc_pollset *pollset) {
433 grpc_tcp *tcp = (grpc_tcp *)ep;
434 grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
435 }
436
437 static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
438 grpc_pollset_set *pollset_set) {
439 grpc_tcp *tcp = (grpc_tcp *)ep;
440 grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
441 }
442
443 static char *tcp_get_peer(grpc_endpoint *ep) {
444 grpc_tcp *tcp = (grpc_tcp *)ep;
445 return gpr_strdup(tcp->peer_string);
446 }
447
448 static const grpc_endpoint_vtable vtable = {
449 tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
450 tcp_shutdown, tcp_destroy, tcp_get_peer};
451
452 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
453 const char *peer_string) {
454 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
455 tcp->base.vtable = &vtable;
456 tcp->peer_string = gpr_strdup(peer_string);
457 tcp->fd = em_fd->fd;
458 tcp->read_cb = NULL;
459 tcp->write_cb = NULL;
460 tcp->release_fd_cb = NULL;
461 tcp->release_fd = NULL;
462 tcp->incoming_buffer = NULL;
463 tcp->slice_size = slice_size;
464 tcp->iov_size = 1;
465 tcp->finished_edge = 1;
466 /* paired with unref in grpc_tcp_destroy */
467 gpr_ref_init(&tcp->refcount, 1);
468 tcp->em_fd = em_fd;
469 tcp->read_closure.cb = tcp_handle_read;
470 tcp->read_closure.cb_arg = tcp;
471 tcp->write_closure.cb = tcp_handle_write;
472 tcp->write_closure.cb_arg = tcp;
473 gpr_slice_buffer_init(&tcp->last_read_buffer);
474
475 return &tcp->base;
476 }
477
478 int grpc_tcp_fd(grpc_endpoint *ep) {
479 grpc_tcp *tcp = (grpc_tcp *)ep;
480 GPR_ASSERT(ep->vtable == &vtable);
481 return grpc_fd_wrapped_fd(tcp->em_fd);
482 }
483
484 void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
485 int *fd, grpc_closure *done) {
486 grpc_tcp *tcp = (grpc_tcp *)ep;
487 GPR_ASSERT(ep->vtable == &vtable);
488 tcp->release_fd = fd;
489 tcp->release_fd_cb = done;
490 TCP_UNREF(exec_ctx, tcp, "destroy");
491 }
492
493 #endif
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/iomgr/tcp_posix.h ('k') | third_party/grpc/src/core/iomgr/tcp_server.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698