Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Unified Diff: runtime/bin/eventhandler_win.cc

Issue 2760293002: [dart:io] Adds a finalizer to _NativeSocket to avoid socket leaks (Closed)
Patch Set: Address comments Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/eventhandler_win.cc
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index eedcf167da2c6db3b478c657c1d2db5660453d31..e961581be4827226a2fc72e8be21be5d325150ac 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -115,8 +115,10 @@ int OverlappedBuffer::GetRemainingLength() {
return data_length_ - index_;
}
+
Handle::Handle(intptr_t handle)
- : DescriptorInfoBase(handle),
+ : ReferenceCounted(),
+ DescriptorInfoBase(handle),
handle_(reinterpret_cast<HANDLE>(handle)),
completion_port_(INVALID_HANDLE_VALUE),
event_handler_(NULL),
@@ -138,6 +140,10 @@ Handle::~Handle() {
bool Handle::CreateCompletionPort(HANDLE completion_port) {
+ ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
+ // A reference to the Handle is Retained by the IO completion port.
+ // It is Released by DeleteIfClosed.
+ Retain();
completion_port_ = CreateIoCompletionPort(
handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
return (completion_port_ != NULL);
@@ -392,8 +398,16 @@ void Handle::HandleIssueError() {
void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
MonitorLocker ml(monitor_);
event_handler_ = event_handler;
- if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) {
- CreateCompletionPort(event_handler_->completion_port());
+ if (completion_port_ == INVALID_HANDLE_VALUE) {
+ if (SupportsOverlappedIO()) {
+ CreateCompletionPort(event_handler_->completion_port());
+ } else {
+ // We need to retain the Handle even if overlapped IO is not supported.
+ // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
+ // manually puts an event on the IO completion port.
+ Retain();
+ completion_port_ = event_handler_->completion_port();
+ }
}
}
@@ -546,9 +560,13 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
static void DeleteIfClosed(Handle* handle) {
if (handle->IsClosed()) {
+ handle->set_completion_port(INVALID_HANDLE_VALUE);
+ handle->set_event_handler(NULL);
handle->NotifyAllDartPorts(1 << kDestroyedEvent);
handle->RemoveAllPorts();
- delete handle;
+ // Once the Handle is closed, no further events on the IO completion port
+ // will mention it. Thus, we can drop the reference here.
+ handle->Release();
}
}
@@ -561,11 +579,22 @@ void ListenSocket::DoClose() {
ClientSocket* client = Accept();
if (client != NULL) {
client->Close();
+ // Release the reference from the list.
+ // When an accept completes, we make a new ClientSocket (1 reference),
+ // and add it to the IO completion port (1 more reference). If an
+ // accepted connection is never requested by the Dart code, then
+ // this list owns a reference (first Release), and the IO completion
+ // port owns a reference, (second Release in DeleteIfClosed).
+ client->Release();
DeleteIfClosed(client);
} else {
break;
}
}
+ // To finish resetting the state of the ListenSocket back to what it was
+ // before EnsureInitialized was called, we have to reset the AcceptEx_
+ // function pointer.
+ AcceptEx_ = NULL;
}
@@ -792,6 +821,10 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
}
if (!write_thread_exists_) {
write_thread_exists_ = true;
+ // The write thread gets a reference to the Handle, which it places in
+ // the events it puts on the IO completion port. The reference is
+ // Released by DeleteIfClosed.
+ Retain();
int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start write file thread %d", result);
@@ -828,6 +861,11 @@ void StdHandle::DoClose() {
}
+#if defined(DEBUG)
+intptr_t ClientSocket::disconnecting_ = 0;
+#endif
+
+
bool ClientSocket::LoadDisconnectEx() {
// Load the DisconnectEx function into memory using WSAIoctl.
GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
@@ -914,8 +952,18 @@ void ClientSocket::IssueDisconnect() {
if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
DisconnectComplete(buffer);
}
+ // When the Dart side receives this event, it may decide to close its Dart
+ // ports. When all ports are closed, the VM will shut down. The EventHandler
+ // will then shut down. If the EventHandler shuts down before this
+ // asynchronous disconnect finishes, this ClientSocket will be leaked.
+ // TODO(dart:io): Retain a list of client sockets that are in the process of
+ // disconnecting. Disconnect them forcefully, and clean up their resources
+ // when the EventHandler shuts down.
NotifyAllDartPorts(1 << kDestroyedEvent);
RemoveAllPorts();
+#if defined(DEBUG)
+ disconnecting_++;
+#endif
}
@@ -926,6 +974,9 @@ void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
OverlappedBuffer::DisposeBuffer(data_ready_);
}
mark_closed();
+#if defined(DEBUG)
+ disconnecting_--;
+#endif
}
@@ -1037,7 +1088,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
} else if (msg->id == kShutdownId) {
shutdown_ = true;
} else {
- Handle* handle = reinterpret_cast<Handle*>(msg->id);
+ Socket* socket = reinterpret_cast<Socket*>(msg->id);
+ RefCntReleaseScope<Socket> rs(socket);
+ if (socket->fd() == -1) {
+ return;
+ }
+ Handle* handle = reinterpret_cast<Handle*>(socket->fd());
ASSERT(handle != NULL);
if (handle->is_listen_socket()) {
@@ -1062,9 +1118,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
// are listening on the same (address, port) combination.
ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
MutexLocker locker(registry->mutex());
- if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) {
+ if (registry->CloseSafe(socket)) {
ASSERT(listen_socket->Mask() == 0);
listen_socket->Close();
+ socket->SetClosedFd();
}
DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
@@ -1132,6 +1189,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
} else if (IS_COMMAND(msg->data, kCloseCommand)) {
handle->SetPortAndMask(msg->dart_port, 0);
handle->Close();
+ socket->SetClosedFd();
} else {
UNREACHABLE();
}
@@ -1313,6 +1371,44 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
}
+void EventHandlerImplementation::HandleCompletionOrInterrupt(
+ BOOL ok,
+ DWORD bytes,
+ ULONG_PTR key,
+ OVERLAPPED* overlapped) {
+ if (!ok) {
+ // Treat ERROR_CONNECTION_ABORTED as connection closed.
+ // The error ERROR_OPERATION_ABORTED is set for pending
+ // accept requests for a listen socket which is closed.
+ // ERROR_NETNAME_DELETED occurs when the client closes
+ // the socket it is reading from.
+ DWORD last_error = GetLastError();
+ if ((last_error == ERROR_CONNECTION_ABORTED) ||
+ (last_error == ERROR_OPERATION_ABORTED) ||
+ (last_error == ERROR_NETNAME_DELETED) ||
+ (last_error == ERROR_BROKEN_PIPE)) {
+ ASSERT(bytes == 0);
+ HandleIOCompletion(bytes, key, overlapped);
+ } else if (last_error == ERROR_MORE_DATA) {
+ // Don't ASSERT no bytes in this case. This can happen if the receive
+ // buffer for datagram sockets is too small to contain a full datagram,
+ // and in this case bytes hold the bytes that was read.
+ HandleIOCompletion(-1, key, overlapped);
+ } else {
+ ASSERT(bytes == 0);
+ HandleIOCompletion(-1, key, overlapped);
+ }
+ } else if (key == NULL) {
+ // A key of NULL signals an interrupt message.
+ InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
+ HandleInterrupt(msg);
+ delete msg;
+ } else {
+ HandleIOCompletion(bytes, key, overlapped);
+ }
+}
+
+
EventHandlerImplementation::EventHandlerImplementation() {
startup_monitor_ = new Monitor();
handler_thread_id_ = Thread::kInvalidThreadId;
@@ -1374,19 +1470,20 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
ml.Notify();
}
+ DWORD bytes;
+ ULONG_PTR key;
+ OVERLAPPED* overlapped;
+ BOOL ok;
while (!handler_impl->shutdown_) {
- DWORD bytes;
- ULONG_PTR key;
- OVERLAPPED* overlapped;
int64_t millis = handler_impl->GetTimeout();
ASSERT(millis == kInfinityTimeout || millis >= 0);
if (millis > kMaxInt32) {
millis = kMaxInt32;
}
ASSERT(sizeof(int32_t) == sizeof(DWORD));
- BOOL ok =
- GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key,
- &overlapped, static_cast<DWORD>(millis));
+ DWORD timeout = static_cast<DWORD>(millis);
+ ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
+ &key, &overlapped, timeout);
if (!ok && (overlapped == NULL)) {
if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
@@ -1397,37 +1494,35 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
// Timeout is signalled by false result and NULL in overlapped.
handler_impl->HandleTimeout();
}
- } else if (!ok) {
- // Treat ERROR_CONNECTION_ABORTED as connection closed.
- // The error ERROR_OPERATION_ABORTED is set for pending
- // accept requests for a listen socket which is closed.
- // ERROR_NETNAME_DELETED occurs when the client closes
- // the socket it is reading from.
- DWORD last_error = GetLastError();
- if ((last_error == ERROR_CONNECTION_ABORTED) ||
- (last_error == ERROR_OPERATION_ABORTED) ||
- (last_error == ERROR_NETNAME_DELETED) ||
- (last_error == ERROR_BROKEN_PIPE)) {
- ASSERT(bytes == 0);
- handler_impl->HandleIOCompletion(bytes, key, overlapped);
- } else if (last_error == ERROR_MORE_DATA) {
- // Don't ASSERT no bytes in this case. This can happen if the receive
- // buffer for datagram sockets is to small to contain a full datagram,
- // and in this case bytes hold the bytes that was read.
- handler_impl->HandleIOCompletion(-1, key, overlapped);
- } else {
- ASSERT(bytes == 0);
- handler_impl->HandleIOCompletion(-1, key, overlapped);
- }
- } else if (key == NULL) {
- // A key of NULL signals an interrupt message.
- InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
- handler_impl->HandleInterrupt(msg);
- delete msg;
} else {
- handler_impl->HandleIOCompletion(bytes, key, overlapped);
+ handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
}
}
+
+// In a Debug build, drain the IO completion port to make sure we aren't
+// leaking any (non-disconnecting) Handles. In a Release build, we don't care
+// because the VM is going down, and the asserts below are Debug-only.
+#if defined(DEBUG)
+ while (true) {
+ ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
+ &key, &overlapped, 0);
+ if (!ok && (overlapped == NULL)) {
+ // There was an error or nothing is ready. Assume the port is drained.
+ break;
+ }
+ handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
+ }
+#endif
+
+ // The eventhandler thread is going down so there should be no more live
+ // Handles or Sockets.
+ // TODO(dart:io): It would be nice to be able to assert here that:
+ // ReferenceCounted<Handle>::instances() == 0;
+ // However, we cannot at the moment. See the TODO on:
+ // ClientSocket::IssueDisconnect()
+ DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
+ ClientSocket::disconnecting());
+ DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698