Index: ipc/ipc_sync_channel.cc |
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc |
index 6852399bdfa4d8acdaba6bd7e766e715df085947..0d61e08ce945b01e27fd3005b138c1bfbc5c7eee 100644 |
--- a/ipc/ipc_sync_channel.cc |
+++ b/ipc/ipc_sync_channel.cc |
@@ -24,9 +24,7 @@ |
#include "ipc/ipc_logging.h" |
#include "ipc/ipc_message_macros.h" |
#include "ipc/ipc_sync_message.h" |
-#include "ipc/mojo_event.h" |
-#include "mojo/public/cpp/bindings/sync_handle_registry.h" |
-#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
+#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
using base::WaitableEvent; |
@@ -35,35 +33,13 @@ namespace IPC { |
namespace { |
// A generic callback used when watching handles synchronously. Sets |*signal| |
-// to true. Also sets |*error| to true in case of an error. |
-void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
+// to true. |
+void OnEventReady(bool* signal) { |
*signal = true; |
- *error = result != MOJO_RESULT_OK; |
} |
-// A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result |
-// (DCHECKs, but is only used in cases where failure should be impossible) and |
-// runs |callback|. |
-void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
- DCHECK_EQ(result, MOJO_RESULT_OK); |
- callback.Run(); |
-} |
- |
-class PumpMessagesEvent { |
- public: |
- PumpMessagesEvent() { event_.Signal(); } |
- ~PumpMessagesEvent() {} |
- |
- const MojoEvent* event() const { return &event_; } |
- |
- private: |
- MojoEvent event_; |
- |
- DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
-}; |
- |
-base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = |
- LAZY_INSTANCE_INITIALIZER; |
+base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky |
+ g_pump_messages_event = LAZY_INSTANCE_INITIALIZER; |
} // namespace |
@@ -87,6 +63,60 @@ base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = |
class SyncChannel::ReceivedSyncMsgQueue : |
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
public: |
+ // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we |
+ // may nest waiting message loops arbitrarily deep on the SyncChannel's |
+ // thread. Every such operation has a corresponding WaitableEvent to be |
+ // watched which, when signalled for IPC completion, breaks out of the loop. |
+ // A reference to the innermost (i.e. topmost) watcher is held in |
+ // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|. |
+ // |
+ // NestedSendDoneWatcher provides a simple scoper which is used by |
+ // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done" |
+ // event, preserving the previous topmost state on the local stack until the |
+ // new inner loop is broken. If yet another subsequent nested loop is started |
+ // therein the process is repeated again in the new inner stack frame, and so |
+ // on. |
+ // |
+ // When this object is destroyed on stack unwind, the previous topmost state |
+ // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|, |
+ // and its watch is resumed immediately. |
+ class NestedSendDoneWatcher { |
+ public: |
+ NestedSendDoneWatcher(SyncChannel::SyncContext* context, |
+ base::RunLoop* run_loop) |
+ : sync_msg_queue_(context->received_sync_msgs()), |
+ outer_state_(sync_msg_queue_->top_send_done_event_watcher_), |
+ event_(context->GetSendDoneEvent()), |
+ callback_( |
+ base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled, |
+ context, |
+ run_loop)) { |
+ sync_msg_queue_->top_send_done_event_watcher_ = this; |
+ if (outer_state_) |
+ outer_state_->StopWatching(); |
+ StartWatching(); |
+ } |
+ |
+ ~NestedSendDoneWatcher() { |
+ sync_msg_queue_->top_send_done_event_watcher_ = outer_state_; |
+ if (outer_state_) |
+ outer_state_->StartWatching(); |
+ } |
+ |
+ private: |
+ void StartWatching() { watcher_.StartWatching(event_, callback_); } |
+ void StopWatching() { watcher_.StopWatching(); } |
+ |
+ ReceivedSyncMsgQueue* const sync_msg_queue_; |
+ NestedSendDoneWatcher* const outer_state_; |
+ |
+ base::WaitableEvent* const event_; |
+ const base::WaitableEventWatcher::EventCallback callback_; |
+ base::WaitableEventWatcher watcher_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher); |
+ }; |
+ |
// Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
// if necessary. Call RemoveContext on the same thread when done. |
static ReceivedSyncMsgQueue* AddContext() { |
@@ -208,7 +238,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
} |
} |
- MojoEvent* dispatch_event() { return &dispatch_event_; } |
+ base::WaitableEvent* dispatch_event() { return &dispatch_event_; } |
base::SingleThreadTaskRunner* listener_task_runner() { |
return listener_task_runner_.get(); |
} |
@@ -230,14 +260,6 @@ class SyncChannel::ReceivedSyncMsgQueue : |
} |
} |
- mojo::SimpleWatcher* top_send_done_watcher() { |
- return top_send_done_watcher_; |
- } |
- |
- void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { |
- top_send_done_watcher_ = watcher; |
- } |
- |
private: |
friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
@@ -245,23 +267,19 @@ class SyncChannel::ReceivedSyncMsgQueue : |
// as manual reset. |
ReceivedSyncMsgQueue() |
: message_queue_version_(0), |
+ dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
+ base::WaitableEvent::InitialState::NOT_SIGNALED), |
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
- task_pending_(false), |
- listener_count_(0), |
- top_send_done_watcher_(nullptr) { |
- sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( |
- dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, |
- base::Unretained(this)))); |
+ sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>( |
+ &dispatch_event_, |
+ base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady, |
+ base::Unretained(this)))) { |
sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
~ReceivedSyncMsgQueue() {} |
- void OnDispatchHandleReady(MojoResult result) { |
- if (result != MOJO_RESULT_OK) |
- return; |
- |
+ void OnDispatchEventReady() { |
if (dispatch_flag_) { |
*dispatch_flag_ = true; |
return; |
@@ -284,23 +302,25 @@ class SyncChannel::ReceivedSyncMsgQueue : |
typedef std::list<QueuedMessage> SyncMessageQueue; |
SyncMessageQueue message_queue_; |
- uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
+ |
+ // Used to signal DispatchMessages to rescan |
+ uint32_t message_queue_version_ = 0; |
std::vector<QueuedMessage> received_replies_; |
// Signaled when we get a synchronous message that we must respond to, as the |
// sender needs its reply before it can reply to our original synchronous |
// message. |
- MojoEvent dispatch_event_; |
+ base::WaitableEvent dispatch_event_; |
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
base::Lock message_lock_; |
- bool task_pending_; |
- int listener_count_; |
+ bool task_pending_ = false; |
+ int listener_count_ = 0; |
- // The current send done handle watcher for this thread. Used to maintain |
- // a thread-local stack of send done watchers to ensure that nested sync |
- // message loops complete correctly. |
- mojo::SimpleWatcher* top_send_done_watcher_; |
+ // The current NestedSendDoneWatcher for this thread, if we're currently |
+ // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See |
+ // NestedSendDoneWatcher comments for more details. |
+ NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr; |
// If not null, the address of a flag to set when the dispatch event signals, |
// in lieu of actually dispatching messages. This is used by |
@@ -309,7 +329,7 @@ class SyncChannel::ReceivedSyncMsgQueue : |
bool* dispatch_flag_ = nullptr; |
// Watches |dispatch_event_| during all sync handle watches on this thread. |
- std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
+ std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_; |
}; |
base::LazyInstance<base::ThreadLocalPointer< |
@@ -327,6 +347,13 @@ SyncChannel::SyncContext::SyncContext( |
restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
} |
+void SyncChannel::SyncContext::OnSendDoneEventSignaled( |
+ base::RunLoop* nested_loop, |
+ base::WaitableEvent* event) { |
+ DCHECK_EQ(GetSendDoneEvent(), event); |
+ nested_loop->Quit(); |
+} |
+ |
SyncChannel::SyncContext::~SyncContext() { |
while (!deserializers_.empty()) |
Pop(); |
@@ -349,7 +376,8 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
return false; |
PendingSyncMsg pending( |
SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
- new MojoEvent); |
+ new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
+ base::WaitableEvent::InitialState::NOT_SIGNALED)); |
deserializers_.push_back(pending); |
return true; |
} |
@@ -378,12 +406,12 @@ bool SyncChannel::SyncContext::Pop() { |
return result; |
} |
-MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
+base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
base::AutoLock auto_lock(deserializers_lock_); |
return deserializers_.back().done_event; |
} |
-MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
+base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
return received_sync_msgs_->dispatch_event(); |
} |
@@ -407,7 +435,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
DVLOG(1) << "Received error reply"; |
} |
- MojoEvent* done_event = deserializers_.back().done_event; |
+ base::WaitableEvent* done_event = deserializers_.back().done_event; |
TRACE_EVENT_FLOW_BEGIN0( |
TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
"SyncChannel::SyncContext::TryToUnblockListener", done_event); |
@@ -526,9 +554,7 @@ SyncChannel::SyncChannel( |
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
WaitableEvent* shutdown_event) |
: ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
- sync_handle_registry_(mojo::SyncHandleRegistry::current()), |
- dispatch_watcher_(FROM_HERE, |
- mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { |
+ sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
// The current (listener) thread must be distinct from the IPC thread, or else |
// sending synchronous messages will deadlock. |
DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
@@ -596,36 +622,39 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
bool pump_messages) { |
context->DispatchMessages(); |
- const MojoEvent* pump_messages_event = nullptr; |
- if (pump_messages) |
- pump_messages_event = g_pump_messages_event.Get().event(); |
+ base::WaitableEvent* pump_messages_event = nullptr; |
+ if (pump_messages) { |
+ if (!g_pump_messages_event.Get()) { |
+ g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>( |
+ base::WaitableEvent::ResetPolicy::MANUAL, |
+ base::WaitableEvent::InitialState::SIGNALED); |
+ } |
+ pump_messages_event = g_pump_messages_event.Get().get(); |
+ } |
while (true) { |
bool dispatch = false; |
bool send_done = false; |
bool should_pump_messages = false; |
- bool error = false; |
- bool registered = registry->RegisterHandle( |
- context->GetSendDoneEvent()->GetHandle(), |
- MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&OnSyncHandleReady, &send_done, &error)); |
+ bool registered = registry->RegisterEvent( |
+ context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done)); |
DCHECK(registered); |
+ |
if (pump_messages_event) { |
- registered = registry->RegisterHandle( |
- pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
+ registered = registry->RegisterEvent( |
+ pump_messages_event, |
+ base::Bind(&OnEventReady, &should_pump_messages)); |
DCHECK(registered); |
} |
const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
context->received_sync_msgs()->BlockDispatch(&dispatch); |
- registry->WatchAllHandles(stop_flags, 3); |
+ registry->Wait(stop_flags, 3); |
context->received_sync_msgs()->UnblockDispatch(); |
- DCHECK(!error); |
- registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
+ registry->UnregisterEvent(context->GetSendDoneEvent()); |
if (pump_messages_event) |
- registry->UnregisterHandle(pump_messages_event->GetHandle()); |
+ registry->UnregisterEvent(pump_messages_event); |
if (dispatch) { |
// We're waiting for a reply, but we received a blocking synchronous call. |
@@ -643,46 +672,20 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
} |
void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
- mojo::SimpleWatcher send_done_watcher( |
- FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); |
- |
- ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
- DCHECK_NE(sync_msg_queue, nullptr); |
- |
- mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
- mojo::Handle old_handle(mojo::kInvalidHandleValue); |
- mojo::SimpleWatcher::ReadyCallback old_callback; |
- |
- // Maintain a thread-local stack of watchers to ensure nested calls complete |
- // in the correct sequence, i.e. the outermost call completes first, etc. |
- if (old_watcher) { |
- old_callback = old_watcher->ready_callback(); |
- old_handle = old_watcher->handle(); |
- old_watcher->Cancel(); |
- } |
- |
- sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
- |
- { |
- base::RunLoop nested_loop; |
- send_done_watcher.Watch( |
- context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
- |
- base::MessageLoop::ScopedNestableTaskAllower allow( |
- base::MessageLoop::current()); |
- nested_loop.Run(); |
- send_done_watcher.Cancel(); |
- } |
- |
- sync_msg_queue->set_top_send_done_watcher(old_watcher); |
- if (old_watcher) |
- old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
+ base::MessageLoop::ScopedNestableTaskAllower allow( |
+ base::MessageLoop::current()); |
+ base::RunLoop nested_loop; |
+ ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop); |
+ nested_loop.Run(); |
} |
-void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
- DCHECK_EQ(result, MOJO_RESULT_OK); |
+void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) { |
+ DCHECK_EQ(sync_context()->GetDispatchEvent(), event); |
sync_context()->GetDispatchEvent()->Reset(); |
+ |
+ StartWatching(); |
+ |
+ // NOTE: May delete |this|. |
sync_context()->DispatchMessages(); |
} |
@@ -691,10 +694,10 @@ void SyncChannel::StartWatching() { |
// messages once the listener thread is unblocked and pumping its task queue. |
// The ReceivedSyncMsgQueue also watches this event and may dispatch |
// immediately if woken up by a message which it's allowed to dispatch. |
- dispatch_watcher_.Watch( |
- sync_context()->GetDispatchEvent()->GetHandle(), |
- MOJO_HANDLE_SIGNAL_READABLE, |
- base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); |
+ dispatch_watcher_.StartWatching( |
+ sync_context()->GetDispatchEvent(), |
+ base::Bind(&SyncChannel::OnDispatchEventSignaled, |
+ base::Unretained(this))); |
} |
void SyncChannel::OnChannelInit() { |