Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(185)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1554623005: Ensure that in-flight message pipes are always closed and the other end is notified. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix dcheck of lock being acquired at destruction Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index de81b2341e63e5e366ee4a6965dd112df42e721f..119124a424109864b85a280ad2b5c8035aa19f72 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -157,31 +157,46 @@ void MessagePipeDispatcher::InitOnIO() {
calling_init_ = false;
}
-void MessagePipeDispatcher::CloseOnIO() {
- base::AutoLock locker(lock());
+void MessagePipeDispatcher::CloseOnIOAndRelease() {
+ {
+ base::AutoLock locker(lock());
+ CloseOnIO();
+ }
Release(); // To match CloseImplNoLock.
- if (transferable_) {
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
- }
- } else {
- if (non_transferable_state_ == CONNECT_CALLED ||
- non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
- if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
- RequestNontransferableChannel();
-
- // We can't cancel the pending request yet, since the other side of the
- // message pipe would want to get pending outgoing messages (if any) or
- // at least know that this end was closed. So keep this object alive until
- // then.
- non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
- AddRef();
- } else if (non_transferable_state_ == CONNECTED) {
- internal::g_broker->CloseMessagePipe(pipe_id_, this);
- non_transferable_state_ = CLOSED;
- channel_ = nullptr;
- }
+}
+
+void MessagePipeDispatcher::CloseOnIO() {
+ lock().AssertAcquired();
+
+ if (channel_) {
+ // If we closed the channel now then in-flight message pipes wouldn't get
+ // closed, and their other side wouldn't get a connection error notification
+ // which could lead to hangs or leaks. So we ask the other side of this
+ // message pipe to close, which ensures that we have dispatched all
+ // in-flight message pipes.
+ DCHECK(!close_requested_);
+ close_requested_ = true;
+ AddRef();
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
+ if (!transferable_)
+ message->set_route_id(pipe_id_);
+ channel_->WriteMessage(std::move(message));
+ return;
+ }
+
+ if (!transferable_ &&
+ (non_transferable_state_ == CONNECT_CALLED ||
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ RequestNontransferableChannel();
+
+ // We can't cancel the pending request yet, since the other side of the
+ // message pipe would want to get pending outgoing messages (if any) or at
+ // least know that this end was closed. So keep this object alive until
+ // then.
+ non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
+ AddRef();
}
}
@@ -190,6 +205,7 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
}
void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
base::AutoLock locker(lock());
channel_ = channel;
while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
@@ -198,14 +214,15 @@ void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
}
if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
- // We kept this object alive until it's connected, we can release it now.
- internal::g_broker->CloseMessagePipe(pipe_id_, this);
- non_transferable_state_ = CLOSED;
- channel_ = nullptr;
- base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
- } else {
non_transferable_state_ = CONNECTED;
+ // We kept this object alive until it's connected, we can close it now.
+ CloseOnIO();
+ // Balance the AddRef in CloseOnIO.
+ Release();
+ return;
}
+
+ non_transferable_state_ = CONNECTED;
}
#if defined(OS_WIN)
@@ -408,7 +425,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
serialized_(false),
calling_init_(false),
write_error_(false),
- transferable_(transferable) {
+ transferable_(transferable),
+ close_requested_(false) {
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
@@ -450,7 +468,7 @@ void MessagePipeDispatcher::CloseImplNoLock() {
// if the task runs.
AddRef();
internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this));
}
void MessagePipeDispatcher::SerializeInternal() {
@@ -867,27 +885,56 @@ void MessagePipeDispatcher::OnReadMessage(
std::move(platform_handles)));
}
+ bool call_release = false;
if (started_transport_.Try()) {
- // we're not in the middle of being sent
+ // We're not in the middle of being sent.
// Can get synchronously called back in Init if there was initial data.
scoped_ptr<base::AutoLock> locker;
- if (!calling_init_) {
+ if (!calling_init_)
locker.reset(new base::AutoLock(lock()));
- }
- bool was_empty = message_queue_.IsEmpty();
- message_queue_.AddMessage(std::move(message));
- if (was_empty)
+ if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
+ if (transferable_) {
+ channel_->Shutdown();
+ } else {
+ internal::g_broker->CloseMessagePipe(pipe_id_, this);
+ non_transferable_state_ = CLOSED;
+ }
+ channel_ = nullptr;
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ if (close_requested_) {
+ // We requested the other side to close the connection while they also
+ // did the same. We must balance out the AddRef in CloseOnIO to ensure
+ // this object isn't leaked.
+ call_release = true;
+ }
+ } else {
+ bool was_empty = message_queue_.IsEmpty();
+ message_queue_.AddMessage(std::move(message));
+ if (was_empty)
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ }
started_transport_.Release();
} else {
- // If RawChannel is calling OnRead, that means it has its read_lock_
- // acquired. That means StartSerialize can't be accessing message queue as
- // it waits on ReleaseHandle first which acquires readlock_.
- message_queue_.AddMessage(std::move(message));
+ if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
+ // We got a request to shutdown the channel but this object is already
+ // calling into channel to serialize it. Since all the other side cares
+ // about is flushing pending messages, we bounce the quit back to it.
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
+ channel_->WriteMessage(std::move(message));
+ } else {
+ // If RawChannel is calling OnRead, that means it has its read_lock_
+ // acquired. That means StartSerialize can't be accessing message queue as
+ // it waits on ReleaseHandle first which acquires read_lock_.
+ message_queue_.AddMessage(std::move(message));
+ }
}
+
+ if (call_release)
+ Release();
}
void MessagePipeDispatcher::OnError(Error error) {
@@ -923,12 +970,9 @@ void MessagePipeDispatcher::OnError(Error error) {
break;
}
+ bool call_release = false;
if (started_transport_.Try()) {
base::AutoLock locker(lock());
- // We can get two OnError callbacks before the post task below completes.
- // Although RawChannel still has a pointer to this object until Shutdown is
- // called, that is safe since this class always does a PostTask to the IO
- // thread to self destruct.
if (channel_ && error != ERROR_WRITE) {
if (transferable_) {
channel_->Shutdown();
@@ -938,12 +982,19 @@ void MessagePipeDispatcher::OnError(Error error) {
non_transferable_state_ = CLOSED;
}
channel_ = nullptr;
+ if (close_requested_) {
+ // Balance AddRef in CloseOnIO.
+ call_release = true;
+ }
}
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
started_transport_.Release();
} else {
// We must be waiting to call ReleaseHandle. It will call Shutdown.
}
+
+ if (call_release)
+ Release();
}
MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.h ('k') | mojo/edk/system/raw_channel.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698