Index: runtime/bin/eventhandler_win.cc |
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc |
index eedcf167da2c6db3b478c657c1d2db5660453d31..e961581be4827226a2fc72e8be21be5d325150ac 100644 |
--- a/runtime/bin/eventhandler_win.cc |
+++ b/runtime/bin/eventhandler_win.cc |
@@ -115,8 +115,10 @@ int OverlappedBuffer::GetRemainingLength() { |
return data_length_ - index_; |
} |
+ |
Handle::Handle(intptr_t handle) |
- : DescriptorInfoBase(handle), |
+ : ReferenceCounted(), |
+ DescriptorInfoBase(handle), |
handle_(reinterpret_cast<HANDLE>(handle)), |
completion_port_(INVALID_HANDLE_VALUE), |
event_handler_(NULL), |
@@ -138,6 +140,10 @@ Handle::~Handle() { |
bool Handle::CreateCompletionPort(HANDLE completion_port) { |
+ ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
+ // A reference to the Handle is Retained by the IO completion port. |
+ // It is Released by DeleteIfClosed. |
+ Retain(); |
completion_port_ = CreateIoCompletionPort( |
handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0); |
return (completion_port_ != NULL); |
@@ -392,8 +398,16 @@ void Handle::HandleIssueError() { |
void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
MonitorLocker ml(monitor_); |
event_handler_ = event_handler; |
- if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) { |
- CreateCompletionPort(event_handler_->completion_port()); |
+ if (completion_port_ == INVALID_HANDLE_VALUE) { |
+ if (SupportsOverlappedIO()) { |
+ CreateCompletionPort(event_handler_->completion_port()); |
+ } else { |
+ // We need to retain the Handle even if overlapped IO is not supported. |
+ // It is Released by DeleteIfClosed after ReadSyncCompleteAsync |
+ // manually puts an event on the IO completion port. |
+ Retain(); |
+ completion_port_ = event_handler_->completion_port(); |
+ } |
} |
} |
@@ -546,9 +560,13 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
static void DeleteIfClosed(Handle* handle) { |
if (handle->IsClosed()) { |
+ handle->set_completion_port(INVALID_HANDLE_VALUE); |
+ handle->set_event_handler(NULL); |
handle->NotifyAllDartPorts(1 << kDestroyedEvent); |
handle->RemoveAllPorts(); |
- delete handle; |
+ // Once the Handle is closed, no further events on the IO completion port |
+ // will mention it. Thus, we can drop the reference here. |
+ handle->Release(); |
} |
} |
@@ -561,11 +579,22 @@ void ListenSocket::DoClose() { |
ClientSocket* client = Accept(); |
if (client != NULL) { |
client->Close(); |
+ // Release the reference from the list. |
+ // When an accept completes, we make a new ClientSocket (1 reference), |
+ // and add it to the IO completion port (1 more reference). If an |
+ // accepted connection is never requested by the Dart code, then |
+ // this list owns a reference (first Release), and the IO completion |
+ // port owns a reference, (second Release in DeleteIfClosed). |
+ client->Release(); |
DeleteIfClosed(client); |
} else { |
break; |
} |
} |
+ // To finish resetting the state of the ListenSocket back to what it was |
+ // before EnsureInitialized was called, we have to reset the AcceptEx_ |
+ // function pointer. |
+ AcceptEx_ = NULL; |
} |
@@ -792,6 +821,10 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
} |
if (!write_thread_exists_) { |
write_thread_exists_ = true; |
+ // The write thread gets a reference to the Handle, which it places in |
+ // the events it puts on the IO completion port. The reference is |
+ // Released by DeleteIfClosed. |
+ Retain(); |
int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this)); |
if (result != 0) { |
FATAL1("Failed to start write file thread %d", result); |
@@ -828,6 +861,11 @@ void StdHandle::DoClose() { |
} |
+#if defined(DEBUG) |
+intptr_t ClientSocket::disconnecting_ = 0; |
+#endif |
+ |
+ |
bool ClientSocket::LoadDisconnectEx() { |
// Load the DisconnectEx function into memory using WSAIoctl. |
GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
@@ -914,8 +952,18 @@ void ClientSocket::IssueDisconnect() { |
if (ok || (WSAGetLastError() != WSA_IO_PENDING)) { |
DisconnectComplete(buffer); |
} |
+ // When the Dart side receives this event, it may decide to close its Dart |
+ // ports. When all ports are closed, the VM will shut down. The EventHandler |
+ // will then shut down. If the EventHandler shuts down before this |
+ // asynchronous disconnect finishes, this ClientSocket will be leaked. |
+ // TODO(dart:io): Retain a list of client sockets that are in the process of |
+ // disconnecting. Disconnect them forcefully, and clean up their resources |
+ // when the EventHandler shuts down. |
NotifyAllDartPorts(1 << kDestroyedEvent); |
RemoveAllPorts(); |
+#if defined(DEBUG) |
+ disconnecting_++; |
+#endif |
} |
@@ -926,6 +974,9 @@ void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { |
OverlappedBuffer::DisposeBuffer(data_ready_); |
} |
mark_closed(); |
+#if defined(DEBUG) |
+ disconnecting_--; |
+#endif |
} |
@@ -1037,7 +1088,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
} else if (msg->id == kShutdownId) { |
shutdown_ = true; |
} else { |
- Handle* handle = reinterpret_cast<Handle*>(msg->id); |
+ Socket* socket = reinterpret_cast<Socket*>(msg->id); |
+ RefCntReleaseScope<Socket> rs(socket); |
+ if (socket->fd() == -1) { |
+ return; |
+ } |
+ Handle* handle = reinterpret_cast<Handle*>(socket->fd()); |
ASSERT(handle != NULL); |
if (handle->is_listen_socket()) { |
@@ -1062,9 +1118,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
// are listening on the same (address, port) combination. |
ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
MutexLocker locker(registry->mutex()); |
- if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { |
+ if (registry->CloseSafe(socket)) { |
ASSERT(listen_socket->Mask() == 0); |
listen_socket->Close(); |
+ socket->SetClosedFd(); |
} |
DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
@@ -1132,6 +1189,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
} else if (IS_COMMAND(msg->data, kCloseCommand)) { |
handle->SetPortAndMask(msg->dart_port, 0); |
handle->Close(); |
+ socket->SetClosedFd(); |
} else { |
UNREACHABLE(); |
} |
@@ -1313,6 +1371,44 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, |
} |
+void EventHandlerImplementation::HandleCompletionOrInterrupt( |
+ BOOL ok, |
+ DWORD bytes, |
+ ULONG_PTR key, |
+ OVERLAPPED* overlapped) { |
+ if (!ok) { |
+ // Treat ERROR_CONNECTION_ABORTED as connection closed. |
+ // The error ERROR_OPERATION_ABORTED is set for pending |
+ // accept requests for a listen socket which is closed. |
+ // ERROR_NETNAME_DELETED occurs when the client closes |
+ // the socket it is reading from. |
+ DWORD last_error = GetLastError(); |
+ if ((last_error == ERROR_CONNECTION_ABORTED) || |
+ (last_error == ERROR_OPERATION_ABORTED) || |
+ (last_error == ERROR_NETNAME_DELETED) || |
+ (last_error == ERROR_BROKEN_PIPE)) { |
+ ASSERT(bytes == 0); |
+ HandleIOCompletion(bytes, key, overlapped); |
+ } else if (last_error == ERROR_MORE_DATA) { |
+ // Don't ASSERT no bytes in this case. This can happen if the receive |
+ // buffer for datagram sockets is too small to contain a full datagram, |
+ // and in this case bytes hold the bytes that was read. |
+ HandleIOCompletion(-1, key, overlapped); |
+ } else { |
+ ASSERT(bytes == 0); |
+ HandleIOCompletion(-1, key, overlapped); |
+ } |
+ } else if (key == NULL) { |
+ // A key of NULL signals an interrupt message. |
+ InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); |
+ HandleInterrupt(msg); |
+ delete msg; |
+ } else { |
+ HandleIOCompletion(bytes, key, overlapped); |
+ } |
+} |
+ |
+ |
EventHandlerImplementation::EventHandlerImplementation() { |
startup_monitor_ = new Monitor(); |
handler_thread_id_ = Thread::kInvalidThreadId; |
@@ -1374,19 +1470,20 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { |
ml.Notify(); |
} |
+ DWORD bytes; |
+ ULONG_PTR key; |
+ OVERLAPPED* overlapped; |
+ BOOL ok; |
while (!handler_impl->shutdown_) { |
- DWORD bytes; |
- ULONG_PTR key; |
- OVERLAPPED* overlapped; |
int64_t millis = handler_impl->GetTimeout(); |
ASSERT(millis == kInfinityTimeout || millis >= 0); |
if (millis > kMaxInt32) { |
millis = kMaxInt32; |
} |
ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
- BOOL ok = |
- GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key, |
- &overlapped, static_cast<DWORD>(millis)); |
+ DWORD timeout = static_cast<DWORD>(millis); |
+ ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
+ &key, &overlapped, timeout); |
if (!ok && (overlapped == NULL)) { |
if (GetLastError() == ERROR_ABANDONED_WAIT_0) { |
@@ -1397,37 +1494,35 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { |
// Timeout is signalled by false result and NULL in overlapped. |
handler_impl->HandleTimeout(); |
} |
- } else if (!ok) { |
- // Treat ERROR_CONNECTION_ABORTED as connection closed. |
- // The error ERROR_OPERATION_ABORTED is set for pending |
- // accept requests for a listen socket which is closed. |
- // ERROR_NETNAME_DELETED occurs when the client closes |
- // the socket it is reading from. |
- DWORD last_error = GetLastError(); |
- if ((last_error == ERROR_CONNECTION_ABORTED) || |
- (last_error == ERROR_OPERATION_ABORTED) || |
- (last_error == ERROR_NETNAME_DELETED) || |
- (last_error == ERROR_BROKEN_PIPE)) { |
- ASSERT(bytes == 0); |
- handler_impl->HandleIOCompletion(bytes, key, overlapped); |
- } else if (last_error == ERROR_MORE_DATA) { |
- // Don't ASSERT no bytes in this case. This can happen if the receive |
- // buffer for datagram sockets is to small to contain a full datagram, |
- // and in this case bytes hold the bytes that was read. |
- handler_impl->HandleIOCompletion(-1, key, overlapped); |
- } else { |
- ASSERT(bytes == 0); |
- handler_impl->HandleIOCompletion(-1, key, overlapped); |
- } |
- } else if (key == NULL) { |
- // A key of NULL signals an interrupt message. |
- InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped); |
- handler_impl->HandleInterrupt(msg); |
- delete msg; |
} else { |
- handler_impl->HandleIOCompletion(bytes, key, overlapped); |
+ handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
} |
} |
+ |
+// In a Debug build, drain the IO completion port to make sure we aren't |
+// leaking any (non-disconnecting) Handles. In a Release build, we don't care |
+// because the VM is going down, and the asserts below are Debug-only. |
+#if defined(DEBUG) |
+ while (true) { |
+ ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, |
+ &key, &overlapped, 0); |
+ if (!ok && (overlapped == NULL)) { |
+ // There was an error or nothing is ready. Assume the port is drained. |
+ break; |
+ } |
+ handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped); |
+ } |
+#endif |
+ |
+ // The eventhandler thread is going down so there should be no more live |
+ // Handles or Sockets. |
+ // TODO(dart:io): It would be nice to be able to assert here that: |
+ // ReferenceCounted<Handle>::instances() == 0; |
+ // However, we cannot at the moment. See the TODO on: |
+ // ClientSocket::IssueDisconnect() |
+ DEBUG_ASSERT(ReferenceCounted<Handle>::instances() == |
+ ClientSocket::disconnecting()); |
+ DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |
handler->NotifyShutdownDone(); |
} |