| Index: mojo/edk/system/channel.cc
|
| diff --git a/mojo/edk/system/channel.cc b/mojo/edk/system/channel.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fd6370d2c89bc9cc9323b323f57c0c9684d297a2
|
| --- /dev/null
|
| +++ b/mojo/edk/system/channel.cc
|
| @@ -0,0 +1,339 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/channel.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include <algorithm>
|
| +#include <limits>
|
| +
|
| +#include "base/macros.h"
|
| +#include "base/memory/aligned_memory.h"
|
| +#include "mojo/edk/embedder/platform_handle.h"
|
| +
|
| +namespace mojo {
|
| +namespace edk {
|
| +
|
| +namespace {
|
| +
|
| +static_assert(sizeof(Channel::Message::Header) % kChannelMessageAlignment == 0,
|
| + "Invalid Header size.");
|
| +
|
| +} // namespace
|
| +
|
| +const size_t kReadBufferSize = 4096;
|
| +const size_t kMaxUnusedReadBufferCapacity = 256 * 1024;
|
| +const size_t kMaxChannelMessageSize = 256 * 1024 * 1024;
|
| +
|
| +Channel::Message::Message(size_t payload_size, size_t num_handles) {
|
| + size_ = payload_size + sizeof(Header);
|
| +#if defined(OS_WIN)
|
| + // On Windows we serialize platform handles directly into the message buffer.
|
| + size_ += num_handles * sizeof(PlatformHandle);
|
| +#endif
|
| +
|
| + data_ = static_cast<char*>(base::AlignedAlloc(size_,
|
| + kChannelMessageAlignment));
|
| + header_ = reinterpret_cast<Header*>(data_);
|
| +
|
| + DCHECK_LE(size_, std::numeric_limits<uint32_t>::max());
|
| + header_->num_bytes = static_cast<uint32_t>(size_);
|
| +
|
| + DCHECK_LE(num_handles, std::numeric_limits<uint16_t>::max());
|
| + header_->num_handles = static_cast<uint16_t>(num_handles);
|
| +
|
| + header_->padding = 0;
|
| +
|
| +#if defined(OS_WIN)
|
| + if (num_handles > 0) {
|
| + handles_ = reinterpret_cast<PlatformHandle*>(
|
| + data_ + sizeof(Header) + payload_size);
|
| + // Initialize all handles to invalid values.
|
| + for (size_t i = 0; i < header_->num_handles; ++i)
|
| + handles()[i] = PlatformHandle();
|
| + }
|
| +#endif
|
| +}
|
| +
|
| +Channel::Message::~Message() {
|
| +#if defined(OS_WIN)
|
| + // On POSIX the ScopedPlatformHandleVectorPtr will do this for us.
|
| + for (size_t i = 0; i < header_->num_handles; ++i)
|
| + handles()[i].CloseIfNecessary();
|
| +#endif
|
| + base::AlignedFree(data_);
|
| +}
|
| +
|
| +// static
|
| +Channel::MessagePtr Channel::Message::Deserialize(const void* data,
|
| + size_t data_num_bytes) {
|
| +#if !defined(OS_WIN)
|
| + // We only serialize messages into other messages when performing message
|
| + // relay on Windows.
|
| + NOTREACHED();
|
| +#endif
|
| + if (data_num_bytes < sizeof(Header))
|
| + return nullptr;
|
| +
|
| + const Header* header = reinterpret_cast<const Header*>(data);
|
| + if (header->num_bytes != data_num_bytes) {
|
| + DLOG(ERROR) << "Decoding invalid message: " << header->num_bytes
|
| + << " != " << data_num_bytes;
|
| + return nullptr;
|
| + }
|
| +
|
| + uint32_t handles_size = header->num_handles * sizeof(PlatformHandle);
|
| + if (data_num_bytes < sizeof(Header) + handles_size) {
|
| + DLOG(ERROR) << "Decoding invalid message:" << data_num_bytes
|
| + << " < " << (sizeof(Header) + handles_size);
|
| + return nullptr;
|
| + }
|
| +
|
| + DCHECK_LE(handles_size, data_num_bytes - sizeof(Header));
|
| +
|
| + MessagePtr message(new Message(data_num_bytes - sizeof(Header) - handles_size,
|
| + header->num_handles));
|
| +
|
| + DCHECK_EQ(message->data_num_bytes(), data_num_bytes);
|
| +
|
| + // Copy all bytes, including the serialized handles.
|
| + memcpy(message->mutable_payload(),
|
| + static_cast<const char*>(data) + sizeof(Header),
|
| + data_num_bytes - sizeof(Header));
|
| +
|
| + return message;
|
| +}
|
| +
|
| +size_t Channel::Message::payload_size() const {
|
| +#if defined(OS_WIN)
|
| + return size_ - sizeof(Header) -
|
| + sizeof(PlatformHandle) * header_->num_handles;
|
| +#else
|
| + return header_->num_bytes - sizeof(Header);
|
| +#endif
|
| +}
|
| +
|
| +PlatformHandle* Channel::Message::handles() {
|
| + if (header_->num_handles == 0)
|
| + return nullptr;
|
| +#if defined(OS_WIN)
|
| + return reinterpret_cast<PlatformHandle*>(static_cast<char*>(data_) +
|
| + sizeof(Header) + payload_size());
|
| +#else
|
| + CHECK(handle_vector_);
|
| + return handle_vector_->data();
|
| +#endif
|
| +}
|
| +
|
| +void Channel::Message::SetHandles(ScopedPlatformHandleVectorPtr new_handles) {
|
| + if (header_->num_handles == 0) {
|
| + CHECK(!new_handles || new_handles->size() == 0);
|
| + return;
|
| + }
|
| +
|
| + CHECK(new_handles && new_handles->size() == header_->num_handles);
|
| +#if defined(OS_WIN)
|
| + memcpy(handles(), new_handles->data(),
|
| + sizeof(PlatformHandle) * header_->num_handles);
|
| + new_handles->clear();
|
| +#else
|
| + std::swap(handle_vector_, new_handles);
|
| +#endif
|
| +}
|
| +
|
| +ScopedPlatformHandleVectorPtr Channel::Message::TakeHandles() {
|
| +#if defined(OS_WIN)
|
| + if (header_->num_handles == 0)
|
| + return ScopedPlatformHandleVectorPtr();
|
| + ScopedPlatformHandleVectorPtr moved_handles(
|
| + new PlatformHandleVector(header_->num_handles));
|
| + for (size_t i = 0; i < header_->num_handles; ++i)
|
| + std::swap(moved_handles->at(i), handles()[i]);
|
| + return moved_handles;
|
| +#else
|
| + return std::move(handle_vector_);
|
| +#endif
|
| +}
|
| +
|
| +// Helper class for managing a Channel's read buffer allocations. This maintains
|
| +// a single contiguous buffer with the layout:
|
| +//
|
| +// [discarded bytes][occupied bytes][unoccupied bytes]
|
| +//
|
| +// The Reserve() method ensures that a certain capacity of unoccupied bytes are
|
| +// available. It does not claim that capacity and only allocates new capacity
|
| +// when strictly necessary.
|
| +//
|
| +// Claim() marks unoccupied bytes as occupied.
|
| +//
|
| +// Discard() marks occupied bytes as discarded, signifying that their contents
|
| +// can be forgotten or overwritten.
|
| +//
|
| +// The most common Channel behavior in practice should result in very few
|
| +// allocations and copies, as memory is claimed and discarded shortly after
|
| +// being reserved, and future reservations will immediately reuse discarded
|
| +// memory.
|
| +class Channel::ReadBuffer {
|
| + public:
|
| + ReadBuffer() {
|
| + size_ = kReadBufferSize;
|
| + data_ = static_cast<char*>(base::AlignedAlloc(size_,
|
| + kChannelMessageAlignment));
|
| + }
|
| +
|
| + ~ReadBuffer() {
|
| + DCHECK(data_);
|
| + base::AlignedFree(data_);
|
| + }
|
| +
|
| + const char* occupied_bytes() const { return data_ + num_discarded_bytes_; }
|
| +
|
| + size_t num_occupied_bytes() const {
|
| + return num_occupied_bytes_ - num_discarded_bytes_;
|
| + }
|
| +
|
| + // Ensures the ReadBuffer has enough contiguous space allocated to hold
|
| + // |num_bytes| more bytes; returns the address of the first available byte.
|
| + char* Reserve(size_t num_bytes) {
|
| + if (num_occupied_bytes_ + num_bytes > size_) {
|
| + size_ = std::max(size_ * 2, num_occupied_bytes_ + num_bytes);
|
| + void* new_data = base::AlignedAlloc(size_, kChannelMessageAlignment);
|
| + memcpy(new_data, data_, num_occupied_bytes_);
|
| + base::AlignedFree(data_);
|
| + data_ = static_cast<char*>(new_data);
|
| + }
|
| +
|
| + return data_ + num_occupied_bytes_;
|
| + }
|
| +
|
| + // Marks the first |num_bytes| unoccupied bytes as occupied.
|
| + void Claim(size_t num_bytes) {
|
| + DCHECK_LE(num_occupied_bytes_ + num_bytes, size_);
|
| + num_occupied_bytes_ += num_bytes;
|
| + }
|
| +
|
| + // Marks the first |num_bytes| occupied bytes as discarded. This may result in
|
| + // shrinkage of the internal buffer, and it is not safe to assume the result
|
| + // of a previous Reserve() call is still valid after this.
|
| + void Discard(size_t num_bytes) {
|
| + DCHECK_LE(num_discarded_bytes_ + num_bytes, num_occupied_bytes_);
|
| + num_discarded_bytes_ += num_bytes;
|
| +
|
| + if (num_discarded_bytes_ == num_occupied_bytes_) {
|
| + // We can just reuse the buffer from the beginning in this common case.
|
| + num_discarded_bytes_ = 0;
|
| + num_occupied_bytes_ = 0;
|
| + }
|
| +
|
| + if (num_discarded_bytes_ > kMaxUnusedReadBufferCapacity) {
|
| + // In the uncommon case that we have a lot of discarded data at the
|
| + // front of the buffer, simply move remaining data to a smaller buffer.
|
| + size_t num_preserved_bytes = num_occupied_bytes_ - num_discarded_bytes_;
|
| + size_ = std::max(num_preserved_bytes, kReadBufferSize);
|
| + char* new_data = static_cast<char*>(
|
| + base::AlignedAlloc(size_, kChannelMessageAlignment));
|
| + memcpy(new_data, data_ + num_discarded_bytes_, num_preserved_bytes);
|
| + base::AlignedFree(data_);
|
| + data_ = new_data;
|
| + num_discarded_bytes_ = 0;
|
| + num_occupied_bytes_ = num_preserved_bytes;
|
| + }
|
| +
|
| + // TODO: we should also adaptively shrink the buffer in case of the
|
| + // occasional abnormally large read.
|
| + }
|
| +
|
| + private:
|
| + char* data_ = nullptr;
|
| +
|
| + // The total size of the allocated buffer.
|
| + size_t size_ = 0;
|
| +
|
| + // The number of discarded bytes at the beginning of the allocated buffer.
|
| + size_t num_discarded_bytes_ = 0;
|
| +
|
| + // The total number of occupied bytes, including discarded bytes.
|
| + size_t num_occupied_bytes_ = 0;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ReadBuffer);
|
| +};
|
| +
|
| +Channel::Channel(Delegate* delegate)
|
| + : delegate_(delegate), read_buffer_(new ReadBuffer) {
|
| +}
|
| +
|
| +Channel::~Channel() {
|
| +}
|
| +
|
| +void Channel::ShutDown() {
|
| + delegate_ = nullptr;
|
| + ShutDownImpl();
|
| +}
|
| +
|
| +char* Channel::GetReadBuffer(size_t *buffer_capacity) {
|
| + DCHECK(read_buffer_);
|
| + size_t required_capacity = *buffer_capacity;
|
| + if (!required_capacity)
|
| + required_capacity = kReadBufferSize;
|
| +
|
| + *buffer_capacity = required_capacity;
|
| + return read_buffer_->Reserve(required_capacity);
|
| +}
|
| +
|
| +bool Channel::OnReadComplete(size_t bytes_read, size_t *next_read_size_hint) {
|
| + bool did_dispatch_message = false;
|
| + read_buffer_->Claim(bytes_read);
|
| + while (read_buffer_->num_occupied_bytes() >= sizeof(Message::Header)) {
|
| + // We have at least enough data available for a MessageHeader.
|
| + const Message::Header* header = reinterpret_cast<const Message::Header*>(
|
| + read_buffer_->occupied_bytes());
|
| + if (header->num_bytes < sizeof(Message::Header) ||
|
| + header->num_bytes > kMaxChannelMessageSize) {
|
| + LOG(ERROR) << "Invalid message size: " << header->num_bytes;
|
| + return false;
|
| + }
|
| +
|
| + if (read_buffer_->num_occupied_bytes() < header->num_bytes) {
|
| + // Not enough data available to read the full message. Hint to the
|
| + // implementation that it should try reading the full size of the message.
|
| + *next_read_size_hint =
|
| + header->num_bytes - read_buffer_->num_occupied_bytes();
|
| + return true;
|
| + }
|
| +
|
| + size_t payload_size = header->num_bytes - sizeof(Message::Header);
|
| + void* payload = payload_size ? const_cast<Message::Header*>(&header[1])
|
| + : nullptr;
|
| +
|
| + ScopedPlatformHandleVectorPtr handles;
|
| + if (header->num_handles > 0) {
|
| + handles = GetReadPlatformHandles(header->num_handles,
|
| + &payload, &payload_size);
|
| + if (!handles) {
|
| + // Not enough handles available for this message.
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // We've got a complete message! Dispatch it and try another.
|
| + if (delegate_) {
|
| + delegate_->OnChannelMessage(payload, payload_size, std::move(handles));
|
| + did_dispatch_message = true;
|
| + }
|
| +
|
| + read_buffer_->Discard(header->num_bytes);
|
| + }
|
| +
|
| + *next_read_size_hint = did_dispatch_message ? 0 : kReadBufferSize;
|
| + return true;
|
| +}
|
| +
|
| +void Channel::OnError() {
|
| + if (delegate_)
|
| + delegate_->OnChannelError();
|
| +}
|
| +
|
| +} // namespace edk
|
| +} // namespace mojo
|
|
|