| Index: mojo/system/raw_channel.cc
|
| diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ae25e1f8fe1adde106a0af78f10a8c41a6c3d7a5
|
| --- /dev/null
|
| +++ b/mojo/system/raw_channel.cc
|
| @@ -0,0 +1,309 @@
|
| +// Copyright 2014 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/system/raw_channel.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include <algorithm>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/stl_util.h"
|
| +#include "mojo/system/message_in_transit.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +namespace {
|
| +
|
| +const size_t kReadSize = 4096;
|
| +
|
| +} // namespace
|
| +
|
| +RawChannel::IOBufferPreserver::IOBufferPreserver(
|
| + scoped_ptr<std::vector<char> > read_buffer,
|
| + scoped_ptr<MessageInTransit> write_buffer)
|
| + : read_buffer_(read_buffer.Pass()),
|
| + write_buffer_(write_buffer.Pass()) {
|
| +}
|
| +
|
| +RawChannel::IOBufferPreserver::~IOBufferPreserver() {
|
| +}
|
| +
|
| +RawChannel::RawChannel(Delegate* delegate,
|
| + base::MessageLoopForIO* message_loop_for_io)
|
| + : delegate_(delegate),
|
| + read_stopped_(false),
|
| + read_buffer_(kReadSize),
|
| + read_buffer_num_valid_bytes_(0),
|
| + message_loop_for_io_(message_loop_for_io),
|
| + write_stopped_(false),
|
| + write_message_offset_(0),
|
| + weak_ptr_factory_(this) {
|
| +}
|
| +
|
| +RawChannel::~RawChannel() {
|
| + DCHECK(write_message_queue_.empty());
|
| +
|
| + // No need to take the |write_lock_| here -- if there are still weak pointers
|
| + // outstanding, then we're hosed anyway (since we wouldn't be able to
|
| + // invalidate them cleanly, since we might not be on the I/O thread).
|
| + DCHECK(!weak_ptr_factory_.HasWeakPtrs());
|
| +}
|
| +
|
| +bool RawChannel::Init() {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + // No need to take the lock. No one should be using us yet.
|
| + DCHECK(write_message_queue_.empty());
|
| + DCHECK_EQ(kReadSize, read_buffer_.size());
|
| + DCHECK_EQ(0u, read_buffer_num_valid_bytes_);
|
| +
|
| + if (!OnInit())
|
| + return false;
|
| +
|
| + IOResult result = Read(true, &read_buffer_[0], kReadSize, NULL);
|
| + return result == IO_PENDING;
|
| +}
|
| +
|
| +void RawChannel::Shutdown() {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + base::AutoLock locker(write_lock_);
|
| +
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
| +
|
| + scoped_ptr<std::vector<char> > preserved_read_buffer;
|
| + if (!read_stopped_) {
|
| + read_stopped_ = true;
|
| + preserved_read_buffer.reset(new std::vector<char>());
|
| + preserved_read_buffer->swap(read_buffer_);
|
| + }
|
| +
|
| + scoped_ptr<MessageInTransit> preserved_write_buffer;
|
| + if (!write_stopped_) {
|
| + write_stopped_ = true;
|
| + if (!write_message_queue_.empty()) {
|
| + preserved_write_buffer.reset(write_message_queue_.front());
|
| + write_message_queue_.pop_front();
|
| + }
|
| + STLDeleteElements(&write_message_queue_);
|
| + } else {
|
| + DCHECK(write_message_queue_.empty());
|
| + }
|
| +
|
| + scoped_ptr<IOBufferPreserver> preserver(new IOBufferPreserver(
|
| + preserved_read_buffer.Pass(), preserved_write_buffer.Pass()));
|
| +
|
| + OnShutdownNoLock(preserver.Pass());
|
| +}
|
| +
|
| +// Reminder: This must be thread-safe, and takes ownership of |message|.
|
| +bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| + base::AutoLock locker(write_lock_);
|
| + if (write_stopped_)
|
| + return false;
|
| +
|
| + if (!write_message_queue_.empty()) {
|
| + write_message_queue_.push_back(message.release());
|
| + return true;
|
| + }
|
| +
|
| + write_message_queue_.push_front(message.release());
|
| + DCHECK_EQ(write_message_offset_, 0u);
|
| +
|
| + MessageInTransit* front_message = write_message_queue_.front();
|
| + size_t bytes_written = 0;
|
| + IOResult io_result = WriteNoLock(
|
| + false,
|
| + static_cast<const char*>(front_message->main_buffer()),
|
| + front_message->main_buffer_size(),
|
| + &bytes_written);
|
| +
|
| + if (io_result == IO_PENDING)
|
| + return true;
|
| +
|
| + bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
|
| + bytes_written);
|
| + if (!result) {
|
| + // Even if we're on the I/O thread, don't call |OnFatalError()| in the
|
| + // nested context.
|
| + message_loop_for_io_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannel::CallOnFatalError,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + Delegate::FATAL_ERROR_FAILED_WRITE));
|
| + }
|
| +
|
| + return result;
|
| +}
|
| +
|
| +void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + if (read_stopped_) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
|
| + for (;;) {
|
| + if (io_result != IO_SUCCEEDED) {
|
| + read_stopped_ = true;
|
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
|
| + return;
|
| + }
|
| +
|
| + read_buffer_num_valid_bytes_ += bytes_read;
|
| +
|
| + // Dispatch all the messages that we can.
|
| + bool did_dispatch_message = false;
|
| + // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| + // Currently, we copy data to ensure that this is zero at the beginning.
|
| + size_t read_buffer_start = 0;
|
| + size_t message_size;
|
| + // Note that we rely on short-circuit evaluation here:
|
| + // - |read_buffer_start| may be an invalid index into |read_buffer_| if
|
| + // |read_buffer_num_valid_bytes_| is zero.
|
| + // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| + // next read).
|
| + while (read_buffer_num_valid_bytes_ > 0 &&
|
| + MessageInTransit::GetNextMessageSize(
|
| + &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
|
| + &message_size) &&
|
| + read_buffer_num_valid_bytes_ >= message_size) {
|
| + MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
|
| + &read_buffer_[read_buffer_start]);
|
| + DCHECK_EQ(message.main_buffer_size(), message_size);
|
| +
|
| + // Dispatch the message.
|
| + delegate_->OnReadMessage(message);
|
| + if (read_stopped_) {
|
| + // |Shutdown()| was called in |OnReadMessage()|.
|
| + // TODO(vtl): Add test for this case.
|
| + return;
|
| + }
|
| + did_dispatch_message = true;
|
| +
|
| + // Update our state.
|
| + read_buffer_start += message_size;
|
| + read_buffer_num_valid_bytes_ -= message_size;
|
| + }
|
| +
|
| + // (1) If we dispatched any messages, stop reading for now (and let the
|
| + // message loop do its thing for another round).
|
| + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
|
| + // a single message. Risks: slower, more complex if we want to avoid lots of
|
| + // copying. ii. Keep reading until there's no more data and dispatch all the
|
| + // messages we can. Risks: starvation of other users of the message loop.)
|
| + // (2) If we didn't max out |kReadSize|, stop reading for now.
|
| + bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
|
| +
|
| + if (read_buffer_start > 0) {
|
| + // Move data back to start.
|
| + if (read_buffer_num_valid_bytes_ > 0) {
|
| + memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
|
| + read_buffer_num_valid_bytes_);
|
| + }
|
| + read_buffer_start = 0;
|
| + }
|
| +
|
| + if (read_buffer_.size() - read_buffer_num_valid_bytes_ < kReadSize) {
|
| + // Use power-of-2 buffer sizes.
|
| + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
|
| + // maximum message size to whatever extent necessary).
|
| + // TODO(vtl): We may often be able to peek at the header and get the real
|
| + // required extra space (which may be much bigger than |kReadSize|).
|
| + size_t new_size = std::max(read_buffer_.size(), kReadSize);
|
| + while (new_size < read_buffer_num_valid_bytes_ + kReadSize)
|
| + new_size *= 2;
|
| +
|
| + // TODO(vtl): It's suboptimal to zero out the fresh memory.
|
| + read_buffer_.resize(new_size, 0);
|
| + }
|
| +
|
| + io_result = Read(schedule_for_later,
|
| + &read_buffer_[read_buffer_num_valid_bytes_],
|
| + kReadSize,
|
| + &bytes_read);
|
| + if (io_result == IO_PENDING)
|
| + return;
|
| + }
|
| +}
|
| +
|
| +void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| +
|
| + bool did_fail = false;
|
| + {
|
| + base::AutoLock locker(write_lock_);
|
| + DCHECK_EQ(write_stopped_, write_message_queue_.empty());
|
| +
|
| + if (write_stopped_) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + did_fail = !OnWriteCompletedNoLock(result, bytes_written);
|
| + }
|
| +
|
| + if (did_fail)
|
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
|
| +}
|
| +
|
| +void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
|
| + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
|
| + delegate_->OnFatalError(fatal_error);
|
| +}
|
| +
|
| +bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
|
| + write_lock_.AssertAcquired();
|
| +
|
| + DCHECK(!write_stopped_);
|
| + DCHECK(!write_message_queue_.empty());
|
| +
|
| + if (result) {
|
| + MessageInTransit* message = write_message_queue_.front();
|
| + DCHECK_LT(write_message_offset_, message->main_buffer_size());
|
| + size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
|
| +
|
| + if (bytes_written < bytes_to_write) {
|
| + // Partial (or no) write.
|
| + write_message_offset_ += bytes_written;
|
| + } else {
|
| + // Complete write.
|
| + DCHECK_EQ(bytes_written, bytes_to_write);
|
| + write_message_queue_.pop_front();
|
| + delete message;
|
| + write_message_offset_ = 0;
|
| + }
|
| +
|
| + if (write_message_queue_.empty())
|
| + return true;
|
| +
|
| + // Schedule the next write.
|
| + message = write_message_queue_.front();
|
| + bytes_to_write = message->main_buffer_size() - write_message_offset_;
|
| +
|
| + if (WriteNoLock(true,
|
| + static_cast<const char*>(message->main_buffer()) +
|
| + write_message_offset_,
|
| + bytes_to_write,
|
| + NULL) == IO_PENDING) {
|
| + return true;
|
| + }
|
| + }
|
| +
|
| + write_stopped_ = true;
|
| + STLDeleteElements(&write_message_queue_);
|
| + return false;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|