Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index 812b4c1133f56db708729a82055cbb24e9671456..7f9a9c30856e1a35fcc25b3e1d28d6007d02221b 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -11,7 +11,6 @@ |
#include "base/logging.h" |
#include "base/macros.h" |
#include "base/synchronization/lock.h" |
-#include "base/thread_task_runner_handle.h" |
#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
namespace mojo { |
@@ -248,7 +247,7 @@ |
return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); |
} |
-void Connector::OnWatcherHandleReady(MojoResult result) { |
+void Connector::OnHandleWatcherHandleReady(MojoResult result) { |
OnHandleReadyInternal(result); |
} |
@@ -274,21 +273,12 @@ |
} |
void Connector::WaitToReadMore() { |
+ CHECK(!handle_watcher_.is_watching()); |
CHECK(!paused_); |
- DCHECK(!handle_watcher_.IsWatching()); |
- |
- MojoResult rv = handle_watcher_.Start( |
- message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&Connector::OnWatcherHandleReady, |
- base::Unretained(this))); |
- |
- if (rv != MOJO_RESULT_OK) { |
- // If the watch failed because the handle is invalid or its conditions can |
- // no longer be met, we signal the error asynchronously to avoid reentry. |
- base::ThreadTaskRunnerHandle::Get()->PostTask( |
- FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady, |
- weak_factory_.GetWeakPtr(), rv)); |
- } |
+ handle_watcher_.Start(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&Connector::OnHandleWatcherHandleReady, |
+ base::Unretained(this))); |
if (register_sync_handle_watch_count_ > 0 && |
!registered_with_sync_handle_watcher_) { |
@@ -314,6 +304,13 @@ |
*read_result = rv; |
if (rv == MOJO_RESULT_OK) { |
+ // Dispatching the message may spin in a nested message loop. To ensure we |
+ // continue dispatching messages when this happens start listening for |
+ // messagse now. |
+ if (!handle_watcher_.is_watching()) { |
+ // TODO: Need to evaluate the perf impact of this. |
+ WaitToReadMore(); |
+ } |
receiver_result = |
incoming_receiver_ && incoming_receiver_->Accept(&message); |
} |
@@ -347,13 +344,21 @@ |
if (paused_) |
return; |
- if (rv == MOJO_RESULT_SHOULD_WAIT) |
+ if (rv == MOJO_RESULT_SHOULD_WAIT) { |
+ // ReadSingleMessage could end up calling HandleError which resets |
+ // message_pipe_ to a dummy one that is closed. The old EDK will see the |
+ // that the peer is closed immediately, while the new one is asynchronous |
+ // because of thread hops. In that case, there'll still be an async |
+ // waiter. |
+ if (!handle_watcher_.is_watching()) |
+ WaitToReadMore(); |
break; |
+ } |
} |
} |
void Connector::CancelWait() { |
- handle_watcher_.Cancel(); |
+ handle_watcher_.Stop(); |
if (registered_with_sync_handle_watcher_) { |
SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
@@ -389,6 +394,8 @@ |
} |
if (force_async_handler) { |
+ // |dummy_pipe.handle1| has been destructed. Reading the pipe will |
+ // eventually cause a read error on |message_pipe_| and set error state. |
if (!paused_) |
WaitToReadMore(); |
} else { |