Chromium Code Reviews| Index: ipc/ipc_sync_channel.cc |
| diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc |
| index 6852399bdfa4d8acdaba6bd7e766e715df085947..1dbf21e806452790d63d18d6fd76d0ba0fe5b140 100644 |
| --- a/ipc/ipc_sync_channel.cc |
| +++ b/ipc/ipc_sync_channel.cc |
| @@ -24,9 +24,7 @@ |
| #include "ipc/ipc_logging.h" |
| #include "ipc/ipc_message_macros.h" |
| #include "ipc/ipc_sync_message.h" |
| -#include "ipc/mojo_event.h" |
| -#include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| -#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| using base::WaitableEvent; |
| @@ -35,35 +33,13 @@ namespace IPC { |
| namespace { |
| // A generic callback used when watching handles synchronously. Sets |*signal| |
| -// to true. Also sets |*error| to true in case of an error. |
| -void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| +// to true. |
| +void OnEventReady(bool* signal) { |
| *signal = true; |
| - *error = result != MOJO_RESULT_OK; |
| } |
| -// A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result |
| -// (DCHECKs, but is only used in cases where failure should be impossible) and |
| -// runs |callback|. |
| -void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| - DCHECK_EQ(result, MOJO_RESULT_OK); |
| - callback.Run(); |
| -} |
| - |
| -class PumpMessagesEvent { |
| - public: |
| - PumpMessagesEvent() { event_.Signal(); } |
| - ~PumpMessagesEvent() {} |
| - |
| - const MojoEvent* event() const { return &event_; } |
| - |
| - private: |
| - MojoEvent event_; |
| - |
| - DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
| -}; |
| - |
| -base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = |
| - LAZY_INSTANCE_INITIALIZER; |
| +base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky |
| + g_pump_messages_event = LAZY_INSTANCE_INITIALIZER; |
| } // namespace |
| @@ -208,7 +184,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| } |
| - MojoEvent* dispatch_event() { return &dispatch_event_; } |
| + base::WaitableEvent* dispatch_event() { return &dispatch_event_; } |
| base::SingleThreadTaskRunner* listener_task_runner() { |
| return listener_task_runner_.get(); |
| } |
| @@ -230,12 +206,23 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| } |
| } |
| - mojo::SimpleWatcher* top_send_done_watcher() { |
| - return top_send_done_watcher_; |
| - } |
| - |
| - void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { |
| + // See SyncChannel::WaitForReplyWithNestedMessageLoop for details. |
| + void SetTopSendDoneState( |
| + base::WaitableEventWatcher* watcher, |
| + base::WaitableEvent* event, |
| + const base::WaitableEventWatcher::EventCallback& callback, |
| + base::WaitableEventWatcher** outer_watcher, |
| + base::WaitableEvent** outer_event, |
| + base::WaitableEventWatcher::EventCallback* outer_callback) { |
| + if (outer_watcher) { |
| + DCHECK(outer_event && outer_callback); |
| + *outer_watcher = top_send_done_watcher_; |
| + *outer_event = top_send_done_event_; |
| + *outer_callback = top_send_done_callback_; |
| + } |
| top_send_done_watcher_ = watcher; |
| + top_send_done_event_ = event; |
| + top_send_done_callback_ = callback; |
| } |
| private: |
| @@ -245,23 +232,19 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| // as manual reset. |
| ReceivedSyncMsgQueue() |
| : message_queue_version_(0), |
| + dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| + base::WaitableEvent::InitialState::NOT_SIGNALED), |
| listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| - task_pending_(false), |
| - listener_count_(0), |
| - top_send_done_watcher_(nullptr) { |
| - sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( |
| - dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, |
| - base::Unretained(this)))); |
| + sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>( |
| + &dispatch_event_, |
| + base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady, |
| + base::Unretained(this)))) { |
| sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| } |
| ~ReceivedSyncMsgQueue() {} |
| - void OnDispatchHandleReady(MojoResult result) { |
| - if (result != MOJO_RESULT_OK) |
| - return; |
| - |
| + void OnDispatchEventReady() { |
| if (dispatch_flag_) { |
| *dispatch_flag_ = true; |
| return; |
| @@ -284,23 +267,27 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| typedef std::list<QueuedMessage> SyncMessageQueue; |
| SyncMessageQueue message_queue_; |
| - uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
| + |
| + // Used to signal DispatchMessages to rescan |
| + uint32_t message_queue_version_ = 0; |
| std::vector<QueuedMessage> received_replies_; |
| // Signaled when we get a synchronous message that we must respond to, as the |
| // sender needs its reply before it can reply to our original synchronous |
| // message. |
| - MojoEvent dispatch_event_; |
| + base::WaitableEvent dispatch_event_; |
| scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| base::Lock message_lock_; |
| - bool task_pending_; |
| - int listener_count_; |
| + bool task_pending_ = false; |
| + int listener_count_ = 0; |
| - // The current send done handle watcher for this thread. Used to maintain |
| - // a thread-local stack of send done watchers to ensure that nested sync |
| - // message loops complete correctly. |
| - mojo::SimpleWatcher* top_send_done_watcher_; |
| + // The current send-done state for this thread. Used to maintain a thread- |
| + // local stack of state to ensure that nested sync message loops complete |
| + // correctly. |
| + base::WaitableEventWatcher* top_send_done_watcher_ = nullptr; |
| + base::WaitableEvent* top_send_done_event_ = nullptr; |
| + base::WaitableEventWatcher::EventCallback top_send_done_callback_; |
| // If not null, the address of a flag to set when the dispatch event signals, |
| // in lieu of actually dispatching messages. This is used by |
| @@ -309,7 +296,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
| bool* dispatch_flag_ = nullptr; |
| // Watches |dispatch_event_| during all sync handle watches on this thread. |
| - std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
| + std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_; |
| }; |
| base::LazyInstance<base::ThreadLocalPointer< |
| @@ -327,6 +314,13 @@ SyncChannel::SyncContext::SyncContext( |
| restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
| } |
| +void SyncChannel::SyncContext::OnSendDoneEventSignaled( |
| + base::RunLoop* nested_loop, |
| + base::WaitableEvent* event) { |
| + DCHECK_EQ(GetSendDoneEvent(), event); |
| + nested_loop->Quit(); |
| +} |
| + |
| SyncChannel::SyncContext::~SyncContext() { |
| while (!deserializers_.empty()) |
| Pop(); |
| @@ -349,7 +343,8 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| return false; |
| PendingSyncMsg pending( |
| SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
| - new MojoEvent); |
| + new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
| + base::WaitableEvent::InitialState::NOT_SIGNALED)); |
| deserializers_.push_back(pending); |
| return true; |
| } |
| @@ -378,12 +373,12 @@ bool SyncChannel::SyncContext::Pop() { |
| return result; |
| } |
| -MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| +base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| base::AutoLock auto_lock(deserializers_lock_); |
| return deserializers_.back().done_event; |
| } |
| -MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| +base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| return received_sync_msgs_->dispatch_event(); |
| } |
| @@ -407,7 +402,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| DVLOG(1) << "Received error reply"; |
| } |
| - MojoEvent* done_event = deserializers_.back().done_event; |
| + base::WaitableEvent* done_event = deserializers_.back().done_event; |
| TRACE_EVENT_FLOW_BEGIN0( |
| TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
| @@ -526,9 +521,7 @@ SyncChannel::SyncChannel( |
| const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| WaitableEvent* shutdown_event) |
| : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| - sync_handle_registry_(mojo::SyncHandleRegistry::current()), |
| - dispatch_watcher_(FROM_HERE, |
| - mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { |
| + sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
| // The current (listener) thread must be distinct from the IPC thread, or else |
| // sending synchronous messages will deadlock. |
| DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
| @@ -596,24 +589,28 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| bool pump_messages) { |
| context->DispatchMessages(); |
| - const MojoEvent* pump_messages_event = nullptr; |
| - if (pump_messages) |
| - pump_messages_event = g_pump_messages_event.Get().event(); |
| + base::WaitableEvent* pump_messages_event = nullptr; |
| + if (pump_messages) { |
| + if (!g_pump_messages_event.Get()) { |
| + g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>( |
| + base::WaitableEvent::ResetPolicy::MANUAL, |
| + base::WaitableEvent::InitialState::SIGNALED); |
| + } |
| + pump_messages_event = g_pump_messages_event.Get().get(); |
| + } |
| while (true) { |
| bool dispatch = false; |
| bool send_done = false; |
| bool should_pump_messages = false; |
| - bool error = false; |
| - bool registered = registry->RegisterHandle( |
| - context->GetSendDoneEvent()->GetHandle(), |
| - MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| + bool registered = registry->RegisterEvent( |
| + context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done)); |
| DCHECK(registered); |
| + |
| if (pump_messages_event) { |
| - registered = registry->RegisterHandle( |
| - pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| + registered = registry->RegisterEvent( |
| + pump_messages_event, |
| + base::Bind(&OnEventReady, &should_pump_messages)); |
| DCHECK(registered); |
| } |
| @@ -621,11 +618,10 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| context->received_sync_msgs()->BlockDispatch(&dispatch); |
| registry->WatchAllHandles(stop_flags, 3); |
| context->received_sync_msgs()->UnblockDispatch(); |
| - DCHECK(!error); |
| - registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| + registry->UnregisterEvent(context->GetSendDoneEvent()); |
| if (pump_messages_event) |
| - registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| + registry->UnregisterEvent(pump_messages_event); |
| if (dispatch) { |
| // We're waiting for a reply, but we received a blocking synchronous call. |
| @@ -643,46 +639,55 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| } |
| void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
| - mojo::SimpleWatcher send_done_watcher( |
| - FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); |
| - |
| ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
| - DCHECK_NE(sync_msg_queue, nullptr); |
| - |
| - mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
| - mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| - mojo::SimpleWatcher::ReadyCallback old_callback; |
| - |
| - // Maintain a thread-local stack of watchers to ensure nested calls complete |
| - // in the correct sequence, i.e. the outermost call completes first, etc. |
| - if (old_watcher) { |
| - old_callback = old_watcher->ready_callback(); |
| - old_handle = old_watcher->handle(); |
| - old_watcher->Cancel(); |
| - } |
| + DCHECK(sync_msg_queue); |
| - sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
| + base::RunLoop nested_loop; |
| - { |
| - base::RunLoop nested_loop; |
| - send_done_watcher.Watch( |
| - context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
| - |
| - base::MessageLoop::ScopedNestableTaskAllower allow( |
| - base::MessageLoop::current()); |
| - nested_loop.Run(); |
| - send_done_watcher.Cancel(); |
| - } |
| - |
| - sync_msg_queue->set_top_send_done_watcher(old_watcher); |
| - if (old_watcher) |
| - old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
| + // WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we can nest |
| + // waiting message loops arbitrarily deep on the SyncChannel's thread. Every |
| + // such operation has a corresponding WaitableEvent to be watched which, when |
| + // signalled for IPC completion, breaks out of the loop. The innermost (i.e. |
| + // topmost) such event is stored in |sync_msg_queue| state. |
| + // |
| + // Here we preserve the current top-of-stack event state (if any) within the |
| + // local stack frame. We then replace the event state in |sync_msg_queue| with |
| + // our own and run a nested loop. If a subsequent nested loop is started |
| + // therein the process is repeated in that stack frame, and so on. |
| + // |
| + // Once the innermost nested loop is broken, the locally preserved event state |
| + // is swapped back into |sync_msg_queue| before unwinding the stack. |
| + base::WaitableEventWatcher* outer_watcher = nullptr; |
|
yzshen1
2017/03/23 20:15:50
Does it make sense to group these three things int
Ken Rockot(use gerrit already)
2017/03/23 22:04:20
Sure, that sounds nice. In fact I've gone and move
|
| + base::WaitableEvent* outer_event = nullptr; |
| + base::WaitableEventWatcher::EventCallback outer_callback; |
| + base::WaitableEventWatcher send_done_watcher; |
| + base::WaitableEvent* event = context->GetSendDoneEvent(); |
| + const base::WaitableEventWatcher::EventCallback callback = |
| + base::Bind(&SyncContext::OnSendDoneEventSignaled, context, &nested_loop); |
| + sync_msg_queue->SetTopSendDoneState(&send_done_watcher, event, callback, |
| + &outer_watcher, &outer_event, |
| + &outer_callback); |
| + if (outer_watcher) |
| + outer_watcher->StopWatching(); |
| + send_done_watcher.StartWatching(event, callback); |
| + |
| + base::MessageLoop::ScopedNestableTaskAllower allow( |
| + base::MessageLoop::current()); |
| + nested_loop.Run(); |
| + |
| + sync_msg_queue->SetTopSendDoneState( |
| + outer_watcher, outer_event, outer_callback, nullptr, nullptr, nullptr); |
| + if (outer_watcher) |
| + outer_watcher->StartWatching(outer_event, outer_callback); |
| } |
| -void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| - DCHECK_EQ(result, MOJO_RESULT_OK); |
| +void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) { |
| + DCHECK_EQ(sync_context()->GetDispatchEvent(), event); |
| sync_context()->GetDispatchEvent()->Reset(); |
| + |
| + StartWatching(); |
| + |
| + // NOTE: May delete |this|. |
| sync_context()->DispatchMessages(); |
| } |
| @@ -691,10 +696,10 @@ void SyncChannel::StartWatching() { |
| // messages once the listener thread is unblocked and pumping its task queue. |
| // The ReceivedSyncMsgQueue also watches this event and may dispatch |
| // immediately if woken up by a message which it's allowed to dispatch. |
| - dispatch_watcher_.Watch( |
| - sync_context()->GetDispatchEvent()->GetHandle(), |
| - MOJO_HANDLE_SIGNAL_READABLE, |
| - base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); |
| + dispatch_watcher_.StartWatching( |
| + sync_context()->GetDispatchEvent(), |
| + base::Bind(&SyncChannel::OnDispatchEventSignaled, |
| + base::Unretained(this))); |
| } |
| void SyncChannel::OnChannelInit() { |