Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1273)

Unified Diff: net/udp/udp_socket_win.cc

Issue 861963002: UDP: Windows implementation using non-blocking IO (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix build Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/udp/udp_socket_win.cc
diff --git a/net/udp/udp_socket_win.cc b/net/udp/udp_socket_win.cc
index 2f5549f8ed2bd16190ddca69c7f4ce746a63b5d6..ab237d1411f66f2a5c38b5c018e07c31287defca 100644
--- a/net/udp/udp_socket_win.cc
+++ b/net/udp/udp_socket_win.cc
@@ -261,6 +261,9 @@ UDPSocketWin::UDPSocketWin(DatagramSocket::BindType bind_type,
multicast_time_to_live_(1),
bind_type_(bind_type),
rand_int_cb_(rand_int_cb),
+ use_non_blocking_io_(true),
rvargas (doing something else) 2015/02/03 03:11:41 false
Alpha Left Google 2015/02/03 23:24:45 Done.
+ read_iobuffer_len_(0),
+ write_iobuffer_len_(0),
recv_from_address_(NULL),
net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)),
qos_handle_(NULL),
@@ -285,7 +288,12 @@ int UDPSocketWin::Open(AddressFamily address_family) {
socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, IPPROTO_UDP);
if (socket_ == INVALID_SOCKET)
return MapSystemError(WSAGetLastError());
- core_ = new Core(this);
+ if (!use_non_blocking_io_ || SetNonBlocking(socket_)) {
rvargas (doing something else) 2015/02/03 03:11:41 Failure to set non blocking must be accounted for
Alpha Left Google 2015/02/03 23:24:46 guiwei pointed out that WSAEventSelect already set
+ core_ = new Core(this);
+ } else {
+ read_write_event_.Set(WSACreateEvent());
+ WSAEventSelect(socket_, read_write_event_.Get(), FD_READ | FD_WRITE);
+ }
return OK;
}
@@ -304,6 +312,9 @@ void UDPSocketWin::Close() {
recv_from_address_ = NULL;
write_callback_.Reset();
+ read_write_watcher_.StopWatching();
+ read_write_event_.Close();
rvargas (doing something else) 2015/02/03 03:11:41 must be called after closesocket (or it may race t
Alpha Left Google 2015/02/03 23:24:46 Done.
+
base::TimeTicks start_time = base::TimeTicks::Now();
closesocket(socket_);
UMA_HISTOGRAM_TIMES("Net.UDPSocketWinClose",
@@ -312,8 +323,10 @@ void UDPSocketWin::Close() {
addr_family_ = 0;
is_connected_ = false;
- core_->Detach();
- core_ = NULL;
+ if (core_) {
+ core_->Detach();
+ core_ = NULL;
+ }
}
int UDPSocketWin::GetPeerAddress(IPEndPoint* address) const {
@@ -377,7 +390,8 @@ int UDPSocketWin::RecvFrom(IOBuffer* buf,
DCHECK(!callback.is_null()); // Synchronous operation not supported.
DCHECK_GT(buf_len, 0);
- int nread = InternalRecvFrom(buf, buf_len, address);
+ int nread = core_ ? InternalRecvFromOverlapped(buf, buf_len, address)
+ : InternalRecvFromNonBlocking(buf, buf_len, address);
if (nread != ERR_IO_PENDING)
return nread;
@@ -410,7 +424,8 @@ int UDPSocketWin::SendToOrWrite(IOBuffer* buf,
DCHECK_GT(buf_len, 0);
DCHECK(!send_to_address_.get());
- int nwrite = InternalSendTo(buf, buf_len, address);
+ int nwrite = core_ ? InternalSendToOverlapped(buf, buf_len, address)
+ : InternalSendToNonBlocking(buf, buf_len, address);
if (nwrite != ERR_IO_PENDING)
return nwrite;
@@ -573,31 +588,117 @@ void UDPSocketWin::DidCompleteRead() {
WSAResetEvent(core_->read_overlapped_.hEvent);
int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
// Convert address.
- if (recv_from_address_ && result >= 0) {
- if (!ReceiveAddressToIPEndpoint(recv_from_address_))
+ IPEndPoint address;
+ IPEndPoint* address_to_log = NULL;
+ if (result >= 0) {
+ if (address.FromSockAddr(core_->recv_addr_storage_.addr,
+ core_->recv_addr_storage_.addr_len)) {
+ if (recv_from_address_) {
rvargas (doing something else) 2015/02/03 20:55:30 nit: no {}
Alpha Left Google 2015/02/03 23:24:45 Done.
+ *recv_from_address_ = address;
+ }
+ address_to_log = &address;
+ } else {
result = ERR_ADDRESS_INVALID;
+ }
}
- LogRead(result, core_->read_iobuffer_->data());
+ LogRead(result, core_->read_iobuffer_->data(), address_to_log);
core_->read_iobuffer_ = NULL;
recv_from_address_ = NULL;
DoReadCallback(result);
}
-void UDPSocketWin::LogRead(int result, const char* bytes) const {
+void UDPSocketWin::DidCompleteWrite() {
+ DWORD num_bytes, flags;
+ BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
+ &num_bytes, FALSE, &flags);
+ WSAResetEvent(core_->write_overlapped_.hEvent);
+ int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
+ LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
+
+ send_to_address_.reset();
+ core_->write_iobuffer_ = NULL;
+ DoWriteCallback(result);
+}
+
+void UDPSocketWin::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == read_write_event_.Get());
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv =
+ WSAEnumNetworkEvents(socket_, read_write_event_.Get(), &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ if (read_iobuffer_) {
+ read_iobuffer_ = NULL;
+ read_iobuffer_len_ = 0;
+ recv_from_address_ = NULL;
+ DoReadCallback(rv);
+ }
+ if (write_iobuffer_) {
+ write_iobuffer_ = NULL;
+ write_iobuffer_len_ = 0;
+ send_to_address_.reset();
+ DoWriteCallback(rv);
+ }
+ return;
+ }
+ if ((network_events.lNetworkEvents & FD_READ) && read_iobuffer_) {
+ OnReadSignaled();
+ }
+ if ((network_events.lNetworkEvents & FD_WRITE) && write_iobuffer_) {
+ OnWriteSignaled();
+ }
+
+ // There's still pending read / write. Watch for further events.
+ if (read_iobuffer_ || write_iobuffer_) {
+ WatchForReadWrite();
+ }
+}
+
+void UDPSocketWin::OnReadSignaled() {
+ DCHECK(read_iobuffer_);
rvargas (doing something else) 2015/02/03 03:11:41 Either both methods dcheck the buffer or not. (I h
Alpha Left Google 2015/02/03 23:24:45 Yeah this DCHECK should be removed since line 646
+ int rv = InternalRecvFromNonBlocking(read_iobuffer_.get(), read_iobuffer_len_,
+ recv_from_address_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ read_iobuffer_ = NULL;
+ read_iobuffer_len_ = 0;
+ recv_from_address_ = NULL;
+ DoReadCallback(rv);
+}
+
+void UDPSocketWin::OnWriteSignaled() {
+ int rv = InternalSendToNonBlocking(write_iobuffer_.get(), write_iobuffer_len_,
+ send_to_address_.get());
+ if (rv == ERR_IO_PENDING)
+ return;
+ write_iobuffer_ = NULL;
+ write_iobuffer_len_ = 0;
+ send_to_address_.reset();
+ DoWriteCallback(rv);
+}
+
+void UDPSocketWin::WatchForReadWrite() {
+ if (read_write_watcher_.GetWatchedObject() != NULL) {
rvargas (doing something else) 2015/02/03 03:11:41 When would this be the case? (red flag: checking a
Alpha Left Google 2015/02/03 23:24:46 For example both write and read got WSA_IO_PENDING
rvargas (doing something else) 2015/02/03 23:54:33 The code from the object watcher is quite old... a
+ DCHECK(read_write_watcher_.GetWatchedObject() == read_write_event_.Get());
+ return;
+ }
+ read_write_watcher_.StartWatching(read_write_event_.Get(), this);
+}
+
+void UDPSocketWin::LogRead(int result,
+ const char* bytes,
+ const IPEndPoint* address) const {
if (result < 0) {
net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
return;
}
if (net_log_.IsLogging()) {
- // Get address for logging, if |address| is NULL.
- IPEndPoint address;
- bool is_address_valid = ReceiveAddressToIPEndpoint(&address);
net_log_.AddEvent(
NetLog::TYPE_UDP_BYTES_RECEIVED,
- CreateNetLogUDPDataTranferCallback(
- result, bytes,
- is_address_valid ? &address : NULL));
+ CreateNetLogUDPDataTranferCallback(result, bytes, address));
}
base::StatsCounter read_bytes("udp.read_bytes");
@@ -605,19 +706,6 @@ void UDPSocketWin::LogRead(int result, const char* bytes) const {
NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
}
-void UDPSocketWin::DidCompleteWrite() {
- DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
- &num_bytes, FALSE, &flags);
- WSAResetEvent(core_->write_overlapped_.hEvent);
- int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
- LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
-
- send_to_address_.reset();
- core_->write_iobuffer_ = NULL;
- DoWriteCallback(result);
-}
-
void UDPSocketWin::LogWrite(int result,
const char* bytes,
const IPEndPoint* address) const {
@@ -637,8 +725,9 @@ void UDPSocketWin::LogWrite(int result,
NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
}
-int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
- IPEndPoint* address) {
+int UDPSocketWin::InternalRecvFromOverlapped(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
DCHECK(!core_->read_iobuffer_.get());
SockaddrStorage& storage = core_->recv_addr_storage_;
storage.addr_len = sizeof(storage.addr_storage);
@@ -657,18 +746,26 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) {
int result = num;
// Convert address.
- if (address && result >= 0) {
- if (!ReceiveAddressToIPEndpoint(address))
+ IPEndPoint address_storage;
+ IPEndPoint* address_to_log = NULL;
+ if (result >= 0) {
+ if (address_storage.FromSockAddr(core_->recv_addr_storage_.addr,
+ core_->recv_addr_storage_.addr_len)) {
+ if (address)
+ *address = address_storage;
+ address_to_log = &address_storage;
+ } else {
result = ERR_ADDRESS_INVALID;
+ }
}
- LogRead(result, buf->data());
+ LogRead(result, buf->data(), address_to_log);
return result;
}
} else {
int os_error = WSAGetLastError();
if (os_error != WSA_IO_PENDING) {
int result = MapSystemError(os_error);
- LogRead(result, NULL);
+ LogRead(result, NULL, NULL);
return result;
}
}
@@ -677,8 +774,9 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
return ERR_IO_PENDING;
}
-int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
- const IPEndPoint* address) {
+int UDPSocketWin::InternalSendToOverlapped(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
DCHECK(!core_->write_iobuffer_.get());
SockaddrStorage storage;
struct sockaddr* addr = storage.addr;
@@ -723,6 +821,76 @@ int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
return ERR_IO_PENDING;
}
+int UDPSocketWin::InternalRecvFromNonBlocking(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
+ SockaddrStorage storage;
+ storage.addr_len = sizeof(storage.addr_storage);
+
+ CHECK_NE(INVALID_SOCKET, socket_);
+ int rv = recvfrom(socket_, buf->data(), buf_len, 0, storage.addr,
+ &storage.addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ read_iobuffer_ = buf;
+ read_iobuffer_len_ = buf_len;
+ WatchForReadWrite();
+ return ERR_IO_PENDING;
+ }
+ rv = MapSystemError(os_error);
+ LogRead(rv, NULL, NULL);
+ return rv;
+ }
+ IPEndPoint address_storage;
+ IPEndPoint* address_to_log = NULL;
+ if (rv >= 0) {
+ if (address_storage.FromSockAddr(storage.addr, storage.addr_len)) {
+ if (address)
+ *address = address_storage;
+ address_to_log = &address_storage;
+ } else {
+ rv = ERR_ADDRESS_INVALID;
+ }
+ }
+ LogRead(rv, buf->data(), address_to_log);
+ return rv;
+}
+
+int UDPSocketWin::InternalSendToNonBlocking(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
+ SockaddrStorage storage;
+ struct sockaddr* addr = storage.addr;
+ // Convert address.
+ if (address) {
+ if (!address->ToSockAddr(addr, &storage.addr_len)) {
+ int result = ERR_ADDRESS_INVALID;
+ LogWrite(result, NULL, NULL);
+ return result;
+ }
+ } else {
+ addr = NULL;
+ storage.addr_len = 0;
+ }
+
+ int rv = sendto(socket_, buf->data(), buf_len, 0, addr, storage.addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ write_iobuffer_ = buf;
+ write_iobuffer_len_ = buf_len;
+ WatchForReadWrite();
+ return ERR_IO_PENDING;
+ }
+ rv = MapSystemError(os_error);
+ LogWrite(rv, NULL, NULL);
+ return rv;
+ }
+ LogWrite(rv, buf->data(), address);
+ return rv;
+}
+
int UDPSocketWin::SetMulticastOptions() {
if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
DWORD loop = 0;
@@ -807,11 +975,6 @@ int UDPSocketWin::RandomBind(const IPAddressNumber& address) {
return DoBind(IPEndPoint(address, 0));
}
-bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const {
- SockaddrStorage& storage = core_->recv_addr_storage_;
- return address->FromSockAddr(storage.addr, storage.addr_len);
-}
-
int UDPSocketWin::JoinGroup(
const IPAddressNumber& group_address) const {
DCHECK(CalledOnValidThread());
@@ -1016,4 +1179,10 @@ void UDPSocketWin::DetachFromThread() {
base::NonThreadSafe::DetachFromThread();
}
+void UDPSocketWin::UseNonBlockingIO() {
+ if (core_)
+ return;
rvargas (doing something else) 2015/02/03 03:11:41 This looks like a serious issue. The data from the
Alpha Left Google 2015/02/03 23:24:46 I'll return a boolean with this call.
+ use_non_blocking_io_ = true;
+}
+
} // namespace net
« no previous file with comments | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698