| Index: runtime/bin/eventhandler_win.cc
|
| diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
|
| index eedcf167da2c6db3b478c657c1d2db5660453d31..e961581be4827226a2fc72e8be21be5d325150ac 100644
|
| --- a/runtime/bin/eventhandler_win.cc
|
| +++ b/runtime/bin/eventhandler_win.cc
|
| @@ -115,8 +115,10 @@ int OverlappedBuffer::GetRemainingLength() {
|
| return data_length_ - index_;
|
| }
|
|
|
| +
|
| Handle::Handle(intptr_t handle)
|
| - : DescriptorInfoBase(handle),
|
| + : ReferenceCounted(),
|
| + DescriptorInfoBase(handle),
|
| handle_(reinterpret_cast<HANDLE>(handle)),
|
| completion_port_(INVALID_HANDLE_VALUE),
|
| event_handler_(NULL),
|
| @@ -138,6 +140,10 @@ Handle::~Handle() {
|
|
|
|
|
| bool Handle::CreateCompletionPort(HANDLE completion_port) {
|
| + ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
|
| + // A reference to the Handle is Retained by the IO completion port.
|
| + // It is Released by DeleteIfClosed.
|
| + Retain();
|
| completion_port_ = CreateIoCompletionPort(
|
| handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
|
| return (completion_port_ != NULL);
|
| @@ -392,8 +398,16 @@ void Handle::HandleIssueError() {
|
| void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
|
| MonitorLocker ml(monitor_);
|
| event_handler_ = event_handler;
|
| - if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) {
|
| - CreateCompletionPort(event_handler_->completion_port());
|
| + if (completion_port_ == INVALID_HANDLE_VALUE) {
|
| + if (SupportsOverlappedIO()) {
|
| + CreateCompletionPort(event_handler_->completion_port());
|
| + } else {
|
| + // We need to retain the Handle even if overlapped IO is not supported.
|
| + // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
|
| + // manually puts an event on the IO completion port.
|
| + Retain();
|
| + completion_port_ = event_handler_->completion_port();
|
| + }
|
| }
|
| }
|
|
|
| @@ -546,9 +560,13 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
|
|
|
| static void DeleteIfClosed(Handle* handle) {
|
| if (handle->IsClosed()) {
|
| + handle->set_completion_port(INVALID_HANDLE_VALUE);
|
| + handle->set_event_handler(NULL);
|
| handle->NotifyAllDartPorts(1 << kDestroyedEvent);
|
| handle->RemoveAllPorts();
|
| - delete handle;
|
| + // Once the Handle is closed, no further events on the IO completion port
|
| + // will mention it. Thus, we can drop the reference here.
|
| + handle->Release();
|
| }
|
| }
|
|
|
| @@ -561,11 +579,22 @@ void ListenSocket::DoClose() {
|
| ClientSocket* client = Accept();
|
| if (client != NULL) {
|
| client->Close();
|
| + // Release the reference from the list.
|
| + // When an accept completes, we make a new ClientSocket (1 reference),
|
| + // and add it to the IO completion port (1 more reference). If an
|
| + // accepted connection is never requested by the Dart code, then
|
| + // this list owns a reference (first Release), and the IO completion
|
| + // port owns a reference, (second Release in DeleteIfClosed).
|
| + client->Release();
|
| DeleteIfClosed(client);
|
| } else {
|
| break;
|
| }
|
| }
|
| + // To finish resetting the state of the ListenSocket back to what it was
|
| + // before EnsureInitialized was called, we have to reset the AcceptEx_
|
| + // function pointer.
|
| + AcceptEx_ = NULL;
|
| }
|
|
|
|
|
| @@ -792,6 +821,10 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
|
| }
|
| if (!write_thread_exists_) {
|
| write_thread_exists_ = true;
|
| + // The write thread gets a reference to the Handle, which it places in
|
| + // the events it puts on the IO completion port. The reference is
|
| + // Released by DeleteIfClosed.
|
| + Retain();
|
| int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this));
|
| if (result != 0) {
|
| FATAL1("Failed to start write file thread %d", result);
|
| @@ -828,6 +861,11 @@ void StdHandle::DoClose() {
|
| }
|
|
|
|
|
| +#if defined(DEBUG)
|
| +intptr_t ClientSocket::disconnecting_ = 0;
|
| +#endif
|
| +
|
| +
|
| bool ClientSocket::LoadDisconnectEx() {
|
| // Load the DisconnectEx function into memory using WSAIoctl.
|
| GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
|
| @@ -914,8 +952,18 @@ void ClientSocket::IssueDisconnect() {
|
| if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
|
| DisconnectComplete(buffer);
|
| }
|
| + // When the Dart side receives this event, it may decide to close its Dart
|
| + // ports. When all ports are closed, the VM will shut down. The EventHandler
|
| + // will then shut down. If the EventHandler shuts down before this
|
| + // asynchronous disconnect finishes, this ClientSocket will be leaked.
|
| + // TODO(dart:io): Retain a list of client sockets that are in the process of
|
| + // disconnecting. Disconnect them forcefully, and clean up their resources
|
| + // when the EventHandler shuts down.
|
| NotifyAllDartPorts(1 << kDestroyedEvent);
|
| RemoveAllPorts();
|
| +#if defined(DEBUG)
|
| + disconnecting_++;
|
| +#endif
|
| }
|
|
|
|
|
| @@ -926,6 +974,9 @@ void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
|
| OverlappedBuffer::DisposeBuffer(data_ready_);
|
| }
|
| mark_closed();
|
| +#if defined(DEBUG)
|
| + disconnecting_--;
|
| +#endif
|
| }
|
|
|
|
|
| @@ -1037,7 +1088,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| } else if (msg->id == kShutdownId) {
|
| shutdown_ = true;
|
| } else {
|
| - Handle* handle = reinterpret_cast<Handle*>(msg->id);
|
| + Socket* socket = reinterpret_cast<Socket*>(msg->id);
|
| + RefCntReleaseScope<Socket> rs(socket);
|
| + if (socket->fd() == -1) {
|
| + return;
|
| + }
|
| + Handle* handle = reinterpret_cast<Handle*>(socket->fd());
|
| ASSERT(handle != NULL);
|
|
|
| if (handle->is_listen_socket()) {
|
| @@ -1062,9 +1118,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| // are listening on the same (address, port) combination.
|
| ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
|
| MutexLocker locker(registry->mutex());
|
| - if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) {
|
| + if (registry->CloseSafe(socket)) {
|
| ASSERT(listen_socket->Mask() == 0);
|
| listen_socket->Close();
|
| + socket->SetClosedFd();
|
| }
|
|
|
| DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
|
| @@ -1132,6 +1189,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| } else if (IS_COMMAND(msg->data, kCloseCommand)) {
|
| handle->SetPortAndMask(msg->dart_port, 0);
|
| handle->Close();
|
| + socket->SetClosedFd();
|
| } else {
|
| UNREACHABLE();
|
| }
|
| @@ -1313,6 +1371,44 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
|
| }
|
|
|
|
|
| +void EventHandlerImplementation::HandleCompletionOrInterrupt(
|
| + BOOL ok,
|
| + DWORD bytes,
|
| + ULONG_PTR key,
|
| + OVERLAPPED* overlapped) {
|
| + if (!ok) {
|
| + // Treat ERROR_CONNECTION_ABORTED as connection closed.
|
| + // The error ERROR_OPERATION_ABORTED is set for pending
|
| + // accept requests for a listen socket which is closed.
|
| + // ERROR_NETNAME_DELETED occurs when the client closes
|
| + // the socket it is reading from.
|
| + DWORD last_error = GetLastError();
|
| + if ((last_error == ERROR_CONNECTION_ABORTED) ||
|
| + (last_error == ERROR_OPERATION_ABORTED) ||
|
| + (last_error == ERROR_NETNAME_DELETED) ||
|
| + (last_error == ERROR_BROKEN_PIPE)) {
|
| + ASSERT(bytes == 0);
|
| + HandleIOCompletion(bytes, key, overlapped);
|
| + } else if (last_error == ERROR_MORE_DATA) {
|
| + // Don't ASSERT no bytes in this case. This can happen if the receive
|
| + // buffer for datagram sockets is too small to contain a full datagram,
|
| + // and in this case bytes hold the bytes that was read.
|
| + HandleIOCompletion(-1, key, overlapped);
|
| + } else {
|
| + ASSERT(bytes == 0);
|
| + HandleIOCompletion(-1, key, overlapped);
|
| + }
|
| + } else if (key == NULL) {
|
| + // A key of NULL signals an interrupt message.
|
| + InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
|
| + HandleInterrupt(msg);
|
| + delete msg;
|
| + } else {
|
| + HandleIOCompletion(bytes, key, overlapped);
|
| + }
|
| +}
|
| +
|
| +
|
| EventHandlerImplementation::EventHandlerImplementation() {
|
| startup_monitor_ = new Monitor();
|
| handler_thread_id_ = Thread::kInvalidThreadId;
|
| @@ -1374,19 +1470,20 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
|
| ml.Notify();
|
| }
|
|
|
| + DWORD bytes;
|
| + ULONG_PTR key;
|
| + OVERLAPPED* overlapped;
|
| + BOOL ok;
|
| while (!handler_impl->shutdown_) {
|
| - DWORD bytes;
|
| - ULONG_PTR key;
|
| - OVERLAPPED* overlapped;
|
| int64_t millis = handler_impl->GetTimeout();
|
| ASSERT(millis == kInfinityTimeout || millis >= 0);
|
| if (millis > kMaxInt32) {
|
| millis = kMaxInt32;
|
| }
|
| ASSERT(sizeof(int32_t) == sizeof(DWORD));
|
| - BOOL ok =
|
| - GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key,
|
| - &overlapped, static_cast<DWORD>(millis));
|
| + DWORD timeout = static_cast<DWORD>(millis);
|
| + ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
|
| + &key, &overlapped, timeout);
|
|
|
| if (!ok && (overlapped == NULL)) {
|
| if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
|
| @@ -1397,37 +1494,35 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
|
| // Timeout is signalled by false result and NULL in overlapped.
|
| handler_impl->HandleTimeout();
|
| }
|
| - } else if (!ok) {
|
| - // Treat ERROR_CONNECTION_ABORTED as connection closed.
|
| - // The error ERROR_OPERATION_ABORTED is set for pending
|
| - // accept requests for a listen socket which is closed.
|
| - // ERROR_NETNAME_DELETED occurs when the client closes
|
| - // the socket it is reading from.
|
| - DWORD last_error = GetLastError();
|
| - if ((last_error == ERROR_CONNECTION_ABORTED) ||
|
| - (last_error == ERROR_OPERATION_ABORTED) ||
|
| - (last_error == ERROR_NETNAME_DELETED) ||
|
| - (last_error == ERROR_BROKEN_PIPE)) {
|
| - ASSERT(bytes == 0);
|
| - handler_impl->HandleIOCompletion(bytes, key, overlapped);
|
| - } else if (last_error == ERROR_MORE_DATA) {
|
| - // Don't ASSERT no bytes in this case. This can happen if the receive
|
| - // buffer for datagram sockets is to small to contain a full datagram,
|
| - // and in this case bytes hold the bytes that was read.
|
| - handler_impl->HandleIOCompletion(-1, key, overlapped);
|
| - } else {
|
| - ASSERT(bytes == 0);
|
| - handler_impl->HandleIOCompletion(-1, key, overlapped);
|
| - }
|
| - } else if (key == NULL) {
|
| - // A key of NULL signals an interrupt message.
|
| - InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
|
| - handler_impl->HandleInterrupt(msg);
|
| - delete msg;
|
| } else {
|
| - handler_impl->HandleIOCompletion(bytes, key, overlapped);
|
| + handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
|
| }
|
| }
|
| +
|
| +// In a Debug build, drain the IO completion port to make sure we aren't
|
| +// leaking any (non-disconnecting) Handles. In a Release build, we don't care
|
| +// because the VM is going down, and the asserts below are Debug-only.
|
| +#if defined(DEBUG)
|
| + while (true) {
|
| + ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
|
| + &key, &overlapped, 0);
|
| + if (!ok && (overlapped == NULL)) {
|
| + // There was an error or nothing is ready. Assume the port is drained.
|
| + break;
|
| + }
|
| + handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
|
| + }
|
| +#endif
|
| +
|
| + // The eventhandler thread is going down so there should be no more live
|
| + // Handles or Sockets.
|
| + // TODO(dart:io): It would be nice to be able to assert here that:
|
| + // ReferenceCounted<Handle>::instances() == 0;
|
| + // However, we cannot at the moment. See the TODO on:
|
| + // ClientSocket::IssueDisconnect()
|
| + DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
|
| + ClientSocket::disconnecting());
|
| + DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
|
| handler->NotifyShutdownDone();
|
| }
|
|
|
|
|