 Chromium Code Reviews
 Chromium Code Reviews Issue 780223002:
  ChannelProxy: Send() from UI thread when ChannelMojo is used.  (Closed) 
  Base URL: https://chromium.googlesource.com/chromium/src.git@master
    
  
    Issue 780223002:
  ChannelProxy: Send() from UI thread when ChannelMojo is used.  (Closed) 
  Base URL: https://chromium.googlesource.com/chromium/src.git@master| Index: ipc/mojo/ipc_channel_mojo.cc | 
| diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc | 
| index 3b646c7c9f0f3a9eb128c368429e8f88faa7a59c..f0fb57ff064a26fd789ec6775ba15f16cd457f76 100644 | 
| --- a/ipc/mojo/ipc_channel_mojo.cc | 
| +++ b/ipc/mojo/ipc_channel_mojo.cc | 
| @@ -175,7 +175,7 @@ void ChannelMojo::ChannelInfoDeleter::operator()( | 
| // static | 
| bool ChannelMojo::ShouldBeUsed() { | 
| // TODO(morrita): Turn this on for a set of platforms. | 
| - return false; | 
| 
Hajime Morrita
2014/12/05 02:54:09
I do this just for exercising tests. No intention
 | 
| + return true; | 
| } | 
| // static | 
| @@ -240,8 +240,9 @@ ChannelMojo::~ChannelMojo() { | 
| } | 
| void ChannelMojo::InitDelegate(ChannelMojo::Delegate* delegate) { | 
| + delegate->OnChannelCreated(weak_factory_.GetWeakPtr()); | 
| + base::AutoLock l(lock_); | 
| delegate_ = delegate->ToWeakPtr(); | 
| - delegate_->OnChannelCreated(weak_factory_.GetWeakPtr()); | 
| } | 
| mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe( | 
| @@ -260,7 +261,12 @@ bool ChannelMojo::Connect() { | 
| } | 
| void ChannelMojo::Close() { | 
| - message_reader_.reset(); | 
| + // |message_reader_| has to be cleared inside the lock, | 
| + // but the instance has to be deleted outside. | 
| + scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_dead; | 
| + | 
| + base::AutoLock l(lock_); | 
| + to_be_dead = message_reader_.Pass(); | 
| channel_info_.reset(); | 
| } | 
| @@ -270,21 +276,38 @@ void ChannelMojo::OnBootstrapError() { | 
| void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe, | 
| int32_t peer_pid) { | 
| - message_reader_ = | 
| + scoped_ptr<internal::MessagePipeReader> reader = | 
| make_scoped_ptr(new internal::MessagePipeReader(pipe.Pass(), this)); | 
| - | 
| - for (size_t i = 0; i < pending_messages_.size(); ++i) { | 
| - bool sent = message_reader_->Send(make_scoped_ptr(pending_messages_[i])); | 
| - pending_messages_[i] = NULL; | 
| - if (!sent) { | 
| - pending_messages_.clear(); | 
| - listener_->OnChannelError(); | 
| - return; | 
| + MojoResult send_result = MOJO_RESULT_OK; | 
| + | 
| + { | 
| + base::AutoLock l(lock_); | 
| + ScopedVector<Message> pending; | 
| + | 
| + pending_messages_.swap(pending); | 
| + for (size_t i = 0; i < pending.size(); ++i) { | 
| + send_result = reader->Send(make_scoped_ptr(pending[i])); | 
| + pending[i] = nullptr; | 
| + if (send_result != MOJO_RESULT_OK) | 
| + break; | 
| } | 
| + | 
| + // We set |message_reader_| here and won't get any |pending_messages_| | 
| + // hereafter. Although we might have some if there is an error, we don't | 
| + // care. They cannot be sent anyway. | 
| + if (send_result == MOJO_RESULT_OK) | 
| + message_reader_ = reader.Pass(); | 
| } | 
| - pending_messages_.clear(); | 
| + // This should be done outside the |lock_| as delegates are called through | 
| + // |CloseWithError()|. | 
| + if (reader.get()) { | 
| + DCHECK(!message_reader_.get()); | 
| + reader->CloseWithError(send_result); | 
| + return; | 
| + } | 
| + DCHECK(message_reader_.get()); | 
| set_peer_pid(peer_pid); | 
| listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID())); | 
| } | 
| @@ -297,14 +320,48 @@ void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { | 
| listener_->OnChannelError(); | 
| } | 
| +base::TaskRunner* ChannelMojo::GetIOTaskRunner() { | 
| + base::AutoLock l(lock_); | 
| + if (!delegate_) | 
| + return nullptr; | 
| + return delegate_->GetIOTaskRunner().get(); | 
| +} | 
| +// Reminder: Be thread-safe. | 
| bool ChannelMojo::Send(Message* message) { | 
| - if (!message_reader_) { | 
| - pending_messages_.push_back(message); | 
| - return true; | 
| + MojoResult send_result = MOJO_RESULT_OK; | 
| + | 
| + { | 
| + base::AutoLock l(lock_); | 
| + | 
| + if (!message_reader_) { | 
| + pending_messages_.push_back(message); | 
| + return true; | 
| + } | 
| + | 
| + send_result = message_reader_->Send(make_scoped_ptr(message)); | 
| + } | 
| + | 
| + if (send_result != MOJO_RESULT_OK) { | 
| + if (base::TaskRunner* runner = GetIOTaskRunner()) { | 
| + runner->PostTask( | 
| + FROM_HERE, | 
| + base::Bind(&internal::MessagePipeReader::CloseWithError, | 
| + base::Unretained(message_reader_.get()), send_result)); | 
| + // The error shouln't be visible at this point. | 
| + // It will be known eventually by |CloseWithError()| on IO thread. | 
| + return true; | 
| + } | 
| + | 
| + message_reader_->CloseWithError(send_result); | 
| + return false; | 
| } | 
| - return message_reader_->Send(make_scoped_ptr(message)); | 
| + return true; | 
| +} | 
| + | 
| +bool ChannelMojo::IsSendThreadSafe() const { | 
| + return true; | 
| } | 
| base::ProcessId ChannelMojo::GetPeerPID() const { |