Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(513)

Unified Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1467953002: Implement MessagePumpMojo using WaitSet. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-implementation
Patch Set: Rebase. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/message_pump/message_pump_mojo.cc
diff --git a/mojo/message_pump/message_pump_mojo.cc b/mojo/message_pump/message_pump_mojo.cc
index 12f9cc163c444a3f6303a2d380a66daa478c7ac6..431abfed94061e0f85730e1d6a0d75ff1e223861 100644
--- a/mojo/message_pump/message_pump_mojo.cc
+++ b/mojo/message_pump/message_pump_mojo.cc
@@ -5,6 +5,7 @@
#include "mojo/message_pump/message_pump_mojo.h"
#include <algorithm>
+#include <map>
#include <vector>
#include "base/debug/alias.h"
@@ -14,6 +15,7 @@
#include "base/time/time.h"
#include "mojo/message_pump/message_pump_mojo_handler.h"
#include "mojo/message_pump/time_helper.h"
+#include "mojo/public/c/system/wait_set.h"
namespace mojo {
namespace common {
@@ -35,13 +37,6 @@ MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
} // namespace
-// State needed for one iteration of WaitMany. The first handle and flags
-// corresponds to that of the control pipe.
-struct MessagePumpMojo::WaitState {
- std::vector<Handle> handles;
- std::vector<MojoHandleSignals> wait_signals;
-};
-
struct MessagePumpMojo::RunState {
RunState() : should_quit(false) {}
@@ -59,6 +54,17 @@ MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
CHECK_EQ(result, MOJO_RESULT_OK);
CHECK(read_handle_.is_valid());
CHECK(write_handle_.is_valid());
+
+ MojoHandle handle;
+ result = MojoCreateWaitSet(&handle);
+ CHECK_EQ(result, MOJO_RESULT_OK);
+ wait_set_handle_.reset(Handle(handle));
+ CHECK(wait_set_handle_.is_valid());
+
+ result = MojoAddHandle(wait_set_handle_.get().value(),
+ read_handle_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE);
+ CHECK_EQ(result, MOJO_RESULT_OK);
}
MessagePumpMojo::~MessagePumpMojo() {
@@ -94,9 +100,24 @@ void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
bool inserted = deadline_handles_.insert(handle).second;
DCHECK(inserted);
}
+
+ MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
+ handle.value(), wait_signals);
+ // Because stopping a HandleWatcher is now asynchronous, it's possible for the
+ // handle to no longer be open at this point.
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
sky 2015/12/11 17:49:01 Can you add test coverage of the add with closed p
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Hm. Here's a question for you. If you add a closed
sky 2015/12/28 17:29:44 Is there a reason you don't want to notify immedia
}
void MessagePumpMojo::RemoveHandler(const Handle& handle) {
+ MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(),
+ handle.value());
+ // At this point, it's possible that the handle has been closed, which would
+ // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
+ // possible for the handle to have already been removed, so all of the
+ // possible error codes are valid here.
+ CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
+ result == MOJO_RESULT_INVALID_ARGUMENT);
+
handlers_.erase(handle);
deadline_handles_.erase(handle);
}
@@ -170,49 +191,99 @@ void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
}
bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
- const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
- const WaitState wait_state = GetWaitState();
-
- std::vector<MojoHandleSignalsState> states(wait_state.handles.size());
- const WaitManyResult wait_many_result =
- WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states);
- const MojoResult result = wait_many_result.result;
- bool did_work = true;
- if (result == MOJO_RESULT_OK) {
- if (wait_many_result.index == 0) {
- if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) {
- // The Mojo EDK is shutting down. The ThreadQuitHelper task in
- // base::Thread won't get run since the control pipe depends on the EDK
- // staying alive. So quit manually to avoid this thread hanging.
- Quit();
- } else {
- // Control pipe was written to.
- ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
- }
+ bool did_work = block;
+ if (block) {
+ const MojoDeadline deadline = GetDeadlineForWait(run_state);
+ const MojoResult wait_result = Wait(wait_set_handle_.get(),
sky 2015/12/11 17:49:01 How come you don't always call wait?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Added a comment to address.
+ MOJO_HANDLE_SIGNAL_READABLE,
+ deadline, nullptr);
+ if (wait_result == MOJO_RESULT_OK) {
+ // Handles may be ready. Or not since wake-ups can be spurious in certain
+ // circumstances.
+ } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
+ did_work = false;
} else {
- DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) !=
- handlers_.end());
- WillSignalHandler();
- handlers_[wait_state.handles[wait_many_result.index]]
- .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
- DidSignalHandler();
+ base::debug::Alias(&wait_result);
+ // Unexpected result is likely fatal, crash so we can determine cause.
+ CHECK(false);
+ }
+ }
+
+ const uint32_t kMaxServiced = 8;
sky 2015/12/11 17:49:01 This function is rather lengthy. How about breakin
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Done.
+ uint32_t count = kMaxServiced;
+ MojoResult handle_results[kMaxServiced];
+ MojoHandle handles[kMaxServiced] = {MOJO_HANDLE_INVALID};
+
+ const MojoResult get_result = MojoGetReadyHandles(
+ wait_set_handle_.get().value(),
+ &count, handles, handle_results, nullptr);
+ CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
+ if (get_result == MOJO_RESULT_OK) {
+ DCHECK(count);
+ DCHECK_LE(count, kMaxServiced);
+ // Do this in two steps, because notifying a handler may remove/add other
+ // handles that may have also been woken up.
+ // First, enumerate the IDs of the ready handles. Then, iterate over the
+ // handles and only take action if the ID hasn't changed.
+ std::map<Handle, int> ready_handles;
+ for (uint32_t i = 0 ; i < count; i++) {
+ const Handle handle = Handle(handles[i]);
+ // Skip the control handle. It's special.
+ if (handle.value() == read_handle_.get().value())
+ continue;
+ DCHECK(handle.is_valid());
+ const auto it = handlers_.find(handle);
+ // Skip handles that have been removed.
sky 2015/12/11 17:49:01 Under what conditions could this happen? I'm wonde
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Added comment to explain why this is possible. It'
+ if (it == handlers_.end())
+ continue;
+ ready_handles[handle] = it->second.id;
}
- } else {
- switch (result) {
- case MOJO_RESULT_CANCELLED:
- case MOJO_RESULT_FAILED_PRECONDITION:
- case MOJO_RESULT_INVALID_ARGUMENT:
- RemoveInvalidHandle(wait_state, result, wait_many_result.index);
- break;
- case MOJO_RESULT_DEADLINE_EXCEEDED:
- did_work = false;
- break;
- default:
- base::debug::Alias(&result);
- // Unexpected result is likely fatal, crash so we can determine cause.
- CHECK(false);
+
+ for (uint32_t i = 0 ; i < count; i++) {
+ const Handle handle = Handle(handles[i]);
+
+ // If the handle has been removed, or it's ID has changed, skip over it.
+ // If the handle's ID has changed, and it still satisfied its signals,
+ // then it'll be caught in the next message pump iteration.
+ const auto it = handlers_.find(handle);
+ if ((handle.value() != read_handle_.get().value()) &&
+ (it == handlers_.end() || it->second.id != ready_handles[handle])) {
+ continue;
+ }
+
+ switch (handle_results[i]) {
+ case MOJO_RESULT_CANCELLED:
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ DVLOG(1) << "Error: " << handle_results[i]
+ << " handle: " << handle.value();
+ if (handle.value() == read_handle_.get().value()) {
+ // The Mojo EDK is shutting down. The ThreadQuitHelper task in
+ // base::Thread won't get run since the control pipe depends on the
+ // EDK staying alive. So quit manually to avoid this thread hanging.
+ Quit();
+ } else {
+ RemoveInvalidHandle(handle_results[i], handle);
+ }
+ break;
+ case MOJO_RESULT_OK:
+ if (handle.value() == read_handle_.get().value()) {
+ DVLOG(1) << "Signaled control pipe";
+ // Control pipe was written to.
+ ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
sky 2015/12/11 17:49:01 nullptr on these?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Done.
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+ } else {
+ DVLOG(1) << "Handle ready: " << handle.value();
+ SignalHandleReady(handle);
+ }
+ break;
+ default:
+ base::debug::Alias(&i);
+ base::debug::Alias(&handle_results[i]);
+ // Unexpected result is likely fatal, crash so we can determine cause.
+ CHECK(false);
+ }
}
+ did_work = true;
}
// Notify and remove any handlers whose time has expired. First, iterate over
@@ -246,19 +317,18 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
return did_work;
}
-void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state,
- MojoResult result,
- uint32_t index) {
+void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) {
// TODO(sky): deal with control pipe going bad.
- CHECK(result == MOJO_RESULT_INVALID_ARGUMENT ||
- result == MOJO_RESULT_FAILED_PRECONDITION ||
- result == MOJO_RESULT_CANCELLED);
- CHECK_NE(index, 0u); // Indicates the control pipe went bad.
+ CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
+ result == MOJO_RESULT_CANCELLED ||
+ result == MOJO_RESULT_DEADLINE_EXCEEDED);
+ // Indicates the control pipe went bad.
+ CHECK_NE(handle.value(), read_handle_.get().value());
// Remove the handle first, this way if OnHandleError() tries to remove the
// handle our iterator isn't invalidated.
- Handle handle = wait_state.handles[index];
- CHECK(handlers_.find(handle) != handlers_.end());
+ if (handlers_.find(handle) == handlers_.end())
sky 2015/12/11 17:49:01 Under what conditions would this hit?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 It did in a previous iteration, but not any more.
+ return;
MessagePumpMojoHandler* handler = handlers_[handle].handler;
RemoveHandler(handle);
WillSignalHandler();
@@ -280,20 +350,6 @@ void MessagePumpMojo::SignalControlPipe() {
CHECK_EQ(MOJO_RESULT_OK, result);
}
-MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
- WaitState wait_state;
- wait_state.handles.push_back(read_handle_.get());
- wait_state.wait_signals.push_back(
- MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
-
- for (HandleToHandler::const_iterator i = handlers_.begin();
- i != handlers_.end(); ++i) {
- wait_state.handles.push_back(i->first);
- wait_state.wait_signals.push_back(i->second.wait_signals);
- }
- return wait_state;
-}
-
MojoDeadline MessagePumpMojo::GetDeadlineForWait(
const RunState& run_state) const {
const base::TimeTicks now(internal::NowTicks());
@@ -308,6 +364,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait(
return deadline;
}
+void MessagePumpMojo::SignalHandleReady(Handle handle) {
+ DCHECK(handlers_.find(handle) != handlers_.end());
+ WillSignalHandler();
+ handlers_[handle].handler->OnHandleReady(handle);
+ DidSignalHandler();
+}
+
void MessagePumpMojo::WillSignalHandler() {
FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
}
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698