Chromium Code Reviews| Index: ipc/ipc_sync_channel.cc |
| diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc |
| index 8e3f4115b5a5c8426c13edc583389b57faeb822b..99258953427a85048377f647c6065d9d78b80140 100644 |
| --- a/ipc/ipc_sync_channel.cc |
| +++ b/ipc/ipc_sync_channel.cc |
| @@ -13,9 +13,10 @@ |
| #include "base/lazy_instance.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| +#include "base/macros.h" |
| #include "base/memory/ptr_util.h" |
| +#include "base/run_loop.h" |
| #include "base/synchronization/waitable_event.h" |
| -#include "base/synchronization/waitable_event_watcher.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "base/trace_event/trace_event.h" |
| @@ -23,12 +24,74 @@ |
| #include "ipc/ipc_logging.h" |
| #include "ipc/ipc_message_macros.h" |
| #include "ipc/ipc_sync_message.h" |
| +#include "ipc/mojo_event.h" |
| +#include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| -using base::TimeDelta; |
| -using base::TimeTicks; |
| using base::WaitableEvent; |
| namespace IPC { |
| + |
| +namespace { |
| + |
| +// A generic callback used when watching handles synchronously. Sets |*signal| |
| +// to true. Also sets |*error| to true in case of an error. |
| +void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| + *signal = true; |
| + *error = result != MOJO_RESULT_OK; |
| +} |
| + |
| +// A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but |
| +// is only used in cases where failure should be impossible) and runs |
| +// |callback|. |
| +void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| + DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
| + if (result == MOJO_RESULT_OK) |
| + callback.Run(); |
| +} |
| + |
| +} // namespace |
| + |
| +// A lazy thread-local Mojo Event which is always signaled. Used to wake up the |
| +// sync waiter when a SyncMessage requires the MessageLoop to be pumped while |
| +// waiting for a reply. This object is created lazily and ref-counted so it can |
| +// be cleaned up when no longer in use. |
|
jam
2016/06/21 20:09:05
is cleaning up really necessary? i.e. is this prem
|
| +class SyncChannel::PumpMessagesEvent |
| + : public base::RefCountedThreadSafe<PumpMessagesEvent> { |
| + public: |
| + static scoped_refptr<PumpMessagesEvent> current() { |
| + scoped_refptr<PumpMessagesEvent> current = current_event_.Pointer()->Get(); |
| + if (!current) { |
| + current = new PumpMessagesEvent; |
| + current_event_.Pointer()->Set(current.get()); |
| + } |
| + return current; |
| + } |
| + |
| + const MojoEvent* event() const { return &event_; } |
| + |
| + private: |
| + friend class base::RefCountedThreadSafe<PumpMessagesEvent>; |
| + |
| + PumpMessagesEvent() { event_.Signal(); } |
| + |
| + ~PumpMessagesEvent() { |
| + DCHECK_EQ(current_event_.Pointer()->Get(), this); |
| + current_event_.Pointer()->Set(nullptr); |
| + } |
| + |
| + MojoEvent event_; |
| + |
| + static base::LazyInstance<base::ThreadLocalPointer< |
| + SyncChannel::PumpMessagesEvent>> current_event_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
| +}; |
| + |
| +base::LazyInstance<base::ThreadLocalPointer<SyncChannel::PumpMessagesEvent>> |
| + SyncChannel::PumpMessagesEvent::current_event_ = |
| + LAZY_INSTANCE_INITIALIZER; |
| + |
| + |
| // When we're blocked in a Send(), we need to process incoming synchronous |
| // messages right away because it could be blocking our reply (either |
| // directly from the same object we're calling, or indirectly through one or |
| @@ -155,7 +218,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| } |
| - WaitableEvent* dispatch_event() { return &dispatch_event_; } |
| + MojoEvent* dispatch_event() { return &dispatch_event_; } |
| base::SingleThreadTaskRunner* listener_task_runner() { |
| return listener_task_runner_.get(); |
| } |
| @@ -177,11 +240,11 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| } |
| - base::WaitableEventWatcher* top_send_done_watcher() { |
| + mojo::Watcher* top_send_done_watcher() { |
| return top_send_done_watcher_; |
| } |
| - void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { |
| + void set_top_send_done_watcher(mojo::Watcher* watcher) { |
| top_send_done_watcher_ = watcher; |
| } |
| @@ -192,8 +255,6 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| // as manual reset. |
| ReceivedSyncMsgQueue() |
| : message_queue_version_(0), |
| - dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| - base::WaitableEvent::InitialState::NOT_SIGNALED), |
| listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| task_pending_(false), |
| listener_count_(0), |
| @@ -214,19 +275,19 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| std::vector<QueuedMessage> received_replies_; |
| - // Set when we got a synchronous message that we must respond to as the |
| + // Signaled when we get a synchronous message that we must respond to, as the |
| // sender needs its reply before it can reply to our original synchronous |
| // message. |
| - WaitableEvent dispatch_event_; |
| + MojoEvent dispatch_event_; |
| scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| base::Lock message_lock_; |
| bool task_pending_; |
| int listener_count_; |
| - // The current send done event watcher for this thread. Used to maintain |
| - // a local global stack of send done watchers to ensure that nested sync |
| + // The current send done handle watcher for this thread. Used to maintain |
| + // a thread-local stack of send done watchers to ensure that nested sync |
| // message loops complete correctly. |
| - base::WaitableEventWatcher* top_send_done_watcher_; |
| + mojo::Watcher* top_send_done_watcher_; |
| }; |
| base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| @@ -249,9 +310,9 @@ SyncChannel::SyncContext::~SyncContext() { |
| } |
| // Adds information about an outgoing sync message to the context so that |
| -// we know how to deserialize the reply. Returns a handle that's set when |
| -// the reply has arrived. |
| -void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| +// we know how to deserialize the reply. Returns |true| if the message was added |
| +// to the context or |false| if it was rejected (e.g. due to shutdown.) |
| +bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| // Create the tracking information for this message. This object is stored |
| // by value since all members are pointers that are cheap to copy. These |
| // pointers are cleaned up in the Pop() function. |
| @@ -260,12 +321,14 @@ void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| // OnObjectSignalled, another Send can happen which would stop the watcher |
| // from being called. The event would get watched later, when the nested |
| // Send completes, so the event will need to remain set. |
| + base::AutoLock auto_lock(deserializers_lock_); |
| + if (reject_new_deserializers_) |
| + return false; |
| PendingSyncMsg pending( |
| SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
| - new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
| - base::WaitableEvent::InitialState::NOT_SIGNALED)); |
| - base::AutoLock auto_lock(deserializers_lock_); |
| + new MojoEvent); |
| deserializers_.push_back(pending); |
| + return true; |
| } |
| bool SyncChannel::SyncContext::Pop() { |
| @@ -275,7 +338,7 @@ bool SyncChannel::SyncContext::Pop() { |
| PendingSyncMsg msg = deserializers_.back(); |
| delete msg.deserializer; |
| delete msg.done_event; |
| - msg.done_event = NULL; |
| + msg.done_event = nullptr; |
| deserializers_.pop_back(); |
| result = msg.send_result; |
| } |
| @@ -292,12 +355,12 @@ bool SyncChannel::SyncContext::Pop() { |
| return result; |
| } |
| -WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| +MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| base::AutoLock auto_lock(deserializers_lock_); |
| return deserializers_.back().done_event; |
| } |
| -WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| +MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| return received_sync_msgs_->dispatch_event(); |
| } |
| @@ -321,7 +384,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| DVLOG(1) << "Received error reply"; |
| } |
| - base::WaitableEvent* done_event = deserializers_.back().done_event; |
| + MojoEvent* done_event = deserializers_.back().done_event; |
| TRACE_EVENT_FLOW_BEGIN0( |
| TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
| @@ -367,7 +430,7 @@ void SyncChannel::SyncContext::OnChannelError() { |
| void SyncChannel::SyncContext::OnChannelOpened() { |
| shutdown_watcher_.StartWatching( |
| shutdown_event_, |
| - base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, |
| + base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, |
| base::Unretained(this))); |
| Context::OnChannelOpened(); |
| } |
| @@ -380,6 +443,7 @@ void SyncChannel::SyncContext::OnChannelClosed() { |
| void SyncChannel::SyncContext::CancelPendingSends() { |
| base::AutoLock auto_lock(deserializers_lock_); |
| + reject_new_deserializers_ = true; |
| PendingSyncMessageQueue::iterator iter; |
| DVLOG(1) << "Canceling pending sends"; |
| for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
| @@ -390,21 +454,12 @@ void SyncChannel::SyncContext::CancelPendingSends() { |
| } |
| } |
| -void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { |
| - if (event == shutdown_event_) { |
| - // Process shut down before we can get a reply to a synchronous message. |
| - // Cancel pending Send calls, which will end up setting the send done event. |
| - CancelPendingSends(); |
| - } else { |
| - // We got the reply, timed out or the process shutdown. |
| - DCHECK_EQ(GetSendDoneEvent(), event); |
| - base::MessageLoop::current()->QuitNow(); |
| - } |
| -} |
| +void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { |
| + DCHECK_EQ(event, shutdown_event_); |
| -base::WaitableEventWatcher::EventCallback |
| - SyncChannel::SyncContext::MakeWaitableEventCallback() { |
| - return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); |
| + // Process shut down before we can get a reply to a synchronous message. |
| + // Cancel pending Send calls, which will end up setting the send done event. |
| + CancelPendingSends(); |
| } |
| // static |
| @@ -447,7 +502,9 @@ SyncChannel::SyncChannel( |
| Listener* listener, |
| const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| WaitableEvent* shutdown_event) |
| - : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { |
| + : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| + pump_messages_event_(PumpMessagesEvent::current()), |
| + sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
| // The current (listener) thread must be distinct from the IPC thread, or else |
| // sending synchronous messages will deadlock. |
| DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
| @@ -486,23 +543,25 @@ bool SyncChannel::Send(Message* message) { |
| return true; |
| } |
| + SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| + bool pump_messages = sync_msg->ShouldPumpMessages(); |
| + |
| // *this* might get deleted in WaitForReply. |
| scoped_refptr<SyncContext> context(sync_context()); |
| - if (context->shutdown_event()->IsSignaled()) { |
| - DVLOG(1) << "shutdown event is signaled"; |
| + if (!context->Push(sync_msg)) { |
| + DVLOG(1) << "Channel is shutting down. Dropping sync message."; |
| delete message; |
| return false; |
| } |
| - SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| - context->Push(sync_msg); |
| - WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); |
| - |
| ChannelProxy::Send(message); |
| // Wait for reply, or for any other incoming synchronous messages. |
| // *this* might get deleted, so only call static functions at this point. |
| - WaitForReply(context.get(), pump_messages_event); |
| + scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; |
| + scoped_refptr<PumpMessagesEvent> pump_messages_event = pump_messages_event_; |
| + WaitForReply(registry.get(), context.get(), |
| + pump_messages ? pump_messages_event->event() : nullptr); |
| TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| "SyncChannel::Send", context->GetSendDoneEvent()); |
| @@ -510,19 +569,39 @@ bool SyncChannel::Send(Message* message) { |
| return context->Pop(); |
| } |
| -void SyncChannel::WaitForReply( |
| - SyncContext* context, WaitableEvent* pump_messages_event) { |
| +void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| + SyncContext* context, |
| + const MojoEvent* pump_messages_event) { |
| context->DispatchMessages(); |
| + |
| while (true) { |
| - WaitableEvent* objects[] = { |
| - context->GetDispatchEvent(), |
| - context->GetSendDoneEvent(), |
| - pump_messages_event |
| - }; |
| - |
| - unsigned count = pump_messages_event ? 3: 2; |
| - size_t result = WaitableEvent::WaitMany(objects, count); |
| - if (result == 0 /* dispatch event */) { |
| + bool dispatch = false; |
| + bool send_done = false; |
| + bool should_pump_messages = false; |
| + bool error = false; |
| + registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
| + registry->RegisterHandle( |
| + context->GetSendDoneEvent()->GetHandle(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| + if (pump_messages_event) { |
| + registry->RegisterHandle( |
| + pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| + } |
| + |
| + const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| + registry->WatchAllHandles(stop_flags, 3); |
| + DCHECK(!error); |
| + |
| + registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
| + registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| + if (pump_messages_event) |
| + registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| + |
| + if (dispatch) { |
| // We're waiting for a reply, but we received a blocking synchronous |
| // call. We must process it or otherwise a deadlock might occur. |
| context->GetDispatchEvent()->Reset(); |
| @@ -530,7 +609,7 @@ void SyncChannel::WaitForReply( |
| continue; |
| } |
| - if (result == 2 /* pump_messages_event */) |
| + if (should_pump_messages) |
| WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
| break; |
| @@ -538,64 +617,59 @@ void SyncChannel::WaitForReply( |
| } |
| void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
| - base::WaitableEventWatcher send_done_watcher; |
| + mojo::Watcher send_done_watcher; |
| ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
| - DCHECK(sync_msg_queue != NULL); |
| - |
| - base::WaitableEventWatcher* old_send_done_event_watcher = |
| - sync_msg_queue->top_send_done_watcher(); |
| - |
| - base::WaitableEventWatcher::EventCallback old_callback; |
| - base::WaitableEvent* old_event = NULL; |
| - |
| - // Maintain a local global stack of send done delegates to ensure that |
| - // nested sync calls complete in the correct sequence, i.e. the |
| - // outermost call completes first, etc. |
| - if (old_send_done_event_watcher) { |
| - old_callback = old_send_done_event_watcher->callback(); |
| - old_event = old_send_done_event_watcher->GetWatchedEvent(); |
| - old_send_done_event_watcher->StopWatching(); |
| + DCHECK_NE(sync_msg_queue, nullptr); |
| + |
| + mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
| + mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| + mojo::Watcher::ReadyCallback old_callback; |
| + |
| + // Maintain a thread-local stack of watchers to ensure nested calls complete |
| + // in the correct sequence, i.e. the outermost call completes first, etc. |
| + if (old_watcher) { |
| + old_callback = old_watcher->ready_callback(); |
| + old_handle = old_watcher->handle(); |
| + old_watcher->Cancel(); |
| } |
| sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
| - send_done_watcher.StartWatching(context->GetSendDoneEvent(), |
| - context->MakeWaitableEventCallback()); |
| - |
| { |
| + base::RunLoop nested_loop; |
| + send_done_watcher.Start( |
| + context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
| + |
| base::MessageLoop::ScopedNestableTaskAllower allow( |
| base::MessageLoop::current()); |
| - base::MessageLoop::current()->Run(); |
| + nested_loop.Run(); |
| + send_done_watcher.Cancel(); |
| } |
| - sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); |
| - if (old_send_done_event_watcher && old_event) { |
| - old_send_done_event_watcher->StartWatching(old_event, old_callback); |
| - } |
| + sync_msg_queue->set_top_send_done_watcher(old_watcher); |
| + if (old_watcher) |
| + old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
| } |
| -void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
| - DCHECK(event == sync_context()->GetDispatchEvent()); |
| - // The call to DispatchMessages might delete this object, so reregister |
| - // the object watcher first. |
| - event->Reset(); |
| - dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); |
| - sync_context()->DispatchMessages(); |
| +void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| + DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
| + if (result == MOJO_RESULT_OK) { |
| + sync_context()->GetDispatchEvent()->Reset(); |
| + sync_context()->DispatchMessages(); |
| + } |
| } |
| void SyncChannel::StartWatching() { |
| // Ideally we only want to watch this object when running a nested message |
| // loop. However, we don't know when it exits if there's another nested |
| // message loop running under it or not, so we wouldn't know whether to |
| - // stop or keep watching. So we always watch it, and create the event as |
| - // manual reset since the object watcher might otherwise reset the event |
| - // when we're doing a WaitMany. |
| - dispatch_watcher_callback_ = |
| - base::Bind(&SyncChannel::OnWaitableEventSignaled, |
| - base::Unretained(this)); |
| - dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), |
| - dispatch_watcher_callback_); |
| + // stop or keep watching. So we always watch it. |
| + dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
| + MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&SyncChannel::OnDispatchHandleReady, |
| + base::Unretained(this))); |
| } |
| void SyncChannel::OnChannelInit() { |