Chromium Code Reviews| Index: mojo/public/cpp/bindings/lib/connector.cc |
| diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
| index 4426defa934cd0362ecc2eec712730a55dbf04a9..4b029abc9902b18a6d16f11eb438c4e45b946368 100644 |
| --- a/mojo/public/cpp/bindings/lib/connector.cc |
| +++ b/mojo/public/cpp/bindings/lib/connector.cc |
| @@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) { |
| } |
| } |
| +void Connector::EnableNestedDispatch(bool enabled) { |
| + nested_dispatch_enabled_ = enabled; |
| + handle_watcher_.reset(); |
| + WaitToReadMore(); |
| +} |
| + |
| void Connector::OnWatcherHandleReady(MojoResult result) { |
| OnHandleReadyInternal(result); |
| } |
| @@ -211,6 +217,7 @@ void Connector::OnHandleReadyInternal(MojoResult result) { |
| HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
| return; |
| } |
| + |
| ReadAllAvailableMessages(); |
| // At this point, this object might have been deleted. Return. |
| } |
| @@ -219,10 +226,11 @@ void Connector::WaitToReadMore() { |
| CHECK(!paused_); |
| DCHECK(!handle_watcher_); |
| - handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); |
| + handle_watcher_.reset(new SimpleWatcher( |
| + FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); |
| if (heap_profiler_tag_) |
| handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); |
| - MojoResult rv = handle_watcher_->Start( |
| + MojoResult rv = handle_watcher_->Watch( |
| message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
| base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); |
| @@ -232,6 +240,8 @@ void Connector::WaitToReadMore() { |
| task_runner_->PostTask( |
| FROM_HERE, |
| base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
| + } else { |
| + handle_watcher_->ArmOrNotify(); |
| } |
| if (allow_woken_up_by_others_) { |
| @@ -253,6 +263,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
| *read_result = rv; |
| + if (nested_dispatch_enabled_) { |
| + // When supporting nested dispatch, we have to rearm the Watcher immediately |
| + // after reading each message (i.e. before dispatch) to ensure that the next |
| + // inbound message can trigger OnHandleReady on the nested loop. |
| + handle_watcher_->ArmOrNotify(); |
| + } |
| + |
| if (rv == MOJO_RESULT_OK) { |
| receiver_result = |
| incoming_receiver_ && incoming_receiver_->Accept(&message); |
| @@ -278,19 +295,36 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
| void Connector::ReadAllAvailableMessages() { |
| while (!error_) { |
| + base::WeakPtr<Connector> weak_self = weak_self_; |
| MojoResult rv; |
| - if (!ReadSingleMessage(&rv)) { |
| - // Return immediately without touching any members. |this| may have been |
| - // destroyed. |
| + // May delete |this.| |
| + if (!ReadSingleMessage(&rv)) |
| return; |
| - } |
| - if (paused_) |
| + if (!weak_self || paused_) |
| return; |
| - if (rv == MOJO_RESULT_SHOULD_WAIT) |
| - break; |
| + DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); |
| + |
| + if (rv == MOJO_RESULT_SHOULD_WAIT) { |
| + // Attempt to re-arm the Watcher. |
| + MojoResult ready_result; |
| + MojoResult rv = handle_watcher_->Arm(&ready_result); |
|
yzshen1
2017/03/11 00:44:58
Please consider using a different name, it hides |
Ken Rockot(use gerrit already)
2017/03/12 22:24:13
Done
|
| + if (rv == MOJO_RESULT_OK) |
| + return; |
| + |
| + // The watcher is already ready to notify again. |
| + DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv); |
| + |
| + if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { |
| + HandleError(false, false); |
| + return; |
| + } |
| + |
| + // There's more to read now, so we'll just keep looping. |
| + DCHECK_EQ(MOJO_RESULT_OK, ready_result); |
| + } |
| } |
| } |