Index: mojo/edk/system/channel_posix.cc |
diff --git a/mojo/edk/system/channel_posix.cc b/mojo/edk/system/channel_posix.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8abe4c5470adaeddf4ad18dbb5ea43b2646980a4 |
--- /dev/null |
+++ b/mojo/edk/system/channel_posix.cc |
@@ -0,0 +1,347 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/channel.h" |
+ |
+#include <errno.h> |
+#include <sys/uio.h> |
+ |
+#include <algorithm> |
+#include <deque> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/macros.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/synchronization/lock.h" |
+#include "base/task_runner.h" |
+#include "mojo/edk/embedder/platform_channel_utils_posix.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+ |
+namespace mojo { |
+namespace edk { |
+ |
+namespace { |
+ |
+const size_t kMaxBatchReadCapacity = 256 * 1024; |
+ |
+// A view over a Channel::Message object. The write queue uses these since |
+// large messages may need to be sent in chunks. |
+class MessageView { |
+ public: |
+ // Owns |message|. |offset| indexes the first unsent byte in the message. |
+ MessageView(Channel::MessagePtr message, size_t offset) |
+ : message_(std::move(message)), |
+ offset_(offset), |
+ handles_(message_->TakeHandles()) { |
+ DCHECK_GT(message_->data_num_bytes(), offset_); |
+ } |
+ |
+ MessageView(MessageView&& other) { *this = std::move(other); } |
+ |
+ MessageView& operator=(MessageView&& other) { |
+ message_ = std::move(other.message_); |
+ offset_ = other.offset_; |
+ handles_ = std::move(other.handles_); |
+ return *this; |
+ } |
+ |
+ ~MessageView() {} |
+ |
+ const void* data() const { |
+ return static_cast<const char*>(message_->data()) + offset_; |
+ } |
+ |
+ size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } |
+ |
+ size_t data_offset() const { return offset_; } |
+ void advance_data_offset(size_t num_bytes) { |
+ DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); |
+ offset_ += num_bytes; |
+ } |
+ |
+ ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); } |
+ Channel::MessagePtr TakeMessage() { return std::move(message_); } |
+ |
+ private: |
+ Channel::MessagePtr message_; |
+ size_t offset_; |
+ ScopedPlatformHandleVectorPtr handles_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(MessageView); |
+}; |
+ |
+class ChannelPosix : public Channel, |
+ public base::MessageLoop::DestructionObserver, |
+ public base::MessageLoopForIO::Watcher { |
+ public: |
+ ChannelPosix(Delegate* delegate, |
+ ScopedPlatformHandle handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) |
+ : Channel(delegate), |
+ self_(this), |
+ handle_(std::move(handle)), |
+ io_task_runner_(io_task_runner) { |
+ } |
+ |
+ void Start() override { |
+ if (io_task_runner_->RunsTasksOnCurrentThread()) { |
+ StartOnIOThread(); |
+ } else { |
+ io_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this)); |
+ } |
+ } |
+ |
+ void ShutDownImpl() override { |
+ // Always shut down asynchronously when called through the public interface. |
+ io_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this)); |
+ } |
+ |
+ void Write(MessagePtr message) override { |
+ bool write_error = false; |
+ { |
+ base::AutoLock lock(write_lock_); |
+ if (reject_writes_) |
+ return; |
+ if (outgoing_messages_.empty()) { |
+ if (!WriteNoLock(MessageView(std::move(message), 0))) |
+ reject_writes_ = write_error = true; |
+ } else { |
+ outgoing_messages_.emplace_back(std::move(message), 0); |
+ } |
+ } |
+ if (write_error) { |
+ // Do not synchronously invoke OnError(). Write() may have been called by |
+ // the delegate and we don't want to re-enter it. |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&ChannelPosix::OnError, this)); |
+ } |
+ } |
+ |
+ ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
+ size_t num_handles, |
+ void** payload, |
+ size_t* payload_size) override { |
+ if (incoming_platform_handles_.size() < num_handles) |
+ return nullptr; |
+ ScopedPlatformHandleVectorPtr handles( |
+ new PlatformHandleVector(num_handles)); |
+ for (size_t i = 0; i < num_handles; ++i) { |
+ (*handles)[i] = incoming_platform_handles_.front(); |
+ incoming_platform_handles_.pop_front(); |
+ } |
+ return handles; |
+ } |
+ |
+ private: |
+ ~ChannelPosix() override { |
+ DCHECK(!read_watcher_); |
+ DCHECK(!write_watcher_); |
+ for (auto handle : incoming_platform_handles_) |
+ handle.CloseIfNecessary(); |
+ } |
+ |
+ void StartOnIOThread() { |
+ DCHECK(!read_watcher_); |
+ DCHECK(!write_watcher_); |
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); |
+ base::MessageLoopForIO::current()->WatchFileDescriptor( |
+ handle_.get().handle, true /* persistent */, |
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); |
+ base::MessageLoop::current()->AddDestructionObserver(this); |
+ } |
+ |
+ void WaitForWriteOnIOThread() { |
+ base::AutoLock lock(write_lock_); |
+ WaitForWriteOnIOThreadNoLock(); |
+ } |
+ |
+ void WaitForWriteOnIOThreadNoLock() { |
+ if (pending_write_) |
+ return; |
+ if (!write_watcher_) |
+ return; |
+ if (io_task_runner_->RunsTasksOnCurrentThread()) { |
+ pending_write_ = true; |
+ base::MessageLoopForIO::current()->WatchFileDescriptor( |
+ handle_.get().handle, false /* persistent */, |
+ base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this); |
+ } else { |
+ io_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this)); |
+ } |
+ } |
+ |
+ void ShutDownOnIOThread() { |
+ base::MessageLoop::current()->RemoveDestructionObserver(this); |
+ |
+ read_watcher_.reset(); |
+ write_watcher_.reset(); |
+ handle_.reset(); |
+ |
+ // May destroy the |this| if it was the last reference. |
+ self_ = nullptr; |
+ } |
+ |
+ // base::MessageLoop::DestructionObserver: |
+ void WillDestroyCurrentMessageLoop() override { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ if (self_) |
+ ShutDownOnIOThread(); |
+ } |
+ |
+ // base::MessageLoopForIO::Watcher: |
+ void OnFileCanReadWithoutBlocking(int fd) override { |
+ CHECK_EQ(fd, handle_.get().handle); |
+ |
+ bool read_error = false; |
+ size_t next_read_size = 0; |
+ size_t buffer_capacity = 0; |
+ size_t total_bytes_read = 0; |
+ size_t bytes_read = 0; |
+ do { |
+ buffer_capacity = next_read_size; |
+ char* buffer = GetReadBuffer(&buffer_capacity); |
+ DCHECK_GT(buffer_capacity, 0u); |
+ |
+ ssize_t read_result = PlatformChannelRecvmsg( |
+ handle_.get(), |
+ buffer, |
+ buffer_capacity, |
+ &incoming_platform_handles_); |
+ |
+ if (read_result > 0) { |
+ bytes_read = static_cast<size_t>(read_result); |
+ total_bytes_read += bytes_read; |
+ if (!OnReadComplete(bytes_read, &next_read_size)) { |
+ read_error = true; |
+ break; |
+ } |
+ } else if (read_result == 0 || |
+ (errno != EAGAIN && errno != EWOULDBLOCK)) { |
+ read_error = true; |
+ break; |
+ } |
+ } while (bytes_read == buffer_capacity && |
+ total_bytes_read < kMaxBatchReadCapacity && |
+ next_read_size > 0); |
+ if (read_error) { |
+ // Stop receiving read notifications. |
+ read_watcher_.reset(); |
+ |
+ OnError(); |
+ } |
+ } |
+ |
+ void OnFileCanWriteWithoutBlocking(int fd) override { |
+ bool write_error = false; |
+ { |
+ base::AutoLock lock(write_lock_); |
+ pending_write_ = false; |
+ if (!FlushOutgoingMessagesNoLock()) |
+ reject_writes_ = write_error = true; |
+ } |
+ if (write_error) |
+ OnError(); |
+ } |
+ |
+ // Attempts to write a message directly to the channel. If the full message |
+ // cannot be written, it's queued and a wait is initiated to write the message |
+ // ASAP on the I/O thread. |
+ bool WriteNoLock(MessageView message_view) { |
+ size_t bytes_written = 0; |
+ do { |
+ message_view.advance_data_offset(bytes_written); |
+ |
+ ssize_t result; |
+ ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); |
+ if (handles && handles->size()) { |
+ iovec iov = { |
+ const_cast<void*>(message_view.data()), |
+ message_view.data_num_bytes() |
+ }; |
+ // TODO: Handle lots of handles. |
+ result = PlatformChannelSendmsgWithHandles( |
+ handle_.get(), &iov, 1, handles->data(), handles->size()); |
+ handles->clear(); |
+ } else { |
+ result = PlatformChannelWrite(handle_.get(), message_view.data(), |
+ message_view.data_num_bytes()); |
+ } |
+ |
+ if (result < 0) { |
+ if (errno != EAGAIN && errno != EWOULDBLOCK) |
+ return false; |
+ outgoing_messages_.emplace_back(std::move(message_view)); |
+ WaitForWriteOnIOThreadNoLock(); |
+ return true; |
+ } |
+ |
+ bytes_written = static_cast<size_t>(result); |
+ } while (bytes_written < message_view.data_num_bytes()); |
+ |
+ return true; |
+ } |
+ |
+ bool FlushOutgoingMessagesNoLock() { |
+ std::deque<MessageView> messages; |
+ std::swap(outgoing_messages_, messages); |
+ |
+ while (!messages.empty()) { |
+ if (!WriteNoLock(std::move(messages.front()))) |
+ return false; |
+ |
+ messages.pop_front(); |
+ if (!outgoing_messages_.empty()) { |
+ // The message was requeued by WriteNoLock(), so we have to wait for |
+ // pipe to become writable again. Repopulate the message queue and exit. |
+ DCHECK_EQ(outgoing_messages_.size(), 1u); |
+ MessageView message_view = std::move(outgoing_messages_.front()); |
+ std::swap(messages, outgoing_messages_); |
+ outgoing_messages_.push_front(std::move(message_view)); |
+ return true; |
+ } |
+ } |
+ |
+ return true; |
+ } |
+ |
+ // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
+ scoped_refptr<Channel> self_; |
+ |
+ ScopedPlatformHandle handle_; |
+ scoped_refptr<base::TaskRunner> io_task_runner_; |
+ |
+ // These watchers must only be accessed on the IO thread. |
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; |
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; |
+ |
+ std::deque<PlatformHandle> incoming_platform_handles_; |
+ |
+ // Protects |pending_write_| and |outgoing_messages_|. |
+ base::Lock write_lock_; |
+ bool pending_write_ = false; |
+ bool reject_writes_ = false; |
+ std::deque<MessageView> outgoing_messages_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ChannelPosix); |
+}; |
+ |
+} // namespace |
+ |
+// static |
+scoped_refptr<Channel> Channel::Create( |
+ Delegate* delegate, |
+ ScopedPlatformHandle platform_handle, |
+ scoped_refptr<base::TaskRunner> io_task_runner) { |
+ return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); |
+} |
+ |
+} // namespace edk |
+} // namespace mojo |