Index: ipc/mojo/ipc_channel_mojo.cc |
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc |
index 3b646c7c9f0f3a9eb128c368429e8f88faa7a59c..f0fb57ff064a26fd789ec6775ba15f16cd457f76 100644 |
--- a/ipc/mojo/ipc_channel_mojo.cc |
+++ b/ipc/mojo/ipc_channel_mojo.cc |
@@ -175,7 +175,7 @@ void ChannelMojo::ChannelInfoDeleter::operator()( |
// static |
bool ChannelMojo::ShouldBeUsed() { |
// TODO(morrita): Turn this on for a set of platforms. |
- return false; |
Hajime Morrita
2014/12/05 02:54:09
I do this just for exercising tests. No intention
|
+ return true; |
} |
// static |
@@ -240,8 +240,9 @@ ChannelMojo::~ChannelMojo() { |
} |
void ChannelMojo::InitDelegate(ChannelMojo::Delegate* delegate) { |
+ delegate->OnChannelCreated(weak_factory_.GetWeakPtr()); |
+ base::AutoLock l(lock_); |
delegate_ = delegate->ToWeakPtr(); |
- delegate_->OnChannelCreated(weak_factory_.GetWeakPtr()); |
} |
mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe( |
@@ -260,7 +261,12 @@ bool ChannelMojo::Connect() { |
} |
void ChannelMojo::Close() { |
- message_reader_.reset(); |
+ // |message_reader_| has to be cleared inside the lock, |
+ // but the instance has to be deleted outside. |
+ scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_dead; |
+ |
+ base::AutoLock l(lock_); |
+ to_be_dead = message_reader_.Pass(); |
channel_info_.reset(); |
} |
@@ -270,21 +276,38 @@ void ChannelMojo::OnBootstrapError() { |
void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe, |
int32_t peer_pid) { |
- message_reader_ = |
+ scoped_ptr<internal::MessagePipeReader> reader = |
make_scoped_ptr(new internal::MessagePipeReader(pipe.Pass(), this)); |
- |
- for (size_t i = 0; i < pending_messages_.size(); ++i) { |
- bool sent = message_reader_->Send(make_scoped_ptr(pending_messages_[i])); |
- pending_messages_[i] = NULL; |
- if (!sent) { |
- pending_messages_.clear(); |
- listener_->OnChannelError(); |
- return; |
+ MojoResult send_result = MOJO_RESULT_OK; |
+ |
+ { |
+ base::AutoLock l(lock_); |
+ ScopedVector<Message> pending; |
+ |
+ pending_messages_.swap(pending); |
+ for (size_t i = 0; i < pending.size(); ++i) { |
+ send_result = reader->Send(make_scoped_ptr(pending[i])); |
+ pending[i] = nullptr; |
+ if (send_result != MOJO_RESULT_OK) |
+ break; |
} |
+ |
+ // We set |message_reader_| here and won't get any |pending_messages_| |
+ // hereafter. Although we might have some if there is an error, we don't |
+ // care. They cannot be sent anyway. |
+ if (send_result == MOJO_RESULT_OK) |
+ message_reader_ = reader.Pass(); |
} |
- pending_messages_.clear(); |
+ // This should be done outside the |lock_| as delegates are called through |
+ // |CloseWithError()|. |
+ if (reader.get()) { |
+ DCHECK(!message_reader_.get()); |
+ reader->CloseWithError(send_result); |
+ return; |
+ } |
+ DCHECK(message_reader_.get()); |
set_peer_pid(peer_pid); |
listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID())); |
} |
@@ -297,14 +320,48 @@ void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { |
listener_->OnChannelError(); |
} |
+base::TaskRunner* ChannelMojo::GetIOTaskRunner() { |
+ base::AutoLock l(lock_); |
+ if (!delegate_) |
+ return nullptr; |
+ return delegate_->GetIOTaskRunner().get(); |
+} |
+// Reminder: Be thread-safe. |
bool ChannelMojo::Send(Message* message) { |
- if (!message_reader_) { |
- pending_messages_.push_back(message); |
- return true; |
+ MojoResult send_result = MOJO_RESULT_OK; |
+ |
+ { |
+ base::AutoLock l(lock_); |
+ |
+ if (!message_reader_) { |
+ pending_messages_.push_back(message); |
+ return true; |
+ } |
+ |
+ send_result = message_reader_->Send(make_scoped_ptr(message)); |
+ } |
+ |
+ if (send_result != MOJO_RESULT_OK) { |
+ if (base::TaskRunner* runner = GetIOTaskRunner()) { |
+ runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&internal::MessagePipeReader::CloseWithError, |
+ base::Unretained(message_reader_.get()), send_result)); |
+ // The error shouln't be visible at this point. |
+ // It will be known eventually by |CloseWithError()| on IO thread. |
+ return true; |
+ } |
+ |
+ message_reader_->CloseWithError(send_result); |
+ return false; |
} |
- return message_reader_->Send(make_scoped_ptr(message)); |
+ return true; |
+} |
+ |
+bool ChannelMojo::IsSendThreadSafe() const { |
+ return true; |
} |
base::ProcessId ChannelMojo::GetPeerPID() const { |