Chromium Code Reviews| Index: runtime/bin/eventhandler_win.cc |
| diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc |
| index db698a3c8831e637cb17e15fbeef1c242548d152..641040c8d229e43981a2702e9dc2a8c0e8511adf 100644 |
| --- a/runtime/bin/eventhandler_win.cc |
| +++ b/runtime/bin/eventhandler_win.cc |
| @@ -121,23 +121,16 @@ Handle::Handle(intptr_t handle) |
| pending_read_(NULL), |
| pending_write_(NULL), |
| last_error_(NOERROR), |
| - flags_(0) { |
| - InitializeCriticalSection(&cs_); |
| + flags_(0), |
| + read_thread_id_(Thread::kInvalidThreadId), |
| + read_thread_starting_(false), |
| + read_thread_finished_(false), |
| + monitor_(new Monitor()) { |
| } |
| Handle::~Handle() { |
| - DeleteCriticalSection(&cs_); |
| -} |
| - |
| - |
| -void Handle::Lock() { |
| - EnterCriticalSection(&cs_); |
| -} |
| - |
| - |
| -void Handle::Unlock() { |
| - LeaveCriticalSection(&cs_); |
| + delete monitor_; |
| } |
| @@ -154,7 +147,7 @@ bool Handle::CreateCompletionPort(HANDLE completion_port) { |
| void Handle::Close() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (!IsClosing()) { |
| // Close the socket and set the closing state. This close method can be |
| // called again if this socket has pending IO operations in flight. |
| @@ -175,28 +168,51 @@ void Handle::DoClose() { |
| bool Handle::HasPendingRead() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| return pending_read_ != NULL; |
| } |
| bool Handle::HasPendingWrite() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| return pending_write_ != NULL; |
| } |
| +void Handle::WaitForReadThreadFinished() { |
| + // Join the Reader thread if there is one. |
| + ThreadId to_join = Thread::kInvalidThreadId; |
| + { |
| + MonitorLocker ml(monitor_); |
| + if (read_thread_id_ != Thread::kInvalidThreadId) { |
| + while (!read_thread_finished_) { |
| + ml.Wait(); |
| + } |
| + read_thread_finished_ = false; |
| + to_join = read_thread_id_; |
| + read_thread_id_ = Thread::kInvalidThreadId; |
| + } |
| + } |
| + if (to_join != Thread::kInvalidThreadId) { |
| + Thread::Join(to_join); |
| + } |
| +} |
| + |
| + |
| void Handle::ReadComplete(OverlappedBuffer* buffer) { |
| - ScopedLock lock(this); |
| - // Currently only one outstanding read at the time. |
| - ASSERT(pending_read_ == buffer); |
| - ASSERT(data_ready_ == NULL); |
| - if (!IsClosing() && !buffer->IsEmpty()) { |
| - data_ready_ = pending_read_; |
| - } else { |
| - OverlappedBuffer::DisposeBuffer(buffer); |
| + { |
| + MonitorLocker ml(monitor_); |
| + // Currently only one outstanding read at the time. |
| + ASSERT(pending_read_ == buffer); |
| + ASSERT(data_ready_ == NULL); |
| + if (!IsClosing() && !buffer->IsEmpty()) { |
| + data_ready_ = pending_read_; |
| + } else { |
| + OverlappedBuffer::DisposeBuffer(buffer); |
| + } |
| + pending_read_ = NULL; |
| } |
| - pending_read_ = NULL; |
| + WaitForReadThreadFinished(); |
| } |
| @@ -206,7 +222,7 @@ void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
| void Handle::WriteComplete(OverlappedBuffer* buffer) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| // Currently only one outstanding write at the time. |
| ASSERT(pending_write_ == buffer); |
| OverlappedBuffer::DisposeBuffer(buffer); |
| @@ -221,6 +237,7 @@ static void ReadFileThread(uword args) { |
| void Handle::ReadSyncCompleteAsync() { |
| + NotifyReadThreadStarted(); |
| ASSERT(pending_read_ != NULL); |
| ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); |
| @@ -228,9 +245,10 @@ void Handle::ReadSyncCompleteAsync() { |
| if (GetFileType(handle_) == FILE_TYPE_CHAR) { |
| buffer_size = kStdOverlappedBufferSize; |
| } |
| + char* buffer_start = pending_read_->GetBufferStart(); |
| DWORD bytes_read = 0; |
| BOOL ok = ReadFile(handle_, |
| - pending_read_->GetBufferStart(), |
| + buffer_start, |
| buffer_size, |
| &bytes_read, |
| NULL); |
| @@ -245,11 +263,42 @@ void Handle::ReadSyncCompleteAsync() { |
| if (!ok) { |
| FATAL("PostQueuedCompletionStatus failed"); |
| } |
| + NotifyReadThreadFinished(); |
| +} |
| + |
| + |
| +void Handle::NotifyReadThreadStarted() { |
| + MonitorLocker ml(monitor_); |
| + ASSERT(!read_thread_starting_); |
| + ASSERT(read_thread_id_ == Thread::kInvalidThreadId); |
| + read_thread_id_ = Thread::GetCurrentThreadId(); |
| + read_thread_starting_ = true; |
| + ml.Notify(); |
| + while (read_thread_starting_) { |
| + ml.Wait(); |
| + } |
| +} |
| + |
| +void Handle::NotifyReadThreadFinished() { |
| + MonitorLocker ml(monitor_); |
| + ASSERT(!read_thread_finished_); |
| + ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
| + read_thread_finished_ = true; |
| + ml.Notify(); |
| +} |
| + |
| + |
| +void Handle::WaitForReadThreadStarted() { |
| + while (!read_thread_starting_) { |
| + monitor_->Wait(Monitor::kNoTimeout); |
| + } |
| + read_thread_starting_ = false; |
| + monitor_->Notify(); |
| + ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
| } |
| bool Handle::IssueRead() { |
|
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here.
zra
2015/08/27 14:04:51
The monitor is already entered at calls to non-soc
|
| - ScopedLock lock(this); |
| ASSERT(type_ != kListenSocket); |
| ASSERT(pending_read_ == NULL); |
| OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
| @@ -277,6 +326,7 @@ bool Handle::IssueRead() { |
| if (result != 0) { |
| FATAL1("Failed to start read file thread %d", result); |
| } |
| + WaitForReadThreadStarted(); |
| return true; |
| } |
| } |
| @@ -288,7 +338,7 @@ bool Handle::IssueRecvFrom() { |
| bool Handle::IssueWrite() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ASSERT(type_ != kListenSocket); |
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
| ASSERT(pending_write_ != NULL); |
| @@ -345,7 +395,7 @@ void Handle::HandleIssueError() { |
| void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| event_handler_ = event_handler; |
| if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { |
| CreateCompletionPort(event_handler_->completion_port()); |
| @@ -360,7 +410,7 @@ bool FileHandle::IsClosed() { |
| void DirectoryWatchHandle::EnsureInitialized( |
| EventHandlerImplementation* event_handler) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| event_handler_ = event_handler; |
| if (completion_port_ == INVALID_HANDLE_VALUE) { |
| CreateCompletionPort(event_handler_->completion_port()); |
| @@ -374,7 +424,6 @@ bool DirectoryWatchHandle::IsClosed() { |
| bool DirectoryWatchHandle::IssueRead() { |
|
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here as well.
zra
2015/08/27 14:04:51
Ditto.
|
| - ScopedLock lock(this); |
| // It may have been started before, as we start the directory-handler when |
| // we create it. |
| if (pending_read_ != NULL || data_ready_ != NULL) return true; |
| @@ -399,7 +448,7 @@ bool DirectoryWatchHandle::IssueRead() { |
| void DirectoryWatchHandle::Stop() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| // Stop the outstanding read, so we can close the handle. |
| if (pending_read_ != NULL) { |
| @@ -443,7 +492,7 @@ bool ListenSocket::LoadAcceptEx() { |
| bool ListenSocket::IssueAccept() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| // For AcceptEx there needs to be buffer storage for address |
| // information for two addresses (local and remote address). The |
| @@ -483,7 +532,7 @@ bool ListenSocket::IssueAccept() { |
| void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
| HANDLE completion_port) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (!IsClosing()) { |
| // Update the accepted socket to support the full range of API calls. |
| SOCKET s = socket(); |
| @@ -544,13 +593,13 @@ void ListenSocket::DoClose() { |
| bool ListenSocket::CanAccept() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| return accepted_head_ != NULL; |
| } |
| ClientSocket* ListenSocket::Accept() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ClientSocket *result = NULL; |
| @@ -577,7 +626,7 @@ ClientSocket* ListenSocket::Accept() { |
| void ListenSocket::EnsureInitialized( |
| EventHandlerImplementation* event_handler) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (AcceptEx_ == NULL) { |
| ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
| ASSERT(event_handler_ == NULL); |
| @@ -594,7 +643,7 @@ bool ListenSocket::IsClosed() { |
| intptr_t Handle::Available() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (data_ready_ == NULL) return 0; |
| ASSERT(!data_ready_->IsEmpty()); |
| return data_ready_->GetRemainingLength(); |
| @@ -602,7 +651,7 @@ intptr_t Handle::Available() { |
| intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (data_ready_ == NULL) return 0; |
| num_bytes = data_ready_->Read( |
| buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
| @@ -617,7 +666,7 @@ intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
| intptr_t Handle::RecvFrom( |
| void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (data_ready_ == NULL) return 0; |
| num_bytes = data_ready_->Read( |
| buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
| @@ -639,7 +688,7 @@ intptr_t Handle::RecvFrom( |
| intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (pending_write_ != NULL) return 0; |
| if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
| ASSERT(SupportsOverlappedIO()); |
| @@ -656,7 +705,7 @@ intptr_t Handle::SendTo(const void* buffer, |
| intptr_t num_bytes, |
| struct sockaddr* sa, |
| socklen_t sa_len) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (pending_write_ != NULL) return 0; |
| if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
| ASSERT(SupportsOverlappedIO()); |
| @@ -675,13 +724,14 @@ static void WriteFileThread(uword args) { |
| void StdHandle::RunWriteLoop() { |
| - write_monitor_->Enter(); |
| + MonitorLocker ml(monitor_); |
| write_thread_running_ = true; |
| + thread_id_ = Thread::GetCurrentThreadId(); |
| // Notify we have started. |
| - write_monitor_->Notify(); |
| + ml.Notify(); |
| while (write_thread_running_) { |
| - write_monitor_->Wait(Monitor::kNoTimeout); |
| + ml.Wait(Monitor::kNoTimeout); |
| if (pending_write_ != NULL) { |
| // We woke up and had a pending write. Execute it. |
| WriteSyncCompleteAsync(); |
| @@ -689,8 +739,7 @@ void StdHandle::RunWriteLoop() { |
| } |
| write_thread_exists_ = false; |
| - write_monitor_->Notify(); |
| - write_monitor_->Exit(); |
| + ml.Notify(); |
| } |
| @@ -718,7 +767,7 @@ void StdHandle::WriteSyncCompleteAsync() { |
| } |
| intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (pending_write_ != NULL) return 0; |
| if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
| // In the case of stdout and stderr, OverlappedIO is not supported. |
| @@ -726,7 +775,6 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
| // This code is actually never exposed to the user, as stdout and stderr is |
| // not available as a RawSocket, but only wrapped in a Socket. |
| // Note that we return '0', unless a thread have already completed a write. |
| - MonitorLocker locker(write_monitor_); |
| if (thread_wrote_ > 0) { |
| if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; |
| thread_wrote_ -= num_bytes; |
| @@ -734,14 +782,14 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
| } |
| if (!write_thread_exists_) { |
| write_thread_exists_ = true; |
| - int result = Thread::Start(WriteFileThread, |
| - reinterpret_cast<uword>(this)); |
| + int result = Thread::Start( |
| + WriteFileThread, reinterpret_cast<uword>(this)); |
| if (result != 0) { |
| FATAL1("Failed to start write file thread %d", result); |
| } |
| while (!write_thread_running_) { |
| // Wait until we the thread is running. |
| - locker.Wait(Monitor::kNoTimeout); |
| + ml.Wait(Monitor::kNoTimeout); |
| } |
| } |
| // Only queue up to INT_MAX bytes. |
| @@ -749,19 +797,20 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
| // Create buffer and notify thread about the new handle. |
| pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
| pending_write_->Write(buffer, truncated_bytes); |
| - locker.Notify(); |
| + ml.Notify(); |
| return 0; |
| } |
| void StdHandle::DoClose() { |
| - MonitorLocker locker(write_monitor_); |
| + MonitorLocker ml(monitor_); |
| if (write_thread_exists_) { |
| write_thread_running_ = false; |
| - locker.Notify(); |
| + ml.Notify(); |
| while (write_thread_exists_) { |
| - locker.Wait(Monitor::kNoTimeout); |
| + ml.Wait(Monitor::kNoTimeout); |
| } |
| + Thread::Join(thread_id_); |
| } |
| Handle::DoClose(); |
| } |
| @@ -807,7 +856,7 @@ void ClientSocket::DoClose() { |
| bool ClientSocket::IssueRead() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
| ASSERT(pending_read_ == NULL); |
| @@ -836,7 +885,7 @@ bool ClientSocket::IssueRead() { |
| bool ClientSocket::IssueWrite() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
| ASSERT(pending_write_ != NULL); |
| ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
| @@ -900,7 +949,7 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { |
| void ClientSocket::EnsureInitialized( |
| EventHandlerImplementation* event_handler) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (completion_port_ == INVALID_HANDLE_VALUE) { |
| ASSERT(event_handler_ == NULL); |
| event_handler_ = event_handler; |
| @@ -915,7 +964,7 @@ bool ClientSocket::IsClosed() { |
| bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
| ASSERT(pending_write_ != NULL); |
| ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
| @@ -940,7 +989,7 @@ bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
| bool DatagramSocket::IssueRecvFrom() { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
| ASSERT(pending_read_ == NULL); |
| @@ -970,7 +1019,7 @@ bool DatagramSocket::IssueRecvFrom() { |
| void DatagramSocket::EnsureInitialized( |
| EventHandlerImplementation* event_handler) { |
| - ScopedLock lock(this); |
| + MonitorLocker ml(monitor_); |
| if (completion_port_ == INVALID_HANDLE_VALUE) { |
| ASSERT(event_handler_ == NULL); |
| event_handler_ = event_handler; |
| @@ -1010,7 +1059,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
| reinterpret_cast<ListenSocket*>(handle); |
| listen_socket->EnsureInitialized(this); |
| - Handle::ScopedLock lock(listen_socket); |
| + MonitorLocker ml(listen_socket->monitor_); |
| if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
| listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
| @@ -1040,7 +1089,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
| } |
| } else { |
| handle->EnsureInitialized(this); |
| - Handle::ScopedLock lock(handle); |
| + MonitorLocker ml(handle->monitor_); |
| if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
| handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
| @@ -1114,7 +1163,7 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
| listen_socket->AcceptComplete(buffer, completion_port_); |
| { |
| - Handle::ScopedLock lock(listen_socket); |
| + MonitorLocker ml(listen_socket->monitor_); |
| TryDispatchingPendingAccepts(listen_socket); |
| } |
| @@ -1278,6 +1327,8 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, |
| EventHandlerImplementation::EventHandlerImplementation() { |
| + startup_monitor_ = new Monitor(); |
| + handler_thread_id_ = Thread::kInvalidThreadId; |
| completion_port_ = |
| CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
| if (completion_port_ == NULL) { |
| @@ -1288,6 +1339,8 @@ EventHandlerImplementation::EventHandlerImplementation() { |
| EventHandlerImplementation::~EventHandlerImplementation() { |
| + Thread::Join(handler_thread_id_); |
| + delete startup_monitor_; |
| CloseHandle(completion_port_); |
| } |
| @@ -1322,6 +1375,12 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) { |
| EventHandlerImplementation* handler_impl = &handler->delegate_; |
| ASSERT(handler_impl != NULL); |
| + { |
| + MonitorLocker ml(handler_impl->startup_monitor_); |
| + handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
| + ml.Notify(); |
| + } |
| + |
| while (!handler_impl->shutdown_) { |
| DWORD bytes; |
| ULONG_PTR key; |
| @@ -1382,6 +1441,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) { |
| FATAL1("Failed to start event handler thread %d", result); |
| } |
| + { |
| + MonitorLocker ml(startup_monitor_); |
| + while (handler_thread_id_ == Thread::kInvalidThreadId) { |
| + ml.Wait(); |
| + } |
| + } |
| + |
| // Initialize Winsock32 |
| if (!Socket::Initialize()) { |
| FATAL("Failed to initialized Windows sockets"); |