Index: runtime/bin/eventhandler_win.cc |
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc |
index db698a3c8831e637cb17e15fbeef1c242548d152..641040c8d229e43981a2702e9dc2a8c0e8511adf 100644 |
--- a/runtime/bin/eventhandler_win.cc |
+++ b/runtime/bin/eventhandler_win.cc |
@@ -121,23 +121,16 @@ Handle::Handle(intptr_t handle) |
pending_read_(NULL), |
pending_write_(NULL), |
last_error_(NOERROR), |
- flags_(0) { |
- InitializeCriticalSection(&cs_); |
+ flags_(0), |
+ read_thread_id_(Thread::kInvalidThreadId), |
+ read_thread_starting_(false), |
+ read_thread_finished_(false), |
+ monitor_(new Monitor()) { |
} |
Handle::~Handle() { |
- DeleteCriticalSection(&cs_); |
-} |
- |
- |
-void Handle::Lock() { |
- EnterCriticalSection(&cs_); |
-} |
- |
- |
-void Handle::Unlock() { |
- LeaveCriticalSection(&cs_); |
+ delete monitor_; |
} |
@@ -154,7 +147,7 @@ bool Handle::CreateCompletionPort(HANDLE completion_port) { |
void Handle::Close() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (!IsClosing()) { |
// Close the socket and set the closing state. This close method can be |
// called again if this socket has pending IO operations in flight. |
@@ -175,28 +168,51 @@ void Handle::DoClose() { |
bool Handle::HasPendingRead() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
return pending_read_ != NULL; |
} |
bool Handle::HasPendingWrite() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
return pending_write_ != NULL; |
} |
+void Handle::WaitForReadThreadFinished() { |
+ // Join the Reader thread if there is one. |
+ ThreadId to_join = Thread::kInvalidThreadId; |
+ { |
+ MonitorLocker ml(monitor_); |
+ if (read_thread_id_ != Thread::kInvalidThreadId) { |
+ while (!read_thread_finished_) { |
+ ml.Wait(); |
+ } |
+ read_thread_finished_ = false; |
+ to_join = read_thread_id_; |
+ read_thread_id_ = Thread::kInvalidThreadId; |
+ } |
+ } |
+ if (to_join != Thread::kInvalidThreadId) { |
+ Thread::Join(to_join); |
+ } |
+} |
+ |
+ |
void Handle::ReadComplete(OverlappedBuffer* buffer) { |
- ScopedLock lock(this); |
- // Currently only one outstanding read at the time. |
- ASSERT(pending_read_ == buffer); |
- ASSERT(data_ready_ == NULL); |
- if (!IsClosing() && !buffer->IsEmpty()) { |
- data_ready_ = pending_read_; |
- } else { |
- OverlappedBuffer::DisposeBuffer(buffer); |
+ { |
+ MonitorLocker ml(monitor_); |
+ // Currently only one outstanding read at the time. |
+ ASSERT(pending_read_ == buffer); |
+ ASSERT(data_ready_ == NULL); |
+ if (!IsClosing() && !buffer->IsEmpty()) { |
+ data_ready_ = pending_read_; |
+ } else { |
+ OverlappedBuffer::DisposeBuffer(buffer); |
+ } |
+ pending_read_ = NULL; |
} |
- pending_read_ = NULL; |
+ WaitForReadThreadFinished(); |
} |
@@ -206,7 +222,7 @@ void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
void Handle::WriteComplete(OverlappedBuffer* buffer) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
// Currently only one outstanding write at the time. |
ASSERT(pending_write_ == buffer); |
OverlappedBuffer::DisposeBuffer(buffer); |
@@ -221,6 +237,7 @@ static void ReadFileThread(uword args) { |
void Handle::ReadSyncCompleteAsync() { |
+ NotifyReadThreadStarted(); |
ASSERT(pending_read_ != NULL); |
ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); |
@@ -228,9 +245,10 @@ void Handle::ReadSyncCompleteAsync() { |
if (GetFileType(handle_) == FILE_TYPE_CHAR) { |
buffer_size = kStdOverlappedBufferSize; |
} |
+ char* buffer_start = pending_read_->GetBufferStart(); |
DWORD bytes_read = 0; |
BOOL ok = ReadFile(handle_, |
- pending_read_->GetBufferStart(), |
+ buffer_start, |
buffer_size, |
&bytes_read, |
NULL); |
@@ -245,11 +263,42 @@ void Handle::ReadSyncCompleteAsync() { |
if (!ok) { |
FATAL("PostQueuedCompletionStatus failed"); |
} |
+ NotifyReadThreadFinished(); |
+} |
+ |
+ |
+void Handle::NotifyReadThreadStarted() { |
+ MonitorLocker ml(monitor_); |
+ ASSERT(!read_thread_starting_); |
+ ASSERT(read_thread_id_ == Thread::kInvalidThreadId); |
+ read_thread_id_ = Thread::GetCurrentThreadId(); |
+ read_thread_starting_ = true; |
+ ml.Notify(); |
+ while (read_thread_starting_) { |
+ ml.Wait(); |
+ } |
+} |
+ |
+void Handle::NotifyReadThreadFinished() { |
+ MonitorLocker ml(monitor_); |
+ ASSERT(!read_thread_finished_); |
+ ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
+ read_thread_finished_ = true; |
+ ml.Notify(); |
+} |
+ |
+ |
+void Handle::WaitForReadThreadStarted() { |
+ while (!read_thread_starting_) { |
+ monitor_->Wait(Monitor::kNoTimeout); |
+ } |
+ read_thread_starting_ = false; |
+ monitor_->Notify(); |
+ ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
} |
bool Handle::IssueRead() { |
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here.
zra
2015/08/27 14:04:51
The monitor is already entered at calls to non-soc
|
- ScopedLock lock(this); |
ASSERT(type_ != kListenSocket); |
ASSERT(pending_read_ == NULL); |
OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
@@ -277,6 +326,7 @@ bool Handle::IssueRead() { |
if (result != 0) { |
FATAL1("Failed to start read file thread %d", result); |
} |
+ WaitForReadThreadStarted(); |
return true; |
} |
} |
@@ -288,7 +338,7 @@ bool Handle::IssueRecvFrom() { |
bool Handle::IssueWrite() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ASSERT(type_ != kListenSocket); |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_write_ != NULL); |
@@ -345,7 +395,7 @@ void Handle::HandleIssueError() { |
void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
event_handler_ = event_handler; |
if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { |
CreateCompletionPort(event_handler_->completion_port()); |
@@ -360,7 +410,7 @@ bool FileHandle::IsClosed() { |
void DirectoryWatchHandle::EnsureInitialized( |
EventHandlerImplementation* event_handler) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
event_handler_ = event_handler; |
if (completion_port_ == INVALID_HANDLE_VALUE) { |
CreateCompletionPort(event_handler_->completion_port()); |
@@ -374,7 +424,6 @@ bool DirectoryWatchHandle::IsClosed() { |
bool DirectoryWatchHandle::IssueRead() { |
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here as well.
zra
2015/08/27 14:04:51
Ditto.
|
- ScopedLock lock(this); |
// It may have been started before, as we start the directory-handler when |
// we create it. |
if (pending_read_ != NULL || data_ready_ != NULL) return true; |
@@ -399,7 +448,7 @@ bool DirectoryWatchHandle::IssueRead() { |
void DirectoryWatchHandle::Stop() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
// Stop the outstanding read, so we can close the handle. |
if (pending_read_ != NULL) { |
@@ -443,7 +492,7 @@ bool ListenSocket::LoadAcceptEx() { |
bool ListenSocket::IssueAccept() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
// For AcceptEx there needs to be buffer storage for address |
// information for two addresses (local and remote address). The |
@@ -483,7 +532,7 @@ bool ListenSocket::IssueAccept() { |
void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
HANDLE completion_port) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (!IsClosing()) { |
// Update the accepted socket to support the full range of API calls. |
SOCKET s = socket(); |
@@ -544,13 +593,13 @@ void ListenSocket::DoClose() { |
bool ListenSocket::CanAccept() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
return accepted_head_ != NULL; |
} |
ClientSocket* ListenSocket::Accept() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ClientSocket *result = NULL; |
@@ -577,7 +626,7 @@ ClientSocket* ListenSocket::Accept() { |
void ListenSocket::EnsureInitialized( |
EventHandlerImplementation* event_handler) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (AcceptEx_ == NULL) { |
ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
ASSERT(event_handler_ == NULL); |
@@ -594,7 +643,7 @@ bool ListenSocket::IsClosed() { |
intptr_t Handle::Available() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (data_ready_ == NULL) return 0; |
ASSERT(!data_ready_->IsEmpty()); |
return data_ready_->GetRemainingLength(); |
@@ -602,7 +651,7 @@ intptr_t Handle::Available() { |
intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (data_ready_ == NULL) return 0; |
num_bytes = data_ready_->Read( |
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
@@ -617,7 +666,7 @@ intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
intptr_t Handle::RecvFrom( |
void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (data_ready_ == NULL) return 0; |
num_bytes = data_ready_->Read( |
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
@@ -639,7 +688,7 @@ intptr_t Handle::RecvFrom( |
intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (pending_write_ != NULL) return 0; |
if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
ASSERT(SupportsOverlappedIO()); |
@@ -656,7 +705,7 @@ intptr_t Handle::SendTo(const void* buffer, |
intptr_t num_bytes, |
struct sockaddr* sa, |
socklen_t sa_len) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (pending_write_ != NULL) return 0; |
if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
ASSERT(SupportsOverlappedIO()); |
@@ -675,13 +724,14 @@ static void WriteFileThread(uword args) { |
void StdHandle::RunWriteLoop() { |
- write_monitor_->Enter(); |
+ MonitorLocker ml(monitor_); |
write_thread_running_ = true; |
+ thread_id_ = Thread::GetCurrentThreadId(); |
// Notify we have started. |
- write_monitor_->Notify(); |
+ ml.Notify(); |
while (write_thread_running_) { |
- write_monitor_->Wait(Monitor::kNoTimeout); |
+ ml.Wait(Monitor::kNoTimeout); |
if (pending_write_ != NULL) { |
// We woke up and had a pending write. Execute it. |
WriteSyncCompleteAsync(); |
@@ -689,8 +739,7 @@ void StdHandle::RunWriteLoop() { |
} |
write_thread_exists_ = false; |
- write_monitor_->Notify(); |
- write_monitor_->Exit(); |
+ ml.Notify(); |
} |
@@ -718,7 +767,7 @@ void StdHandle::WriteSyncCompleteAsync() { |
} |
intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (pending_write_ != NULL) return 0; |
if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
// In the case of stdout and stderr, OverlappedIO is not supported. |
@@ -726,7 +775,6 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
// This code is actually never exposed to the user, as stdout and stderr is |
// not available as a RawSocket, but only wrapped in a Socket. |
// Note that we return '0', unless a thread have already completed a write. |
- MonitorLocker locker(write_monitor_); |
if (thread_wrote_ > 0) { |
if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; |
thread_wrote_ -= num_bytes; |
@@ -734,14 +782,14 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
} |
if (!write_thread_exists_) { |
write_thread_exists_ = true; |
- int result = Thread::Start(WriteFileThread, |
- reinterpret_cast<uword>(this)); |
+ int result = Thread::Start( |
+ WriteFileThread, reinterpret_cast<uword>(this)); |
if (result != 0) { |
FATAL1("Failed to start write file thread %d", result); |
} |
while (!write_thread_running_) { |
// Wait until we the thread is running. |
- locker.Wait(Monitor::kNoTimeout); |
+ ml.Wait(Monitor::kNoTimeout); |
} |
} |
// Only queue up to INT_MAX bytes. |
@@ -749,19 +797,20 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
// Create buffer and notify thread about the new handle. |
pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
pending_write_->Write(buffer, truncated_bytes); |
- locker.Notify(); |
+ ml.Notify(); |
return 0; |
} |
void StdHandle::DoClose() { |
- MonitorLocker locker(write_monitor_); |
+ MonitorLocker ml(monitor_); |
if (write_thread_exists_) { |
write_thread_running_ = false; |
- locker.Notify(); |
+ ml.Notify(); |
while (write_thread_exists_) { |
- locker.Wait(Monitor::kNoTimeout); |
+ ml.Wait(Monitor::kNoTimeout); |
} |
+ Thread::Join(thread_id_); |
} |
Handle::DoClose(); |
} |
@@ -807,7 +856,7 @@ void ClientSocket::DoClose() { |
bool ClientSocket::IssueRead() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_read_ == NULL); |
@@ -836,7 +885,7 @@ bool ClientSocket::IssueRead() { |
bool ClientSocket::IssueWrite() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_write_ != NULL); |
ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
@@ -900,7 +949,7 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
void ClientSocket::EnsureInitialized( |
EventHandlerImplementation* event_handler) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (completion_port_ == INVALID_HANDLE_VALUE) { |
ASSERT(event_handler_ == NULL); |
event_handler_ = event_handler; |
@@ -915,7 +964,7 @@ bool ClientSocket::IsClosed() { |
bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_write_ != NULL); |
ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
@@ -940,7 +989,7 @@ bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
bool DatagramSocket::IssueRecvFrom() { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
ASSERT(pending_read_ == NULL); |
@@ -970,7 +1019,7 @@ bool DatagramSocket::IssueRecvFrom() { |
void DatagramSocket::EnsureInitialized( |
EventHandlerImplementation* event_handler) { |
- ScopedLock lock(this); |
+ MonitorLocker ml(monitor_); |
if (completion_port_ == INVALID_HANDLE_VALUE) { |
ASSERT(event_handler_ == NULL); |
event_handler_ = event_handler; |
@@ -1010,7 +1059,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
reinterpret_cast<ListenSocket*>(handle); |
listen_socket->EnsureInitialized(this); |
- Handle::ScopedLock lock(listen_socket); |
+ MonitorLocker ml(listen_socket->monitor_); |
if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
@@ -1040,7 +1089,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
} |
} else { |
handle->EnsureInitialized(this); |
- Handle::ScopedLock lock(handle); |
+ MonitorLocker ml(handle->monitor_); |
if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
@@ -1114,7 +1163,7 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
listen_socket->AcceptComplete(buffer, completion_port_); |
{ |
- Handle::ScopedLock lock(listen_socket); |
+ MonitorLocker ml(listen_socket->monitor_); |
TryDispatchingPendingAccepts(listen_socket); |
} |
@@ -1278,6 +1327,8 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, |
EventHandlerImplementation::EventHandlerImplementation() { |
+ startup_monitor_ = new Monitor(); |
+ handler_thread_id_ = Thread::kInvalidThreadId; |
completion_port_ = |
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
if (completion_port_ == NULL) { |
@@ -1288,6 +1339,8 @@ EventHandlerImplementation::EventHandlerImplementation() { |
EventHandlerImplementation::~EventHandlerImplementation() { |
+ Thread::Join(handler_thread_id_); |
+ delete startup_monitor_; |
CloseHandle(completion_port_); |
} |
@@ -1322,6 +1375,12 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { |
EventHandlerImplementation* handler_impl = &handler->delegate_; |
ASSERT(handler_impl != NULL); |
+ { |
+ MonitorLocker ml(handler_impl->startup_monitor_); |
+ handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
+ ml.Notify(); |
+ } |
+ |
while (!handler_impl->shutdown_) { |
DWORD bytes; |
ULONG_PTR key; |
@@ -1382,6 +1441,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) { |
FATAL1("Failed to start event handler thread %d", result); |
} |
+ { |
+ MonitorLocker ml(startup_monitor_); |
+ while (handler_thread_id_ == Thread::kInvalidThreadId) { |
+ ml.Wait(); |
+ } |
+ } |
+ |
// Initialize Winsock32 |
if (!Socket::Initialize()) { |
FATAL("Failed to initialized Windows sockets"); |