Chromium Code Reviews| Index: ipc/mojo/ipc_channel_mojo.cc |
| diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc |
| index 3b646c7c9f0f3a9eb128c368429e8f88faa7a59c..f0fb57ff064a26fd789ec6775ba15f16cd457f76 100644 |
| --- a/ipc/mojo/ipc_channel_mojo.cc |
| +++ b/ipc/mojo/ipc_channel_mojo.cc |
| @@ -175,7 +175,7 @@ void ChannelMojo::ChannelInfoDeleter::operator()( |
| // static |
| bool ChannelMojo::ShouldBeUsed() { |
| // TODO(morrita): Turn this on for a set of platforms. |
| - return false; |
|
Hajime Morrita
2014/12/05 02:54:09
I do this just for exercising tests. No intention
|
| + return true; |
| } |
| // static |
| @@ -240,8 +240,9 @@ ChannelMojo::~ChannelMojo() { |
| } |
| void ChannelMojo::InitDelegate(ChannelMojo::Delegate* delegate) { |
| + delegate->OnChannelCreated(weak_factory_.GetWeakPtr()); |
| + base::AutoLock l(lock_); |
| delegate_ = delegate->ToWeakPtr(); |
| - delegate_->OnChannelCreated(weak_factory_.GetWeakPtr()); |
| } |
| mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe( |
| @@ -260,7 +261,12 @@ bool ChannelMojo::Connect() { |
| } |
| void ChannelMojo::Close() { |
| - message_reader_.reset(); |
| + // |message_reader_| has to be cleared inside the lock, |
| + // but the instance has to be deleted outside. |
| + scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_dead; |
| + |
| + base::AutoLock l(lock_); |
| + to_be_dead = message_reader_.Pass(); |
| channel_info_.reset(); |
| } |
| @@ -270,21 +276,38 @@ void ChannelMojo::OnBootstrapError() { |
| void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe, |
| int32_t peer_pid) { |
| - message_reader_ = |
| + scoped_ptr<internal::MessagePipeReader> reader = |
| make_scoped_ptr(new internal::MessagePipeReader(pipe.Pass(), this)); |
| - |
| - for (size_t i = 0; i < pending_messages_.size(); ++i) { |
| - bool sent = message_reader_->Send(make_scoped_ptr(pending_messages_[i])); |
| - pending_messages_[i] = NULL; |
| - if (!sent) { |
| - pending_messages_.clear(); |
| - listener_->OnChannelError(); |
| - return; |
| + MojoResult send_result = MOJO_RESULT_OK; |
| + |
| + { |
| + base::AutoLock l(lock_); |
| + ScopedVector<Message> pending; |
| + |
| + pending_messages_.swap(pending); |
| + for (size_t i = 0; i < pending.size(); ++i) { |
| + send_result = reader->Send(make_scoped_ptr(pending[i])); |
| + pending[i] = nullptr; |
| + if (send_result != MOJO_RESULT_OK) |
| + break; |
| } |
| + |
| + // We set |message_reader_| here and won't get any |pending_messages_| |
| + // hereafter. Although we might have some if there is an error, we don't |
| + // care. They cannot be sent anyway. |
| + if (send_result == MOJO_RESULT_OK) |
| + message_reader_ = reader.Pass(); |
| } |
| - pending_messages_.clear(); |
| + // This should be done outside the |lock_| as delegates are called through |
| + // |CloseWithError()|. |
| + if (reader.get()) { |
| + DCHECK(!message_reader_.get()); |
| + reader->CloseWithError(send_result); |
| + return; |
| + } |
| + DCHECK(message_reader_.get()); |
| set_peer_pid(peer_pid); |
| listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID())); |
| } |
| @@ -297,14 +320,48 @@ void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { |
| listener_->OnChannelError(); |
| } |
| +base::TaskRunner* ChannelMojo::GetIOTaskRunner() { |
| + base::AutoLock l(lock_); |
| + if (!delegate_) |
| + return nullptr; |
| + return delegate_->GetIOTaskRunner().get(); |
| +} |
| +// Reminder: Be thread-safe. |
| bool ChannelMojo::Send(Message* message) { |
| - if (!message_reader_) { |
| - pending_messages_.push_back(message); |
| - return true; |
| + MojoResult send_result = MOJO_RESULT_OK; |
| + |
| + { |
| + base::AutoLock l(lock_); |
| + |
| + if (!message_reader_) { |
| + pending_messages_.push_back(message); |
| + return true; |
| + } |
| + |
| + send_result = message_reader_->Send(make_scoped_ptr(message)); |
| + } |
| + |
| + if (send_result != MOJO_RESULT_OK) { |
| + if (base::TaskRunner* runner = GetIOTaskRunner()) { |
| + runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&internal::MessagePipeReader::CloseWithError, |
| + base::Unretained(message_reader_.get()), send_result)); |
| + // The error shouln't be visible at this point. |
| + // It will be known eventually by |CloseWithError()| on IO thread. |
| + return true; |
| + } |
| + |
| + message_reader_->CloseWithError(send_result); |
| + return false; |
| } |
| - return message_reader_->Send(make_scoped_ptr(message)); |
| + return true; |
| +} |
| + |
| +bool ChannelMojo::IsSendThreadSafe() const { |
| + return true; |
| } |
| base::ProcessId ChannelMojo::GetPeerPID() const { |