| Index: mojo/message_pump/message_pump_mojo.cc
|
| diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc
|
| index 696e083bf76294a5a25bc207d99ddf5470f00b63..3f60dd7116d9744411263ad6452563e148ee0990 100644
|
| --- a/mojo/message_pump/message_pump_mojo.cc
|
| +++ b/mojo/message_pump/message_pump_mojo.cc
|
| @@ -7,8 +7,10 @@
|
| #include <stdint.h>
|
|
|
| #include <algorithm>
|
| +#include <map>
|
| #include <vector>
|
|
|
| +#include "base/containers/small_map.h"
|
| #include "base/debug/alias.h"
|
| #include "base/lazy_instance.h"
|
| #include "base/logging.h"
|
| @@ -16,6 +18,7 @@
|
| #include "base/time/time.h"
|
| #include "mojo/message_pump/message_pump_mojo_handler.h"
|
| #include "mojo/message_pump/time_helper.h"
|
| +#include "mojo/public/c/system/wait_set.h"
|
|
|
| namespace mojo {
|
| namespace common {
|
| @@ -37,13 +40,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
|
|
|
| } // namespace
|
|
|
| -// State needed for one iteration of WaitMany. The first handle and flags
|
| -// corresponds to that of the control pipe.
|
| -struct MessagePumpMojo::WaitState {
|
| - std::vector<Handle> handles;
|
| - std::vector<MojoHandleSignals> wait_signals;
|
| -};
|
| -
|
| struct MessagePumpMojo::RunState {
|
| RunState() : should_quit(false) {}
|
|
|
| @@ -61,6 +57,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
|
| CHECK_EQ(result, MOJO_RESULT_OK);
|
| CHECK(read_handle_.is_valid());
|
| CHECK(write_handle_.is_valid());
|
| +
|
| + MojoHandle handle;
|
| + result = MojoCreateWaitSet(&handle);
|
| + CHECK_EQ(result, MOJO_RESULT_OK);
|
| + wait_set_handle_.reset(Handle(handle));
|
| + CHECK(wait_set_handle_.is_valid());
|
| +
|
| + result =
|
| + MojoAddHandle(wait_set_handle_.get().value(), read_handle_.get().value(),
|
| + MOJO_HANDLE_SIGNAL_READABLE);
|
| + CHECK_EQ(result, MOJO_RESULT_OK);
|
| }
|
|
|
| MessagePumpMojo::~MessagePumpMojo() {
|
| @@ -96,9 +103,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
|
| bool inserted = deadline_handles_.insert(handle).second;
|
| DCHECK(inserted);
|
| }
|
| +
|
| + MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
|
| + handle.value(), wait_signals);
|
| + // Because stopping a HandleWatcher is now asynchronous, it's possible for the
|
| + // handle to no longer be open at this point.
|
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
|
| }
|
|
|
| void MessagePumpMojo::RemoveHandler(const Handle& handle) {
|
| + MojoResult result =
|
| + MojoRemoveHandle(wait_set_handle_.get().value(), handle.value());
|
| + // At this point, it's possible that the handle has been closed, which would
|
| + // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
|
| + // possible for the handle to have already been removed, so all of the
|
| + // possible error codes are valid here.
|
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
|
| + result == MOJO_RESULT_INVALID_ARGUMENT);
|
| +
|
| handlers_.erase(handle);
|
| deadline_handles_.erase(handle);
|
| }
|
| @@ -172,51 +194,150 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
|
| }
|
|
|
| bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
|
| - const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
|
| - const WaitState wait_state = GetWaitState();
|
| -
|
| - std::vector<MojoHandleSignalsState> states(wait_state.handles.size());
|
| - const WaitManyResult wait_many_result =
|
| - WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states);
|
| - const MojoResult result = wait_many_result.result;
|
| - bool did_work = true;
|
| - if (result == MOJO_RESULT_OK) {
|
| - if (wait_many_result.index == 0) {
|
| - if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) {
|
| - // The Mojo EDK is shutting down. The ThreadQuitHelper task in
|
| - // base::Thread won't get run since the control pipe depends on the EDK
|
| - // staying alive. So quit manually to avoid this thread hanging.
|
| - Quit();
|
| - } else {
|
| - // Control pipe was written to.
|
| - ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
|
| - MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
|
| - }
|
| - } else {
|
| - DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) !=
|
| - handlers_.end());
|
| - WillSignalHandler();
|
| - handlers_[wait_state.handles[wait_many_result.index]]
|
| - .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
|
| - DidSignalHandler();
|
| + bool did_work = block;
|
| + if (block) {
|
| + // If the wait isn't blocking (deadline == 0), there's no point in waiting.
|
| + // Wait sets do not require a wait operation to be performed in order to
|
| + // retreive any ready handles. Performing a wait with deadline == 0 is
|
| + // unnecessary work.
|
| + did_work = WaitForReadyHandles(run_state);
|
| + }
|
| +
|
| + did_work |= ProcessReadyHandles();
|
| + did_work |= RemoveExpiredHandles();
|
| +
|
| + return did_work;
|
| +}
|
| +
|
| +bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const {
|
| + const MojoDeadline deadline = GetDeadlineForWait(run_state);
|
| + const MojoResult wait_result = Wait(
|
| + wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
|
| + if (wait_result == MOJO_RESULT_OK) {
|
| + // Handles may be ready. Or not since wake-ups can be spurious in certain
|
| + // circumstances.
|
| + return true;
|
| + } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
|
| + return false;
|
| + }
|
| +
|
| + base::debug::Alias(&wait_result);
|
| + // Unexpected result is likely fatal, crash so we can determine cause.
|
| + CHECK(false);
|
| + return false;
|
| +}
|
| +
|
| +bool MessagePumpMojo::ProcessReadyHandles() {
|
| + // Maximum number of handles to retrieve and process. Experimentally, the 95th
|
| + // percentile is 1 handle, and the long-term average is 1.1. However, this has
|
| + // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise.
|
| + const uint32_t kMaxServiced = 8;
|
| + uint32_t num_ready_handles = kMaxServiced;
|
| + MojoHandle handles[kMaxServiced];
|
| + MojoResult handle_results[kMaxServiced];
|
| +
|
| + const MojoResult get_result =
|
| + MojoGetReadyHandles(wait_set_handle_.get().value(), &num_ready_handles,
|
| + handles, handle_results, nullptr);
|
| + CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
|
| + if (get_result != MOJO_RESULT_OK)
|
| + return false;
|
| +
|
| + DCHECK(num_ready_handles);
|
| + DCHECK_LE(num_ready_handles, kMaxServiced);
|
| + // Do this in two steps, because notifying a handler may remove/add other
|
| + // handles that may have also been woken up.
|
| + // First, enumerate the IDs of the ready handles. Then, iterate over the
|
| + // handles and only take action if the ID hasn't changed.
|
| + // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to
|
| + // avoid the per-element allocation.
|
| + base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles;
|
| + for (uint32_t i = 0; i < num_ready_handles; i++) {
|
| + const Handle handle = Handle(handles[i]);
|
| + // Skip the control handle. It's special.
|
| + if (handle.value() == read_handle_.get().value())
|
| + continue;
|
| + DCHECK(handle.is_valid());
|
| + const auto it = handlers_.find(handle);
|
| + // Skip handles that have been removed. This is possible because
|
| + // RemoveHandler() can be called with a handle that has been closed. Because
|
| + // the handle is closed, the MojoRemoveHandle() call in RemoveHandler()
|
| + // would have failed, but the handle is still in the wait set. Once the
|
| + // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed
|
| + // from the set. The result is either the pending result that existed when
|
| + // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the
|
| + // handle was closed.
|
| + if (it == handlers_.end())
|
| + continue;
|
| + ready_handles[handle] = it->second.id;
|
| + }
|
| +
|
| + for (uint32_t i = 0; i < num_ready_handles; i++) {
|
| + const Handle handle = Handle(handles[i]);
|
| +
|
| + // If the handle has been removed, or it's ID has changed, skip over it.
|
| + // If the handle's ID has changed, and it still satisfies its signals,
|
| + // then it'll be caught in the next message pump iteration.
|
| + const auto it = handlers_.find(handle);
|
| + if ((handle.value() != read_handle_.get().value()) &&
|
| + (it == handlers_.end() || it->second.id != ready_handles[handle])) {
|
| + continue;
|
| }
|
| - } else {
|
| - switch (result) {
|
| +
|
| + switch (handle_results[i]) {
|
| case MOJO_RESULT_CANCELLED:
|
| case MOJO_RESULT_FAILED_PRECONDITION:
|
| - case MOJO_RESULT_INVALID_ARGUMENT:
|
| - RemoveInvalidHandle(wait_state, result, wait_many_result.index);
|
| + DVLOG(1) << "Error: " << handle_results[i]
|
| + << " handle: " << handle.value();
|
| + if (handle.value() == read_handle_.get().value()) {
|
| + // The Mojo EDK is shutting down. The ThreadQuitHelper task in
|
| + // base::Thread won't get run since the control pipe depends on the
|
| + // EDK staying alive. So quit manually to avoid this thread hanging.
|
| + Quit();
|
| + } else {
|
| + RemoveInvalidHandle(handle_results[i], handle);
|
| + }
|
| break;
|
| - case MOJO_RESULT_DEADLINE_EXCEEDED:
|
| - did_work = false;
|
| + case MOJO_RESULT_OK:
|
| + if (handle.value() == read_handle_.get().value()) {
|
| + DVLOG(1) << "Signaled control pipe";
|
| + // Control pipe was written to.
|
| + ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr,
|
| + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
|
| + } else {
|
| + DVLOG(1) << "Handle ready: " << handle.value();
|
| + SignalHandleReady(handle);
|
| + }
|
| break;
|
| default:
|
| - base::debug::Alias(&result);
|
| + base::debug::Alias(&i);
|
| + base::debug::Alias(&handle_results[i]);
|
| // Unexpected result is likely fatal, crash so we can determine cause.
|
| CHECK(false);
|
| }
|
| }
|
| + return true;
|
| +}
|
|
|
| +void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) {
|
| + // TODO(sky): deal with control pipe going bad.
|
| + CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
|
| + result == MOJO_RESULT_CANCELLED ||
|
| + result == MOJO_RESULT_DEADLINE_EXCEEDED);
|
| + // Indicates the control pipe went bad.
|
| + CHECK_NE(handle.value(), read_handle_.get().value());
|
| +
|
| + auto it = handlers_.find(handle);
|
| + CHECK(it != handlers_.end());
|
| + MessagePumpMojoHandler* handler = it->second.handler;
|
| + RemoveHandler(handle);
|
| + WillSignalHandler();
|
| + handler->OnHandleError(handle, result);
|
| + DidSignalHandler();
|
| +}
|
| +
|
| +bool MessagePumpMojo::RemoveExpiredHandles() {
|
| + bool removed = false;
|
| // Notify and remove any handlers whose time has expired. First, iterate over
|
| // the set of handles that have a deadline, and add the expired handles to a
|
| // map of <Handle, id>. Then, iterate over those expired handles and remove
|
| @@ -232,40 +353,20 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
|
| if (!it->second.deadline.is_null() && it->second.deadline < now)
|
| expired_handles[handle] = it->second.id;
|
| }
|
| - for (auto& pair : expired_handles) {
|
| + for (const auto& pair : expired_handles) {
|
| auto it = handlers_.find(pair.first);
|
| // Don't need to check deadline again since it can't change if id hasn't
|
| // changed.
|
| if (it != handlers_.end() && it->second.id == pair.second) {
|
| - MessagePumpMojoHandler* handler = handlers_[pair.first].handler;
|
| + MessagePumpMojoHandler* handler = it->second.handler;
|
| RemoveHandler(pair.first);
|
| WillSignalHandler();
|
| handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED);
|
| DidSignalHandler();
|
| - did_work = true;
|
| + removed = true;
|
| }
|
| }
|
| - return did_work;
|
| -}
|
| -
|
| -void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state,
|
| - MojoResult result,
|
| - uint32_t index) {
|
| - // TODO(sky): deal with control pipe going bad.
|
| - CHECK(result == MOJO_RESULT_INVALID_ARGUMENT ||
|
| - result == MOJO_RESULT_FAILED_PRECONDITION ||
|
| - result == MOJO_RESULT_CANCELLED);
|
| - CHECK_NE(index, 0u); // Indicates the control pipe went bad.
|
| -
|
| - // Remove the handle first, this way if OnHandleError() tries to remove the
|
| - // handle our iterator isn't invalidated.
|
| - Handle handle = wait_state.handles[index];
|
| - CHECK(handlers_.find(handle) != handlers_.end());
|
| - MessagePumpMojoHandler* handler = handlers_[handle].handler;
|
| - RemoveHandler(handle);
|
| - WillSignalHandler();
|
| - handler->OnHandleError(handle, result);
|
| - DidSignalHandler();
|
| + return removed;
|
| }
|
|
|
| void MessagePumpMojo::SignalControlPipe() {
|
| @@ -282,20 +383,6 @@ void MessagePumpMojo::SignalControlPipe() {
|
| CHECK_EQ(MOJO_RESULT_OK, result);
|
| }
|
|
|
| -MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
|
| - WaitState wait_state;
|
| - wait_state.handles.push_back(read_handle_.get());
|
| - wait_state.wait_signals.push_back(
|
| - MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
|
| -
|
| - for (HandleToHandler::const_iterator i = handlers_.begin();
|
| - i != handlers_.end(); ++i) {
|
| - wait_state.handles.push_back(i->first);
|
| - wait_state.wait_signals.push_back(i->second.wait_signals);
|
| - }
|
| - return wait_state;
|
| -}
|
| -
|
| MojoDeadline MessagePumpMojo::GetDeadlineForWait(
|
| const RunState& run_state) const {
|
| const base::TimeTicks now(internal::NowTicks());
|
| @@ -310,6 +397,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait(
|
| return deadline;
|
| }
|
|
|
| +void MessagePumpMojo::SignalHandleReady(Handle handle) {
|
| + DCHECK(handlers_.find(handle) != handlers_.end());
|
| + WillSignalHandler();
|
| + handlers_[handle].handler->OnHandleReady(handle);
|
| + DidSignalHandler();
|
| +}
|
| +
|
| void MessagePumpMojo::WillSignalHandler() {
|
| FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
|
| }
|
|
|