| Index: runtime/bin/eventhandler_win.cc
|
| diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
|
| index db698a3c8831e637cb17e15fbeef1c242548d152..6e27f821fb2a5fe603b02d80f04af957f6212f2c 100644
|
| --- a/runtime/bin/eventhandler_win.cc
|
| +++ b/runtime/bin/eventhandler_win.cc
|
| @@ -121,23 +121,16 @@ Handle::Handle(intptr_t handle)
|
| pending_read_(NULL),
|
| pending_write_(NULL),
|
| last_error_(NOERROR),
|
| - flags_(0) {
|
| - InitializeCriticalSection(&cs_);
|
| + flags_(0),
|
| + read_thread_id_(Thread::kInvalidThreadId),
|
| + read_thread_starting_(false),
|
| + read_thread_finished_(false),
|
| + monitor_(new Monitor()) {
|
| }
|
|
|
|
|
| Handle::~Handle() {
|
| - DeleteCriticalSection(&cs_);
|
| -}
|
| -
|
| -
|
| -void Handle::Lock() {
|
| - EnterCriticalSection(&cs_);
|
| -}
|
| -
|
| -
|
| -void Handle::Unlock() {
|
| - LeaveCriticalSection(&cs_);
|
| + delete monitor_;
|
| }
|
|
|
|
|
| @@ -154,7 +147,7 @@ bool Handle::CreateCompletionPort(HANDLE completion_port) {
|
|
|
|
|
| void Handle::Close() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (!IsClosing()) {
|
| // Close the socket and set the closing state. This close method can be
|
| // called again if this socket has pending IO operations in flight.
|
| @@ -175,28 +168,60 @@ void Handle::DoClose() {
|
|
|
|
|
| bool Handle::HasPendingRead() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| return pending_read_ != NULL;
|
| }
|
|
|
|
|
| bool Handle::HasPendingWrite() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| return pending_write_ != NULL;
|
| }
|
|
|
|
|
| +void Handle::WaitForReadThreadStarted() {
|
| + MonitorLocker ml(monitor_);
|
| + while (read_thread_starting_) {
|
| + ml.Wait();
|
| + }
|
| +}
|
| +
|
| +
|
| +void Handle::WaitForReadThreadFinished() {
|
| + // Join the Reader thread if there is one.
|
| + ThreadId to_join = Thread::kInvalidThreadId;
|
| + {
|
| + MonitorLocker ml(monitor_);
|
| + if (read_thread_id_ != Thread::kInvalidThreadId) {
|
| + while (!read_thread_finished_) {
|
| + ml.Wait();
|
| + }
|
| + read_thread_finished_ = false;
|
| + to_join = read_thread_id_;
|
| + read_thread_id_ = Thread::kInvalidThreadId;
|
| + }
|
| + }
|
| + if (to_join != Thread::kInvalidThreadId) {
|
| + Thread::Join(to_join);
|
| + }
|
| +}
|
| +
|
| +
|
| void Handle::ReadComplete(OverlappedBuffer* buffer) {
|
| - ScopedLock lock(this);
|
| - // Currently only one outstanding read at the time.
|
| - ASSERT(pending_read_ == buffer);
|
| - ASSERT(data_ready_ == NULL);
|
| - if (!IsClosing() && !buffer->IsEmpty()) {
|
| - data_ready_ = pending_read_;
|
| - } else {
|
| - OverlappedBuffer::DisposeBuffer(buffer);
|
| + WaitForReadThreadStarted();
|
| + {
|
| + MonitorLocker ml(monitor_);
|
| + // Currently only one outstanding read at the time.
|
| + ASSERT(pending_read_ == buffer);
|
| + ASSERT(data_ready_ == NULL);
|
| + if (!IsClosing() && !buffer->IsEmpty()) {
|
| + data_ready_ = pending_read_;
|
| + } else {
|
| + OverlappedBuffer::DisposeBuffer(buffer);
|
| + }
|
| + pending_read_ = NULL;
|
| }
|
| - pending_read_ = NULL;
|
| + WaitForReadThreadFinished();
|
| }
|
|
|
|
|
| @@ -206,7 +231,7 @@ void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
|
|
|
|
|
| void Handle::WriteComplete(OverlappedBuffer* buffer) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| // Currently only one outstanding write at the time.
|
| ASSERT(pending_write_ == buffer);
|
| OverlappedBuffer::DisposeBuffer(buffer);
|
| @@ -220,7 +245,26 @@ static void ReadFileThread(uword args) {
|
| }
|
|
|
|
|
| +void Handle::NotifyReadThreadStarted() {
|
| + MonitorLocker ml(monitor_);
|
| + ASSERT(read_thread_starting_);
|
| + ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
|
| + read_thread_id_ = Thread::GetCurrentThreadId();
|
| + read_thread_starting_ = false;
|
| + ml.Notify();
|
| +}
|
| +
|
| +void Handle::NotifyReadThreadFinished() {
|
| + MonitorLocker ml(monitor_);
|
| + ASSERT(!read_thread_finished_);
|
| + ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
|
| + read_thread_finished_ = true;
|
| + ml.Notify();
|
| +}
|
| +
|
| +
|
| void Handle::ReadSyncCompleteAsync() {
|
| + NotifyReadThreadStarted();
|
| ASSERT(pending_read_ != NULL);
|
| ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
|
|
|
| @@ -228,9 +272,10 @@ void Handle::ReadSyncCompleteAsync() {
|
| if (GetFileType(handle_) == FILE_TYPE_CHAR) {
|
| buffer_size = kStdOverlappedBufferSize;
|
| }
|
| + char* buffer_start = pending_read_->GetBufferStart();
|
| DWORD bytes_read = 0;
|
| BOOL ok = ReadFile(handle_,
|
| - pending_read_->GetBufferStart(),
|
| + buffer_start,
|
| buffer_size,
|
| &bytes_read,
|
| NULL);
|
| @@ -245,11 +290,11 @@ void Handle::ReadSyncCompleteAsync() {
|
| if (!ok) {
|
| FATAL("PostQueuedCompletionStatus failed");
|
| }
|
| + NotifyReadThreadFinished();
|
| }
|
|
|
|
|
| bool Handle::IssueRead() {
|
| - ScopedLock lock(this);
|
| ASSERT(type_ != kListenSocket);
|
| ASSERT(pending_read_ == NULL);
|
| OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
|
| @@ -272,6 +317,7 @@ bool Handle::IssueRead() {
|
| } else {
|
| // Completing asynchronously through thread.
|
| pending_read_ = buffer;
|
| + read_thread_starting_ = true;
|
| int result = Thread::Start(ReadFileThread,
|
| reinterpret_cast<uword>(this));
|
| if (result != 0) {
|
| @@ -288,7 +334,7 @@ bool Handle::IssueRecvFrom() {
|
|
|
|
|
| bool Handle::IssueWrite() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| ASSERT(type_ != kListenSocket);
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_write_ != NULL);
|
| @@ -345,7 +391,7 @@ void Handle::HandleIssueError() {
|
|
|
|
|
| void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| event_handler_ = event_handler;
|
| if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) {
|
| CreateCompletionPort(event_handler_->completion_port());
|
| @@ -360,7 +406,7 @@ bool FileHandle::IsClosed() {
|
|
|
| void DirectoryWatchHandle::EnsureInitialized(
|
| EventHandlerImplementation* event_handler) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| event_handler_ = event_handler;
|
| if (completion_port_ == INVALID_HANDLE_VALUE) {
|
| CreateCompletionPort(event_handler_->completion_port());
|
| @@ -374,7 +420,6 @@ bool DirectoryWatchHandle::IsClosed() {
|
|
|
|
|
| bool DirectoryWatchHandle::IssueRead() {
|
| - ScopedLock lock(this);
|
| // It may have been started before, as we start the directory-handler when
|
| // we create it.
|
| if (pending_read_ != NULL || data_ready_ != NULL) return true;
|
| @@ -399,7 +444,7 @@ bool DirectoryWatchHandle::IssueRead() {
|
|
|
|
|
| void DirectoryWatchHandle::Stop() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| // Stop the outstanding read, so we can close the handle.
|
|
|
| if (pending_read_ != NULL) {
|
| @@ -443,7 +488,7 @@ bool ListenSocket::LoadAcceptEx() {
|
|
|
|
|
| bool ListenSocket::IssueAccept() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
|
|
| // For AcceptEx there needs to be buffer storage for address
|
| // information for two addresses (local and remote address). The
|
| @@ -483,7 +528,7 @@ bool ListenSocket::IssueAccept() {
|
|
|
| void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
|
| HANDLE completion_port) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (!IsClosing()) {
|
| // Update the accepted socket to support the full range of API calls.
|
| SOCKET s = socket();
|
| @@ -544,13 +589,13 @@ void ListenSocket::DoClose() {
|
|
|
|
|
| bool ListenSocket::CanAccept() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| return accepted_head_ != NULL;
|
| }
|
|
|
|
|
| ClientSocket* ListenSocket::Accept() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
|
|
| ClientSocket *result = NULL;
|
|
|
| @@ -577,7 +622,7 @@ ClientSocket* ListenSocket::Accept() {
|
|
|
| void ListenSocket::EnsureInitialized(
|
| EventHandlerImplementation* event_handler) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (AcceptEx_ == NULL) {
|
| ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
|
| ASSERT(event_handler_ == NULL);
|
| @@ -594,7 +639,7 @@ bool ListenSocket::IsClosed() {
|
|
|
|
|
| intptr_t Handle::Available() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (data_ready_ == NULL) return 0;
|
| ASSERT(!data_ready_->IsEmpty());
|
| return data_ready_->GetRemainingLength();
|
| @@ -602,7 +647,7 @@ intptr_t Handle::Available() {
|
|
|
|
|
| intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (data_ready_ == NULL) return 0;
|
| num_bytes = data_ready_->Read(
|
| buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
|
| @@ -617,7 +662,7 @@ intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
|
|
|
| intptr_t Handle::RecvFrom(
|
| void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (data_ready_ == NULL) return 0;
|
| num_bytes = data_ready_->Read(
|
| buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
|
| @@ -639,7 +684,7 @@ intptr_t Handle::RecvFrom(
|
|
|
|
|
| intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (pending_write_ != NULL) return 0;
|
| if (num_bytes > kBufferSize) num_bytes = kBufferSize;
|
| ASSERT(SupportsOverlappedIO());
|
| @@ -656,7 +701,7 @@ intptr_t Handle::SendTo(const void* buffer,
|
| intptr_t num_bytes,
|
| struct sockaddr* sa,
|
| socklen_t sa_len) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (pending_write_ != NULL) return 0;
|
| if (num_bytes > kBufferSize) num_bytes = kBufferSize;
|
| ASSERT(SupportsOverlappedIO());
|
| @@ -675,13 +720,14 @@ static void WriteFileThread(uword args) {
|
|
|
|
|
| void StdHandle::RunWriteLoop() {
|
| - write_monitor_->Enter();
|
| + MonitorLocker ml(monitor_);
|
| write_thread_running_ = true;
|
| + thread_id_ = Thread::GetCurrentThreadId();
|
| // Notify we have started.
|
| - write_monitor_->Notify();
|
| + ml.Notify();
|
|
|
| while (write_thread_running_) {
|
| - write_monitor_->Wait(Monitor::kNoTimeout);
|
| + ml.Wait(Monitor::kNoTimeout);
|
| if (pending_write_ != NULL) {
|
| // We woke up and had a pending write. Execute it.
|
| WriteSyncCompleteAsync();
|
| @@ -689,8 +735,7 @@ void StdHandle::RunWriteLoop() {
|
| }
|
|
|
| write_thread_exists_ = false;
|
| - write_monitor_->Notify();
|
| - write_monitor_->Exit();
|
| + ml.Notify();
|
| }
|
|
|
|
|
| @@ -718,7 +763,7 @@ void StdHandle::WriteSyncCompleteAsync() {
|
| }
|
|
|
| intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (pending_write_ != NULL) return 0;
|
| if (num_bytes > kBufferSize) num_bytes = kBufferSize;
|
| // In the case of stdout and stderr, OverlappedIO is not supported.
|
| @@ -726,7 +771,6 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
|
| // This code is actually never exposed to the user, as stdout and stderr is
|
| // not available as a RawSocket, but only wrapped in a Socket.
|
| // Note that we return '0', unless a thread have already completed a write.
|
| - MonitorLocker locker(write_monitor_);
|
| if (thread_wrote_ > 0) {
|
| if (num_bytes > thread_wrote_) num_bytes = thread_wrote_;
|
| thread_wrote_ -= num_bytes;
|
| @@ -734,14 +778,14 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
|
| }
|
| if (!write_thread_exists_) {
|
| write_thread_exists_ = true;
|
| - int result = Thread::Start(WriteFileThread,
|
| - reinterpret_cast<uword>(this));
|
| + int result = Thread::Start(
|
| + WriteFileThread, reinterpret_cast<uword>(this));
|
| if (result != 0) {
|
| FATAL1("Failed to start write file thread %d", result);
|
| }
|
| while (!write_thread_running_) {
|
| // Wait until we the thread is running.
|
| - locker.Wait(Monitor::kNoTimeout);
|
| + ml.Wait(Monitor::kNoTimeout);
|
| }
|
| }
|
| // Only queue up to INT_MAX bytes.
|
| @@ -749,19 +793,20 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
|
| // Create buffer and notify thread about the new handle.
|
| pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
|
| pending_write_->Write(buffer, truncated_bytes);
|
| - locker.Notify();
|
| + ml.Notify();
|
| return 0;
|
| }
|
|
|
|
|
| void StdHandle::DoClose() {
|
| - MonitorLocker locker(write_monitor_);
|
| + MonitorLocker ml(monitor_);
|
| if (write_thread_exists_) {
|
| write_thread_running_ = false;
|
| - locker.Notify();
|
| + ml.Notify();
|
| while (write_thread_exists_) {
|
| - locker.Wait(Monitor::kNoTimeout);
|
| + ml.Wait(Monitor::kNoTimeout);
|
| }
|
| + Thread::Join(thread_id_);
|
| }
|
| Handle::DoClose();
|
| }
|
| @@ -807,7 +852,7 @@ void ClientSocket::DoClose() {
|
|
|
|
|
| bool ClientSocket::IssueRead() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_read_ == NULL);
|
|
|
| @@ -836,7 +881,7 @@ bool ClientSocket::IssueRead() {
|
|
|
|
|
| bool ClientSocket::IssueWrite() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_write_ != NULL);
|
| ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
|
| @@ -900,7 +945,7 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
|
|
|
| void ClientSocket::EnsureInitialized(
|
| EventHandlerImplementation* event_handler) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (completion_port_ == INVALID_HANDLE_VALUE) {
|
| ASSERT(event_handler_ == NULL);
|
| event_handler_ = event_handler;
|
| @@ -915,7 +960,7 @@ bool ClientSocket::IsClosed() {
|
|
|
|
|
| bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_write_ != NULL);
|
| ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
|
| @@ -940,7 +985,7 @@ bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
|
|
|
|
|
| bool DatagramSocket::IssueRecvFrom() {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
|
| ASSERT(pending_read_ == NULL);
|
|
|
| @@ -970,7 +1015,7 @@ bool DatagramSocket::IssueRecvFrom() {
|
|
|
| void DatagramSocket::EnsureInitialized(
|
| EventHandlerImplementation* event_handler) {
|
| - ScopedLock lock(this);
|
| + MonitorLocker ml(monitor_);
|
| if (completion_port_ == INVALID_HANDLE_VALUE) {
|
| ASSERT(event_handler_ == NULL);
|
| event_handler_ = event_handler;
|
| @@ -1010,7 +1055,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| reinterpret_cast<ListenSocket*>(handle);
|
| listen_socket->EnsureInitialized(this);
|
|
|
| - Handle::ScopedLock lock(listen_socket);
|
| + MonitorLocker ml(listen_socket->monitor_);
|
|
|
| if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
|
| listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
|
| @@ -1040,7 +1085,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
|
| }
|
| } else {
|
| handle->EnsureInitialized(this);
|
| - Handle::ScopedLock lock(handle);
|
| + MonitorLocker ml(handle->monitor_);
|
|
|
| if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
|
| handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
|
| @@ -1114,7 +1159,7 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
|
| listen_socket->AcceptComplete(buffer, completion_port_);
|
|
|
| {
|
| - Handle::ScopedLock lock(listen_socket);
|
| + MonitorLocker ml(listen_socket->monitor_);
|
| TryDispatchingPendingAccepts(listen_socket);
|
| }
|
|
|
| @@ -1278,6 +1323,8 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
|
|
|
|
|
| EventHandlerImplementation::EventHandlerImplementation() {
|
| + startup_monitor_ = new Monitor();
|
| + handler_thread_id_ = Thread::kInvalidThreadId;
|
| completion_port_ =
|
| CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
|
| if (completion_port_ == NULL) {
|
| @@ -1288,6 +1335,8 @@ EventHandlerImplementation::EventHandlerImplementation() {
|
|
|
|
|
| EventHandlerImplementation::~EventHandlerImplementation() {
|
| + Thread::Join(handler_thread_id_);
|
| + delete startup_monitor_;
|
| CloseHandle(completion_port_);
|
| }
|
|
|
| @@ -1322,6 +1371,12 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
|
| EventHandlerImplementation* handler_impl = &handler->delegate_;
|
| ASSERT(handler_impl != NULL);
|
|
|
| + {
|
| + MonitorLocker ml(handler_impl->startup_monitor_);
|
| + handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
|
| + ml.Notify();
|
| + }
|
| +
|
| while (!handler_impl->shutdown_) {
|
| DWORD bytes;
|
| ULONG_PTR key;
|
| @@ -1382,6 +1437,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
|
| FATAL1("Failed to start event handler thread %d", result);
|
| }
|
|
|
| + {
|
| + MonitorLocker ml(startup_monitor_);
|
| + while (handler_thread_id_ == Thread::kInvalidThreadId) {
|
| + ml.Wait();
|
| + }
|
| + }
|
| +
|
| // Initialize Winsock32
|
| if (!Socket::Initialize()) {
|
| FATAL("Failed to initialized Windows sockets");
|
|
|