Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Side by Side Diff: third_party/grpc/src/core/iomgr/tcp_windows.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include <grpc/support/port_platform.h>
35
36 #ifdef GPR_WINSOCK_SOCKET
37
38 #include "src/core/iomgr/sockaddr_win32.h"
39
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/log_win32.h>
43 #include <grpc/support/slice_buffer.h>
44 #include <grpc/support/string_util.h>
45 #include <grpc/support/useful.h>
46
47 #include "src/core/iomgr/timer.h"
48 #include "src/core/iomgr/iocp_windows.h"
49 #include "src/core/iomgr/sockaddr.h"
50 #include "src/core/iomgr/sockaddr_utils.h"
51 #include "src/core/iomgr/socket_windows.h"
52 #include "src/core/iomgr/tcp_client.h"
53
54 static int set_non_block(SOCKET sock) {
55 int status;
56 unsigned long param = 1;
57 DWORD ret;
58 status =
59 WSAIoctl(sock, FIONBIO, &param, sizeof(param), NULL, 0, &ret, NULL, NULL);
60 return status == 0;
61 }
62
63 static int set_dualstack(SOCKET sock) {
64 int status;
65 unsigned long param = 0;
66 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)&param,
67 sizeof(param));
68 return status == 0;
69 }
70
71 int grpc_tcp_prepare_socket(SOCKET sock) {
72 if (!set_non_block(sock)) return 0;
73 if (!set_dualstack(sock)) return 0;
74 return 1;
75 }
76
77 typedef struct grpc_tcp {
78 /* This is our C++ class derivation emulation. */
79 grpc_endpoint base;
80 /* The one socket this endpoint is using. */
81 grpc_winsocket *socket;
82 /* Refcounting how many operations are in progress. */
83 gpr_refcount refcount;
84
85 grpc_closure on_read;
86 grpc_closure on_write;
87
88 grpc_closure *read_cb;
89 grpc_closure *write_cb;
90 gpr_slice read_slice;
91 gpr_slice_buffer *write_slices;
92 gpr_slice_buffer *read_slices;
93
94 /* The IO Completion Port runs from another thread. We need some mechanism
95 to protect ourselves when requesting a shutdown. */
96 gpr_mu mu;
97 int shutting_down;
98
99 char *peer_string;
100 } grpc_tcp;
101
102 static void tcp_free(grpc_tcp *tcp) {
103 grpc_winsocket_destroy(tcp->socket);
104 gpr_mu_destroy(&tcp->mu);
105 gpr_free(tcp->peer_string);
106 gpr_free(tcp);
107 }
108
109 /*#define GRPC_TCP_REFCOUNT_DEBUG*/
110 #ifdef GRPC_TCP_REFCOUNT_DEBUG
111 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
112 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
113 static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
114 int line) {
115 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
116 reason, tcp->refcount.count, tcp->refcount.count - 1);
117 if (gpr_unref(&tcp->refcount)) {
118 tcp_free(tcp);
119 }
120 }
121
122 static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
123 int line) {
124 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
125 reason, tcp->refcount.count, tcp->refcount.count + 1);
126 gpr_ref(&tcp->refcount);
127 }
128 #else
129 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
130 #define TCP_REF(tcp, reason) tcp_ref((tcp))
131 static void tcp_unref(grpc_tcp *tcp) {
132 if (gpr_unref(&tcp->refcount)) {
133 tcp_free(tcp);
134 }
135 }
136
137 static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
138 #endif
139
140 /* Asynchronous callback from the IOCP, or the background thread. */
141 static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
142 grpc_tcp *tcp = tcpp;
143 grpc_closure *cb = tcp->read_cb;
144 grpc_winsocket *socket = tcp->socket;
145 gpr_slice sub;
146 grpc_winsocket_callback_info *info = &socket->read_info;
147
148 if (success) {
149 if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) {
150 if (socket->read_info.wsa_error != WSAECONNRESET) {
151 char *utf8_message = gpr_format_message(info->wsa_error);
152 gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
153 gpr_free(utf8_message);
154 }
155 success = 0;
156 gpr_slice_unref(tcp->read_slice);
157 } else {
158 if (info->bytes_transfered != 0 && !tcp->shutting_down) {
159 sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
160 gpr_slice_buffer_add(tcp->read_slices, sub);
161 success = 1;
162 } else {
163 gpr_slice_unref(tcp->read_slice);
164 success = 0;
165 }
166 }
167 }
168
169 tcp->read_cb = NULL;
170 TCP_UNREF(tcp, "read");
171 if (cb) {
172 cb->cb(exec_ctx, cb->cb_arg, success);
173 }
174 }
175
176 static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
177 gpr_slice_buffer *read_slices, grpc_closure *cb) {
178 grpc_tcp *tcp = (grpc_tcp *)ep;
179 grpc_winsocket *handle = tcp->socket;
180 grpc_winsocket_callback_info *info = &handle->read_info;
181 int status;
182 DWORD bytes_read = 0;
183 DWORD flags = 0;
184 WSABUF buffer;
185
186 if (tcp->shutting_down) {
187 grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
188 return;
189 }
190
191 tcp->read_cb = cb;
192 tcp->read_slices = read_slices;
193 gpr_slice_buffer_reset_and_unref(read_slices);
194
195 tcp->read_slice = gpr_slice_malloc(8192);
196
197 buffer.len = (ULONG)GPR_SLICE_LENGTH(
198 tcp->read_slice); // we know slice size fits in 32bit.
199 buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
200
201 TCP_REF(tcp, "read");
202
203 /* First let's try a synchronous, non-blocking read. */
204 status =
205 WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
206 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
207
208 /* Did we get data immediately ? Yay. */
209 if (info->wsa_error != WSAEWOULDBLOCK) {
210 info->bytes_transfered = bytes_read;
211 grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL);
212 return;
213 }
214
215 /* Otherwise, let's retry, by queuing a read. */
216 memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
217 status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
218 &info->overlapped, NULL);
219
220 if (status != 0) {
221 int wsa_error = WSAGetLastError();
222 if (wsa_error != WSA_IO_PENDING) {
223 info->wsa_error = wsa_error;
224 grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL);
225 return;
226 }
227 }
228
229 grpc_socket_notify_on_read(exec_ctx, tcp->socket, &tcp->on_read);
230 }
231
232 /* Asynchronous callback from the IOCP, or the background thread. */
233 static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
234 grpc_tcp *tcp = (grpc_tcp *)tcpp;
235 grpc_winsocket *handle = tcp->socket;
236 grpc_winsocket_callback_info *info = &handle->write_info;
237 grpc_closure *cb;
238
239 gpr_mu_lock(&tcp->mu);
240 cb = tcp->write_cb;
241 tcp->write_cb = NULL;
242 gpr_mu_unlock(&tcp->mu);
243
244 if (success) {
245 if (info->wsa_error != 0) {
246 if (info->wsa_error != WSAECONNRESET) {
247 char *utf8_message = gpr_format_message(info->wsa_error);
248 gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
249 gpr_free(utf8_message);
250 }
251 success = 0;
252 } else {
253 GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
254 }
255 }
256
257 TCP_UNREF(tcp, "write");
258 cb->cb(exec_ctx, cb->cb_arg, success);
259 }
260
261 /* Initiates a write. */
262 static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
263 gpr_slice_buffer *slices, grpc_closure *cb) {
264 grpc_tcp *tcp = (grpc_tcp *)ep;
265 grpc_winsocket *socket = tcp->socket;
266 grpc_winsocket_callback_info *info = &socket->write_info;
267 unsigned i;
268 DWORD bytes_sent;
269 int status;
270 WSABUF local_buffers[16];
271 WSABUF *allocated = NULL;
272 WSABUF *buffers = local_buffers;
273 size_t len;
274
275 if (tcp->shutting_down) {
276 grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
277 return;
278 }
279
280 tcp->write_cb = cb;
281 tcp->write_slices = slices;
282 GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
283 if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
284 buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
285 allocated = buffers;
286 }
287
288 for (i = 0; i < tcp->write_slices->count; i++) {
289 len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
290 GPR_ASSERT(len <= ULONG_MAX);
291 buffers[i].len = (ULONG)len;
292 buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
293 }
294
295 /* First, let's try a synchronous, non-blocking write. */
296 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
297 &bytes_sent, 0, NULL, NULL);
298 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
299
300 /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
301 connection that has its send queue filled up. But if we don't, then we can
302 avoid doing an async write operation at all. */
303 if (info->wsa_error != WSAEWOULDBLOCK) {
304 bool ok = false;
305 if (status == 0) {
306 ok = true;
307 GPR_ASSERT(bytes_sent == tcp->write_slices->length);
308 } else {
309 if (socket->read_info.wsa_error != WSAECONNRESET) {
310 char *utf8_message = gpr_format_message(info->wsa_error);
311 gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
312 gpr_free(utf8_message);
313 }
314 }
315 if (allocated) gpr_free(allocated);
316 grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL);
317 return;
318 }
319
320 TCP_REF(tcp, "write");
321
322 /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
323 operation, this time asynchronously. */
324 memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
325 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
326 &bytes_sent, 0, &socket->write_info.overlapped, NULL);
327 if (allocated) gpr_free(allocated);
328
329 if (status != 0) {
330 int wsa_error = WSAGetLastError();
331 if (wsa_error != WSA_IO_PENDING) {
332 TCP_UNREF(tcp, "write");
333 grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
334 return;
335 }
336 }
337
338 /* As all is now setup, we can now ask for the IOCP notification. It may
339 trigger the callback immediately however, but no matter. */
340 grpc_socket_notify_on_write(exec_ctx, socket, &tcp->on_write);
341 }
342
343 static void win_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
344 grpc_pollset *ps) {
345 grpc_tcp *tcp;
346 (void)ps;
347 tcp = (grpc_tcp *)ep;
348 grpc_iocp_add_socket(tcp->socket);
349 }
350
351 static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
352 grpc_pollset_set *pss) {
353 grpc_tcp *tcp;
354 (void)pss;
355 tcp = (grpc_tcp *)ep;
356 grpc_iocp_add_socket(tcp->socket);
357 }
358
359 /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
360 for the potential read and write operations. It is up to the caller to
361 guarantee this isn't called in parallel to a read or write request, so
362 we're not going to protect against these. However the IO Completion Port
363 callback will happen from another thread, so we need to protect against
364 concurrent access of the data structure in that regard. */
365 static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
366 grpc_tcp *tcp = (grpc_tcp *)ep;
367 gpr_mu_lock(&tcp->mu);
368 /* At that point, what may happen is that we're already inside the IOCP
369 callback. See the comments in on_read and on_write. */
370 tcp->shutting_down = 1;
371 grpc_winsocket_shutdown(tcp->socket);
372 gpr_mu_unlock(&tcp->mu);
373 }
374
375 static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
376 grpc_tcp *tcp = (grpc_tcp *)ep;
377 TCP_UNREF(tcp, "destroy");
378 }
379
380 static char *win_get_peer(grpc_endpoint *ep) {
381 grpc_tcp *tcp = (grpc_tcp *)ep;
382 return gpr_strdup(tcp->peer_string);
383 }
384
385 static grpc_endpoint_vtable vtable = {win_read, win_write, win_add_to_pollset,
386 win_add_to_pollset_set, win_shutdown,
387 win_destroy, win_get_peer};
388
389 grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
390 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
391 memset(tcp, 0, sizeof(grpc_tcp));
392 tcp->base.vtable = &vtable;
393 tcp->socket = socket;
394 gpr_mu_init(&tcp->mu);
395 gpr_ref_init(&tcp->refcount, 1);
396 grpc_closure_init(&tcp->on_read, on_read, tcp);
397 grpc_closure_init(&tcp->on_write, on_write, tcp);
398 tcp->peer_string = gpr_strdup(peer_string);
399 return &tcp->base;
400 }
401
402 #endif /* GPR_WINSOCK_SOCKET */
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/iomgr/tcp_windows.h ('k') | third_party/grpc/src/core/iomgr/time_averaged_stats.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698