Index: mojo/message_pump/message_pump_mojo.cc |
diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc |
index 696e083bf76294a5a25bc207d99ddf5470f00b63..3f60dd7116d9744411263ad6452563e148ee0990 100644 |
--- a/mojo/message_pump/message_pump_mojo.cc |
+++ b/mojo/message_pump/message_pump_mojo.cc |
@@ -7,8 +7,10 @@ |
#include <stdint.h> |
#include <algorithm> |
+#include <map> |
#include <vector> |
+#include "base/containers/small_map.h" |
#include "base/debug/alias.h" |
#include "base/lazy_instance.h" |
#include "base/logging.h" |
@@ -16,6 +18,7 @@ |
#include "base/time/time.h" |
#include "mojo/message_pump/message_pump_mojo_handler.h" |
#include "mojo/message_pump/time_helper.h" |
+#include "mojo/public/c/system/wait_set.h" |
namespace mojo { |
namespace common { |
@@ -37,13 +40,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, |
} // namespace |
-// State needed for one iteration of WaitMany. The first handle and flags |
-// corresponds to that of the control pipe. |
-struct MessagePumpMojo::WaitState { |
- std::vector<Handle> handles; |
- std::vector<MojoHandleSignals> wait_signals; |
-}; |
- |
struct MessagePumpMojo::RunState { |
RunState() : should_quit(false) {} |
@@ -61,6 +57,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { |
CHECK_EQ(result, MOJO_RESULT_OK); |
CHECK(read_handle_.is_valid()); |
CHECK(write_handle_.is_valid()); |
+ |
+ MojoHandle handle; |
+ result = MojoCreateWaitSet(&handle); |
+ CHECK_EQ(result, MOJO_RESULT_OK); |
+ wait_set_handle_.reset(Handle(handle)); |
+ CHECK(wait_set_handle_.is_valid()); |
+ |
+ result = |
+ MojoAddHandle(wait_set_handle_.get().value(), read_handle_.get().value(), |
+ MOJO_HANDLE_SIGNAL_READABLE); |
+ CHECK_EQ(result, MOJO_RESULT_OK); |
} |
MessagePumpMojo::~MessagePumpMojo() { |
@@ -96,9 +103,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, |
bool inserted = deadline_handles_.insert(handle).second; |
DCHECK(inserted); |
} |
+ |
+ MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), |
+ handle.value(), wait_signals); |
+ // Because stopping a HandleWatcher is now asynchronous, it's possible for the |
+ // handle to no longer be open at this point. |
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT); |
} |
void MessagePumpMojo::RemoveHandler(const Handle& handle) { |
+ MojoResult result = |
+ MojoRemoveHandle(wait_set_handle_.get().value(), handle.value()); |
+ // At this point, it's possible that the handle has been closed, which would |
+ // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also |
+ // possible for the handle to have already been removed, so all of the |
+ // possible error codes are valid here. |
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND || |
+ result == MOJO_RESULT_INVALID_ARGUMENT); |
+ |
handlers_.erase(handle); |
deadline_handles_.erase(handle); |
} |
@@ -172,51 +194,150 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { |
} |
bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
- const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; |
- const WaitState wait_state = GetWaitState(); |
- |
- std::vector<MojoHandleSignalsState> states(wait_state.handles.size()); |
- const WaitManyResult wait_many_result = |
- WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states); |
- const MojoResult result = wait_many_result.result; |
- bool did_work = true; |
- if (result == MOJO_RESULT_OK) { |
- if (wait_many_result.index == 0) { |
- if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) { |
- // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
- // base::Thread won't get run since the control pipe depends on the EDK |
- // staying alive. So quit manually to avoid this thread hanging. |
- Quit(); |
- } else { |
- // Control pipe was written to. |
- ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL, |
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
- } |
- } else { |
- DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != |
- handlers_.end()); |
- WillSignalHandler(); |
- handlers_[wait_state.handles[wait_many_result.index]] |
- .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); |
- DidSignalHandler(); |
+ bool did_work = block; |
+ if (block) { |
+ // If the wait isn't blocking (deadline == 0), there's no point in waiting. |
+ // Wait sets do not require a wait operation to be performed in order to |
+ // retreive any ready handles. Performing a wait with deadline == 0 is |
+ // unnecessary work. |
+ did_work = WaitForReadyHandles(run_state); |
+ } |
+ |
+ did_work |= ProcessReadyHandles(); |
+ did_work |= RemoveExpiredHandles(); |
+ |
+ return did_work; |
+} |
+ |
+bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const { |
+ const MojoDeadline deadline = GetDeadlineForWait(run_state); |
+ const MojoResult wait_result = Wait( |
+ wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); |
+ if (wait_result == MOJO_RESULT_OK) { |
+ // Handles may be ready. Or not since wake-ups can be spurious in certain |
+ // circumstances. |
+ return true; |
+ } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) { |
+ return false; |
+ } |
+ |
+ base::debug::Alias(&wait_result); |
+ // Unexpected result is likely fatal, crash so we can determine cause. |
+ CHECK(false); |
+ return false; |
+} |
+ |
+bool MessagePumpMojo::ProcessReadyHandles() { |
+ // Maximum number of handles to retrieve and process. Experimentally, the 95th |
+ // percentile is 1 handle, and the long-term average is 1.1. However, this has |
+ // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise. |
+ const uint32_t kMaxServiced = 8; |
+ uint32_t num_ready_handles = kMaxServiced; |
+ MojoHandle handles[kMaxServiced]; |
+ MojoResult handle_results[kMaxServiced]; |
+ |
+ const MojoResult get_result = |
+ MojoGetReadyHandles(wait_set_handle_.get().value(), &num_ready_handles, |
+ handles, handle_results, nullptr); |
+ CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT); |
+ if (get_result != MOJO_RESULT_OK) |
+ return false; |
+ |
+ DCHECK(num_ready_handles); |
+ DCHECK_LE(num_ready_handles, kMaxServiced); |
+ // Do this in two steps, because notifying a handler may remove/add other |
+ // handles that may have also been woken up. |
+ // First, enumerate the IDs of the ready handles. Then, iterate over the |
+ // handles and only take action if the ID hasn't changed. |
+ // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to |
+ // avoid the per-element allocation. |
+ base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles; |
+ for (uint32_t i = 0; i < num_ready_handles; i++) { |
+ const Handle handle = Handle(handles[i]); |
+ // Skip the control handle. It's special. |
+ if (handle.value() == read_handle_.get().value()) |
+ continue; |
+ DCHECK(handle.is_valid()); |
+ const auto it = handlers_.find(handle); |
+ // Skip handles that have been removed. This is possible because |
+ // RemoveHandler() can be called with a handle that has been closed. Because |
+ // the handle is closed, the MojoRemoveHandle() call in RemoveHandler() |
+ // would have failed, but the handle is still in the wait set. Once the |
+ // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed |
+ // from the set. The result is either the pending result that existed when |
+ // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the |
+ // handle was closed. |
+ if (it == handlers_.end()) |
+ continue; |
+ ready_handles[handle] = it->second.id; |
+ } |
+ |
+ for (uint32_t i = 0; i < num_ready_handles; i++) { |
+ const Handle handle = Handle(handles[i]); |
+ |
+ // If the handle has been removed, or it's ID has changed, skip over it. |
+ // If the handle's ID has changed, and it still satisfies its signals, |
+ // then it'll be caught in the next message pump iteration. |
+ const auto it = handlers_.find(handle); |
+ if ((handle.value() != read_handle_.get().value()) && |
+ (it == handlers_.end() || it->second.id != ready_handles[handle])) { |
+ continue; |
} |
- } else { |
- switch (result) { |
+ |
+ switch (handle_results[i]) { |
case MOJO_RESULT_CANCELLED: |
case MOJO_RESULT_FAILED_PRECONDITION: |
- case MOJO_RESULT_INVALID_ARGUMENT: |
- RemoveInvalidHandle(wait_state, result, wait_many_result.index); |
+ DVLOG(1) << "Error: " << handle_results[i] |
+ << " handle: " << handle.value(); |
+ if (handle.value() == read_handle_.get().value()) { |
+ // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
+ // base::Thread won't get run since the control pipe depends on the |
+ // EDK staying alive. So quit manually to avoid this thread hanging. |
+ Quit(); |
+ } else { |
+ RemoveInvalidHandle(handle_results[i], handle); |
+ } |
break; |
- case MOJO_RESULT_DEADLINE_EXCEEDED: |
- did_work = false; |
+ case MOJO_RESULT_OK: |
+ if (handle.value() == read_handle_.get().value()) { |
+ DVLOG(1) << "Signaled control pipe"; |
+ // Control pipe was written to. |
+ ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr, |
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
+ } else { |
+ DVLOG(1) << "Handle ready: " << handle.value(); |
+ SignalHandleReady(handle); |
+ } |
break; |
default: |
- base::debug::Alias(&result); |
+ base::debug::Alias(&i); |
+ base::debug::Alias(&handle_results[i]); |
// Unexpected result is likely fatal, crash so we can determine cause. |
CHECK(false); |
} |
} |
+ return true; |
+} |
+void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) { |
+ // TODO(sky): deal with control pipe going bad. |
+ CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || |
+ result == MOJO_RESULT_CANCELLED || |
+ result == MOJO_RESULT_DEADLINE_EXCEEDED); |
+ // Indicates the control pipe went bad. |
+ CHECK_NE(handle.value(), read_handle_.get().value()); |
+ |
+ auto it = handlers_.find(handle); |
+ CHECK(it != handlers_.end()); |
+ MessagePumpMojoHandler* handler = it->second.handler; |
+ RemoveHandler(handle); |
+ WillSignalHandler(); |
+ handler->OnHandleError(handle, result); |
+ DidSignalHandler(); |
+} |
+ |
+bool MessagePumpMojo::RemoveExpiredHandles() { |
+ bool removed = false; |
// Notify and remove any handlers whose time has expired. First, iterate over |
// the set of handles that have a deadline, and add the expired handles to a |
// map of <Handle, id>. Then, iterate over those expired handles and remove |
@@ -232,40 +353,20 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
if (!it->second.deadline.is_null() && it->second.deadline < now) |
expired_handles[handle] = it->second.id; |
} |
- for (auto& pair : expired_handles) { |
+ for (const auto& pair : expired_handles) { |
auto it = handlers_.find(pair.first); |
// Don't need to check deadline again since it can't change if id hasn't |
// changed. |
if (it != handlers_.end() && it->second.id == pair.second) { |
- MessagePumpMojoHandler* handler = handlers_[pair.first].handler; |
+ MessagePumpMojoHandler* handler = it->second.handler; |
RemoveHandler(pair.first); |
WillSignalHandler(); |
handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED); |
DidSignalHandler(); |
- did_work = true; |
+ removed = true; |
} |
} |
- return did_work; |
-} |
- |
-void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, |
- MojoResult result, |
- uint32_t index) { |
- // TODO(sky): deal with control pipe going bad. |
- CHECK(result == MOJO_RESULT_INVALID_ARGUMENT || |
- result == MOJO_RESULT_FAILED_PRECONDITION || |
- result == MOJO_RESULT_CANCELLED); |
- CHECK_NE(index, 0u); // Indicates the control pipe went bad. |
- |
- // Remove the handle first, this way if OnHandleError() tries to remove the |
- // handle our iterator isn't invalidated. |
- Handle handle = wait_state.handles[index]; |
- CHECK(handlers_.find(handle) != handlers_.end()); |
- MessagePumpMojoHandler* handler = handlers_[handle].handler; |
- RemoveHandler(handle); |
- WillSignalHandler(); |
- handler->OnHandleError(handle, result); |
- DidSignalHandler(); |
+ return removed; |
} |
void MessagePumpMojo::SignalControlPipe() { |
@@ -282,20 +383,6 @@ void MessagePumpMojo::SignalControlPipe() { |
CHECK_EQ(MOJO_RESULT_OK, result); |
} |
-MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const { |
- WaitState wait_state; |
- wait_state.handles.push_back(read_handle_.get()); |
- wait_state.wait_signals.push_back( |
- MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED); |
- |
- for (HandleToHandler::const_iterator i = handlers_.begin(); |
- i != handlers_.end(); ++i) { |
- wait_state.handles.push_back(i->first); |
- wait_state.wait_signals.push_back(i->second.wait_signals); |
- } |
- return wait_state; |
-} |
- |
MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
const RunState& run_state) const { |
const base::TimeTicks now(internal::NowTicks()); |
@@ -310,6 +397,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
return deadline; |
} |
+void MessagePumpMojo::SignalHandleReady(Handle handle) { |
+ DCHECK(handlers_.find(handle) != handlers_.end()); |
+ WillSignalHandler(); |
+ handlers_[handle].handler->OnHandleReady(handle); |
+ DidSignalHandler(); |
+} |
+ |
void MessagePumpMojo::WillSignalHandler() { |
FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); |
} |