Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1057)

Unified Diff: ipc/ipc_sync_channel.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: docs Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: ipc/ipc_sync_channel.cc
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index 6852399bdfa4d8acdaba6bd7e766e715df085947..1dbf21e806452790d63d18d6fd76d0ba0fe5b140 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -24,9 +24,7 @@
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
#include "ipc/ipc_sync_message.h"
-#include "ipc/mojo_event.h"
-#include "mojo/public/cpp/bindings/sync_handle_registry.h"
-#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
+#include "mojo/public/cpp/bindings/sync_event_watcher.h"
using base::WaitableEvent;
@@ -35,35 +33,13 @@ namespace IPC {
namespace {
// A generic callback used when watching handles synchronously. Sets |*signal|
-// to true. Also sets |*error| to true in case of an error.
-void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
+// to true.
+void OnEventReady(bool* signal) {
*signal = true;
- *error = result != MOJO_RESULT_OK;
}
-// A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result
-// (DCHECKs, but is only used in cases where failure should be impossible) and
-// runs |callback|.
-void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
- DCHECK_EQ(result, MOJO_RESULT_OK);
- callback.Run();
-}
-
-class PumpMessagesEvent {
- public:
- PumpMessagesEvent() { event_.Signal(); }
- ~PumpMessagesEvent() {}
-
- const MojoEvent* event() const { return &event_; }
-
- private:
- MojoEvent event_;
-
- DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
-};
-
-base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
- LAZY_INSTANCE_INITIALIZER;
+base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
+ g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
} // namespace
@@ -208,7 +184,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- MojoEvent* dispatch_event() { return &dispatch_event_; }
+ base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
base::SingleThreadTaskRunner* listener_task_runner() {
return listener_task_runner_.get();
}
@@ -230,12 +206,23 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- mojo::SimpleWatcher* top_send_done_watcher() {
- return top_send_done_watcher_;
- }
-
- void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) {
+ // See SyncChannel::WaitForReplyWithNestedMessageLoop for details.
+ void SetTopSendDoneState(
+ base::WaitableEventWatcher* watcher,
+ base::WaitableEvent* event,
+ const base::WaitableEventWatcher::EventCallback& callback,
+ base::WaitableEventWatcher** outer_watcher,
+ base::WaitableEvent** outer_event,
+ base::WaitableEventWatcher::EventCallback* outer_callback) {
+ if (outer_watcher) {
+ DCHECK(outer_event && outer_callback);
+ *outer_watcher = top_send_done_watcher_;
+ *outer_event = top_send_done_event_;
+ *outer_callback = top_send_done_callback_;
+ }
top_send_done_watcher_ = watcher;
+ top_send_done_event_ = event;
+ top_send_done_callback_ = callback;
}
private:
@@ -245,23 +232,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
// as manual reset.
ReceivedSyncMsgQueue()
: message_queue_version_(0),
+ dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
- task_pending_(false),
- listener_count_(0),
- top_send_done_watcher_(nullptr) {
- sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher(
- dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
- base::Unretained(this))));
+ sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>(
+ &dispatch_event_,
+ base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
+ base::Unretained(this)))) {
sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
~ReceivedSyncMsgQueue() {}
- void OnDispatchHandleReady(MojoResult result) {
- if (result != MOJO_RESULT_OK)
- return;
-
+ void OnDispatchEventReady() {
if (dispatch_flag_) {
*dispatch_flag_ = true;
return;
@@ -284,23 +267,27 @@ class SyncChannel::ReceivedSyncMsgQueue :
typedef std::list<QueuedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
- uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
+
+ // Used to signal DispatchMessages to rescan
+ uint32_t message_queue_version_ = 0;
std::vector<QueuedMessage> received_replies_;
// Signaled when we get a synchronous message that we must respond to, as the
// sender needs its reply before it can reply to our original synchronous
// message.
- MojoEvent dispatch_event_;
+ base::WaitableEvent dispatch_event_;
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
base::Lock message_lock_;
- bool task_pending_;
- int listener_count_;
+ bool task_pending_ = false;
+ int listener_count_ = 0;
- // The current send done handle watcher for this thread. Used to maintain
- // a thread-local stack of send done watchers to ensure that nested sync
- // message loops complete correctly.
- mojo::SimpleWatcher* top_send_done_watcher_;
+ // The current send-done state for this thread. Used to maintain a thread-
+ // local stack of state to ensure that nested sync message loops complete
+ // correctly.
+ base::WaitableEventWatcher* top_send_done_watcher_ = nullptr;
+ base::WaitableEvent* top_send_done_event_ = nullptr;
+ base::WaitableEventWatcher::EventCallback top_send_done_callback_;
// If not null, the address of a flag to set when the dispatch event signals,
// in lieu of actually dispatching messages. This is used by
@@ -309,7 +296,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
bool* dispatch_flag_ = nullptr;
// Watches |dispatch_event_| during all sync handle watches on this thread.
- std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_;
+ std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
};
base::LazyInstance<base::ThreadLocalPointer<
@@ -327,6 +314,13 @@ SyncChannel::SyncContext::SyncContext(
restrict_dispatch_group_(kRestrictDispatchGroup_None) {
}
+void SyncChannel::SyncContext::OnSendDoneEventSignaled(
+ base::RunLoop* nested_loop,
+ base::WaitableEvent* event) {
+ DCHECK_EQ(GetSendDoneEvent(), event);
+ nested_loop->Quit();
+}
+
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
Pop();
@@ -349,7 +343,8 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
return false;
PendingSyncMsg pending(
SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
- new MojoEvent);
+ new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED));
deserializers_.push_back(pending);
return true;
}
@@ -378,12 +373,12 @@ bool SyncChannel::SyncContext::Pop() {
return result;
}
-MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
+base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
base::AutoLock auto_lock(deserializers_lock_);
return deserializers_.back().done_event;
}
-MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
+base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
return received_sync_msgs_->dispatch_event();
}
@@ -407,7 +402,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
DVLOG(1) << "Received error reply";
}
- MojoEvent* done_event = deserializers_.back().done_event;
+ base::WaitableEvent* done_event = deserializers_.back().done_event;
TRACE_EVENT_FLOW_BEGIN0(
TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
"SyncChannel::SyncContext::TryToUnblockListener", done_event);
@@ -526,9 +521,7 @@ SyncChannel::SyncChannel(
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
- sync_handle_registry_(mojo::SyncHandleRegistry::current()),
- dispatch_watcher_(FROM_HERE,
- mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
+ sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
// The current (listener) thread must be distinct from the IPC thread, or else
// sending synchronous messages will deadlock.
DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
@@ -596,24 +589,28 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
bool pump_messages) {
context->DispatchMessages();
- const MojoEvent* pump_messages_event = nullptr;
- if (pump_messages)
- pump_messages_event = g_pump_messages_event.Get().event();
+ base::WaitableEvent* pump_messages_event = nullptr;
+ if (pump_messages) {
+ if (!g_pump_messages_event.Get()) {
+ g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>(
+ base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::SIGNALED);
+ }
+ pump_messages_event = g_pump_messages_event.Get().get();
+ }
while (true) {
bool dispatch = false;
bool send_done = false;
bool should_pump_messages = false;
- bool error = false;
- bool registered = registry->RegisterHandle(
- context->GetSendDoneEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &send_done, &error));
+ bool registered = registry->RegisterEvent(
+ context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
DCHECK(registered);
+
if (pump_messages_event) {
- registered = registry->RegisterHandle(
- pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
+ registered = registry->RegisterEvent(
+ pump_messages_event,
+ base::Bind(&OnEventReady, &should_pump_messages));
DCHECK(registered);
}
@@ -621,11 +618,10 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
context->received_sync_msgs()->BlockDispatch(&dispatch);
registry->WatchAllHandles(stop_flags, 3);
context->received_sync_msgs()->UnblockDispatch();
- DCHECK(!error);
- registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
+ registry->UnregisterEvent(context->GetSendDoneEvent());
if (pump_messages_event)
- registry->UnregisterHandle(pump_messages_event->GetHandle());
+ registry->UnregisterEvent(pump_messages_event);
if (dispatch) {
// We're waiting for a reply, but we received a blocking synchronous call.
@@ -643,46 +639,55 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
}
void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
- mojo::SimpleWatcher send_done_watcher(
- FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
-
ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
- DCHECK_NE(sync_msg_queue, nullptr);
-
- mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher();
- mojo::Handle old_handle(mojo::kInvalidHandleValue);
- mojo::SimpleWatcher::ReadyCallback old_callback;
-
- // Maintain a thread-local stack of watchers to ensure nested calls complete
- // in the correct sequence, i.e. the outermost call completes first, etc.
- if (old_watcher) {
- old_callback = old_watcher->ready_callback();
- old_handle = old_watcher->handle();
- old_watcher->Cancel();
- }
+ DCHECK(sync_msg_queue);
- sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
+ base::RunLoop nested_loop;
- {
- base::RunLoop nested_loop;
- send_done_watcher.Watch(
- context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
-
- base::MessageLoop::ScopedNestableTaskAllower allow(
- base::MessageLoop::current());
- nested_loop.Run();
- send_done_watcher.Cancel();
- }
-
- sync_msg_queue->set_top_send_done_watcher(old_watcher);
- if (old_watcher)
- old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
+ // WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we can nest
+ // waiting message loops arbitrarily deep on the SyncChannel's thread. Every
+ // such operation has a corresponding WaitableEvent to be watched which, when
+ // signalled for IPC completion, breaks out of the loop. The innermost (i.e.
+ // topmost) such event is stored in |sync_msg_queue| state.
+ //
+ // Here we preserve the current top-of-stack event state (if any) within the
+ // local stack frame. We then replace the event state in |sync_msg_queue| with
+ // our own and run a nested loop. If a subsequent nested loop is started
+ // therein the process is repeated in that stack frame, and so on.
+ //
+ // Once the innermost nested loop is broken, the locally preserved event state
+ // is swapped back into |sync_msg_queue| before unwinding the stack.
+ base::WaitableEventWatcher* outer_watcher = nullptr;
yzshen1 2017/03/23 20:15:50 Does it make sense to group these three things int
Ken Rockot(use gerrit already) 2017/03/23 22:04:20 Sure, that sounds nice. In fact I've gone and move
+ base::WaitableEvent* outer_event = nullptr;
+ base::WaitableEventWatcher::EventCallback outer_callback;
+ base::WaitableEventWatcher send_done_watcher;
+ base::WaitableEvent* event = context->GetSendDoneEvent();
+ const base::WaitableEventWatcher::EventCallback callback =
+ base::Bind(&SyncContext::OnSendDoneEventSignaled, context, &nested_loop);
+ sync_msg_queue->SetTopSendDoneState(&send_done_watcher, event, callback,
+ &outer_watcher, &outer_event,
+ &outer_callback);
+ if (outer_watcher)
+ outer_watcher->StopWatching();
+ send_done_watcher.StartWatching(event, callback);
+
+ base::MessageLoop::ScopedNestableTaskAllower allow(
+ base::MessageLoop::current());
+ nested_loop.Run();
+
+ sync_msg_queue->SetTopSendDoneState(
+ outer_watcher, outer_event, outer_callback, nullptr, nullptr, nullptr);
+ if (outer_watcher)
+ outer_watcher->StartWatching(outer_event, outer_callback);
}
-void SyncChannel::OnDispatchHandleReady(MojoResult result) {
- DCHECK_EQ(result, MOJO_RESULT_OK);
+void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
+ DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
sync_context()->GetDispatchEvent()->Reset();
+
+ StartWatching();
+
+ // NOTE: May delete |this|.
sync_context()->DispatchMessages();
}
@@ -691,10 +696,10 @@ void SyncChannel::StartWatching() {
// messages once the listener thread is unblocked and pumping its task queue.
// The ReceivedSyncMsgQueue also watches this event and may dispatch
// immediately if woken up by a message which it's allowed to dispatch.
- dispatch_watcher_.Watch(
- sync_context()->GetDispatchEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this)));
+ dispatch_watcher_.StartWatching(
+ sync_context()->GetDispatchEvent(),
+ base::Bind(&SyncChannel::OnDispatchEventSignaled,
+ base::Unretained(this)));
}
void SyncChannel::OnChannelInit() {

Powered by Google App Engine
This is Rietveld 408576698