Chromium Code Reviews| Index: mojo/public/cpp/system/wait_set.cc |
| diff --git a/mojo/public/cpp/system/wait_set.cc b/mojo/public/cpp/system/wait_set.cc |
| index 410ec7490706c7b3738521edebf1508cfe6e0d79..ccc4230de467da45d194f75082336991789e4d10 100644 |
| --- a/mojo/public/cpp/system/wait_set.cc |
| +++ b/mojo/public/cpp/system/wait_set.cc |
| @@ -7,6 +7,7 @@ |
| #include <algorithm> |
| #include <limits> |
| #include <map> |
| +#include <set> |
| #include <vector> |
| #include "base/containers/stack_container.h" |
| @@ -33,6 +34,21 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| watcher_handle_.reset(); |
| } |
| + MojoResult AddEvent(base::WaitableEvent* event) { |
| + auto result = user_events_.insert(event); |
| + if (result.second) |
| + return MOJO_RESULT_OK; |
| + return MOJO_RESULT_ALREADY_EXISTS; |
| + } |
| + |
| + MojoResult RemoveEvent(base::WaitableEvent* event) { |
| + auto it = user_events_.find(event); |
| + if (it == user_events_.end()) |
| + return MOJO_RESULT_NOT_FOUND; |
| + user_events_.erase(it); |
| + return MOJO_RESULT_OK; |
| + } |
| + |
| MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { |
| DCHECK(watcher_handle_.is_valid()); |
| @@ -110,7 +126,8 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| return rv; |
| } |
| - void Wait(size_t* num_ready_handles, |
| + void Wait(base::WaitableEvent** ready_event, |
| + size_t* num_ready_handles, |
| Handle* ready_handles, |
| MojoResult* ready_results, |
| MojoHandleSignalsState* signals_states) { |
| @@ -118,7 +135,6 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| DCHECK(num_ready_handles); |
| DCHECK(ready_handles); |
| DCHECK(ready_results); |
| - bool should_wait = false; |
| { |
| base::AutoLock lock(lock_); |
| if (ready_handles_.empty()) { |
| @@ -128,6 +144,7 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); |
| uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); |
| + |
| base::StackVector<uintptr_t, 4> ready_contexts; |
| ready_contexts.container().resize(num_ready_contexts); |
| base::StackVector<MojoHandleSignalsState, 4> ready_states; |
| @@ -144,36 +161,53 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| ready_contexts.container().data(), ready_results, out_states); |
| if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
| - // Can't arm because one or more handles is already ready. |
| - *num_ready_handles = num_ready_contexts; |
| + // Simulate the handles becoming ready. We do this in lieu of |
| + // returning the results immediately so as to not avoid potentially |
|
yzshen1
2017/03/23 20:15:50
nit: "not avoid" -> "avoid"?
Ken Rockot(use gerrit already)
2017/03/23 22:04:20
Done
|
| + // starving user events. i.e., we always want to call WaitMany() |
| + // below. |
| + handle_event_.Signal(); |
| for (size_t i = 0; i < num_ready_contexts; ++i) { |
| auto it = contexts_.find(ready_contexts.container()[i]); |
| DCHECK(it != contexts_.end()); |
| - ready_handles[i] = it->second->handle(); |
| + ready_handles_[it->second->handle()] = {ready_results[i], |
| + out_states[i]}; |
| } |
| - return; |
| + } else if (rv == MOJO_RESULT_NOT_FOUND) { |
| + // Nothing to watch. If there are no user events, always signal to |
| + // avoid deadlock. |
| + if (user_events_.empty()) |
| + handle_event_.Signal(); |
| + } else { |
| + // Watcher must be armed now. No need to manually signal. |
| + DCHECK_EQ(MOJO_RESULT_OK, rv); |
| } |
| - |
| - if (rv == MOJO_RESULT_NOT_FOUND) { |
| - // There are no handles in the set. Nothing to watch. |
| - *num_ready_handles = 0; |
| - return; |
| - } |
| - |
| - // Watcher is armed. We can go on waiting for an event to signal. |
| - DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS); |
| - should_wait = true; |
| } |
| } |
| - if (should_wait) |
| - handle_event_.Wait(); |
| + // Build a local contiguous array of events to wait on. These are rotated |
| + // across Wait() calls to avoid starvation, by virtue of the fact that |
| + // WaitMany guarantees left-to-right priority when multiple events are |
| + // signaled. |
| + |
| + base::StackVector<base::WaitableEvent*, 4> events; |
| + events.container().resize(user_events_.size() + 1); |
| + if (waitable_index_shift_ > user_events_.size()) |
| + waitable_index_shift_ = 0; |
| + |
| + size_t dest_index = waitable_index_shift_++; |
| + events.container()[dest_index] = &handle_event_; |
| + for (auto* e : user_events_) { |
| + dest_index = (dest_index + 1) % events.container().size(); |
| + events.container()[dest_index] = e; |
| + } |
| + size_t index = base::WaitableEvent::WaitMany(events.container().data(), |
| + events.container().size()); |
| base::AutoLock lock(lock_); |
| - DCHECK(!ready_handles_.empty()); |
| - |
| - // Pop as many handles as we can out of the ready set and return them. |
| + // Pop as many handles as we can out of the ready set and return them. Note |
| + // that we do this regardless of which event signaled, as there may be |
| + // ready handles in any case and they may be interesting to the caller. |
| *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); |
| for (size_t i = 0; i < *num_ready_handles; ++i) { |
| auto it = ready_handles_.begin(); |
| @@ -183,6 +217,14 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| signals_states[i] = it->second.signals_state; |
| ready_handles_.erase(it); |
| } |
| + |
| + // If the caller cares, let them know which user event unblocked us, if any. |
| + if (ready_event) { |
| + if (events.container()[index] == &handle_event_) |
| + *ready_event = nullptr; |
| + else |
| + *ready_event = events.container()[index]; |
| + } |
| } |
| private: |
| @@ -282,10 +324,16 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
| std::map<Handle, scoped_refptr<Context>> handle_to_context_; |
| std::map<Handle, ReadyState> ready_handles_; |
| std::vector<scoped_refptr<Context>> cancelled_contexts_; |
| + std::set<base::WaitableEvent*> user_events_; |
| // Event signaled any time a handle notification is received. |
| base::WaitableEvent handle_event_; |
| + // Offset by which to rotate the current set of waitable objects. This is used |
| + // to guard against event starvation, as base::WaitableEvent::WaitMany gives |
| + // preference to events in left-to-right order. |
| + size_t waitable_index_shift_ = 0; |
| + |
| DISALLOW_COPY_AND_ASSIGN(State); |
| }; |
| @@ -295,6 +343,14 @@ WaitSet::~WaitSet() { |
| state_->ShutDown(); |
| } |
| +MojoResult WaitSet::AddEvent(base::WaitableEvent* event) { |
| + return state_->AddEvent(event); |
| +} |
| + |
| +MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) { |
| + return state_->RemoveEvent(event); |
| +} |
| + |
| MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { |
| return state_->AddHandle(handle, signals); |
| } |
| @@ -303,11 +359,13 @@ MojoResult WaitSet::RemoveHandle(Handle handle) { |
| return state_->RemoveHandle(handle); |
| } |
| -void WaitSet::Wait(size_t* num_ready_handles, |
| +void WaitSet::Wait(base::WaitableEvent** ready_event, |
| + size_t* num_ready_handles, |
| Handle* ready_handles, |
| MojoResult* ready_results, |
| MojoHandleSignalsState* signals_states) { |
| - state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states); |
| + state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results, |
| + signals_states); |
| } |
| } // namespace mojo |