Chromium Code Reviews| Index: runtime/bin/eventhandler_win.cc | 
| diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc | 
| index 334d496ee677915af4ca78d7be0103fec852e6fb..28add7ffd4c9c3a515a17a19d42a1100d94dd732 100644 | 
| --- a/runtime/bin/eventhandler_win.cc | 
| +++ b/runtime/bin/eventhandler_win.cc | 
| @@ -33,8 +33,14 @@ static const int kShutdownId = -2; | 
| OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size, | 
| Operation operation) { | 
| + int actual_size = buffer_size; | 
| + // For calling recvfrom additional buffer space is needed for the source | 
| + // address information. | 
| + if (operation == kRecvFrom) { | 
| 
 
Anders Johnsen
2013/12/12 12:19:58
Move this to AllocateRecvFromBuffer?
 
Søren Gjesse
2013/12/12 15:44:39
Done.
 
 | 
| + actual_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage); | 
| 
 
Anders Johnsen
2013/12/12 12:19:58
Make the size a constant in OverlappedBuffer.
 
Søren Gjesse
2013/12/12 15:44:39
Done.
 
 | 
| + } | 
| OverlappedBuffer* buffer = | 
| - new(buffer_size) OverlappedBuffer(buffer_size, operation); | 
| + new(actual_size) OverlappedBuffer(actual_size, operation); | 
| return buffer; | 
| } | 
| @@ -50,11 +56,21 @@ OverlappedBuffer* OverlappedBuffer::AllocateReadBuffer(int buffer_size) { | 
| } | 
| +OverlappedBuffer* OverlappedBuffer::AllocateRecvFromBuffer(int buffer_size) { | 
| + return AllocateBuffer(buffer_size, kRecvFrom); | 
| +} | 
| + | 
| + | 
| OverlappedBuffer* OverlappedBuffer::AllocateWriteBuffer(int buffer_size) { | 
| return AllocateBuffer(buffer_size, kWrite); | 
| } | 
| +OverlappedBuffer* OverlappedBuffer::AllocateSendToBuffer(int buffer_size) { | 
| + return AllocateBuffer(buffer_size, kSendTo); | 
| +} | 
| + | 
| + | 
| OverlappedBuffer* OverlappedBuffer::AllocateDisconnectBuffer() { | 
| return AllocateBuffer(0, kDisconnect); | 
| } | 
| @@ -91,7 +107,7 @@ int OverlappedBuffer::Write(const void* buffer, int num_bytes) { | 
| int OverlappedBuffer::GetRemainingLength() { | 
| - ASSERT(operation_ == kRead); | 
| + ASSERT(operation_ == kRead || operation_ == kRecvFrom); | 
| return data_length_ - index_; | 
| } | 
| @@ -199,6 +215,11 @@ void Handle::ReadComplete(OverlappedBuffer* buffer) { | 
| } | 
| +void Handle::RecvFromComplete(OverlappedBuffer* buffer) { | 
| + ReadComplete(buffer); | 
| +} | 
| + | 
| + | 
| void Handle::WriteComplete(OverlappedBuffer* buffer) { | 
| ScopedLock lock(this); | 
| // Currently only one outstanding write at the time. | 
| @@ -279,6 +300,11 @@ bool Handle::IssueRead() { | 
| } | 
| +bool Handle::IssueRecvFrom() { | 
| + return false; | 
| +} | 
| + | 
| + | 
| bool Handle::IssueWrite() { | 
| ScopedLock lock(this); | 
| ASSERT(type_ != kListenSocket); | 
| @@ -303,6 +329,11 @@ bool Handle::IssueWrite() { | 
| } | 
| +bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { | 
| + return false; | 
| +} | 
| + | 
| + | 
| void Handle::HandleIssueError() { | 
| DWORD error = GetLastError(); | 
| if (error == ERROR_BROKEN_PIPE) { | 
| @@ -543,6 +574,27 @@ int Handle::Read(void* buffer, int num_bytes) { | 
| } | 
| +int Handle::RecvFrom( | 
| + void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) { | 
| + ScopedLock lock(this); | 
| + if (data_ready_ == NULL) return 0; | 
| + num_bytes = data_ready_->Read(buffer, num_bytes); | 
| 
 
Anders Johnsen
2013/12/12 12:19:58
Will this allow us to read only a part of a UDP pa
 
Søren Gjesse
2013/12/12 15:44:39
It will, I changed it to dispose of the rest, so t
 
 | 
| + if (data_ready_->from()->sa_family == AF_INET) { | 
| + ASSERT(sa_len >= sizeof(struct sockaddr_in)); | 
| + memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); | 
| + } else { | 
| + ASSERT(data_ready_->from()->sa_family == AF_INET6); | 
| + ASSERT(sa_len >= sizeof(struct sockaddr_in6)); | 
| + memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); | 
| + } | 
| + if (data_ready_->IsEmpty()) { | 
| + OverlappedBuffer::DisposeBuffer(data_ready_); | 
| + data_ready_ = NULL; | 
| + } | 
| + return num_bytes; | 
| +} | 
| + | 
| + | 
| int Handle::Write(const void* buffer, int num_bytes) { | 
| ScopedLock lock(this); | 
| if (pending_write_ != NULL) return 0; | 
| @@ -556,6 +608,20 @@ int Handle::Write(const void* buffer, int num_bytes) { | 
| } | 
| +int Handle::SendTo( | 
| + const void* buffer, int num_bytes, struct sockaddr* sa, socklen_t sa_len) { | 
| + ScopedLock lock(this); | 
| + if (pending_write_ != NULL) return 0; | 
| + if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 
| + ASSERT(SupportsOverlappedIO()); | 
| + if (completion_port_ == INVALID_HANDLE_VALUE) return 0; | 
| + pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); | 
| + pending_write_->Write(buffer, num_bytes); | 
| + if (!IssueSendTo(sa, sa_len)) return -1; | 
| 
 
Anders Johnsen
2013/12/12 12:19:58
Can this leak the pending_write?
 
Søren Gjesse
2013/12/12 15:44:39
No, IssueSendTo will dispose of it before returnin
 
 | 
| + return num_bytes; | 
| +} | 
| + | 
| + | 
| static void WriteFileThread(uword args) { | 
| StdHandle* handle = reinterpret_cast<StdHandle*>(args); | 
| handle->RunWriteLoop(); | 
| @@ -707,7 +773,9 @@ bool ClientSocket::IssueRead() { | 
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 
| ASSERT(pending_read_ == NULL); | 
| - OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(1024); | 
| + // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can | 
| + // handle 64k datagrams. | 
| + OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); | 
| DWORD flags; | 
| flags = 0; | 
| @@ -791,6 +859,85 @@ bool ClientSocket::IsClosed() { | 
| } | 
| +bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { | 
| + ScopedLock lock(this); | 
| + ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 
| + ASSERT(pending_write_ != NULL); | 
| + ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); | 
| + | 
| + int rc = WSASendTo(socket(), | 
| + pending_write_->GetWASBUF(), | 
| + 1, | 
| + NULL, | 
| + 0, | 
| + sa, | 
| + sa_len, | 
| + pending_write_->GetCleanOverlapped(), | 
| + NULL); | 
| + if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 
| + return true; | 
| + } | 
| + OverlappedBuffer::DisposeBuffer(pending_write_); | 
| + pending_write_ = NULL; | 
| + HandleIssueError(); | 
| + return false; | 
| +} | 
| + | 
| + | 
| +bool DatagramSocket::IssueRecvFrom() { | 
| + ScopedLock lock(this); | 
| + ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 
| + ASSERT(pending_read_ == NULL); | 
| + | 
| + OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); | 
| + | 
| + DWORD flags; | 
| + flags = 0; | 
| + int len; | 
| + int rc = WSARecvFrom(socket(), | 
| + buffer->GetWASBUF(), | 
| + 1, | 
| + NULL, | 
| + &flags, | 
| + buffer->from(), | 
| + buffer->from_len_addr(), | 
| + buffer->GetCleanOverlapped(), | 
| + NULL); | 
| + if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 
| + pending_read_ = buffer; | 
| + return true; | 
| + } | 
| + OverlappedBuffer::DisposeBuffer(buffer); | 
| + pending_read_ = NULL; | 
| + HandleIssueError(); | 
| + return false; | 
| +} | 
| + | 
| + | 
| +void DatagramSocket::EnsureInitialized( | 
| + EventHandlerImplementation* event_handler) { | 
| + ScopedLock lock(this); | 
| + if (completion_port_ == INVALID_HANDLE_VALUE) { | 
| + ASSERT(event_handler_ == NULL); | 
| + event_handler_ = event_handler; | 
| + CreateCompletionPort(event_handler_->completion_port()); | 
| + } | 
| +} | 
| + | 
| + | 
| +bool DatagramSocket::IsClosed() { | 
| + return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 
| +} | 
| + | 
| + | 
| +void DatagramSocket::DoClose() { | 
| + // Just close the socket. This will cause any queued requests to be aborted. | 
| + closesocket(socket()); | 
| + MarkClosedRead(); | 
| + MarkClosedWrite(); | 
| +} | 
| + | 
| + | 
| static void DeleteIfClosed(Handle* handle) { | 
| if (handle->IsClosed()) { | 
| Dart_Port port = handle->port(); | 
| @@ -858,7 +1005,11 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { | 
| int event_mask = (1 << kCloseEvent); | 
| DartUtils::PostInt32(handle->port(), event_mask); | 
| } else if (!handle->HasPendingRead()) { | 
| - handle->IssueRead(); | 
| + if (handle->is_datagram_socket()) { | 
| + handle->IssueRecvFrom(); | 
| + } else { | 
| + handle->IssueRead(); | 
| + } | 
| } | 
| } | 
| @@ -953,6 +1104,23 @@ void EventHandlerImplementation::HandleRead(Handle* handle, | 
| } | 
| +void EventHandlerImplementation::HandleRecvFrom(Handle* handle, | 
| + int bytes, | 
| + OverlappedBuffer* buffer) { | 
| + ASSERT(handle->is_datagram_socket()); | 
| + buffer->set_data_length(bytes); | 
| + handle->ReadComplete(buffer); | 
| + if (!handle->IsClosing()) { | 
| + int event_mask = 1 << kInEvent; | 
| + if ((handle->mask() & event_mask) != 0) { | 
| + DartUtils::PostInt32(handle->port(), event_mask); | 
| + } | 
| + } | 
| + | 
| + DeleteIfClosed(handle); | 
| +} | 
| + | 
| + | 
| void EventHandlerImplementation::HandleWrite(Handle* handle, | 
| int bytes, | 
| OverlappedBuffer* buffer) { | 
| @@ -1004,7 +1172,13 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, | 
| HandleRead(handle, bytes, buffer); | 
| break; | 
| } | 
| - case OverlappedBuffer::kWrite: { | 
| + case OverlappedBuffer::kRecvFrom: { | 
| + Handle* handle = reinterpret_cast<Handle*>(key); | 
| + HandleRecvFrom(handle, bytes, buffer); | 
| + break; | 
| + } | 
| + case OverlappedBuffer::kWrite: | 
| + case OverlappedBuffer::kSendTo: { | 
| Handle* handle = reinterpret_cast<Handle*>(key); | 
| HandleWrite(handle, bytes, buffer); | 
| break; | 
| @@ -1101,6 +1275,8 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { | 
| ASSERT(bytes == 0); | 
| handler_impl->HandleIOCompletion(bytes, key, overlapped); | 
| } else { | 
| + OverlappedBuffer* buffer = | 
| 
 
Anders Johnsen
2013/12/12 12:19:58
Unused?
 
Søren Gjesse
2013/12/12 15:44:39
Yes, removed.
 
 | 
| + OverlappedBuffer::GetFromOverlapped(overlapped); | 
| ASSERT(bytes == 0); | 
| handler_impl->HandleIOCompletion(-1, key, overlapped); | 
| } |