Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index de81b2341e63e5e366ee4a6965dd112df42e721f..119124a424109864b85a280ad2b5c8035aa19f72 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -157,31 +157,46 @@ void MessagePipeDispatcher::InitOnIO() { |
calling_init_ = false; |
} |
-void MessagePipeDispatcher::CloseOnIO() { |
- base::AutoLock locker(lock()); |
+void MessagePipeDispatcher::CloseOnIOAndRelease() { |
+ { |
+ base::AutoLock locker(lock()); |
+ CloseOnIO(); |
+ } |
Release(); // To match CloseImplNoLock. |
- if (transferable_) { |
- if (channel_) { |
- channel_->Shutdown(); |
- channel_ = nullptr; |
- } |
- } else { |
- if (non_transferable_state_ == CONNECT_CALLED || |
- non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
- if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
- RequestNontransferableChannel(); |
- |
- // We can't cancel the pending request yet, since the other side of the |
- // message pipe would want to get pending outgoing messages (if any) or |
- // at least know that this end was closed. So keep this object alive until |
- // then. |
- non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
- AddRef(); |
- } else if (non_transferable_state_ == CONNECTED) { |
- internal::g_broker->CloseMessagePipe(pipe_id_, this); |
- non_transferable_state_ = CLOSED; |
- channel_ = nullptr; |
- } |
+} |
+ |
+void MessagePipeDispatcher::CloseOnIO() { |
+ lock().AssertAcquired(); |
+ |
+ if (channel_) { |
+ // If we closed the channel now then in-flight message pipes wouldn't get |
+ // closed, and their other side wouldn't get a connection error notification |
+ // which could lead to hangs or leaks. So we ask the other side of this |
+ // message pipe to close, which ensures that we have dispatched all |
+ // in-flight message pipes. |
+ DCHECK(!close_requested_); |
+ close_requested_ = true; |
+ AddRef(); |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
+ if (!transferable_) |
+ message->set_route_id(pipe_id_); |
+ channel_->WriteMessage(std::move(message)); |
+ return; |
+ } |
+ |
+ if (!transferable_ && |
+ (non_transferable_state_ == CONNECT_CALLED || |
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { |
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
+ RequestNontransferableChannel(); |
+ |
+ // We can't cancel the pending request yet, since the other side of the |
+ // message pipe would want to get pending outgoing messages (if any) or at |
+ // least know that this end was closed. So keep this object alive until |
+ // then. |
+ non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
+ AddRef(); |
} |
} |
@@ -190,6 +205,7 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { |
} |
void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
base::AutoLock locker(lock()); |
channel_ = channel; |
while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
@@ -198,14 +214,15 @@ void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
} |
if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
- // We kept this object alive until it's connected, we can release it now. |
- internal::g_broker->CloseMessagePipe(pipe_id_, this); |
- non_transferable_state_ = CLOSED; |
- channel_ = nullptr; |
- base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
- } else { |
non_transferable_state_ = CONNECTED; |
+ // We kept this object alive until it's connected, we can close it now. |
+ CloseOnIO(); |
+ // Balance the AddRef in CloseOnIO. |
+ Release(); |
+ return; |
} |
+ |
+ non_transferable_state_ = CONNECTED; |
} |
#if defined(OS_WIN) |
@@ -408,7 +425,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
serialized_(false), |
calling_init_(false), |
write_error_(false), |
- transferable_(transferable) { |
+ transferable_(transferable), |
+ close_requested_(false) { |
} |
MessagePipeDispatcher::~MessagePipeDispatcher() { |
@@ -450,7 +468,7 @@ void MessagePipeDispatcher::CloseImplNoLock() { |
// if the task runs. |
AddRef(); |
internal::g_io_thread_task_runner->PostTask( |
- FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
+ FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this)); |
} |
void MessagePipeDispatcher::SerializeInternal() { |
@@ -867,27 +885,56 @@ void MessagePipeDispatcher::OnReadMessage( |
std::move(platform_handles))); |
} |
+ bool call_release = false; |
if (started_transport_.Try()) { |
- // we're not in the middle of being sent |
+ // We're not in the middle of being sent. |
// Can get synchronously called back in Init if there was initial data. |
scoped_ptr<base::AutoLock> locker; |
- if (!calling_init_) { |
+ if (!calling_init_) |
locker.reset(new base::AutoLock(lock())); |
- } |
- bool was_empty = message_queue_.IsEmpty(); |
- message_queue_.AddMessage(std::move(message)); |
- if (was_empty) |
+ if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
+ if (transferable_) { |
+ channel_->Shutdown(); |
+ } else { |
+ internal::g_broker->CloseMessagePipe(pipe_id_, this); |
+ non_transferable_state_ = CLOSED; |
+ } |
+ channel_ = nullptr; |
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ if (close_requested_) { |
+ // We requested the other side to close the connection while they also |
+ // did the same. We must balance out the AddRef in CloseOnIO to ensure |
+ // this object isn't leaked. |
+ call_release = true; |
+ } |
+ } else { |
+ bool was_empty = message_queue_.IsEmpty(); |
+ message_queue_.AddMessage(std::move(message)); |
+ if (was_empty) |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
+ } |
started_transport_.Release(); |
} else { |
- // If RawChannel is calling OnRead, that means it has its read_lock_ |
- // acquired. That means StartSerialize can't be accessing message queue as |
- // it waits on ReleaseHandle first which acquires readlock_. |
- message_queue_.AddMessage(std::move(message)); |
+ if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
+ // We got a request to shutdown the channel but this object is already |
+ // calling into channel to serialize it. Since all the other side cares |
+ // about is flushing pending messages, we bounce the quit back to it. |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
+ channel_->WriteMessage(std::move(message)); |
+ } else { |
+ // If RawChannel is calling OnRead, that means it has its read_lock_ |
+ // acquired. That means StartSerialize can't be accessing message queue as |
+ // it waits on ReleaseHandle first which acquires read_lock_. |
+ message_queue_.AddMessage(std::move(message)); |
+ } |
} |
+ |
+ if (call_release) |
+ Release(); |
} |
void MessagePipeDispatcher::OnError(Error error) { |
@@ -923,12 +970,9 @@ void MessagePipeDispatcher::OnError(Error error) { |
break; |
} |
+ bool call_release = false; |
if (started_transport_.Try()) { |
base::AutoLock locker(lock()); |
- // We can get two OnError callbacks before the post task below completes. |
- // Although RawChannel still has a pointer to this object until Shutdown is |
- // called, that is safe since this class always does a PostTask to the IO |
- // thread to self destruct. |
if (channel_ && error != ERROR_WRITE) { |
if (transferable_) { |
channel_->Shutdown(); |
@@ -938,12 +982,19 @@ void MessagePipeDispatcher::OnError(Error error) { |
non_transferable_state_ = CLOSED; |
} |
channel_ = nullptr; |
+ if (close_requested_) { |
+ // Balance AddRef in CloseOnIO. |
+ call_release = true; |
+ } |
} |
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
started_transport_.Release(); |
} else { |
// We must be waiting to call ReleaseHandle. It will call Shutdown. |
} |
+ |
+ if (call_release) |
+ Release(); |
} |
MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |