Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Side by Side Diff: net/socket/tcp_socket_libevent.cc

Issue 23454010: POSIX only: Move client socket functionality from TCPClientSocket into TCPSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/socket/tcp_socket_libevent.h" 5 #include "net/socket/tcp_socket.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <fcntl.h> 8 #include <fcntl.h>
9 #include <netdb.h> 9 #include <netdb.h>
10 #include <netinet/tcp.h>
10 #include <sys/socket.h> 11 #include <sys/socket.h>
11 12
12 #include "build/build_config.h" 13 #include "build/build_config.h"
13 14
14 #if defined(OS_POSIX) 15 #if defined(OS_POSIX)
15 #include <netinet/in.h> 16 #include <netinet/in.h>
16 #endif 17 #endif
17 18
18 #include "base/logging.h" 19 #include "base/logging.h"
20 #include "base/metrics/histogram.h"
21 #include "base/metrics/stats_counters.h"
19 #include "base/posix/eintr_wrapper.h" 22 #include "base/posix/eintr_wrapper.h"
23 #include "net/base/address_list.h"
24 #include "net/base/connection_type_histograms.h"
25 #include "net/base/io_buffer.h"
20 #include "net/base/ip_endpoint.h" 26 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h" 27 #include "net/base/net_errors.h"
22 #include "net/base/net_util.h" 28 #include "net/base/net_util.h"
23 #include "net/socket/socket_descriptor.h" 29 #include "net/base/network_change_notifier.h"
24 #include "net/socket/socket_net_log_params.h" 30 #include "net/socket/socket_net_log_params.h"
25 31
32 // If we don't have a definition for TCPI_OPT_SYN_DATA, create one.
33 #ifndef TCPI_OPT_SYN_DATA
34 #define TCPI_OPT_SYN_DATA 32
35 #endif
36
26 namespace net { 37 namespace net {
38 namespace {
39
40 const int kTCPKeepAliveSeconds = 45;
41
42 // SetTCPNoDelay turns on/off buffering in the kernel. By default, TCP sockets
43 // will wait up to 200ms for more data to complete a packet before transmitting.
44 // After calling this function, the kernel will not wait. See TCP_NODELAY in
45 // `man 7 tcp`.
46 bool SetTCPNoDelay(int fd, bool no_delay) {
47 int on = no_delay ? 1 : 0;
48 int error = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
49 return error == 0;
50 }
51
52 // SetTCPKeepAlive sets SO_KEEPALIVE.
53 bool SetTCPKeepAlive(int fd, bool enable, int delay) {
54 int on = enable ? 1 : 0;
55 if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on))) {
56 PLOG(ERROR) << "Failed to set SO_KEEPALIVE on fd: " << fd;
57 return false;
58 }
59 #if defined(OS_LINUX) || defined(OS_ANDROID)
60 // Set seconds until first TCP keep alive.
61 if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &delay, sizeof(delay))) {
62 PLOG(ERROR) << "Failed to set TCP_KEEPIDLE on fd: " << fd;
63 return false;
64 }
65 // Set seconds between TCP keep alives.
66 if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &delay, sizeof(delay))) {
67 PLOG(ERROR) << "Failed to set TCP_KEEPINTVL on fd: " << fd;
68 return false;
69 }
70 #endif
71 return true;
72 }
73
74 int MapConnectError(int os_error) {
75 switch (os_error) {
76 case EACCES:
77 return ERR_NETWORK_ACCESS_DENIED;
78 case ETIMEDOUT:
79 return ERR_CONNECTION_TIMED_OUT;
80 default: {
81 int net_error = MapSystemError(os_error);
82 if (net_error == ERR_FAILED)
83 return ERR_CONNECTION_FAILED; // More specific than ERR_FAILED.
84
85 // Give a more specific error when the user is offline.
86 if (net_error == ERR_ADDRESS_UNREACHABLE &&
87 NetworkChangeNotifier::IsOffline()) {
88 return ERR_INTERNET_DISCONNECTED;
89 }
90 return net_error;
91 }
92 }
93 }
94
95 } // namespace
96
97 //-----------------------------------------------------------------------------
98
99 TCPSocketLibevent::Watcher::Watcher(
100 const base::Closure& read_ready_callback,
101 const base::Closure& write_ready_callback)
102 : read_ready_callback_(read_ready_callback),
103 write_ready_callback_(write_ready_callback) {
104 }
105
106 TCPSocketLibevent::Watcher::~Watcher() {
107 }
108
109 void TCPSocketLibevent::Watcher::OnFileCanReadWithoutBlocking(int /* fd */) {
110 if (!read_ready_callback_.is_null())
111 read_ready_callback_.Run();
112 else
113 NOTREACHED();
114 }
115
116 void TCPSocketLibevent::Watcher::OnFileCanWriteWithoutBlocking(int /* fd */) {
117 if (!write_ready_callback_.is_null())
118 write_ready_callback_.Run();
akalin 2013/09/05 22:58:12 assuming moved code here, too
yzshen1 2013/09/06 17:19:45 Please see my reply for your comment in the .h fil
119 else
120 NOTREACHED();
121 }
27 122
28 TCPSocketLibevent::TCPSocketLibevent(NetLog* net_log, 123 TCPSocketLibevent::TCPSocketLibevent(NetLog* net_log,
29 const NetLog::Source& source) 124 const NetLog::Source& source)
30 : socket_(kInvalidSocket), 125 : socket_(kInvalidSocket),
126 accept_watcher_(base::Bind(&TCPSocketLibevent::DidCompleteAccept,
127 base::Unretained(this)),
128 base::Closure()),
31 accept_socket_(NULL), 129 accept_socket_(NULL),
32 accept_address_(NULL), 130 accept_address_(NULL),
131 read_watcher_(base::Bind(&TCPSocketLibevent::DidCompleteRead,
132 base::Unretained(this)),
133 base::Closure()),
134 write_watcher_(base::Closure(),
135 base::Bind(&TCPSocketLibevent::DidCompleteConnectOrWrite,
136 base::Unretained(this))),
137 read_buf_len_(0),
138 write_buf_len_(0),
139 use_tcp_fastopen_(IsTCPFastOpenEnabled()),
140 tcp_fastopen_connected_(false),
141 fast_open_status_(FAST_OPEN_STATUS_UNKNOWN),
142 waiting_connect_(false),
143 connect_os_error_(0),
144 logging_multiple_connect_attempts_(false),
33 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) { 145 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) {
34 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, 146 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
35 source.ToEventParametersCallback()); 147 source.ToEventParametersCallback());
36 } 148 }
37 149
38 TCPSocketLibevent::~TCPSocketLibevent() { 150 TCPSocketLibevent::~TCPSocketLibevent() {
39 if (socket_ != kInvalidSocket)
40 Close();
41 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE); 151 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
152 if (tcp_fastopen_connected_) {
153 UMA_HISTOGRAM_ENUMERATION("Net.TcpFastOpenSocketConnection",
154 fast_open_status_, FAST_OPEN_MAX_VALUE);
155 }
156 Close();
42 } 157 }
43 158
44 int TCPSocketLibevent::Create(AddressFamily family) { 159 int TCPSocketLibevent::Create(AddressFamily family) {
45 DCHECK(CalledOnValidThread()); 160 DCHECK(CalledOnValidThread());
46 DCHECK_EQ(socket_, kInvalidSocket); 161 DCHECK_EQ(socket_, kInvalidSocket);
47 162
48 socket_ = CreatePlatformSocket(ConvertAddressFamily(family), SOCK_STREAM, 163 socket_ = CreatePlatformSocket(ConvertAddressFamily(family), SOCK_STREAM,
49 IPPROTO_TCP); 164 IPPROTO_TCP);
50 if (socket_ < 0) { 165 if (socket_ < 0) {
51 PLOG(ERROR) << "CreatePlatformSocket() returned an error"; 166 PLOG(ERROR) << "CreatePlatformSocket() returned an error";
52 return MapSystemError(errno); 167 return MapSystemError(errno);
53 } 168 }
54 169
55 if (SetNonBlocking(socket_)) { 170 if (SetNonBlocking(socket_)) {
56 int result = MapSystemError(errno); 171 int result = MapSystemError(errno);
57 Close(); 172 Close();
58 return result; 173 return result;
59 } 174 }
60 175
61 return OK; 176 return OK;
62 } 177 }
63 178
64 int TCPSocketLibevent::Adopt(int socket) { 179 int TCPSocketLibevent::AdoptConnectedSocket(int socket,
180 const IPEndPoint& peer_address) {
65 DCHECK(CalledOnValidThread()); 181 DCHECK(CalledOnValidThread());
66 DCHECK_EQ(socket_, kInvalidSocket); 182 DCHECK_EQ(socket_, kInvalidSocket);
67 183
68 socket_ = socket; 184 socket_ = socket;
69 185
70 if (SetNonBlocking(socket_)) { 186 if (SetNonBlocking(socket_)) {
71 int result = MapSystemError(errno); 187 int result = MapSystemError(errno);
72 Close(); 188 Close();
73 return result; 189 return result;
74 } 190 }
75 191
192 peer_address_ = peer_address;
193
76 return OK; 194 return OK;
77 } 195 }
78 196
79 int TCPSocketLibevent::Release() {
80 DCHECK(CalledOnValidThread());
81 DCHECK(accept_callback_.is_null());
82
83 int result = socket_;
84 socket_ = kInvalidSocket;
85 return result;
86 }
87
88 int TCPSocketLibevent::Bind(const IPEndPoint& address) { 197 int TCPSocketLibevent::Bind(const IPEndPoint& address) {
89 DCHECK(CalledOnValidThread()); 198 DCHECK(CalledOnValidThread());
90 DCHECK_NE(socket_, kInvalidSocket); 199 DCHECK_NE(socket_, kInvalidSocket);
91 200
92 SockaddrStorage storage; 201 SockaddrStorage storage;
93 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) 202 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
94 return ERR_ADDRESS_INVALID; 203 return ERR_ADDRESS_INVALID;
95 204
96 int result = bind(socket_, storage.addr, storage.addr_len); 205 int result = bind(socket_, storage.addr, storage.addr_len);
97 if (result < 0) { 206 if (result < 0) {
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 DCHECK(!callback.is_null()); 247 DCHECK(!callback.is_null());
139 DCHECK(accept_callback_.is_null()); 248 DCHECK(accept_callback_.is_null());
140 249
141 net_log_.BeginEvent(NetLog::TYPE_TCP_ACCEPT); 250 net_log_.BeginEvent(NetLog::TYPE_TCP_ACCEPT);
142 251
143 int result = AcceptInternal(socket, address); 252 int result = AcceptInternal(socket, address);
144 253
145 if (result == ERR_IO_PENDING) { 254 if (result == ERR_IO_PENDING) {
146 if (!base::MessageLoopForIO::current()->WatchFileDescriptor( 255 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
147 socket_, true, base::MessageLoopForIO::WATCH_READ, 256 socket_, true, base::MessageLoopForIO::WATCH_READ,
148 &accept_socket_watcher_, this)) { 257 &accept_socket_watcher_, &accept_watcher_)) {
149 PLOG(ERROR) << "WatchFileDescriptor failed on read"; 258 PLOG(ERROR) << "WatchFileDescriptor failed on read";
150 return MapSystemError(errno); 259 return MapSystemError(errno);
151 } 260 }
152 261
153 accept_socket_ = socket; 262 accept_socket_ = socket;
154 accept_address_ = address; 263 accept_address_ = address;
155 accept_callback_ = callback; 264 accept_callback_ = callback;
156 } 265 }
157 266
158 return result; 267 return result;
159 } 268 }
160 269
270 int TCPSocketLibevent::Connect(const IPEndPoint& address,
271 const CompletionCallback& callback) {
272 DCHECK(CalledOnValidThread());
273 DCHECK_NE(socket_, kInvalidSocket);
274
275 // If in the process of connecting or already connected, then just return OK.
276 if (waiting_connect_ || IsConnected())
277 return OK;
278
279 if (!logging_multiple_connect_attempts_)
280 LogConnectStart(AddressList(address));
281
282 peer_address_ = address;
283
284 int rv = DoConnect();
285 if (rv == ERR_IO_PENDING) {
286 // Synchronous operation not supported.
287 DCHECK(!callback.is_null());
288 write_callback_ = callback;
289 waiting_connect_ = true;
290 } else {
291 DoConnectComplete(rv);
292 }
293
294 return rv;
295 }
296
297 bool TCPSocketLibevent::IsConnected() const {
298 DCHECK(CalledOnValidThread());
299
300 if (socket_ == kInvalidSocket || waiting_connect_)
301 return false;
302
303 if (use_tcp_fastopen_ && !tcp_fastopen_connected_) {
304 IPEndPoint null_end_point;
305 // != is not defined for IPEndPoint.
306 if (!(peer_address_ == null_end_point)) {
307 // With TCP FastOpen, we pretend that the socket is connected.
308 // This allows GetPeerAddress() to return peer_address_.
309 return true;
310 }
311 }
312
313 // Check if connection is alive.
314 char c;
315 int rv = HANDLE_EINTR(recv(socket_, &c, 1, MSG_PEEK));
316 if (rv == 0)
317 return false;
318 if (rv == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
319 return false;
320
321 return true;
322 }
323
324 bool TCPSocketLibevent::IsConnectedAndIdle() const {
325 DCHECK(CalledOnValidThread());
326
327 if (socket_ == kInvalidSocket || waiting_connect_)
328 return false;
329
330 // TODO(wtc): should we also handle the TCP FastOpen case here,
331 // as we do in IsConnected()?
332
333 // Check if connection is alive and we haven't received any data
334 // unexpectedly.
335 char c;
336 int rv = HANDLE_EINTR(recv(socket_, &c, 1, MSG_PEEK));
337 if (rv >= 0)
338 return false;
339 if (errno != EAGAIN && errno != EWOULDBLOCK)
340 return false;
341
342 return true;
343 }
344
345 int TCPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
346 DCHECK(CalledOnValidThread());
347 DCHECK(address);
348 if (!IsConnected())
349 return ERR_SOCKET_NOT_CONNECTED;
350 *address = peer_address_;
351 return OK;
352 }
353
354 int TCPSocketLibevent::Read(IOBuffer* buf,
355 int buf_len,
356 const CompletionCallback& callback) {
357 DCHECK(CalledOnValidThread());
358 DCHECK_NE(kInvalidSocket, socket_);
359 DCHECK(!waiting_connect_);
360 DCHECK(read_callback_.is_null());
361 // Synchronous operation not supported
362 DCHECK(!callback.is_null());
363 DCHECK_GT(buf_len, 0);
364
365 int nread = HANDLE_EINTR(read(socket_, buf->data(), buf_len));
366 if (nread >= 0) {
367 base::StatsCounter read_bytes("tcp.read_bytes");
368 read_bytes.Add(nread);
369 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, nread,
370 buf->data());
371 RecordFastOpenStatus();
372 return nread;
373 }
374 if (errno != EAGAIN && errno != EWOULDBLOCK) {
375 int net_error = MapSystemError(errno);
376 net_log_.AddEvent(NetLog::TYPE_SOCKET_READ_ERROR,
377 CreateNetLogSocketErrorCallback(net_error, errno));
378 return net_error;
379 }
380
381 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
382 socket_, true, base::MessageLoopForIO::WATCH_READ,
383 &read_socket_watcher_, &read_watcher_)) {
384 DVLOG(1) << "WatchFileDescriptor failed on read, errno " << errno;
385 return MapSystemError(errno);
386 }
387
388 read_buf_ = buf;
389 read_buf_len_ = buf_len;
390 read_callback_ = callback;
391 return ERR_IO_PENDING;
392 }
393
394 int TCPSocketLibevent::Write(IOBuffer* buf,
395 int buf_len,
396 const CompletionCallback& callback) {
397 DCHECK(CalledOnValidThread());
398 DCHECK_NE(kInvalidSocket, socket_);
399 DCHECK(!waiting_connect_);
400 DCHECK(write_callback_.is_null());
401 // Synchronous operation not supported
402 DCHECK(!callback.is_null());
403 DCHECK_GT(buf_len, 0);
404
405 int nwrite = InternalWrite(buf, buf_len);
406 if (nwrite >= 0) {
407 base::StatsCounter write_bytes("tcp.write_bytes");
408 write_bytes.Add(nwrite);
409 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, nwrite,
410 buf->data());
411 return nwrite;
412 }
413 if (errno != EAGAIN && errno != EWOULDBLOCK) {
414 int net_error = MapSystemError(errno);
415 net_log_.AddEvent(NetLog::TYPE_SOCKET_WRITE_ERROR,
416 CreateNetLogSocketErrorCallback(net_error, errno));
417 return net_error;
418 }
419
420 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
421 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
422 &write_socket_watcher_, &write_watcher_)) {
423 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
424 return MapSystemError(errno);
425 }
426
427 write_buf_ = buf;
428 write_buf_len_ = buf_len;
429 write_callback_ = callback;
430 return ERR_IO_PENDING;
431 }
432
161 int TCPSocketLibevent::SetDefaultOptionsForServer() { 433 int TCPSocketLibevent::SetDefaultOptionsForServer() {
434 DCHECK(CalledOnValidThread());
162 return SetAddressReuse(true); 435 return SetAddressReuse(true);
163 } 436 }
164 437
438 int TCPSocketLibevent::SetDefaultOptionsForClient() {
439 DCHECK(CalledOnValidThread());
440
441 // This mirrors the behaviour on Windows. See the comment in
442 // tcp_socket_win.cc after searching for "NODELAY".
443 SetTCPNoDelay(socket_, true); // If SetTCPNoDelay fails, we don't care.
444 SetTCPKeepAlive(socket_, true, kTCPKeepAliveSeconds);
445
446 return OK;
447 }
448
165 int TCPSocketLibevent::SetAddressReuse(bool allow) { 449 int TCPSocketLibevent::SetAddressReuse(bool allow) {
450 DCHECK(CalledOnValidThread());
451
166 // SO_REUSEADDR is useful for server sockets to bind to a recently unbound 452 // SO_REUSEADDR is useful for server sockets to bind to a recently unbound
167 // port. When a socket is closed, the end point changes its state to TIME_WAIT 453 // port. When a socket is closed, the end point changes its state to TIME_WAIT
168 // and wait for 2 MSL (maximum segment lifetime) to ensure the remote peer 454 // and wait for 2 MSL (maximum segment lifetime) to ensure the remote peer
169 // acknowledges its closure. For server sockets, it is usually safe to 455 // acknowledges its closure. For server sockets, it is usually safe to
170 // bind to a TIME_WAIT end point immediately, which is a widely adopted 456 // bind to a TIME_WAIT end point immediately, which is a widely adopted
171 // behavior. 457 // behavior.
172 // 458 //
173 // Note that on *nix, SO_REUSEADDR does not enable the TCP socket to bind to 459 // Note that on *nix, SO_REUSEADDR does not enable the TCP socket to bind to
174 // an end point that is already bound by another socket. To do that one must 460 // an end point that is already bound by another socket. To do that one must
175 // set SO_REUSEPORT instead. This option is not provided on Linux prior 461 // set SO_REUSEPORT instead. This option is not provided on Linux prior
176 // to 3.9. 462 // to 3.9.
177 // 463 //
178 // SO_REUSEPORT is provided in MacOS X and iOS. 464 // SO_REUSEPORT is provided in MacOS X and iOS.
179 int boolean_value = allow ? 1 : 0; 465 int boolean_value = allow ? 1 : 0;
180 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &boolean_value, 466 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &boolean_value,
181 sizeof(boolean_value)); 467 sizeof(boolean_value));
182 if (rv < 0) 468 if (rv < 0)
183 return MapSystemError(errno); 469 return MapSystemError(errno);
184 return OK; 470 return OK;
185 } 471 }
186 472
473 bool TCPSocketLibevent::SetReceiveBufferSize(int32 size) {
474 DCHECK(CalledOnValidThread());
475 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
476 reinterpret_cast<const char*>(&size),
477 sizeof(size));
478 DCHECK(!rv) << "Could not set socket receive buffer size: " << errno;
479 return rv == 0;
480 }
481
482 bool TCPSocketLibevent::SetSendBufferSize(int32 size) {
483 DCHECK(CalledOnValidThread());
484 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
485 reinterpret_cast<const char*>(&size),
486 sizeof(size));
487 DCHECK(!rv) << "Could not set socket send buffer size: " << errno;
488 return rv == 0;
489 }
490
491 bool TCPSocketLibevent::SetKeepAlive(bool enable, int delay) {
492 DCHECK(CalledOnValidThread());
493 return SetTCPKeepAlive(socket_, enable, delay);
494 }
495
496 bool TCPSocketLibevent::SetNoDelay(bool no_delay) {
497 DCHECK(CalledOnValidThread());
498 return SetTCPNoDelay(socket_, no_delay);
499 }
500
187 void TCPSocketLibevent::Close() { 501 void TCPSocketLibevent::Close() {
502 DCHECK(CalledOnValidThread());
503
188 if (socket_ != kInvalidSocket) { 504 if (socket_ != kInvalidSocket) {
189 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
190 DCHECK(ok);
191 if (HANDLE_EINTR(close(socket_)) < 0) 505 if (HANDLE_EINTR(close(socket_)) < 0)
192 PLOG(ERROR) << "close"; 506 PLOG(ERROR) << "close";
193 socket_ = kInvalidSocket; 507 socket_ = kInvalidSocket;
194 } 508 }
509
510 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
511 DCHECK(ok);
512 ok = read_socket_watcher_.StopWatchingFileDescriptor();
513 DCHECK(ok);
514 ok = write_socket_watcher_.StopWatchingFileDescriptor();
515 DCHECK(ok);
516
517 if (!accept_callback_.is_null()) {
518 accept_socket_ = NULL;
519 accept_address_ = NULL;
520 accept_callback_.Reset();
521 }
522
523 if (!read_callback_.is_null()) {
524 read_buf_ = NULL;
525 read_buf_len_ = 0;
526 read_callback_.Reset();
527 }
528
529 if (!write_callback_.is_null()) {
530 write_buf_ = NULL;
531 write_buf_len_ = 0;
532 write_callback_.Reset();
533 }
534
535 tcp_fastopen_connected_ = false;
536 fast_open_status_ = FAST_OPEN_STATUS_UNKNOWN;
537 waiting_connect_ = false;
538 peer_address_ = IPEndPoint();
539 connect_os_error_ = 0;
540 }
541
542 bool TCPSocketLibevent::UsingTCPFastOpen() const {
543 return use_tcp_fastopen_;
544 }
545
546 void TCPSocketLibevent::StartLoggingMultipleConnectAttempts(
547 const AddressList& addresses) {
548 if (!logging_multiple_connect_attempts_) {
549 logging_multiple_connect_attempts_ = true;
550 LogConnectStart(addresses);
551 } else {
552 NOTREACHED();
553 }
554 }
555
556 void TCPSocketLibevent::EndLoggingMultipleConnectAttempts(int net_error) {
557 if (logging_multiple_connect_attempts_) {
558 LogConnectCompletion(net_error);
559 logging_multiple_connect_attempts_ = false;
560 } else {
561 NOTREACHED();
562 }
195 } 563 }
196 564
197 int TCPSocketLibevent::AcceptInternal(scoped_ptr<TCPSocketLibevent>* socket, 565 int TCPSocketLibevent::AcceptInternal(scoped_ptr<TCPSocketLibevent>* socket,
198 IPEndPoint* address) { 566 IPEndPoint* address) {
199 SockaddrStorage storage; 567 SockaddrStorage storage;
200 int new_socket = HANDLE_EINTR(accept(socket_, 568 int new_socket = HANDLE_EINTR(accept(socket_,
201 storage.addr, 569 storage.addr,
202 &storage.addr_len)); 570 &storage.addr_len));
203 if (new_socket < 0) { 571 if (new_socket < 0) {
204 int net_error = MapSystemError(errno); 572 int net_error = MapSystemError(errno);
205 if (net_error != ERR_IO_PENDING) 573 if (net_error != ERR_IO_PENDING)
206 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, net_error); 574 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, net_error);
207 return net_error; 575 return net_error;
208 } 576 }
209 577
210 IPEndPoint ip_end_point; 578 IPEndPoint ip_end_point;
211 if (!ip_end_point.FromSockAddr(storage.addr, storage.addr_len)) { 579 if (!ip_end_point.FromSockAddr(storage.addr, storage.addr_len)) {
212 NOTREACHED(); 580 NOTREACHED();
213 if (HANDLE_EINTR(close(new_socket)) < 0) 581 if (HANDLE_EINTR(close(new_socket)) < 0)
214 PLOG(ERROR) << "close"; 582 PLOG(ERROR) << "close";
215 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, ERR_FAILED); 583 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, ERR_FAILED);
216 return ERR_FAILED; 584 return ERR_FAILED;
217 } 585 }
218 scoped_ptr<TCPSocketLibevent> tcp_socket(new TCPSocketLibevent( 586 scoped_ptr<TCPSocketLibevent> tcp_socket(new TCPSocketLibevent(
219 net_log_.net_log(), net_log_.source())); 587 net_log_.net_log(), net_log_.source()));
220 int adopt_result = tcp_socket->Adopt(new_socket); 588 int adopt_result = tcp_socket->AdoptConnectedSocket(new_socket, ip_end_point);
221 if (adopt_result != OK) { 589 if (adopt_result != OK) {
222 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, adopt_result); 590 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, adopt_result);
223 return adopt_result; 591 return adopt_result;
224 } 592 }
225 *socket = tcp_socket.Pass(); 593 *socket = tcp_socket.Pass();
226 *address = ip_end_point; 594 *address = ip_end_point;
227 net_log_.EndEvent(NetLog::TYPE_TCP_ACCEPT, 595 net_log_.EndEvent(NetLog::TYPE_TCP_ACCEPT,
228 CreateNetLogIPEndPointCallback(&ip_end_point)); 596 CreateNetLogIPEndPointCallback(&ip_end_point));
229 return OK; 597 return OK;
230 } 598 }
231 599
232 void TCPSocketLibevent::OnFileCanReadWithoutBlocking(int fd) { 600 int TCPSocketLibevent::DoConnect() {
601 DCHECK_EQ(0, connect_os_error_);
602
603 net_log_.BeginEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT,
604 CreateNetLogIPEndPointCallback(&peer_address_));
605
606 // Connect the socket.
607 if (!use_tcp_fastopen_) {
608 SockaddrStorage storage;
609 if (!peer_address_.ToSockAddr(storage.addr, &storage.addr_len))
610 return ERR_INVALID_ARGUMENT;
611
612 if (!HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len))) {
613 // Connected without waiting!
614 return OK;
615 }
616 } else {
617 // With TCP FastOpen, we pretend that the socket is connected.
618 DCHECK(!tcp_fastopen_connected_);
619 return OK;
620 }
621
622 // Check if the connect() failed synchronously.
623 connect_os_error_ = errno;
624 if (connect_os_error_ != EINPROGRESS)
625 return MapConnectError(connect_os_error_);
626
627 // Otherwise the connect() is going to complete asynchronously, so watch
628 // for its completion.
629 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
630 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
631 &write_socket_watcher_, &write_watcher_)) {
632 connect_os_error_ = errno;
633 DVLOG(1) << "WatchFileDescriptor failed: " << connect_os_error_;
634 return MapSystemError(connect_os_error_);
635 }
636
637 return ERR_IO_PENDING;
638 }
639
640 void TCPSocketLibevent::DoConnectComplete(int result) {
641 // Log the end of this attempt (and any OS error it threw).
642 int os_error = connect_os_error_;
643 connect_os_error_ = 0;
644 if (result != OK) {
645 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT,
646 NetLog::IntegerCallback("os_error", os_error));
647 } else {
648 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT);
649 }
650
651 if (!logging_multiple_connect_attempts_)
652 LogConnectCompletion(result);
653 }
654
655 void TCPSocketLibevent::LogConnectStart(const AddressList& addresses) {
656 base::StatsCounter connects("tcp.connect");
657 connects.Increment();
658
659 net_log_.BeginEvent(NetLog::TYPE_TCP_CONNECT,
660 addresses.CreateNetLogCallback());
661 }
662
663 void TCPSocketLibevent::LogConnectCompletion(int net_error) {
664 if (net_error == OK)
665 UpdateConnectionTypeHistograms(CONNECTION_ANY);
666
667 if (net_error != OK) {
668 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_CONNECT, net_error);
669 return;
670 }
671
672 SockaddrStorage storage;
673 int rv = getsockname(socket_, storage.addr, &storage.addr_len);
674 if (rv != 0) {
675 PLOG(ERROR) << "getsockname() [rv: " << rv << "] error: ";
676 NOTREACHED();
677 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_CONNECT, rv);
678 return;
679 }
680
681 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT,
682 CreateNetLogSourceAddressCallback(storage.addr,
683 storage.addr_len));
684 }
685
686 void TCPSocketLibevent::DoReadCallback(int rv) {
687 DCHECK_NE(rv, ERR_IO_PENDING);
688 DCHECK(!read_callback_.is_null());
689
690 // Since Run may result in Read being called, clear read_callback_ up front.
691 CompletionCallback c = read_callback_;
692 read_callback_.Reset();
693 c.Run(rv);
694 }
695
696 void TCPSocketLibevent::DoWriteCallback(int rv) {
697 DCHECK_NE(rv, ERR_IO_PENDING);
698 DCHECK(!write_callback_.is_null());
699
700 // Since Run may result in Write being called, clear write_callback_ up front.
701 CompletionCallback c = write_callback_;
702 write_callback_.Reset();
703 c.Run(rv);
704 }
705
706 void TCPSocketLibevent::DidCompleteRead() {
707 RecordFastOpenStatus();
708 if (read_callback_.is_null())
709 return;
710
711 int bytes_transferred;
712 bytes_transferred = HANDLE_EINTR(read(socket_, read_buf_->data(),
713 read_buf_len_));
714
715 int result;
716 if (bytes_transferred >= 0) {
717 result = bytes_transferred;
718 base::StatsCounter read_bytes("tcp.read_bytes");
719 read_bytes.Add(bytes_transferred);
720 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, result,
721 read_buf_->data());
722 } else {
723 result = MapSystemError(errno);
724 if (result != ERR_IO_PENDING) {
725 net_log_.AddEvent(NetLog::TYPE_SOCKET_READ_ERROR,
726 CreateNetLogSocketErrorCallback(result, errno));
727 }
728 }
729
730 if (result != ERR_IO_PENDING) {
731 read_buf_ = NULL;
732 read_buf_len_ = 0;
733 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
734 DCHECK(ok);
735 DoReadCallback(result);
736 }
737 }
738
739 void TCPSocketLibevent::DidCompleteWrite() {
740 DCHECK(!write_callback_.is_null());
741
742 int bytes_transferred;
743 bytes_transferred = HANDLE_EINTR(write(socket_, write_buf_->data(),
744 write_buf_len_));
745
746 int result;
747 if (bytes_transferred >= 0) {
748 result = bytes_transferred;
749 base::StatsCounter write_bytes("tcp.write_bytes");
750 write_bytes.Add(bytes_transferred);
751 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, result,
752 write_buf_->data());
753 } else {
754 result = MapSystemError(errno);
755 if (result != ERR_IO_PENDING) {
756 net_log_.AddEvent(NetLog::TYPE_SOCKET_WRITE_ERROR,
757 CreateNetLogSocketErrorCallback(result, errno));
758 }
759 }
760
761 if (result != ERR_IO_PENDING) {
762 write_buf_ = NULL;
763 write_buf_len_ = 0;
764 write_socket_watcher_.StopWatchingFileDescriptor();
765 DoWriteCallback(result);
766 }
767 }
768
769 void TCPSocketLibevent::DidCompleteConnect() {
770 DCHECK(waiting_connect_);
771
772 // Get the error that connect() completed with.
773 int os_error = 0;
774 socklen_t len = sizeof(os_error);
775 if (getsockopt(socket_, SOL_SOCKET, SO_ERROR, &os_error, &len) < 0)
776 os_error = errno;
777
778 int result = OK;
779 // TODO(eroman): Is this check really necessary?
780 if (os_error == EINPROGRESS || os_error == EALREADY) {
781 NOTREACHED(); // This indicates a bug in libevent or our code.
782 result = ERR_UNEXPECTED;
783 } else {
784 result = MapConnectError(os_error);
785 }
786
787 connect_os_error_ = os_error;
788 if (result != ERR_IO_PENDING) {
789 DoConnectComplete(result);
790 waiting_connect_ = false;
791 write_socket_watcher_.StopWatchingFileDescriptor();
792 DoWriteCallback(result);
793 }
794 }
795
796 void TCPSocketLibevent::DidCompleteConnectOrWrite() {
797 if (waiting_connect_) {
798 DidCompleteConnect();
799 } else if (!write_callback_.is_null()) {
800 DidCompleteWrite();
801 }
802 }
803
804 void TCPSocketLibevent::DidCompleteAccept() {
233 DCHECK(CalledOnValidThread()); 805 DCHECK(CalledOnValidThread());
234 806
235 int result = AcceptInternal(accept_socket_, accept_address_); 807 int result = AcceptInternal(accept_socket_, accept_address_);
236 if (result != ERR_IO_PENDING) { 808 if (result != ERR_IO_PENDING) {
237 accept_socket_ = NULL; 809 accept_socket_ = NULL;
238 accept_address_ = NULL; 810 accept_address_ = NULL;
239 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor(); 811 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
240 DCHECK(ok); 812 DCHECK(ok);
241 CompletionCallback callback = accept_callback_; 813 CompletionCallback callback = accept_callback_;
242 accept_callback_.Reset(); 814 accept_callback_.Reset();
243 callback.Run(result); 815 callback.Run(result);
244 } 816 }
245 } 817 }
246 818
247 void TCPSocketLibevent::OnFileCanWriteWithoutBlocking(int fd) { 819 int TCPSocketLibevent::InternalWrite(IOBuffer* buf, int buf_len) {
248 NOTREACHED(); 820 int nwrite;
821 if (use_tcp_fastopen_ && !tcp_fastopen_connected_) {
822 SockaddrStorage storage;
823 if (!peer_address_.ToSockAddr(storage.addr, &storage.addr_len)) {
824 errno = EINVAL;
825 return -1;
826 }
827
828 int flags = 0x20000000; // Magic flag to enable TCP_FASTOPEN.
829 #if defined(OS_LINUX)
830 // sendto() will fail with EPIPE when the system doesn't support TCP Fast
831 // Open. Theoretically that shouldn't happen since the caller should check
832 // for system support on startup, but users may dynamically disable TCP Fast
833 // Open via sysctl.
834 flags |= MSG_NOSIGNAL;
835 #endif // defined(OS_LINUX)
836 nwrite = HANDLE_EINTR(sendto(socket_,
837 buf->data(),
838 buf_len,
839 flags,
840 storage.addr,
841 storage.addr_len));
842 tcp_fastopen_connected_ = true;
843
844 if (nwrite < 0) {
845 DCHECK_NE(EPIPE, errno);
846
847 // If errno == EINPROGRESS, that means the kernel didn't have a cookie
848 // and would block. The kernel is internally doing a connect() though.
849 // Remap EINPROGRESS to EAGAIN so we treat this the same as our other
850 // asynchronous cases. Note that the user buffer has not been copied to
851 // kernel space.
852 if (errno == EINPROGRESS) {
853 errno = EAGAIN;
854 fast_open_status_ = FAST_OPEN_SLOW_CONNECT_RETURN;
855 } else {
856 fast_open_status_ = FAST_OPEN_ERROR;
857 }
858 } else {
859 fast_open_status_ = FAST_OPEN_FAST_CONNECT_RETURN;
860 }
861 } else {
862 nwrite = HANDLE_EINTR(write(socket_, buf->data(), buf_len));
863 }
864 return nwrite;
865 }
866
867 void TCPSocketLibevent::RecordFastOpenStatus() {
868 if (use_tcp_fastopen_ &&
869 (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN ||
870 fast_open_status_ == FAST_OPEN_SLOW_CONNECT_RETURN)) {
871 DCHECK_NE(FAST_OPEN_STATUS_UNKNOWN, fast_open_status_);
872 bool getsockopt_success(false);
873 bool server_acked_data(false);
874 #if defined(TCP_INFO)
875 // Probe to see the if the socket used TCP Fast Open.
876 tcp_info info;
877 socklen_t info_len = sizeof(tcp_info);
878 getsockopt_success =
879 getsockopt(socket_, IPPROTO_TCP, TCP_INFO, &info, &info_len) == 0 &&
880 info_len == sizeof(tcp_info);
881 server_acked_data = getsockopt_success &&
882 (info.tcpi_options & TCPI_OPT_SYN_DATA);
883 #endif
884 if (getsockopt_success) {
885 if (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN) {
886 fast_open_status_ = (server_acked_data ? FAST_OPEN_SYN_DATA_ACK :
887 FAST_OPEN_SYN_DATA_NACK);
888 } else {
889 fast_open_status_ = (server_acked_data ? FAST_OPEN_NO_SYN_DATA_ACK :
890 FAST_OPEN_NO_SYN_DATA_NACK);
891 }
892 } else {
893 fast_open_status_ = (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN ?
894 FAST_OPEN_SYN_DATA_FAILED :
895 FAST_OPEN_NO_SYN_DATA_FAILED);
896 }
897 }
249 } 898 }
250 899
251 } // namespace net 900 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698