Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index 4426defa934cd0362ecc2eec712730a55dbf04a9..aa018341ef8a7eea60851386b18434e547ab072f 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
} |
} |
+void Connector::EnableNestedDispatch(bool enabled) { |
+ nested_dispatch_enabled_ = enabled; |
+ CancelWait(); |
yzshen1
2017/03/03 00:03:50
Will this cause problem while we are in the middle
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Good point. Now I just reset handle_watcher_ befor
|
+ WaitToReadMore(); |
+} |
+ |
void Connector::OnWatcherHandleReady(MojoResult result) { |
OnHandleReadyInternal(result); |
} |
@@ -211,15 +217,50 @@ void Connector::OnHandleReadyInternal(MojoResult result) { |
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
return; |
} |
- ReadAllAvailableMessages(); |
- // At this point, this object might have been deleted. Return. |
+ |
+ for (;;) { |
yzshen1
2017/03/03 00:03:50
nit: Does it make sense to merge ReadAllAvailableM
Ken Rockot(use gerrit already)
2017/03/03 00:37:05
Done
|
+ DestructionTracker::Flag was_destroyed(&destruction_tracker_); |
+ |
+ // May delete |this|. |
+ ReadAllAvailableMessages(); |
+ |
+ if (was_destroyed) |
+ return; |
+ |
+ // We also may have been paused by some dispatch, in which case we're done. |
+ if (!handle_watcher_) |
+ return; |
+ |
+ // Attempt to re-arm the Watcher. |
+ result = handle_watcher_->Arm(); |
+ switch (result) { |
+ case MOJO_RESULT_OK: |
+ // Everything's cool. No more work to do. |
+ return; |
+ |
+ case MOJO_RESULT_ALREADY_EXISTS: |
+ // The handle is already readable again. Continue reading messagexs. |
+ break; |
+ |
+ case MOJO_RESULT_FAILED_PRECONDITION: |
+ // The handle will never be readable again. Notify of error immediately. |
+ // May delete |this|. |
+ HandleError(false, false); |
+ return; |
+ |
+ default: |
+ NOTREACHED(); |
+ break; |
+ } |
+ } |
} |
void Connector::WaitToReadMore() { |
CHECK(!paused_); |
DCHECK(!handle_watcher_); |
- handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); |
+ handle_watcher_.reset( |
+ new Watcher(FROM_HERE, Watcher::ArmingPolicy::MANUAL, task_runner_)); |
if (heap_profiler_tag_) |
handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
MojoResult rv = handle_watcher_->Start( |
@@ -232,6 +273,8 @@ void Connector::WaitToReadMore() { |
task_runner_->PostTask( |
FROM_HERE, |
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
+ } else { |
+ handle_watcher_->ArmOrNotify(); |
} |
if (allow_woken_up_by_others_) { |
@@ -253,6 +296,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
*read_result = rv; |
+ if (nested_dispatch_enabled_) { |
+ // When supporting nested dispatch, we have to rearm the Watcher immediately |
+ // after reading each message (i.e. before dispatch) to ensure that the next |
+ // inbound message can trigger OnHandleReady on the nested loop. |
+ handle_watcher_->ArmOrNotify(); |
+ } |
+ |
if (rv == MOJO_RESULT_OK) { |
receiver_result = |
incoming_receiver_ && incoming_receiver_->Accept(&message); |