Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(879)

Unified Diff: mojo/edk/system/channel_posix.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/channel_posix.cc
diff --git a/mojo/edk/system/channel_posix.cc b/mojo/edk/system/channel_posix.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8abe4c5470adaeddf4ad18dbb5ea43b2646980a4
--- /dev/null
+++ b/mojo/edk/system/channel_posix.cc
@@ -0,0 +1,347 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/channel.h"
+
+#include <errno.h>
+#include <sys/uio.h>
+
+#include <algorithm>
+#include <deque>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/lock.h"
+#include "base/task_runner.h"
+#include "mojo/edk/embedder/platform_channel_utils_posix.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+const size_t kMaxBatchReadCapacity = 256 * 1024;
+
+// A view over a Channel::Message object. The write queue uses these since
+// large messages may need to be sent in chunks.
+class MessageView {
+ public:
+ // Owns |message|. |offset| indexes the first unsent byte in the message.
+ MessageView(Channel::MessagePtr message, size_t offset)
+ : message_(std::move(message)),
+ offset_(offset),
+ handles_(message_->TakeHandles()) {
+ DCHECK_GT(message_->data_num_bytes(), offset_);
+ }
+
+ MessageView(MessageView&& other) { *this = std::move(other); }
+
+ MessageView& operator=(MessageView&& other) {
+ message_ = std::move(other.message_);
+ offset_ = other.offset_;
+ handles_ = std::move(other.handles_);
+ return *this;
+ }
+
+ ~MessageView() {}
+
+ const void* data() const {
+ return static_cast<const char*>(message_->data()) + offset_;
+ }
+
+ size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
+
+ size_t data_offset() const { return offset_; }
+ void advance_data_offset(size_t num_bytes) {
+ DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
+ offset_ += num_bytes;
+ }
+
+ ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
+ Channel::MessagePtr TakeMessage() { return std::move(message_); }
+
+ private:
+ Channel::MessagePtr message_;
+ size_t offset_;
+ ScopedPlatformHandleVectorPtr handles_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageView);
+};
+
+class ChannelPosix : public Channel,
+ public base::MessageLoop::DestructionObserver,
+ public base::MessageLoopForIO::Watcher {
+ public:
+ ChannelPosix(Delegate* delegate,
+ ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> io_task_runner)
+ : Channel(delegate),
+ self_(this),
+ handle_(std::move(handle)),
+ io_task_runner_(io_task_runner) {
+ }
+
+ void Start() override {
+ if (io_task_runner_->RunsTasksOnCurrentThread()) {
+ StartOnIOThread();
+ } else {
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
+ }
+ }
+
+ void ShutDownImpl() override {
+ // Always shut down asynchronously when called through the public interface.
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
+ }
+
+ void Write(MessagePtr message) override {
+ bool write_error = false;
+ {
+ base::AutoLock lock(write_lock_);
+ if (reject_writes_)
+ return;
+ if (outgoing_messages_.empty()) {
+ if (!WriteNoLock(MessageView(std::move(message), 0)))
+ reject_writes_ = write_error = true;
+ } else {
+ outgoing_messages_.emplace_back(std::move(message), 0);
+ }
+ }
+ if (write_error) {
+ // Do not synchronously invoke OnError(). Write() may have been called by
+ // the delegate and we don't want to re-enter it.
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&ChannelPosix::OnError, this));
+ }
+ }
+
+ ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
+ size_t num_handles,
+ void** payload,
+ size_t* payload_size) override {
+ if (incoming_platform_handles_.size() < num_handles)
+ return nullptr;
+ ScopedPlatformHandleVectorPtr handles(
+ new PlatformHandleVector(num_handles));
+ for (size_t i = 0; i < num_handles; ++i) {
+ (*handles)[i] = incoming_platform_handles_.front();
+ incoming_platform_handles_.pop_front();
+ }
+ return handles;
+ }
+
+ private:
+ ~ChannelPosix() override {
+ DCHECK(!read_watcher_);
+ DCHECK(!write_watcher_);
+ for (auto handle : incoming_platform_handles_)
+ handle.CloseIfNecessary();
+ }
+
+ void StartOnIOThread() {
+ DCHECK(!read_watcher_);
+ DCHECK(!write_watcher_);
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
+ base::MessageLoopForIO::current()->WatchFileDescriptor(
+ handle_.get().handle, true /* persistent */,
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
+ base::MessageLoop::current()->AddDestructionObserver(this);
+ }
+
+ void WaitForWriteOnIOThread() {
+ base::AutoLock lock(write_lock_);
+ WaitForWriteOnIOThreadNoLock();
+ }
+
+ void WaitForWriteOnIOThreadNoLock() {
+ if (pending_write_)
+ return;
+ if (!write_watcher_)
+ return;
+ if (io_task_runner_->RunsTasksOnCurrentThread()) {
+ pending_write_ = true;
+ base::MessageLoopForIO::current()->WatchFileDescriptor(
+ handle_.get().handle, false /* persistent */,
+ base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
+ } else {
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
+ }
+ }
+
+ void ShutDownOnIOThread() {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
+
+ read_watcher_.reset();
+ write_watcher_.reset();
+ handle_.reset();
+
+ // May destroy the |this| if it was the last reference.
+ self_ = nullptr;
+ }
+
+ // base::MessageLoop::DestructionObserver:
+ void WillDestroyCurrentMessageLoop() override {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+ if (self_)
+ ShutDownOnIOThread();
+ }
+
+ // base::MessageLoopForIO::Watcher:
+ void OnFileCanReadWithoutBlocking(int fd) override {
+ CHECK_EQ(fd, handle_.get().handle);
+
+ bool read_error = false;
+ size_t next_read_size = 0;
+ size_t buffer_capacity = 0;
+ size_t total_bytes_read = 0;
+ size_t bytes_read = 0;
+ do {
+ buffer_capacity = next_read_size;
+ char* buffer = GetReadBuffer(&buffer_capacity);
+ DCHECK_GT(buffer_capacity, 0u);
+
+ ssize_t read_result = PlatformChannelRecvmsg(
+ handle_.get(),
+ buffer,
+ buffer_capacity,
+ &incoming_platform_handles_);
+
+ if (read_result > 0) {
+ bytes_read = static_cast<size_t>(read_result);
+ total_bytes_read += bytes_read;
+ if (!OnReadComplete(bytes_read, &next_read_size)) {
+ read_error = true;
+ break;
+ }
+ } else if (read_result == 0 ||
+ (errno != EAGAIN && errno != EWOULDBLOCK)) {
+ read_error = true;
+ break;
+ }
+ } while (bytes_read == buffer_capacity &&
+ total_bytes_read < kMaxBatchReadCapacity &&
+ next_read_size > 0);
+ if (read_error) {
+ // Stop receiving read notifications.
+ read_watcher_.reset();
+
+ OnError();
+ }
+ }
+
+ void OnFileCanWriteWithoutBlocking(int fd) override {
+ bool write_error = false;
+ {
+ base::AutoLock lock(write_lock_);
+ pending_write_ = false;
+ if (!FlushOutgoingMessagesNoLock())
+ reject_writes_ = write_error = true;
+ }
+ if (write_error)
+ OnError();
+ }
+
+ // Attempts to write a message directly to the channel. If the full message
+ // cannot be written, it's queued and a wait is initiated to write the message
+ // ASAP on the I/O thread.
+ bool WriteNoLock(MessageView message_view) {
+ size_t bytes_written = 0;
+ do {
+ message_view.advance_data_offset(bytes_written);
+
+ ssize_t result;
+ ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
+ if (handles && handles->size()) {
+ iovec iov = {
+ const_cast<void*>(message_view.data()),
+ message_view.data_num_bytes()
+ };
+ // TODO: Handle lots of handles.
+ result = PlatformChannelSendmsgWithHandles(
+ handle_.get(), &iov, 1, handles->data(), handles->size());
+ handles->clear();
+ } else {
+ result = PlatformChannelWrite(handle_.get(), message_view.data(),
+ message_view.data_num_bytes());
+ }
+
+ if (result < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ return false;
+ outgoing_messages_.emplace_back(std::move(message_view));
+ WaitForWriteOnIOThreadNoLock();
+ return true;
+ }
+
+ bytes_written = static_cast<size_t>(result);
+ } while (bytes_written < message_view.data_num_bytes());
+
+ return true;
+ }
+
+ bool FlushOutgoingMessagesNoLock() {
+ std::deque<MessageView> messages;
+ std::swap(outgoing_messages_, messages);
+
+ while (!messages.empty()) {
+ if (!WriteNoLock(std::move(messages.front())))
+ return false;
+
+ messages.pop_front();
+ if (!outgoing_messages_.empty()) {
+ // The message was requeued by WriteNoLock(), so we have to wait for
+ // pipe to become writable again. Repopulate the message queue and exit.
+ DCHECK_EQ(outgoing_messages_.size(), 1u);
+ MessageView message_view = std::move(outgoing_messages_.front());
+ std::swap(messages, outgoing_messages_);
+ outgoing_messages_.push_front(std::move(message_view));
+ return true;
+ }
+ }
+
+ return true;
+ }
+
+ // Keeps the Channel alive at least until explicit shutdown on the IO thread.
+ scoped_refptr<Channel> self_;
+
+ ScopedPlatformHandle handle_;
+ scoped_refptr<base::TaskRunner> io_task_runner_;
+
+ // These watchers must only be accessed on the IO thread.
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
+
+ std::deque<PlatformHandle> incoming_platform_handles_;
+
+ // Protects |pending_write_| and |outgoing_messages_|.
+ base::Lock write_lock_;
+ bool pending_write_ = false;
+ bool reject_writes_ = false;
+ std::deque<MessageView> outgoing_messages_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
+};
+
+} // namespace
+
+// static
+scoped_refptr<Channel> Channel::Create(
+ Delegate* delegate,
+ ScopedPlatformHandle platform_handle,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
+}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698