Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(841)

Side by Side Diff: net/socket/tcp_socket_libevent.cc

Issue 23454010: POSIX only: Move client socket functionality from TCPClientSocket into TCPSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/socket/tcp_socket_libevent.h ('k') | net/socket/tcp_socket_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/socket/tcp_socket_libevent.h" 5 #include "net/socket/tcp_socket.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <fcntl.h> 8 #include <fcntl.h>
9 #include <netdb.h> 9 #include <netdb.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
10 #include <sys/socket.h> 12 #include <sys/socket.h>
11 13
14 #include "base/callback_helpers.h"
15 #include "base/logging.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/stats_counters.h"
18 #include "base/posix/eintr_wrapper.h"
12 #include "build/build_config.h" 19 #include "build/build_config.h"
13 20 #include "net/base/address_list.h"
14 #if defined(OS_POSIX) 21 #include "net/base/connection_type_histograms.h"
15 #include <netinet/in.h> 22 #include "net/base/io_buffer.h"
16 #endif
17
18 #include "base/logging.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "net/base/ip_endpoint.h" 23 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h" 24 #include "net/base/net_errors.h"
22 #include "net/base/net_util.h" 25 #include "net/base/net_util.h"
23 #include "net/socket/socket_descriptor.h" 26 #include "net/base/network_change_notifier.h"
24 #include "net/socket/socket_net_log_params.h" 27 #include "net/socket/socket_net_log_params.h"
25 28
29 // If we don't have a definition for TCPI_OPT_SYN_DATA, create one.
30 #ifndef TCPI_OPT_SYN_DATA
31 #define TCPI_OPT_SYN_DATA 32
32 #endif
33
26 namespace net { 34 namespace net {
27 35
36 namespace {
37
38 const int kTCPKeepAliveSeconds = 45;
39
40 // SetTCPNoDelay turns on/off buffering in the kernel. By default, TCP sockets
41 // will wait up to 200ms for more data to complete a packet before transmitting.
42 // After calling this function, the kernel will not wait. See TCP_NODELAY in
43 // `man 7 tcp`.
44 bool SetTCPNoDelay(int fd, bool no_delay) {
45 int on = no_delay ? 1 : 0;
46 int error = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
47 return error == 0;
48 }
49
50 // SetTCPKeepAlive sets SO_KEEPALIVE.
51 bool SetTCPKeepAlive(int fd, bool enable, int delay) {
52 int on = enable ? 1 : 0;
53 if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on))) {
54 PLOG(ERROR) << "Failed to set SO_KEEPALIVE on fd: " << fd;
55 return false;
56 }
57 #if defined(OS_LINUX) || defined(OS_ANDROID)
58 // Set seconds until first TCP keep alive.
59 if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &delay, sizeof(delay))) {
60 PLOG(ERROR) << "Failed to set TCP_KEEPIDLE on fd: " << fd;
61 return false;
62 }
63 // Set seconds between TCP keep alives.
64 if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &delay, sizeof(delay))) {
65 PLOG(ERROR) << "Failed to set TCP_KEEPINTVL on fd: " << fd;
66 return false;
67 }
68 #endif
69 return true;
70 }
71
72 int MapConnectError(int os_error) {
73 switch (os_error) {
74 case EACCES:
75 return ERR_NETWORK_ACCESS_DENIED;
76 case ETIMEDOUT:
77 return ERR_CONNECTION_TIMED_OUT;
78 default: {
79 int net_error = MapSystemError(os_error);
80 if (net_error == ERR_FAILED)
81 return ERR_CONNECTION_FAILED; // More specific than ERR_FAILED.
82
83 // Give a more specific error when the user is offline.
84 if (net_error == ERR_ADDRESS_UNREACHABLE &&
85 NetworkChangeNotifier::IsOffline()) {
86 return ERR_INTERNET_DISCONNECTED;
87 }
88 return net_error;
89 }
90 }
91 }
92
93 } // namespace
94
95 //-----------------------------------------------------------------------------
96
97 TCPSocketLibevent::Watcher::Watcher(
98 const base::Closure& read_ready_callback,
99 const base::Closure& write_ready_callback)
100 : read_ready_callback_(read_ready_callback),
101 write_ready_callback_(write_ready_callback) {
102 }
103
104 TCPSocketLibevent::Watcher::~Watcher() {
105 }
106
107 void TCPSocketLibevent::Watcher::OnFileCanReadWithoutBlocking(int /* fd */) {
108 if (!read_ready_callback_.is_null())
109 read_ready_callback_.Run();
110 else
111 NOTREACHED();
112 }
113
114 void TCPSocketLibevent::Watcher::OnFileCanWriteWithoutBlocking(int /* fd */) {
115 if (!write_ready_callback_.is_null())
116 write_ready_callback_.Run();
117 else
118 NOTREACHED();
119 }
120
28 TCPSocketLibevent::TCPSocketLibevent(NetLog* net_log, 121 TCPSocketLibevent::TCPSocketLibevent(NetLog* net_log,
29 const NetLog::Source& source) 122 const NetLog::Source& source)
30 : socket_(kInvalidSocket), 123 : socket_(kInvalidSocket),
124 accept_watcher_(base::Bind(&TCPSocketLibevent::DidCompleteAccept,
125 base::Unretained(this)),
126 base::Closure()),
31 accept_socket_(NULL), 127 accept_socket_(NULL),
32 accept_address_(NULL), 128 accept_address_(NULL),
129 read_watcher_(base::Bind(&TCPSocketLibevent::DidCompleteRead,
130 base::Unretained(this)),
131 base::Closure()),
132 write_watcher_(base::Closure(),
133 base::Bind(&TCPSocketLibevent::DidCompleteConnectOrWrite,
134 base::Unretained(this))),
135 read_buf_len_(0),
136 write_buf_len_(0),
137 use_tcp_fastopen_(IsTCPFastOpenEnabled()),
138 tcp_fastopen_connected_(false),
139 fast_open_status_(FAST_OPEN_STATUS_UNKNOWN),
140 waiting_connect_(false),
141 connect_os_error_(0),
142 logging_multiple_connect_attempts_(false),
33 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) { 143 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SOCKET)) {
34 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, 144 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
35 source.ToEventParametersCallback()); 145 source.ToEventParametersCallback());
36 } 146 }
37 147
38 TCPSocketLibevent::~TCPSocketLibevent() { 148 TCPSocketLibevent::~TCPSocketLibevent() {
39 if (socket_ != kInvalidSocket)
40 Close();
41 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE); 149 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
150 if (tcp_fastopen_connected_) {
151 UMA_HISTOGRAM_ENUMERATION("Net.TcpFastOpenSocketConnection",
152 fast_open_status_, FAST_OPEN_MAX_VALUE);
153 }
154 Close();
42 } 155 }
43 156
44 int TCPSocketLibevent::Open(AddressFamily family) { 157 int TCPSocketLibevent::Open(AddressFamily family) {
45 DCHECK(CalledOnValidThread()); 158 DCHECK(CalledOnValidThread());
46 DCHECK_EQ(socket_, kInvalidSocket); 159 DCHECK_EQ(socket_, kInvalidSocket);
47 160
48 socket_ = CreatePlatformSocket(ConvertAddressFamily(family), SOCK_STREAM, 161 socket_ = CreatePlatformSocket(ConvertAddressFamily(family), SOCK_STREAM,
49 IPPROTO_TCP); 162 IPPROTO_TCP);
50 if (socket_ < 0) { 163 if (socket_ < 0) {
51 PLOG(ERROR) << "CreatePlatformSocket() returned an error"; 164 PLOG(ERROR) << "CreatePlatformSocket() returned an error";
52 return MapSystemError(errno); 165 return MapSystemError(errno);
53 } 166 }
54 167
55 if (SetNonBlocking(socket_)) { 168 if (SetNonBlocking(socket_)) {
56 int result = MapSystemError(errno); 169 int result = MapSystemError(errno);
57 Close(); 170 Close();
58 return result; 171 return result;
59 } 172 }
60 173
61 return OK; 174 return OK;
62 } 175 }
63 176
64 int TCPSocketLibevent::Adopt(int socket) { 177 int TCPSocketLibevent::AdoptConnectedSocket(int socket,
178 const IPEndPoint& peer_address) {
65 DCHECK(CalledOnValidThread()); 179 DCHECK(CalledOnValidThread());
66 DCHECK_EQ(socket_, kInvalidSocket); 180 DCHECK_EQ(socket_, kInvalidSocket);
67 181
68 socket_ = socket; 182 socket_ = socket;
69 183
70 if (SetNonBlocking(socket_)) { 184 if (SetNonBlocking(socket_)) {
71 int result = MapSystemError(errno); 185 int result = MapSystemError(errno);
72 Close(); 186 Close();
73 return result; 187 return result;
74 } 188 }
75 189
190 peer_address_.reset(new IPEndPoint(peer_address));
191
76 return OK; 192 return OK;
77 } 193 }
78 194
79 int TCPSocketLibevent::Release() {
80 DCHECK(CalledOnValidThread());
81 DCHECK(accept_callback_.is_null());
82
83 int result = socket_;
84 socket_ = kInvalidSocket;
85 return result;
86 }
87
88 int TCPSocketLibevent::Bind(const IPEndPoint& address) { 195 int TCPSocketLibevent::Bind(const IPEndPoint& address) {
89 DCHECK(CalledOnValidThread()); 196 DCHECK(CalledOnValidThread());
90 DCHECK_NE(socket_, kInvalidSocket); 197 DCHECK_NE(socket_, kInvalidSocket);
91 198
92 SockaddrStorage storage; 199 SockaddrStorage storage;
93 if (!address.ToSockAddr(storage.addr, &storage.addr_len)) 200 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
94 return ERR_ADDRESS_INVALID; 201 return ERR_ADDRESS_INVALID;
95 202
96 int result = bind(socket_, storage.addr, storage.addr_len); 203 int result = bind(socket_, storage.addr, storage.addr_len);
97 if (result < 0) { 204 if (result < 0) {
98 PLOG(ERROR) << "bind() returned an error"; 205 PLOG(ERROR) << "bind() returned an error";
99 return MapSystemError(errno); 206 return MapSystemError(errno);
100 } 207 }
101 208
102 return OK; 209 return OK;
103 } 210 }
104 211
105 int TCPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
106 DCHECK(CalledOnValidThread());
107 DCHECK(address);
108
109 SockaddrStorage storage;
110 if (getsockname(socket_, storage.addr, &storage.addr_len) < 0)
111 return MapSystemError(errno);
112 if (!address->FromSockAddr(storage.addr, storage.addr_len))
113 return ERR_FAILED;
114
115 return OK;
116 }
117
118 int TCPSocketLibevent::Listen(int backlog) { 212 int TCPSocketLibevent::Listen(int backlog) {
119 DCHECK(CalledOnValidThread()); 213 DCHECK(CalledOnValidThread());
120 DCHECK_GT(backlog, 0); 214 DCHECK_GT(backlog, 0);
121 DCHECK_NE(socket_, kInvalidSocket); 215 DCHECK_NE(socket_, kInvalidSocket);
122 216
123 int result = listen(socket_, backlog); 217 int result = listen(socket_, backlog);
124 if (result < 0) { 218 if (result < 0) {
125 PLOG(ERROR) << "listen() returned an error"; 219 PLOG(ERROR) << "listen() returned an error";
126 return MapSystemError(errno); 220 return MapSystemError(errno);
127 } 221 }
(...skipping 10 matching lines...) Expand all
138 DCHECK(!callback.is_null()); 232 DCHECK(!callback.is_null());
139 DCHECK(accept_callback_.is_null()); 233 DCHECK(accept_callback_.is_null());
140 234
141 net_log_.BeginEvent(NetLog::TYPE_TCP_ACCEPT); 235 net_log_.BeginEvent(NetLog::TYPE_TCP_ACCEPT);
142 236
143 int result = AcceptInternal(socket, address); 237 int result = AcceptInternal(socket, address);
144 238
145 if (result == ERR_IO_PENDING) { 239 if (result == ERR_IO_PENDING) {
146 if (!base::MessageLoopForIO::current()->WatchFileDescriptor( 240 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
147 socket_, true, base::MessageLoopForIO::WATCH_READ, 241 socket_, true, base::MessageLoopForIO::WATCH_READ,
148 &accept_socket_watcher_, this)) { 242 &accept_socket_watcher_, &accept_watcher_)) {
149 PLOG(ERROR) << "WatchFileDescriptor failed on read"; 243 PLOG(ERROR) << "WatchFileDescriptor failed on read";
150 return MapSystemError(errno); 244 return MapSystemError(errno);
151 } 245 }
152 246
153 accept_socket_ = socket; 247 accept_socket_ = socket;
154 accept_address_ = address; 248 accept_address_ = address;
155 accept_callback_ = callback; 249 accept_callback_ = callback;
156 } 250 }
157 251
158 return result; 252 return result;
159 } 253 }
160 254
255 int TCPSocketLibevent::Connect(const IPEndPoint& address,
256 const CompletionCallback& callback) {
257 DCHECK(CalledOnValidThread());
258 DCHECK_NE(socket_, kInvalidSocket);
259 DCHECK(!waiting_connect_);
260
261 // |peer_address_| will be non-NULL if Connect() has been called. Unless
262 // Close() is called to reset the internal state, a second call to Connect()
263 // is not allowed.
264 // Please note that we don't allow a second Connect() even if the previous
265 // Connect() has failed. Connecting the same |socket_| again after a
266 // connection attempt failed results in unspecified behavior according to
267 // POSIX.
268 DCHECK(!peer_address_);
269
270 if (!logging_multiple_connect_attempts_)
271 LogConnectBegin(AddressList(address));
272
273 peer_address_.reset(new IPEndPoint(address));
274
275 int rv = DoConnect();
276 if (rv == ERR_IO_PENDING) {
277 // Synchronous operation not supported.
278 DCHECK(!callback.is_null());
279 write_callback_ = callback;
280 waiting_connect_ = true;
281 } else {
282 DoConnectComplete(rv);
283 }
284
285 return rv;
286 }
287
288 bool TCPSocketLibevent::IsConnected() const {
289 DCHECK(CalledOnValidThread());
290
291 if (socket_ == kInvalidSocket || waiting_connect_)
292 return false;
293
294 if (use_tcp_fastopen_ && !tcp_fastopen_connected_ && peer_address_) {
295 // With TCP FastOpen, we pretend that the socket is connected.
296 // This allows GetPeerAddress() to return peer_address_.
297 return true;
298 }
299
300 // Check if connection is alive.
301 char c;
302 int rv = HANDLE_EINTR(recv(socket_, &c, 1, MSG_PEEK));
303 if (rv == 0)
304 return false;
305 if (rv == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
306 return false;
307
308 return true;
309 }
310
311 bool TCPSocketLibevent::IsConnectedAndIdle() const {
312 DCHECK(CalledOnValidThread());
313
314 if (socket_ == kInvalidSocket || waiting_connect_)
315 return false;
316
317 // TODO(wtc): should we also handle the TCP FastOpen case here,
318 // as we do in IsConnected()?
319
320 // Check if connection is alive and we haven't received any data
321 // unexpectedly.
322 char c;
323 int rv = HANDLE_EINTR(recv(socket_, &c, 1, MSG_PEEK));
324 if (rv >= 0)
325 return false;
326 if (errno != EAGAIN && errno != EWOULDBLOCK)
327 return false;
328
329 return true;
330 }
331
332 int TCPSocketLibevent::Read(IOBuffer* buf,
333 int buf_len,
334 const CompletionCallback& callback) {
335 DCHECK(CalledOnValidThread());
336 DCHECK_NE(kInvalidSocket, socket_);
337 DCHECK(!waiting_connect_);
338 DCHECK(read_callback_.is_null());
339 // Synchronous operation not supported
340 DCHECK(!callback.is_null());
341 DCHECK_GT(buf_len, 0);
342
343 int nread = HANDLE_EINTR(read(socket_, buf->data(), buf_len));
344 if (nread >= 0) {
345 base::StatsCounter read_bytes("tcp.read_bytes");
346 read_bytes.Add(nread);
347 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, nread,
348 buf->data());
349 RecordFastOpenStatus();
350 return nread;
351 }
352 if (errno != EAGAIN && errno != EWOULDBLOCK) {
353 int net_error = MapSystemError(errno);
354 net_log_.AddEvent(NetLog::TYPE_SOCKET_READ_ERROR,
355 CreateNetLogSocketErrorCallback(net_error, errno));
356 return net_error;
357 }
358
359 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
360 socket_, true, base::MessageLoopForIO::WATCH_READ,
361 &read_socket_watcher_, &read_watcher_)) {
362 DVLOG(1) << "WatchFileDescriptor failed on read, errno " << errno;
363 return MapSystemError(errno);
364 }
365
366 read_buf_ = buf;
367 read_buf_len_ = buf_len;
368 read_callback_ = callback;
369 return ERR_IO_PENDING;
370 }
371
372 int TCPSocketLibevent::Write(IOBuffer* buf,
373 int buf_len,
374 const CompletionCallback& callback) {
375 DCHECK(CalledOnValidThread());
376 DCHECK_NE(kInvalidSocket, socket_);
377 DCHECK(!waiting_connect_);
378 DCHECK(write_callback_.is_null());
379 // Synchronous operation not supported
380 DCHECK(!callback.is_null());
381 DCHECK_GT(buf_len, 0);
382
383 int nwrite = InternalWrite(buf, buf_len);
384 if (nwrite >= 0) {
385 base::StatsCounter write_bytes("tcp.write_bytes");
386 write_bytes.Add(nwrite);
387 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, nwrite,
388 buf->data());
389 return nwrite;
390 }
391 if (errno != EAGAIN && errno != EWOULDBLOCK) {
392 int net_error = MapSystemError(errno);
393 net_log_.AddEvent(NetLog::TYPE_SOCKET_WRITE_ERROR,
394 CreateNetLogSocketErrorCallback(net_error, errno));
395 return net_error;
396 }
397
398 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
399 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
400 &write_socket_watcher_, &write_watcher_)) {
401 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno;
402 return MapSystemError(errno);
403 }
404
405 write_buf_ = buf;
406 write_buf_len_ = buf_len;
407 write_callback_ = callback;
408 return ERR_IO_PENDING;
409 }
410
411 int TCPSocketLibevent::GetLocalAddress(IPEndPoint* address) const {
412 DCHECK(CalledOnValidThread());
413 DCHECK(address);
414
415 SockaddrStorage storage;
416 if (getsockname(socket_, storage.addr, &storage.addr_len) < 0)
417 return MapSystemError(errno);
418 if (!address->FromSockAddr(storage.addr, storage.addr_len))
419 return ERR_ADDRESS_INVALID;
420
421 return OK;
422 }
423
424 int TCPSocketLibevent::GetPeerAddress(IPEndPoint* address) const {
425 DCHECK(CalledOnValidThread());
426 DCHECK(address);
427 if (!IsConnected())
428 return ERR_SOCKET_NOT_CONNECTED;
429 *address = *peer_address_;
430 return OK;
431 }
432
161 int TCPSocketLibevent::SetDefaultOptionsForServer() { 433 int TCPSocketLibevent::SetDefaultOptionsForServer() {
434 DCHECK(CalledOnValidThread());
162 return SetAddressReuse(true); 435 return SetAddressReuse(true);
163 } 436 }
164 437
438 void TCPSocketLibevent::SetDefaultOptionsForClient() {
439 DCHECK(CalledOnValidThread());
440
441 // This mirrors the behaviour on Windows. See the comment in
442 // tcp_socket_win.cc after searching for "NODELAY".
443 SetTCPNoDelay(socket_, true); // If SetTCPNoDelay fails, we don't care.
444 SetTCPKeepAlive(socket_, true, kTCPKeepAliveSeconds);
445 }
446
165 int TCPSocketLibevent::SetAddressReuse(bool allow) { 447 int TCPSocketLibevent::SetAddressReuse(bool allow) {
448 DCHECK(CalledOnValidThread());
449
166 // SO_REUSEADDR is useful for server sockets to bind to a recently unbound 450 // SO_REUSEADDR is useful for server sockets to bind to a recently unbound
167 // port. When a socket is closed, the end point changes its state to TIME_WAIT 451 // port. When a socket is closed, the end point changes its state to TIME_WAIT
168 // and wait for 2 MSL (maximum segment lifetime) to ensure the remote peer 452 // and wait for 2 MSL (maximum segment lifetime) to ensure the remote peer
169 // acknowledges its closure. For server sockets, it is usually safe to 453 // acknowledges its closure. For server sockets, it is usually safe to
170 // bind to a TIME_WAIT end point immediately, which is a widely adopted 454 // bind to a TIME_WAIT end point immediately, which is a widely adopted
171 // behavior. 455 // behavior.
172 // 456 //
173 // Note that on *nix, SO_REUSEADDR does not enable the TCP socket to bind to 457 // Note that on *nix, SO_REUSEADDR does not enable the TCP socket to bind to
174 // an end point that is already bound by another socket. To do that one must 458 // an end point that is already bound by another socket. To do that one must
175 // set SO_REUSEPORT instead. This option is not provided on Linux prior 459 // set SO_REUSEPORT instead. This option is not provided on Linux prior
176 // to 3.9. 460 // to 3.9.
177 // 461 //
178 // SO_REUSEPORT is provided in MacOS X and iOS. 462 // SO_REUSEPORT is provided in MacOS X and iOS.
179 int boolean_value = allow ? 1 : 0; 463 int boolean_value = allow ? 1 : 0;
180 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &boolean_value, 464 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &boolean_value,
181 sizeof(boolean_value)); 465 sizeof(boolean_value));
182 if (rv < 0) 466 if (rv < 0)
183 return MapSystemError(errno); 467 return MapSystemError(errno);
184 return OK; 468 return OK;
185 } 469 }
186 470
471 bool TCPSocketLibevent::SetReceiveBufferSize(int32 size) {
472 DCHECK(CalledOnValidThread());
473 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
474 reinterpret_cast<const char*>(&size),
475 sizeof(size));
476 DCHECK(!rv) << "Could not set socket receive buffer size: " << errno;
477 return rv == 0;
478 }
479
480 bool TCPSocketLibevent::SetSendBufferSize(int32 size) {
481 DCHECK(CalledOnValidThread());
482 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
483 reinterpret_cast<const char*>(&size),
484 sizeof(size));
485 DCHECK(!rv) << "Could not set socket send buffer size: " << errno;
486 return rv == 0;
487 }
488
489 bool TCPSocketLibevent::SetKeepAlive(bool enable, int delay) {
490 DCHECK(CalledOnValidThread());
491 return SetTCPKeepAlive(socket_, enable, delay);
492 }
493
494 bool TCPSocketLibevent::SetNoDelay(bool no_delay) {
495 DCHECK(CalledOnValidThread());
496 return SetTCPNoDelay(socket_, no_delay);
497 }
498
187 void TCPSocketLibevent::Close() { 499 void TCPSocketLibevent::Close() {
500 DCHECK(CalledOnValidThread());
501
502 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
503 DCHECK(ok);
504 ok = read_socket_watcher_.StopWatchingFileDescriptor();
505 DCHECK(ok);
506 ok = write_socket_watcher_.StopWatchingFileDescriptor();
507 DCHECK(ok);
508
188 if (socket_ != kInvalidSocket) { 509 if (socket_ != kInvalidSocket) {
189 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
190 DCHECK(ok);
191 if (HANDLE_EINTR(close(socket_)) < 0) 510 if (HANDLE_EINTR(close(socket_)) < 0)
192 PLOG(ERROR) << "close"; 511 PLOG(ERROR) << "close";
193 socket_ = kInvalidSocket; 512 socket_ = kInvalidSocket;
194 } 513 }
514
515 if (!accept_callback_.is_null()) {
516 accept_socket_ = NULL;
517 accept_address_ = NULL;
518 accept_callback_.Reset();
519 }
520
521 if (!read_callback_.is_null()) {
522 read_buf_ = NULL;
523 read_buf_len_ = 0;
524 read_callback_.Reset();
525 }
526
527 if (!write_callback_.is_null()) {
528 write_buf_ = NULL;
529 write_buf_len_ = 0;
530 write_callback_.Reset();
531 }
532
533 tcp_fastopen_connected_ = false;
534 fast_open_status_ = FAST_OPEN_STATUS_UNKNOWN;
535 waiting_connect_ = false;
536 peer_address_.reset();
537 connect_os_error_ = 0;
538 }
539
540 bool TCPSocketLibevent::UsingTCPFastOpen() const {
541 return use_tcp_fastopen_;
542 }
543
544 void TCPSocketLibevent::StartLoggingMultipleConnectAttempts(
545 const AddressList& addresses) {
546 if (!logging_multiple_connect_attempts_) {
547 logging_multiple_connect_attempts_ = true;
548 LogConnectBegin(addresses);
549 } else {
550 NOTREACHED();
551 }
552 }
553
554 void TCPSocketLibevent::EndLoggingMultipleConnectAttempts(int net_error) {
555 if (logging_multiple_connect_attempts_) {
556 LogConnectEnd(net_error);
557 logging_multiple_connect_attempts_ = false;
558 } else {
559 NOTREACHED();
560 }
195 } 561 }
196 562
197 int TCPSocketLibevent::AcceptInternal(scoped_ptr<TCPSocketLibevent>* socket, 563 int TCPSocketLibevent::AcceptInternal(scoped_ptr<TCPSocketLibevent>* socket,
198 IPEndPoint* address) { 564 IPEndPoint* address) {
199 SockaddrStorage storage; 565 SockaddrStorage storage;
200 int new_socket = HANDLE_EINTR(accept(socket_, 566 int new_socket = HANDLE_EINTR(accept(socket_,
201 storage.addr, 567 storage.addr,
202 &storage.addr_len)); 568 &storage.addr_len));
203 if (new_socket < 0) { 569 if (new_socket < 0) {
204 int net_error = MapSystemError(errno); 570 int net_error = MapSystemError(errno);
205 if (net_error != ERR_IO_PENDING) 571 if (net_error != ERR_IO_PENDING)
206 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, net_error); 572 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, net_error);
207 return net_error; 573 return net_error;
208 } 574 }
209 575
210 IPEndPoint ip_end_point; 576 IPEndPoint ip_end_point;
211 if (!ip_end_point.FromSockAddr(storage.addr, storage.addr_len)) { 577 if (!ip_end_point.FromSockAddr(storage.addr, storage.addr_len)) {
212 NOTREACHED(); 578 NOTREACHED();
213 if (HANDLE_EINTR(close(new_socket)) < 0) 579 if (HANDLE_EINTR(close(new_socket)) < 0)
214 PLOG(ERROR) << "close"; 580 PLOG(ERROR) << "close";
215 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, ERR_FAILED); 581 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT,
216 return ERR_FAILED; 582 ERR_ADDRESS_INVALID);
583 return ERR_ADDRESS_INVALID;
217 } 584 }
218 scoped_ptr<TCPSocketLibevent> tcp_socket(new TCPSocketLibevent( 585 scoped_ptr<TCPSocketLibevent> tcp_socket(new TCPSocketLibevent(
219 net_log_.net_log(), net_log_.source())); 586 net_log_.net_log(), net_log_.source()));
220 int adopt_result = tcp_socket->Adopt(new_socket); 587 int adopt_result = tcp_socket->AdoptConnectedSocket(new_socket, ip_end_point);
221 if (adopt_result != OK) { 588 if (adopt_result != OK) {
222 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, adopt_result); 589 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_ACCEPT, adopt_result);
223 return adopt_result; 590 return adopt_result;
224 } 591 }
225 *socket = tcp_socket.Pass(); 592 *socket = tcp_socket.Pass();
226 *address = ip_end_point; 593 *address = ip_end_point;
227 net_log_.EndEvent(NetLog::TYPE_TCP_ACCEPT, 594 net_log_.EndEvent(NetLog::TYPE_TCP_ACCEPT,
228 CreateNetLogIPEndPointCallback(&ip_end_point)); 595 CreateNetLogIPEndPointCallback(&ip_end_point));
229 return OK; 596 return OK;
230 } 597 }
231 598
232 void TCPSocketLibevent::OnFileCanReadWithoutBlocking(int fd) { 599 int TCPSocketLibevent::DoConnect() {
600 DCHECK_EQ(0, connect_os_error_);
601
602 net_log_.BeginEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT,
603 CreateNetLogIPEndPointCallback(peer_address_.get()));
604
605 // Connect the socket.
606 if (!use_tcp_fastopen_) {
607 SockaddrStorage storage;
608 if (!peer_address_->ToSockAddr(storage.addr, &storage.addr_len))
609 return ERR_INVALID_ARGUMENT;
610
611 if (!HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len))) {
612 // Connected without waiting!
613 return OK;
614 }
615 } else {
616 // With TCP FastOpen, we pretend that the socket is connected.
617 DCHECK(!tcp_fastopen_connected_);
618 return OK;
619 }
620
621 // Check if the connect() failed synchronously.
622 connect_os_error_ = errno;
623 if (connect_os_error_ != EINPROGRESS)
624 return MapConnectError(connect_os_error_);
625
626 // Otherwise the connect() is going to complete asynchronously, so watch
627 // for its completion.
628 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
629 socket_, true, base::MessageLoopForIO::WATCH_WRITE,
630 &write_socket_watcher_, &write_watcher_)) {
631 connect_os_error_ = errno;
632 DVLOG(1) << "WatchFileDescriptor failed: " << connect_os_error_;
633 return MapSystemError(connect_os_error_);
634 }
635
636 return ERR_IO_PENDING;
637 }
638
639 void TCPSocketLibevent::DoConnectComplete(int result) {
640 // Log the end of this attempt (and any OS error it threw).
641 int os_error = connect_os_error_;
642 connect_os_error_ = 0;
643 if (result != OK) {
644 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT,
645 NetLog::IntegerCallback("os_error", os_error));
646 } else {
647 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT_ATTEMPT);
648 }
649
650 if (!logging_multiple_connect_attempts_)
651 LogConnectEnd(result);
652 }
653
654 void TCPSocketLibevent::LogConnectBegin(const AddressList& addresses) {
655 base::StatsCounter connects("tcp.connect");
656 connects.Increment();
657
658 net_log_.BeginEvent(NetLog::TYPE_TCP_CONNECT,
659 addresses.CreateNetLogCallback());
660 }
661
662 void TCPSocketLibevent::LogConnectEnd(int net_error) {
663 if (net_error == OK)
664 UpdateConnectionTypeHistograms(CONNECTION_ANY);
665
666 if (net_error != OK) {
667 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_CONNECT, net_error);
668 return;
669 }
670
671 SockaddrStorage storage;
672 int rv = getsockname(socket_, storage.addr, &storage.addr_len);
673 if (rv != 0) {
674 PLOG(ERROR) << "getsockname() [rv: " << rv << "] error: ";
675 NOTREACHED();
676 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_TCP_CONNECT, rv);
677 return;
678 }
679
680 net_log_.EndEvent(NetLog::TYPE_TCP_CONNECT,
681 CreateNetLogSourceAddressCallback(storage.addr,
682 storage.addr_len));
683 }
684
685 void TCPSocketLibevent::DidCompleteRead() {
686 RecordFastOpenStatus();
687 if (read_callback_.is_null())
688 return;
689
690 int bytes_transferred;
691 bytes_transferred = HANDLE_EINTR(read(socket_, read_buf_->data(),
692 read_buf_len_));
693
694 int result;
695 if (bytes_transferred >= 0) {
696 result = bytes_transferred;
697 base::StatsCounter read_bytes("tcp.read_bytes");
698 read_bytes.Add(bytes_transferred);
699 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, result,
700 read_buf_->data());
701 } else {
702 result = MapSystemError(errno);
703 if (result != ERR_IO_PENDING) {
704 net_log_.AddEvent(NetLog::TYPE_SOCKET_READ_ERROR,
705 CreateNetLogSocketErrorCallback(result, errno));
706 }
707 }
708
709 if (result != ERR_IO_PENDING) {
710 read_buf_ = NULL;
711 read_buf_len_ = 0;
712 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
713 DCHECK(ok);
714 base::ResetAndReturn(&read_callback_).Run(result);
715 }
716 }
717
718 void TCPSocketLibevent::DidCompleteWrite() {
719 if (write_callback_.is_null())
720 return;
721
722 int bytes_transferred;
723 bytes_transferred = HANDLE_EINTR(write(socket_, write_buf_->data(),
724 write_buf_len_));
725
726 int result;
727 if (bytes_transferred >= 0) {
728 result = bytes_transferred;
729 base::StatsCounter write_bytes("tcp.write_bytes");
730 write_bytes.Add(bytes_transferred);
731 net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, result,
732 write_buf_->data());
733 } else {
734 result = MapSystemError(errno);
735 if (result != ERR_IO_PENDING) {
736 net_log_.AddEvent(NetLog::TYPE_SOCKET_WRITE_ERROR,
737 CreateNetLogSocketErrorCallback(result, errno));
738 }
739 }
740
741 if (result != ERR_IO_PENDING) {
742 write_buf_ = NULL;
743 write_buf_len_ = 0;
744 write_socket_watcher_.StopWatchingFileDescriptor();
745 base::ResetAndReturn(&write_callback_).Run(result);
746 }
747 }
748
749 void TCPSocketLibevent::DidCompleteConnect() {
750 DCHECK(waiting_connect_);
751
752 // Get the error that connect() completed with.
753 int os_error = 0;
754 socklen_t len = sizeof(os_error);
755 if (getsockopt(socket_, SOL_SOCKET, SO_ERROR, &os_error, &len) < 0)
756 os_error = errno;
757
758 int result = MapConnectError(os_error);
759 connect_os_error_ = os_error;
760 if (result != ERR_IO_PENDING) {
761 DoConnectComplete(result);
762 waiting_connect_ = false;
763 write_socket_watcher_.StopWatchingFileDescriptor();
764 base::ResetAndReturn(&write_callback_).Run(result);
765 }
766 }
767
768 void TCPSocketLibevent::DidCompleteConnectOrWrite() {
769 if (waiting_connect_)
770 DidCompleteConnect();
771 else
772 DidCompleteWrite();
773 }
774
775 void TCPSocketLibevent::DidCompleteAccept() {
233 DCHECK(CalledOnValidThread()); 776 DCHECK(CalledOnValidThread());
234 777
235 int result = AcceptInternal(accept_socket_, accept_address_); 778 int result = AcceptInternal(accept_socket_, accept_address_);
236 if (result != ERR_IO_PENDING) { 779 if (result != ERR_IO_PENDING) {
237 accept_socket_ = NULL; 780 accept_socket_ = NULL;
238 accept_address_ = NULL; 781 accept_address_ = NULL;
239 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor(); 782 bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
240 DCHECK(ok); 783 DCHECK(ok);
241 CompletionCallback callback = accept_callback_; 784 CompletionCallback callback = accept_callback_;
242 accept_callback_.Reset(); 785 accept_callback_.Reset();
243 callback.Run(result); 786 callback.Run(result);
244 } 787 }
245 } 788 }
246 789
247 void TCPSocketLibevent::OnFileCanWriteWithoutBlocking(int fd) { 790 int TCPSocketLibevent::InternalWrite(IOBuffer* buf, int buf_len) {
248 NOTREACHED(); 791 int nwrite;
792 if (use_tcp_fastopen_ && !tcp_fastopen_connected_) {
793 SockaddrStorage storage;
794 if (!peer_address_->ToSockAddr(storage.addr, &storage.addr_len)) {
795 errno = EINVAL;
796 return -1;
797 }
798
799 int flags = 0x20000000; // Magic flag to enable TCP_FASTOPEN.
800 #if defined(OS_LINUX)
801 // sendto() will fail with EPIPE when the system doesn't support TCP Fast
802 // Open. Theoretically that shouldn't happen since the caller should check
803 // for system support on startup, but users may dynamically disable TCP Fast
804 // Open via sysctl.
805 flags |= MSG_NOSIGNAL;
806 #endif // defined(OS_LINUX)
807 nwrite = HANDLE_EINTR(sendto(socket_,
808 buf->data(),
809 buf_len,
810 flags,
811 storage.addr,
812 storage.addr_len));
813 tcp_fastopen_connected_ = true;
814
815 if (nwrite < 0) {
816 DCHECK_NE(EPIPE, errno);
817
818 // If errno == EINPROGRESS, that means the kernel didn't have a cookie
819 // and would block. The kernel is internally doing a connect() though.
820 // Remap EINPROGRESS to EAGAIN so we treat this the same as our other
821 // asynchronous cases. Note that the user buffer has not been copied to
822 // kernel space.
823 if (errno == EINPROGRESS) {
824 errno = EAGAIN;
825 fast_open_status_ = FAST_OPEN_SLOW_CONNECT_RETURN;
826 } else {
827 fast_open_status_ = FAST_OPEN_ERROR;
828 }
829 } else {
830 fast_open_status_ = FAST_OPEN_FAST_CONNECT_RETURN;
831 }
832 } else {
833 nwrite = HANDLE_EINTR(write(socket_, buf->data(), buf_len));
834 }
835 return nwrite;
836 }
837
838 void TCPSocketLibevent::RecordFastOpenStatus() {
839 if (use_tcp_fastopen_ &&
840 (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN ||
841 fast_open_status_ == FAST_OPEN_SLOW_CONNECT_RETURN)) {
842 DCHECK_NE(FAST_OPEN_STATUS_UNKNOWN, fast_open_status_);
843 bool getsockopt_success(false);
844 bool server_acked_data(false);
845 #if defined(TCP_INFO)
846 // Probe to see the if the socket used TCP Fast Open.
847 tcp_info info;
848 socklen_t info_len = sizeof(tcp_info);
849 getsockopt_success =
850 getsockopt(socket_, IPPROTO_TCP, TCP_INFO, &info, &info_len) == 0 &&
851 info_len == sizeof(tcp_info);
852 server_acked_data = getsockopt_success &&
853 (info.tcpi_options & TCPI_OPT_SYN_DATA);
854 #endif
855 if (getsockopt_success) {
856 if (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN) {
857 fast_open_status_ = (server_acked_data ? FAST_OPEN_SYN_DATA_ACK :
858 FAST_OPEN_SYN_DATA_NACK);
859 } else {
860 fast_open_status_ = (server_acked_data ? FAST_OPEN_NO_SYN_DATA_ACK :
861 FAST_OPEN_NO_SYN_DATA_NACK);
862 }
863 } else {
864 fast_open_status_ = (fast_open_status_ == FAST_OPEN_FAST_CONNECT_RETURN ?
865 FAST_OPEN_SYN_DATA_FAILED :
866 FAST_OPEN_NO_SYN_DATA_FAILED);
867 }
868 }
249 } 869 }
250 870
251 } // namespace net 871 } // namespace net
OLDNEW
« no previous file with comments | « net/socket/tcp_socket_libevent.h ('k') | net/socket/tcp_socket_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698