Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Unified Diff: runtime/bin/eventhandler_win.cc

Issue 1291163002: Join embeder threads on Windows. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: simplify locking Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/eventhandler_win.cc
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index db698a3c8831e637cb17e15fbeef1c242548d152..641040c8d229e43981a2702e9dc2a8c0e8511adf 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -121,23 +121,16 @@ Handle::Handle(intptr_t handle)
pending_read_(NULL),
pending_write_(NULL),
last_error_(NOERROR),
- flags_(0) {
- InitializeCriticalSection(&cs_);
+ flags_(0),
+ read_thread_id_(Thread::kInvalidThreadId),
+ read_thread_starting_(false),
+ read_thread_finished_(false),
+ monitor_(new Monitor()) {
}
Handle::~Handle() {
- DeleteCriticalSection(&cs_);
-}
-
-
-void Handle::Lock() {
- EnterCriticalSection(&cs_);
-}
-
-
-void Handle::Unlock() {
- LeaveCriticalSection(&cs_);
+ delete monitor_;
}
@@ -154,7 +147,7 @@ bool Handle::CreateCompletionPort(HANDLE completion_port) {
void Handle::Close() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (!IsClosing()) {
// Close the socket and set the closing state. This close method can be
// called again if this socket has pending IO operations in flight.
@@ -175,28 +168,51 @@ void Handle::DoClose() {
bool Handle::HasPendingRead() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
return pending_read_ != NULL;
}
bool Handle::HasPendingWrite() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
return pending_write_ != NULL;
}
+void Handle::WaitForReadThreadFinished() {
+ // Join the Reader thread if there is one.
+ ThreadId to_join = Thread::kInvalidThreadId;
+ {
+ MonitorLocker ml(monitor_);
+ if (read_thread_id_ != Thread::kInvalidThreadId) {
+ while (!read_thread_finished_) {
+ ml.Wait();
+ }
+ read_thread_finished_ = false;
+ to_join = read_thread_id_;
+ read_thread_id_ = Thread::kInvalidThreadId;
+ }
+ }
+ if (to_join != Thread::kInvalidThreadId) {
+ Thread::Join(to_join);
+ }
+}
+
+
void Handle::ReadComplete(OverlappedBuffer* buffer) {
- ScopedLock lock(this);
- // Currently only one outstanding read at the time.
- ASSERT(pending_read_ == buffer);
- ASSERT(data_ready_ == NULL);
- if (!IsClosing() && !buffer->IsEmpty()) {
- data_ready_ = pending_read_;
- } else {
- OverlappedBuffer::DisposeBuffer(buffer);
+ {
+ MonitorLocker ml(monitor_);
+ // Currently only one outstanding read at the time.
+ ASSERT(pending_read_ == buffer);
+ ASSERT(data_ready_ == NULL);
+ if (!IsClosing() && !buffer->IsEmpty()) {
+ data_ready_ = pending_read_;
+ } else {
+ OverlappedBuffer::DisposeBuffer(buffer);
+ }
+ pending_read_ = NULL;
}
- pending_read_ = NULL;
+ WaitForReadThreadFinished();
}
@@ -206,7 +222,7 @@ void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
void Handle::WriteComplete(OverlappedBuffer* buffer) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
// Currently only one outstanding write at the time.
ASSERT(pending_write_ == buffer);
OverlappedBuffer::DisposeBuffer(buffer);
@@ -221,6 +237,7 @@ static void ReadFileThread(uword args) {
void Handle::ReadSyncCompleteAsync() {
+ NotifyReadThreadStarted();
ASSERT(pending_read_ != NULL);
ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
@@ -228,9 +245,10 @@ void Handle::ReadSyncCompleteAsync() {
if (GetFileType(handle_) == FILE_TYPE_CHAR) {
buffer_size = kStdOverlappedBufferSize;
}
+ char* buffer_start = pending_read_->GetBufferStart();
DWORD bytes_read = 0;
BOOL ok = ReadFile(handle_,
- pending_read_->GetBufferStart(),
+ buffer_start,
buffer_size,
&bytes_read,
NULL);
@@ -245,11 +263,42 @@ void Handle::ReadSyncCompleteAsync() {
if (!ok) {
FATAL("PostQueuedCompletionStatus failed");
}
+ NotifyReadThreadFinished();
+}
+
+
+void Handle::NotifyReadThreadStarted() {
+ MonitorLocker ml(monitor_);
+ ASSERT(!read_thread_starting_);
+ ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
+ read_thread_id_ = Thread::GetCurrentThreadId();
+ read_thread_starting_ = true;
+ ml.Notify();
+ while (read_thread_starting_) {
+ ml.Wait();
+ }
+}
+
+void Handle::NotifyReadThreadFinished() {
+ MonitorLocker ml(monitor_);
+ ASSERT(!read_thread_finished_);
+ ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
+ read_thread_finished_ = true;
+ ml.Notify();
+}
+
+
+void Handle::WaitForReadThreadStarted() {
+ while (!read_thread_starting_) {
+ monitor_->Wait(Monitor::kNoTimeout);
+ }
+ read_thread_starting_ = false;
+ monitor_->Notify();
+ ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
}
bool Handle::IssueRead() {
Søren Gjesse 2015/08/27 13:28:24 MonitorLocker ml(monitor_); here.
zra 2015/08/27 14:04:51 The monitor is already entered at calls to non-soc
- ScopedLock lock(this);
ASSERT(type_ != kListenSocket);
ASSERT(pending_read_ == NULL);
OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
@@ -277,6 +326,7 @@ bool Handle::IssueRead() {
if (result != 0) {
FATAL1("Failed to start read file thread %d", result);
}
+ WaitForReadThreadStarted();
return true;
}
}
@@ -288,7 +338,7 @@ bool Handle::IssueRecvFrom() {
bool Handle::IssueWrite() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ASSERT(type_ != kListenSocket);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_write_ != NULL);
@@ -345,7 +395,7 @@ void Handle::HandleIssueError() {
void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
event_handler_ = event_handler;
if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) {
CreateCompletionPort(event_handler_->completion_port());
@@ -360,7 +410,7 @@ bool FileHandle::IsClosed() {
void DirectoryWatchHandle::EnsureInitialized(
EventHandlerImplementation* event_handler) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
event_handler_ = event_handler;
if (completion_port_ == INVALID_HANDLE_VALUE) {
CreateCompletionPort(event_handler_->completion_port());
@@ -374,7 +424,6 @@ bool DirectoryWatchHandle::IsClosed() {
bool DirectoryWatchHandle::IssueRead() {
Søren Gjesse 2015/08/27 13:28:24 MonitorLocker ml(monitor_); here as well.
zra 2015/08/27 14:04:51 Ditto.
- ScopedLock lock(this);
// It may have been started before, as we start the directory-handler when
// we create it.
if (pending_read_ != NULL || data_ready_ != NULL) return true;
@@ -399,7 +448,7 @@ bool DirectoryWatchHandle::IssueRead() {
void DirectoryWatchHandle::Stop() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
// Stop the outstanding read, so we can close the handle.
if (pending_read_ != NULL) {
@@ -443,7 +492,7 @@ bool ListenSocket::LoadAcceptEx() {
bool ListenSocket::IssueAccept() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
// For AcceptEx there needs to be buffer storage for address
// information for two addresses (local and remote address). The
@@ -483,7 +532,7 @@ bool ListenSocket::IssueAccept() {
void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
HANDLE completion_port) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (!IsClosing()) {
// Update the accepted socket to support the full range of API calls.
SOCKET s = socket();
@@ -544,13 +593,13 @@ void ListenSocket::DoClose() {
bool ListenSocket::CanAccept() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
return accepted_head_ != NULL;
}
ClientSocket* ListenSocket::Accept() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ClientSocket *result = NULL;
@@ -577,7 +626,7 @@ ClientSocket* ListenSocket::Accept() {
void ListenSocket::EnsureInitialized(
EventHandlerImplementation* event_handler) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (AcceptEx_ == NULL) {
ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
ASSERT(event_handler_ == NULL);
@@ -594,7 +643,7 @@ bool ListenSocket::IsClosed() {
intptr_t Handle::Available() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (data_ready_ == NULL) return 0;
ASSERT(!data_ready_->IsEmpty());
return data_ready_->GetRemainingLength();
@@ -602,7 +651,7 @@ intptr_t Handle::Available() {
intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (data_ready_ == NULL) return 0;
num_bytes = data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
@@ -617,7 +666,7 @@ intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
intptr_t Handle::RecvFrom(
void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (data_ready_ == NULL) return 0;
num_bytes = data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
@@ -639,7 +688,7 @@ intptr_t Handle::RecvFrom(
intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (pending_write_ != NULL) return 0;
if (num_bytes > kBufferSize) num_bytes = kBufferSize;
ASSERT(SupportsOverlappedIO());
@@ -656,7 +705,7 @@ intptr_t Handle::SendTo(const void* buffer,
intptr_t num_bytes,
struct sockaddr* sa,
socklen_t sa_len) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (pending_write_ != NULL) return 0;
if (num_bytes > kBufferSize) num_bytes = kBufferSize;
ASSERT(SupportsOverlappedIO());
@@ -675,13 +724,14 @@ static void WriteFileThread(uword args) {
void StdHandle::RunWriteLoop() {
- write_monitor_->Enter();
+ MonitorLocker ml(monitor_);
write_thread_running_ = true;
+ thread_id_ = Thread::GetCurrentThreadId();
// Notify we have started.
- write_monitor_->Notify();
+ ml.Notify();
while (write_thread_running_) {
- write_monitor_->Wait(Monitor::kNoTimeout);
+ ml.Wait(Monitor::kNoTimeout);
if (pending_write_ != NULL) {
// We woke up and had a pending write. Execute it.
WriteSyncCompleteAsync();
@@ -689,8 +739,7 @@ void StdHandle::RunWriteLoop() {
}
write_thread_exists_ = false;
- write_monitor_->Notify();
- write_monitor_->Exit();
+ ml.Notify();
}
@@ -718,7 +767,7 @@ void StdHandle::WriteSyncCompleteAsync() {
}
intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (pending_write_ != NULL) return 0;
if (num_bytes > kBufferSize) num_bytes = kBufferSize;
// In the case of stdout and stderr, OverlappedIO is not supported.
@@ -726,7 +775,6 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
// This code is actually never exposed to the user, as stdout and stderr is
// not available as a RawSocket, but only wrapped in a Socket.
// Note that we return '0', unless a thread have already completed a write.
- MonitorLocker locker(write_monitor_);
if (thread_wrote_ > 0) {
if (num_bytes > thread_wrote_) num_bytes = thread_wrote_;
thread_wrote_ -= num_bytes;
@@ -734,14 +782,14 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
}
if (!write_thread_exists_) {
write_thread_exists_ = true;
- int result = Thread::Start(WriteFileThread,
- reinterpret_cast<uword>(this));
+ int result = Thread::Start(
+ WriteFileThread, reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start write file thread %d", result);
}
while (!write_thread_running_) {
// Wait until we the thread is running.
- locker.Wait(Monitor::kNoTimeout);
+ ml.Wait(Monitor::kNoTimeout);
}
}
// Only queue up to INT_MAX bytes.
@@ -749,19 +797,20 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
// Create buffer and notify thread about the new handle.
pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
pending_write_->Write(buffer, truncated_bytes);
- locker.Notify();
+ ml.Notify();
return 0;
}
void StdHandle::DoClose() {
- MonitorLocker locker(write_monitor_);
+ MonitorLocker ml(monitor_);
if (write_thread_exists_) {
write_thread_running_ = false;
- locker.Notify();
+ ml.Notify();
while (write_thread_exists_) {
- locker.Wait(Monitor::kNoTimeout);
+ ml.Wait(Monitor::kNoTimeout);
}
+ Thread::Join(thread_id_);
}
Handle::DoClose();
}
@@ -807,7 +856,7 @@ void ClientSocket::DoClose() {
bool ClientSocket::IssueRead() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_read_ == NULL);
@@ -836,7 +885,7 @@ bool ClientSocket::IssueRead() {
bool ClientSocket::IssueWrite() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_write_ != NULL);
ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
@@ -900,7 +949,7 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
void ClientSocket::EnsureInitialized(
EventHandlerImplementation* event_handler) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (completion_port_ == INVALID_HANDLE_VALUE) {
ASSERT(event_handler_ == NULL);
event_handler_ = event_handler;
@@ -915,7 +964,7 @@ bool ClientSocket::IsClosed() {
bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_write_ != NULL);
ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
@@ -940,7 +989,7 @@ bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
bool DatagramSocket::IssueRecvFrom() {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
ASSERT(pending_read_ == NULL);
@@ -970,7 +1019,7 @@ bool DatagramSocket::IssueRecvFrom() {
void DatagramSocket::EnsureInitialized(
EventHandlerImplementation* event_handler) {
- ScopedLock lock(this);
+ MonitorLocker ml(monitor_);
if (completion_port_ == INVALID_HANDLE_VALUE) {
ASSERT(event_handler_ == NULL);
event_handler_ = event_handler;
@@ -1010,7 +1059,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
reinterpret_cast<ListenSocket*>(handle);
listen_socket->EnsureInitialized(this);
- Handle::ScopedLock lock(listen_socket);
+ MonitorLocker ml(listen_socket->monitor_);
if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
@@ -1040,7 +1089,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
}
} else {
handle->EnsureInitialized(this);
- Handle::ScopedLock lock(handle);
+ MonitorLocker ml(handle->monitor_);
if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
@@ -1114,7 +1163,7 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
listen_socket->AcceptComplete(buffer, completion_port_);
{
- Handle::ScopedLock lock(listen_socket);
+ MonitorLocker ml(listen_socket->monitor_);
TryDispatchingPendingAccepts(listen_socket);
}
@@ -1278,6 +1327,8 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
EventHandlerImplementation::EventHandlerImplementation() {
+ startup_monitor_ = new Monitor();
+ handler_thread_id_ = Thread::kInvalidThreadId;
completion_port_ =
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
if (completion_port_ == NULL) {
@@ -1288,6 +1339,8 @@ EventHandlerImplementation::EventHandlerImplementation() {
EventHandlerImplementation::~EventHandlerImplementation() {
+ Thread::Join(handler_thread_id_);
+ delete startup_monitor_;
CloseHandle(completion_port_);
}
@@ -1322,6 +1375,12 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
EventHandlerImplementation* handler_impl = &handler->delegate_;
ASSERT(handler_impl != NULL);
+ {
+ MonitorLocker ml(handler_impl->startup_monitor_);
+ handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
+ ml.Notify();
+ }
+
while (!handler_impl->shutdown_) {
DWORD bytes;
ULONG_PTR key;
@@ -1382,6 +1441,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
FATAL1("Failed to start event handler thread %d", result);
}
+ {
+ MonitorLocker ml(startup_monitor_);
+ while (handler_thread_id_ == Thread::kInvalidThreadId) {
+ ml.Wait();
+ }
+ }
+
// Initialize Winsock32
if (!Socket::Initialize()) {
FATAL("Failed to initialized Windows sockets");
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698