Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(336)

Unified Diff: mojo/edk/system/channel_win.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/channel_win.cc
diff --git a/mojo/edk/system/channel_win.cc b/mojo/edk/system/channel_win.cc
new file mode 100644
index 0000000000000000000000000000000000000000..00d06e1f03a50e4289f6b5d906d64260ae4862f0
--- /dev/null
+++ b/mojo/edk/system/channel_win.cc
@@ -0,0 +1,306 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/channel.h"
+
+#include <windows.h>
+
+#include <algorithm>
+#include <deque>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/lock.h"
+#include "base/task_runner.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+const size_t kMaxBatchReadCapacity = 256 * 1024;
+
+// A view over a Channel::Message object. The write queue uses these since
+// large messages may need to be sent in chunks.
+class MessageView {
+ public:
+ // Owns |message|. |offset| indexes the first unsent byte in the message.
+ MessageView(Channel::MessagePtr message, size_t offset)
+ : message_(std::move(message)),
+ offset_(offset) {
+ DCHECK_GT(message_->data_num_bytes(), offset_);
+ }
+
+ MessageView(MessageView&& other) { *this = std::move(other); }
+
+ MessageView& operator=(MessageView&& other) {
+ message_ = std::move(other.message_);
+ offset_ = other.offset_;
+ return *this;
+ }
+
+ ~MessageView() {}
+
+ const void* data() const {
+ return static_cast<const char*>(message_->data()) + offset_;
+ }
+
+ size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
+
+ size_t data_offset() const { return offset_; }
+ void advance_data_offset(size_t num_bytes) {
+ DCHECK_GE(message_->data_num_bytes(), offset_ + num_bytes);
+ offset_ += num_bytes;
+ }
+
+ Channel::MessagePtr TakeChannelMessage() { return std::move(message_); }
+
+ private:
+ Channel::MessagePtr message_;
+ size_t offset_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageView);
+};
+
+class ChannelWin : public Channel,
+ public base::MessageLoop::DestructionObserver,
+ public base::MessageLoopForIO::IOHandler {
+ public:
+ ChannelWin(Delegate* delegate,
+ ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> io_task_runner)
+ : Channel(delegate),
+ self_(this),
+ handle_(std::move(handle)),
+ io_task_runner_(io_task_runner) {
+ memset(&read_context_, 0, sizeof(read_context_));
+ read_context_.handler = this;
+
+ memset(&write_context_, 0, sizeof(write_context_));
+ write_context_.handler = this;
+ }
+
+ void Start() override {
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelWin::StartOnIOThread, this));
+ }
+
+ void ShutDownImpl() override {
+ // Always shut down asynchronously when called through the public interface.
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&ChannelWin::ShutDownOnIOThread, this));
+ }
+
+ void Write(MessagePtr message) override {
+ bool write_error = false;
+ {
+ base::AutoLock lock(write_lock_);
+ if (reject_writes_)
+ return;
+
+ bool write_now = !delay_writes_ && outgoing_messages_.empty();
+ outgoing_messages_.emplace_back(std::move(message), 0);
+
+ if (write_now && !WriteNoLock(outgoing_messages_.front()))
+ reject_writes_ = write_error = true;
+ }
+ if (write_error) {
+ // Do not synchronously invoke OnError(). Write() may have been called by
+ // the delegate and we don't want to re-enter it.
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&ChannelWin::OnError, this));
+ }
+ }
+
+ ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
+ size_t num_handles,
+ void** payload,
+ size_t* payload_size) override {
+ size_t handles_size = sizeof(PlatformHandle) * num_handles;
+ if (handles_size > *payload_size)
+ return nullptr;
+
+ *payload_size -= handles_size;
+ ScopedPlatformHandleVectorPtr handles(
+ new PlatformHandleVector(num_handles));
+ memcpy(handles->data(),
+ static_cast<const char*>(*payload) + *payload_size, handles_size);
+ return handles;
+ }
+
+ private:
+ // May run on any thread.
+ ~ChannelWin() override {}
+
+ void StartOnIOThread() {
+ base::MessageLoop::current()->AddDestructionObserver(this);
+ base::MessageLoopForIO::current()->RegisterIOHandler(
+ handle_.get().handle, this);
+
+ // Now that we have registered our IOHandler, we can start writing.
+ {
+ base::AutoLock lock(write_lock_);
+ if (delay_writes_) {
+ delay_writes_ = false;
+ WriteNextNoLock();
+ }
+ }
+
+ // Keep this alive in case we synchronously run shutdown.
+ scoped_refptr<ChannelWin> keep_alive(this);
+ ReadMore(0);
+ }
+
+ void ShutDownOnIOThread() {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
+
+ CancelIo(handle_.get().handle);
+ handle_.reset();
+
+ // May destroy the |this| if it was the last reference.
+ self_ = nullptr;
+ }
+
+ // base::MessageLoop::DestructionObserver:
+ void WillDestroyCurrentMessageLoop() override {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+ if (self_)
+ ShutDownOnIOThread();
+ }
+
+ // base::MessageLoop::IOHandler:
+ void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered,
+ DWORD error) override {
+ if (error != ERROR_SUCCESS) {
+ OnError();
+ } else if (context == &read_context_) {
+ OnReadDone(static_cast<size_t>(bytes_transfered));
+ } else {
+ CHECK(context == &write_context_);
+ OnWriteDone(static_cast<size_t>(bytes_transfered));
+ }
+ Release(); // Balancing reference taken after ReadFile / WriteFile.
+ }
+
+ void OnReadDone(size_t bytes_read) {
+ if (bytes_read > 0) {
+ size_t next_read_size = 0;
+ if (OnReadComplete(bytes_read, &next_read_size)) {
+ ReadMore(next_read_size);
+ } else {
+ OnError();
+ }
+ } else if (bytes_read == 0) {
+ OnError();
+ }
+ }
+
+ void OnWriteDone(size_t bytes_written) {
+ if (bytes_written == 0)
+ return;
+
+ bool write_error = false;
+ {
+ base::AutoLock lock(write_lock_);
+
+ DCHECK(!outgoing_messages_.empty());
+
+ MessageView& message_view = outgoing_messages_.front();
+ message_view.advance_data_offset(bytes_written);
+ if (message_view.data_num_bytes() == 0) {
+ Channel::MessagePtr message = message_view.TakeChannelMessage();
+ outgoing_messages_.pop_front();
+
+ // Clear any handles so they don't get closed on destruction.
+ ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
+ if (handles)
+ handles->clear();
+ }
+
+ if (!WriteNextNoLock())
+ reject_writes_ = write_error = true;
+ }
+ if (write_error)
+ OnError();
+ }
+
+ void ReadMore(size_t next_read_size_hint) {
+ size_t buffer_capacity = next_read_size_hint;
+ char* buffer = GetReadBuffer(&buffer_capacity);
+ DCHECK_GT(buffer_capacity, 0u);
+
+ BOOL ok = ReadFile(handle_.get().handle,
+ buffer,
+ static_cast<DWORD>(buffer_capacity),
+ NULL,
+ &read_context_.overlapped);
+
+ if (ok || GetLastError() == ERROR_IO_PENDING) {
+ AddRef(); // Will be balanced in OnIOCompleted
+ } else {
+ OnError();
+ }
+ }
+
+ // Attempts to write a message directly to the channel. If the full message
+ // cannot be written, it's queued and a wait is initiated to write the message
+ // ASAP on the I/O thread.
+ bool WriteNoLock(const MessageView& message_view) {
+ BOOL ok = WriteFile(handle_.get().handle,
+ message_view.data(),
+ static_cast<DWORD>(message_view.data_num_bytes()),
+ NULL,
+ &write_context_.overlapped);
+
+ if (ok || GetLastError() == ERROR_IO_PENDING) {
+ AddRef(); // Will be balanced in OnIOCompleted.
+ return true;
+ }
+ return false;
+ }
+
+ bool WriteNextNoLock() {
+ if (outgoing_messages_.empty())
+ return true;
+ return WriteNoLock(outgoing_messages_.front());
+ }
+
+ // Keeps the Channel alive at least until explicit shutdown on the IO thread.
+ scoped_refptr<Channel> self_;
+
+ ScopedPlatformHandle handle_;
+ scoped_refptr<base::TaskRunner> io_task_runner_;
+
+ base::MessageLoopForIO::IOContext read_context_;
+ base::MessageLoopForIO::IOContext write_context_;
+
+ // Protects |reject_writes_| and |outgoing_messages_|.
+ base::Lock write_lock_;
+
+ bool delay_writes_ = true;
+
+ bool reject_writes_ = false;
+ std::deque<MessageView> outgoing_messages_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelWin);
+};
+
+} // namespace
+
+// static
+scoped_refptr<Channel> Channel::Create(
+ Delegate* delegate,
+ ScopedPlatformHandle platform_handle,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ return new ChannelWin(delegate, std::move(platform_handle), io_task_runner);
+}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698