Chromium Code Reviews| Index: ipc/ipc_sync_channel.cc |
| diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc |
| index e8eeec6deca4c77dbdd6968ca347456e8c1caa53..14934b196dc5a836715eaeb5a2d699253343a092 100644 |
| --- a/ipc/ipc_sync_channel.cc |
| +++ b/ipc/ipc_sync_channel.cc |
| @@ -26,6 +26,7 @@ |
| #include "ipc/ipc_sync_message.h" |
| #include "ipc/mojo_event.h" |
| #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| +#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| using base::WaitableEvent; |
| @@ -101,6 +102,14 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| return rv; |
| } |
| + // Prevents messages from being dispatched immediately when the dispatch event |
| + // is signaled. Instead, |*dispatch_flag| will be set. |
| + void BlockDispatch(bool *dispatch_flag) { dispatch_flag_ = dispatch_flag; } |
|
yzshen1
2016/08/02 17:38:37
style nit: "*" should be next to "bool" instead of
Ken Rockot(use gerrit already)
2016/08/02 19:05:05
Done
|
| + |
| + // Allows messages to be dispatched immediately when the dispatch event is |
| + // signaled. |
| + void UnblockDispatch() { dispatch_flag_ = nullptr; } |
| + |
| // Called on IPC thread when a synchronous message or reply arrives. |
| void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { |
| bool was_task_pending; |
| @@ -138,12 +147,17 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| context->DispatchMessages(); |
| } |
| + // Dispatches any queued incoming sync messages. If |dispatching_context| is |
| + // not null, messages which target a restricted dispatch channel will only be |
| + // dispatched if |dispatching_context| belongs to the same restricted dispatch |
| + // group as that channel. If |dispatching_context| is null, all queued |
| + // messages are dispatched. |
| void DispatchMessages(SyncContext* dispatching_context) { |
| bool first_time = true; |
| uint32_t expected_version = 0; |
| SyncMessageQueue::iterator it; |
| while (true) { |
| - Message* message = NULL; |
| + Message* message = nullptr; |
| scoped_refptr<SyncChannel::SyncContext> context; |
| { |
| base::AutoLock auto_lock(message_lock_); |
| @@ -153,7 +167,8 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| for (; it != message_queue_.end(); it++) { |
| int message_group = it->context->restrict_dispatch_group(); |
| - if (message_group == kRestrictDispatchGroup_None || |
| + if (!dispatching_context || |
| + message_group == kRestrictDispatchGroup_None || |
| message_group == dispatching_context->restrict_dispatch_group()) { |
| message = it->message; |
| context = it->context; |
| @@ -165,7 +180,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| } |
| - if (message == NULL) |
| + if (message == nullptr) |
| break; |
| context->OnDispatchMessage(*message); |
| delete message; |
| @@ -189,7 +204,8 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| if (--listener_count_ == 0) { |
| DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| - lazy_tls_ptr_.Pointer()->Set(NULL); |
| + lazy_tls_ptr_.Pointer()->Set(nullptr); |
| + sync_dispatch_watcher_.reset(); |
| } |
| } |
| @@ -233,10 +249,33 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| task_pending_(false), |
| listener_count_(0), |
| - top_send_done_watcher_(NULL) {} |
| + top_send_done_watcher_(nullptr) { |
| + sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( |
| + dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, |
| + base::Unretained(this)))); |
| + sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| + } |
| ~ReceivedSyncMsgQueue() {} |
| + void OnDispatchHandleReady(MojoResult result) { |
| + if (result != MOJO_RESULT_OK) |
| + return; |
| + |
| + if (dispatch_flag_) { |
| + *dispatch_flag_ = true; |
| + return; |
| + } |
| + |
| + // We were woken up during a sync wait, but no specific SyncChannel is |
| + // currently waiting. i.e., some other Mojo interface on this thread is |
| + // waiting for a response. Since we don't support anything analogous to |
| + // restricted dispatch on Mojo interfaces, in this case it's safe to |
| + // dispatch sync messages for any context. |
| + DispatchMessages(nullptr); |
| + } |
| + |
| // Holds information about a queued synchronous message or reply. |
| struct QueuedMessage { |
| QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
| @@ -263,6 +302,15 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| // a thread-local stack of send done watchers to ensure that nested sync |
| // message loops complete correctly. |
| mojo::Watcher* top_send_done_watcher_; |
| + |
| + // If not null, the address of a flag to set when the dispatch event signals, |
| + // in lieu of actually dispatching messages. This is used by |
| + // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
| + // allowed to process while it's waiting. |
| + bool* dispatch_flag_ = nullptr; |
| + |
| + // Watches |dispatch_event_| during all sync handle watches on this thread. |
| + std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
| }; |
| base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| @@ -505,7 +553,8 @@ scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { |
| bool SyncChannel::Send(Message* message) { |
| #ifdef IPC_MESSAGE_LOG_ENABLED |
| std::string name; |
| - Logging::GetInstance()->GetMessageText(message->type(), &name, message, NULL); |
| + Logging::GetInstance()->GetMessageText( |
| + message->type(), &name, message, nullptr); |
| TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); |
| #else |
| TRACE_EVENT2("ipc", "SyncChannel::Send", |
| @@ -531,7 +580,7 @@ bool SyncChannel::Send(Message* message) { |
| ChannelProxy::Send(message); |
| // Wait for reply, or for any other incoming synchronous messages. |
| - // *this* might get deleted, so only call static functions at this point. |
| + // |this| might get deleted, so only call static functions at this point. |
| scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; |
| WaitForReply(registry.get(), context.get(), pump_messages); |
| @@ -555,31 +604,32 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| bool send_done = false; |
| bool should_pump_messages = false; |
| bool error = false; |
| - registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
| - MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
| - registry->RegisterHandle( |
| + bool registered = registry->RegisterHandle( |
| context->GetSendDoneEvent()->GetHandle(), |
| MOJO_HANDLE_SIGNAL_READABLE, |
| base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| + DCHECK(registered); |
| if (pump_messages_event) { |
| - registry->RegisterHandle( |
| + registered = registry->RegisterHandle( |
| pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| + DCHECK(registered); |
| } |
| const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| + context->received_sync_msgs()->BlockDispatch(&dispatch); |
| registry->WatchAllHandles(stop_flags, 3); |
| + context->received_sync_msgs()->UnblockDispatch(); |
| DCHECK(!error); |
| - registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
| + |
| registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| if (pump_messages_event) |
| registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| if (dispatch) { |
| - // We're waiting for a reply, but we received a blocking synchronous |
| - // call. We must process it or otherwise a deadlock might occur. |
| + // We're waiting for a reply, but we received a blocking synchronous call. |
| + // We must process it to avoid potential deadlocks. |
| context->GetDispatchEvent()->Reset(); |
| context->DispatchMessages(); |
| continue; |
| @@ -638,10 +688,10 @@ void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| } |
| void SyncChannel::StartWatching() { |
| - // Ideally we only want to watch this object when running a nested message |
| - // loop. However, we don't know when it exits if there's another nested |
| - // message loop running under it or not, so we wouldn't know whether to |
| - // stop or keep watching. So we always watch it. |
| + // |dispatch_watcher_| watches the event asynchronously, only dispatching |
| + // messages once the listener thread is unblocked and pumping its task queue. |
| + // The ReceivedSyncMsgQueue also watches this event and may dispatch |
| + // immediately if woken up by the same |
|
yzshen1
2016/08/02 17:38:37
the comment doesn't seem to be complete.
Ken Rockot(use gerrit already)
2016/08/02 19:05:05
Yikes - Done
|
| dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
| MOJO_HANDLE_SIGNAL_READABLE, |
| base::Bind(&SyncChannel::OnDispatchHandleReady, |