| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index de81b2341e63e5e366ee4a6965dd112df42e721f..119124a424109864b85a280ad2b5c8035aa19f72 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -157,31 +157,46 @@ void MessagePipeDispatcher::InitOnIO() {
|
| calling_init_ = false;
|
| }
|
|
|
| -void MessagePipeDispatcher::CloseOnIO() {
|
| - base::AutoLock locker(lock());
|
| +void MessagePipeDispatcher::CloseOnIOAndRelease() {
|
| + {
|
| + base::AutoLock locker(lock());
|
| + CloseOnIO();
|
| + }
|
| Release(); // To match CloseImplNoLock.
|
| - if (transferable_) {
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| - } else {
|
| - if (non_transferable_state_ == CONNECT_CALLED ||
|
| - non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
|
| - if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| - RequestNontransferableChannel();
|
| -
|
| - // We can't cancel the pending request yet, since the other side of the
|
| - // message pipe would want to get pending outgoing messages (if any) or
|
| - // at least know that this end was closed. So keep this object alive until
|
| - // then.
|
| - non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
|
| - AddRef();
|
| - } else if (non_transferable_state_ == CONNECTED) {
|
| - internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| - non_transferable_state_ = CLOSED;
|
| - channel_ = nullptr;
|
| - }
|
| +}
|
| +
|
| +void MessagePipeDispatcher::CloseOnIO() {
|
| + lock().AssertAcquired();
|
| +
|
| + if (channel_) {
|
| + // If we closed the channel now then in-flight message pipes wouldn't get
|
| + // closed, and their other side wouldn't get a connection error notification
|
| + // which could lead to hangs or leaks. So we ask the other side of this
|
| + // message pipe to close, which ensures that we have dispatched all
|
| + // in-flight message pipes.
|
| + DCHECK(!close_requested_);
|
| + close_requested_ = true;
|
| + AddRef();
|
| + scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| + MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
|
| + if (!transferable_)
|
| + message->set_route_id(pipe_id_);
|
| + channel_->WriteMessage(std::move(message));
|
| + return;
|
| + }
|
| +
|
| + if (!transferable_ &&
|
| + (non_transferable_state_ == CONNECT_CALLED ||
|
| + non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) {
|
| + if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
|
| + RequestNontransferableChannel();
|
| +
|
| + // We can't cancel the pending request yet, since the other side of the
|
| + // message pipe would want to get pending outgoing messages (if any) or at
|
| + // least know that this end was closed. So keep this object alive until
|
| + // then.
|
| + non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
|
| + AddRef();
|
| }
|
| }
|
|
|
| @@ -190,6 +205,7 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
|
| }
|
|
|
| void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
|
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| base::AutoLock locker(lock());
|
| channel_ = channel;
|
| while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
|
| @@ -198,14 +214,15 @@ void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
|
| }
|
|
|
| if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
|
| - // We kept this object alive until it's connected, we can release it now.
|
| - internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| - non_transferable_state_ = CLOSED;
|
| - channel_ = nullptr;
|
| - base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
|
| - } else {
|
| non_transferable_state_ = CONNECTED;
|
| + // We kept this object alive until it's connected, we can close it now.
|
| + CloseOnIO();
|
| + // Balance the AddRef in CloseOnIO.
|
| + Release();
|
| + return;
|
| }
|
| +
|
| + non_transferable_state_ = CONNECTED;
|
| }
|
|
|
| #if defined(OS_WIN)
|
| @@ -408,7 +425,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
|
| serialized_(false),
|
| calling_init_(false),
|
| write_error_(false),
|
| - transferable_(transferable) {
|
| + transferable_(transferable),
|
| + close_requested_(false) {
|
| }
|
|
|
| MessagePipeDispatcher::~MessagePipeDispatcher() {
|
| @@ -450,7 +468,7 @@ void MessagePipeDispatcher::CloseImplNoLock() {
|
| // if the task runs.
|
| AddRef();
|
| internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this));
|
| + FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIOAndRelease, this));
|
| }
|
|
|
| void MessagePipeDispatcher::SerializeInternal() {
|
| @@ -867,27 +885,56 @@ void MessagePipeDispatcher::OnReadMessage(
|
| std::move(platform_handles)));
|
| }
|
|
|
| + bool call_release = false;
|
| if (started_transport_.Try()) {
|
| - // we're not in the middle of being sent
|
| + // We're not in the middle of being sent.
|
|
|
| // Can get synchronously called back in Init if there was initial data.
|
| scoped_ptr<base::AutoLock> locker;
|
| - if (!calling_init_) {
|
| + if (!calling_init_)
|
| locker.reset(new base::AutoLock(lock()));
|
| - }
|
|
|
| - bool was_empty = message_queue_.IsEmpty();
|
| - message_queue_.AddMessage(std::move(message));
|
| - if (was_empty)
|
| + if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
|
| + if (transferable_) {
|
| + channel_->Shutdown();
|
| + } else {
|
| + internal::g_broker->CloseMessagePipe(pipe_id_, this);
|
| + non_transferable_state_ = CLOSED;
|
| + }
|
| + channel_ = nullptr;
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + if (close_requested_) {
|
| + // We requested the other side to close the connection while they also
|
| + // did the same. We must balance out the AddRef in CloseOnIO to ensure
|
| + // this object isn't leaked.
|
| + call_release = true;
|
| + }
|
| + } else {
|
| + bool was_empty = message_queue_.IsEmpty();
|
| + message_queue_.AddMessage(std::move(message));
|
| + if (was_empty)
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + }
|
|
|
| started_transport_.Release();
|
| } else {
|
| - // If RawChannel is calling OnRead, that means it has its read_lock_
|
| - // acquired. That means StartSerialize can't be accessing message queue as
|
| - // it waits on ReleaseHandle first which acquires readlock_.
|
| - message_queue_.AddMessage(std::move(message));
|
| + if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) {
|
| + // We got a request to shutdown the channel but this object is already
|
| + // calling into channel to serialize it. Since all the other side cares
|
| + // about is flushing pending messages, we bounce the quit back to it.
|
| + scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| + MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr));
|
| + channel_->WriteMessage(std::move(message));
|
| + } else {
|
| + // If RawChannel is calling OnRead, that means it has its read_lock_
|
| + // acquired. That means StartSerialize can't be accessing message queue as
|
| + // it waits on ReleaseHandle first which acquires read_lock_.
|
| + message_queue_.AddMessage(std::move(message));
|
| + }
|
| }
|
| +
|
| + if (call_release)
|
| + Release();
|
| }
|
|
|
| void MessagePipeDispatcher::OnError(Error error) {
|
| @@ -923,12 +970,9 @@ void MessagePipeDispatcher::OnError(Error error) {
|
| break;
|
| }
|
|
|
| + bool call_release = false;
|
| if (started_transport_.Try()) {
|
| base::AutoLock locker(lock());
|
| - // We can get two OnError callbacks before the post task below completes.
|
| - // Although RawChannel still has a pointer to this object until Shutdown is
|
| - // called, that is safe since this class always does a PostTask to the IO
|
| - // thread to self destruct.
|
| if (channel_ && error != ERROR_WRITE) {
|
| if (transferable_) {
|
| channel_->Shutdown();
|
| @@ -938,12 +982,19 @@ void MessagePipeDispatcher::OnError(Error error) {
|
| non_transferable_state_ = CLOSED;
|
| }
|
| channel_ = nullptr;
|
| + if (close_requested_) {
|
| + // Balance AddRef in CloseOnIO.
|
| + call_release = true;
|
| + }
|
| }
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| started_transport_.Release();
|
| } else {
|
| // We must be waiting to call ReleaseHandle. It will call Shutdown.
|
| }
|
| +
|
| + if (call_release)
|
| + Release();
|
| }
|
|
|
| MojoResult MessagePipeDispatcher::AttachTransportsNoLock(
|
|
|