| Index: mojo/public/cpp/bindings/lib/connector.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc
|
| index 4426defa934cd0362ecc2eec712730a55dbf04a9..ff9867a27c5004b6c216698c3027964ef205321f 100644
|
| --- a/mojo/public/cpp/bindings/lib/connector.cc
|
| +++ b/mojo/public/cpp/bindings/lib/connector.cc
|
| @@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) {
|
| }
|
| }
|
|
|
| +void Connector::EnableNestedDispatch(bool enabled) {
|
| + nested_dispatch_enabled_ = enabled;
|
| + handle_watcher_.reset();
|
| + WaitToReadMore();
|
| +}
|
| +
|
| void Connector::OnWatcherHandleReady(MojoResult result) {
|
| OnHandleReadyInternal(result);
|
| }
|
| @@ -211,6 +217,7 @@ void Connector::OnHandleReadyInternal(MojoResult result) {
|
| HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
|
| return;
|
| }
|
| +
|
| ReadAllAvailableMessages();
|
| // At this point, this object might have been deleted. Return.
|
| }
|
| @@ -219,10 +226,11 @@ void Connector::WaitToReadMore() {
|
| CHECK(!paused_);
|
| DCHECK(!handle_watcher_);
|
|
|
| - handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
|
| + handle_watcher_.reset(new SimpleWatcher(
|
| + FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
|
| if (heap_profiler_tag_)
|
| handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
|
| - MojoResult rv = handle_watcher_->Start(
|
| + MojoResult rv = handle_watcher_->Watch(
|
| message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
|
| base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
|
|
|
| @@ -232,6 +240,8 @@ void Connector::WaitToReadMore() {
|
| task_runner_->PostTask(
|
| FROM_HERE,
|
| base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
|
| + } else {
|
| + handle_watcher_->ArmOrNotify();
|
| }
|
|
|
| if (allow_woken_up_by_others_) {
|
| @@ -253,6 +263,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
|
| const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
|
| *read_result = rv;
|
|
|
| + if (nested_dispatch_enabled_) {
|
| + // When supporting nested dispatch, we have to rearm the Watcher immediately
|
| + // after reading each message (i.e. before dispatch) to ensure that the next
|
| + // inbound message can trigger OnHandleReady on the nested loop.
|
| + handle_watcher_->ArmOrNotify();
|
| + }
|
| +
|
| if (rv == MOJO_RESULT_OK) {
|
| receiver_result =
|
| incoming_receiver_ && incoming_receiver_->Accept(&message);
|
| @@ -278,19 +295,36 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
|
|
|
| void Connector::ReadAllAvailableMessages() {
|
| while (!error_) {
|
| + base::WeakPtr<Connector> weak_self = weak_self_;
|
| MojoResult rv;
|
|
|
| - if (!ReadSingleMessage(&rv)) {
|
| - // Return immediately without touching any members. |this| may have been
|
| - // destroyed.
|
| + // May delete |this.|
|
| + if (!ReadSingleMessage(&rv))
|
| return;
|
| - }
|
|
|
| - if (paused_)
|
| + if (!weak_self || paused_)
|
| return;
|
|
|
| - if (rv == MOJO_RESULT_SHOULD_WAIT)
|
| - break;
|
| + DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
|
| +
|
| + if (rv == MOJO_RESULT_SHOULD_WAIT) {
|
| + // Attempt to re-arm the Watcher.
|
| + MojoResult ready_result;
|
| + MojoResult arm_result = handle_watcher_->Arm(&ready_result);
|
| + if (arm_result == MOJO_RESULT_OK)
|
| + return;
|
| +
|
| + // The watcher is already ready to notify again.
|
| + DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
|
| +
|
| + if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
|
| + HandleError(false, false);
|
| + return;
|
| + }
|
| +
|
| + // There's more to read now, so we'll just keep looping.
|
| + DCHECK_EQ(MOJO_RESULT_OK, ready_result);
|
| + }
|
| }
|
| }
|
|
|
|
|