Chromium Code Reviews| Index: mojo/edk/system/message_pipe_dispatcher.cc |
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
| index de81b2341e63e5e366ee4a6965dd112df42e721f..5284b216ac6004461d9d68d005b5051872fa776b 100644 |
| --- a/mojo/edk/system/message_pipe_dispatcher.cc |
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc |
| @@ -157,31 +157,39 @@ void MessagePipeDispatcher::InitOnIO() { |
| calling_init_ = false; |
| } |
| -void MessagePipeDispatcher::CloseOnIO() { |
| +void MessagePipeDispatcher::ReleaseAndCloseOnIO() { |
| base::AutoLock locker(lock()); |
| Release(); // To match CloseImplNoLock. |
| - if (transferable_) { |
| - if (channel_) { |
| - channel_->Shutdown(); |
| - channel_ = nullptr; |
| - } |
| - } else { |
| - if (non_transferable_state_ == CONNECT_CALLED || |
| - non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| - if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| - RequestNontransferableChannel(); |
| - |
| - // We can't cancel the pending request yet, since the other side of the |
| - // message pipe would want to get pending outgoing messages (if any) or |
| - // at least know that this end was closed. So keep this object alive until |
| - // then. |
| - non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| - AddRef(); |
| - } else if (non_transferable_state_ == CONNECTED) { |
| - internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| - non_transferable_state_ = CLOSED; |
| - channel_ = nullptr; |
| - } |
| + CloseOnIO(); |
| +} |
| + |
| +void MessagePipeDispatcher::CloseOnIO() { |
| + lock().AssertAcquired(); |
| + |
| + if (channel_) { |
| + DCHECK(!close_requested_); |
| + close_requested_ = true; |
| + AddRef(); |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| + if (!transferable_) |
| + message->set_route_id(pipe_id_); |
| + channel_->WriteMessage(std::move(message)); |
| + return; |
| + } |
| + |
| + if (!transferable_ && |
| + (non_transferable_state_ == CONNECT_CALLED || |
| + non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)) { |
| + if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| + RequestNontransferableChannel(); |
| + |
| + // We can't cancel the pending request yet, since the other side of the |
| + // message pipe would want to get pending outgoing messages (if any) or at |
| + // least know that this end was closed. So keep this object alive until |
| + // then. |
| + non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| + AddRef(); |
| } |
| } |
| @@ -190,6 +198,7 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| } |
| void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| base::AutoLock locker(lock()); |
| channel_ = channel; |
| while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| @@ -198,14 +207,15 @@ void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| } |
| if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| - // We kept this object alive until it's connected, we can release it now. |
| - internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| - non_transferable_state_ = CLOSED; |
| - channel_ = nullptr; |
| - base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| - } else { |
| non_transferable_state_ = CONNECTED; |
| + // We kept this object alive until it's connected, we can close it now. |
| + CloseOnIO(); |
| + // Balance the AddRef in CloseOnIO. |
| + base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| + return; |
| } |
| + |
| + non_transferable_state_ = CONNECTED; |
| } |
| #if defined(OS_WIN) |
| @@ -408,7 +418,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| serialized_(false), |
| calling_init_(false), |
| write_error_(false), |
| - transferable_(transferable) { |
| + transferable_(transferable), |
| + close_requested_(false) { |
| } |
| MessagePipeDispatcher::~MessagePipeDispatcher() { |
| @@ -450,7 +461,7 @@ void MessagePipeDispatcher::CloseImplNoLock() { |
| // if the task runs. |
| AddRef(); |
| internal::g_io_thread_task_runner->PostTask( |
| - FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
| + FROM_HERE, base::Bind(&MessagePipeDispatcher::ReleaseAndCloseOnIO, this)); |
| } |
| void MessagePipeDispatcher::SerializeInternal() { |
| @@ -868,25 +879,44 @@ void MessagePipeDispatcher::OnReadMessage( |
| } |
| if (started_transport_.Try()) { |
| - // we're not in the middle of being sent |
| + // We're not in the middle of being sent. |
| // Can get synchronously called back in Init if there was initial data. |
| scoped_ptr<base::AutoLock> locker; |
| - if (!calling_init_) { |
| + if (!calling_init_) |
| locker.reset(new base::AutoLock(lock())); |
| - } |
| - bool was_empty = message_queue_.IsEmpty(); |
| - message_queue_.AddMessage(std::move(message)); |
| - if (was_empty) |
| + if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| + if (transferable_) { |
| + channel_->Shutdown(); |
| + } else { |
| + internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| + non_transferable_state_ = CLOSED; |
| + } |
| + channel_ = nullptr; |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| + } else { |
| + bool was_empty = message_queue_.IsEmpty(); |
| + message_queue_.AddMessage(std::move(message)); |
| + if (was_empty) |
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| + } |
| started_transport_.Release(); |
| } else { |
| - // If RawChannel is calling OnRead, that means it has its read_lock_ |
| - // acquired. That means StartSerialize can't be accessing message queue as |
| - // it waits on ReleaseHandle first which acquires readlock_. |
| - message_queue_.AddMessage(std::move(message)); |
| + if (message_view.type() == MessageInTransit::Type::QUIT_MESSAGE) { |
| + // We got a request to shutdown the channel but this object is already |
| + // calling into channel to serialize it. Since all the other side cares |
| + // about is flushing pending messages, we bounce the quit back to it. |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::QUIT_MESSAGE, 0, nullptr)); |
| + channel_->WriteMessage(std::move(message)); |
| + } else { |
| + // If RawChannel is calling OnRead, that means it has its read_lock_ |
| + // acquired. That means StartSerialize can't be accessing message queue as |
| + // it waits on ReleaseHandle first which acquires read_lock_. |
| + message_queue_.AddMessage(std::move(message)); |
| + } |
| } |
| } |
| @@ -925,10 +955,6 @@ void MessagePipeDispatcher::OnError(Error error) { |
| if (started_transport_.Try()) { |
| base::AutoLock locker(lock()); |
| - // We can get two OnError callbacks before the post task below completes. |
| - // Although RawChannel still has a pointer to this object until Shutdown is |
| - // called, that is safe since this class always does a PostTask to the IO |
| - // thread to self destruct. |
| if (channel_ && error != ERROR_WRITE) { |
| if (transferable_) { |
| channel_->Shutdown(); |
| @@ -938,6 +964,10 @@ void MessagePipeDispatcher::OnError(Error error) { |
| non_transferable_state_ = CLOSED; |
| } |
| channel_ = nullptr; |
| + if (close_requested_) { |
| + // Balance AddRef in CloseOnIO. |
| + base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
|
sky
2015/12/30 16:33:32
Is there a reason you can't Release() rather than
jam
2015/12/30 17:03:40
This might be the last reference, so I was just si
|
| + } |
| } |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| started_transport_.Release(); |