Chromium Code Reviews| Index: mojo/message_pump/message_pump_mojo.cc |
| diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc |
| index 12f9cc163c444a3f6303a2d380a66daa478c7ac6..431abfed94061e0f85730e1d6a0d75ff1e223861 100644 |
| --- a/mojo/message_pump/message_pump_mojo.cc |
| +++ b/mojo/message_pump/message_pump_mojo.cc |
| @@ -5,6 +5,7 @@ |
| #include "mojo/message_pump/message_pump_mojo.h" |
| #include <algorithm> |
| +#include <map> |
| #include <vector> |
| #include "base/debug/alias.h" |
| @@ -14,6 +15,7 @@ |
| #include "base/time/time.h" |
| #include "mojo/message_pump/message_pump_mojo_handler.h" |
| #include "mojo/message_pump/time_helper.h" |
| +#include "mojo/public/c/system/wait_set.h" |
| namespace mojo { |
| namespace common { |
| @@ -35,13 +37,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, |
| } // namespace |
| -// State needed for one iteration of WaitMany. The first handle and flags |
| -// corresponds to that of the control pipe. |
| -struct MessagePumpMojo::WaitState { |
| - std::vector<Handle> handles; |
| - std::vector<MojoHandleSignals> wait_signals; |
| -}; |
| - |
| struct MessagePumpMojo::RunState { |
| RunState() : should_quit(false) {} |
| @@ -59,6 +54,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { |
| CHECK_EQ(result, MOJO_RESULT_OK); |
| CHECK(read_handle_.is_valid()); |
| CHECK(write_handle_.is_valid()); |
| + |
| + MojoHandle handle; |
| + result = MojoCreateWaitSet(&handle); |
| + CHECK_EQ(result, MOJO_RESULT_OK); |
| + wait_set_handle_.reset(Handle(handle)); |
| + CHECK(wait_set_handle_.is_valid()); |
| + |
| + result = MojoAddHandle(wait_set_handle_.get().value(), |
| + read_handle_.get().value(), |
| + MOJO_HANDLE_SIGNAL_READABLE); |
| + CHECK_EQ(result, MOJO_RESULT_OK); |
| } |
| MessagePumpMojo::~MessagePumpMojo() { |
| @@ -94,9 +100,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, |
| bool inserted = deadline_handles_.insert(handle).second; |
| DCHECK(inserted); |
| } |
| + |
| + MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), |
| + handle.value(), wait_signals); |
| + // Because stopping a HandleWatcher is now asynchronous, it's possible for the |
| + // handle to no longer be open at this point. |
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT); |
|
sky
2015/12/11 17:49:01
Can you add test coverage of the add with closed p
Anand Mistry (off Chromium)
2015/12/14 03:16:20
Hm. Here's a question for you. If you add a closed
sky
2015/12/28 17:29:44
Is there a reason you don't want to notify immedia
|
| } |
| void MessagePumpMojo::RemoveHandler(const Handle& handle) { |
| + MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(), |
| + handle.value()); |
| + // At this point, it's possible that the handle has been closed, which would |
| + // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also |
| + // possible for the handle to have already been removed, so all of the |
| + // possible error codes are valid here. |
| + CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND || |
| + result == MOJO_RESULT_INVALID_ARGUMENT); |
| + |
| handlers_.erase(handle); |
| deadline_handles_.erase(handle); |
| } |
| @@ -170,49 +191,99 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { |
| } |
| bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
| - const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; |
| - const WaitState wait_state = GetWaitState(); |
| - |
| - std::vector<MojoHandleSignalsState> states(wait_state.handles.size()); |
| - const WaitManyResult wait_many_result = |
| - WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states); |
| - const MojoResult result = wait_many_result.result; |
| - bool did_work = true; |
| - if (result == MOJO_RESULT_OK) { |
| - if (wait_many_result.index == 0) { |
| - if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) { |
| - // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
| - // base::Thread won't get run since the control pipe depends on the EDK |
| - // staying alive. So quit manually to avoid this thread hanging. |
| - Quit(); |
| - } else { |
| - // Control pipe was written to. |
| - ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL, |
| - MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| - } |
| + bool did_work = block; |
| + if (block) { |
| + const MojoDeadline deadline = GetDeadlineForWait(run_state); |
| + const MojoResult wait_result = Wait(wait_set_handle_.get(), |
|
sky
2015/12/11 17:49:01
How come you don't always call wait?
Anand Mistry (off Chromium)
2015/12/14 03:16:20
Added a comment to address.
|
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + deadline, nullptr); |
| + if (wait_result == MOJO_RESULT_OK) { |
| + // Handles may be ready. Or not since wake-ups can be spurious in certain |
| + // circumstances. |
| + } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) { |
| + did_work = false; |
| } else { |
| - DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != |
| - handlers_.end()); |
| - WillSignalHandler(); |
| - handlers_[wait_state.handles[wait_many_result.index]] |
| - .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); |
| - DidSignalHandler(); |
| + base::debug::Alias(&wait_result); |
| + // Unexpected result is likely fatal, crash so we can determine cause. |
| + CHECK(false); |
| + } |
| + } |
| + |
| + const uint32_t kMaxServiced = 8; |
|
sky
2015/12/11 17:49:01
This function is rather lengthy. How about breakin
Anand Mistry (off Chromium)
2015/12/14 03:16:20
Done.
|
| + uint32_t count = kMaxServiced; |
| + MojoResult handle_results[kMaxServiced]; |
| + MojoHandle handles[kMaxServiced] = {MOJO_HANDLE_INVALID}; |
| + |
| + const MojoResult get_result = MojoGetReadyHandles( |
| + wait_set_handle_.get().value(), |
| + &count, handles, handle_results, nullptr); |
| + CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT); |
| + if (get_result == MOJO_RESULT_OK) { |
| + DCHECK(count); |
| + DCHECK_LE(count, kMaxServiced); |
| + // Do this in two steps, because notifying a handler may remove/add other |
| + // handles that may have also been woken up. |
| + // First, enumerate the IDs of the ready handles. Then, iterate over the |
| + // handles and only take action if the ID hasn't changed. |
| + std::map<Handle, int> ready_handles; |
| + for (uint32_t i = 0 ; i < count; i++) { |
| + const Handle handle = Handle(handles[i]); |
| + // Skip the control handle. It's special. |
| + if (handle.value() == read_handle_.get().value()) |
| + continue; |
| + DCHECK(handle.is_valid()); |
| + const auto it = handlers_.find(handle); |
| + // Skip handles that have been removed. |
|
sky
2015/12/11 17:49:01
Under what conditions could this happen? I'm wonde
Anand Mistry (off Chromium)
2015/12/14 03:16:20
Added comment to explain why this is possible. It'
|
| + if (it == handlers_.end()) |
| + continue; |
| + ready_handles[handle] = it->second.id; |
| } |
| - } else { |
| - switch (result) { |
| - case MOJO_RESULT_CANCELLED: |
| - case MOJO_RESULT_FAILED_PRECONDITION: |
| - case MOJO_RESULT_INVALID_ARGUMENT: |
| - RemoveInvalidHandle(wait_state, result, wait_many_result.index); |
| - break; |
| - case MOJO_RESULT_DEADLINE_EXCEEDED: |
| - did_work = false; |
| - break; |
| - default: |
| - base::debug::Alias(&result); |
| - // Unexpected result is likely fatal, crash so we can determine cause. |
| - CHECK(false); |
| + |
| + for (uint32_t i = 0 ; i < count; i++) { |
| + const Handle handle = Handle(handles[i]); |
| + |
| + // If the handle has been removed, or it's ID has changed, skip over it. |
| + // If the handle's ID has changed, and it still satisfied its signals, |
| + // then it'll be caught in the next message pump iteration. |
| + const auto it = handlers_.find(handle); |
| + if ((handle.value() != read_handle_.get().value()) && |
| + (it == handlers_.end() || it->second.id != ready_handles[handle])) { |
| + continue; |
| + } |
| + |
| + switch (handle_results[i]) { |
| + case MOJO_RESULT_CANCELLED: |
| + case MOJO_RESULT_FAILED_PRECONDITION: |
| + DVLOG(1) << "Error: " << handle_results[i] |
| + << " handle: " << handle.value(); |
| + if (handle.value() == read_handle_.get().value()) { |
| + // The Mojo EDK is shutting down. The ThreadQuitHelper task in |
| + // base::Thread won't get run since the control pipe depends on the |
| + // EDK staying alive. So quit manually to avoid this thread hanging. |
| + Quit(); |
| + } else { |
| + RemoveInvalidHandle(handle_results[i], handle); |
| + } |
| + break; |
| + case MOJO_RESULT_OK: |
| + if (handle.value() == read_handle_.get().value()) { |
| + DVLOG(1) << "Signaled control pipe"; |
| + // Control pipe was written to. |
| + ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL, |
|
sky
2015/12/11 17:49:01
nullptr on these?
Anand Mistry (off Chromium)
2015/12/14 03:16:20
Done.
|
| + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| + } else { |
| + DVLOG(1) << "Handle ready: " << handle.value(); |
| + SignalHandleReady(handle); |
| + } |
| + break; |
| + default: |
| + base::debug::Alias(&i); |
| + base::debug::Alias(&handle_results[i]); |
| + // Unexpected result is likely fatal, crash so we can determine cause. |
| + CHECK(false); |
| + } |
| } |
| + did_work = true; |
| } |
| // Notify and remove any handlers whose time has expired. First, iterate over |
| @@ -246,19 +317,18 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
| return did_work; |
| } |
| -void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, |
| - MojoResult result, |
| - uint32_t index) { |
| +void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) { |
| // TODO(sky): deal with control pipe going bad. |
| - CHECK(result == MOJO_RESULT_INVALID_ARGUMENT || |
| - result == MOJO_RESULT_FAILED_PRECONDITION || |
| - result == MOJO_RESULT_CANCELLED); |
| - CHECK_NE(index, 0u); // Indicates the control pipe went bad. |
| + CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || |
| + result == MOJO_RESULT_CANCELLED || |
| + result == MOJO_RESULT_DEADLINE_EXCEEDED); |
| + // Indicates the control pipe went bad. |
| + CHECK_NE(handle.value(), read_handle_.get().value()); |
| // Remove the handle first, this way if OnHandleError() tries to remove the |
| // handle our iterator isn't invalidated. |
| - Handle handle = wait_state.handles[index]; |
| - CHECK(handlers_.find(handle) != handlers_.end()); |
| + if (handlers_.find(handle) == handlers_.end()) |
|
sky
2015/12/11 17:49:01
Under what conditions would this hit?
Anand Mistry (off Chromium)
2015/12/14 03:16:20
It did in a previous iteration, but not any more.
|
| + return; |
| MessagePumpMojoHandler* handler = handlers_[handle].handler; |
| RemoveHandler(handle); |
| WillSignalHandler(); |
| @@ -280,20 +350,6 @@ void MessagePumpMojo::SignalControlPipe() { |
| CHECK_EQ(MOJO_RESULT_OK, result); |
| } |
| -MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const { |
| - WaitState wait_state; |
| - wait_state.handles.push_back(read_handle_.get()); |
| - wait_state.wait_signals.push_back( |
| - MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED); |
| - |
| - for (HandleToHandler::const_iterator i = handlers_.begin(); |
| - i != handlers_.end(); ++i) { |
| - wait_state.handles.push_back(i->first); |
| - wait_state.wait_signals.push_back(i->second.wait_signals); |
| - } |
| - return wait_state; |
| -} |
| - |
| MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
| const RunState& run_state) const { |
| const base::TimeTicks now(internal::NowTicks()); |
| @@ -308,6 +364,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
| return deadline; |
| } |
| +void MessagePumpMojo::SignalHandleReady(Handle handle) { |
| + DCHECK(handlers_.find(handle) != handlers_.end()); |
| + WillSignalHandler(); |
| + handlers_[handle].handler->OnHandleReady(handle); |
| + DidSignalHandler(); |
| +} |
| + |
| void MessagePumpMojo::WillSignalHandler() { |
| FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); |
| } |