Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1365)

Unified Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/bindings/lib/connector.cc
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc
index 4426defa934cd0362ecc2eec712730a55dbf04a9..ff9867a27c5004b6c216698c3027964ef205321f 100644
--- a/mojo/public/cpp/bindings/lib/connector.cc
+++ b/mojo/public/cpp/bindings/lib/connector.cc
@@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) {
}
}
+void Connector::EnableNestedDispatch(bool enabled) {
+ nested_dispatch_enabled_ = enabled;
+ handle_watcher_.reset();
+ WaitToReadMore();
+}
+
void Connector::OnWatcherHandleReady(MojoResult result) {
OnHandleReadyInternal(result);
}
@@ -211,6 +217,7 @@ void Connector::OnHandleReadyInternal(MojoResult result) {
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
return;
}
+
ReadAllAvailableMessages();
// At this point, this object might have been deleted. Return.
}
@@ -219,10 +226,11 @@ void Connector::WaitToReadMore() {
CHECK(!paused_);
DCHECK(!handle_watcher_);
- handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
+ handle_watcher_.reset(new SimpleWatcher(
+ FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
if (heap_profiler_tag_)
handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
- MojoResult rv = handle_watcher_->Start(
+ MojoResult rv = handle_watcher_->Watch(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
@@ -232,6 +240,8 @@ void Connector::WaitToReadMore() {
task_runner_->PostTask(
FROM_HERE,
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
+ } else {
+ handle_watcher_->ArmOrNotify();
}
if (allow_woken_up_by_others_) {
@@ -253,6 +263,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
*read_result = rv;
+ if (nested_dispatch_enabled_) {
+ // When supporting nested dispatch, we have to rearm the Watcher immediately
+ // after reading each message (i.e. before dispatch) to ensure that the next
+ // inbound message can trigger OnHandleReady on the nested loop.
+ handle_watcher_->ArmOrNotify();
+ }
+
if (rv == MOJO_RESULT_OK) {
receiver_result =
incoming_receiver_ && incoming_receiver_->Accept(&message);
@@ -278,19 +295,36 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
void Connector::ReadAllAvailableMessages() {
while (!error_) {
+ base::WeakPtr<Connector> weak_self = weak_self_;
MojoResult rv;
- if (!ReadSingleMessage(&rv)) {
- // Return immediately without touching any members. |this| may have been
- // destroyed.
+ // May delete |this.|
+ if (!ReadSingleMessage(&rv))
return;
- }
- if (paused_)
+ if (!weak_self || paused_)
return;
- if (rv == MOJO_RESULT_SHOULD_WAIT)
- break;
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
+
+ if (rv == MOJO_RESULT_SHOULD_WAIT) {
+ // Attempt to re-arm the Watcher.
+ MojoResult ready_result;
+ MojoResult arm_result = handle_watcher_->Arm(&ready_result);
+ if (arm_result == MOJO_RESULT_OK)
+ return;
+
+ // The watcher is already ready to notify again.
+ DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
+
+ if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
+ HandleError(false, false);
+ return;
+ }
+
+ // There's more to read now, so we'll just keep looping.
+ DCHECK_EQ(MOJO_RESULT_OK, ready_result);
+ }
}
}

Powered by Google App Engine
This is Rietveld 408576698