Index: mojo/edk/system/channel_nacl.cc |
diff --git a/mojo/edk/system/channel_nacl.cc b/mojo/edk/system/channel_nacl.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a846af9479e8b6ccdb55193a4ffe18a40b69dea2 |
--- /dev/null |
+++ b/mojo/edk/system/channel_nacl.cc |
@@ -0,0 +1,272 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/channel.h" |
+ |
+#include <errno.h> |
+ |
+#include <algorithm> |
+#include <deque> |
+#include <limits> |
+#include <memory> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/macros.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/synchronization/lock.h" |
+#include "base/task_runner.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+ |
+#include "base/threading/simple_thread.h" |
+#include "native_client/src/public/imc_syscalls.h" |
+#include "native_client/src/public/imc_types.h" |
+ |
+namespace mojo { |
+namespace edk { |
+ |
+namespace { |
+ |
+const size_t kReadBufferSize = 4 * 1024; |
+ |
+// Very simple message structure. Intentionally optimised for simplicity over |
+// performance. |
+struct SimpleMessage { |
+ std::string data; |
+ std::vector<int> fds; |
+}; |
+ |
+class ChannelNacl : public Channel { |
+ public: |
+ ChannelNacl(Delegate* delegate, |
+ ScopedPlatformHandle handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) |
+ : Channel(delegate), |
+ self_(this), |
+ handle_(std::move(handle)), |
+ io_task_runner_(io_task_runner) { |
+ reader_thread_.reset( |
+ new ReaderThread(this, handle.get().handle, io_task_runner_)); |
+ } |
+ |
+ void Start() override { |
+ reader_thread_->Start(); |
+ } |
+ |
+ void ShutDownImpl() override { |
+ // Always shut down asynchronously when called through the public interface. |
+ io_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&ChannelNacl::ShutDownOnIOThread, this)); |
+ } |
+ |
+ void Write(MessagePtr message) override { |
+ bool write_error = false; |
+ { |
+ base::AutoLock lock(write_lock_); |
+ if (reject_writes_) |
+ return; |
+ |
+ // For simplicity, do blocking sends. |
+ ScopedPlatformHandleVectorPtr handles = |
+ message->TakeHandlesForTransport(); |
+ std::unique_ptr<int[]> fds(new int[handles->size()]); |
+ for (size_t i = 0; i < handles->size(); i++) |
+ fds[i] = handles->at(i).handle; |
+ |
+ NaClAbiNaClImcMsgIoVec iov = { |
+ const_cast<void*>(message->data()), message->data_num_bytes() |
+ }; |
+ NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds.get(), handles->size() }; |
+ ssize_t bytes_written = imc_sendmsg(handle_.get().handle, &msgh, 0); |
+ DCHECK(bytes_written); |
+ |
+ if (bytes_written < 0 || |
+ static_cast<size_t>(bytes_written) != message->data_num_bytes()) { |
+ write_error = true; |
+ } |
+ } |
+ if (write_error) { |
+ // Do not synchronously invoke OnError(). Write() may have been called by |
+ // the delegate and we don't want to re-enter it. |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&ChannelNacl::OnError, this)); |
+ } |
+ } |
+ |
+ bool GetReadPlatformHandles( |
+ size_t num_handles, |
+ const void* extra_header, |
+ size_t extra_header_size, |
+ ScopedPlatformHandleVectorPtr* handles) override { |
+ if (num_handles > std::numeric_limits<uint16_t>::max()) |
+ return false; |
+ if (incoming_platform_handles_.size() < num_handles) { |
+ handles->reset(); |
+ return true; |
+ } |
+ |
+ handles->reset(new PlatformHandleVector(num_handles)); |
+ for (size_t i = 0; i < num_handles; ++i) { |
+ (*handles)->at(i) = incoming_platform_handles_.front(); |
+ incoming_platform_handles_.pop_front(); |
+ } |
+ |
+ return true; |
+ } |
+ |
+ private: |
+ class ReaderThread : public base::SimpleThread { |
+ public: |
+ ReaderThread(scoped_refptr<ChannelNacl> channel_nacl, int channel_fd, |
+ scoped_refptr<base::TaskRunner> io_task_runner) |
+ : base::SimpleThread("Mojo_ChannelNacl_ReaderThread"), |
+ channel_nacl_(channel_nacl), |
+ channel_fd_(channel_fd), |
+ io_task_runner_(io_task_runner) { |
+ DCHECK(channel_fd_ != -1); |
+ } |
+ |
+ ~ReaderThread() override { |
+ Stop(); |
+ } |
+ |
+ void Run() override { |
+ while (true) { |
+ { |
+ base::AutoLock l(lock_); |
+ if (shutdown_) |
+ return; |
+ } |
+ |
+ SimpleMessage message; |
+ message.data.resize(kReadBufferSize); |
+ message.fds.resize(NACL_ABI_IMC_DESC_MAX); |
+ |
+ NaClAbiNaClImcMsgIoVec iov = { |
+ &message.data[0], message.data.size() |
+ }; |
+ NaClAbiNaClImcMsgHdr msg = { |
+ &iov, 1, message.fds.data(), message.fds.size() |
+ }; |
+ |
+ int bytes_read = imc_recvmsg(channel_fd_, &msg, 0); |
+ if (bytes_read < 0) { |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&ChannelNacl::OnError, |
+ channel_nacl_)); |
+ return; |
+ } |
+ |
+ message.data.resize(bytes_read); |
+ message.fds.resize(msg.desc_length); |
+ |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&ChannelNacl::OnDataReceived, |
+ channel_nacl_, |
+ base::Passed(&message))); |
+ } |
+ } |
+ |
+ void Stop() { |
+ if (HasBeenJoined()) |
+ return; |
+ |
+ { |
+ base::AutoLock l(lock_); |
+ shutdown_ = true; |
+ } |
+ |
+ // Signals the thread to wake up. |
+ close(channel_fd_); |
+ Join(); |
+ |
+ channel_fd_ = -1; |
+ } |
+ |
+ private: |
+ scoped_refptr<ChannelNacl> channel_nacl_; |
+ int channel_fd_; |
+ scoped_refptr<base::TaskRunner> io_task_runner_; |
+ |
+ base::Lock lock_; |
+ bool shutdown_ = false; |
+ }; |
+ |
+ ~ChannelNacl() override { |
+ for (auto handle : incoming_platform_handles_) |
+ handle.CloseIfNecessary(); |
+ } |
+ |
+ void ShutDownOnIOThread() { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ reader_thread_->Stop(); |
+ |
+ // May destroy the |this| if it was the last reference. |
+ self_ = nullptr; |
+ } |
+ |
+ void OnDataReceived(SimpleMessage message) { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ for (int fd : message.fds) { |
+ incoming_platform_handles_.push_back(PlatformHandle(fd)); |
+ } |
+ |
+ bool read_error = false; |
+ size_t next_read_size = 0; |
+ size_t buffer_capacity = 0; |
+ size_t total_bytes_read = 0; |
+ do { |
+ buffer_capacity = next_read_size; |
+ char* buffer = GetReadBuffer(&buffer_capacity); |
+ DCHECK_GT(buffer_capacity, 0u); |
+ |
+ size_t bytes_copied = std::min(message.data.size() - total_bytes_read, |
+ buffer_capacity); |
+ memcpy(buffer, message.data.data() + total_bytes_read, bytes_copied); |
+ |
+ total_bytes_read += bytes_copied; |
+ if (!OnReadComplete(bytes_copied, &next_read_size)) { |
+ read_error = true; |
+ break; |
+ } |
+ } while (total_bytes_read < message.data.size() && next_read_size > 0); |
+ if (read_error) { |
+ // Stop reading. Note, there may be a pending read in the message loop. |
+ reader_thread_->Stop(); |
+ |
+ OnError(); |
+ } |
+ } |
+ |
+ // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
+ scoped_refptr<Channel> self_; |
+ |
+ ScopedPlatformHandle handle_; |
+ scoped_refptr<base::TaskRunner> io_task_runner_; |
+ |
+ std::unique_ptr<ReaderThread> reader_thread_; |
+ |
+ std::deque<PlatformHandle> incoming_platform_handles_; |
+ |
+ // Protects |pending_write_| and |outgoing_messages_|. |
+ base::Lock write_lock_; |
+ bool reject_writes_ = false; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ChannelNacl); |
+}; |
+ |
+} // namespace |
+ |
+// static |
+scoped_refptr<Channel> Channel::Create( |
+ Delegate* delegate, |
+ ScopedPlatformHandle platform_handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) { |
+ return new ChannelNacl(delegate, std::move(platform_handle), io_task_runner); |
+} |
+ |
+} // namespace edk |
+} // namespace mojo |