Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(219)

Unified Diff: ipc/ipc_sync_channel.cc

Issue 2097103002: Revert Mojo-based SyncChannel waiting again (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_sync_channel.cc
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index e8eeec6deca4c77dbdd6968ca347456e8c1caa53..8e3f4115b5a5c8426c13edc583389b57faeb822b 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -13,10 +13,9 @@
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
-#include "base/macros.h"
#include "base/memory/ptr_util.h"
-#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
+#include "base/synchronization/waitable_event_watcher.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/trace_event/trace_event.h"
@@ -24,49 +23,12 @@
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
#include "ipc/ipc_sync_message.h"
-#include "ipc/mojo_event.h"
-#include "mojo/public/cpp/bindings/sync_handle_registry.h"
+using base::TimeDelta;
+using base::TimeTicks;
using base::WaitableEvent;
namespace IPC {
-
-namespace {
-
-// A generic callback used when watching handles synchronously. Sets |*signal|
-// to true. Also sets |*error| to true in case of an error.
-void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
- *signal = true;
- *error = result != MOJO_RESULT_OK;
-}
-
-// A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but
-// is only used in cases where failure should be impossible) and runs
-// |callback|.
-void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
- DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
- if (result == MOJO_RESULT_OK)
- callback.Run();
-}
-
-class PumpMessagesEvent {
- public:
- PumpMessagesEvent() { event_.Signal(); }
- ~PumpMessagesEvent() {}
-
- const MojoEvent* event() const { return &event_; }
-
- private:
- MojoEvent event_;
-
- DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
-};
-
-base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
- LAZY_INSTANCE_INITIALIZER;
-
-} // namespace
-
// When we're blocked in a Send(), we need to process incoming synchronous
// messages right away because it could be blocking our reply (either
// directly from the same object we're calling, or indirectly through one or
@@ -193,7 +155,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- MojoEvent* dispatch_event() { return &dispatch_event_; }
+ WaitableEvent* dispatch_event() { return &dispatch_event_; }
base::SingleThreadTaskRunner* listener_task_runner() {
return listener_task_runner_.get();
}
@@ -215,11 +177,11 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- mojo::Watcher* top_send_done_watcher() {
+ base::WaitableEventWatcher* top_send_done_watcher() {
return top_send_done_watcher_;
}
- void set_top_send_done_watcher(mojo::Watcher* watcher) {
+ void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
top_send_done_watcher_ = watcher;
}
@@ -230,6 +192,8 @@ class SyncChannel::ReceivedSyncMsgQueue :
// as manual reset.
ReceivedSyncMsgQueue()
: message_queue_version_(0),
+ dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
task_pending_(false),
listener_count_(0),
@@ -250,19 +214,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
std::vector<QueuedMessage> received_replies_;
- // Signaled when we get a synchronous message that we must respond to, as the
+ // Set when we got a synchronous message that we must respond to as the
// sender needs its reply before it can reply to our original synchronous
// message.
- MojoEvent dispatch_event_;
+ WaitableEvent dispatch_event_;
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
base::Lock message_lock_;
bool task_pending_;
int listener_count_;
- // The current send done handle watcher for this thread. Used to maintain
- // a thread-local stack of send done watchers to ensure that nested sync
+ // The current send done event watcher for this thread. Used to maintain
+ // a local global stack of send done watchers to ensure that nested sync
// message loops complete correctly.
- mojo::Watcher* top_send_done_watcher_;
+ base::WaitableEventWatcher* top_send_done_watcher_;
};
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
@@ -285,9 +249,9 @@ SyncChannel::SyncContext::~SyncContext() {
}
// Adds information about an outgoing sync message to the context so that
-// we know how to deserialize the reply. Returns |true| if the message was added
-// to the context or |false| if it was rejected (e.g. due to shutdown.)
-bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
+// we know how to deserialize the reply. Returns a handle that's set when
+// the reply has arrived.
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
// Create the tracking information for this message. This object is stored
// by value since all members are pointers that are cheap to copy. These
// pointers are cleaned up in the Pop() function.
@@ -296,14 +260,12 @@ bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
// OnObjectSignalled, another Send can happen which would stop the watcher
// from being called. The event would get watched later, when the nested
// Send completes, so the event will need to remain set.
- base::AutoLock auto_lock(deserializers_lock_);
- if (reject_new_deserializers_)
- return false;
PendingSyncMsg pending(
SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
- new MojoEvent);
+ new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED));
+ base::AutoLock auto_lock(deserializers_lock_);
deserializers_.push_back(pending);
- return true;
}
bool SyncChannel::SyncContext::Pop() {
@@ -313,7 +275,7 @@ bool SyncChannel::SyncContext::Pop() {
PendingSyncMsg msg = deserializers_.back();
delete msg.deserializer;
delete msg.done_event;
- msg.done_event = nullptr;
+ msg.done_event = NULL;
deserializers_.pop_back();
result = msg.send_result;
}
@@ -330,12 +292,12 @@ bool SyncChannel::SyncContext::Pop() {
return result;
}
-MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
+WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
base::AutoLock auto_lock(deserializers_lock_);
return deserializers_.back().done_event;
}
-MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
+WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
return received_sync_msgs_->dispatch_event();
}
@@ -359,7 +321,7 @@ bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
DVLOG(1) << "Received error reply";
}
- MojoEvent* done_event = deserializers_.back().done_event;
+ base::WaitableEvent* done_event = deserializers_.back().done_event;
TRACE_EVENT_FLOW_BEGIN0(
TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
"SyncChannel::SyncContext::TryToUnblockListener", done_event);
@@ -405,7 +367,7 @@ void SyncChannel::SyncContext::OnChannelError() {
void SyncChannel::SyncContext::OnChannelOpened() {
shutdown_watcher_.StartWatching(
shutdown_event_,
- base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
+ base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
base::Unretained(this)));
Context::OnChannelOpened();
}
@@ -418,7 +380,6 @@ void SyncChannel::SyncContext::OnChannelClosed() {
void SyncChannel::SyncContext::CancelPendingSends() {
base::AutoLock auto_lock(deserializers_lock_);
- reject_new_deserializers_ = true;
PendingSyncMessageQueue::iterator iter;
DVLOG(1) << "Canceling pending sends";
for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
@@ -429,12 +390,21 @@ void SyncChannel::SyncContext::CancelPendingSends() {
}
}
-void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
- DCHECK_EQ(event, shutdown_event_);
+void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
+ if (event == shutdown_event_) {
+ // Process shut down before we can get a reply to a synchronous message.
+ // Cancel pending Send calls, which will end up setting the send done event.
+ CancelPendingSends();
+ } else {
+ // We got the reply, timed out or the process shutdown.
+ DCHECK_EQ(GetSendDoneEvent(), event);
+ base::MessageLoop::current()->QuitNow();
+ }
+}
- // Process shut down before we can get a reply to a synchronous message.
- // Cancel pending Send calls, which will end up setting the send done event.
- CancelPendingSends();
+base::WaitableEventWatcher::EventCallback
+ SyncChannel::SyncContext::MakeWaitableEventCallback() {
+ return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
}
// static
@@ -477,8 +447,7 @@ SyncChannel::SyncChannel(
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
WaitableEvent* shutdown_event)
- : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
- sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
+ : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
// The current (listener) thread must be distinct from the IPC thread, or else
// sending synchronous messages will deadlock.
DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
@@ -517,23 +486,23 @@ bool SyncChannel::Send(Message* message) {
return true;
}
- SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
- bool pump_messages = sync_msg->ShouldPumpMessages();
-
// *this* might get deleted in WaitForReply.
scoped_refptr<SyncContext> context(sync_context());
- if (!context->Push(sync_msg)) {
- DVLOG(1) << "Channel is shutting down. Dropping sync message.";
+ if (context->shutdown_event()->IsSignaled()) {
+ DVLOG(1) << "shutdown event is signaled";
delete message;
return false;
}
+ SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
+ context->Push(sync_msg);
+ WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
+
ChannelProxy::Send(message);
// Wait for reply, or for any other incoming synchronous messages.
// *this* might get deleted, so only call static functions at this point.
- scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
- WaitForReply(registry.get(), context.get(), pump_messages);
+ WaitForReply(context.get(), pump_messages_event);
TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
"SyncChannel::Send", context->GetSendDoneEvent());
@@ -541,43 +510,19 @@ bool SyncChannel::Send(Message* message) {
return context->Pop();
}
-void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
- SyncContext* context,
- bool pump_messages) {
+void SyncChannel::WaitForReply(
+ SyncContext* context, WaitableEvent* pump_messages_event) {
context->DispatchMessages();
-
- const MojoEvent* pump_messages_event = nullptr;
- if (pump_messages)
- pump_messages_event = g_pump_messages_event.Get().event();
-
while (true) {
- bool dispatch = false;
- bool send_done = false;
- bool should_pump_messages = false;
- bool error = false;
- registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &dispatch, &error));
- registry->RegisterHandle(
- context->GetSendDoneEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &send_done, &error));
- if (pump_messages_event) {
- registry->RegisterHandle(
- pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
- }
-
- const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
- registry->WatchAllHandles(stop_flags, 3);
- DCHECK(!error);
-
- registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle());
- registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
- if (pump_messages_event)
- registry->UnregisterHandle(pump_messages_event->GetHandle());
-
- if (dispatch) {
+ WaitableEvent* objects[] = {
+ context->GetDispatchEvent(),
+ context->GetSendDoneEvent(),
+ pump_messages_event
+ };
+
+ unsigned count = pump_messages_event ? 3: 2;
+ size_t result = WaitableEvent::WaitMany(objects, count);
+ if (result == 0 /* dispatch event */) {
// We're waiting for a reply, but we received a blocking synchronous
// call. We must process it or otherwise a deadlock might occur.
context->GetDispatchEvent()->Reset();
@@ -585,7 +530,7 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
continue;
}
- if (should_pump_messages)
+ if (result == 2 /* pump_messages_event */)
WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
break;
@@ -593,59 +538,64 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
}
void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
- mojo::Watcher send_done_watcher;
+ base::WaitableEventWatcher send_done_watcher;
ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
- DCHECK_NE(sync_msg_queue, nullptr);
-
- mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher();
- mojo::Handle old_handle(mojo::kInvalidHandleValue);
- mojo::Watcher::ReadyCallback old_callback;
-
- // Maintain a thread-local stack of watchers to ensure nested calls complete
- // in the correct sequence, i.e. the outermost call completes first, etc.
- if (old_watcher) {
- old_callback = old_watcher->ready_callback();
- old_handle = old_watcher->handle();
- old_watcher->Cancel();
+ DCHECK(sync_msg_queue != NULL);
+
+ base::WaitableEventWatcher* old_send_done_event_watcher =
+ sync_msg_queue->top_send_done_watcher();
+
+ base::WaitableEventWatcher::EventCallback old_callback;
+ base::WaitableEvent* old_event = NULL;
+
+ // Maintain a local global stack of send done delegates to ensure that
+ // nested sync calls complete in the correct sequence, i.e. the
+ // outermost call completes first, etc.
+ if (old_send_done_event_watcher) {
+ old_callback = old_send_done_event_watcher->callback();
+ old_event = old_send_done_event_watcher->GetWatchedEvent();
+ old_send_done_event_watcher->StopWatching();
}
sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
- {
- base::RunLoop nested_loop;
- send_done_watcher.Start(
- context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
+ send_done_watcher.StartWatching(context->GetSendDoneEvent(),
+ context->MakeWaitableEventCallback());
+ {
base::MessageLoop::ScopedNestableTaskAllower allow(
base::MessageLoop::current());
- nested_loop.Run();
- send_done_watcher.Cancel();
+ base::MessageLoop::current()->Run();
}
- sync_msg_queue->set_top_send_done_watcher(old_watcher);
- if (old_watcher)
- old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
+ sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
+ if (old_send_done_event_watcher && old_event) {
+ old_send_done_event_watcher->StartWatching(old_event, old_callback);
+ }
}
-void SyncChannel::OnDispatchHandleReady(MojoResult result) {
- DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
- if (result == MOJO_RESULT_OK) {
- sync_context()->GetDispatchEvent()->Reset();
- sync_context()->DispatchMessages();
- }
+void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
+ DCHECK(event == sync_context()->GetDispatchEvent());
+ // The call to DispatchMessages might delete this object, so reregister
+ // the object watcher first.
+ event->Reset();
+ dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
+ sync_context()->DispatchMessages();
}
void SyncChannel::StartWatching() {
// Ideally we only want to watch this object when running a nested message
// loop. However, we don't know when it exits if there's another nested
// message loop running under it or not, so we wouldn't know whether to
- // stop or keep watching. So we always watch it.
- dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(),
- MOJO_HANDLE_SIGNAL_READABLE,
- base::Bind(&SyncChannel::OnDispatchHandleReady,
- base::Unretained(this)));
+ // stop or keep watching. So we always watch it, and create the event as
+ // manual reset since the object watcher might otherwise reset the event
+ // when we're doing a WaitMany.
+ dispatch_watcher_callback_ =
+ base::Bind(&SyncChannel::OnWaitableEventSignaled,
+ base::Unretained(this));
+ dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
+ dispatch_watcher_callback_);
}
void SyncChannel::OnChannelInit() {
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698