Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(682)

Unified Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1467953002: Implement MessagePumpMojo using WaitSet. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-implementation
Patch Set: Rebase, address comments, and auto format. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | mojo/message_pump/message_pump_mojo_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/message_pump/message_pump_mojo.cc
diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc
index 696e083bf76294a5a25bc207d99ddf5470f00b63..3f60dd7116d9744411263ad6452563e148ee0990 100644
--- a/mojo/message_pump/message_pump_mojo.cc
+++ b/mojo/message_pump/message_pump_mojo.cc
@@ -7,8 +7,10 @@
#include <stdint.h>
#include <algorithm>
+#include <map>
#include <vector>
+#include "base/containers/small_map.h"
#include "base/debug/alias.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
@@ -16,6 +18,7 @@
#include "base/time/time.h"
#include "mojo/message_pump/message_pump_mojo_handler.h"
#include "mojo/message_pump/time_helper.h"
+#include "mojo/public/c/system/wait_set.h"
namespace mojo {
namespace common {
@@ -37,13 +40,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
} // namespace
-// State needed for one iteration of WaitMany. The first handle and flags
-// corresponds to that of the control pipe.
-struct MessagePumpMojo::WaitState {
- std::vector<Handle> handles;
- std::vector<MojoHandleSignals> wait_signals;
-};
-
struct MessagePumpMojo::RunState {
RunState() : should_quit(false) {}
@@ -61,6 +57,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
CHECK_EQ(result, MOJO_RESULT_OK);
CHECK(read_handle_.is_valid());
CHECK(write_handle_.is_valid());
+
+ MojoHandle handle;
+ result = MojoCreateWaitSet(&handle);
+ CHECK_EQ(result, MOJO_RESULT_OK);
+ wait_set_handle_.reset(Handle(handle));
+ CHECK(wait_set_handle_.is_valid());
+
+ result =
+ MojoAddHandle(wait_set_handle_.get().value(), read_handle_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE);
+ CHECK_EQ(result, MOJO_RESULT_OK);
}
MessagePumpMojo::~MessagePumpMojo() {
@@ -96,9 +103,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
bool inserted = deadline_handles_.insert(handle).second;
DCHECK(inserted);
}
+
+ MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
+ handle.value(), wait_signals);
+ // Because stopping a HandleWatcher is now asynchronous, it's possible for the
+ // handle to no longer be open at this point.
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
}
void MessagePumpMojo::RemoveHandler(const Handle& handle) {
+ MojoResult result =
+ MojoRemoveHandle(wait_set_handle_.get().value(), handle.value());
+ // At this point, it's possible that the handle has been closed, which would
+ // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
+ // possible for the handle to have already been removed, so all of the
+ // possible error codes are valid here.
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
+ result == MOJO_RESULT_INVALID_ARGUMENT);
+
handlers_.erase(handle);
deadline_handles_.erase(handle);
}
@@ -172,51 +194,150 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
}
bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
- const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
- const WaitState wait_state = GetWaitState();
-
- std::vector<MojoHandleSignalsState> states(wait_state.handles.size());
- const WaitManyResult wait_many_result =
- WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states);
- const MojoResult result = wait_many_result.result;
- bool did_work = true;
- if (result == MOJO_RESULT_OK) {
- if (wait_many_result.index == 0) {
- if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) {
- // The Mojo EDK is shutting down. The ThreadQuitHelper task in
- // base::Thread won't get run since the control pipe depends on the EDK
- // staying alive. So quit manually to avoid this thread hanging.
- Quit();
- } else {
- // Control pipe was written to.
- ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
- }
- } else {
- DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) !=
- handlers_.end());
- WillSignalHandler();
- handlers_[wait_state.handles[wait_many_result.index]]
- .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
- DidSignalHandler();
+ bool did_work = block;
+ if (block) {
+ // If the wait isn't blocking (deadline == 0), there's no point in waiting.
+ // Wait sets do not require a wait operation to be performed in order to
+ // retreive any ready handles. Performing a wait with deadline == 0 is
+ // unnecessary work.
+ did_work = WaitForReadyHandles(run_state);
+ }
+
+ did_work |= ProcessReadyHandles();
+ did_work |= RemoveExpiredHandles();
+
+ return did_work;
+}
+
+bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const {
+ const MojoDeadline deadline = GetDeadlineForWait(run_state);
+ const MojoResult wait_result = Wait(
+ wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
+ if (wait_result == MOJO_RESULT_OK) {
+ // Handles may be ready. Or not since wake-ups can be spurious in certain
+ // circumstances.
+ return true;
+ } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
+ return false;
+ }
+
+ base::debug::Alias(&wait_result);
+ // Unexpected result is likely fatal, crash so we can determine cause.
+ CHECK(false);
+ return false;
+}
+
+bool MessagePumpMojo::ProcessReadyHandles() {
+ // Maximum number of handles to retrieve and process. Experimentally, the 95th
+ // percentile is 1 handle, and the long-term average is 1.1. However, this has
+ // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise.
+ const uint32_t kMaxServiced = 8;
+ uint32_t num_ready_handles = kMaxServiced;
+ MojoHandle handles[kMaxServiced];
+ MojoResult handle_results[kMaxServiced];
+
+ const MojoResult get_result =
+ MojoGetReadyHandles(wait_set_handle_.get().value(), &num_ready_handles,
+ handles, handle_results, nullptr);
+ CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
+ if (get_result != MOJO_RESULT_OK)
+ return false;
+
+ DCHECK(num_ready_handles);
+ DCHECK_LE(num_ready_handles, kMaxServiced);
+ // Do this in two steps, because notifying a handler may remove/add other
+ // handles that may have also been woken up.
+ // First, enumerate the IDs of the ready handles. Then, iterate over the
+ // handles and only take action if the ID hasn't changed.
+ // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to
+ // avoid the per-element allocation.
+ base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles;
+ for (uint32_t i = 0; i < num_ready_handles; i++) {
+ const Handle handle = Handle(handles[i]);
+ // Skip the control handle. It's special.
+ if (handle.value() == read_handle_.get().value())
+ continue;
+ DCHECK(handle.is_valid());
+ const auto it = handlers_.find(handle);
+ // Skip handles that have been removed. This is possible because
+ // RemoveHandler() can be called with a handle that has been closed. Because
+ // the handle is closed, the MojoRemoveHandle() call in RemoveHandler()
+ // would have failed, but the handle is still in the wait set. Once the
+ // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed
+ // from the set. The result is either the pending result that existed when
+ // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the
+ // handle was closed.
+ if (it == handlers_.end())
+ continue;
+ ready_handles[handle] = it->second.id;
+ }
+
+ for (uint32_t i = 0; i < num_ready_handles; i++) {
+ const Handle handle = Handle(handles[i]);
+
+ // If the handle has been removed, or it's ID has changed, skip over it.
+ // If the handle's ID has changed, and it still satisfies its signals,
+ // then it'll be caught in the next message pump iteration.
+ const auto it = handlers_.find(handle);
+ if ((handle.value() != read_handle_.get().value()) &&
+ (it == handlers_.end() || it->second.id != ready_handles[handle])) {
+ continue;
}
- } else {
- switch (result) {
+
+ switch (handle_results[i]) {
case MOJO_RESULT_CANCELLED:
case MOJO_RESULT_FAILED_PRECONDITION:
- case MOJO_RESULT_INVALID_ARGUMENT:
- RemoveInvalidHandle(wait_state, result, wait_many_result.index);
+ DVLOG(1) << "Error: " << handle_results[i]
+ << " handle: " << handle.value();
+ if (handle.value() == read_handle_.get().value()) {
+ // The Mojo EDK is shutting down. The ThreadQuitHelper task in
+ // base::Thread won't get run since the control pipe depends on the
+ // EDK staying alive. So quit manually to avoid this thread hanging.
+ Quit();
+ } else {
+ RemoveInvalidHandle(handle_results[i], handle);
+ }
break;
- case MOJO_RESULT_DEADLINE_EXCEEDED:
- did_work = false;
+ case MOJO_RESULT_OK:
+ if (handle.value() == read_handle_.get().value()) {
+ DVLOG(1) << "Signaled control pipe";
+ // Control pipe was written to.
+ ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+ } else {
+ DVLOG(1) << "Handle ready: " << handle.value();
+ SignalHandleReady(handle);
+ }
break;
default:
- base::debug::Alias(&result);
+ base::debug::Alias(&i);
+ base::debug::Alias(&handle_results[i]);
// Unexpected result is likely fatal, crash so we can determine cause.
CHECK(false);
}
}
+ return true;
+}
+void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) {
+ // TODO(sky): deal with control pipe going bad.
+ CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
+ result == MOJO_RESULT_CANCELLED ||
+ result == MOJO_RESULT_DEADLINE_EXCEEDED);
+ // Indicates the control pipe went bad.
+ CHECK_NE(handle.value(), read_handle_.get().value());
+
+ auto it = handlers_.find(handle);
+ CHECK(it != handlers_.end());
+ MessagePumpMojoHandler* handler = it->second.handler;
+ RemoveHandler(handle);
+ WillSignalHandler();
+ handler->OnHandleError(handle, result);
+ DidSignalHandler();
+}
+
+bool MessagePumpMojo::RemoveExpiredHandles() {
+ bool removed = false;
// Notify and remove any handlers whose time has expired. First, iterate over
// the set of handles that have a deadline, and add the expired handles to a
// map of <Handle, id>. Then, iterate over those expired handles and remove
@@ -232,40 +353,20 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
if (!it->second.deadline.is_null() && it->second.deadline < now)
expired_handles[handle] = it->second.id;
}
- for (auto& pair : expired_handles) {
+ for (const auto& pair : expired_handles) {
auto it = handlers_.find(pair.first);
// Don't need to check deadline again since it can't change if id hasn't
// changed.
if (it != handlers_.end() && it->second.id == pair.second) {
- MessagePumpMojoHandler* handler = handlers_[pair.first].handler;
+ MessagePumpMojoHandler* handler = it->second.handler;
RemoveHandler(pair.first);
WillSignalHandler();
handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED);
DidSignalHandler();
- did_work = true;
+ removed = true;
}
}
- return did_work;
-}
-
-void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state,
- MojoResult result,
- uint32_t index) {
- // TODO(sky): deal with control pipe going bad.
- CHECK(result == MOJO_RESULT_INVALID_ARGUMENT ||
- result == MOJO_RESULT_FAILED_PRECONDITION ||
- result == MOJO_RESULT_CANCELLED);
- CHECK_NE(index, 0u); // Indicates the control pipe went bad.
-
- // Remove the handle first, this way if OnHandleError() tries to remove the
- // handle our iterator isn't invalidated.
- Handle handle = wait_state.handles[index];
- CHECK(handlers_.find(handle) != handlers_.end());
- MessagePumpMojoHandler* handler = handlers_[handle].handler;
- RemoveHandler(handle);
- WillSignalHandler();
- handler->OnHandleError(handle, result);
- DidSignalHandler();
+ return removed;
}
void MessagePumpMojo::SignalControlPipe() {
@@ -282,20 +383,6 @@ void MessagePumpMojo::SignalControlPipe() {
CHECK_EQ(MOJO_RESULT_OK, result);
}
-MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
- WaitState wait_state;
- wait_state.handles.push_back(read_handle_.get());
- wait_state.wait_signals.push_back(
- MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
-
- for (HandleToHandler::const_iterator i = handlers_.begin();
- i != handlers_.end(); ++i) {
- wait_state.handles.push_back(i->first);
- wait_state.wait_signals.push_back(i->second.wait_signals);
- }
- return wait_state;
-}
-
MojoDeadline MessagePumpMojo::GetDeadlineForWait(
const RunState& run_state) const {
const base::TimeTicks now(internal::NowTicks());
@@ -310,6 +397,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait(
return deadline;
}
+void MessagePumpMojo::SignalHandleReady(Handle handle) {
+ DCHECK(handlers_.find(handle) != handlers_.end());
+ WillSignalHandler();
+ handlers_[handle].handler->OnHandleReady(handle);
+ DidSignalHandler();
+}
+
void MessagePumpMojo::WillSignalHandler() {
FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
}
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | mojo/message_pump/message_pump_mojo_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698