Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(361)

Unified Diff: ipc/ipc_sync_channel.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_sync_channel.cc
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index 6852399bdfa4d8acdaba6bd7e766e715df085947..0d61e08ce945b01e27fd3005b138c1bfbc5c7eee 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -24,9 +24,7 @@
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
#include "ipc/ipc_sync_message.h"
-#include "ipc/mojo_event.h"
-#include "mojo/public/cpp/bindings/sync_handle_registry.h"
-#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
+#include "mojo/public/cpp/bindings/sync_event_watcher.h"
using base::WaitableEvent;
@@ -35,35 +33,13 @@ namespace IPC {
namespace {
// A generic callback used when watching handles synchronously. Sets |*signal|
-// to true. Also sets |*error| to true in case of an error.
-void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
+// to true.
+void OnEventReady(bool* signal) {
*signal = true;
- *error = result != MOJO_RESULT_OK;
}
-// A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result
-// (DCHECKs, but is only used in cases where failure should be impossible) and
-// runs |callback|.
-void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
- DCHECK_EQ(result, MOJO_RESULT_OK);
- callback.Run();
-}
-
-class PumpMessagesEvent {
- public:
- PumpMessagesEvent() { event_.Signal(); }
- ~PumpMessagesEvent() {}
-
- const MojoEvent* event() const { return &event_; }
-
- private:
- MojoEvent event_;
-
- DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
-};
-
-base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
- LAZY_INSTANCE_INITIALIZER;
+base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
+ g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
} // namespace
@@ -87,6 +63,60 @@ base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
class SyncChannel::ReceivedSyncMsgQueue :
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
public:
+ // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
+ // may nest waiting message loops arbitrarily deep on the SyncChannel's
+ // thread. Every such operation has a corresponding WaitableEvent to be
+ // watched which, when signalled for IPC completion, breaks out of the loop.
+ // A reference to the innermost (i.e. topmost) watcher is held in
+ // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
+ //
+ // NestedSendDoneWatcher provides a simple scoper which is used by
+ // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
+ // event, preserving the previous topmost state on the local stack until the
+ // new inner loop is broken. If yet another subsequent nested loop is started
+ // therein the process is repeated again in the new inner stack frame, and so
+ // on.
+ //
+ // When this object is destroyed on stack unwind, the previous topmost state
+ // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
+ // and its watch is resumed immediately.
+ class NestedSendDoneWatcher {
+ public:
+ NestedSendDoneWatcher(SyncChannel::SyncContext* context,
+ base::RunLoop* run_loop)
+ : sync_msg_queue_(context->received_sync_msgs()),
+ outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
+ event_(context->GetSendDoneEvent()),
+ callback_(
+ base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
+ context,
+ run_loop)) {
+ sync_msg_queue_->top_send_done_event_watcher_ = this;
+ if (outer_state_)
+ outer_state_->StopWatching();
+ StartWatching();
+ }
+
+ ~NestedSendDoneWatcher() {
+ sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
+ if (outer_state_)
+ outer_state_->StartWatching();
+ }
+
+ private:
+ void StartWatching() { watcher_.StartWatching(event_, callback_); }
+ void StopWatching() { watcher_.StopWatching(); }
+
+ ReceivedSyncMsgQueue* const sync_msg_queue_;
+ NestedSendDoneWatcher* const outer_state_;
+
+ base::WaitableEvent* const event_;
+ const base::WaitableEventWatcher::EventCallback callback_;
+ base::WaitableEventWatcher watcher_;
+
+ DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
+ };
+
// Returns the ReceivedSyncMsgQueue instance for this thread, creating one
// if necessary. Call RemoveContext on the same thread when done.
static ReceivedSyncMsgQueue* AddContext() {
@@ -208,7 +238,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- MojoEvent* dispatch_event() { return &dispatch_event_; }
+ base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
base::SingleThreadTaskRunner* listener_task_runner() {
return listener_task_runner_.get();
}
@@ -230,14 +260,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- mojo::SimpleWatcher* top_send_done_watcher() {
- return top_send_done_watcher_;
- }
-
- void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) {
- top_send_done_watcher_ = watcher;
- }
-
private:
friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
@@ -245,23 +267,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
// as manual reset.
ReceivedSyncMsgQueue()
: message_queue_version_(0),
+ dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
- task_pending_(false),
- listener_count_(0),
- top_send_done_watcher_(nullptr) {
- sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher(
- dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
- base::Unretained(this))));
+ sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>(
+ &dispatch_event_,
+ base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
+ base::Unretained(this)))) {
sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
~ReceivedSyncMsgQueue() {}
- void OnDispatchHandleReady(MojoResult result) {
- if (result != MOJO_RESULT_OK)
- return;
-
+ void OnDispatchEventReady() {
if (dispatch_flag_) {
*dispatch_flag_ = true;
return;
@@ -284,23 +302,25 @@ class SyncChannel::ReceivedSyncMsgQueue :
typedef std::list<QueuedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
- uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
+
+ // Used to signal DispatchMessages to rescan
+ uint32_t message_queue_version_ = 0;
std::vector<QueuedMessage> received_replies_;
// Signaled when we get a synchronous message that we must respond to, as the
// sender needs its reply before it can reply to our original synchronous
// message.
- MojoEvent dispatch_event_;
+ base::WaitableEvent dispatch_event_;
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
base::Lock message_lock_;
- bool task_pending_;
- int listener_count_;
+ bool task_pending_ = false;
+ int listener_count_ = 0;
- // The current send done handle watcher for this thread. Used to maintain
- // a thread-local stack of send done watchers to ensure that nested sync
- // message loops complete correctly.
- mojo::SimpleWatcher* top_send_done_watcher_;
+ // The current NestedSendDoneWatcher for this thread, if we're currently
+ // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
+ // NestedSendDoneWatcher comments for more details.
+ NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
// If not null, the address of a flag to set when the dispatch event signals,
// in lieu of actually dispatching messages. This is used by
@@ -309,7 +329,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
bool* dispatch_flag_ = nullptr;
// Watches |dispatch_event_| during all sync handle watches on this thread.
- std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_;
+ std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
};
base::LazyInstance<base::ThreadLocalPointer<
@@ -327,6 +347,13 @@ SyncChannel::SyncContext::SyncContext(
restrict_dispatch_group_(kRestrictDispatchGroup_None) {
}
+void SyncChannel::SyncContext::OnSendDoneEventSignaled(
+ base::RunLoop* nested_loop,
+ base::WaitableEvent* event) {
+ DCHECK_EQ(GetSendDoneEvent(), event);
+ nested_loop->Quit();
+}
+
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
Pop();
@@ -349,7 +376,8 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
return false;
PendingSyncMsg pending(
SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
- new MojoEvent);
+ new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED));
deserializers_.push_back(pending);
return true;
}
@@ -378,12 +406,12 @@ bool SyncChannel::SyncContext::Pop() {
return result;
}
-MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
+base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
base::AutoLock auto_lock(deserializers_lock_);
return deserializers_.back().done_event;
}
-MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
+base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
return received_sync_msgs_->dispatch_event();
}
@@ -407,7 +435,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
DVLOG(1) << "Received error reply";
}
- MojoEvent* done_event = deserializers_.back().done_event;
+ base::WaitableEvent* done_event = deserializers_.back().done_event;
TRACE_EVENT_FLOW_BEGIN0(
TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
"SyncChannel::SyncContext::TryToUnblockListener", done_event);
@@ -526,9 +554,7 @@ SyncChannel::SyncChannel(
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
- sync_handle_registry_(mojo::SyncHandleRegistry::current()),
- dispatch_watcher_(FROM_HERE,
- mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
+ sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
// The current (listener) thread must be distinct from the IPC thread, or else
// sending synchronous messages will deadlock.
DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
@@ -596,36 +622,39 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
bool pump_messages) {
context->DispatchMessages();
- const MojoEvent* pump_messages_event = nullptr;
- if (pump_messages)
- pump_messages_event = g_pump_messages_event.Get().event();
+ base::WaitableEvent* pump_messages_event = nullptr;
+ if (pump_messages) {
+ if (!g_pump_messages_event.Get()) {
+ g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>(
+ base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::SIGNALED);
+ }
+ pump_messages_event = g_pump_messages_event.Get().get();
+ }
while (true) {
bool dispatch = false;
bool send_done = false;
bool should_pump_messages = false;
- bool error = false;
- bool registered = registry->RegisterHandle(
- context->GetSendDoneEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &send_done, &error));
+ bool registered = registry->RegisterEvent(
+ context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
DCHECK(registered);
+
if (pump_messages_event) {
- registered = registry->RegisterHandle(
- pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
+ registered = registry->RegisterEvent(
+ pump_messages_event,
+ base::Bind(&OnEventReady, &should_pump_messages));
DCHECK(registered);
}
const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
context->received_sync_msgs()->BlockDispatch(&dispatch);
- registry->WatchAllHandles(stop_flags, 3);
+ registry->Wait(stop_flags, 3);
context->received_sync_msgs()->UnblockDispatch();
- DCHECK(!error);
- registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
+ registry->UnregisterEvent(context->GetSendDoneEvent());
if (pump_messages_event)
- registry->UnregisterHandle(pump_messages_event->GetHandle());
+ registry->UnregisterEvent(pump_messages_event);
if (dispatch) {
// We're waiting for a reply, but we received a blocking synchronous call.
@@ -643,46 +672,20 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
}
void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
- mojo::SimpleWatcher send_done_watcher(
- FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
-
- ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
- DCHECK_NE(sync_msg_queue, nullptr);
-
- mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher();
- mojo::Handle old_handle(mojo::kInvalidHandleValue);
- mojo::SimpleWatcher::ReadyCallback old_callback;
-
- // Maintain a thread-local stack of watchers to ensure nested calls complete
- // in the correct sequence, i.e. the outermost call completes first, etc.
- if (old_watcher) {
- old_callback = old_watcher->ready_callback();
- old_handle = old_watcher->handle();
- old_watcher->Cancel();
- }
-
- sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
-
- {
- base::RunLoop nested_loop;
- send_done_watcher.Watch(
- context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
-
- base::MessageLoop::ScopedNestableTaskAllower allow(
- base::MessageLoop::current());
- nested_loop.Run();
- send_done_watcher.Cancel();
- }
-
- sync_msg_queue->set_top_send_done_watcher(old_watcher);
- if (old_watcher)
- old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
+ base::MessageLoop::ScopedNestableTaskAllower allow(
+ base::MessageLoop::current());
+ base::RunLoop nested_loop;
+ ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop);
+ nested_loop.Run();
}
-void SyncChannel::OnDispatchHandleReady(MojoResult result) {
- DCHECK_EQ(result, MOJO_RESULT_OK);
+void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
+ DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
sync_context()->GetDispatchEvent()->Reset();
+
+ StartWatching();
+
+ // NOTE: May delete |this|.
sync_context()->DispatchMessages();
}
@@ -691,10 +694,10 @@ void SyncChannel::StartWatching() {
// messages once the listener thread is unblocked and pumping its task queue.
// The ReceivedSyncMsgQueue also watches this event and may dispatch
// immediately if woken up by a message which it's allowed to dispatch.
- dispatch_watcher_.Watch(
- sync_context()->GetDispatchEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this)));
+ dispatch_watcher_.StartWatching(
+ sync_context()->GetDispatchEvent(),
+ base::Bind(&SyncChannel::OnDispatchEventSignaled,
+ base::Unretained(this)));
}
void SyncChannel::OnChannelInit() {
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698