Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1175)

Unified Diff: mojo/edk/system/channel_nacl.cc

Issue 2025763002: Use ChannelMojo in Pepper and NaCl processes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-utility-channel-mojo
Patch Set: (untitled) Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/BUILD.gn ('k') | ppapi/nacl_irt/irt_start.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/channel_nacl.cc
diff --git a/mojo/edk/system/channel_nacl.cc b/mojo/edk/system/channel_nacl.cc
new file mode 100644
index 0000000000000000000000000000000000000000..a846af9479e8b6ccdb55193a4ffe18a40b69dea2
--- /dev/null
+++ b/mojo/edk/system/channel_nacl.cc
@@ -0,0 +1,272 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
#include "mojo/edk/system/channel.h"

#include <errno.h>
#include <stdint.h>
#include <string.h>

#include <algorithm>
#include <deque>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "base/bind.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/lock.h"
#include "base/task_runner.h"
#include "base/threading/simple_thread.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "native_client/src/public/imc_syscalls.h"
#include "native_client/src/public/imc_types.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
// Size of the scratch buffer handed to each blocking imc_recvmsg() call on
// the reader thread.
const size_t kReadBufferSize = 4 * 1024;

// Very simple message structure. Intentionally optimised for simplicity over
// performance.
struct SimpleMessage {
  std::string data;      // Raw payload bytes of one imc_recvmsg() result.
  std::vector<int> fds;  // File descriptors received alongside the data.
};
+
+class ChannelNacl : public Channel {
+ public:
+ ChannelNacl(Delegate* delegate,
+ ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> io_task_runner)
+ : Channel(delegate),
+ self_(this),
+ handle_(std::move(handle)),
+ io_task_runner_(io_task_runner) {
+ reader_thread_.reset(
+ new ReaderThread(this, handle.get().handle, io_task_runner_));
+ }
+
+ void Start() override {
+ reader_thread_->Start();
+ }
+
+ void ShutDownImpl() override {
+ // Always shut down asynchronously when called through the public interface.
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelNacl::ShutDownOnIOThread, this));
+ }
+
+ void Write(MessagePtr message) override {
+ bool write_error = false;
+ {
+ base::AutoLock lock(write_lock_);
+ if (reject_writes_)
+ return;
+
+ // For simplicity, do blocking sends.
+ ScopedPlatformHandleVectorPtr handles =
+ message->TakeHandlesForTransport();
+ std::unique_ptr<int[]> fds(new int[handles->size()]);
+ for (size_t i = 0; i < handles->size(); i++)
+ fds[i] = handles->at(i).handle;
+
+ NaClAbiNaClImcMsgIoVec iov = {
+ const_cast<void*>(message->data()), message->data_num_bytes()
+ };
+ NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds.get(), handles->size() };
+ ssize_t bytes_written = imc_sendmsg(handle_.get().handle, &msgh, 0);
+ DCHECK(bytes_written);
+
+ if (bytes_written < 0 ||
+ static_cast<size_t>(bytes_written) != message->data_num_bytes()) {
+ write_error = true;
+ }
+ }
+ if (write_error) {
+ // Do not synchronously invoke OnError(). Write() may have been called by
+ // the delegate and we don't want to re-enter it.
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&ChannelNacl::OnError, this));
+ }
+ }
+
+ bool GetReadPlatformHandles(
+ size_t num_handles,
+ const void* extra_header,
+ size_t extra_header_size,
+ ScopedPlatformHandleVectorPtr* handles) override {
+ if (num_handles > std::numeric_limits<uint16_t>::max())
+ return false;
+ if (incoming_platform_handles_.size() < num_handles) {
+ handles->reset();
+ return true;
+ }
+
+ handles->reset(new PlatformHandleVector(num_handles));
+ for (size_t i = 0; i < num_handles; ++i) {
+ (*handles)->at(i) = incoming_platform_handles_.front();
+ incoming_platform_handles_.pop_front();
+ }
+
+ return true;
+ }
+
+ private:
+ class ReaderThread : public base::SimpleThread {
+ public:
+ ReaderThread(scoped_refptr<ChannelNacl> channel_nacl, int channel_fd,
+ scoped_refptr<base::TaskRunner> io_task_runner)
+ : base::SimpleThread("Mojo_ChannelNacl_ReaderThread"),
+ channel_nacl_(channel_nacl),
+ channel_fd_(channel_fd),
+ io_task_runner_(io_task_runner) {
+ DCHECK(channel_fd_ != -1);
+ }
+
+ ~ReaderThread() override {
+ Stop();
+ }
+
+ void Run() override {
+ while (true) {
+ {
+ base::AutoLock l(lock_);
+ if (shutdown_)
+ return;
+ }
+
+ SimpleMessage message;
+ message.data.resize(kReadBufferSize);
+ message.fds.resize(NACL_ABI_IMC_DESC_MAX);
+
+ NaClAbiNaClImcMsgIoVec iov = {
+ &message.data[0], message.data.size()
+ };
+ NaClAbiNaClImcMsgHdr msg = {
+ &iov, 1, message.fds.data(), message.fds.size()
+ };
+
+ int bytes_read = imc_recvmsg(channel_fd_, &msg, 0);
+ if (bytes_read < 0) {
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&ChannelNacl::OnError,
+ channel_nacl_));
+ return;
+ }
+
+ message.data.resize(bytes_read);
+ message.fds.resize(msg.desc_length);
+
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&ChannelNacl::OnDataReceived,
+ channel_nacl_,
+ base::Passed(&message)));
+ }
+ }
+
+ void Stop() {
+ if (HasBeenJoined())
+ return;
+
+ {
+ base::AutoLock l(lock_);
+ shutdown_ = true;
+ }
+
+ // Signals the thread to wake up.
+ close(channel_fd_);
+ Join();
+
+ channel_fd_ = -1;
+ }
+
+ private:
+ scoped_refptr<ChannelNacl> channel_nacl_;
+ int channel_fd_;
+ scoped_refptr<base::TaskRunner> io_task_runner_;
+
+ base::Lock lock_;
+ bool shutdown_ = false;
+ };
+
+ ~ChannelNacl() override {
+ for (auto handle : incoming_platform_handles_)
+ handle.CloseIfNecessary();
+ }
+
+ void ShutDownOnIOThread() {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+ reader_thread_->Stop();
+
+ // May destroy the |this| if it was the last reference.
+ self_ = nullptr;
+ }
+
+ void OnDataReceived(SimpleMessage message) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ for (int fd : message.fds) {
+ incoming_platform_handles_.push_back(PlatformHandle(fd));
+ }
+
+ bool read_error = false;
+ size_t next_read_size = 0;
+ size_t buffer_capacity = 0;
+ size_t total_bytes_read = 0;
+ do {
+ buffer_capacity = next_read_size;
+ char* buffer = GetReadBuffer(&buffer_capacity);
+ DCHECK_GT(buffer_capacity, 0u);
+
+ size_t bytes_copied = std::min(message.data.size() - total_bytes_read,
+ buffer_capacity);
+ memcpy(buffer, message.data.data() + total_bytes_read, bytes_copied);
+
+ total_bytes_read += bytes_copied;
+ if (!OnReadComplete(bytes_copied, &next_read_size)) {
+ read_error = true;
+ break;
+ }
+ } while (total_bytes_read < message.data.size() && next_read_size > 0);
+ if (read_error) {
+ // Stop reading. Note, there may be a pending read in the message loop.
+ reader_thread_->Stop();
+
+ OnError();
+ }
+ }
+
+ // Keeps the Channel alive at least until explicit shutdown on the IO thread.
+ scoped_refptr<Channel> self_;
+
+ ScopedPlatformHandle handle_;
+ scoped_refptr<base::TaskRunner> io_task_runner_;
+
+ std::unique_ptr<ReaderThread> reader_thread_;
+
+ std::deque<PlatformHandle> incoming_platform_handles_;
+
+ // Protects |pending_write_| and |outgoing_messages_|.
+ base::Lock write_lock_;
+ bool reject_writes_ = false;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelNacl);
+};
+
+} // namespace
+
+// static
+scoped_refptr<Channel> Channel::Create(
+ Delegate* delegate,
+ ScopedPlatformHandle platform_handle,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ return new ChannelNacl(delegate, std::move(platform_handle), io_task_runner);
+}
+
+} // namespace edk
+} // namespace mojo
« no previous file with comments | « mojo/edk/system/BUILD.gn ('k') | ppapi/nacl_irt/irt_start.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698