| Index: mojo/edk/system/wait_set_dispatcher.cc
|
| diff --git a/mojo/edk/system/wait_set_dispatcher.cc b/mojo/edk/system/wait_set_dispatcher.cc
|
| index 68ace5443302537d87d8aab6fee523a2868b12fb..cfd911f905845af1e4de39b153a7facec0b3ce0e 100644
|
| --- a/mojo/edk/system/wait_set_dispatcher.cc
|
| +++ b/mojo/edk/system/wait_set_dispatcher.cc
|
| @@ -4,12 +4,18 @@
|
|
|
| #include "mojo/edk/system/wait_set_dispatcher.h"
|
|
|
| +#include <string.h>
|
| +
|
| +#include <algorithm>
|
| +#include <limits>
|
| #include <utility>
|
|
|
| #include "base/logging.h"
|
| +#include "mojo/edk/platform/time_ticks.h"
|
| #include "mojo/edk/system/configuration.h"
|
| #include "mojo/edk/system/options_validation.h"
|
|
|
| +using mojo::platform::GetTimeTicks;
|
| using mojo::util::MutexLocker;
|
| using mojo::util::RefPtr;
|
|
|
| @@ -113,6 +119,8 @@ void WaitSetDispatcher::CloseImplNoLock() {
|
| triggered_tail_ = nullptr;
|
| triggered_count_ = 0u;
|
|
|
| + cv_.Signal();
|
| +
|
| // We want to remove the awakables outside the lock, so we have to unlock
|
| // |mutex()|. Note that while unlocked, |Awake()| may get called.
|
| // TODO(vtl): This is pretty terrible, but changing it would require pretty
|
| @@ -230,20 +238,36 @@ MojoResult WaitSetDispatcher::WaitSetRemoveImpl(uint64_t cookie) {
|
| // the entry with this cookie hasn't been added yet.
|
| return MOJO_RESULT_NOT_FOUND;
|
| }
|
| + if (entry->is_being_removed) {
|
| + // This entry is being removed on another thread!
|
| + return MOJO_RESULT_NOT_FOUND;
|
| + }
|
| +
|
| + entry->is_being_removed = true;
|
|
|
| // We'll remove ourself from the target dispatcher's awakable list outside
|
| // the lock.
|
| - dispatcher = std::move(entry->dispatcher);
|
| + dispatcher = entry->dispatcher;
|
| + }
|
| +
|
| + if (dispatcher)
|
| + dispatcher->RemoveAwakable(true, this, cookie, nullptr);
|
|
|
| + {
|
| + MutexLocker locker(&mutex());
|
| +
|
| + if (is_closed_no_lock())
|
| + return MOJO_RESULT_OK;
|
| +
|
| + auto it = entries_.find(cookie);
|
| + DCHECK(it != entries_.end());
|
| + Entry* entry = it->second.get();
|
| + DCHECK(entry->is_being_removed);
|
| if (entry->is_triggered)
|
| RemoveTriggeredNoLock(entry);
|
| -
|
| - // Note: This invalidates |entry|.
|
| - entries_.erase(it);
|
| + entries_.erase(cookie);
|
| }
|
|
|
| - if (dispatcher)
|
| - dispatcher->RemoveAwakable(true, this, cookie, nullptr);
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -256,9 +280,93 @@ MojoResult WaitSetDispatcher::WaitSetWaitImpl(
|
| if (is_closed_no_lock())
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - // TODO(vtl)
|
| - NOTIMPLEMENTED();
|
| - return MOJO_RESULT_UNIMPLEMENTED;
|
| + // Read this before waiting. (If we're going to crash due to reading input
|
| + // values, we'd like to do so before waiting.)
|
| + uint32_t num_results_in = num_results.Get();
|
| +
|
| + if (deadline == MOJO_DEADLINE_INDEFINITE) {
|
| + while (!is_closed_no_lock() && triggered_count_ == 0u)
|
| + cv_.Wait(&mutex());
|
| + } else {
|
| + // We may get spurious wakeups, so record the start time and track the
|
| + // remaining timeout.
|
| + uint64_t wait_remaining = deadline;
|
| + MojoTimeTicks start = GetTimeTicks();
|
| + while (!is_closed_no_lock() && triggered_count_ == 0u) {
|
| + // NOTE(vtl): Possibly, we should add a version of |WaitWithTimeout()|
|
| + // that takes an absolute deadline, since that's what pthreads takes.
|
| + if (cv_.WaitWithTimeout(&mutex(), wait_remaining))
|
| + return MOJO_RESULT_DEADLINE_EXCEEDED; // Definitely timed out.
|
| +
|
| + MojoTimeTicks now = GetTimeTicks();
|
| + DCHECK_GE(now, start);
|
| + uint64_t elapsed = static_cast<uint64_t>(now - start);
|
| + // It's possible that the deadline has passed anyway.
|
| + if (elapsed >= deadline)
|
| + return MOJO_RESULT_DEADLINE_EXCEEDED;
|
| +
|
| + // Otherwise, recalculate the amount that we have left to wait.
|
| + wait_remaining = deadline - elapsed;
|
| + }
|
| + }
|
| + if (is_closed_no_lock())
|
| + return MOJO_RESULT_CANCELLED;
|
| + DCHECK_GT(triggered_count_, 0u);
|
| +
|
| + uint32_t num_results_out =
|
| + static_cast<uint32_t>(std::min<size_t>(num_results_in, triggered_count_));
|
| + std::vector<MojoWaitSetResult> results_out(num_results_out);
|
| + if (num_results_out > 0u) {
|
| + // We're going to copy out all this memory, so we should make sure it's all
|
| + // been zeroed.
|
| + memset(results_out.data(), 0,
|
| + results_out.size() * sizeof(MojoWaitSetResult));
|
| +
|
| + const Entry* entry = triggered_head_;
|
| + for (auto& wait_set_result : results_out) {
|
| + DCHECK(entry);
|
| + DCHECK(entry->is_triggered);
|
| +
|
| + wait_set_result.cookie = entry->cookie;
|
| + // |wait_set_result.reserved| has already been zeroed, as has
|
| + // |wait_set_result.signals_state| (for the cases below where we don't set
|
| + // it explicitly).
|
| + wait_set_result.reserved = 0u;
|
| + if (!entry->dispatcher) {
|
| + wait_set_result.wait_result = MOJO_RESULT_CANCELLED;
|
| + } else if (entry->signals_state.satisfies(entry->signals)) {
|
| + wait_set_result.wait_result = MOJO_RESULT_OK;
|
| + wait_set_result.signals_state = entry->signals_state;
|
| + } else if (!entry->signals_state.can_satisfy(entry->signals)) {
|
| + wait_set_result.wait_result = MOJO_RESULT_FAILED_PRECONDITION;
|
| + wait_set_result.signals_state = entry->signals_state;
|
| + } else {
|
| + NOTREACHED();
|
| + wait_set_result.wait_result = MOJO_RESULT_INTERNAL;
|
| + }
|
| + // TODO(vtl): The comment in mojo/public/c/system/wait_set.h indicates
|
| + // that we may have to provide |MOJO_RESULT_BUSY|, but we never do that
|
| + // here. Is that right or am I missing something?
|
| +
|
| + entry = entry->triggered_next;
|
| + }
|
| + }
|
| + uint32_t max_results_out = static_cast<uint32_t>(
|
| + std::min<size_t>(std::numeric_limits<uint32_t>::max(), triggered_count_));
|
| +
|
| + DCHECK_LE(num_results_out, num_results_in);
|
| + num_results.Put(num_results_out);
|
| + if (num_results_out > 0u) {
|
| + DCHECK_EQ(num_results_out, results_out.size());
|
| + results.PutArray(results_out.data(), results_out.size());
|
| + } else {
|
| + // We were awoken and didn't time out, so the only reason we should be
|
| + // providing no results is if none were requested.
|
| + DCHECK_EQ(num_results_in, 0u);
|
| + }
|
| + if (!max_results.IsNull())
|
| + max_results.Put(max_results_out);
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| void WaitSetDispatcher::Awake(uint64_t context,
|
| @@ -278,6 +386,9 @@ void WaitSetDispatcher::Awake(uint64_t context,
|
| const auto& entry = it->second;
|
| // Once we get "cancelled", we should never be awoken again.
|
| DCHECK(entry->dispatcher);
|
| + if (entry->is_being_removed)
|
| + return;
|
| +
|
| switch (reason) {
|
| case AwakeReason::CANCELLED:
|
| if (!entry->is_triggered)
|
| @@ -313,6 +424,8 @@ void WaitSetDispatcher::AddTriggeredNoLock(Entry* entry) {
|
| DCHECK(!entry->triggered_next);
|
|
|
| entry->is_triggered = true;
|
| + if (!triggered_count_)
|
| + cv_.Signal();
|
| triggered_count_++;
|
|
|
| if (!triggered_tail_) {
|
|
|