| Index: ipc/ipc_sync_channel.cc
|
| diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
|
| index fe2b61b193ef3bab906c8a7e36a86d4b1c259a35..e8eeec6deca4c77dbdd6968ca347456e8c1caa53 100644
|
| --- a/ipc/ipc_sync_channel.cc
|
| +++ b/ipc/ipc_sync_channel.cc
|
| @@ -13,10 +13,10 @@
|
| #include "base/lazy_instance.h"
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| +#include "base/macros.h"
|
| #include "base/memory/ptr_util.h"
|
| #include "base/run_loop.h"
|
| #include "base/synchronization/waitable_event.h"
|
| -#include "base/synchronization/waitable_event_watcher.h"
|
| #include "base/threading/thread_local.h"
|
| #include "base/threading/thread_task_runner_handle.h"
|
| #include "base/trace_event/trace_event.h"
|
| @@ -24,12 +24,49 @@
|
| #include "ipc/ipc_logging.h"
|
| #include "ipc/ipc_message_macros.h"
|
| #include "ipc/ipc_sync_message.h"
|
| +#include "ipc/mojo_event.h"
|
| +#include "mojo/public/cpp/bindings/sync_handle_registry.h"
|
|
|
| -using base::TimeDelta;
|
| -using base::TimeTicks;
|
| using base::WaitableEvent;
|
|
|
| namespace IPC {
|
| +
|
| +namespace {
|
| +
|
| +// A generic callback used when watching handles synchronously. Sets |*signal|
|
| +// to true. Also sets |*error| to true in case of an error.
|
| +void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
|
| + *signal = true;
|
| + *error = result != MOJO_RESULT_OK;
|
| +}
|
| +
|
| +// A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but
|
| +// is only used in cases where failure should be impossible) and runs
|
| +// |callback|.
|
| +void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
|
| + DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
|
| + if (result == MOJO_RESULT_OK)
|
| + callback.Run();
|
| +}
|
| +
|
| +class PumpMessagesEvent {
|
| + public:
|
| + PumpMessagesEvent() { event_.Signal(); }
|
| + ~PumpMessagesEvent() {}
|
| +
|
| + const MojoEvent* event() const { return &event_; }
|
| +
|
| + private:
|
| + MojoEvent event_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
|
| +};
|
| +
|
| +base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
|
| + LAZY_INSTANCE_INITIALIZER;
|
| +
|
| +} // namespace
|
| +
|
| // When we're blocked in a Send(), we need to process incoming synchronous
|
| // messages right away because it could be blocking our reply (either
|
| // directly from the same object we're calling, or indirectly through one or
|
| @@ -156,7 +193,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| }
|
| }
|
|
|
| - WaitableEvent* dispatch_event() { return &dispatch_event_; }
|
| + MojoEvent* dispatch_event() { return &dispatch_event_; }
|
| base::SingleThreadTaskRunner* listener_task_runner() {
|
| return listener_task_runner_.get();
|
| }
|
| @@ -178,11 +215,11 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| }
|
| }
|
|
|
| - base::WaitableEventWatcher* top_send_done_watcher() {
|
| + mojo::Watcher* top_send_done_watcher() {
|
| return top_send_done_watcher_;
|
| }
|
|
|
| - void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
|
| + void set_top_send_done_watcher(mojo::Watcher* watcher) {
|
| top_send_done_watcher_ = watcher;
|
| }
|
|
|
| @@ -193,8 +230,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
| // as manual reset.
|
| ReceivedSyncMsgQueue()
|
| : message_queue_version_(0),
|
| - dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
|
| - base::WaitableEvent::InitialState::NOT_SIGNALED),
|
| listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
|
| task_pending_(false),
|
| listener_count_(0),
|
| @@ -215,19 +250,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
|
|
|
| std::vector<QueuedMessage> received_replies_;
|
|
|
| - // Set when we got a synchronous message that we must respond to as the
|
| + // Signaled when we get a synchronous message that we must respond to, as the
|
| // sender needs its reply before it can reply to our original synchronous
|
| // message.
|
| - WaitableEvent dispatch_event_;
|
| + MojoEvent dispatch_event_;
|
| scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
|
| base::Lock message_lock_;
|
| bool task_pending_;
|
| int listener_count_;
|
|
|
| - // The current send done event watcher for this thread. Used to maintain
|
| - // a local global stack of send done watchers to ensure that nested sync
|
| + // The current send done handle watcher for this thread. Used to maintain
|
| + // a thread-local stack of send done watchers to ensure that nested sync
|
| // message loops complete correctly.
|
| - base::WaitableEventWatcher* top_send_done_watcher_;
|
| + mojo::Watcher* top_send_done_watcher_;
|
| };
|
|
|
| base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
|
| @@ -250,9 +285,9 @@ SyncChannel::SyncContext::~SyncContext() {
|
| }
|
|
|
| // Adds information about an outgoing sync message to the context so that
|
| -// we know how to deserialize the reply. Returns a handle that's set when
|
| -// the reply has arrived.
|
| -void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
|
| +// we know how to deserialize the reply. Returns |true| if the message was added
|
| +// to the context or |false| if it was rejected (e.g. due to shutdown.)
|
| +bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
|
| // Create the tracking information for this message. This object is stored
|
| // by value since all members are pointers that are cheap to copy. These
|
| // pointers are cleaned up in the Pop() function.
|
| @@ -261,12 +296,14 @@ void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
|
| // OnObjectSignalled, another Send can happen which would stop the watcher
|
| // from being called. The event would get watched later, when the nested
|
| // Send completes, so the event will need to remain set.
|
| + base::AutoLock auto_lock(deserializers_lock_);
|
| + if (reject_new_deserializers_)
|
| + return false;
|
| PendingSyncMsg pending(
|
| SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
|
| - new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
|
| - base::WaitableEvent::InitialState::NOT_SIGNALED));
|
| - base::AutoLock auto_lock(deserializers_lock_);
|
| + new MojoEvent);
|
| deserializers_.push_back(pending);
|
| + return true;
|
| }
|
|
|
| bool SyncChannel::SyncContext::Pop() {
|
| @@ -276,7 +313,7 @@ bool SyncChannel::SyncContext::Pop() {
|
| PendingSyncMsg msg = deserializers_.back();
|
| delete msg.deserializer;
|
| delete msg.done_event;
|
| - msg.done_event = NULL;
|
| + msg.done_event = nullptr;
|
| deserializers_.pop_back();
|
| result = msg.send_result;
|
| }
|
| @@ -293,12 +330,12 @@ bool SyncChannel::SyncContext::Pop() {
|
| return result;
|
| }
|
|
|
| -WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
|
| +MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
|
| base::AutoLock auto_lock(deserializers_lock_);
|
| return deserializers_.back().done_event;
|
| }
|
|
|
| -WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
|
| +MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
|
| return received_sync_msgs_->dispatch_event();
|
| }
|
|
|
| @@ -322,7 +359,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
|
| DVLOG(1) << "Received error reply";
|
| }
|
|
|
| - base::WaitableEvent* done_event = deserializers_.back().done_event;
|
| + MojoEvent* done_event = deserializers_.back().done_event;
|
| TRACE_EVENT_FLOW_BEGIN0(
|
| TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
|
| "SyncChannel::SyncContext::TryToUnblockListener", done_event);
|
| @@ -368,7 +405,7 @@ void SyncChannel::SyncContext::OnChannelError() {
|
| void SyncChannel::SyncContext::OnChannelOpened() {
|
| shutdown_watcher_.StartWatching(
|
| shutdown_event_,
|
| - base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
|
| + base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
|
| base::Unretained(this)));
|
| Context::OnChannelOpened();
|
| }
|
| @@ -381,6 +418,7 @@ void SyncChannel::SyncContext::OnChannelClosed() {
|
|
|
| void SyncChannel::SyncContext::CancelPendingSends() {
|
| base::AutoLock auto_lock(deserializers_lock_);
|
| + reject_new_deserializers_ = true;
|
| PendingSyncMessageQueue::iterator iter;
|
| DVLOG(1) << "Canceling pending sends";
|
| for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
|
| @@ -391,21 +429,12 @@ void SyncChannel::SyncContext::CancelPendingSends() {
|
| }
|
| }
|
|
|
| -void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
|
| - if (event == shutdown_event_) {
|
| - // Process shut down before we can get a reply to a synchronous message.
|
| - // Cancel pending Send calls, which will end up setting the send done event.
|
| - CancelPendingSends();
|
| - } else {
|
| - // We got the reply, timed out or the process shutdown.
|
| - DCHECK_EQ(GetSendDoneEvent(), event);
|
| - base::MessageLoop::current()->QuitNow();
|
| - }
|
| -}
|
| +void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
|
| + DCHECK_EQ(event, shutdown_event_);
|
|
|
| -base::WaitableEventWatcher::EventCallback
|
| - SyncChannel::SyncContext::MakeWaitableEventCallback() {
|
| - return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
|
| + // Process shut down before we can get a reply to a synchronous message.
|
| + // Cancel pending Send calls, which will end up setting the send done event.
|
| + CancelPendingSends();
|
| }
|
|
|
| // static
|
| @@ -448,7 +477,8 @@ SyncChannel::SyncChannel(
|
| Listener* listener,
|
| const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
|
| WaitableEvent* shutdown_event)
|
| - : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
|
| + : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
|
| + sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
|
| // The current (listener) thread must be distinct from the IPC thread, or else
|
| // sending synchronous messages will deadlock.
|
| DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
|
| @@ -487,23 +517,23 @@ bool SyncChannel::Send(Message* message) {
|
| return true;
|
| }
|
|
|
| + SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
|
| + bool pump_messages = sync_msg->ShouldPumpMessages();
|
| +
|
| // *this* might get deleted in WaitForReply.
|
| scoped_refptr<SyncContext> context(sync_context());
|
| - if (context->shutdown_event()->IsSignaled()) {
|
| - DVLOG(1) << "shutdown event is signaled";
|
| + if (!context->Push(sync_msg)) {
|
| + DVLOG(1) << "Channel is shutting down. Dropping sync message.";
|
| delete message;
|
| return false;
|
| }
|
|
|
| - SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
|
| - context->Push(sync_msg);
|
| - WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
|
| -
|
| ChannelProxy::Send(message);
|
|
|
| // Wait for reply, or for any other incoming synchronous messages.
|
| // *this* might get deleted, so only call static functions at this point.
|
| - WaitForReply(context.get(), pump_messages_event);
|
| + scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
|
| + WaitForReply(registry.get(), context.get(), pump_messages);
|
|
|
| TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
|
| "SyncChannel::Send", context->GetSendDoneEvent());
|
| @@ -511,19 +541,43 @@ bool SyncChannel::Send(Message* message) {
|
| return context->Pop();
|
| }
|
|
|
| -void SyncChannel::WaitForReply(
|
| - SyncContext* context, WaitableEvent* pump_messages_event) {
|
| +void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
|
| + SyncContext* context,
|
| + bool pump_messages) {
|
| context->DispatchMessages();
|
| +
|
| + const MojoEvent* pump_messages_event = nullptr;
|
| + if (pump_messages)
|
| + pump_messages_event = g_pump_messages_event.Get().event();
|
| +
|
| while (true) {
|
| - WaitableEvent* objects[] = {
|
| - context->GetDispatchEvent(),
|
| - context->GetSendDoneEvent(),
|
| - pump_messages_event
|
| - };
|
| -
|
| - unsigned count = pump_messages_event ? 3: 2;
|
| - size_t result = WaitableEvent::WaitMany(objects, count);
|
| - if (result == 0 /* dispatch event */) {
|
| + bool dispatch = false;
|
| + bool send_done = false;
|
| + bool should_pump_messages = false;
|
| + bool error = false;
|
| + registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(),
|
| + MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&OnSyncHandleReady, &dispatch, &error));
|
| + registry->RegisterHandle(
|
| + context->GetSendDoneEvent()->GetHandle(),
|
| + MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&OnSyncHandleReady, &send_done, &error));
|
| + if (pump_messages_event) {
|
| + registry->RegisterHandle(
|
| + pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
|
| + }
|
| +
|
| + const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
|
| + registry->WatchAllHandles(stop_flags, 3);
|
| + DCHECK(!error);
|
| +
|
| + registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle());
|
| + registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
|
| + if (pump_messages_event)
|
| + registry->UnregisterHandle(pump_messages_event->GetHandle());
|
| +
|
| + if (dispatch) {
|
| // We're waiting for a reply, but we received a blocking synchronous
|
| // call. We must process it or otherwise a deadlock might occur.
|
| context->GetDispatchEvent()->Reset();
|
| @@ -531,7 +585,7 @@ void SyncChannel::WaitForReply(
|
| continue;
|
| }
|
|
|
| - if (result == 2 /* pump_messages_event */)
|
| + if (should_pump_messages)
|
| WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
|
|
|
| break;
|
| @@ -539,64 +593,59 @@ void SyncChannel::WaitForReply(
|
| }
|
|
|
| void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
|
| - base::WaitableEventWatcher send_done_watcher;
|
| + mojo::Watcher send_done_watcher;
|
|
|
| ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
|
| - DCHECK(sync_msg_queue != NULL);
|
| -
|
| - base::WaitableEventWatcher* old_send_done_event_watcher =
|
| - sync_msg_queue->top_send_done_watcher();
|
| -
|
| - base::WaitableEventWatcher::EventCallback old_callback;
|
| - base::WaitableEvent* old_event = NULL;
|
| -
|
| - // Maintain a local global stack of send done delegates to ensure that
|
| - // nested sync calls complete in the correct sequence, i.e. the
|
| - // outermost call completes first, etc.
|
| - if (old_send_done_event_watcher) {
|
| - old_callback = old_send_done_event_watcher->callback();
|
| - old_event = old_send_done_event_watcher->GetWatchedEvent();
|
| - old_send_done_event_watcher->StopWatching();
|
| + DCHECK_NE(sync_msg_queue, nullptr);
|
| +
|
| + mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher();
|
| + mojo::Handle old_handle(mojo::kInvalidHandleValue);
|
| + mojo::Watcher::ReadyCallback old_callback;
|
| +
|
| + // Maintain a thread-local stack of watchers to ensure nested calls complete
|
| + // in the correct sequence, i.e. the outermost call completes first, etc.
|
| + if (old_watcher) {
|
| + old_callback = old_watcher->ready_callback();
|
| + old_handle = old_watcher->handle();
|
| + old_watcher->Cancel();
|
| }
|
|
|
| sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
|
|
|
| - send_done_watcher.StartWatching(context->GetSendDoneEvent(),
|
| - context->MakeWaitableEventCallback());
|
| -
|
| {
|
| + base::RunLoop nested_loop;
|
| + send_done_watcher.Start(
|
| + context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
|
| +
|
| base::MessageLoop::ScopedNestableTaskAllower allow(
|
| base::MessageLoop::current());
|
| - base::RunLoop().Run();
|
| + nested_loop.Run();
|
| + send_done_watcher.Cancel();
|
| }
|
|
|
| - sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
|
| - if (old_send_done_event_watcher && old_event) {
|
| - old_send_done_event_watcher->StartWatching(old_event, old_callback);
|
| - }
|
| + sync_msg_queue->set_top_send_done_watcher(old_watcher);
|
| + if (old_watcher)
|
| + old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
|
| }
|
|
|
| -void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
|
| - DCHECK(event == sync_context()->GetDispatchEvent());
|
| - // The call to DispatchMessages might delete this object, so reregister
|
| - // the object watcher first.
|
| - event->Reset();
|
| - dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
|
| - sync_context()->DispatchMessages();
|
| +void SyncChannel::OnDispatchHandleReady(MojoResult result) {
|
| + DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
|
| + if (result == MOJO_RESULT_OK) {
|
| + sync_context()->GetDispatchEvent()->Reset();
|
| + sync_context()->DispatchMessages();
|
| + }
|
| }
|
|
|
| void SyncChannel::StartWatching() {
|
| // Ideally we only want to watch this object when running a nested message
|
| // loop. However, we don't know when it exits if there's another nested
|
| // message loop running under it or not, so we wouldn't know whether to
|
| - // stop or keep watching. So we always watch it, and create the event as
|
| - // manual reset since the object watcher might otherwise reset the event
|
| - // when we're doing a WaitMany.
|
| - dispatch_watcher_callback_ =
|
| - base::Bind(&SyncChannel::OnWaitableEventSignaled,
|
| - base::Unretained(this));
|
| - dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
|
| - dispatch_watcher_callback_);
|
| + // stop or keep watching. So we always watch it.
|
| + dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(),
|
| + MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&SyncChannel::OnDispatchHandleReady,
|
| + base::Unretained(this)));
|
| }
|
|
|
| void SyncChannel::OnChannelInit() {
|
|
|