| Index: mojo/system/raw_channel.cc
|
| diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5d8da3bd631fbe2c5f33cba3273e476a29a0ea98
|
| --- /dev/null
|
| +++ b/mojo/system/raw_channel.cc
|
| @@ -0,0 +1,344 @@
|
| +// Copyright 2014 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/system/raw_channel.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include <algorithm>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/stl_util.h"
|
| +#include "mojo/system/message_in_transit.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +const size_t kReadSize = 4096;
|
| +
|
| +RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
|
| +}
|
| +
|
| +RawChannel::ReadBuffer::~ReadBuffer() {}
|
| +
|
| +void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
|
| + DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
|
| + *addr = &buffer_[0] + num_valid_bytes_;
|
| + *size = kReadSize;
|
| +}
|
| +
|
| +RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
|
| +
|
| +RawChannel::WriteBuffer::~WriteBuffer() {
|
| + STLDeleteElements(&message_queue_);
|
| +}
|
| +
|
| +void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
|
| + buffers->clear();
|
| +
|
| + size_t bytes_to_write = GetTotalBytesToWrite();
|
| + if (bytes_to_write == 0)
|
| + return;
|
| +
|
| + MessageInTransit* message = message_queue_.front();
|
| + if (!message->secondary_buffer_size()) {
|
| + // Only write from the main buffer.
|
| + DCHECK_LT(offset_, message->main_buffer_size());
|
| + DCHECK_LE(bytes_to_write, message->main_buffer_size());
|
| + Buffer buffer = {
|
| + static_cast<const char*>(message->main_buffer()) + offset_,
|
| + bytes_to_write};
|
| + buffers->push_back(buffer);
|
| + return;
|
| + }
|
| +
|
| + if (offset_ >= message->main_buffer_size()) {
|
| + // Only write from the secondary buffer.
|
| + DCHECK_LT(offset_ - message->main_buffer_size(),
|
| + message->secondary_buffer_size());
|
| + DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
|
| + Buffer buffer = {
|
| + static_cast<const char*>(message->secondary_buffer()) +
|
| + (offset_ - message->main_buffer_size()),
|
| + bytes_to_write};
|
| + buffers->push_back(buffer);
|
| + return;
|
| + }
|
| +
|
| + // Write from both buffers.
|
| + DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
|
| + message->secondary_buffer_size());
|
| + Buffer buffer1 = {
|
| + static_cast<const char*>(message->main_buffer()) + offset_,
|
| + message->main_buffer_size() - offset_};
|
| + buffers->push_back(buffer1);
|
| + Buffer buffer2 = {
|
| + static_cast<const char*>(message->secondary_buffer()),
|
| + message->secondary_buffer_size()};
|
| + buffers->push_back(buffer2);
|
| +}
|
| +
|
| +size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
|
| + if (message_queue_.empty())
|
| + return 0;
|
| +
|
| + MessageInTransit* message = message_queue_.front();
|
| + DCHECK_LT(offset_, message->total_size());
|
| + return message->total_size() - offset_;
|
| +}
|
| +
|
| +RawChannel::RawChannel(Delegate* delegate,
|
| + base::MessageLoopForIO* message_loop_for_io)
|
| + : delegate_(delegate),
|
| + message_loop_for_io_(message_loop_for_io),
|
| + read_stopped_(false),
|
| + write_stopped_(false),
|
| + weak_ptr_factory_(this) {
|
| +}
|
| +
|
| +RawChannel::~RawChannel() {
|
| + DCHECK(!read_buffer_);
|
| + DCHECK(!write_buffer_);
|
| +
|
| + // No need to take the |write_lock_| here -- if there are still weak pointers
|
| + // outstanding, then we're hosed anyway (since we wouldn't be able to
|
| + // invalidate them cleanly, since we might not be on the I/O thread).
|
| + DCHECK(!weak_ptr_factory_.HasWeakPtrs());
|
| +}
|
| +
|
| +bool RawChannel::Init() {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + // No need to take the lock. No one should be using us yet.
|
| + DCHECK(!read_buffer_);
|
| + read_buffer_.reset(new ReadBuffer);
|
| + DCHECK(!write_buffer_);
|
| + write_buffer_.reset(new WriteBuffer);
|
| +
|
| + if (!OnInit())
|
| + return false;
|
| +
|
| + return ScheduleRead() == IO_PENDING;
|
| +}
|
| +
|
| +void RawChannel::Shutdown() {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + base::AutoLock locker(write_lock_);
|
| +
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
| +
|
| + read_stopped_ = true;
|
| + write_stopped_ = true;
|
| +
|
| + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
|
| +}
|
| +
|
| +// Reminder: This must be thread-safe.
|
| +bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| + base::AutoLock locker(write_lock_);
|
| + if (write_stopped_)
|
| + return false;
|
| +
|
| + if (!write_buffer_->message_queue_.empty()) {
|
| + write_buffer_->message_queue_.push_back(message.release());
|
| + return true;
|
| + }
|
| +
|
| + write_buffer_->message_queue_.push_front(message.release());
|
| + DCHECK_EQ(write_buffer_->offset_, 0u);
|
| +
|
| + size_t bytes_written = 0;
|
| + IOResult io_result = WriteNoLock(&bytes_written);
|
| + if (io_result == IO_PENDING)
|
| + return true;
|
| +
|
| + bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
|
| + bytes_written);
|
| + if (!result) {
|
| + // Even if we're on the I/O thread, don't call |OnFatalError()| in the
|
| + // nested context.
|
| + message_loop_for_io_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannel::CallOnFatalError,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + Delegate::FATAL_ERROR_FAILED_WRITE));
|
| + }
|
| +
|
| + return result;
|
| +}
|
| +
|
| +RawChannel::ReadBuffer* RawChannel::read_buffer() {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| + return read_buffer_.get();
|
| +}
|
| +
|
| +RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
|
| + write_lock_.AssertAcquired();
|
| + return write_buffer_.get();
|
| +}
|
| +
|
| +void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + if (read_stopped_) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
|
| +
|
| + // Keep reading data in a loop, and dispatches messages if enough data is
|
| + // received. Exit the loop if any of the following happens:
|
| + // - one or more messages were dispatched;
|
| + // - the last read failed, was a partial read or would block;
|
| + // - |Shutdown()| was called.
|
| + do {
|
| + if (io_result != IO_SUCCEEDED) {
|
| + read_stopped_ = true;
|
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
|
| + return;
|
| + }
|
| +
|
| + read_buffer_->num_valid_bytes_ += bytes_read;
|
| +
|
| + // Dispatch all the messages that we can.
|
| + bool did_dispatch_message = false;
|
| + // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| + // Currently, we copy data to ensure that this is zero at the beginning.
|
| + size_t read_buffer_start = 0;
|
| + size_t remaining_bytes = read_buffer_->num_valid_bytes_;
|
| + size_t message_size;
|
| + // Note that we rely on short-circuit evaluation here:
|
| + // - |read_buffer_start| may be an invalid index into
|
| + // |read_buffer_->buffer_| if |remaining_bytes| is zero.
|
| + // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| + // next read).
|
| + while (remaining_bytes > 0 &&
|
| + MessageInTransit::GetNextMessageSize(
|
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
|
| + &message_size) &&
|
| + remaining_bytes >= message_size) {
|
| + // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
|
| + // some sort of "view" abstraction.
|
| + MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
|
| + &read_buffer_->buffer_[read_buffer_start]);
|
| + DCHECK_EQ(message.total_size(), message_size);
|
| +
|
| + // Dispatch the message.
|
| + delegate_->OnReadMessage(message);
|
| + if (read_stopped_) {
|
| + // |Shutdown()| was called in |OnReadMessage()|.
|
| + // TODO(vtl): Add test for this case.
|
| + return;
|
| + }
|
| + did_dispatch_message = true;
|
| +
|
| + // Update our state.
|
| + read_buffer_start += message_size;
|
| + remaining_bytes -= message_size;
|
| + }
|
| +
|
| + if (read_buffer_start > 0) {
|
| + // Move data back to start.
|
| + read_buffer_->num_valid_bytes_ = remaining_bytes;
|
| + if (read_buffer_->num_valid_bytes_ > 0) {
|
| + memmove(&read_buffer_->buffer_[0],
|
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
|
| + }
|
| + read_buffer_start = 0;
|
| + }
|
| +
|
| + if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
|
| + kReadSize) {
|
| + // Use power-of-2 buffer sizes.
|
| + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
|
| + // maximum message size to whatever extent necessary).
|
| + // TODO(vtl): We may often be able to peek at the header and get the real
|
| + // required extra space (which may be much bigger than |kReadSize|).
|
| + size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
|
| + while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
|
| + new_size *= 2;
|
| +
|
| + // TODO(vtl): It's suboptimal to zero out the fresh memory.
|
| + read_buffer_->buffer_.resize(new_size, 0);
|
| + }
|
| +
|
| + // (1) If we dispatched any messages, stop reading for now (and let the
|
| + // message loop do its thing for another round).
|
| + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
|
| + // a single message. Risks: slower, more complex if we want to avoid lots of
|
| + // copying. ii. Keep reading until there's no more data and dispatch all the
|
| + // messages we can. Risks: starvation of other users of the message loop.)
|
| + // (2) If we didn't max out |kReadSize|, stop reading for now.
|
| + bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
|
| + bytes_read = 0;
|
| + io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
|
| + } while (io_result != IO_PENDING);
|
| +}
|
| +
|
| +void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + bool did_fail = false;
|
| + {
|
| + base::AutoLock locker(write_lock_);
|
| + DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
|
| +
|
| + if (write_stopped_) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + did_fail = !OnWriteCompletedNoLock(result, bytes_written);
|
| + }
|
| +
|
| + if (did_fail)
|
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
|
| +}
|
| +
|
| +void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
|
| + delegate_->OnFatalError(fatal_error);
|
| +}
|
| +
|
| +bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
|
| + write_lock_.AssertAcquired();
|
| +
|
| + DCHECK(!write_stopped_);
|
| + DCHECK(!write_buffer_->message_queue_.empty());
|
| +
|
| + if (result) {
|
| + if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
|
| + // Partial (or no) write.
|
| + write_buffer_->offset_ += bytes_written;
|
| + } else {
|
| + // Complete write.
|
| + DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
|
| + delete write_buffer_->message_queue_.front();
|
| + write_buffer_->message_queue_.pop_front();
|
| + write_buffer_->offset_ = 0;
|
| + }
|
| +
|
| + if (write_buffer_->message_queue_.empty())
|
| + return true;
|
| +
|
| + // Schedule the next write.
|
| + if (ScheduleWriteNoLock() == IO_PENDING)
|
| + return true;
|
| + }
|
| +
|
| + write_stopped_ = true;
|
| + STLDeleteElements(&write_buffer_->message_queue_);
|
| + write_buffer_->offset_ = 0;
|
| + return false;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|