Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index ceccaa9a109128d0d64d13c82165702beccec1aa..4eff31d2acef7230ee289064735223d7e3e63919 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -7,9 +7,11 @@ |
#include <stdint.h> |
#include <utility> |
+#include "base/bind.h" |
#include "base/logging.h" |
#include "base/macros.h" |
#include "base/synchronization/lock.h" |
+#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
namespace mojo { |
namespace internal { |
@@ -52,8 +54,11 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, |
drop_writes_(false), |
enforce_errors_from_incoming_receiver_(true), |
paused_(false), |
- destroyed_flag_(nullptr), |
- lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { |
+ lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), |
+ register_sync_handle_watch_count_(0), |
+ registered_with_sync_handle_watcher_(false), |
+ sync_handle_watcher_callback_count_(0), |
+ weak_factory_(this) { |
// Even though we don't have an incoming receiver, we still want to monitor |
// the message pipe to know if is closed or encounters an error. |
WaitToReadMore(); |
@@ -62,9 +67,6 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, |
Connector::~Connector() { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- if (destroyed_flag_) |
- *destroyed_flag_ = true; |
- |
CancelWait(); |
} |
@@ -191,17 +193,76 @@ bool Connector::Accept(Message* message) { |
return true; |
} |
+bool Connector::RegisterSyncHandleWatch() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ if (error_) |
+ return false; |
+ |
+ register_sync_handle_watch_count_++; |
+ |
+ if (!registered_with_sync_handle_watcher_ && !paused_) { |
+ registered_with_sync_handle_watcher_ = |
+ SyncHandleWatcher::current()->RegisterHandle( |
+ message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
+ base::Unretained(this))); |
+ } |
+ return true; |
+} |
+ |
+void Connector::UnregisterSyncHandleWatch() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ if (register_sync_handle_watch_count_ == 0) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ register_sync_handle_watch_count_--; |
+ if (register_sync_handle_watch_count_ > 0) |
+ return; |
+ |
+ if (registered_with_sync_handle_watcher_) { |
+ SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
+ registered_with_sync_handle_watcher_ = false; |
+ } |
+} |
+ |
+bool Connector::RunSyncHandleWatch(const bool* should_stop) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK_GT(register_sync_handle_watch_count_, 0u); |
+ |
+ if (error_) |
+ return false; |
+ |
+ ResumeIncomingMethodCallProcessing(); |
+ |
+ return SyncHandleWatcher::current()->WatchAllHandles(message_pipe_.get(), |
+ should_stop); |
+} |
+ |
// static |
void Connector::CallOnHandleReady(void* closure, MojoResult result) { |
Connector* self = static_cast<Connector*>(closure); |
- self->OnHandleReady(result); |
+ CHECK(self->async_wait_id_ != 0); |
+ self->async_wait_id_ = 0; |
+ self->OnHandleReadyInternal(result); |
} |
-void Connector::OnHandleReady(MojoResult result) { |
+void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { |
+ base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); |
+ |
+ sync_handle_watcher_callback_count_++; |
+ OnHandleReadyInternal(result); |
+ // At this point, this object might have been deleted. |
+ if (weak_self) |
+ sync_handle_watcher_callback_count_--; |
+} |
+ |
+void Connector::OnHandleReadyInternal(MojoResult result) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- CHECK(async_wait_id_ != 0); |
- async_wait_id_ = 0; |
if (result != MOJO_RESULT_OK) { |
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); |
return; |
@@ -218,6 +279,15 @@ void Connector::WaitToReadMore() { |
MOJO_DEADLINE_INDEFINITE, |
&Connector::CallOnHandleReady, |
this); |
+ |
+ if (register_sync_handle_watch_count_ > 0 && |
+ !registered_with_sync_handle_watcher_) { |
+ registered_with_sync_handle_watcher_ = |
+ SyncHandleWatcher::current()->RegisterHandle( |
+ message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
+ base::Unretained(this))); |
+ } |
} |
bool Connector::ReadSingleMessage(MojoResult* read_result) { |
@@ -227,9 +297,7 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
// Detect if |this| was destroyed during message dispatch. Allow for the |
// possibility of re-entering ReadMore() through message dispatch. |
- bool was_destroyed_during_dispatch = false; |
- bool* previous_destroyed_flag = destroyed_flag_; |
- destroyed_flag_ = &was_destroyed_during_dispatch; |
+ base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); |
Message message; |
const MojoResult rv = ReadMessage(message_pipe_.get(), &message); |
@@ -247,13 +315,8 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { |
incoming_receiver_ && incoming_receiver_->Accept(&message); |
} |
- if (was_destroyed_during_dispatch) { |
- if (previous_destroyed_flag) |
- *previous_destroyed_flag = true; // Propagate flag. |
+ if (!weak_self) |
return false; |
- } |
- |
- destroyed_flag_ = previous_destroyed_flag; |
if (rv == MOJO_RESULT_SHOULD_WAIT) |
return true; |
@@ -295,11 +358,15 @@ void Connector::ReadAllAvailableMessages() { |
} |
void Connector::CancelWait() { |
- if (!async_wait_id_) |
- return; |
+ if (async_wait_id_) { |
+ waiter_->CancelWait(async_wait_id_); |
+ async_wait_id_ = 0; |
+ } |
- waiter_->CancelWait(async_wait_id_); |
- async_wait_id_ = 0; |
+ if (registered_with_sync_handle_watcher_) { |
+ SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
+ registered_with_sync_handle_watcher_ = false; |
+ } |
} |
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |