Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Unified Diff: net/udp/udp_socket_win.cc

Issue 861963002: UDP: Windows implementation using non-blocking IO (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: WSAEventSelect Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« net/udp/udp_socket_unittest.cc ('K') | « net/udp/udp_socket_unittest.cc ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/udp/udp_socket_win.cc
diff --git a/net/udp/udp_socket_win.cc b/net/udp/udp_socket_win.cc
index 2f5549f8ed2bd16190ddca69c7f4ce746a63b5d6..e6902f59e7a5beadd6375ba6f30804254dfc462e 100644
--- a/net/udp/udp_socket_win.cc
+++ b/net/udp/udp_socket_win.cc
@@ -53,13 +53,17 @@ class UDPSocketWin::Core : public base::RefCounted<Core> {
// The UDPSocketWin is going away.
void Detach() { socket_ = NULL; }
- // The separate OVERLAPPED variables for asynchronous operation.
- OVERLAPPED read_overlapped_;
- OVERLAPPED write_overlapped_;
+ // Separate events for non-blocking IO operations.
+ WSAEVENT read_event_;
+ WSAEVENT write_event_;
// The buffers used in Read() and Write().
scoped_refptr<IOBuffer> read_iobuffer_;
scoped_refptr<IOBuffer> write_iobuffer_;
+ int read_iobuffer_len_;
rvargas (doing something else) 2015/01/21 22:10:09 We are not supposed to have public variables. We s
Alpha Left Google 2015/01/22 01:01:25 Done.
+ int write_iobuffer_len_;
+ bool non_blocking_reads_initialized_;
+ bool non_blocking_writes_initialized_;
// The address storage passed to WSARecvFrom().
SockaddrStorage recv_addr_storage_;
@@ -112,12 +116,13 @@ class UDPSocketWin::Core : public base::RefCounted<Core> {
UDPSocketWin::Core::Core(UDPSocketWin* socket)
: socket_(socket),
reader_(this),
- writer_(this) {
- memset(&read_overlapped_, 0, sizeof(read_overlapped_));
- memset(&write_overlapped_, 0, sizeof(write_overlapped_));
-
- read_overlapped_.hEvent = WSACreateEvent();
- write_overlapped_.hEvent = WSACreateEvent();
+ writer_(this),
+ read_iobuffer_len_(0),
+ write_iobuffer_len_(0),
+ non_blocking_reads_initialized_(false),
+ non_blocking_writes_initialized_(false) {
+ read_event_ = WSACreateEvent();
+ write_event_ = WSACreateEvent();
}
UDPSocketWin::Core::~Core() {
@@ -125,24 +130,22 @@ UDPSocketWin::Core::~Core() {
read_watcher_.StopWatching();
write_watcher_.StopWatching();
- WSACloseEvent(read_overlapped_.hEvent);
- memset(&read_overlapped_, 0xaf, sizeof(read_overlapped_));
- WSACloseEvent(write_overlapped_.hEvent);
- memset(&write_overlapped_, 0xaf, sizeof(write_overlapped_));
+ WSACloseEvent(read_event_);
+ WSACloseEvent(write_event_);
}
void UDPSocketWin::Core::WatchForRead() {
// We grab an extra reference because there is an IO operation in progress.
// Balanced in ReadDelegate::OnObjectSignaled().
AddRef();
- read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_);
+ read_watcher_.StartWatching(read_event_, &reader_);
}
void UDPSocketWin::Core::WatchForWrite() {
// We grab an extra reference because there is an IO operation in progress.
// Balanced in WriteDelegate::OnObjectSignaled().
AddRef();
- write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_);
+ write_watcher_.StartWatching(write_event_, &writer_);
}
void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
@@ -151,7 +154,7 @@ void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
FROM_HERE_WITH_EXPLICIT_FUNCTION(
"UDPSocketWin_Core_ReadDelegate_OnObjectSignaled"));
- DCHECK_EQ(object, core_->read_overlapped_.hEvent);
+ DCHECK_EQ(object, core_->read_event_);
if (core_->socket_)
core_->socket_->DidCompleteRead();
@@ -164,7 +167,7 @@ void UDPSocketWin::Core::WriteDelegate::OnObjectSignaled(HANDLE object) {
FROM_HERE_WITH_EXPLICIT_FUNCTION(
"UDPSocketWin_Core_WriteDelegate_OnObjectSignaled"));
- DCHECK_EQ(object, core_->write_overlapped_.hEvent);
+ DCHECK_EQ(object, core_->write_event_);
if (core_->socket_)
core_->socket_->DidCompleteWrite();
@@ -567,20 +570,36 @@ void UDPSocketWin::DoWriteCallback(int rv) {
}
void UDPSocketWin::DidCompleteRead() {
- DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->read_overlapped_,
- &num_bytes, FALSE, &flags);
- WSAResetEvent(core_->read_overlapped_.hEvent);
- int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
- // Convert address.
- if (recv_from_address_ && result >= 0) {
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv = WSAEnumNetworkEvents(socket_, core_->read_event_, &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ DoReadCallback(rv);
+ return;
+ } else if (network_events.lNetworkEvents) {
Ryan Hamilton 2015/01/21 04:03:35 nit: since there was a return in the previous bloc
Alpha Left Google 2015/01/22 01:01:25 Done.
+ DCHECK(network_events.lNetworkEvents & FD_READ);
+ // If network_events.iErrorCode[FD_READ_BIT] is nonzero, still call
+ // InternalRecvFrom() because it reports a more accurate error code.
+ rv = InternalRecvFrom(core_->read_iobuffer_.get(),
+ core_->read_iobuffer_len_, recv_from_address_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ } else {
+ core_->WatchForRead();
Ryan Hamilton 2015/01/21 04:03:35 Out of curiosity, what conditions lead to this bra
Alpha Left Google 2015/01/22 01:01:26 This comes from tcp_socket_win.cc. The comment th
+ return;
+ }
+ // Convert address if there's data received.
+ if (recv_from_address_ && rv >= 0) {
if (!ReceiveAddressToIPEndpoint(recv_from_address_))
- result = ERR_ADDRESS_INVALID;
+ rv = ERR_ADDRESS_INVALID;
}
- LogRead(result, core_->read_iobuffer_->data());
+ LogRead(rv, core_->read_iobuffer_->data());
core_->read_iobuffer_ = NULL;
+ core_->read_iobuffer_len_ = 0;
recv_from_address_ = NULL;
- DoReadCallback(result);
+ DoReadCallback(rv);
}
void UDPSocketWin::LogRead(int result, const char* bytes) const {
@@ -606,16 +625,31 @@ void UDPSocketWin::LogRead(int result, const char* bytes) const {
}
void UDPSocketWin::DidCompleteWrite() {
- DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
- &num_bytes, FALSE, &flags);
- WSAResetEvent(core_->write_overlapped_.hEvent);
- int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
- LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
-
- send_to_address_.reset();
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv = WSAEnumNetworkEvents(socket_, core_->write_event_, &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ DoWriteCallback(rv);
+ return;
+ } else if (network_events.lNetworkEvents) {
+ DCHECK(network_events.lNetworkEvents & FD_WRITE);
+ // If network_events.iErrorCode[FD_WRITE_BIT] is nonzero, still call
+ // InternalSendto() because it reports a more accurate error code.
+ rv = InternalSendTo(core_->write_iobuffer_.get(),
+ core_->write_iobuffer_len_, send_to_address_.get());
+ if (rv == ERR_IO_PENDING)
+ return;
+ } else {
+ core_->WatchForWrite();
+ return;
+ }
+ LogWrite(rv, core_->write_iobuffer_->data(), send_to_address_.get());
core_->write_iobuffer_ = NULL;
- DoWriteCallback(result);
+ core_->write_iobuffer_len_ = 0;
+ send_to_address_.reset();
+ DoWriteCallback(rv);
}
void UDPSocketWin::LogWrite(int result,
@@ -639,7 +673,12 @@ void UDPSocketWin::LogWrite(int result,
int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
IPEndPoint* address) {
- DCHECK(!core_->read_iobuffer_.get());
+ if (!core_->non_blocking_reads_initialized_) {
+ // After this call we'll receive FD_READ signals when asynchronous read
rvargas (doing something else) 2015/01/21 22:10:09 nit: after this call the event will be signaled
Alpha Left Google 2015/01/22 01:01:25 Done.
+ // is completed.
+ WSAEventSelect(socket_, core_->read_event_, FD_READ);
+ core_->non_blocking_reads_initialized_ = true;
+ }
SockaddrStorage& storage = core_->recv_addr_storage_;
storage.addr_len = sizeof(storage.addr_storage);
@@ -648,38 +687,40 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
read_buffer.len = buf_len;
DWORD flags = 0;
- DWORD num;
+ DWORD bytes_read;
CHECK_NE(INVALID_SOCKET, socket_);
- AssertEventNotSignaled(core_->read_overlapped_.hEvent);
- int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage.addr,
- &storage.addr_len, &core_->read_overlapped_, NULL);
- if (rv == 0) {
- if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) {
- int result = num;
- // Convert address.
- if (address && result >= 0) {
- if (!ReceiveAddressToIPEndpoint(address))
- result = ERR_ADDRESS_INVALID;
- }
- LogRead(result, buf->data());
- return result;
+ int rv = WSARecvFrom(socket_, &read_buffer, 1, &bytes_read, &flags,
rvargas (doing something else) 2015/01/21 22:10:09 Looks like there's no reason to use WSARecvFrom ov
Alpha Left Google 2015/01/22 01:01:25 Done.
+ storage.addr, &storage.addr_len, NULL, NULL);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error != WSAEWOULDBLOCK) {
+ rv = MapSystemError(os_error);
+ LogRead(rv, NULL);
+ return rv;
}
} else {
- int os_error = WSAGetLastError();
- if (os_error != WSA_IO_PENDING) {
- int result = MapSystemError(os_error);
- LogRead(result, NULL);
- return result;
+ rv = bytes_read;
+ // Convert address.
+ if (address && rv >= 0 && !ReceiveAddressToIPEndpoint(address)) {
+ rv = ERR_ADDRESS_INVALID;
}
+ LogRead(rv, buf->data());
+ return rv;
}
core_->WatchForRead();
core_->read_iobuffer_ = buf;
+ core_->read_iobuffer_len_ = buf_len;
return ERR_IO_PENDING;
}
int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
const IPEndPoint* address) {
- DCHECK(!core_->write_iobuffer_.get());
+ if (!core_->non_blocking_writes_initialized_) {
+ // After this call we'll receive FD_WRITE signals when asynchronous write
+ // is completed.
+ WSAEventSelect(socket_, core_->write_event_, FD_WRITE);
+ core_->non_blocking_writes_initialized_ = true;
+ }
SockaddrStorage storage;
struct sockaddr* addr = storage.addr;
// Convert address.
@@ -699,27 +740,24 @@ int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
write_buffer.len = buf_len;
DWORD flags = 0;
- DWORD num;
- AssertEventNotSignaled(core_->write_overlapped_.hEvent);
- int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags,
- addr, storage.addr_len, &core_->write_overlapped_, NULL);
- if (rv == 0) {
- if (ResetEventIfSignaled(core_->write_overlapped_.hEvent)) {
- int result = num;
- LogWrite(result, buf->data(), address);
- return result;
- }
- } else {
+ DWORD bytes_written;
+ int rv = WSASendTo(socket_, &write_buffer, 1, &bytes_written, flags, addr,
+ storage.addr_len, NULL, NULL);
+ if (rv == SOCKET_ERROR) {
int os_error = WSAGetLastError();
- if (os_error != WSA_IO_PENDING) {
- int result = MapSystemError(os_error);
- LogWrite(result, NULL, NULL);
- return result;
+ if (os_error != WSAEWOULDBLOCK) {
+ rv = MapSystemError(os_error);
+ LogWrite(rv, NULL, NULL);
+ return rv;
}
+ } else {
+ rv = bytes_written;
+ LogWrite(rv, buf->data(), address);
+ return rv;
}
-
core_->WatchForWrite();
core_->write_iobuffer_ = buf;
+ core_->write_iobuffer_len_ = buf_len;
return ERR_IO_PENDING;
}
« net/udp/udp_socket_unittest.cc ('K') | « net/udp/udp_socket_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698