Index: ipc/ipc_sync_channel.cc |
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc |
index e8eeec6deca4c77dbdd6968ca347456e8c1caa53..8c883adec75e2f383a6c63617ac1e9897b1012c7 100644 |
--- a/ipc/ipc_sync_channel.cc |
+++ b/ipc/ipc_sync_channel.cc |
@@ -26,6 +26,7 @@ |
#include "ipc/ipc_sync_message.h" |
#include "ipc/mojo_event.h" |
#include "mojo/public/cpp/bindings/sync_handle_registry.h" |
+#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
using base::WaitableEvent; |
@@ -101,6 +102,14 @@ class SyncChannel::ReceivedSyncMsgQueue : |
return rv; |
} |
+ // Prevents messages from being dispatched immediately when the dispatch event |
+ // is signaled. Instead, |*dispatch_flag| will be set. |
+ void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; } |
+ |
+ // Allows messages to be dispatched immediately when the dispatch event is |
+ // signaled. |
+ void UnblockDispatch() { dispatch_flag_ = nullptr; } |
+ |
// Called on IPC thread when a synchronous message or reply arrives. |
void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { |
bool was_task_pending; |
@@ -138,12 +147,17 @@ class SyncChannel::ReceivedSyncMsgQueue : |
context->DispatchMessages(); |
} |
+ // Dispatches any queued incoming sync messages. If |dispatching_context| is |
+ // not null, messages which target a restricted dispatch channel will only be |
+ // dispatched if |dispatching_context| belongs to the same restricted dispatch |
+ // group as that channel. If |dispatching_context| is null, all queued |
+ // messages are dispatched. |
void DispatchMessages(SyncContext* dispatching_context) { |
bool first_time = true; |
uint32_t expected_version = 0; |
SyncMessageQueue::iterator it; |
while (true) { |
- Message* message = NULL; |
+ Message* message = nullptr; |
scoped_refptr<SyncChannel::SyncContext> context; |
{ |
base::AutoLock auto_lock(message_lock_); |
@@ -153,7 +167,8 @@ class SyncChannel::ReceivedSyncMsgQueue : |
} |
for (; it != message_queue_.end(); it++) { |
int message_group = it->context->restrict_dispatch_group(); |
- if (message_group == kRestrictDispatchGroup_None || |
+ if (!dispatching_context || |
+ message_group == kRestrictDispatchGroup_None || |
message_group == dispatching_context->restrict_dispatch_group()) { |
message = it->message; |
context = it->context; |
@@ -165,7 +180,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
} |
} |
- if (message == NULL) |
+ if (message == nullptr) |
break; |
context->OnDispatchMessage(*message); |
delete message; |
@@ -189,7 +204,8 @@ class SyncChannel::ReceivedSyncMsgQueue : |
if (--listener_count_ == 0) { |
DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
- lazy_tls_ptr_.Pointer()->Set(NULL); |
+ lazy_tls_ptr_.Pointer()->Set(nullptr); |
+ sync_dispatch_watcher_.reset(); |
} |
} |
@@ -233,10 +249,33 @@ class SyncChannel::ReceivedSyncMsgQueue : |
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
task_pending_(false), |
listener_count_(0), |
- top_send_done_watcher_(NULL) {} |
+ top_send_done_watcher_(nullptr) { |
+ sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( |
+ dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, |
+ base::Unretained(this)))); |
+ sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
+ } |
~ReceivedSyncMsgQueue() {} |
+ void OnDispatchHandleReady(MojoResult result) { |
+ if (result != MOJO_RESULT_OK) |
+ return; |
+ |
+ if (dispatch_flag_) { |
+ *dispatch_flag_ = true; |
+ return; |
+ } |
+ |
+ // We were woken up during a sync wait, but no specific SyncChannel is |
+ // currently waiting. i.e., some other Mojo interface on this thread is |
+ // waiting for a response. Since we don't support anything analogous to |
+ // restricted dispatch on Mojo interfaces, in this case it's safe to |
+ // dispatch sync messages for any context. |
+ DispatchMessages(nullptr); |
+ } |
+ |
// Holds information about a queued synchronous message or reply. |
struct QueuedMessage { |
QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
@@ -263,6 +302,15 @@ class SyncChannel::ReceivedSyncMsgQueue : |
// a thread-local stack of send done watchers to ensure that nested sync |
// message loops complete correctly. |
mojo::Watcher* top_send_done_watcher_; |
+ |
+ // If not null, the address of a flag to set when the dispatch event signals, |
+ // in lieu of actually dispatching messages. This is used by |
+ // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
+ // allowed to process while it's waiting. |
+ bool* dispatch_flag_ = nullptr; |
+ |
+ // Watches |dispatch_event_| during all sync handle watches on this thread. |
+ std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
}; |
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
@@ -505,7 +553,8 @@ scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { |
bool SyncChannel::Send(Message* message) { |
#ifdef IPC_MESSAGE_LOG_ENABLED |
std::string name; |
- Logging::GetInstance()->GetMessageText(message->type(), &name, message, NULL); |
+ Logging::GetInstance()->GetMessageText( |
+ message->type(), &name, message, nullptr); |
TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); |
#else |
TRACE_EVENT2("ipc", "SyncChannel::Send", |
@@ -531,7 +580,7 @@ bool SyncChannel::Send(Message* message) { |
ChannelProxy::Send(message); |
// Wait for reply, or for any other incoming synchronous messages. |
- // *this* might get deleted, so only call static functions at this point. |
+ // |this| might get deleted, so only call static functions at this point. |
scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; |
WaitForReply(registry.get(), context.get(), pump_messages); |
@@ -555,31 +604,32 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
bool send_done = false; |
bool should_pump_messages = false; |
bool error = false; |
- registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
- MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
- registry->RegisterHandle( |
+ bool registered = registry->RegisterHandle( |
context->GetSendDoneEvent()->GetHandle(), |
MOJO_HANDLE_SIGNAL_READABLE, |
base::Bind(&OnSyncHandleReady, &send_done, &error)); |
+ DCHECK(registered); |
if (pump_messages_event) { |
- registry->RegisterHandle( |
+ registered = registry->RegisterHandle( |
pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
+ DCHECK(registered); |
} |
const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
+ context->received_sync_msgs()->BlockDispatch(&dispatch); |
registry->WatchAllHandles(stop_flags, 3); |
+ context->received_sync_msgs()->UnblockDispatch(); |
DCHECK(!error); |
- registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
+ |
registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
if (pump_messages_event) |
registry->UnregisterHandle(pump_messages_event->GetHandle()); |
if (dispatch) { |
- // We're waiting for a reply, but we received a blocking synchronous |
- // call. We must process it or otherwise a deadlock might occur. |
+ // We're waiting for a reply, but we received a blocking synchronous call. |
+ // We must process it to avoid potential deadlocks. |
context->GetDispatchEvent()->Reset(); |
context->DispatchMessages(); |
continue; |
@@ -638,10 +688,10 @@ void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
} |
void SyncChannel::StartWatching() { |
- // Ideally we only want to watch this object when running a nested message |
- // loop. However, we don't know when it exits if there's another nested |
- // message loop running under it or not, so we wouldn't know whether to |
- // stop or keep watching. So we always watch it. |
+ // |dispatch_watcher_| watches the event asynchronously, only dispatching |
+ // messages once the listener thread is unblocked and pumping its task queue. |
+ // The ReceivedSyncMsgQueue also watches this event and may dispatch |
+ // immediately if woken up by a message which it's allowed to dispatch. |
dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
MOJO_HANDLE_SIGNAL_READABLE, |
base::Bind(&SyncChannel::OnDispatchHandleReady, |