Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2105)

Unified Diff: runtime/bin/eventhandler_win.cc

Issue 85993002: Add UDP support to dart:io (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Update test Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/bin/eventhandler_win.cc
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index 334d496ee677915af4ca78d7be0103fec852e6fb..28add7ffd4c9c3a515a17a19d42a1100d94dd732 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -33,8 +33,14 @@ static const int kShutdownId = -2;
OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
Operation operation) {
+ int actual_size = buffer_size;
+ // For calling recvfrom additional buffer space is needed for the source
+ // address information.
+ if (operation == kRecvFrom) {
Anders Johnsen 2013/12/12 12:19:58 Move this to AllocateRecvFromBuffer?
Søren Gjesse 2013/12/12 15:44:39 Done.
+ actual_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage);
Anders Johnsen 2013/12/12 12:19:58 Make the size a constant in OverlappedBuffer.
Søren Gjesse 2013/12/12 15:44:39 Done.
+ }
OverlappedBuffer* buffer =
- new(buffer_size) OverlappedBuffer(buffer_size, operation);
+ new(actual_size) OverlappedBuffer(actual_size, operation);
return buffer;
}
@@ -50,11 +56,21 @@ OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) {
}
+OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) {
+ return AllocateBuffer(buffer_size, kRecvFrom);
+}
+
+
OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) {
return AllocateBuffer(buffer_size, kWrite);
}
+OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) {
+ return AllocateBuffer(buffer_size, kSendTo);
+}
+
+
OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() {
return AllocateBuffer(0, kDisconnect);
}
@@ -91,7 +107,7 @@ int OverlappedBuffer::Write(const void* buffer, int num_bytes) {
int OverlappedBuffer::GetRemainingLength() {
- ASSERT(operation_ == kRead);
+ ASSERT(operation_ == kRead || operation_ == kRecvFrom);
return data_length_ - index_;
}
@@ -199,6 +215,11 @@ void Handle::ReadComplete(OverlappedBuffer* buffer) {
}
+void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
+ ReadComplete(buffer);
+}
+
+
void Handle::WriteComplete(OverlappedBuffer* buffer) {
ScopedLock lock(this);
// Currently only one outstanding write at the time.
@@ -279,6 +300,11 @@ bool Handle::IssueRead() {
}
+bool Handle::IssueRecvFrom() {
+ return false;
+}
+
+
bool Handle::IssueWrite() {
ScopedLock lock(this);
ASSERT(type_ != kListenSocket);
@@ -303,6 +329,11 @@ bool Handle::IssueWrite() {
}
+bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
+ return false;
+}
+
+
void Handle::HandleIssueError() {
DWORD error = GetLastError();
if (error == ERROR_BROKEN_PIPE) {
@@ -543,6 +574,27 @@ int Handle::Read(void* buffer, int num_bytes) {
}
+int Handle::RecvFrom(
+ void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) {
+ ScopedLock lock(this);
+ if (data_ready_ == NULL) return 0;
+ num_bytes = data_ready_->Read(buffer, num_bytes);
Anders Johnsen 2013/12/12 12:19:58 Will this allow us to read only a part of a UDP pa
Søren Gjesse 2013/12/12 15:44:39 It will, I changed it to dispose of the rest, so t
+ if (data_ready_->from()->sa_family == AF_INET) {
+ ASSERT(sa_len >= sizeof(struct sockaddr_in));
+ memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
+ } else {
+ ASSERT(data_ready_->from()->sa_family == AF_INET6);
+ ASSERT(sa_len >= sizeof(struct sockaddr_in6));
+ memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
+ }
+ if (data_ready_->IsEmpty()) {
+ OverlappedBuffer::DisposeBuffer(data_ready_);
+ data_ready_ = NULL;
+ }
+ return num_bytes;
+}
+
+
int Handle::Write(const void* buffer, int num_bytes) {
ScopedLock lock(this);
if (pending_write_ != NULL) return 0;
@@ -556,6 +608,20 @@ int Handle::Write(const void* buffer, int num_bytes) {
}
+int Handle::SendTo(
+ const void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) {
+ ScopedLock lock(this);
+ if (pending_write_ != NULL) return 0;
+ if (num_bytes > kBufferSize) num_bytes = kBufferSize;
+ ASSERT(SupportsOverlappedIO());
+ if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
+ pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
+ pending_write_->Write(buffer, num_bytes);
+ if (!IssueSendTo(sa, sa_len)) return -1;
Anders Johnsen 2013/12/12 12:19:58 Can this leak the pending_write?
Søren Gjesse 2013/12/12 15:44:39 No, IssueSendTo will dispose of it before returnin
+ return num_bytes;
+}
+
+
static void WriteFileThread(uword args) {
StdHandle* handle = reinterpret_cast<StdHandle*>(args);
handle->RunWriteLoop();
@@ -707,7 +773,9 @@ bool ClientSocket::IssueRead() {
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_read_ == NULL);
- OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(1024);
+ // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
+ // handle 64k datagrams.
+ OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
DWORD flags;
flags = 0;
@@ -791,6 +859,85 @@ bool ClientSocket::IsClosed() {
}
+bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
+ ScopedLock lock(this);
+ ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
+ ASSERT(pending_write_ != NULL);
+ ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
+
+ int rc = WSASendTo(socket(),
+ pending_write_->GetWASBUF(),
+ 1,
+ NULL,
+ 0,
+ sa,
+ sa_len,
+ pending_write_->GetCleanOverlapped(),
+ NULL);
+ if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
+ return true;
+ }
+ OverlappedBuffer::DisposeBuffer(pending_write_);
+ pending_write_ = NULL;
+ HandleIssueError();
+ return false;
+}
+
+
+bool DatagramSocket::IssueRecvFrom() {
+ ScopedLock lock(this);
+ ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
+ ASSERT(pending_read_ == NULL);
+
+ OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024);
+
+ DWORD flags;
+ flags = 0;
+ int len;
+ int rc = WSARecvFrom(socket(),
+ buffer->GetWASBUF(),
+ 1,
+ NULL,
+ &flags,
+ buffer->from(),
+ buffer->from_len_addr(),
+ buffer->GetCleanOverlapped(),
+ NULL);
+ if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
+ pending_read_ = buffer;
+ return true;
+ }
+ OverlappedBuffer::DisposeBuffer(buffer);
+ pending_read_ = NULL;
+ HandleIssueError();
+ return false;
+}
+
+
+void DatagramSocket::EnsureInitialized(
+ EventHandlerImplementation* event_handler) {
+ ScopedLock lock(this);
+ if (completion_port_ == INVALID_HANDLE_VALUE) {
+ ASSERT(event_handler_ == NULL);
+ event_handler_ = event_handler;
+ CreateCompletionPort(event_handler_->completion_port());
+ }
+}
+
+
+bool DatagramSocket::IsClosed() {
+ return IsClosing() && !HasPendingRead() && !HasPendingWrite();
+}
+
+
+void DatagramSocket::DoClose() {
+ // Just close the socket. This will cause any queued requests to be aborted.
+ closesocket(socket());
+ MarkClosedRead();
+ MarkClosedWrite();
+}
+
+
static void DeleteIfClosed(Handle* handle) {
if (handle->IsClosed()) {
Dart_Port port = handle->port();
@@ -858,7 +1005,11 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
int event_mask = (1 << kCloseEvent);
DartUtils::PostInt32(handle->port(), event_mask);
} else if (!handle->HasPendingRead()) {
- handle->IssueRead();
+ if (handle->is_datagram_socket()) {
+ handle->IssueRecvFrom();
+ } else {
+ handle->IssueRead();
+ }
}
}
@@ -953,6 +1104,23 @@ void EventHandlerImplementation::HandleRead(Handle* handle,
}
+void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
+ int bytes,
+ OverlappedBuffer* buffer) {
+ ASSERT(handle->is_datagram_socket());
+ buffer->set_data_length(bytes);
+ handle->ReadComplete(buffer);
+ if (!handle->IsClosing()) {
+ int event_mask = 1 << kInEvent;
+ if ((handle->mask() & event_mask) != 0) {
+ DartUtils::PostInt32(handle->port(), event_mask);
+ }
+ }
+
+ DeleteIfClosed(handle);
+}
+
+
void EventHandlerImplementation::HandleWrite(Handle* handle,
int bytes,
OverlappedBuffer* buffer) {
@@ -1004,7 +1172,13 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
HandleRead(handle, bytes, buffer);
break;
}
- case OverlappedBuffer::kWrite: {
+ case OverlappedBuffer::kRecvFrom: {
+ Handle* handle = reinterpret_cast<Handle*>(key);
+ HandleRecvFrom(handle, bytes, buffer);
+ break;
+ }
+ case OverlappedBuffer::kWrite:
+ case OverlappedBuffer::kSendTo: {
Handle* handle = reinterpret_cast<Handle*>(key);
HandleWrite(handle, bytes, buffer);
break;
@@ -1101,6 +1275,8 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
ASSERT(bytes == 0);
handler_impl->HandleIOCompletion(bytes, key, overlapped);
} else {
+ OverlappedBuffer* buffer =
Anders Johnsen 2013/12/12 12:19:58 Unused?
Søren Gjesse 2013/12/12 15:44:39 Yes, removed.
+ OverlappedBuffer::GetFromOverlapped(overlapped);
ASSERT(bytes == 0);
handler_impl->HandleIOCompletion(-1, key, overlapped);
}

Powered by Google App Engine
This is Rietveld 408576698