Chromium Code Reviews| Index: ipc/ipc_channel_nacl.cc |
| diff --git a/ipc/ipc_channel_nacl.cc b/ipc/ipc_channel_nacl.cc |
| index 683353edcde2b05b569ea54523fbc5a4ead4b082..5ad6ac4d569f19cb7b8807dde3f5847ffa7e76f5 100644 |
| --- a/ipc/ipc_channel_nacl.cc |
| +++ b/ipc/ipc_channel_nacl.cc |
| @@ -4,18 +4,123 @@ |
| #include "ipc/ipc_channel_nacl.h" |
| +#include <errno.h> |
| +#include <stddef.h> |
| +#include <sys/types.h> |
| + |
| +#include <algorithm> |
| + |
| +#include "base/bind.h" |
| #include "base/file_util.h" |
| #include "base/logging.h" |
| - |
| -// This file is currently a stub to get us linking. |
| -// TODO(brettw) implement this. |
| +#include "base/message_loop_proxy.h" |
| +#include "base/process_util.h" |
| +#include "base/synchronization/lock.h" |
| +#include "base/task_runner_util.h" |
| +#include "base/threading/simple_thread.h" |
| +#include "ipc/file_descriptor_set_posix.h" |
| +#include "ipc/ipc_logging.h" |
| +#include "sys/nacl_imc_api.h" |
|
Mark Seaborn
2012/05/01 22:07:09
These "sys/nacl_*" headers are "system" headers fo
|
| +#include "sys/nacl_syscalls.h" |
| namespace IPC { |
| +namespace { |
|
brettw
2012/05/01 22:29:48
Style nit: blank lines around this and the closing
|
| +scoped_ptr<std::vector<char> > ReadDataOnReaderThread(int pipe) { |
| + DCHECK(pipe >= 0); |
| + scoped_ptr<std::vector<char> > null_ptr; |
| + |
| + if (pipe < 0) |
| + return null_ptr.Pass(); |
| + |
| + CHECK(Channel::kReadBufferSize); |
|
brettw
2012/05/01 22:29:48
I'm not sure why you're asserting on this. I'd jus
dmichael (off chromium)
2012/05/03 17:06:35
That was there since I dereference buffer->at(0).
|
| + scoped_ptr<std::vector<char> > buffer( |
| + new std::vector<char>(Channel::kReadBufferSize)); |
| + struct NaClImcMsgHdr msg = {0}; |
| + struct NaClImcMsgIoVec iov = {&buffer->at(0), buffer->size()}; |
| + msg.iov = &iov; |
| + msg.iov_length = 1; |
| + |
| + int bytes_read = imc_recvmsg(pipe, &msg, 0); |
| + while (bytes_read < 0 && errno == EAGAIN) |
|
Mark Seaborn
2012/05/01 22:07:09
You don't need to handle EAGAIN. I assume your Ch
brettw
2012/05/01 22:29:48
Right, we can probably not bother with EAGAIN for
dmichael (off chromium)
2012/05/03 17:06:35
My thinking was that I don't know for sure yet if
|
| + bytes_read = imc_recvmsg(pipe, &msg, 0); |
| + |
| + if (bytes_read < 0) { |
| + if (errno == ECONNRESET || errno == EPIPE) { |
|
Mark Seaborn
2012/05/01 22:07:09
Ditto. It's up to Chrome what errors this NaCl de
dmichael (off chromium)
2012/05/03 17:06:35
Yeah. I think I'll leave this here for now while I
|
| + return null_ptr.Pass(); |
| + } else { |
| + PLOG(ERROR) << "pipe error (" << pipe << ")"; |
| + return null_ptr.Pass(); |
| + } |
| + } else if (bytes_read == 0) { |
| + // The pipe has closed... |
| + return null_ptr.Pass(); |
| + } |
| + DCHECK(bytes_read); |
| + buffer->resize(bytes_read); |
| + return buffer.Pass(); |
| +} |
| +} // namespace |
| + |
| +class Channel::ChannelImpl::ReaderThreadRunner |
| + : public base::DelegateSimpleThread::Delegate { |
| + public: |
| + // |pipe|: A file descriptor from which we will read using imc_recvmsg. |
| + // |data_read_callback|: A callback we invoke (on the main thread) when we |
| + // have read data. The callback is passed a buffer of |
| + // data that was read. |
| + // |failure_callback|: A callback we invoke when we have a failure reading |
| + // from |pipe|. |
| + // |main_message_loop|: A proxy for the main thread, where we will invoke the |
| + // above callbacks. |
| + ReaderThreadRunner( |
| + int pipe, |
| + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, |
| + base::Callback<void ()> failure_callback, |
| + base::MessageLoopProxy* main_message_loop); |
| + |
| + // DelegateSimpleThread implementation. Reads data from the pipe in a loop |
| + // until either we are told to quit or a read fails. |
| + virtual void Run() OVERRIDE; |
| + |
| + // Tell the thread to quit (thread-safe). |
| + void MakeQuit(); |
| + private: |
|
brettw
2012/05/02 18:18:56
Blank line before this.
|
| + int pipe_; |
| + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback_; |
| + base::Callback<void ()> failure_callback_; |
| + base::MessageLoopProxy* main_message_loop_; |
| + bool should_quit_; |
| + base::Lock quit_lock_; |
| + DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner); |
|
brettw
2012/05/02 18:18:56
Blank line before this.
|
| +}; |
| Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, |
| - Mode mode, |
| - Listener* listener) |
| - : ChannelReader(listener) { |
| + Mode mode, |
| + Listener* listener) |
| + : ChannelReader(listener), |
| + mode_(mode), |
| + peer_pid_(base::kNullProcessId), |
| + waiting_connect_(true), |
| + pipe_(-1), |
| + pipe_name_(channel_handle.name), |
| + weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
| + if (!CreatePipe(channel_handle)) { |
| + // The pipe may have been closed already. |
| + const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client"; |
| + LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name |
| + << "\" in " << modestr << " mode"; |
| + } |
| + reader_thread_runner_.reset( |
| + new ReaderThreadRunner( |
| + pipe_, |
| + base::Bind(&Channel::ChannelImpl::DidRecvMsg, |
| + weak_ptr_factory_.GetWeakPtr()), |
| + base::Bind(&Channel::ChannelImpl::ReadDidFail, |
| + weak_ptr_factory_.GetWeakPtr()), |
| + base::MessageLoopProxy::current())); |
| + reader_thread_.reset( |
| + new base::DelegateSimpleThread(reader_thread_runner_.get(), |
| + "ipc_channel_nacl reader thread")); |
| } |
| Channel::ChannelImpl::~ChannelImpl() { |
| @@ -23,69 +128,247 @@ Channel::ChannelImpl::~ChannelImpl() { |
| } |
| bool Channel::ChannelImpl::Connect() { |
| - NOTIMPLEMENTED(); |
| - return false; |
| + if (pipe_ == -1) { |
| + DLOG(INFO) << "Channel creation failed: " << pipe_name_; |
| + return false; |
| + } |
| + |
| + reader_thread_->Start(); |
| + if (mode_ & MODE_CLIENT_FLAG) { |
| + // If we are a client we want to send a hello message out immediately. |
| + // In server mode we will send a hello message when we receive one from a |
| + // client. |
| + waiting_connect_ = false; |
| + QueueHelloMessage(); |
| + } else if (mode_ & MODE_SERVER_FLAG) { |
| + waiting_connect_ = true; |
| + return true; |
| + } else { |
| + NOTREACHED(); |
| + return false; |
| + } |
| } |
| void Channel::ChannelImpl::Close() { |
| - NOTIMPLEMENTED(); |
| + reader_thread_runner_->MakeQuit(); |
| + reader_thread_->Join(); |
| + pipe_ = -1; |
| + reader_thread_runner_.reset(); |
| + reader_thread_.reset(); |
| + read_queue_.clear(); |
| + output_queue_.clear(); |
| + // TODO(dmichael): Should we be opening & closing the virtual FD, or can we |
| + // rely on the trusted side to manage it? |
| } |
| bool Channel::ChannelImpl::Send(Message* message) { |
| - NOTIMPLEMENTED(); |
| + DVLOG(2) << "sending message @" << message << " on channel @" << this |
| + << " with type " << message->type(); |
| + scoped_ptr<Message> message_ptr(message); |
| + |
| +#ifdef IPC_MESSAGE_LOG_ENABLED |
| + Logging::GetInstance()->OnSendMessage(message, ""); |
| +#endif // IPC_MESSAGE_LOG_ENABLED |
| + |
| + output_queue_.push_back(linked_ptr<Message>(message)); |
| + if (!waiting_connect_) |
| + return ProcessOutgoingMessages(); |
| + |
| + return true; |
| } |
| -int Channel::ChannelImpl::GetClientFileDescriptor() const { |
| - NOTIMPLEMENTED(); |
| - return -1; |
| +void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<std::vector<char> > buffer) { |
| + // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from |
| + // the reader thread after Close is called. If so, we ignore it. |
| + if (pipe_ == -1) |
| + return; |
| + bool first_message_for_connection = waiting_connect_; |
| + if (waiting_connect_) { |
| + // In client mode, we should have already cleared waiting_connect_. |
| + DCHECK(mode_ & MODE_SERVER_FLAG); |
| + waiting_connect_ = false; |
| + } |
| + |
| + read_queue_.push_back(linked_ptr<std::vector<char> >(buffer.release())); |
| + |
| + // If this was the first message we received, and we're a server, there is a |
| + // hello message (and possibly others) in the queue waiting to be sent. |
| + if (first_message_for_connection && (mode_ & MODE_SERVER_FLAG)) |
| + ProcessOutgoingMessages(); |
| } |
| -int Channel::ChannelImpl::TakeClientFileDescriptor() { |
| - NOTIMPLEMENTED(); |
| - return -1; |
| +void Channel::ChannelImpl::ReadDidFail() { |
| + Close(); |
| } |
| -bool Channel::ChannelImpl::AcceptsConnections() const { |
| - NOTIMPLEMENTED(); |
| - return false; |
| +Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner( |
|
brettw
2012/05/02 18:18:56
This is confusing. Can you group all the ReaderThr
dmichael (off chromium)
2012/05/03 17:06:35
Yeah, I had it this way because originally I had R
|
| + int pipe, |
| + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, |
| + base::Callback<void ()> failure_callback, |
| + base::MessageLoopProxy* main_message_loop) |
| + : pipe_(pipe), |
| + data_read_callback_(data_read_callback), |
| + failure_callback_(failure_callback), |
| + main_message_loop_(main_message_loop), |
| + should_quit_(false) { |
| } |
| -bool Channel::ChannelImpl::HasAcceptedConnection() const { |
| - NOTIMPLEMENTED(); |
| - return false; |
| +void Channel::ChannelImpl::ReaderThreadRunner::Run() { |
| + while (true) { |
| + scoped_ptr<std::vector<char> > buffer(ReadDataOnReaderThread(pipe_)); |
| + quit_lock_.Acquire(); |
|
brettw
2012/05/02 18:18:56
I don't see how this quitting works. Normally this
dmichael (off chromium)
2012/05/03 17:06:35
You're right, that makes more sense. Done.
|
| + if (should_quit_) |
| + return; |
| + quit_lock_.Release(); |
| + |
| + if (buffer.get()) { |
| + main_message_loop_->PostTask(FROM_HERE, |
| + base::Bind(data_read_callback_, base::Passed(buffer.Pass()))); |
| + } else { |
| + main_message_loop_->PostTask(FROM_HERE, failure_callback_); |
| + // Because the read failed, we know we're going to quit. Don't bother |
| + // trying to read again. |
| + return; |
| + } |
| + } |
| } |
| -bool Channel::ChannelImpl::GetClientEuid(uid_t* client_euid) const { |
| - NOTIMPLEMENTED(); |
| - return false; |
| +void Channel::ChannelImpl::ReaderThreadRunner::MakeQuit() { |
| + base::AutoLock lock(quit_lock_); |
| + should_quit_ = true; |
| } |
| -void Channel::ChannelImpl::ResetToAcceptingConnectionState() { |
| - NOTIMPLEMENTED(); |
| +bool Channel::ChannelImpl::CreatePipe( |
| + const IPC::ChannelHandle& channel_handle) { |
| + DCHECK(pipe_ == -1); |
| + |
| + // There's one possible case in NaCl: |
| + // 1) It's a channel wrapping a pipe that is given to us. |
| + // We don't support these: |
| + // 2) It's for a named channel. |
| + // 3) It's for a client that we implement ourself. |
| + // 4) It's the initial IPC channel. |
| + |
| + if (channel_handle.socket.fd == -1) { |
| + NOTIMPLEMENTED(); |
| + return false; |
| + } |
| + pipe_ = channel_handle.socket.fd; |
| + return true; |
| } |
| -Channel::ChannelImpl::ReadState |
| - Channel::ChannelImpl::ReadData(char* buffer, |
| - int buffer_len, |
| - int* bytes_read) { |
| - return Channel::ChannelImpl::ReadState(); |
| +bool Channel::ChannelImpl::ProcessOutgoingMessages() { |
| + DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
| + // no connection? |
| + if (output_queue_.empty()) |
| + return true; |
| + |
| + if (pipe_ == -1) |
| + return false; |
| + |
| + // Write out all the messages. The trusted implementation is guaranteed to not |
| + // block. See the Chrome-side implementation of NaClCustomDesc |
| + // TODO(dmichael): Update this comment to be more specific. |
| + while (!output_queue_.empty()) { |
| + linked_ptr<Message> msg = output_queue_.front(); |
| + output_queue_.pop_front(); |
| + |
| + struct NaClImcMsgHdr msgh = {0}; |
| + struct NaClImcMsgIoVec iov = {const_cast<void*>(msg->data()), msg->size()}; |
| + msgh.iov = &iov; |
| + msgh.iov_length = 1; |
| + // imc_sendmsg is implemented by <TODO(dmichael), reference class here>, |
| + // and is guaranteed to not block. It is also assumed that each message |
| + // sent via imc_sendmsg represents a complete IPC::Message (no more no |
| + // less), so if you update this code to do something different, you must |
| + // also update <TODO(dmichael), reference class here>. |
| + ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0); |
| + |
| + if (bytes_written < 0) { |
| + // The trusted side should only ever give us an error of EPIPE. We |
| + // should never be interrupted, nor should we get EAGAIN. |
| + DCHECK(errno == EPIPE); |
| + Close(); |
| + PLOG(ERROR) << "pipe_ error on " |
| + << pipe_ |
| + << " Currently writing message of size: " |
| + << msg->size(); |
| + return false; |
| + } |
| + |
| + // Message sent OK! |
| + DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type() |
| + << " on fd " << pipe_; |
| + } |
| + return true; |
| +} |
| + |
| +int Channel::ChannelImpl::GetHelloMessageProcId() { |
| + // TODO(dmichael): Is this right in NaCl? |
|
Mark Seaborn
2012/05/01 22:07:09
Nope. NaCl doesn't expose process IDs.
Well, "sh
dmichael (off chromium)
2012/05/03 17:06:35
Do you have a bug to track that? I would think tha
|
| + int pid = base::GetCurrentProcId(); |
| + return pid; |
| +} |
| + |
| +void Channel::ChannelImpl::QueueHelloMessage() { |
| + // Create the Hello message |
| + scoped_ptr<Message> msg(new Message(MSG_ROUTING_NONE, |
| + HELLO_MESSAGE_TYPE, |
| + IPC::Message::PRIORITY_NORMAL)); |
| + if (!msg->WriteInt(GetHelloMessageProcId())) { |
| + NOTREACHED() << "Unable to pickle hello message proc id"; |
| + return; |
| + } |
| + Send(msg.release()); |
| +} |
| + |
| +Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( |
| + char* buffer, |
| + int buffer_len, |
| + int* bytes_read) { |
| + *bytes_read = 0; |
| + if (pipe_ == -1) |
| + return READ_FAILED; |
| + if (read_queue_.empty()) |
| + return READ_PENDING; |
| + while (!read_queue_.empty() && *bytes_read < buffer_len) { |
| + linked_ptr<std::vector<char> > vec(read_queue_.front()); |
| + int bytes_to_read = buffer_len - *bytes_read; |
| + if (vec->size() <= bytes_to_read) { |
| + // We can read and discard the entire vector. |
| + std::copy(vec->begin(), vec->end(), buffer + *bytes_read); |
| + *bytes_read += vec->size(); |
| + read_queue_.pop_front(); |
| + } else { |
| + // Read all the bytes we can and discard them from the front of the |
| + // vector. (This can be slowish, since erase has to move the back of the |
| + // vector to the front, but it's hopefully a temporary hack and it keeps |
| + // the code simple). |
| + std::copy(vec->begin(), vec->begin() + bytes_to_read, |
| + buffer + *bytes_read); |
| + vec->erase(vec->begin(), vec->begin() + bytes_to_read); |
| + *bytes_read += bytes_to_read; |
| + } |
| + } |
| + return READ_SUCCEEDED; |
| } |
| bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { |
| - return false; |
| + return true; |
| } |
| bool Channel::ChannelImpl::DidEmptyInputBuffers() { |
| - return false; |
| + return true; |
| } |
| void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { |
| -} |
| - |
| -// static |
| -bool Channel::ChannelImpl::IsNamedServerInitialized( |
| - const std::string& channel_id) { |
| - return false; //file_util::PathExists(FilePath(channel_id)); |
| + // The Hello message contains only the process id. |
| + PickleIterator iter(msg); |
| + int pid; |
| + if (!msg.ReadInt(&iter, &pid)) |
| + NOTREACHED(); |
| + peer_pid_ = pid; |
| + QueueHelloMessage(); |
| + listener()->OnChannelConnected(pid); |
| } |
| //------------------------------------------------------------------------------ |
| @@ -117,37 +400,6 @@ bool Channel::Send(Message* message) { |
| return channel_impl_->Send(message); |
| } |
| -int Channel::GetClientFileDescriptor() const { |
| - return channel_impl_->GetClientFileDescriptor(); |
| -} |
| - |
| -int Channel::TakeClientFileDescriptor() { |
| - return channel_impl_->TakeClientFileDescriptor(); |
| -} |
| - |
| -bool Channel::AcceptsConnections() const { |
| - return channel_impl_->AcceptsConnections(); |
| -} |
| - |
| -bool Channel::HasAcceptedConnection() const { |
| - return channel_impl_->HasAcceptedConnection(); |
| -} |
| - |
| -bool Channel::GetClientEuid(uid_t* client_euid) const { |
| - return channel_impl_->GetClientEuid(client_euid); |
| -} |
| - |
| -void Channel::ResetToAcceptingConnectionState() { |
| - channel_impl_->ResetToAcceptingConnectionState(); |
| -} |
| - |
| -base::ProcessId Channel::peer_pid() const { return 0; } |
| - |
| -// static |
| -bool Channel::IsNamedServerInitialized(const std::string& channel_id) { |
| - return ChannelImpl::IsNamedServerInitialized(channel_id); |
| -} |
| - |
| // static |
| std::string Channel::GenerateVerifiedChannelID(const std::string& prefix) { |
| // A random name is sufficient validation on posix systems, so we don't need |