| Index: mojo/public/cpp/bindings/lib/connector.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc
|
| index ceccaa9a109128d0d64d13c82165702beccec1aa..4eff31d2acef7230ee289064735223d7e3e63919 100644
|
| --- a/mojo/public/cpp/bindings/lib/connector.cc
|
| +++ b/mojo/public/cpp/bindings/lib/connector.cc
|
| @@ -7,9 +7,11 @@
|
| #include <stdint.h>
|
| #include <utility>
|
|
|
| +#include "base/bind.h"
|
| #include "base/logging.h"
|
| #include "base/macros.h"
|
| #include "base/synchronization/lock.h"
|
| +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
|
|
|
| namespace mojo {
|
| namespace internal {
|
| @@ -52,8 +54,11 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe,
|
| drop_writes_(false),
|
| enforce_errors_from_incoming_receiver_(true),
|
| paused_(false),
|
| - destroyed_flag_(nullptr),
|
| - lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) {
|
| + lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
|
| + register_sync_handle_watch_count_(0),
|
| + registered_with_sync_handle_watcher_(false),
|
| + sync_handle_watcher_callback_count_(0),
|
| + weak_factory_(this) {
|
| // Even though we don't have an incoming receiver, we still want to monitor
|
| // the message pipe to know if is closed or encounters an error.
|
| WaitToReadMore();
|
| @@ -62,9 +67,6 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe,
|
| Connector::~Connector() {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - if (destroyed_flag_)
|
| - *destroyed_flag_ = true;
|
| -
|
| CancelWait();
|
| }
|
|
|
| @@ -191,17 +193,76 @@ bool Connector::Accept(Message* message) {
|
| return true;
|
| }
|
|
|
| +bool Connector::RegisterSyncHandleWatch() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + if (error_)
|
| + return false;
|
| +
|
| + register_sync_handle_watch_count_++;
|
| +
|
| + if (!registered_with_sync_handle_watcher_ && !paused_) {
|
| + registered_with_sync_handle_watcher_ =
|
| + SyncHandleWatcher::current()->RegisterHandle(
|
| + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
|
| + base::Unretained(this)));
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void Connector::UnregisterSyncHandleWatch() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + if (register_sync_handle_watch_count_ == 0) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + register_sync_handle_watch_count_--;
|
| + if (register_sync_handle_watch_count_ > 0)
|
| + return;
|
| +
|
| + if (registered_with_sync_handle_watcher_) {
|
| + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
|
| + registered_with_sync_handle_watcher_ = false;
|
| + }
|
| +}
|
| +
|
| +bool Connector::RunSyncHandleWatch(const bool* should_stop) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DCHECK_GT(register_sync_handle_watch_count_, 0u);
|
| +
|
| + if (error_)
|
| + return false;
|
| +
|
| + ResumeIncomingMethodCallProcessing();
|
| +
|
| + return SyncHandleWatcher::current()->WatchAllHandles(message_pipe_.get(),
|
| + should_stop);
|
| +}
|
| +
|
| // static
|
| void Connector::CallOnHandleReady(void* closure, MojoResult result) {
|
| Connector* self = static_cast<Connector*>(closure);
|
| - self->OnHandleReady(result);
|
| + CHECK(self->async_wait_id_ != 0);
|
| + self->async_wait_id_ = 0;
|
| + self->OnHandleReadyInternal(result);
|
| }
|
|
|
| -void Connector::OnHandleReady(MojoResult result) {
|
| +void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
|
| + base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
|
| +
|
| + sync_handle_watcher_callback_count_++;
|
| + OnHandleReadyInternal(result);
|
| + // At this point, this object might have been deleted.
|
| + if (weak_self)
|
| + sync_handle_watcher_callback_count_--;
|
| +}
|
| +
|
| +void Connector::OnHandleReadyInternal(MojoResult result) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - CHECK(async_wait_id_ != 0);
|
| - async_wait_id_ = 0;
|
| if (result != MOJO_RESULT_OK) {
|
| HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
|
| return;
|
| @@ -218,6 +279,15 @@ void Connector::WaitToReadMore() {
|
| MOJO_DEADLINE_INDEFINITE,
|
| &Connector::CallOnHandleReady,
|
| this);
|
| +
|
| + if (register_sync_handle_watch_count_ > 0 &&
|
| + !registered_with_sync_handle_watcher_) {
|
| + registered_with_sync_handle_watcher_ =
|
| + SyncHandleWatcher::current()->RegisterHandle(
|
| + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
|
| + base::Unretained(this)));
|
| + }
|
| }
|
|
|
| bool Connector::ReadSingleMessage(MojoResult* read_result) {
|
| @@ -227,9 +297,7 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
|
|
|
| // Detect if |this| was destroyed during message dispatch. Allow for the
|
| // possibility of re-entering ReadMore() through message dispatch.
|
| - bool was_destroyed_during_dispatch = false;
|
| - bool* previous_destroyed_flag = destroyed_flag_;
|
| - destroyed_flag_ = &was_destroyed_during_dispatch;
|
| + base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr();
|
|
|
| Message message;
|
| const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
|
| @@ -247,13 +315,8 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
|
| incoming_receiver_ && incoming_receiver_->Accept(&message);
|
| }
|
|
|
| - if (was_destroyed_during_dispatch) {
|
| - if (previous_destroyed_flag)
|
| - *previous_destroyed_flag = true; // Propagate flag.
|
| + if (!weak_self)
|
| return false;
|
| - }
|
| -
|
| - destroyed_flag_ = previous_destroyed_flag;
|
|
|
| if (rv == MOJO_RESULT_SHOULD_WAIT)
|
| return true;
|
| @@ -295,11 +358,15 @@ void Connector::ReadAllAvailableMessages() {
|
| }
|
|
|
| void Connector::CancelWait() {
|
| - if (!async_wait_id_)
|
| - return;
|
| + if (async_wait_id_) {
|
| + waiter_->CancelWait(async_wait_id_);
|
| + async_wait_id_ = 0;
|
| + }
|
|
|
| - waiter_->CancelWait(async_wait_id_);
|
| - async_wait_id_ = 0;
|
| + if (registered_with_sync_handle_watcher_) {
|
| + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
|
| + registered_with_sync_handle_watcher_ = false;
|
| + }
|
| }
|
|
|
| void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
|
|
|