Index: net/base/tcp_client_socket_libevent.cc |
=================================================================== |
--- net/base/tcp_client_socket_libevent.cc (revision 3736) |
+++ net/base/tcp_client_socket_libevent.cc (working copy) |
@@ -67,8 +67,10 @@ |
: socket_(kInvalidSocket), |
addresses_(addresses), |
current_ai_(addresses_.head()), |
- wait_state_(NOT_WAITING), |
- event_(new event) { |
+ waiting_connect_(false), |
+ event_(new event), |
+ write_callback_(NULL), |
+ callback_(NULL) { |
} |
TCPClientSocket::~TCPClientSocket() { |
@@ -80,7 +82,7 @@ |
if (socket_ != kInvalidSocket) |
return OK; |
- DCHECK(wait_state_ == NOT_WAITING); |
+ DCHECK(!waiting_connect_); |
const addrinfo* ai = current_ai_; |
DCHECK(ai); |
@@ -98,7 +100,7 @@ |
DCHECK(callback); |
if (errno != EINPROGRESS) { |
- LOG(ERROR) << "connect failed: " << errno; |
+ DLOG(INFO) << "connect failed: " << errno; |
return MapPosixError(errno); |
} |
@@ -109,7 +111,7 @@ |
MessageLoopForIO::current()->WatchSocket( |
socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this); |
- wait_state_ = WAITING_CONNECT; |
+ waiting_connect_ = true; |
callback_ = callback; |
return ERR_IO_PENDING; |
} |
@@ -126,13 +128,14 @@ |
MessageLoopForIO::current()->UnwatchSocket(event_.get()); |
close(socket_); |
socket_ = kInvalidSocket; |
+ waiting_connect_ = false; |
// Reset for next time. |
current_ai_ = addresses_.head(); |
} |
bool TCPClientSocket::IsConnected() const { |
- if (socket_ == kInvalidSocket || wait_state_ == WAITING_CONNECT) |
+ if (socket_ == kInvalidSocket || waiting_connect_) |
return false; |
// Check if connection is alive. |
@@ -150,7 +153,7 @@ |
int buf_len, |
CompletionCallback* callback) { |
DCHECK(socket_ != kInvalidSocket); |
- DCHECK(wait_state_ == NOT_WAITING); |
+ DCHECK(!waiting_connect_); |
DCHECK(!callback_); |
// Synchronous operation not supported |
DCHECK(callback); |
@@ -160,15 +163,16 @@ |
if (nread >= 0) { |
return nread; |
} |
- if (errno != EAGAIN && errno != EWOULDBLOCK) |
+ if (errno != EAGAIN && errno != EWOULDBLOCK) { |
+ DLOG(INFO) << "read failed, errno " << errno; |
return MapPosixError(errno); |
+ } |
MessageLoopForIO::current()->WatchSocket( |
socket_, EV_READ|EV_PERSIST, event_.get(), this); |
buf_ = buf; |
buf_len_ = buf_len; |
- wait_state_ = WAITING_READ; |
callback_ = callback; |
return ERR_IO_PENDING; |
} |
@@ -177,8 +181,8 @@ |
int buf_len, |
CompletionCallback* callback) { |
DCHECK(socket_ != kInvalidSocket); |
- DCHECK(wait_state_ == NOT_WAITING); |
- DCHECK(!callback_); |
+ DCHECK(!waiting_connect_); |
+ DCHECK(!write_callback_); |
// Synchronous operation not supported |
DCHECK(callback); |
DCHECK(buf_len > 0); |
@@ -193,10 +197,9 @@ |
MessageLoopForIO::current()->WatchSocket( |
socket_, EV_WRITE|EV_PERSIST, event_.get(), this); |
- buf_ = const_cast<char*>(buf); |
- buf_len_ = buf_len; |
- wait_state_ = WAITING_WRITE; |
- callback_ = callback; |
+ write_buf_ = buf; |
+ write_buf_len_ = buf_len; |
+ write_callback_ = callback; |
return ERR_IO_PENDING; |
} |
@@ -222,11 +225,19 @@ |
c->Run(rv); |
} |
+void TCPClientSocket::DoWriteCallback(int rv) { |
+ DCHECK(rv != ERR_IO_PENDING); |
+ DCHECK(write_callback_); |
+ |
+ // since Run may result in Write being called, clear write_callback_ up front. |
+ CompletionCallback* c = write_callback_; |
+ write_callback_ = NULL; |
+ c->Run(rv); |
+} |
+ |
void TCPClientSocket::DidCompleteConnect() { |
int result = ERR_UNEXPECTED; |
- wait_state_ = NOT_WAITING; |
- |
// Check to see if connect succeeded |
int error_code = 0; |
socklen_t len = sizeof(error_code); |
@@ -236,7 +247,6 @@ |
if (error_code == EINPROGRESS || error_code == EALREADY) { |
NOTREACHED(); // This indicates a bug in libevent or our code. |
result = ERR_IO_PENDING; |
- wait_state_ = WAITING_CONNECT; // And await next callback. |
} else if (current_ai_->ai_next && ( |
error_code == EADDRNOTAVAIL || |
error_code == EAFNOSUPPORT || |
@@ -252,25 +262,16 @@ |
} else { |
result = MapPosixError(error_code); |
MessageLoopForIO::current()->UnwatchSocket(event_.get()); |
+ waiting_connect_ = false; |
} |
if (result != ERR_IO_PENDING) |
DoCallback(result); |
} |
-void TCPClientSocket::DidCompleteIO() { |
+void TCPClientSocket::DidCompleteRead() { |
int bytes_transferred; |
- switch (wait_state_) { |
- case WAITING_READ: |
- bytes_transferred = read(socket_, buf_, buf_len_); |
- break; |
- case WAITING_WRITE: |
- bytes_transferred = write(socket_, buf_, buf_len_); |
- break; |
- default: |
- NOTREACHED(); |
- return; |
- } |
+ bytes_transferred = read(socket_, buf_, buf_len_); |
int result; |
if (bytes_transferred >= 0) { |
@@ -280,26 +281,48 @@ |
} |
if (result != ERR_IO_PENDING) { |
- wait_state_ = NOT_WAITING; |
+ buf_ = NULL; |
+ buf_len_ = 0; |
MessageLoopForIO::current()->UnwatchSocket(event_.get()); |
DoCallback(result); |
} |
} |
+void TCPClientSocket::DidCompleteWrite() { |
+ int bytes_transferred; |
+ bytes_transferred = write(socket_, write_buf_, write_buf_len_); |
+ |
+ int result; |
+ if (bytes_transferred >= 0) { |
+ result = bytes_transferred; |
+ } else { |
+ result = MapPosixError(errno); |
+ } |
+ |
+ if (result != ERR_IO_PENDING) { |
+ write_buf_ = NULL; |
+ write_buf_len_ = 0; |
+ MessageLoopForIO::current()->UnwatchSocket(event_.get()); |
+ DoWriteCallback(result); |
+ } |
+} |
+ |
void TCPClientSocket::OnSocketReady(short flags) { |
- switch (wait_state_) { |
- case WAITING_CONNECT: |
- DidCompleteConnect(); |
- break; |
- case WAITING_READ: |
- case WAITING_WRITE: |
- DidCompleteIO(); |
- break; |
- default: |
- NOTREACHED(); |
- break; |
+ // the only used bits of flags are EV_READ and EV_WRITE |
+ |
+ if (waiting_connect_) { |
+ DidCompleteConnect(); |
+ } else { |
+ if ((flags & EV_WRITE) && write_callback_) |
+ DidCompleteWrite(); |
+ if ((flags & EV_READ) && callback_) |
+ DidCompleteRead(); |
} |
} |
+int TCPClientSocket::GetPeerName(struct sockaddr *name, socklen_t *namelen) { |
+ return ::getpeername(socket_, name, namelen); |
+} |
+ |
} // namespace net |