| Index: ipc/ipc_sync_channel.cc
|
| diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
|
| index 6852399bdfa4d8acdaba6bd7e766e715df085947..0d61e08ce945b01e27fd3005b138c1bfbc5c7eee 100644
|
| --- a/ipc/ipc_sync_channel.cc
|
| +++ b/ipc/ipc_sync_channel.cc
|
| @@ -24,9 +24,7 @@
|
| #include "ipc/ipc_logging.h"
|
| #include "ipc/ipc_message_macros.h"
|
| #include "ipc/ipc_sync_message.h"
|
| -#include "ipc/mojo_event.h"
|
| -#include "mojo/public/cpp/bindings/sync_handle_registry.h"
|
| -#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
|
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h"
|
|
|
| using base::WaitableEvent;
|
|
|
| @@ -35,35 +33,13 @@ namespace IPC {
|
| namespace {
|
|
|
| // A generic callback used when watching handles synchronously. Sets |*signal|
|
| -// to true. Also sets |*error| to true in case of an error.
|
| -void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
|
| +// to true.
|
| +void OnEventReady(bool* signal) {
|
| *signal = true;
|
| - *error = result != MOJO_RESULT_OK;
|
| }
|
|
|
| -// A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result
|
| -// (DCHECKs, but is only used in cases where failure should be impossible) and
|
| -// runs |callback|.
|
| -void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
|
| - DCHECK_EQ(result, MOJO_RESULT_OK);
|
| - callback.Run();
|
| -}
|
| -
|
| -class PumpMessagesEvent {
|
| - public:
|
| - PumpMessagesEvent() { event_.Signal(); }
|
| - ~PumpMessagesEvent() {}
|
| -
|
| - const MojoEvent* event() const { return &event_; }
|
| -
|
| - private:
|
| - MojoEvent event_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
|
| -};
|
| -
|
| -base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
|
| - LAZY_INSTANCE_INITIALIZER;
|
| +base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
|
| + g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
|
|
|
| } // namespace
|
|
|
| @@ -87,6 +63,60 @@ base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
|
| class SyncChannel::ReceivedSyncMsgQueue :
|
| public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
|
| public:
|
| + // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
|
| + // may nest waiting message loops arbitrarily deep on the SyncChannel's
|
| + // thread. Every such operation has a corresponding WaitableEvent to be
|
| + // watched which, when signalled for IPC completion, breaks out of the loop.
|
| + // A reference to the innermost (i.e. topmost) watcher is held in
|
| + // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
|
| + //
|
| + // NestedSendDoneWatcher provides a simple scoper which is used by
|
| + // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
|
| + // event, preserving the previous topmost state on the local stack until the
|
| + // new inner loop is broken. If yet another subsequent nested loop is started
|
| + // therein the process is repeated again in the new inner stack frame, and so
|
| + // on.
|
| + //
|
| + // When this object is destroyed on stack unwind, the previous topmost state
|
| + // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
|
| + // and its watch is resumed immediately.
|
| + class NestedSendDoneWatcher {
|
| + public:
|
| + NestedSendDoneWatcher(SyncChannel::SyncContext* context,
|
| + base::RunLoop* run_loop)
|
| + : sync_msg_queue_(context->received_sync_msgs()),
|
| + outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
|
| + event_(context->GetSendDoneEvent()),
|
| + callback_(
|
| + base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
|
| + context,
|
| + run_loop)) {
|
| + sync_msg_queue_->top_send_done_event_watcher_ = this;
|
| + if (outer_state_)
|
| + outer_state_->StopWatching();
|
| + StartWatching();
|
| + }
|
| +
|
| + ~NestedSendDoneWatcher() {
|
| + sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
|
| + if (outer_state_)
|
| + outer_state_->StartWatching();
|
| + }
|
| +
|
| + private:
|
| + void StartWatching() { watcher_.StartWatching(event_, callback_); }
|
| + void StopWatching() { watcher_.StopWatching(); }
|
| +
|
| + ReceivedSyncMsgQueue* const sync_msg_queue_;
|
| + NestedSendDoneWatcher* const outer_state_;
|
| +
|
| + base::WaitableEvent* const event_;
|
| + const base::WaitableEventWatcher::EventCallback callback_;
|
| + base::WaitableEventWatcher watcher_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
|
| + };
|
| +
|
| // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
|
| // if necessary. Call RemoveContext on the same thread when done.
|
| static ReceivedSyncMsgQueue* AddContext() {
|
| @@ -208,7 +238,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| }
|
| }
|
|
|
| - MojoEvent* dispatch_event() { return &dispatch_event_; }
|
| + base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
|
| base::SingleThreadTaskRunner* listener_task_runner() {
|
| return listener_task_runner_.get();
|
| }
|
| @@ -230,14 +260,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| }
|
| }
|
|
|
| - mojo::SimpleWatcher* top_send_done_watcher() {
|
| - return top_send_done_watcher_;
|
| - }
|
| -
|
| - void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) {
|
| - top_send_done_watcher_ = watcher;
|
| - }
|
| -
|
| private:
|
| friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
|
|
|
| @@ -245,23 +267,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| // as manual reset.
|
| ReceivedSyncMsgQueue()
|
| : message_queue_version_(0),
|
| + dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::NOT_SIGNALED),
|
| listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
|
| - task_pending_(false),
|
| - listener_count_(0),
|
| - top_send_done_watcher_(nullptr) {
|
| - sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher(
|
| - dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
|
| - base::Unretained(this))));
|
| + sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>(
|
| + &dispatch_event_,
|
| + base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
|
| + base::Unretained(this)))) {
|
| sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
|
| }
|
|
|
| ~ReceivedSyncMsgQueue() {}
|
|
|
| - void OnDispatchHandleReady(MojoResult result) {
|
| - if (result != MOJO_RESULT_OK)
|
| - return;
|
| -
|
| + void OnDispatchEventReady() {
|
| if (dispatch_flag_) {
|
| *dispatch_flag_ = true;
|
| return;
|
| @@ -284,23 +302,25 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
|
|
| typedef std::list<QueuedMessage> SyncMessageQueue;
|
| SyncMessageQueue message_queue_;
|
| - uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
|
| +
|
| + // Used to signal DispatchMessages to rescan
|
| + uint32_t message_queue_version_ = 0;
|
|
|
| std::vector<QueuedMessage> received_replies_;
|
|
|
| // Signaled when we get a synchronous message that we must respond to, as the
|
| // sender needs its reply before it can reply to our original synchronous
|
| // message.
|
| - MojoEvent dispatch_event_;
|
| + base::WaitableEvent dispatch_event_;
|
| scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
|
| base::Lock message_lock_;
|
| - bool task_pending_;
|
| - int listener_count_;
|
| + bool task_pending_ = false;
|
| + int listener_count_ = 0;
|
|
|
| - // The current send done handle watcher for this thread. Used to maintain
|
| - // a thread-local stack of send done watchers to ensure that nested sync
|
| - // message loops complete correctly.
|
| - mojo::SimpleWatcher* top_send_done_watcher_;
|
| + // The current NestedSendDoneWatcher for this thread, if we're currently
|
| + // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
|
| + // NestedSendDoneWatcher comments for more details.
|
| + NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
|
|
|
| // If not null, the address of a flag to set when the dispatch event signals,
|
| // in lieu of actually dispatching messages. This is used by
|
| @@ -309,7 +329,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| bool* dispatch_flag_ = nullptr;
|
|
|
| // Watches |dispatch_event_| during all sync handle watches on this thread.
|
| - std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_;
|
| + std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
|
| };
|
|
|
| base::LazyInstance<base::ThreadLocalPointer<
|
| @@ -327,6 +347,13 @@ SyncChannel::SyncContext::SyncContext(
|
| restrict_dispatch_group_(kRestrictDispatchGroup_None) {
|
| }
|
|
|
| +void SyncChannel::SyncContext::OnSendDoneEventSignaled(
|
| + base::RunLoop* nested_loop,
|
| + base::WaitableEvent* event) {
|
| + DCHECK_EQ(GetSendDoneEvent(), event);
|
| + nested_loop->Quit();
|
| +}
|
| +
|
| SyncChannel::SyncContext::~SyncContext() {
|
| while (!deserializers_.empty())
|
| Pop();
|
| @@ -349,7 +376,8 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
|
| return false;
|
| PendingSyncMsg pending(
|
| SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
|
| - new MojoEvent);
|
| + new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::NOT_SIGNALED));
|
| deserializers_.push_back(pending);
|
| return true;
|
| }
|
| @@ -378,12 +406,12 @@ bool SyncChannel::SyncContext::Pop() {
|
| return result;
|
| }
|
|
|
| -MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
|
| +base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
|
| base::AutoLock auto_lock(deserializers_lock_);
|
| return deserializers_.back().done_event;
|
| }
|
|
|
| -MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
|
| +base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
|
| return received_sync_msgs_->dispatch_event();
|
| }
|
|
|
| @@ -407,7 +435,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
|
| DVLOG(1) << "Received error reply";
|
| }
|
|
|
| - MojoEvent* done_event = deserializers_.back().done_event;
|
| + base::WaitableEvent* done_event = deserializers_.back().done_event;
|
| TRACE_EVENT_FLOW_BEGIN0(
|
| TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
|
| "SyncChannel::SyncContext::TryToUnblockListener", done_event);
|
| @@ -526,9 +554,7 @@ SyncChannel::SyncChannel(
|
| const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
|
| WaitableEvent* shutdown_event)
|
| : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
|
| - sync_handle_registry_(mojo::SyncHandleRegistry::current()),
|
| - dispatch_watcher_(FROM_HERE,
|
| - mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
|
| + sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
|
| // The current (listener) thread must be distinct from the IPC thread, or else
|
| // sending synchronous messages will deadlock.
|
| DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
|
| @@ -596,36 +622,39 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
|
| bool pump_messages) {
|
| context->DispatchMessages();
|
|
|
| - const MojoEvent* pump_messages_event = nullptr;
|
| - if (pump_messages)
|
| - pump_messages_event = g_pump_messages_event.Get().event();
|
| + base::WaitableEvent* pump_messages_event = nullptr;
|
| + if (pump_messages) {
|
| + if (!g_pump_messages_event.Get()) {
|
| + g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>(
|
| + base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::SIGNALED);
|
| + }
|
| + pump_messages_event = g_pump_messages_event.Get().get();
|
| + }
|
|
|
| while (true) {
|
| bool dispatch = false;
|
| bool send_done = false;
|
| bool should_pump_messages = false;
|
| - bool error = false;
|
| - bool registered = registry->RegisterHandle(
|
| - context->GetSendDoneEvent()->GetHandle(),
|
| - MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&OnSyncHandleReady, &send_done, &error));
|
| + bool registered = registry->RegisterEvent(
|
| + context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
|
| DCHECK(registered);
|
| +
|
| if (pump_messages_event) {
|
| - registered = registry->RegisterHandle(
|
| - pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
|
| + registered = registry->RegisterEvent(
|
| + pump_messages_event,
|
| + base::Bind(&OnEventReady, &should_pump_messages));
|
| DCHECK(registered);
|
| }
|
|
|
| const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
|
| context->received_sync_msgs()->BlockDispatch(&dispatch);
|
| - registry->WatchAllHandles(stop_flags, 3);
|
| + registry->Wait(stop_flags, 3);
|
| context->received_sync_msgs()->UnblockDispatch();
|
| - DCHECK(!error);
|
|
|
| - registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
|
| + registry->UnregisterEvent(context->GetSendDoneEvent());
|
| if (pump_messages_event)
|
| - registry->UnregisterHandle(pump_messages_event->GetHandle());
|
| + registry->UnregisterEvent(pump_messages_event);
|
|
|
| if (dispatch) {
|
| // We're waiting for a reply, but we received a blocking synchronous call.
|
| @@ -643,46 +672,20 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
|
| }
|
|
|
| void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
|
| - mojo::SimpleWatcher send_done_watcher(
|
| - FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
|
| -
|
| - ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
|
| - DCHECK_NE(sync_msg_queue, nullptr);
|
| -
|
| - mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher();
|
| - mojo::Handle old_handle(mojo::kInvalidHandleValue);
|
| - mojo::SimpleWatcher::ReadyCallback old_callback;
|
| -
|
| - // Maintain a thread-local stack of watchers to ensure nested calls complete
|
| - // in the correct sequence, i.e. the outermost call completes first, etc.
|
| - if (old_watcher) {
|
| - old_callback = old_watcher->ready_callback();
|
| - old_handle = old_watcher->handle();
|
| - old_watcher->Cancel();
|
| - }
|
| -
|
| - sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
|
| -
|
| - {
|
| - base::RunLoop nested_loop;
|
| - send_done_watcher.Watch(
|
| - context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
|
| -
|
| - base::MessageLoop::ScopedNestableTaskAllower allow(
|
| - base::MessageLoop::current());
|
| - nested_loop.Run();
|
| - send_done_watcher.Cancel();
|
| - }
|
| -
|
| - sync_msg_queue->set_top_send_done_watcher(old_watcher);
|
| - if (old_watcher)
|
| - old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
|
| + base::MessageLoop::ScopedNestableTaskAllower allow(
|
| + base::MessageLoop::current());
|
| + base::RunLoop nested_loop;
|
| + ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop);
|
| + nested_loop.Run();
|
| }
|
|
|
| -void SyncChannel::OnDispatchHandleReady(MojoResult result) {
|
| - DCHECK_EQ(result, MOJO_RESULT_OK);
|
| +void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
|
| + DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
|
| sync_context()->GetDispatchEvent()->Reset();
|
| +
|
| + StartWatching();
|
| +
|
| + // NOTE: May delete |this|.
|
| sync_context()->DispatchMessages();
|
| }
|
|
|
| @@ -691,10 +694,10 @@ void SyncChannel::StartWatching() {
|
| // messages once the listener thread is unblocked and pumping its task queue.
|
| // The ReceivedSyncMsgQueue also watches this event and may dispatch
|
| // immediately if woken up by a message which it's allowed to dispatch.
|
| - dispatch_watcher_.Watch(
|
| - sync_context()->GetDispatchEvent()->GetHandle(),
|
| - MOJO_HANDLE_SIGNAL_READABLE,
|
| - base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this)));
|
| + dispatch_watcher_.StartWatching(
|
| + sync_context()->GetDispatchEvent(),
|
| + base::Bind(&SyncChannel::OnDispatchEventSignaled,
|
| + base::Unretained(this)));
|
| }
|
|
|
| void SyncChannel::OnChannelInit() {
|
|
|