Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1950)

Unified Diff: net/udp/udp_socket_win.cc

Issue 861963002: UDP: Windows implementation using non-blocking IO (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: git cl try Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« net/udp/udp_socket_perftest.cc ('K') | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/udp/udp_socket_win.cc
diff --git a/net/udp/udp_socket_win.cc b/net/udp/udp_socket_win.cc
index 2f5549f8ed2bd16190ddca69c7f4ce746a63b5d6..2a39928f73a1259381160eb8e625d9369ae6d745 100644
--- a/net/udp/udp_socket_win.cc
+++ b/net/udp/udp_socket_win.cc
@@ -44,25 +44,40 @@ namespace net {
// declared on this class anymore.
class UDPSocketWin::Core : public base::RefCounted<Core> {
public:
- explicit Core(UDPSocketWin* socket);
+ explicit Core(UDPSocketWin* socket, bool use_overlapped_io);
- // Start watching for the end of a read or write operation.
+ // Start watching for the end of a read or write operation in overlapped mode.
+ // In non-blocking mode watch the signals to continure read/write.
void WatchForRead();
void WatchForWrite();
// The UDPSocketWin is going away.
void Detach() { socket_ = NULL; }
- // The separate OVERLAPPED variables for asynchronous operation.
- OVERLAPPED read_overlapped_;
- OVERLAPPED write_overlapped_;
+ bool use_overlapped_io() const { return use_overlapped_io_; }
- // The buffers used in Read() and Write().
- scoped_refptr<IOBuffer> read_iobuffer_;
- scoped_refptr<IOBuffer> write_iobuffer_;
+ OVERLAPPED* read_overlapped() { return &read_overlapped_; }
+ OVERLAPPED* write_overlapped() { return &write_overlapped_; }
- // The address storage passed to WSARecvFrom().
- SockaddrStorage recv_addr_storage_;
+ WSAEVENT read_event() const { return read_event_; }
+ WSAEVENT write_event() const { return write_event_; }
+
+ scoped_refptr<IOBuffer> read_iobuffer() const { return read_iobuffer_; }
+ scoped_refptr<IOBuffer> write_iobuffer() const { return write_iobuffer_; }
+
+ void set_read_iobuffer(scoped_refptr<IOBuffer> buf, int len) {
+ read_iobuffer_ = buf;
+ read_iobuffer_len_ = len;
+ }
+ void set_write_iobuffer(scoped_refptr<IOBuffer> buf, int len) {
+ write_iobuffer_ = buf;
+ write_iobuffer_len_ = len;
+ }
+
+ int read_iobuffer_len() const { return read_iobuffer_len_; }
+ int write_iobuffer_len() const { return write_iobuffer_len_; }
+
+ SockaddrStorage* recv_addr_storage() { return &recv_addr_storage_; }
private:
friend class base::RefCounted<Core>;
@@ -95,6 +110,7 @@ class UDPSocketWin::Core : public base::RefCounted<Core> {
// The socket that created this object.
UDPSocketWin* socket_;
+ const bool use_overlapped_io_;
// |reader_| handles the signals from |read_watcher_|.
ReadDelegate reader_;
@@ -106,18 +122,43 @@ class UDPSocketWin::Core : public base::RefCounted<Core> {
// |write_watcher_| watches for events from Write();
base::win::ObjectWatcher write_watcher_;
+ // The separate OVERLAPPED variables for asynchronous operation.
+ OVERLAPPED read_overlapped_;
+ OVERLAPPED write_overlapped_;
+
+ // Separate events for non-blocking IO operations.
+ WSAEVENT read_event_;
+ WSAEVENT write_event_;
+
+ // The buffers used in Read() and Write().
+ scoped_refptr<IOBuffer> read_iobuffer_;
+ scoped_refptr<IOBuffer> write_iobuffer_;
+ int read_iobuffer_len_;
+ int write_iobuffer_len_;
+
+ // The address storage passed to WSARecvFrom().
+ SockaddrStorage recv_addr_storage_;
+
DISALLOW_COPY_AND_ASSIGN(Core);
};
-UDPSocketWin::Core::Core(UDPSocketWin* socket)
+UDPSocketWin::Core::Core(UDPSocketWin* socket, bool use_overlapped_io)
: socket_(socket),
+ use_overlapped_io_(use_overlapped_io),
reader_(this),
- writer_(this) {
+ writer_(this),
+ read_iobuffer_len_(0),
+ write_iobuffer_len_(0) {
memset(&read_overlapped_, 0, sizeof(read_overlapped_));
memset(&write_overlapped_, 0, sizeof(write_overlapped_));
- read_overlapped_.hEvent = WSACreateEvent();
- write_overlapped_.hEvent = WSACreateEvent();
+ if (use_overlapped_io) {
+ read_overlapped_.hEvent = WSACreateEvent();
+ write_overlapped_.hEvent = WSACreateEvent();
+ } else {
+ read_event_ = WSACreateEvent();
+ write_event_ = WSACreateEvent();
+ }
}
UDPSocketWin::Core::~Core() {
@@ -125,24 +166,35 @@ UDPSocketWin::Core::~Core() {
read_watcher_.StopWatching();
write_watcher_.StopWatching();
- WSACloseEvent(read_overlapped_.hEvent);
- memset(&read_overlapped_, 0xaf, sizeof(read_overlapped_));
- WSACloseEvent(write_overlapped_.hEvent);
- memset(&write_overlapped_, 0xaf, sizeof(write_overlapped_));
+ if (use_overlapped_io_) {
+ WSACloseEvent(read_overlapped_.hEvent);
+ memset(&read_overlapped_, 0, sizeof(read_overlapped_));
+ WSACloseEvent(write_overlapped_.hEvent);
+ memset(&write_overlapped_, 0, sizeof(write_overlapped_));
+ } else {
+ WSACloseEvent(read_event_);
+ WSACloseEvent(write_event_);
+ }
}
void UDPSocketWin::Core::WatchForRead() {
// We grab an extra reference because there is an IO operation in progress.
// Balanced in ReadDelegate::OnObjectSignaled().
AddRef();
- read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_);
+ if (use_overlapped_io_)
+ read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_);
+ else
+ read_watcher_.StartWatching(read_event_, &reader_);
}
void UDPSocketWin::Core::WatchForWrite() {
// We grab an extra reference because there is an IO operation in progress.
// Balanced in WriteDelegate::OnObjectSignaled().
AddRef();
- write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_);
+ if (use_overlapped_io_)
+ write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_);
+ else
+ write_watcher_.StartWatching(write_event_, &writer_);
}
void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
@@ -151,9 +203,15 @@ void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
FROM_HERE_WITH_EXPLICIT_FUNCTION(
"UDPSocketWin_Core_ReadDelegate_OnObjectSignaled"));
- DCHECK_EQ(object, core_->read_overlapped_.hEvent);
- if (core_->socket_)
- core_->socket_->DidCompleteRead();
+ if (core_->use_overlapped_io()) {
+ DCHECK_EQ(object, core_->read_overlapped()->hEvent);
+ if (core_->socket_)
+ core_->socket_->DidCompleteReadOverlapped();
+ } else {
+ DCHECK_EQ(object, core_->read_event());
+ if (core_->socket_)
+ core_->socket_->OnReadSignaledNonBlocking();
+ }
core_->Release();
}
@@ -164,9 +222,15 @@ void UDPSocketWin::Core::WriteDelegate::OnObjectSignaled(HANDLE object) {
FROM_HERE_WITH_EXPLICIT_FUNCTION(
"UDPSocketWin_Core_WriteDelegate_OnObjectSignaled"));
- DCHECK_EQ(object, core_->write_overlapped_.hEvent);
- if (core_->socket_)
- core_->socket_->DidCompleteWrite();
+ if (core_->use_overlapped_io()) {
+ DCHECK_EQ(object, core_->write_overlapped()->hEvent);
+ if (core_->socket_)
+ core_->socket_->DidCompleteWriteOverlapped();
+ } else {
+ DCHECK_EQ(object, core_->write_event());
+ if (core_->socket_)
+ core_->socket_->OnWriteSignaledNonBlocking();
+ }
core_->Release();
}
@@ -261,10 +325,13 @@ UDPSocketWin::UDPSocketWin(DatagramSocket::BindType bind_type,
multicast_time_to_live_(1),
bind_type_(bind_type),
rand_int_cb_(rand_int_cb),
+ non_blocking_reads_initialized_(false),
+ non_blocking_writes_initialized_(false),
recv_from_address_(NULL),
net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)),
qos_handle_(NULL),
- qos_flow_id_(0) {
+ qos_flow_id_(0),
+ use_overlapped_io_(true) {
EnsureWinsockInit();
net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
source.ToEventParametersCallback());
@@ -285,7 +352,7 @@ int UDPSocketWin::Open(AddressFamily address_family) {
socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, IPPROTO_UDP);
if (socket_ == INVALID_SOCKET)
return MapSystemError(WSAGetLastError());
- core_ = new Core(this);
+ core_ = new Core(this, use_overlapped_io_);
return OK;
}
@@ -377,7 +444,9 @@ int UDPSocketWin::RecvFrom(IOBuffer* buf,
DCHECK(!callback.is_null()); // Synchronous operation not supported.
DCHECK_GT(buf_len, 0);
- int nread = InternalRecvFrom(buf, buf_len, address);
+ int nread = use_overlapped_io_
+ ? InternalRecvFromOverlapped(buf, buf_len, address)
+ : InternalRecvFromNonBlocking(buf, buf_len, address);
if (nread != ERR_IO_PENDING)
return nread;
@@ -410,7 +479,9 @@ int UDPSocketWin::SendToOrWrite(IOBuffer* buf,
DCHECK_GT(buf_len, 0);
DCHECK(!send_to_address_.get());
- int nwrite = InternalSendTo(buf, buf_len, address);
+ int nwrite = use_overlapped_io_
+ ? InternalSendToOverlapped(buf, buf_len, address)
+ : InternalSendToNonBlocking(buf, buf_len, address);
if (nwrite != ERR_IO_PENDING)
return nwrite;
@@ -566,23 +637,95 @@ void UDPSocketWin::DoWriteCallback(int rv) {
c.Run(rv);
}
-void UDPSocketWin::DidCompleteRead() {
+void UDPSocketWin::DidCompleteReadOverlapped() {
DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->read_overlapped_,
+ BOOL ok = WSAGetOverlappedResult(socket_, core_->read_overlapped(),
&num_bytes, FALSE, &flags);
- WSAResetEvent(core_->read_overlapped_.hEvent);
+ WSAResetEvent(core_->read_overlapped()->hEvent);
int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
// Convert address.
if (recv_from_address_ && result >= 0) {
if (!ReceiveAddressToIPEndpoint(recv_from_address_))
result = ERR_ADDRESS_INVALID;
}
- LogRead(result, core_->read_iobuffer_->data());
- core_->read_iobuffer_ = NULL;
+ LogRead(result, core_->read_iobuffer()->data());
+ core_->set_read_iobuffer(NULL, 0);
recv_from_address_ = NULL;
DoReadCallback(result);
}
+void UDPSocketWin::DidCompleteWriteOverlapped() {
+ DWORD num_bytes, flags;
+ BOOL ok = WSAGetOverlappedResult(socket_, core_->write_overlapped(),
+ &num_bytes, FALSE, &flags);
+ WSAResetEvent(core_->write_overlapped()->hEvent);
+ int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
+ LogWrite(result, core_->write_iobuffer()->data(), send_to_address_.get());
+
+ send_to_address_.reset();
+ core_->set_write_iobuffer(NULL, 0);
+ DoWriteCallback(result);
+}
+
+void UDPSocketWin::OnReadSignaledNonBlocking() {
+ DCHECK(core_->read_iobuffer());
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv = WSAEnumNetworkEvents(socket_, core_->read_event(), &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ DoReadCallback(rv);
+ return;
+ }
+ if (network_events.lNetworkEvents) {
+ DCHECK(network_events.lNetworkEvents & FD_READ);
+ // Regardless of the error code in network_events.iErrorCode[FD_READ_BIT]
+ // we will continue to call recvfrom because it gives a more accurate
+ // error code.
+ rv = InternalRecvFromNonBlocking(core_->read_iobuffer().get(),
+ core_->read_iobuffer_len(),
+ recv_from_address_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ } else {
+ core_->WatchForRead();
+ return;
+ }
+ core_->set_read_iobuffer(NULL, 0);
+ recv_from_address_ = NULL;
+ DoReadCallback(rv);
+}
+
+void UDPSocketWin::OnWriteSignaledNonBlocking() {
+ DCHECK(core_->write_iobuffer());
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv = WSAEnumNetworkEvents(socket_, core_->write_event(), &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ DoWriteCallback(rv);
+ return;
+ } else if (network_events.lNetworkEvents) {
+ DCHECK(network_events.lNetworkEvents & FD_WRITE);
+ // Regardless of the error code in network_events.iErrorCode[FD_WRITE_BIT]
+ // we will continue to call sendto because it gives a more accurate error
+ // code.
+ rv = InternalSendToNonBlocking(core_->write_iobuffer().get(),
+ core_->write_iobuffer_len(),
+ send_to_address_.get());
+ if (rv == ERR_IO_PENDING)
+ return;
+ } else {
+ core_->WatchForWrite();
+ return;
+ }
+ core_->set_write_iobuffer(NULL, 0);
+ send_to_address_.reset();
+ DoWriteCallback(rv);
+}
+
void UDPSocketWin::LogRead(int result, const char* bytes) const {
if (result < 0) {
net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
@@ -605,19 +748,6 @@ void UDPSocketWin::LogRead(int result, const char* bytes) const {
NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
}
-void UDPSocketWin::DidCompleteWrite() {
- DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
- &num_bytes, FALSE, &flags);
- WSAResetEvent(core_->write_overlapped_.hEvent);
- int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
- LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
-
- send_to_address_.reset();
- core_->write_iobuffer_ = NULL;
- DoWriteCallback(result);
-}
-
void UDPSocketWin::LogWrite(int result,
const char* bytes,
const IPEndPoint* address) const {
@@ -637,11 +767,12 @@ void UDPSocketWin::LogWrite(int result,
NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
}
-int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
- IPEndPoint* address) {
- DCHECK(!core_->read_iobuffer_.get());
- SockaddrStorage& storage = core_->recv_addr_storage_;
- storage.addr_len = sizeof(storage.addr_storage);
+int UDPSocketWin::InternalRecvFromOverlapped(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
+ DCHECK(!core_->read_iobuffer());
+ SockaddrStorage* storage = core_->recv_addr_storage();
+ storage->addr_len = sizeof(storage->addr_storage);
WSABUF read_buffer;
read_buffer.buf = buf->data();
@@ -650,11 +781,11 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
DWORD flags = 0;
DWORD num;
CHECK_NE(INVALID_SOCKET, socket_);
- AssertEventNotSignaled(core_->read_overlapped_.hEvent);
- int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage.addr,
- &storage.addr_len, &core_->read_overlapped_, NULL);
+ AssertEventNotSignaled(core_->read_overlapped()->hEvent);
+ int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage->addr,
+ &storage->addr_len, core_->read_overlapped(), NULL);
if (rv == 0) {
- if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) {
+ if (ResetEventIfSignaled(core_->read_overlapped()->hEvent)) {
int result = num;
// Convert address.
if (address && result >= 0) {
@@ -673,13 +804,14 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
}
}
core_->WatchForRead();
- core_->read_iobuffer_ = buf;
+ core_->set_read_iobuffer(buf, buf_len);
return ERR_IO_PENDING;
}
-int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
- const IPEndPoint* address) {
- DCHECK(!core_->write_iobuffer_.get());
+int UDPSocketWin::InternalSendToOverlapped(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
+ DCHECK(!core_->write_iobuffer());
SockaddrStorage storage;
struct sockaddr* addr = storage.addr;
// Convert address.
@@ -700,11 +832,11 @@ int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
DWORD flags = 0;
DWORD num;
- AssertEventNotSignaled(core_->write_overlapped_.hEvent);
- int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags,
- addr, storage.addr_len, &core_->write_overlapped_, NULL);
+ AssertEventNotSignaled(core_->write_overlapped()->hEvent);
+ int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags, addr,
+ storage.addr_len, core_->write_overlapped(), NULL);
if (rv == 0) {
- if (ResetEventIfSignaled(core_->write_overlapped_.hEvent)) {
+ if (ResetEventIfSignaled(core_->write_overlapped()->hEvent)) {
int result = num;
LogWrite(result, buf->data(), address);
return result;
@@ -719,10 +851,87 @@ int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
}
core_->WatchForWrite();
- core_->write_iobuffer_ = buf;
+ core_->set_write_iobuffer(buf, 0);
return ERR_IO_PENDING;
}
+int UDPSocketWin::InternalRecvFromNonBlocking(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
+ if (!non_blocking_reads_initialized_) {
+ // After this call the event will be signaled when asynchronous read is
+ // completed.g
+ WSAEventSelect(socket_, core_->read_event(), FD_READ);
+ non_blocking_reads_initialized_ = true;
+ }
+ SockaddrStorage* storage = core_->recv_addr_storage();
+ storage->addr_len = sizeof(storage->addr_storage);
+
+ CHECK_NE(INVALID_SOCKET, socket_);
+ int rv = recvfrom(socket_, buf->data(), buf_len, 0, storage->addr,
+ &storage->addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ core_->WatchForRead();
+ core_->set_read_iobuffer(buf, buf_len);
+ return ERR_IO_PENDING;
+ } else {
+ rv = MapSystemError(os_error);
+ LogRead(rv, NULL);
+ return rv;
+ }
+ } else {
+ // Convert address.
+ if (address && rv >= 0 && !ReceiveAddressToIPEndpoint(address)) {
+ rv = ERR_ADDRESS_INVALID;
+ }
+ LogRead(rv, buf->data());
+ return rv;
+ }
+}
+
+int UDPSocketWin::InternalSendToNonBlocking(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
+ if (!non_blocking_writes_initialized_) {
+ // After this call the event will be signaled when asynchronous write is
+ // completed.
+ WSAEventSelect(socket_, core_->write_event(), FD_WRITE);
+ non_blocking_writes_initialized_ = true;
+ }
+ SockaddrStorage storage;
+ struct sockaddr* addr = storage.addr;
+ // Convert address.
+ if (address) {
+ if (!address->ToSockAddr(addr, &storage.addr_len)) {
+ int result = ERR_ADDRESS_INVALID;
+ LogWrite(result, NULL, NULL);
+ return result;
+ }
+ } else {
+ addr = NULL;
+ storage.addr_len = 0;
+ }
+
+ int rv = sendto(socket_, buf->data(), buf_len, 0, addr, storage.addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ core_->WatchForWrite();
+ core_->set_write_iobuffer(buf, buf_len);
+ return ERR_IO_PENDING;
+ } else {
+ rv = MapSystemError(os_error);
+ LogWrite(rv, NULL, NULL);
+ return rv;
+ }
+ } else {
+ LogWrite(rv, buf->data(), address);
+ return rv;
+ }
+}
+
int UDPSocketWin::SetMulticastOptions() {
if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
DWORD loop = 0;
@@ -808,8 +1017,8 @@ int UDPSocketWin::RandomBind(const IPAddressNumber& address) {
}
bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const {
- SockaddrStorage& storage = core_->recv_addr_storage_;
- return address->FromSockAddr(storage.addr, storage.addr_len);
+ SockaddrStorage* storage = core_->recv_addr_storage();
+ return address->FromSockAddr(storage->addr, storage->addr_len);
}
int UDPSocketWin::JoinGroup(
@@ -1016,4 +1225,12 @@ void UDPSocketWin::DetachFromThread() {
base::NonThreadSafe::DetachFromThread();
}
+void UDPSocketWin::UseNonBlockingIO() {
+ if (core_) {
+ NOTREACHED() << "Cannot change to non-blocking mode after socket is used.";
+ } else {
+ use_overlapped_io_ = false;
+ }
+}
+
} // namespace net
« net/udp/udp_socket_perftest.cc ('K') | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698