| Index: mojo/public/cpp/bindings/lib/connector.cc | 
| diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc | 
| index 4426defa934cd0362ecc2eec712730a55dbf04a9..ff9867a27c5004b6c216698c3027964ef205321f 100644 | 
| --- a/mojo/public/cpp/bindings/lib/connector.cc | 
| +++ b/mojo/public/cpp/bindings/lib/connector.cc | 
| @@ -188,6 +188,12 @@ void Connector::SetWatcherHeapProfilerTag(const char* tag) { | 
| } | 
| } | 
|  | 
| +void Connector::EnableNestedDispatch(bool enabled) { | 
| +  nested_dispatch_enabled_ = enabled; | 
| +  handle_watcher_.reset(); | 
| +  WaitToReadMore(); | 
| +} | 
| + | 
| void Connector::OnWatcherHandleReady(MojoResult result) { | 
| OnHandleReadyInternal(result); | 
| } | 
| @@ -211,6 +217,7 @@ void Connector::OnHandleReadyInternal(MojoResult result) { | 
| HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); | 
| return; | 
| } | 
| + | 
| ReadAllAvailableMessages(); | 
| // At this point, this object might have been deleted. Return. | 
| } | 
| @@ -219,10 +226,11 @@ void Connector::WaitToReadMore() { | 
| CHECK(!paused_); | 
| DCHECK(!handle_watcher_); | 
|  | 
| -  handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_)); | 
| +  handle_watcher_.reset(new SimpleWatcher( | 
| +      FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); | 
| if (heap_profiler_tag_) | 
| handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); | 
| -  MojoResult rv = handle_watcher_->Start( | 
| +  MojoResult rv = handle_watcher_->Watch( | 
| message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, | 
| base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); | 
|  | 
| @@ -232,6 +240,8 @@ void Connector::WaitToReadMore() { | 
| task_runner_->PostTask( | 
| FROM_HERE, | 
| base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); | 
| +  } else { | 
| +    handle_watcher_->ArmOrNotify(); | 
| } | 
|  | 
| if (allow_woken_up_by_others_) { | 
| @@ -253,6 +263,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { | 
| const MojoResult rv = ReadMessage(message_pipe_.get(), &message); | 
| *read_result = rv; | 
|  | 
| +  if (nested_dispatch_enabled_) { | 
| +    // When supporting nested dispatch, we have to rearm the Watcher immediately | 
| +    // after reading each message (i.e. before dispatch) to ensure that the next | 
| +    // inbound message can trigger OnHandleReady on the nested loop. | 
| +    handle_watcher_->ArmOrNotify(); | 
| +  } | 
| + | 
| if (rv == MOJO_RESULT_OK) { | 
| receiver_result = | 
| incoming_receiver_ && incoming_receiver_->Accept(&message); | 
| @@ -278,19 +295,36 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { | 
|  | 
| void Connector::ReadAllAvailableMessages() { | 
| while (!error_) { | 
| +    base::WeakPtr<Connector> weak_self = weak_self_; | 
| MojoResult rv; | 
|  | 
| -    if (!ReadSingleMessage(&rv)) { | 
| -      // Return immediately without touching any members. |this| may have been | 
| -      // destroyed. | 
| +    // May delete |this.| | 
| +    if (!ReadSingleMessage(&rv)) | 
| return; | 
| -    } | 
|  | 
| -    if (paused_) | 
| +    if (!weak_self || paused_) | 
| return; | 
|  | 
| -    if (rv == MOJO_RESULT_SHOULD_WAIT) | 
| -      break; | 
| +    DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); | 
| + | 
| +    if (rv == MOJO_RESULT_SHOULD_WAIT) { | 
| +      // Attempt to re-arm the Watcher. | 
| +      MojoResult ready_result; | 
| +      MojoResult arm_result = handle_watcher_->Arm(&ready_result); | 
| +      if (arm_result == MOJO_RESULT_OK) | 
| +        return; | 
| + | 
| +      // The watcher is already ready to notify again. | 
| +      DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); | 
| + | 
| +      if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { | 
| +        HandleError(false, false); | 
| +        return; | 
| +      } | 
| + | 
| +      // There's more to read now, so we'll just keep looping. | 
| +      DCHECK_EQ(MOJO_RESULT_OK, ready_result); | 
| +    } | 
| } | 
| } | 
|  | 
|  |