Index: net/base/tcp_client_socket_libevent.cc |
=================================================================== |
--- net/base/tcp_client_socket_libevent.cc (revision 0) |
+++ net/base/tcp_client_socket_libevent.cc (revision 0) |
@@ -0,0 +1,275 @@ |
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "net/base/tcp_client_socket.h" |
+#include "net/base/net_errors.h" |
+#include "third_party/libevent/event.h" |
+#include "base/message_loop.h" |
+ |
+#include <sys/socket.h> |
+#include <errno.h> |
+#include <fcntl.h> |
+#include <netdb.h> |
+ |
+namespace net { |
+ |
+#ifndef INVALID_SOCKET |
+#define INVALID_SOCKET (unsigned)(-1) |
+#endif |
+ |
+static int MapPosixError(int err) { |
+ // There are numerous posix error codes, but these are the ones we thus far |
+ // find interesting. |
+ switch (err) { |
+ default: |
+ return ERR_FAILED; |
+ } |
+} |
+ |
+//----------------------------------------------------------------------------- |
+ |
+TCPClientSocket::TCPClientSocket(const AddressList& addresses) |
+ : socket_(INVALID_SOCKET), |
+ addresses_(addresses), |
+ current_ai_(addresses_.head()), |
+ wait_state_(NOT_WAITING) { |
+ event_ = new struct event; |
+} |
+ |
+TCPClientSocket::~TCPClientSocket() { |
+ Disconnect(); |
+ delete event_; |
+} |
+ |
+int TCPClientSocket::Connect(CompletionCallback* callback) { |
+ // If already connected, then just return OK. |
+ if (socket_ != INVALID_SOCKET) |
+ return OK; |
+ |
+ const struct addrinfo* ai = current_ai_; |
+ DCHECK(ai); |
+ |
+ int rv = CreateSocket(ai); |
+ if (rv != OK) |
+ return rv; |
+ |
+ if (!connect(socket_, ai->ai_addr, static_cast<int>(ai->ai_addrlen))) { |
+ // Connected without waiting! |
+ return OK; |
+ } |
+ |
+ if (errno != EINPROGRESS && errno != EWOULDBLOCK) { |
+ LOG(ERROR) << "connect failed: " << errno; |
+ return MapPosixError(errno); |
+ } |
+ |
+ // Initialize event_ and link it to our MessagePump. |
+ // POLLOUT is set if the connection is established. POLLIN is set if the connection fails, |
+ // so select for both read and write. |
+ ::event_set(event_, socket_, EV_READ|EV_WRITE|EV_PERSIST, TCPClientSocket_libevent_cb, this); |
+ ::event_base_set(MessageLoop::current()->pump_libevent()->getEventbase(), event_); |
+ if (::event_add(event_, NULL)) |
+ return ERR_FAILED; |
+ |
+ wait_state_ = WAITING_CONNECT; |
+ callback_ = callback; |
+ return ERR_IO_PENDING; |
+} |
+ |
+int TCPClientSocket::ReconnectIgnoringLastError(CompletionCallback* callback) { |
+ // No ignorable errors! |
+ return ERR_FAILED; |
+} |
+ |
+void TCPClientSocket::Disconnect() { |
+ if (socket_ == INVALID_SOCKET) |
+ return; |
+ |
+ ::event_del(event_); |
+ close(socket_); |
+ socket_ = INVALID_SOCKET; |
+ |
+ // Reset for next time. |
+ current_ai_ = addresses_.head(); |
+} |
+ |
+bool TCPClientSocket::IsConnected() const { |
+ if (socket_ == INVALID_SOCKET || wait_state_ == WAITING_CONNECT) |
+ return false; |
+ |
+ // Check if connection is alive. |
+ char c; |
+ int rv = recv(socket_, &c, 1, MSG_PEEK); |
+ if (rv == 0) |
+ return false; |
+ |
+ return true; |
+} |
+ |
+int TCPClientSocket::Read(char* buf, |
+ int buf_len, |
+ CompletionCallback* callback) { |
+ DCHECK(socket_ != INVALID_SOCKET); |
+ DCHECK(wait_state_ == NOT_WAITING); |
+ DCHECK(!callback_); |
+ DCHECK(callback); // null callback not allowed? |
+ DCHECK(buf_len > 0); |
+ |
+ int nread = read(socket_, buf, buf_len); |
+ if (nread > 0) { |
+ return nread; |
+ } |
+ if (nread == -1 && errno != EWOULDBLOCK) |
+ return MapPosixError(errno); |
+ |
+ ::event_set(event_, socket_, EV_READ|EV_PERSIST, TCPClientSocket_libevent_cb, this); |
+ ::event_base_set(MessageLoop::current()->pump_libevent()->getEventbase(), event_); |
+ if (::event_add(event_, NULL)) |
+ return ERR_FAILED; |
+ buf_ = buf; |
+ buf_len_ = buf_len; |
+ wait_state_ = WAITING_READ; |
+ callback_ = callback; |
+ return ERR_IO_PENDING; |
+} |
+ |
+int TCPClientSocket::Write(const char* buf, |
+ int buf_len, |
+ CompletionCallback* callback) { |
+ DCHECK(socket_ != INVALID_SOCKET); |
+ DCHECK(wait_state_ == NOT_WAITING); |
+ DCHECK(!callback_); |
+ DCHECK(buf_len > 0); |
+ |
+ int nwrite = write(socket_, buf, buf_len); |
+ if (nwrite > 0) { |
+ return nwrite; |
+ } |
+ if (nwrite == -1 && errno != EWOULDBLOCK) |
+ return MapPosixError(errno); |
+ |
+ ::event_set(event_, socket_, EV_WRITE|EV_PERSIST, TCPClientSocket_libevent_cb, this); |
+ ::event_base_set(MessageLoop::current()->pump_libevent()->getEventbase(), event_); |
+ if (::event_add(event_, NULL)) |
+ return ERR_FAILED; |
+ buf_ = const_cast<char *>(buf); |
+ buf_len_ = buf_len; |
+ wait_state_ = WAITING_WRITE; |
+ callback_ = callback; |
+ return ERR_IO_PENDING; |
+} |
+ |
+int TCPClientSocket::CreateSocket(const struct addrinfo* ai) { |
+ socket_ = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); |
+ if (socket_ == INVALID_SOCKET) |
+ return MapPosixError(errno); |
+ |
+ // All our socket I/O is nonblocking |
+ int flags = fcntl(socket_, F_GETFL, 0); |
+ if (-1 == flags) |
+ flags = 0; |
+ if (fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) |
+ return MapPosixError(errno); |
+ |
+ return OK; |
+} |
+ |
+void TCPClientSocket::DoCallback(int rv) { |
+ DCHECK(rv != ERR_IO_PENDING); |
+ DCHECK(callback_); |
+ |
+ // since Run may result in Read being called, clear callback_ up front. |
+ CompletionCallback* c = callback_; |
+ callback_ = NULL; |
+ c->Run(rv); |
+} |
+ |
+void TCPClientSocket::DidCompleteConnect() { |
+ int result = ERR_UNEXPECTED; |
+ |
+ wait_state_ = NOT_WAITING; |
+ |
+ /* check to see if connect succeeded */ |
+ int error_code = -1; |
+ socklen_t len = sizeof(error_code); |
+ if (getsockopt(socket_, SOL_SOCKET, SO_ERROR, (char *)&error_code, &len) < 0) { |
+ result = MapPosixError(errno); |
+ } else if (error_code == EINPROGRESS) { |
+ result = ERR_IO_PENDING; |
+ } else if (current_ai_->ai_next && ( |
+ error_code == EADDRNOTAVAIL || |
+ error_code == EAFNOSUPPORT || |
+ error_code == ECONNREFUSED || |
+ error_code == ENETUNREACH || |
+ error_code == EHOSTUNREACH || |
+ error_code == ETIMEDOUT)) { |
+ const struct addrinfo* next = current_ai_->ai_next; |
+ Disconnect(); |
+ current_ai_ = next; |
+ result = Connect(callback_); |
+ } else if (error_code) { |
+ result = MapPosixError(error_code); |
+ } else { |
+ result = 0; |
+ ::event_del(event_); |
+ } |
+ |
+ if (result != ERR_IO_PENDING) |
+ DoCallback(result); |
+} |
+ |
+void TCPClientSocket::DidCompleteIO() { |
+ int result; |
+ int n; |
+ |
+ switch (wait_state_) { |
+ case WAITING_READ: |
+ n = read(socket_, buf_, buf_len_); |
+ break; |
+ case WAITING_WRITE: |
+ n = write(socket_, buf_, buf_len_); |
+ break; |
+ } |
+ |
+ if (n > 0) |
+ result = n; |
+ else if (n == 0) { |
+ // TODO: can we tell why it was closed, and return a more informative message? |
+ // And why does the unit test want to see zero? |
+ //result = ERR_CONNECTION_CLOSED; |
+ result = 0; |
+ } else |
+ result = MapPosixError(errno); |
+ |
+ if (result != ERR_IO_PENDING) { |
+ wait_state_ = NOT_WAITING; |
+ ::event_del(event_); |
+ DoCallback(result); |
+ } |
+} |
+ |
+void TCPClientSocket::OnLibeventNotification(short flags) { |
+ |
+ switch (wait_state_) { |
+ case WAITING_CONNECT: |
+ DidCompleteConnect(); |
+ break; |
+ case WAITING_READ: |
+ case WAITING_WRITE: |
+ DidCompleteIO(); |
+ break; |
+ default: |
+ NOTREACHED(); |
+ break; |
+ } |
+} |
+ |
+extern "C" void TCPClientSocket_libevent_cb(int socket, short flags, void *context) { |
+ TCPClientSocket* that = static_cast<TCPClientSocket*>(context); |
+ DCHECK(that->socket_ == socket); |
+ that->OnLibeventNotification(flags); |
+} |
+ |
+} // namespace net |
+ |