Index: runtime/bin/eventhandler_win.cc |
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc |
index 334d496ee677915af4ca78d7be0103fec852e6fb..7b5d66af420a192da74df9de40b6c8cc5eb700e4 100644 |
--- a/runtime/bin/eventhandler_win.cc |
+++ b/runtime/bin/eventhandler_win.cc |
@@ -50,11 +50,24 @@ OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) { |
} |
+OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) { |
+ // For calling recvfrom additional buffer space is needed for the source |
+ // address information. |
+ buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage); |
+ return AllocateBuffer(buffer_size, kRecvFrom); |
+} |
+ |
+ |
OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) { |
return AllocateBuffer(buffer_size, kWrite); |
} |
+OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) { |
+ return AllocateBuffer(buffer_size, kSendTo); |
+} |
+ |
+ |
OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() { |
return AllocateBuffer(0, kDisconnect); |
} |
@@ -91,7 +104,7 @@ int OverlappedBuffer::Write(const void* buffer, int num_bytes) { |
int OverlappedBuffer::GetRemainingLength() { |
- ASSERT(operation_ == kRead); |
+ ASSERT(operation_ == kRead || operation_ == kRecvFrom); |
return data_length_ - index_; |
} |
@@ -199,6 +212,11 @@ void Handle::ReadComplete(OverlappedBuffer* buffer) { |
} |
+void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
+ ReadComplete(buffer); |
+} |
+ |
+ |
void Handle::WriteComplete(OverlappedBuffer* buffer) { |
ScopedLock lock(this); |
// Currently only one outstanding write at the time. |
@@ -279,6 +297,11 @@ bool Handle::IssueRead() { |
} |
+bool Handle::IssueRecvFrom() { |
+ return false; |
+} |
+ |
+ |
bool Handle::IssueWrite() { |
ScopedLock lock(this); |
ASSERT(type_ != kListenSocket); |
@@ -303,6 +326,11 @@ bool Handle::IssueWrite() { |
} |
+bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
+ return false; |
+} |
+ |
+ |
void Handle::HandleIssueError() { |
DWORD error = GetLastError(); |
if (error == ERROR_BROKEN_PIPE) { |
@@ -543,6 +571,27 @@ int Handle::Read(void* buffer, int num_bytes) { |
} |
+int Handle::RecvFrom( |
+ void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
+ ScopedLock lock(this); |
+ if (data_ready_ == NULL) return 0; |
+ num_bytes = data_ready_->Read(buffer, num_bytes); |
+ if (data_ready_->from()->sa_family == AF_INET) { |
+ ASSERT(sa_len >= sizeof(struct sockaddr_in)); |
+ memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); |
+ } else { |
+ ASSERT(data_ready_->from()->sa_family == AF_INET6); |
+ ASSERT(sa_len >= sizeof(struct sockaddr_in6)); |
+ memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); |
+ } |
+ // Always dispose of the buffer, as UDP messages must be read in their |
+ // entirety to match how recvfrom works in a socket. |
+ OverlappedBuffer::DisposeBuffer(data_ready_); |
+ data_ready_ = NULL; |
+ return num_bytes; |
+} |
+ |
+ |
int Handle::Write(const void* buffer, int num_bytes) { |
ScopedLock lock(this); |
if (pending_write_ != NULL) return 0; |
@@ -556,6 +605,20 @@ int Handle::Write(const void* buffer, int num_bytes) { |
} |
+int Handle::SendTo( |
+ const void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
+ ScopedLock lock(this); |
+ if (pending_write_ != NULL) return 0; |
+ if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
+ ASSERT(SupportsOverlappedIO()); |
+ if (completion_port_ == INVALID_HANDLE_VALUE) return 0; |
+ pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); |
+ pending_write_->Write(buffer, num_bytes); |
+ if (!IssueSendTo(sa, sa_len)) return -1; |
+ return num_bytes; |
+} |
+ |
+ |
static void WriteFileThread(uword args) { |
StdHandle* handle = reinterpret_cast<StdHandle*>(args); |
handle->RunWriteLoop(); |
@@ -707,7 +770,9 @@ bool ClientSocket::IssueRead() { |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_read_ == NULL); |
- OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(1024); |
+ // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can |
+ // handle 64k datagrams. |
+ OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); |
DWORD flags; |
flags = 0; |
@@ -791,6 +856,85 @@ bool ClientSocket::IsClosed() { |
} |
+bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
+ ScopedLock lock(this); |
+ ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
+ ASSERT(pending_write_ != NULL); |
+ ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
+ |
+ int rc = WSASendTo(socket(), |
+ pending_write_->GetWASBUF(), |
+ 1, |
+ NULL, |
+ 0, |
+ sa, |
+ sa_len, |
+ pending_write_->GetCleanOverlapped(), |
+ NULL); |
+ if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
+ return true; |
+ } |
+ OverlappedBuffer::DisposeBuffer(pending_write_); |
+ pending_write_ = NULL; |
+ HandleIssueError(); |
+ return false; |
+} |
+ |
+ |
+bool DatagramSocket::IssueRecvFrom() { |
+ ScopedLock lock(this); |
+ ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
+ ASSERT(pending_read_ == NULL); |
+ |
+ OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); |
+ |
+ DWORD flags; |
+ flags = 0; |
+ int len; |
+ int rc = WSARecvFrom(socket(), |
+ buffer->GetWASBUF(), |
+ 1, |
+ NULL, |
+ &flags, |
+ buffer->from(), |
+ buffer->from_len_addr(), |
+ buffer->GetCleanOverlapped(), |
+ NULL); |
+ if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
+ pending_read_ = buffer; |
+ return true; |
+ } |
+ OverlappedBuffer::DisposeBuffer(buffer); |
+ pending_read_ = NULL; |
+ HandleIssueError(); |
+ return false; |
+} |
+ |
+ |
+void DatagramSocket::EnsureInitialized( |
+ EventHandlerImplementation* event_handler) { |
+ ScopedLock lock(this); |
+ if (completion_port_ == INVALID_HANDLE_VALUE) { |
+ ASSERT(event_handler_ == NULL); |
+ event_handler_ = event_handler; |
+ CreateCompletionPort(event_handler_->completion_port()); |
+ } |
+} |
+ |
+ |
+bool DatagramSocket::IsClosed() { |
+ return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
+} |
+ |
+ |
+void DatagramSocket::DoClose() { |
+ // Just close the socket. This will cause any queued requests to be aborted. |
+ closesocket(socket()); |
+ MarkClosedRead(); |
+ MarkClosedWrite(); |
+} |
+ |
+ |
static void DeleteIfClosed(Handle* handle) { |
if (handle->IsClosed()) { |
Dart_Port port = handle->port(); |
@@ -858,7 +1002,11 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
int event_mask = (1 << kCloseEvent); |
DartUtils::PostInt32(handle->port(), event_mask); |
} else if (!handle->HasPendingRead()) { |
- handle->IssueRead(); |
+ if (handle->is_datagram_socket()) { |
+ handle->IssueRecvFrom(); |
+ } else { |
+ handle->IssueRead(); |
+ } |
} |
} |
@@ -953,6 +1101,23 @@ void EventHandlerImplementation::HandleRead(Handle* handle, |
} |
+void EventHandlerImplementation::HandleRecvFrom(Handle* handle, |
+ int bytes, |
+ OverlappedBuffer* buffer) { |
+ ASSERT(handle->is_datagram_socket()); |
+ buffer->set_data_length(bytes); |
+ handle->ReadComplete(buffer); |
+ if (!handle->IsClosing()) { |
+ int event_mask = 1 << kInEvent; |
+ if ((handle->mask() & event_mask) != 0) { |
+ DartUtils::PostInt32(handle->port(), event_mask); |
+ } |
+ } |
+ |
+ DeleteIfClosed(handle); |
+} |
+ |
+ |
void EventHandlerImplementation::HandleWrite(Handle* handle, |
int bytes, |
OverlappedBuffer* buffer) { |
@@ -1004,7 +1169,13 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, |
HandleRead(handle, bytes, buffer); |
break; |
} |
- case OverlappedBuffer::kWrite: { |
+ case OverlappedBuffer::kRecvFrom: { |
+ Handle* handle = reinterpret_cast<Handle*>(key); |
+ HandleRecvFrom(handle, bytes, buffer); |
+ break; |
+ } |
+ case OverlappedBuffer::kWrite: |
+ case OverlappedBuffer::kSendTo: { |
Handle* handle = reinterpret_cast<Handle*>(key); |
HandleWrite(handle, bytes, buffer); |
break; |