| Index: runtime/bin/eventhandler_win.cc
|
| diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
|
| index 334d496ee677915af4ca78d7be0103fec852e6fb..7b5d66af420a192da74df9de40b6c8cc5eb700e4 100644
|
| --- a/runtime/bin/eventhandler_win.cc
|
| +++ b/runtime/bin/eventhandler_win.cc
|
| @@ -50,11 +50,24 @@ OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) {
|
| }
|
|
|
|
|
| +OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) {
|
| + // For calling recvfrom additional buffer space is needed for the source
|
| + // address information.
|
| + buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage);
|
| + return AllocateBuffer(buffer_size, kRecvFrom);
|
| +}
|
| +
|
| +
|
| OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) {
|
| return AllocateBuffer(buffer_size, kWrite);
|
| }
|
|
|
|
|
| +OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) {
|
| + return AllocateBuffer(buffer_size, kSendTo);
|
| +}
|
| +
|
| +
|
| OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() {
|
| return AllocateBuffer(0, kDisconnect);
|
| }
|
| @@ -91,7 +104,7 @@ int OverlappedBuffer::Write(const void* buffer, int num_bytes) {
|
|
|
|
|
| int OverlappedBuffer::GetRemainingLength() {
|
| - ASSERT(operation_ == kRead);
|
| + ASSERT(operation_ == kRead || operation_ == kRecvFrom);
|
| return data_length_ - index_;
|
| }
|
|
|
| @@ -199,6 +212,11 @@ void Handle::ReadComplete(OverlappedBuffer* buffer) {
|
| }
|
|
|
|
|
| +void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
|
| + ReadComplete(buffer);
|
| +}
|
| +
|
| +
|
| void Handle::WriteComplete(OverlappedBuffer* buffer) {
|
| ScopedLock lock(this);
|
| // Currently only one outstanding write at the time.
|
| @@ -279,6 +297,11 @@ bool Handle::IssueRead() {
|
| }
|
|
|
|
|
| +bool Handle::IssueRecvFrom() {
|
| + return false;
|
| +}
|
| +
|
| +
|
| bool Handle::IssueWrite() {
|
| ScopedLock lock(this);
|
| ASSERT(type_ != kListenSocket);
|
| @@ -303,6 +326,11 @@ bool Handle::IssueWrite() {
|
| }
|
|
|
|
|
| +bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
|
| + return false;
|
| +}
|
| +
|
| +
|
| void Handle::HandleIssueError() {
|
| DWORD error = GetLastError();
|
| if (error == ERROR_BROKEN_PIPE) {
|
| @@ -543,6 +571,27 @@ int Handle::Read(void* buffer, int num_bytes) {
|
| }
|
|
|
|
|
| +int Handle::RecvFrom(
|
| + void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) {
|
| + ScopedLock lock(this);
|
| + if (data_ready_ == NULL) return 0;
|
| + num_bytes = data_ready_->Read(buffer, num_bytes);
|
| + if (data_ready_->from()->sa_family == AF_INET) {
|
| + ASSERT(sa_len >= sizeof(struct sockaddr_in));
|
| + memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
|
| + } else {
|
| + ASSERT(data_ready_->from()->sa_family == AF_INET6);
|
| + ASSERT(sa_len >= sizeof(struct sockaddr_in6));
|
| + memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
|
| + }
|
| + // Always dispose of the buffer, as UDP messages must be read in their
|
| + // entirety to match how recvfrom works in a socket.
|
| + OverlappedBuffer::DisposeBuffer(data_ready_);
|
| + data_ready_ = NULL;
|
| + return num_bytes;
|
| +}
|
| +
|
| +
|
| int Handle::Write(const void* buffer, int num_bytes) {
|
| ScopedLock lock(this);
|
| if (pending_write_ != NULL) return 0;
|
| @@ -556,6 +605,20 @@ int Handle::Write(const void* buffer, int num_bytes) {
|
| }
|
|
|
|
|
| +int Handle::SendTo(
|
| + const void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) {
|
| + ScopedLock lock(this);
|
| + if (pending_write_ != NULL) return 0;
|
| + if (num_bytes > kBufferSize) num_bytes = kBufferSize;
|
| + ASSERT(SupportsOverlappedIO());
|
| + if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
|
| + pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
|
| + pending_write_->Write(buffer, num_bytes);
|
| + if (!IssueSendTo(sa, sa_len)) return -1;
|
| + return num_bytes;
|
| +}
|
| +
|
| +
|
| static void WriteFileThread(uword args) {
|
| StdHandle* handle = reinterpret_cast<StdHandle*>(args);
|
| handle->RunWriteLoop();
|
| @@ -707,7 +770,9 @@ bool ClientSocket::IssueRead() {
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_read_ == NULL);
|
|
|
| - OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(1024);
|
| + // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
|
| + // handle 64k datagrams.
|
| + OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
|
|
|
| DWORD flags;
|
| flags = 0;
|
| @@ -791,6 +856,85 @@ bool ClientSocket::IsClosed() {
|
| }
|
|
|
|
|
| +bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
|
| + ScopedLock lock(this);
|
| + ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| + ASSERT(pending_write_ != NULL);
|
| + ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
|
| +
|
| + int rc = WSASendTo(socket(),
|
| + pending_write_->GetWASBUF(),
|
| + 1,
|
| + NULL,
|
| + 0,
|
| + sa,
|
| + sa_len,
|
| + pending_write_->GetCleanOverlapped(),
|
| + NULL);
|
| + if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
|
| + return true;
|
| + }
|
| + OverlappedBuffer::DisposeBuffer(pending_write_);
|
| + pending_write_ = NULL;
|
| + HandleIssueError();
|
| + return false;
|
| +}
|
| +
|
| +
|
| +bool DatagramSocket::IssueRecvFrom() {
|
| + ScopedLock lock(this);
|
| + ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| + ASSERT(pending_read_ == NULL);
|
| +
|
| + OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024);
|
| +
|
| + DWORD flags;
|
| + flags = 0;
|
| + int len;
|
| + int rc = WSARecvFrom(socket(),
|
| + buffer->GetWASBUF(),
|
| + 1,
|
| + NULL,
|
| + &flags,
|
| + buffer->from(),
|
| + buffer->from_len_addr(),
|
| + buffer->GetCleanOverlapped(),
|
| + NULL);
|
| + if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
|
| + pending_read_ = buffer;
|
| + return true;
|
| + }
|
| + OverlappedBuffer::DisposeBuffer(buffer);
|
| + pending_read_ = NULL;
|
| + HandleIssueError();
|
| + return false;
|
| +}
|
| +
|
| +
|
| +void DatagramSocket::EnsureInitialized(
|
| + EventHandlerImplementation* event_handler) {
|
| + ScopedLock lock(this);
|
| + if (completion_port_ == INVALID_HANDLE_VALUE) {
|
| + ASSERT(event_handler_ == NULL);
|
| + event_handler_ = event_handler;
|
| + CreateCompletionPort(event_handler_->completion_port());
|
| + }
|
| +}
|
| +
|
| +
|
| +bool DatagramSocket::IsClosed() {
|
| + return IsClosing() && !HasPendingRead() && !HasPendingWrite();
|
| +}
|
| +
|
| +
|
| +void DatagramSocket::DoClose() {
|
| + // Just close the socket. This will cause any queued requests to be aborted.
|
| + closesocket(socket());
|
| + MarkClosedRead();
|
| + MarkClosedWrite();
|
| +}
|
| +
|
| +
|
| static void DeleteIfClosed(Handle* handle) {
|
| if (handle->IsClosed()) {
|
| Dart_Port port = handle->port();
|
| @@ -858,7 +1002,11 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| int event_mask = (1 << kCloseEvent);
|
| DartUtils::PostInt32(handle->port(), event_mask);
|
| } else if (!handle->HasPendingRead()) {
|
| - handle->IssueRead();
|
| + if (handle->is_datagram_socket()) {
|
| + handle->IssueRecvFrom();
|
| + } else {
|
| + handle->IssueRead();
|
| + }
|
| }
|
| }
|
|
|
| @@ -953,6 +1101,23 @@ void EventHandlerImplementation::HandleRead(Handle* handle,
|
| }
|
|
|
|
|
| +void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
|
| + int bytes,
|
| + OverlappedBuffer* buffer) {
|
| + ASSERT(handle->is_datagram_socket());
|
| + buffer->set_data_length(bytes);
|
| + handle->ReadComplete(buffer);
|
| + if (!handle->IsClosing()) {
|
| + int event_mask = 1 << kInEvent;
|
| + if ((handle->mask() & event_mask) != 0) {
|
| + DartUtils::PostInt32(handle->port(), event_mask);
|
| + }
|
| + }
|
| +
|
| + DeleteIfClosed(handle);
|
| +}
|
| +
|
| +
|
| void EventHandlerImplementation::HandleWrite(Handle* handle,
|
| int bytes,
|
| OverlappedBuffer* buffer) {
|
| @@ -1004,7 +1169,13 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
|
| HandleRead(handle, bytes, buffer);
|
| break;
|
| }
|
| - case OverlappedBuffer::kWrite: {
|
| + case OverlappedBuffer::kRecvFrom: {
|
| + Handle* handle = reinterpret_cast<Handle*>(key);
|
| + HandleRecvFrom(handle, bytes, buffer);
|
| + break;
|
| + }
|
| + case OverlappedBuffer::kWrite:
|
| + case OverlappedBuffer::kSendTo: {
|
| Handle* handle = reinterpret_cast<Handle*>(key);
|
| HandleWrite(handle, bytes, buffer);
|
| break;
|
|
|