Chromium Code Reviews| Index: ipc/mojo/ipc_channel_mojo.cc |
| diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..febf11affae10c05dba175333dfcbd2d9e1d0564 |
| --- /dev/null |
| +++ b/ipc/mojo/ipc_channel_mojo.cc |
| @@ -0,0 +1,582 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "ipc/mojo/ipc_channel_mojo.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/bind_helpers.h" |
| +#include "base/lazy_instance.h" |
| +#include "ipc/ipc_listener.h" |
| +#include "mojo/embedder/embedder.h" |
| + |
| +#if defined(OS_POSIX) && !defined(OS_NACL) |
| +#include "ipc/file_descriptor_set_posix.h" |
| +#endif |
| + |
| +namespace IPC { |
| + |
| +namespace { |
| + |
| +// IPC::Listener for bootstrap channels. |
| +// It should never receive any message. |
| +class NullListener : public Listener { |
| + public: |
| + virtual bool OnMessageReceived(const Message&) OVERRIDE { |
| + NOTREACHED(); |
| + return false; |
| + } |
| + |
| + virtual void OnChannelConnected(int32 peer_pid) OVERRIDE { |
| + NOTREACHED(); |
| + } |
| + |
| + virtual void OnChannelError() OVERRIDE { |
| + NOTREACHED(); |
| + } |
| + |
| + virtual void OnBadMessageReceived(const Message& message) OVERRIDE { |
| + NOTREACHED(); |
| + } |
| +}; |
| + |
| +base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER; |
| + |
| +class MojoChannelBuilder : public ChannelBuilder { |
| + public: |
| + MojoChannelBuilder( |
| + ChannelHandle channel_handle, |
| + Channel::Mode mode, |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner) |
| + : channel_handle_(channel_handle), |
| + mode_(mode), |
| + io_thread_task_runner_(io_thread_task_runner) { |
| + } |
| + |
| + virtual std::string GetName() const OVERRIDE { |
| + return channel_handle_.name; |
| + } |
| + |
| + virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE { |
| + return ChannelMojo::Create( |
| + channel_handle_, |
| + mode_, |
| + listener, |
| + io_thread_task_runner_).PassAs<Channel>(); |
| + } |
| + |
| + private: |
| + ChannelHandle channel_handle_; |
| + Channel::Mode mode_; |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner_; |
| +}; |
| + |
| +mojo::embedder::PlatformHandle ToPlatformHandle( |
| + const ChannelHandle& handle) { |
| +#if defined(OS_POSIX) && !defined(OS_NACL) |
| + return mojo::embedder::PlatformHandle(handle.socket.fd); |
| +#elif defined(OS_WIN) |
| + return mojo::embedder::PlatformHandle(handle.pipe.handle); |
| +#else |
| +#error "Unsupported Platform!" |
| +#endif |
| +} |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +// TODO(morrita): This should be built using higher-level Mojo construct |
| +// for clarify and extensibility. |
| +class HelloMessage { |
|
viettrungluu
2014/07/17 15:11:54
Why do you need to send any hello messages, etc.?
Hajime Morrita
2014/07/17 18:32:12
I'm using the magic header so that we can detect a
|
| + public: |
| + static Pickle CreateRequest(int32 pid) { |
| + Pickle request; |
| + request.WriteString(kHelloRequestMagic); |
| + request.WriteInt(pid); |
| + return request; |
| + } |
| + |
| + static bool ReadRequest(Pickle& pickle, int32* pid) { |
| + PickleIterator iter(pickle); |
| + std::string hello; |
| + if (!iter.ReadString(&hello)) { |
| + DLOG(WARNING) << "Failed to Read magic string."; |
| + return false; |
| + } |
| + |
| + if (hello != kHelloRequestMagic) { |
| + DLOG(WARNING) << "Magic mismatch:" << hello; |
| + return false; |
| + } |
| + |
| + int read_pid; |
| + if (!iter.ReadInt(&read_pid)) { |
| + DLOG(WARNING) << "Failed to Read PID."; |
| + return false; |
| + } |
| + |
| + *pid = read_pid; |
| + return true; |
| + } |
| + |
| + static Pickle CreateResponse(int32 pid) { |
| + Pickle request; |
| + request.WriteString(kHelloResponseMagic); |
| + request.WriteInt(pid); |
| + return request; |
| + } |
| + |
| + static bool ReadResponse(Pickle& pickle, int32* pid) { |
| + PickleIterator iter(pickle); |
| + std::string hello; |
| + if (!iter.ReadString(&hello)) { |
| + DLOG(WARNING) << "Failed to read magic string."; |
| + return false; |
| + } |
| + |
| + if (hello != kHelloResponseMagic) { |
| + DLOG(WARNING) << "Magic mismatch:" << hello; |
| + return false; |
| + } |
| + |
| + int read_pid; |
| + if (!iter.ReadInt(&read_pid)) { |
| + DLOG(WARNING) << "Failed to read PID."; |
| + return false; |
| + } |
| + |
| + *pid = read_pid; |
| + return true; |
| + } |
| + |
| + private: |
| + static const char* kHelloRequestMagic; |
| + static const char* kHelloResponseMagic; |
| +}; |
| + |
| +const char* HelloMessage::kHelloRequestMagic = "MREQ"; |
| +const char* HelloMessage::kHelloResponseMagic = "MRES"; |
| + |
| +} // namespace |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +// A MessagePipeReader implemenation for IPC::Message communication. |
| +class ChannelMojo::MessageReader : public internal::MessagePipeReader { |
| + public: |
| + MessageReader(ChannelMojo* owner) |
| + : owner_(owner) { } |
| + |
| + bool Send(scoped_ptr<Message> message); |
| + virtual void OnMessageArrived() OVERRIDE; |
| + virtual void OnPipeClosed() OVERRIDE; |
| + virtual void OnPipeError(MojoResult error) OVERRIDE; |
| + |
| + private: |
| + ChannelMojo* owner_; |
| +}; |
| + |
| +void ChannelMojo::MessageReader::OnMessageArrived() { |
| + Message message(data_buffer().empty() ? "" : &data_buffer()[0], |
| + data_buffer().size()); |
| + |
| +#if defined(OS_POSIX) && !defined(OS_NACL) |
| + for (size_t i = 0; i < handle_buffer().size(); ++i) { |
| + mojo::embedder::ScopedPlatformHandle platform_handle; |
| + MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle( |
| + handle_buffer()[i], &platform_handle); |
| + if (unwrap_result != MOJO_RESULT_OK) { |
| + DLOG(WARNING) << "Pipe failed to covert handles. Closing: " |
| + << unwrap_result; |
| + CloseWithError(unwrap_result); |
| + return; |
| + } |
| + |
| + bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd); |
| + DCHECK(ok); |
| + } |
| +#else |
| + DCHECK(handle_buffer_.empty()); |
| +#endif |
| + |
| + message.TraceMessageEnd(); |
| + owner_->OnMessageReceived(message); |
| +} |
| + |
| +void ChannelMojo::MessageReader::OnPipeClosed() { |
| + if (!owner_) |
| + return; |
| + owner_->OnPipeClosed(this); |
| + owner_ = NULL; |
| +} |
| + |
| +void ChannelMojo::MessageReader::OnPipeError(MojoResult error) { |
| + if (!owner_) |
| + return; |
| + owner_->OnPipeError(this); |
| +} |
| + |
| +bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) { |
| + DCHECK(IsValid()); |
| + |
| + message->TraceMessageBegin(); |
| + std::vector<MojoHandle> handles; |
| +#if defined(OS_POSIX) && !defined(OS_NACL) |
| + if (message->HasFileDescriptors()) { |
| + FileDescriptorSet* fdset = message->file_descriptor_set(); |
| + for (size_t i = 0; i < fdset->size(); ++i) { |
| + MojoHandle wrapped_handle; |
| + MojoResult wrap_result = CreatePlatformHandleWrapper( |
| + mojo::embedder::ScopedPlatformHandle( |
| + mojo::embedder::PlatformHandle( |
| + fdset->GetDescriptorAt(i))), |
| + &wrapped_handle); |
| + if (MOJO_RESULT_OK != wrap_result) { |
| + DLOG(WARNING) << "Pipe failed to wrap handles. Closing: " |
| + << wrap_result; |
| + CloseWithError(wrap_result); |
| + return false; |
| + } |
| + |
| + handles.push_back(wrapped_handle); |
| + } |
| + } |
| +#endif |
| + MojoResult write_result = MojoWriteMessage( |
| + handle(), |
| + message->data(), message->size(), |
| + handles.empty() ? NULL : &handles[0], handles.size(), |
| + MOJO_WRITE_MESSAGE_FLAG_NONE); |
| + if (MOJO_RESULT_OK != write_result) { |
| + CloseWithError(write_result); |
| + return false; |
| + } |
| + |
| + return true; |
| +} |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +// MessagePipeReader implemenation for control messages. |
| +// Actual message handling is implemented by sublcasses. |
| +class ChannelMojo::ControlReader : public internal::MessagePipeReader { |
| + public: |
| + ControlReader(ChannelMojo* owner) |
| + : owner_(owner) { } |
| + |
| + virtual bool Connect() { return true; } |
| + virtual void OnPipeClosed() OVERRIDE; |
| + virtual void OnPipeError(MojoResult error) OVERRIDE; |
| + |
| + protected: |
| + ChannelMojo* owner_; |
| +}; |
| + |
| +void ChannelMojo::ControlReader::OnPipeClosed() { |
| + if (!owner_) |
| + return; |
| + owner_->OnPipeClosed(this); |
| + owner_ = NULL; |
| +} |
| + |
| +void ChannelMojo::ControlReader::OnPipeError(MojoResult error) { |
| + if (!owner_) |
| + return; |
| + owner_->OnPipeError(this); |
| +} |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +// ControlReader for server-side ChannelMojo. |
| +class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader { |
| + public: |
| + ServerControlReader(ChannelMojo* owner) |
| + : ControlReader(owner) { } |
| + |
| + virtual bool Connect() OVERRIDE; |
| + virtual void OnMessageArrived() OVERRIDE; |
| + |
| + private: |
| + MojoResult SendHelloRequest(); |
| + MojoResult RespondHelloResponse(); |
| + |
| + mojo::ScopedMessagePipeHandle message_pipe_; |
| +}; |
| + |
| +bool ChannelMojo::ServerControlReader::Connect() { |
| + MojoResult result = SendHelloRequest(); |
| + if (result != MOJO_RESULT_OK) { |
| + CloseWithError(result); |
| + return false; |
| + } |
| + |
| + return true; |
| +} |
| + |
| +MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() { |
| + DCHECK(IsValid()); |
| + DCHECK(!message_pipe_.is_valid()); |
| + |
| + mojo::ScopedMessagePipeHandle self; |
| + mojo::ScopedMessagePipeHandle peer; |
| + MojoResult create_result = mojo::CreateMessagePipe( |
| + NULL, &message_pipe_, &peer); |
| + if (MOJO_RESULT_OK != create_result) { |
| + DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result; |
| + return create_result; |
| + } |
| + |
| + MojoHandle peer_to_send = peer.get().value(); |
| + Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID()); |
| + MojoResult write_result = MojoWriteMessage( |
| + handle(), |
| + request.data(), request.size(), |
| + &peer_to_send, 1, |
| + MOJO_WRITE_MESSAGE_FLAG_NONE); |
| + if (MOJO_RESULT_OK != write_result) { |
| + DLOG(WARNING) << "Writing Hello request failed: " << create_result; |
| + return write_result; |
| + } |
| + |
| + // |peer| is sent and no longer owned by |this|. |
| + (void)peer.release(); |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() { |
| + Pickle request(data_buffer_.empty() ? "" : data_buffer_.data(), |
| + data_buffer_.size()); |
| + |
| + int32 read_pid = 0; |
| + if (!HelloMessage::ReadResponse(request, &read_pid)) { |
| + DLOG(ERROR) << "Failed to parse Hello response."; |
| + return MOJO_RESULT_UNKNOWN; |
| + } |
| + |
| + base::ProcessId pid = static_cast<base::ProcessId>(read_pid); |
| + owner_->set_peer_pid(pid); |
| + owner_->OnConnected(message_pipe_.Pass()); |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +void ChannelMojo::ServerControlReader::OnMessageArrived() { |
| + MojoResult result = RespondHelloResponse(); |
| + if (result != MOJO_RESULT_OK) |
| + CloseWithError(result); |
| +} |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +// ControlReader for client-side ChannelMojo. |
| +class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader { |
| + public: |
| + ClientControlReader(ChannelMojo* owner) |
| + : ControlReader(owner) { } |
| + |
| + virtual void OnMessageArrived() OVERRIDE; |
| + |
| + private: |
| + MojoResult RespondHelloRequest(); |
| +}; |
| + |
| +MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest() { |
| + DCHECK(IsValid()); |
| + |
| + if (handle_buffer_.size() != 1) { |
| + DLOG(ERROR) << "Hello request doesn't contains required handle: " |
| + << handle_buffer_.size(); |
| + return MOJO_RESULT_UNKNOWN; |
| + } |
| + |
| + mojo::ScopedMessagePipeHandle received_pipe( |
| + (mojo::MessagePipeHandle(handle_buffer_[0]))); |
| + |
| + int32 read_request = 0; |
| + Pickle request(data_buffer_.empty() ? "" : data_buffer_.data(), |
| + data_buffer_.size()); |
| + if (!HelloMessage::ReadRequest(request, &read_request)) { |
| + DLOG(ERROR) << "Hello request has wrong magic."; |
| + return MOJO_RESULT_UNKNOWN; |
| + } |
| + |
| + base::ProcessId pid = read_request; |
| + Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID()); |
| + MojoResult write_result = MojoWriteMessage( |
| + handle(), |
| + response.data(), response.size(), |
| + NULL, 0, |
| + MOJO_WRITE_MESSAGE_FLAG_NONE); |
| + if (MOJO_RESULT_OK != write_result) { |
| + DLOG(ERROR) << "Writing Hello response failed: " << write_result; |
| + return write_result; |
| + } |
| + |
| + owner_->set_peer_pid(pid); |
| + owner_->OnConnected(received_pipe.Pass()); |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +void ChannelMojo::ClientControlReader::OnMessageArrived() { |
| + MojoResult result = RespondHelloRequest(); |
| + if (result != MOJO_RESULT_OK) { |
| + DLOG(ERROR) << "Failed to respond Hello request. Closing: " |
| + << result; |
| + CloseWithError(result); |
| + } |
| +} |
| + |
| +// static |
| +scoped_ptr<ChannelMojo> ChannelMojo::Create( |
| + scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner) { |
| + return make_scoped_ptr(new ChannelMojo( |
| + bootstrap.Pass(), mode, listener, io_thread_task_runner)); |
| +} |
| + |
| +// static |
| +scoped_ptr<ChannelMojo> ChannelMojo::Create( |
| + const ChannelHandle &channel_handle, Mode mode, Listener* listener, |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner) { |
| + return Create( |
| + Channel::Create(channel_handle, mode, g_null_listener.Pointer()), |
| + mode, listener, io_thread_task_runner); |
| +} |
| + |
| +// static |
| +scoped_ptr<ChannelBuilder> ChannelMojo::CreateBuilder( |
| + const ChannelHandle &channel_handle, Mode mode, |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner) { |
| + return make_scoped_ptr( |
| + new MojoChannelBuilder( |
| + channel_handle, mode, |
| + io_thread_task_runner)).PassAs<ChannelBuilder>(); |
| +} |
| + |
| +//------------------------------------------------------------------------------ |
| + |
| +ChannelMojo::ChannelMojo( |
| + scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, |
| + scoped_refptr<base::TaskRunner> io_thread_task_runner) |
| + : weak_factory_(this), |
| + bootstrap_(bootstrap.Pass()), |
| + mode_(mode), listener_(listener), |
| + peer_pid_(base::kNullProcessId) { |
| + DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT); |
| + mojo::ScopedMessagePipeHandle control_pipe |
| + = mojo::embedder::CreateChannel( |
| + mojo::embedder::ScopedPlatformHandle( |
| + ToPlatformHandle(bootstrap_->TakePipeHandle())), |
| + io_thread_task_runner, |
| + base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)), |
| + io_thread_task_runner); |
| + |
| + // MessagePipeReader, that is crated in InitOnIOThread(), should live only in |
| + // IO thread, but IPC::Channel can be instantiated outside of it. |
| + // So we move the creation to the appropriate thread. |
| + if (base::MessageLoopProxy::current() == io_thread_task_runner) { |
| + InitOnIOThread(control_pipe.Pass()); |
| + } else { |
| + io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&ChannelMojo::InitOnIOThread, |
| + weak_factory_.GetWeakPtr(), |
| + base::Passed(control_pipe.Pass()))); |
| + } |
| +} |
| + |
| +ChannelMojo::~ChannelMojo() { |
| + Close(); |
| +} |
| + |
| +void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) { |
| + control_reader_ = CreateControlReader(); |
| + control_reader_->SetPipe(control_pipe.Pass()); |
| +} |
| + |
| +scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader() { |
| + if (MODE_SERVER == mode_) { |
| + return make_scoped_ptr( |
| + new ServerControlReader(this)).PassAs<ControlReader>(); |
| + } |
| + |
| + DCHECK(mode_ == MODE_CLIENT); |
| + return make_scoped_ptr( |
| + new ClientControlReader(this)).PassAs<ControlReader>(); |
| +} |
| + |
| +bool ChannelMojo::Connect() { |
| + DCHECK(!message_reader_); |
| + return control_reader_->Connect(); |
| +} |
| + |
| +void ChannelMojo::Close() { |
| + control_reader_.reset(); |
| + message_reader_.reset(); |
| +} |
| + |
| +void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) { |
| + message_reader_ = make_scoped_ptr(new MessageReader(this)); |
| + message_reader_->SetPipe(pipe.Pass()); |
| + |
| + for (size_t i = 0; i < pending_messages_.size(); ++i) { |
| + message_reader_->Send(make_scoped_ptr(pending_messages_[i])); |
| + pending_messages_[i] = NULL; |
| + } |
| + |
| + pending_messages_.clear(); |
| + |
| + listener_->OnChannelConnected(GetPeerPID()); |
| +} |
| + |
| +void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) { |
| + Close(); |
| +} |
| + |
| +void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { |
| + listener_->OnChannelError(); |
| +} |
| + |
| + |
| +bool ChannelMojo::Send(Message* message) { |
| + if (!message_reader_) { |
| + pending_messages_.push_back(message); |
| + return true; |
| + } |
| + |
| + return message_reader_->Send(make_scoped_ptr(message)); |
| +} |
| + |
| +base::ProcessId ChannelMojo::GetPeerPID() const { |
| + return peer_pid_; |
| +} |
| + |
| +base::ProcessId ChannelMojo::GetSelfPID() const { |
| + return bootstrap_->GetSelfPID(); |
| +} |
| + |
| +ChannelHandle ChannelMojo::TakePipeHandle() { |
| + return bootstrap_->TakePipeHandle(); |
| +} |
| + |
| +void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo*) { |
| + // TODO(morrita): I'm not sure what should be done here. |
| + // Apparently there is no way to delete ChannelInfo and it will leak. |
|
viettrungluu
2014/07/17 15:11:54
See DestroyChannelOnIOThread(), which for some rea
Hajime Morrita
2014/07/17 18:32:12
Oh I see. Problem solved!
|
| + // Mojo Embedder API needs some way to release it. |
| +} |
| + |
| +void ChannelMojo::OnMessageReceived(Message& message) { |
| + listener_->OnMessageReceived(message); |
| + if (message.dispatch_error()) |
| + listener_->OnBadMessageReceived(message); |
| +} |
| + |
| +#if defined(OS_POSIX) && !defined(OS_NACL) |
| +int ChannelMojo::GetClientFileDescriptor() const { |
| + return bootstrap_->GetClientFileDescriptor(); |
| +} |
| + |
| +int ChannelMojo::TakeClientFileDescriptor() { |
| + return bootstrap_->TakeClientFileDescriptor(); |
| +} |
| +#endif // defined(OS_POSIX) && !defined(OS_NACL) |
| + |
| +} // namespace IPC |