| Index: mojo/public/cpp/system/wait_set.cc
|
| diff --git a/mojo/public/cpp/system/wait_set.cc b/mojo/public/cpp/system/wait_set.cc
|
| index 410ec7490706c7b3738521edebf1508cfe6e0d79..1728f81b95e0d8cee496509f27dbb3951487357c 100644
|
| --- a/mojo/public/cpp/system/wait_set.cc
|
| +++ b/mojo/public/cpp/system/wait_set.cc
|
| @@ -7,6 +7,7 @@
|
| #include <algorithm>
|
| #include <limits>
|
| #include <map>
|
| +#include <set>
|
| #include <vector>
|
|
|
| #include "base/containers/stack_container.h"
|
| @@ -33,6 +34,21 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| watcher_handle_.reset();
|
| }
|
|
|
| + MojoResult AddEvent(base::WaitableEvent* event) {
|
| + auto result = user_events_.insert(event);
|
| + if (result.second)
|
| + return MOJO_RESULT_OK;
|
| + return MOJO_RESULT_ALREADY_EXISTS;
|
| + }
|
| +
|
| + MojoResult RemoveEvent(base::WaitableEvent* event) {
|
| + auto it = user_events_.find(event);
|
| + if (it == user_events_.end())
|
| + return MOJO_RESULT_NOT_FOUND;
|
| + user_events_.erase(it);
|
| + return MOJO_RESULT_OK;
|
| + }
|
| +
|
| MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
|
| DCHECK(watcher_handle_.is_valid());
|
|
|
| @@ -110,7 +126,8 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| return rv;
|
| }
|
|
|
| - void Wait(size_t* num_ready_handles,
|
| + void Wait(base::WaitableEvent** ready_event,
|
| + size_t* num_ready_handles,
|
| Handle* ready_handles,
|
| MojoResult* ready_results,
|
| MojoHandleSignalsState* signals_states) {
|
| @@ -118,7 +135,6 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| DCHECK(num_ready_handles);
|
| DCHECK(ready_handles);
|
| DCHECK(ready_results);
|
| - bool should_wait = false;
|
| {
|
| base::AutoLock lock(lock_);
|
| if (ready_handles_.empty()) {
|
| @@ -128,6 +144,7 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
|
|
| DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
|
| uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles);
|
| +
|
| base::StackVector<uintptr_t, 4> ready_contexts;
|
| ready_contexts.container().resize(num_ready_contexts);
|
| base::StackVector<MojoHandleSignalsState, 4> ready_states;
|
| @@ -144,36 +161,53 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| ready_contexts.container().data(), ready_results, out_states);
|
|
|
| if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
|
| - // Can't arm because one or more handles is already ready.
|
| - *num_ready_handles = num_ready_contexts;
|
| + // Simulate the handles becoming ready. We do this in lieu of
|
| + // returning the results immediately so as to avoid potentially
|
| + // starving user events. i.e., we always want to call WaitMany()
|
| + // below.
|
| + handle_event_.Signal();
|
| for (size_t i = 0; i < num_ready_contexts; ++i) {
|
| auto it = contexts_.find(ready_contexts.container()[i]);
|
| DCHECK(it != contexts_.end());
|
| - ready_handles[i] = it->second->handle();
|
| + ready_handles_[it->second->handle()] = {ready_results[i],
|
| + out_states[i]};
|
| }
|
| - return;
|
| + } else if (rv == MOJO_RESULT_NOT_FOUND) {
|
| + // Nothing to watch. If there are no user events, always signal to
|
| + // avoid deadlock.
|
| + if (user_events_.empty())
|
| + handle_event_.Signal();
|
| + } else {
|
| + // Watcher must be armed now. No need to manually signal.
|
| + DCHECK_EQ(MOJO_RESULT_OK, rv);
|
| }
|
| -
|
| - if (rv == MOJO_RESULT_NOT_FOUND) {
|
| - // There are no handles in the set. Nothing to watch.
|
| - *num_ready_handles = 0;
|
| - return;
|
| - }
|
| -
|
| - // Watcher is armed. We can go on waiting for an event to signal.
|
| - DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS);
|
| - should_wait = true;
|
| }
|
| }
|
|
|
| - if (should_wait)
|
| - handle_event_.Wait();
|
| + // Build a local contiguous array of events to wait on. These are rotated
|
| + // across Wait() calls to avoid starvation, by virtue of the fact that
|
| + // WaitMany guarantees left-to-right priority when multiple events are
|
| + // signaled.
|
| +
|
| + base::StackVector<base::WaitableEvent*, 4> events;
|
| + events.container().resize(user_events_.size() + 1);
|
| + if (waitable_index_shift_ > user_events_.size())
|
| + waitable_index_shift_ = 0;
|
| +
|
| + size_t dest_index = waitable_index_shift_++;
|
| + events.container()[dest_index] = &handle_event_;
|
| + for (auto* e : user_events_) {
|
| + dest_index = (dest_index + 1) % events.container().size();
|
| + events.container()[dest_index] = e;
|
| + }
|
|
|
| + size_t index = base::WaitableEvent::WaitMany(events.container().data(),
|
| + events.container().size());
|
| base::AutoLock lock(lock_);
|
|
|
| - DCHECK(!ready_handles_.empty());
|
| -
|
| - // Pop as many handles as we can out of the ready set and return them.
|
| + // Pop as many handles as we can out of the ready set and return them. Note
|
| + // that we do this regardless of which event signaled, as there may be
|
| + // ready handles in any case and they may be interesting to the caller.
|
| *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
|
| for (size_t i = 0; i < *num_ready_handles; ++i) {
|
| auto it = ready_handles_.begin();
|
| @@ -183,6 +217,14 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| signals_states[i] = it->second.signals_state;
|
| ready_handles_.erase(it);
|
| }
|
| +
|
| + // If the caller cares, let them know which user event unblocked us, if any.
|
| + if (ready_event) {
|
| + if (events.container()[index] == &handle_event_)
|
| + *ready_event = nullptr;
|
| + else
|
| + *ready_event = events.container()[index];
|
| + }
|
| }
|
|
|
| private:
|
| @@ -282,10 +324,16 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> {
|
| std::map<Handle, scoped_refptr<Context>> handle_to_context_;
|
| std::map<Handle, ReadyState> ready_handles_;
|
| std::vector<scoped_refptr<Context>> cancelled_contexts_;
|
| + std::set<base::WaitableEvent*> user_events_;
|
|
|
| // Event signaled any time a handle notification is received.
|
| base::WaitableEvent handle_event_;
|
|
|
| + // Offset by which to rotate the current set of waitable objects. This is used
|
| + // to guard against event starvation, as base::WaitableEvent::WaitMany gives
|
| + // preference to events in left-to-right order.
|
| + size_t waitable_index_shift_ = 0;
|
| +
|
| DISALLOW_COPY_AND_ASSIGN(State);
|
| };
|
|
|
| @@ -295,6 +343,14 @@ WaitSet::~WaitSet() {
|
| state_->ShutDown();
|
| }
|
|
|
| +MojoResult WaitSet::AddEvent(base::WaitableEvent* event) {
|
| + return state_->AddEvent(event);
|
| +}
|
| +
|
| +MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) {
|
| + return state_->RemoveEvent(event);
|
| +}
|
| +
|
| MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
|
| return state_->AddHandle(handle, signals);
|
| }
|
| @@ -303,11 +359,13 @@ MojoResult WaitSet::RemoveHandle(Handle handle) {
|
| return state_->RemoveHandle(handle);
|
| }
|
|
|
| -void WaitSet::Wait(size_t* num_ready_handles,
|
| +void WaitSet::Wait(base::WaitableEvent** ready_event,
|
| + size_t* num_ready_handles,
|
| Handle* ready_handles,
|
| MojoResult* ready_results,
|
| MojoHandleSignalsState* signals_states) {
|
| - state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states);
|
| + state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results,
|
| + signals_states);
|
| }
|
|
|
| } // namespace mojo
|
|
|