Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index 4426defa934cd0362ecc2eec712730a55dbf04a9..ff9867a27c5004b6c216698c3027964ef205321f 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
} |
} |
+void Connector::EnableNestedDispatch(bool enabled) { |
+ nested_dispatch_enabled_ = enabled; |
+ handle_watcher_.reset(); |
+ WaitToReadMore(); |
+} |
+ |
void Connector::OnWatcherHandleReady(MojoResult result) { |
OnHandleReadyInternal(result); |
} |
@@ -211,6 +217,7 @@ void Connector::OnHandleReadyInternal(MojoResult result) { |
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
return; |
} |
+ |
ReadAllAvailableMessages(); |
// At this point, this object might have been deleted. Return. |
} |
@@ -219,10 +226,11 @@ void Connector::WaitToReadMore() { |
CHECK(!paused_); |
DCHECK(!handle_watcher_); |
- handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); |
+ handle_watcher_.reset(new SimpleWatcher( |
+ FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); |
if (heap_profiler_tag_) |
handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
- MojoResult rv = handle_watcher_->Start( |
+ MojoResult rv = handle_watcher_->Watch( |
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
@@ -232,6 +240,8 @@ void Connector::WaitToReadMore() { |
task_runner_->PostTask( |
FROM_HERE, |
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
+ } else { |
+ handle_watcher_->ArmOrNotify(); |
} |
if (allow_woken_up_by_others_) { |
@@ -253,6 +263,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
*read_result = rv; |
+ if (nested_dispatch_enabled_) { |
+ // When supporting nested dispatch, we have to rearm the Watcher immediately |
+ // after reading each message (i.e. before dispatch) to ensure that the next |
+ // inbound message can trigger OnHandleReady on the nested loop. |
+ handle_watcher_->ArmOrNotify(); |
+ } |
+ |
if (rv == MOJO_RESULT_OK) { |
receiver_result = |
incoming_receiver_ && incoming_receiver_->Accept(&message); |
@@ -278,19 +295,36 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
void Connector::ReadAllAvailableMessages() { |
while (!error_) { |
+ base::WeakPtr<Connector> weak_self = weak_self_; |
MojoResult rv; |
- if (!ReadSingleMessage(&rv)) { |
- // Return immediately without touching any members. |this| may have been |
- // destroyed. |
+ // May delete |this.| |
+ if (!ReadSingleMessage(&rv)) |
return; |
- } |
- if (paused_) |
+ if (!weak_self || paused_) |
return; |
- if (rv == MOJO_RESULT_SHOULD_WAIT) |
- break; |
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); |
+ |
+ if (rv == MOJO_RESULT_SHOULD_WAIT) { |
+ // Attempt to re-arm the Watcher. |
+ MojoResult ready_result; |
+ MojoResult arm_result = handle_watcher_->Arm(&ready_result); |
+ if (arm_result == MOJO_RESULT_OK) |
+ return; |
+ |
+ // The watcher is already ready to notify again. |
+ DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); |
+ |
+ if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { |
+ HandleError(false, false); |
+ return; |
+ } |
+ |
+ // There's more to read now, so we'll just keep looping. |
+ DCHECK_EQ(MOJO_RESULT_OK, ready_result); |
+ } |
} |
} |