| Index: mojo/edk/system/channel_nacl.cc
|
| diff --git a/mojo/edk/system/channel_nacl.cc b/mojo/edk/system/channel_nacl.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..a846af9479e8b6ccdb55193a4ffe18a40b69dea2
|
| --- /dev/null
|
| +++ b/mojo/edk/system/channel_nacl.cc
|
| @@ -0,0 +1,272 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/channel.h"
|
| +
|
| +#include <errno.h>
|
| +
|
| +#include <algorithm>
|
| +#include <deque>
|
| +#include <limits>
|
| +#include <memory>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/macros.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/task_runner.h"
|
| +#include "mojo/edk/embedder/platform_handle_vector.h"
|
| +
|
| +#include "base/threading/simple_thread.h"
|
| +#include "native_client/src/public/imc_syscalls.h"
|
| +#include "native_client/src/public/imc_types.h"
|
| +
|
| +namespace mojo {
|
| +namespace edk {
|
| +
|
| +namespace {
|
| +
|
| +const size_t kReadBufferSize = 4 * 1024;
|
| +
|
| +// Very simple message structure. Intentionally optimised for simplicity over
|
| +// performance.
|
| +struct SimpleMessage {
|
| + std::string data;
|
| + std::vector<int> fds;
|
| +};
|
| +
|
| +class ChannelNacl : public Channel {
|
| + public:
|
| + ChannelNacl(Delegate* delegate,
|
| + ScopedPlatformHandle handle,
|
| + scoped_refptr<base::TaskRunner> io_task_runner)
|
| + : Channel(delegate),
|
| + self_(this),
|
| + handle_(std::move(handle)),
|
| + io_task_runner_(io_task_runner) {
|
| + reader_thread_.reset(
|
| + new ReaderThread(this, handle.get().handle, io_task_runner_));
|
| + }
|
| +
|
| + void Start() override {
|
| + reader_thread_->Start();
|
| + }
|
| +
|
| + void ShutDownImpl() override {
|
| + // Always shut down asynchronously when called through the public interface.
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&ChannelNacl::ShutDownOnIOThread, this));
|
| + }
|
| +
|
| + void Write(MessagePtr message) override {
|
| + bool write_error = false;
|
| + {
|
| + base::AutoLock lock(write_lock_);
|
| + if (reject_writes_)
|
| + return;
|
| +
|
| + // For simplicity, do blocking sends.
|
| + ScopedPlatformHandleVectorPtr handles =
|
| + message->TakeHandlesForTransport();
|
| + std::unique_ptr<int[]> fds(new int[handles->size()]);
|
| + for (size_t i = 0; i < handles->size(); i++)
|
| + fds[i] = handles->at(i).handle;
|
| +
|
| + NaClAbiNaClImcMsgIoVec iov = {
|
| + const_cast<void*>(message->data()), message->data_num_bytes()
|
| + };
|
| + NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds.get(), handles->size() };
|
| + ssize_t bytes_written = imc_sendmsg(handle_.get().handle, &msgh, 0);
|
| + DCHECK(bytes_written);
|
| +
|
| + if (bytes_written < 0 ||
|
| + static_cast<size_t>(bytes_written) != message->data_num_bytes()) {
|
| + write_error = true;
|
| + }
|
| + }
|
| + if (write_error) {
|
| + // Do not synchronously invoke OnError(). Write() may have been called by
|
| + // the delegate and we don't want to re-enter it.
|
| + io_task_runner_->PostTask(FROM_HERE,
|
| + base::Bind(&ChannelNacl::OnError, this));
|
| + }
|
| + }
|
| +
|
| + bool GetReadPlatformHandles(
|
| + size_t num_handles,
|
| + const void* extra_header,
|
| + size_t extra_header_size,
|
| + ScopedPlatformHandleVectorPtr* handles) override {
|
| + if (num_handles > std::numeric_limits<uint16_t>::max())
|
| + return false;
|
| + if (incoming_platform_handles_.size() < num_handles) {
|
| + handles->reset();
|
| + return true;
|
| + }
|
| +
|
| + handles->reset(new PlatformHandleVector(num_handles));
|
| + for (size_t i = 0; i < num_handles; ++i) {
|
| + (*handles)->at(i) = incoming_platform_handles_.front();
|
| + incoming_platform_handles_.pop_front();
|
| + }
|
| +
|
| + return true;
|
| + }
|
| +
|
| + private:
|
| + class ReaderThread : public base::SimpleThread {
|
| + public:
|
| + ReaderThread(scoped_refptr<ChannelNacl> channel_nacl, int channel_fd,
|
| + scoped_refptr<base::TaskRunner> io_task_runner)
|
| + : base::SimpleThread("Mojo_ChannelNacl_ReaderThread"),
|
| + channel_nacl_(channel_nacl),
|
| + channel_fd_(channel_fd),
|
| + io_task_runner_(io_task_runner) {
|
| + DCHECK(channel_fd_ != -1);
|
| + }
|
| +
|
| + ~ReaderThread() override {
|
| + Stop();
|
| + }
|
| +
|
| + void Run() override {
|
| + while (true) {
|
| + {
|
| + base::AutoLock l(lock_);
|
| + if (shutdown_)
|
| + return;
|
| + }
|
| +
|
| + SimpleMessage message;
|
| + message.data.resize(kReadBufferSize);
|
| + message.fds.resize(NACL_ABI_IMC_DESC_MAX);
|
| +
|
| + NaClAbiNaClImcMsgIoVec iov = {
|
| + &message.data[0], message.data.size()
|
| + };
|
| + NaClAbiNaClImcMsgHdr msg = {
|
| + &iov, 1, message.fds.data(), message.fds.size()
|
| + };
|
| +
|
| + int bytes_read = imc_recvmsg(channel_fd_, &msg, 0);
|
| + if (bytes_read < 0) {
|
| + io_task_runner_->PostTask(FROM_HERE,
|
| + base::Bind(&ChannelNacl::OnError,
|
| + channel_nacl_));
|
| + return;
|
| + }
|
| +
|
| + message.data.resize(bytes_read);
|
| + message.fds.resize(msg.desc_length);
|
| +
|
| + io_task_runner_->PostTask(FROM_HERE,
|
| + base::Bind(&ChannelNacl::OnDataReceived,
|
| + channel_nacl_,
|
| + base::Passed(&message)));
|
| + }
|
| + }
|
| +
|
| + void Stop() {
|
| + if (HasBeenJoined())
|
| + return;
|
| +
|
| + {
|
| + base::AutoLock l(lock_);
|
| + shutdown_ = true;
|
| + }
|
| +
|
| + // Signals the thread to wake up.
|
| + close(channel_fd_);
|
| + Join();
|
| +
|
| + channel_fd_ = -1;
|
| + }
|
| +
|
| + private:
|
| + scoped_refptr<ChannelNacl> channel_nacl_;
|
| + int channel_fd_;
|
| + scoped_refptr<base::TaskRunner> io_task_runner_;
|
| +
|
| + base::Lock lock_;
|
| + bool shutdown_ = false;
|
| + };
|
| +
|
| + ~ChannelNacl() override {
|
| + for (auto handle : incoming_platform_handles_)
|
| + handle.CloseIfNecessary();
|
| + }
|
| +
|
| + void ShutDownOnIOThread() {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| + reader_thread_->Stop();
|
| +
|
| + // May destroy the |this| if it was the last reference.
|
| + self_ = nullptr;
|
| + }
|
| +
|
| + void OnDataReceived(SimpleMessage message) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + for (int fd : message.fds) {
|
| + incoming_platform_handles_.push_back(PlatformHandle(fd));
|
| + }
|
| +
|
| + bool read_error = false;
|
| + size_t next_read_size = 0;
|
| + size_t buffer_capacity = 0;
|
| + size_t total_bytes_read = 0;
|
| + do {
|
| + buffer_capacity = next_read_size;
|
| + char* buffer = GetReadBuffer(&buffer_capacity);
|
| + DCHECK_GT(buffer_capacity, 0u);
|
| +
|
| + size_t bytes_copied = std::min(message.data.size() - total_bytes_read,
|
| + buffer_capacity);
|
| + memcpy(buffer, message.data.data() + total_bytes_read, bytes_copied);
|
| +
|
| + total_bytes_read += bytes_copied;
|
| + if (!OnReadComplete(bytes_copied, &next_read_size)) {
|
| + read_error = true;
|
| + break;
|
| + }
|
| + } while (total_bytes_read < message.data.size() && next_read_size > 0);
|
| + if (read_error) {
|
| + // Stop reading. Note, there may be a pending read in the message loop.
|
| + reader_thread_->Stop();
|
| +
|
| + OnError();
|
| + }
|
| + }
|
| +
|
| + // Keeps the Channel alive at least until explicit shutdown on the IO thread.
|
| + scoped_refptr<Channel> self_;
|
| +
|
| + ScopedPlatformHandle handle_;
|
| + scoped_refptr<base::TaskRunner> io_task_runner_;
|
| +
|
| + std::unique_ptr<ReaderThread> reader_thread_;
|
| +
|
| + std::deque<PlatformHandle> incoming_platform_handles_;
|
| +
|
| + // Protects |pending_write_| and |outgoing_messages_|.
|
| + base::Lock write_lock_;
|
| + bool reject_writes_ = false;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ChannelNacl);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| +// static
|
| +scoped_refptr<Channel> Channel::Create(
|
| + Delegate* delegate,
|
| + ScopedPlatformHandle platform_handle,
|
| + scoped_refptr<base::TaskRunner> io_task_runner) {
|
| + return new ChannelNacl(delegate, std::move(platform_handle), io_task_runner);
|
| +}
|
| +
|
| +} // namespace edk
|
| +} // namespace mojo
|
|
|