Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1220)

Unified Diff: mojo/edk/system/channel.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/channel.cc
diff --git a/mojo/edk/system/channel.cc b/mojo/edk/system/channel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..fd6370d2c89bc9cc9323b323f57c0c9684d297a2
--- /dev/null
+++ b/mojo/edk/system/channel.cc
@@ -0,0 +1,339 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/channel.h"
+
+#include <string.h>
+
+#include <algorithm>
+#include <limits>
+
+#include "base/macros.h"
+#include "base/memory/aligned_memory.h"
+#include "mojo/edk/embedder/platform_handle.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+static_assert(sizeof(Channel::Message::Header) % kChannelMessageAlignment == 0,
+ "Invalid Header size.");
+
+} // namespace
+
+const size_t kReadBufferSize = 4096;
+const size_t kMaxUnusedReadBufferCapacity = 256 * 1024;
+const size_t kMaxChannelMessageSize = 256 * 1024 * 1024;
+
+Channel::Message::Message(size_t payload_size, size_t num_handles) {
+ size_ = payload_size + sizeof(Header);
+#if defined(OS_WIN)
+ // On Windows we serialize platform handles directly into the message buffer.
+ size_ += num_handles * sizeof(PlatformHandle);
+#endif
+
+ data_ = static_cast<char*>(base::AlignedAlloc(size_,
+ kChannelMessageAlignment));
+ header_ = reinterpret_cast<Header*>(data_);
+
+ DCHECK_LE(size_, std::numeric_limits<uint32_t>::max());
+ header_->num_bytes = static_cast<uint32_t>(size_);
+
+ DCHECK_LE(num_handles, std::numeric_limits<uint16_t>::max());
+ header_->num_handles = static_cast<uint16_t>(num_handles);
+
+ header_->padding = 0;
+
+#if defined(OS_WIN)
+ if (num_handles > 0) {
+ handles_ = reinterpret_cast<PlatformHandle*>(
+ data_ + sizeof(Header) + payload_size);
+ // Initialize all handles to invalid values.
+ for (size_t i = 0; i < header_->num_handles; ++i)
+ handles()[i] = PlatformHandle();
+ }
+#endif
+}
+
+Channel::Message::~Message() {
+#if defined(OS_WIN)
+ // On POSIX the ScopedPlatformHandleVectorPtr will do this for us.
+ for (size_t i = 0; i < header_->num_handles; ++i)
+ handles()[i].CloseIfNecessary();
+#endif
+ base::AlignedFree(data_);
+}
+
+// static
+Channel::MessagePtr Channel::Message::Deserialize(const void* data,
+ size_t data_num_bytes) {
+#if !defined(OS_WIN)
+ // We only serialize messages into other messages when performing message
+ // relay on Windows.
+ NOTREACHED();
+#endif
+ if (data_num_bytes < sizeof(Header))
+ return nullptr;
+
+ const Header* header = reinterpret_cast<const Header*>(data);
+ if (header->num_bytes != data_num_bytes) {
+ DLOG(ERROR) << "Decoding invalid message: " << header->num_bytes
+ << " != " << data_num_bytes;
+ return nullptr;
+ }
+
+ uint32_t handles_size = header->num_handles * sizeof(PlatformHandle);
+ if (data_num_bytes < sizeof(Header) + handles_size) {
+ DLOG(ERROR) << "Decoding invalid message:" << data_num_bytes
+ << " < " << (sizeof(Header) + handles_size);
+ return nullptr;
+ }
+
+ DCHECK_LE(handles_size, data_num_bytes - sizeof(Header));
+
+ MessagePtr message(new Message(data_num_bytes - sizeof(Header) - handles_size,
+ header->num_handles));
+
+ DCHECK_EQ(message->data_num_bytes(), data_num_bytes);
+
+ // Copy all bytes, including the serialized handles.
+ memcpy(message->mutable_payload(),
+ static_cast<const char*>(data) + sizeof(Header),
+ data_num_bytes - sizeof(Header));
+
+ return message;
+}
+
+size_t Channel::Message::payload_size() const {
+#if defined(OS_WIN)
+ return size_ - sizeof(Header) -
+ sizeof(PlatformHandle) * header_->num_handles;
+#else
+ return header_->num_bytes - sizeof(Header);
+#endif
+}
+
+PlatformHandle* Channel::Message::handles() {
+ if (header_->num_handles == 0)
+ return nullptr;
+#if defined(OS_WIN)
+ return reinterpret_cast<PlatformHandle*>(static_cast<char*>(data_) +
+ sizeof(Header) + payload_size());
+#else
+ CHECK(handle_vector_);
+ return handle_vector_->data();
+#endif
+}
+
+void Channel::Message::SetHandles(ScopedPlatformHandleVectorPtr new_handles) {
+ if (header_->num_handles == 0) {
+ CHECK(!new_handles || new_handles->size() == 0);
+ return;
+ }
+
+ CHECK(new_handles && new_handles->size() == header_->num_handles);
+#if defined(OS_WIN)
+ memcpy(handles(), new_handles->data(),
+ sizeof(PlatformHandle) * header_->num_handles);
+ new_handles->clear();
+#else
+ std::swap(handle_vector_, new_handles);
+#endif
+}
+
+ScopedPlatformHandleVectorPtr Channel::Message::TakeHandles() {
+#if defined(OS_WIN)
+ if (header_->num_handles == 0)
+ return ScopedPlatformHandleVectorPtr();
+ ScopedPlatformHandleVectorPtr moved_handles(
+ new PlatformHandleVector(header_->num_handles));
+ for (size_t i = 0; i < header_->num_handles; ++i)
+ std::swap(moved_handles->at(i), handles()[i]);
+ return moved_handles;
+#else
+ return std::move(handle_vector_);
+#endif
+}
+
+// Helper class for managing a Channel's read buffer allocations. This maintains
+// a single contiguous buffer with the layout:
+//
+// [discarded bytes][occupied bytes][unoccupied bytes]
+//
+// The Reserve() method ensures that a certain capacity of unoccupied bytes are
+// available. It does not claim that capacity and only allocates new capacity
+// when strictly necessary.
+//
+// Claim() marks unoccupied bytes as occupied.
+//
+// Discard() marks occupied bytes as discarded, signifying that their contents
+// can be forgotten or overwritten.
+//
+// The most common Channel behavior in practice should result in very few
+// allocations and copies, as memory is claimed and discarded shortly after
+// being reserved, and future reservations will immediately reuse discarded
+// memory.
+class Channel::ReadBuffer {
+ public:
+ ReadBuffer() {
+ size_ = kReadBufferSize;
+ data_ = static_cast<char*>(base::AlignedAlloc(size_,
+ kChannelMessageAlignment));
+ }
+
+ ~ReadBuffer() {
+ DCHECK(data_);
+ base::AlignedFree(data_);
+ }
+
+ const char* occupied_bytes() const { return data_ + num_discarded_bytes_; }
+
+ size_t num_occupied_bytes() const {
+ return num_occupied_bytes_ - num_discarded_bytes_;
+ }
+
+ // Ensures the ReadBuffer has enough contiguous space allocated to hold
+ // |num_bytes| more bytes; returns the address of the first available byte.
+ char* Reserve(size_t num_bytes) {
+ if (num_occupied_bytes_ + num_bytes > size_) {
+ size_ = std::max(size_ * 2, num_occupied_bytes_ + num_bytes);
+ void* new_data = base::AlignedAlloc(size_, kChannelMessageAlignment);
+ memcpy(new_data, data_, num_occupied_bytes_);
+ base::AlignedFree(data_);
+ data_ = static_cast<char*>(new_data);
+ }
+
+ return data_ + num_occupied_bytes_;
+ }
+
+ // Marks the first |num_bytes| unoccupied bytes as occupied.
+ void Claim(size_t num_bytes) {
+ DCHECK_LE(num_occupied_bytes_ + num_bytes, size_);
+ num_occupied_bytes_ += num_bytes;
+ }
+
+ // Marks the first |num_bytes| occupied bytes as discarded. This may result in
+ // shrinkage of the internal buffer, and it is not safe to assume the result
+ // of a previous Reserve() call is still valid after this.
+ void Discard(size_t num_bytes) {
+ DCHECK_LE(num_discarded_bytes_ + num_bytes, num_occupied_bytes_);
+ num_discarded_bytes_ += num_bytes;
+
+ if (num_discarded_bytes_ == num_occupied_bytes_) {
+ // We can just reuse the buffer from the beginning in this common case.
+ num_discarded_bytes_ = 0;
+ num_occupied_bytes_ = 0;
+ }
+
+ if (num_discarded_bytes_ > kMaxUnusedReadBufferCapacity) {
+ // In the uncommon case that we have a lot of discarded data at the
+ // front of the buffer, simply move remaining data to a smaller buffer.
+ size_t num_preserved_bytes = num_occupied_bytes_ - num_discarded_bytes_;
+ size_ = std::max(num_preserved_bytes, kReadBufferSize);
+ char* new_data = static_cast<char*>(
+ base::AlignedAlloc(size_, kChannelMessageAlignment));
+ memcpy(new_data, data_ + num_discarded_bytes_, num_preserved_bytes);
+ base::AlignedFree(data_);
+ data_ = new_data;
+ num_discarded_bytes_ = 0;
+ num_occupied_bytes_ = num_preserved_bytes;
+ }
+
+ // TODO: we should also adaptively shrink the buffer in case of the
+ // occasional abnormally large read.
+ }
+
+ private:
+ char* data_ = nullptr;
+
+ // The total size of the allocated buffer.
+ size_t size_ = 0;
+
+ // The number of discarded bytes at the beginning of the allocated buffer.
+ size_t num_discarded_bytes_ = 0;
+
+ // The total number of occupied bytes, including discarded bytes.
+ size_t num_occupied_bytes_ = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(ReadBuffer);
+};
+
+Channel::Channel(Delegate* delegate)
+ : delegate_(delegate), read_buffer_(new ReadBuffer) {
+}
+
+Channel::~Channel() {
+}
+
+void Channel::ShutDown() {
+ delegate_ = nullptr;
+ ShutDownImpl();
+}
+
+char* Channel::GetReadBuffer(size_t *buffer_capacity) {
+ DCHECK(read_buffer_);
+ size_t required_capacity = *buffer_capacity;
+ if (!required_capacity)
+ required_capacity = kReadBufferSize;
+
+ *buffer_capacity = required_capacity;
+ return read_buffer_->Reserve(required_capacity);
+}
+
+bool Channel::OnReadComplete(size_t bytes_read, size_t *next_read_size_hint) {
+ bool did_dispatch_message = false;
+ read_buffer_->Claim(bytes_read);
+ while (read_buffer_->num_occupied_bytes() >= sizeof(Message::Header)) {
+ // We have at least enough data available for a MessageHeader.
+ const Message::Header* header = reinterpret_cast<const Message::Header*>(
+ read_buffer_->occupied_bytes());
+ if (header->num_bytes < sizeof(Message::Header) ||
+ header->num_bytes > kMaxChannelMessageSize) {
+ LOG(ERROR) << "Invalid message size: " << header->num_bytes;
+ return false;
+ }
+
+ if (read_buffer_->num_occupied_bytes() < header->num_bytes) {
+ // Not enough data available to read the full message. Hint to the
+ // implementation that it should try reading the full size of the message.
+ *next_read_size_hint =
+ header->num_bytes - read_buffer_->num_occupied_bytes();
+ return true;
+ }
+
+ size_t payload_size = header->num_bytes - sizeof(Message::Header);
+ void* payload = payload_size ? const_cast<Message::Header*>(&header[1])
+ : nullptr;
+
+ ScopedPlatformHandleVectorPtr handles;
+ if (header->num_handles > 0) {
+ handles = GetReadPlatformHandles(header->num_handles,
+ &payload, &payload_size);
+ if (!handles) {
+ // Not enough handles available for this message.
+ break;
+ }
+ }
+
+ // We've got a complete message! Dispatch it and try another.
+ if (delegate_) {
+ delegate_->OnChannelMessage(payload, payload_size, std::move(handles));
+ did_dispatch_message = true;
+ }
+
+ read_buffer_->Discard(header->num_bytes);
+ }
+
+ *next_read_size_hint = did_dispatch_message ? 0 : kReadBufferSize;
+ return true;
+}
+
+void Channel::OnError() {
+ if (delegate_)
+ delegate_->OnChannelError();
+}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698