Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
index dcfbab1545b4ef03eeac2bc6a2e365d972c7eb93..922cb6ce6111ae3ef702178457fb0671d969ad7b 100644 |
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc |
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
@@ -103,6 +103,20 @@ class MultiplexRouter::InterfaceEndpoint |
DCHECK_EQ(MOJO_RESULT_OK, result); |
} |
+ void ResetSyncMessageSignal() { |
+ router_->lock_.AssertAcquired(); |
+ |
+ if (!event_signalled_) |
+ return; |
+ |
+ DCHECK(sync_message_event_receiver_.is_valid()); |
+ MojoResult result = |
+ ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, |
+ nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
+ event_signalled_ = false; |
+ } |
+ |
// --------------------------------------------------------------------------- |
// The following public methods (i.e., InterfaceEndpointController |
// implementation) are called by the client on the same thread as the |
@@ -199,20 +213,6 @@ class MultiplexRouter::InterfaceEndpoint |
DCHECK_EQ(MOJO_RESULT_OK, result); |
} |
- void ResetSyncMessageSignal() { |
- router_->lock_.AssertAcquired(); |
- |
- if (!event_signalled_) |
- return; |
- |
- DCHECK(sync_message_event_receiver_.is_valid()); |
- MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), |
- nullptr, nullptr, nullptr, nullptr, |
- MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
- DCHECK_EQ(MOJO_RESULT_OK, result); |
- event_signalled_ = false; |
- } |
- |
// --------------------------------------------------------------------------- |
// The following members are safe to access from any threads. |
@@ -296,6 +296,7 @@ MultiplexRouter::MultiplexRouter( |
next_interface_id_value_(1), |
posted_to_process_tasks_(false), |
encountered_error_(false), |
+ paused_(false), |
testing_mode_(false) { |
// Always participate in sync handle watching, because even if it doesn't |
// expect sync requests during sync handle watching, it may still need to |
@@ -456,6 +457,33 @@ void MultiplexRouter::CloseMessagePipe() { |
OnPipeConnectionError(); |
} |
+void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ connector_.PauseIncomingMethodCallProcessing(); |
+ |
+ base::AutoLock locker(lock_); |
+ paused_ = true; |
+ |
+ for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
+ iter->second->ResetSyncMessageSignal(); |
+} |
+ |
+void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ connector_.ResumeIncomingMethodCallProcessing(); |
+ |
+ base::AutoLock locker(lock_); |
+ paused_ = false; |
+ |
+ for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
+ auto sync_iter = sync_message_tasks_.find(iter->first); |
+ if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
+ iter->second->SignalSyncMessageEvent(); |
+ } |
+ |
+ ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
+} |
+ |
bool MultiplexRouter::HasAssociatedEndpoints() const { |
DCHECK(thread_checker_.CalledOnValidThread()); |
base::AutoLock locker(lock_); |
@@ -482,6 +510,8 @@ bool MultiplexRouter::Accept(Message* message) { |
scoped_refptr<MultiplexRouter> protector(this); |
base::AutoLock locker(lock_); |
+ DCHECK(!paused_); |
+ |
ClientCallBehavior client_call_behavior = |
connector_.during_sync_handle_watcher_callback() |
? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
@@ -590,7 +620,7 @@ void MultiplexRouter::ProcessTasks( |
if (posted_to_process_tasks_) |
return; |
- while (!tasks_.empty()) { |
+ while (!tasks_.empty() && !paused_) { |
std::unique_ptr<Task> task(std::move(tasks_.front())); |
tasks_.pop_front(); |
@@ -635,13 +665,16 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
if (iter == sync_message_tasks_.end()) |
return false; |
+ if (paused_) |
+ return true; |
+ |
MultiplexRouter::Task* task = iter->second.front(); |
iter->second.pop_front(); |
DCHECK(task->IsMessageTask()); |
std::unique_ptr<Message> message(std::move(task->message)); |
- // Note: after this call, |task| and |iter| may be invalidated. |
+ // Note: after this call, |task| and |iter| may be invalidated. |
bool processed = ProcessIncomingMessage( |
message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); |
DCHECK(processed); |
@@ -663,6 +696,8 @@ bool MultiplexRouter::ProcessNotifyErrorTask( |
ClientCallBehavior client_call_behavior, |
base::SingleThreadTaskRunner* current_task_runner) { |
DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
+ DCHECK(!paused_); |
+ |
lock_.AssertAcquired(); |
InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
if (!endpoint->client()) |
@@ -694,6 +729,7 @@ bool MultiplexRouter::ProcessIncomingMessage( |
ClientCallBehavior client_call_behavior, |
base::SingleThreadTaskRunner* current_task_runner) { |
DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
+ DCHECK(!paused_); |
lock_.AssertAcquired(); |
if (!message) { |