Index: mojo/public/cpp/system/wait_set.cc |
diff --git a/mojo/public/cpp/system/wait_set.cc b/mojo/public/cpp/system/wait_set.cc |
index 410ec7490706c7b3738521edebf1508cfe6e0d79..1728f81b95e0d8cee496509f27dbb3951487357c 100644 |
--- a/mojo/public/cpp/system/wait_set.cc |
+++ b/mojo/public/cpp/system/wait_set.cc |
@@ -7,6 +7,7 @@ |
#include <algorithm> |
#include <limits> |
#include <map> |
+#include <set> |
#include <vector> |
#include "base/containers/stack_container.h" |
@@ -33,6 +34,21 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
watcher_handle_.reset(); |
} |
+ MojoResult AddEvent(base::WaitableEvent* event) { |
+ auto result = user_events_.insert(event); |
+ if (result.second) |
+ return MOJO_RESULT_OK; |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ } |
+ |
+ MojoResult RemoveEvent(base::WaitableEvent* event) { |
+ auto it = user_events_.find(event); |
+ if (it == user_events_.end()) |
+ return MOJO_RESULT_NOT_FOUND; |
+ user_events_.erase(it); |
+ return MOJO_RESULT_OK; |
+ } |
+ |
MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { |
DCHECK(watcher_handle_.is_valid()); |
@@ -110,7 +126,8 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
return rv; |
} |
- void Wait(size_t* num_ready_handles, |
+ void Wait(base::WaitableEvent** ready_event, |
+ size_t* num_ready_handles, |
Handle* ready_handles, |
MojoResult* ready_results, |
MojoHandleSignalsState* signals_states) { |
@@ -118,7 +135,6 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
DCHECK(num_ready_handles); |
DCHECK(ready_handles); |
DCHECK(ready_results); |
- bool should_wait = false; |
{ |
base::AutoLock lock(lock_); |
if (ready_handles_.empty()) { |
@@ -128,6 +144,7 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); |
uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); |
+ |
base::StackVector<uintptr_t, 4> ready_contexts; |
ready_contexts.container().resize(num_ready_contexts); |
base::StackVector<MojoHandleSignalsState, 4> ready_states; |
@@ -144,36 +161,53 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
ready_contexts.container().data(), ready_results, out_states); |
if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
- // Can't arm because one or more handles is already ready. |
- *num_ready_handles = num_ready_contexts; |
+ // Simulate the handles becoming ready. We do this in lieu of |
+ // returning the results immediately so as to avoid potentially |
+ // starving user events. i.e., we always want to call WaitMany() |
+ // below. |
+ handle_event_.Signal(); |
for (size_t i = 0; i < num_ready_contexts; ++i) { |
auto it = contexts_.find(ready_contexts.container()[i]); |
DCHECK(it != contexts_.end()); |
- ready_handles[i] = it->second->handle(); |
+ ready_handles_[it->second->handle()] = {ready_results[i], |
+ out_states[i]}; |
} |
- return; |
+ } else if (rv == MOJO_RESULT_NOT_FOUND) { |
+ // Nothing to watch. If there are no user events, always signal to |
+ // avoid deadlock. |
+ if (user_events_.empty()) |
+ handle_event_.Signal(); |
+ } else { |
+ // Watcher must be armed now. No need to manually signal. |
+ DCHECK_EQ(MOJO_RESULT_OK, rv); |
} |
- |
- if (rv == MOJO_RESULT_NOT_FOUND) { |
- // There are no handles in the set. Nothing to watch. |
- *num_ready_handles = 0; |
- return; |
- } |
- |
- // Watcher is armed. We can go on waiting for an event to signal. |
- DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS); |
- should_wait = true; |
} |
} |
- if (should_wait) |
- handle_event_.Wait(); |
+ // Build a local contiguous array of events to wait on. These are rotated |
+ // across Wait() calls to avoid starvation, by virtue of the fact that |
+ // WaitMany guarantees left-to-right priority when multiple events are |
+ // signaled. |
+ |
+ base::StackVector<base::WaitableEvent*, 4> events; |
+ events.container().resize(user_events_.size() + 1); |
+ if (waitable_index_shift_ > user_events_.size()) |
+ waitable_index_shift_ = 0; |
+ |
+ size_t dest_index = waitable_index_shift_++; |
+ events.container()[dest_index] = &handle_event_; |
+ for (auto* e : user_events_) { |
+ dest_index = (dest_index + 1) % events.container().size(); |
+ events.container()[dest_index] = e; |
+ } |
+ size_t index = base::WaitableEvent::WaitMany(events.container().data(), |
+ events.container().size()); |
base::AutoLock lock(lock_); |
- DCHECK(!ready_handles_.empty()); |
- |
- // Pop as many handles as we can out of the ready set and return them. |
+ // Pop as many handles as we can out of the ready set and return them. Note |
+ // that we do this regardless of which event signaled, as there may be |
+ // ready handles in any case and they may be interesting to the caller. |
*num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); |
for (size_t i = 0; i < *num_ready_handles; ++i) { |
auto it = ready_handles_.begin(); |
@@ -183,6 +217,14 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
signals_states[i] = it->second.signals_state; |
ready_handles_.erase(it); |
} |
+ |
+ // If the caller cares, let them know which user event unblocked us, if any. |
+ if (ready_event) { |
+ if (events.container()[index] == &handle_event_) |
+ *ready_event = nullptr; |
+ else |
+ *ready_event = events.container()[index]; |
+ } |
} |
private: |
@@ -282,10 +324,16 @@ class WaitSet::State : public base::RefCountedThreadSafe<State> { |
std::map<Handle, scoped_refptr<Context>> handle_to_context_; |
std::map<Handle, ReadyState> ready_handles_; |
std::vector<scoped_refptr<Context>> cancelled_contexts_; |
+ std::set<base::WaitableEvent*> user_events_; |
// Event signaled any time a handle notification is received. |
base::WaitableEvent handle_event_; |
+ // Offset by which to rotate the current set of waitable objects. This is used |
+ // to guard against event starvation, as base::WaitableEvent::WaitMany gives |
+ // preference to events in left-to-right order. |
+ size_t waitable_index_shift_ = 0; |
+ |
DISALLOW_COPY_AND_ASSIGN(State); |
}; |
@@ -295,6 +343,14 @@ WaitSet::~WaitSet() { |
state_->ShutDown(); |
} |
+MojoResult WaitSet::AddEvent(base::WaitableEvent* event) { |
+ return state_->AddEvent(event); |
+} |
+ |
+MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) { |
+ return state_->RemoveEvent(event); |
+} |
+ |
MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { |
return state_->AddHandle(handle, signals); |
} |
@@ -303,11 +359,13 @@ MojoResult WaitSet::RemoveHandle(Handle handle) { |
return state_->RemoveHandle(handle); |
} |
-void WaitSet::Wait(size_t* num_ready_handles, |
+void WaitSet::Wait(base::WaitableEvent** ready_event, |
+ size_t* num_ready_handles, |
Handle* ready_handles, |
MojoResult* ready_results, |
MojoHandleSignalsState* signals_states) { |
- state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states); |
+ state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results, |
+ signals_states); |
} |
} // namespace mojo |