Index: mojo/public/cpp/bindings/lib/connector.cc |
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc |
index 55263a94fcde5d3064096fef012f510f697bd6a9..70f9fd78b2a3576c62e828cbb63d0ea8b14d6776 100644 |
--- a/mojo/public/cpp/bindings/lib/connector.cc |
+++ b/mojo/public/cpp/bindings/lib/connector.cc |
@@ -53,8 +53,7 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, |
enforce_errors_from_incoming_receiver_(true), |
paused_(false), |
lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), |
- register_sync_handle_watch_count_(0), |
- registered_with_sync_handle_watcher_(false), |
+ allow_woken_up_by_others_(false), |
sync_handle_watcher_callback_count_(0), |
weak_factory_(this) { |
weak_self_ = weak_factory_.GetWeakPtr(); |
@@ -74,7 +73,7 @@ void Connector::CloseMessagePipe() { |
CancelWait(); |
MayAutoLock locker(lock_.get()); |
- Close(std::move(message_pipe_)); |
+ message_pipe_.reset(); |
} |
ScopedMessagePipeHandle Connector::PassMessagePipe() { |
@@ -192,61 +191,25 @@ bool Connector::Accept(Message* message) { |
return true; |
} |
-bool Connector::RegisterSyncHandleWatch() { |
+void Connector::AllowWokenUpBySyncWatchOnSameThread() { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- if (error_) |
- return false; |
- |
- register_sync_handle_watch_count_++; |
+ allow_woken_up_by_others_ = true; |
- if (!registered_with_sync_handle_watcher_ && !paused_) { |
- registered_with_sync_handle_watcher_ = |
- SyncHandleWatcher::current()->RegisterHandle( |
- message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
- base::Unretained(this))); |
- } |
- return true; |
+ EnsureSyncWatcherExists(); |
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
-void Connector::UnregisterSyncHandleWatch() { |
+bool Connector::SyncWatch(const bool* should_stop) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- if (register_sync_handle_watch_count_ == 0) { |
- NOTREACHED(); |
- return; |
- } |
- |
- register_sync_handle_watch_count_--; |
- if (register_sync_handle_watch_count_ > 0) |
- return; |
- |
- if (registered_with_sync_handle_watcher_) { |
- SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
- registered_with_sync_handle_watcher_ = false; |
- } |
-} |
- |
-bool Connector::RunSyncHandleWatch(const bool* should_stop) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- DCHECK_GT(register_sync_handle_watch_count_, 0u); |
- |
if (error_) |
return false; |
ResumeIncomingMethodCallProcessing(); |
- if (!should_stop_sync_handle_watch_) |
- should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false); |
- |
- // This object may be destroyed during the WatchAllHandles() call. So we have |
- // to preserve the boolean that WatchAllHandles uses. |
- scoped_refptr<base::RefCountedData<bool>> preserver = |
- should_stop_sync_handle_watch_; |
- const bool* should_stop_array[] = {should_stop, |
- &should_stop_sync_handle_watch_->data}; |
- return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); |
+ EnsureSyncWatcherExists(); |
+ return sync_watcher_->SyncWatch(should_stop); |
} |
void Connector::OnWatcherHandleReady(MojoResult result) { |
@@ -291,13 +254,9 @@ void Connector::WaitToReadMore() { |
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); |
} |
- if (register_sync_handle_watch_count_ > 0 && |
- !registered_with_sync_handle_watcher_) { |
- registered_with_sync_handle_watcher_ = |
- SyncHandleWatcher::current()->RegisterHandle( |
- message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
- base::Unretained(this))); |
+ if (allow_woken_up_by_others_) { |
+ EnsureSyncWatcherExists(); |
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
} |
@@ -355,26 +314,17 @@ void Connector::ReadAllAvailableMessages() { |
void Connector::CancelWait() { |
handle_watcher_.Cancel(); |
- |
- if (registered_with_sync_handle_watcher_) { |
- SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); |
- registered_with_sync_handle_watcher_ = false; |
- } |
- |
- if (should_stop_sync_handle_watch_) |
- should_stop_sync_handle_watch_->data = true; |
+ sync_watcher_.reset(); |
} |
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
if (error_ || !message_pipe_.is_valid()) |
return; |
- if (during_sync_handle_watcher_callback() || paused_) { |
- // Enforce calling the error handler asynchronously if: |
- // - currently we are in a sync handle watcher callback. We don't want the |
- // error handler to reenter an ongoing sync call. |
- // - the user has paused receiving messages. We need to wait until the user |
- // starts receiving messages again. |
+ if (paused_) { |
+ // Enforce calling the error handler asynchronously if the user has paused |
+ // receiving messages. We need to wait until the user starts receiving |
+ // messages again. |
force_async_handler = true; |
} |
@@ -384,7 +334,7 @@ void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
if (force_pipe_reset) { |
CancelWait(); |
MayAutoLock locker(lock_.get()); |
- Close(std::move(message_pipe_)); |
+ message_pipe_.reset(); |
MessagePipe dummy_pipe; |
message_pipe_ = std::move(dummy_pipe.handle0); |
} else { |
@@ -400,5 +350,14 @@ void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |
} |
} |
+void Connector::EnsureSyncWatcherExists() { |
+ if (sync_watcher_) |
+ return; |
+ sync_watcher_.reset(new SyncHandleWatcher( |
+ message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&Connector::OnSyncHandleWatcherHandleReady, |
+ base::Unretained(this)))); |
+} |
+ |
} // namespace internal |
} // namespace mojo |