Chromium Code Reviews| Index: mojo/message_pump/message_pump_mojo.cc |
| diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc |
| index 12f9cc163c444a3f6303a2d380a66daa478c7ac6..fa7e4f31e39ac358c9e55447dcbdc22427658867 100644 |
| --- a/mojo/message_pump/message_pump_mojo.cc |
| +++ b/mojo/message_pump/message_pump_mojo.cc |
| @@ -5,8 +5,10 @@ |
| #include "mojo/message_pump/message_pump_mojo.h" |
| #include <algorithm> |
| +#include <map> |
| #include <vector> |
| +#include "base/containers/small_map.h" |
| #include "base/debug/alias.h" |
| #include "base/lazy_instance.h" |
| #include "base/logging.h" |
| @@ -14,6 +16,7 @@ |
| #include "base/time/time.h" |
| #include "mojo/message_pump/message_pump_mojo_handler.h" |
| #include "mojo/message_pump/time_helper.h" |
| +#include "mojo/public/c/system/wait_set.h" |
| namespace mojo { |
| namespace common { |
| @@ -35,13 +38,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, |
| } // namespace |
| -// State needed for one iteration of WaitMany. The first handle and flags |
| -// corresponds to that of the control pipe. |
| -struct MessagePumpMojo::WaitState { |
| - std::vector<Handle> handles; |
| - std::vector<MojoHandleSignals> wait_signals; |
| -}; |
| - |
| struct MessagePumpMojo::RunState { |
| RunState() : should_quit(false) {} |
| @@ -59,6 +55,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { |
| CHECK_EQ(result, MOJO_RESULT_OK); |
| CHECK(read_handle_.is_valid()); |
| CHECK(write_handle_.is_valid()); |
| + |
| + MojoHandle handle; |
| + result = MojoCreateWaitSet(&handle); |
|
Ken Rockot(use gerrit already)
2016/01/05 17:02:29
general no-op comment: We should consider adding a
Anand Mistry (off Chromium)
2016/01/05 23:29:52
Ack. It would be nice, but since this is the only
|
| + CHECK_EQ(result, MOJO_RESULT_OK); |
| + wait_set_handle_.reset(Handle(handle)); |
| + CHECK(wait_set_handle_.is_valid()); |
| + |
| + result = MojoAddHandle(wait_set_handle_.get().value(), |
| + read_handle_.get().value(), |
| + MOJO_HANDLE_SIGNAL_READABLE); |
| + CHECK_EQ(result, MOJO_RESULT_OK); |
| } |
| MessagePumpMojo::~MessagePumpMojo() { |
| @@ -94,9 +101,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, |
| bool inserted = deadline_handles_.insert(handle).second; |
| DCHECK(inserted); |
| } |
| + |
| + MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), |
| + handle.value(), wait_signals); |
| + // Because stopping a HandleWatcher is now asynchronous, it's possible for the |
| + // handle to no longer be open at this point. |
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT); |
| } |
| void MessagePumpMojo::RemoveHandler(const Handle& handle) { |
| + MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(), |
| + handle.value()); |
| + // At this point, it's possible that the handle has been closed, which would |
| + // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also |
| + // possible for the handle to have already been removed, so all of the |
| + // possible error codes are valid here. |
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND || |
| + result == MOJO_RESULT_INVALID_ARGUMENT); |
| + |
| handlers_.erase(handle); |
| deadline_handles_.erase(handle); |
| } |
| @@ -170,51 +192,150 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { |
| } |
| bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
| - const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; |
| - const WaitState wait_state = GetWaitState(); |
| - |
| - std::vector<MojoHandleSignalsState> states(wait_state.handles.size()); |
| - const WaitManyResult wait_many_result = |
| - WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states); |
| - const MojoResult result = wait_many_result.result; |
| - bool did_work = true; |
| - if (result == MOJO_RESULT_OK) { |
| - if (wait_many_result.index == 0) { |
| - if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) { |
| - // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
| - // base::Thread won't get run since the control pipe depends on the EDK |
| - // staying alive. So quit manually to avoid this thread hanging. |
| - Quit(); |
| - } else { |
| - // Control pipe was written to. |
| - ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL, |
| - MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| - } |
| - } else { |
| - DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != |
| - handlers_.end()); |
| - WillSignalHandler(); |
| - handlers_[wait_state.handles[wait_many_result.index]] |
| - .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); |
| - DidSignalHandler(); |
| + bool did_work = block; |
| + if (block) { |
| + // If the wait isn't blocking (deadline == 0), there's no point in waiting. |
| + // Wait sets do not require a wait operation to be performed in order to |
| + // retreive any ready handles. Performing a wait with deadline == 0 is |
| + // unnecessary work. |
| + did_work = WaitForReadyHandles(run_state); |
| + } |
| + |
| + did_work |= ProcessReadyHandles(); |
| + did_work |= RemoveExpiredHandles(); |
| + |
| + return did_work; |
| +} |
| + |
| +bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const { |
| + const MojoDeadline deadline = GetDeadlineForWait(run_state); |
| + const MojoResult wait_result = Wait(wait_set_handle_.get(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + deadline, nullptr); |
| + if (wait_result == MOJO_RESULT_OK) { |
| + // Handles may be ready. Or not since wake-ups can be spurious in certain |
| + // circumstances. |
| + return true; |
| + } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) { |
| + return false; |
| + } |
| + |
| + base::debug::Alias(&wait_result); |
| + // Unexpected result is likely fatal, crash so we can determine cause. |
| + CHECK(false); |
| + return false; |
| +} |
| + |
| +bool MessagePumpMojo::ProcessReadyHandles() { |
| + // Maximum number of handles to retrieve and process. Experimentally, the 95th |
| + // percentile is 1 handle, and the long-term average is 1.1. However, this has |
| + // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise. |
| + const uint32_t kMaxServiced = 8; |
| + uint32_t count = kMaxServiced; |
|
Ken Rockot(use gerrit already)
2016/01/05 17:02:29
nit: how about num_ready_handles or something more
Anand Mistry (off Chromium)
2016/01/05 23:29:52
Done.
|
| + MojoHandle handles[kMaxServiced]; |
| + MojoResult handle_results[kMaxServiced]; |
| + |
| + const MojoResult get_result = MojoGetReadyHandles( |
| + wait_set_handle_.get().value(), &count, handles, handle_results, nullptr); |
| + CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT); |
| + if (get_result != MOJO_RESULT_OK) |
| + return false; |
| + |
| + DCHECK(count); |
| + DCHECK_LE(count, kMaxServiced); |
| + // Do this in two steps, because notifying a handler may remove/add other |
| + // handles that may have also been woken up. |
| + // First, enumerate the IDs of the ready handles. Then, iterate over the |
| + // handles and only take action if the ID hasn't changed. |
| + // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to |
| + // avoid the per-element allocation. |
| + base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles; |
| + for (uint32_t i = 0 ; i < count; i++) { |
| + const Handle handle = Handle(handles[i]); |
| + // Skip the control handle. It's special. |
| + if (handle.value() == read_handle_.get().value()) |
| + continue; |
| + DCHECK(handle.is_valid()); |
| + const auto it = handlers_.find(handle); |
| + // Skip handles that have been removed. This is possible because |
| + // RemoveHandler() can be called with a handle that has been closed. Because |
| + // the handle is closed, the MojoRemoveHandle() call in RemoveHandler() |
| + // would have failed, but the handle is still in the wait set. Once the |
| + // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed |
| + // from the set. The result is either the pending result that existed when |
| + // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the |
| + // handle was closed. |
| + if (it == handlers_.end()) |
| + continue; |
| + ready_handles[handle] = it->second.id; |
| + } |
| + |
| + for (uint32_t i = 0 ; i < count; i++) { |
| + const Handle handle = Handle(handles[i]); |
| + |
| + // If the handle has been removed, or it's ID has changed, skip over it. |
| + // If the handle's ID has changed, and it still satisfies its signals, |
| + // then it'll be caught in the next message pump iteration. |
| + const auto it = handlers_.find(handle); |
| + if ((handle.value() != read_handle_.get().value()) && |
| + (it == handlers_.end() || it->second.id != ready_handles[handle])) { |
| + continue; |
| } |
| - } else { |
| - switch (result) { |
| + |
| + switch (handle_results[i]) { |
| case MOJO_RESULT_CANCELLED: |
| case MOJO_RESULT_FAILED_PRECONDITION: |
| - case MOJO_RESULT_INVALID_ARGUMENT: |
| - RemoveInvalidHandle(wait_state, result, wait_many_result.index); |
| + DVLOG(1) << "Error: " << handle_results[i] |
| + << " handle: " << handle.value(); |
| + if (handle.value() == read_handle_.get().value()) { |
| + // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
| + // base::Thread won't get run since the control pipe depends on the |
| + // EDK staying alive. So quit manually to avoid this thread hanging. |
| + Quit(); |
| + } else { |
| + RemoveInvalidHandle(handle_results[i], handle); |
| + } |
| break; |
| - case MOJO_RESULT_DEADLINE_EXCEEDED: |
| - did_work = false; |
| + case MOJO_RESULT_OK: |
| + if (handle.value() == read_handle_.get().value()) { |
| + DVLOG(1) << "Signaled control pipe"; |
| + // Control pipe was written to. |
| + ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, |
| + nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| + } else { |
| + DVLOG(1) << "Handle ready: " << handle.value(); |
| + SignalHandleReady(handle); |
| + } |
| break; |
| default: |
| - base::debug::Alias(&result); |
| + base::debug::Alias(&i); |
| + base::debug::Alias(&handle_results[i]); |
| // Unexpected result is likely fatal, crash so we can determine cause. |
| CHECK(false); |
| } |
| } |
| + return true; |
| +} |
| +void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) { |
| + // TODO(sky): deal with control pipe going bad. |
| + CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || |
| + result == MOJO_RESULT_CANCELLED || |
| + result == MOJO_RESULT_DEADLINE_EXCEEDED); |
| + // Indicates the control pipe went bad. |
| + CHECK_NE(handle.value(), read_handle_.get().value()); |
| + |
| + auto it = handlers_.find(handle); |
| + CHECK(it != handlers_.end()); |
| + MessagePumpMojoHandler* handler = it->second.handler; |
| + RemoveHandler(handle); |
| + WillSignalHandler(); |
| + handler->OnHandleError(handle, result); |
| + DidSignalHandler(); |
| +} |
| + |
| +bool MessagePumpMojo::RemoveExpiredHandles() { |
| + bool removed = false; |
| // Notify and remove any handlers whose time has expired. First, iterate over |
| // the set of handles that have a deadline, and add the expired handles to a |
| // map of <Handle, id>. Then, iterate over those expired handles and remove |
| @@ -230,40 +351,20 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
| if (!it->second.deadline.is_null() && it->second.deadline < now) |
| expired_handles[handle] = it->second.id; |
| } |
| - for (auto& pair : expired_handles) { |
| + for (const auto& pair : expired_handles) { |
| auto it = handlers_.find(pair.first); |
| // Don't need to check deadline again since it can't change if id hasn't |
| // changed. |
| if (it != handlers_.end() && it->second.id == pair.second) { |
| - MessagePumpMojoHandler* handler = handlers_[pair.first].handler; |
| + MessagePumpMojoHandler* handler = it->second.handler; |
| RemoveHandler(pair.first); |
| WillSignalHandler(); |
| handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED); |
| DidSignalHandler(); |
| - did_work = true; |
| + removed = true; |
| } |
| } |
| - return did_work; |
| -} |
| - |
| -void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, |
| - MojoResult result, |
| - uint32_t index) { |
| - // TODO(sky): deal with control pipe going bad. |
| - CHECK(result == MOJO_RESULT_INVALID_ARGUMENT || |
| - result == MOJO_RESULT_FAILED_PRECONDITION || |
| - result == MOJO_RESULT_CANCELLED); |
| - CHECK_NE(index, 0u); // Indicates the control pipe went bad. |
| - |
| - // Remove the handle first, this way if OnHandleError() tries to remove the |
| - // handle our iterator isn't invalidated. |
| - Handle handle = wait_state.handles[index]; |
| - CHECK(handlers_.find(handle) != handlers_.end()); |
| - MessagePumpMojoHandler* handler = handlers_[handle].handler; |
| - RemoveHandler(handle); |
| - WillSignalHandler(); |
| - handler->OnHandleError(handle, result); |
| - DidSignalHandler(); |
| + return removed; |
| } |
| void MessagePumpMojo::SignalControlPipe() { |
| @@ -280,20 +381,6 @@ void MessagePumpMojo::SignalControlPipe() { |
| CHECK_EQ(MOJO_RESULT_OK, result); |
| } |
| -MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const { |
| - WaitState wait_state; |
| - wait_state.handles.push_back(read_handle_.get()); |
| - wait_state.wait_signals.push_back( |
| - MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED); |
| - |
| - for (HandleToHandler::const_iterator i = handlers_.begin(); |
| - i != handlers_.end(); ++i) { |
| - wait_state.handles.push_back(i->first); |
| - wait_state.wait_signals.push_back(i->second.wait_signals); |
| - } |
| - return wait_state; |
| -} |
| - |
| MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
| const RunState& run_state) const { |
| const base::TimeTicks now(internal::NowTicks()); |
| @@ -308,6 +395,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
| return deadline; |
| } |
| +void MessagePumpMojo::SignalHandleReady(Handle handle) { |
| + DCHECK(handlers_.find(handle) != handlers_.end()); |
| + WillSignalHandler(); |
| + handlers_[handle].handler->OnHandleReady(handle); |
| + DidSignalHandler(); |
| +} |
| + |
| void MessagePumpMojo::WillSignalHandler() { |
| FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); |
| } |