Chromium Code Reviews| Index: mojo/system/raw_channel.cc |
| diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..7e255c7d229785f50a70d3bca41e5719771968c7 |
| --- /dev/null |
| +++ b/mojo/system/raw_channel.cc |
| @@ -0,0 +1,312 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/system/raw_channel.h" |
| + |
| +#include <string.h> |
| + |
| +#include <algorithm> |
| + |
| +#include "base/bind.h" |
| +#include "base/location.h" |
| +#include "base/logging.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/stl_util.h" |
| +#include "mojo/system/message_in_transit.h" |
| + |
| +namespace mojo { |
| +namespace system { |
| + |
| +namespace { |
|
viettrungluu
2014/02/26 23:03:39
Note that this anonymous namespace is redundant. (
yzshen1
2014/02/27 02:00:30
Done.
|
| + |
| +const size_t kReadSize = 4096; |
| + |
| +} // namespace |
| + |
| + |
| +RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
| +} |
| + |
| +RawChannel::ReadBuffer::~ReadBuffer() {} |
| + |
| +char* RawChannel::ReadBuffer::GetPosition() { |
| + DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
| + return &buffer_[0] + num_valid_bytes_; |
| +} |
| + |
| +size_t RawChannel::ReadBuffer::GetBytesToRead() const { |
| + DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
| + return kReadSize; |
| +} |
| + |
| +RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} |
| + |
| +RawChannel::WriteBuffer::~WriteBuffer() { |
| + STLDeleteElements(&message_queue_); |
| +} |
| + |
| +const char* RawChannel::WriteBuffer::GetPosition() const { |
| + if (message_queue_.empty()) |
| + return NULL; |
| + |
| + DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); |
| + return static_cast<const char*>(message_queue_.front()->main_buffer()) + |
| + offset_; |
| +} |
| + |
| +size_t RawChannel::WriteBuffer::GetBytesToWrite() const { |
| + if (message_queue_.empty()) |
| + return 0; |
| + |
| + DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); |
| + return message_queue_.front()->main_buffer_size() - offset_; |
| +} |
| + |
| +RawChannel::RawChannel(Delegate* delegate, |
| + base::MessageLoopForIO* message_loop_for_io) |
| + : delegate_(delegate), |
| + read_stopped_(false), |
| + message_loop_for_io_(message_loop_for_io), |
| + write_stopped_(false), |
| + weak_ptr_factory_(this) { |
| +} |
| + |
| +RawChannel::~RawChannel() { |
| + DCHECK(!read_buffer_); |
| + DCHECK(!write_buffer_); |
| + |
| + // No need to take the |write_lock_| here -- if there are still weak pointers |
| + // outstanding, then we're hosed anyway (since we wouldn't be able to |
| + // invalidate them cleanly, since we might not be on the I/O thread). |
| + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| +} |
| + |
| +bool RawChannel::Init() { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + |
| + // No need to take the lock. No one should be using us yet. |
| + DCHECK(!read_buffer_); |
| + read_buffer_.reset(new ReadBuffer); |
| + DCHECK(!write_buffer_); |
| + write_buffer_.reset(new WriteBuffer); |
| + |
| + if (!OnInit()) |
| + return false; |
| + |
| + IOResult result = ScheduleRead(); |
| + return result == IO_PENDING; |
|
viettrungluu
2014/02/26 23:03:39
nit: I think you may as well write this as |return
yzshen1
2014/02/27 02:00:30
Done.
|
| +} |
| + |
| +void RawChannel::Shutdown() { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + |
| + base::AutoLock locker(write_lock_); |
| + |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| + |
| + read_stopped_ = true; |
| + write_stopped_ = true; |
| + |
| + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
| +} |
| + |
| +// Reminder: This must be thread-safe, and takes ownership of |message|. |
|
viettrungluu
2014/02/26 23:03:39
You can get rid of the ", and takes ownership ..."
yzshen1
2014/02/27 02:00:30
Done.
|
| +bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| + base::AutoLock locker(write_lock_); |
| + if (write_stopped_) |
| + return false; |
| + |
| + if (!write_buffer_->message_queue_.empty()) { |
| + write_buffer_->message_queue_.push_back(message.release()); |
| + return true; |
| + } |
| + |
| + write_buffer_->message_queue_.push_front(message.release()); |
| + DCHECK_EQ(write_buffer_->offset_, 0u); |
| + |
| + size_t bytes_written = 0; |
| + IOResult io_result = WriteNoLock(&bytes_written); |
| + if (io_result == IO_PENDING) |
| + return true; |
| + |
| + bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, |
| + bytes_written); |
| + if (!result) { |
| + // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
| + // nested context. |
| + message_loop_for_io_->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::CallOnFatalError, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + Delegate::FATAL_ERROR_FAILED_WRITE)); |
| + } |
| + |
| + return result; |
| +} |
| + |
| +RawChannel::ReadBuffer* RawChannel::read_buffer() { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + return read_buffer_.get(); |
| +} |
| + |
| +RawChannel::WriteBuffer* RawChannel::write_buffer() { |
| + write_lock_.AssertAcquired(); |
| + return write_buffer_.get(); |
| +} |
| + |
| +void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
|
viettrungluu
2014/02/26 23:03:39
The implementation of this function could use a co
yzshen1
2014/02/27 02:00:30
Done.
|
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + |
| + if (read_stopped_) { |
| + NOTREACHED(); |
| + return; |
| + } |
| + |
| + IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
| + for (;;) { |
| + if (io_result != IO_SUCCEEDED) { |
| + read_stopped_ = true; |
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
| + return; |
| + } |
| + |
| + read_buffer_->num_valid_bytes_ += bytes_read; |
| + |
| + // Dispatch all the messages that we can. |
| + bool did_dispatch_message = false; |
| + // Tracks the offset of the first undispatched message in |read_buffer_|. |
| + // Currently, we copy data to ensure that this is zero at the beginning. |
| + size_t read_buffer_start = 0; |
| + size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
| + size_t message_size; |
| + // Note that we rely on short-circuit evaluation here: |
| + // - |read_buffer_start| may be an invalid index into |
| + // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
| + // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
| + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| + // next read). |
| + while (remaining_bytes > 0 && |
| + MessageInTransit::GetNextMessageSize( |
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes, |
| + &message_size) && |
| + remaining_bytes >= message_size) { |
| + MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
| + &read_buffer_->buffer_[read_buffer_start]); |
| + DCHECK_EQ(message.main_buffer_size(), message_size); |
| + |
| + // Dispatch the message. |
| + delegate_->OnReadMessage(message); |
| + if (read_stopped_) { |
| + // |Shutdown()| was called in |OnReadMessage()|. |
| + // TODO(vtl): Add test for this case. |
| + return; |
| + } |
| + did_dispatch_message = true; |
| + |
| + // Update our state. |
| + read_buffer_start += message_size; |
| + remaining_bytes -= message_size; |
| + } |
| + |
| + if (read_buffer_start > 0) { |
| + // Move data back to start. |
| + read_buffer_->num_valid_bytes_ = remaining_bytes; |
| + if (read_buffer_->num_valid_bytes_ > 0) { |
| + memmove(&read_buffer_->buffer_[0], |
| + &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
| + } |
| + read_buffer_start = 0; |
| + } |
| + |
| + if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
| + kReadSize) { |
| + // Use power-of-2 buffer sizes. |
| + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
| + // maximum message size to whatever extent necessary). |
| + // TODO(vtl): We may often be able to peek at the header and get the real |
| + // required extra space (which may be much bigger than |kReadSize|). |
| + size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); |
| + while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) |
| + new_size *= 2; |
| + |
| + // TODO(vtl): It's suboptimal to zero out the fresh memory. |
| + read_buffer_->buffer_.resize(new_size, 0); |
| + } |
| + |
| + // (1) If we dispatched any messages, stop reading for now (and let the |
| + // message loop do its thing for another round). |
| + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
| + // a single message. Risks: slower, more complex if we want to avoid lots of |
| + // copying. ii. Keep reading until there's no more data and dispatch all the |
| + // messages we can. Risks: starvation of other users of the message loop.) |
| + // (2) If we didn't max out |kReadSize|, stop reading for now. |
| + bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
| + bytes_read = 0; |
| + io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
| + |
| + if (io_result == IO_PENDING) |
|
viettrungluu
2014/02/26 23:03:39
do { ... } while (io_result != IO_PENDING);
yzshen1
2014/02/27 02:00:30
Done.
|
| + return; |
| + } |
| +} |
| + |
| +void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + |
| + bool did_fail = false; |
| + { |
| + base::AutoLock locker(write_lock_); |
| + DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
| + |
| + if (write_stopped_) { |
| + NOTREACHED(); |
| + return; |
| + } |
| + |
| + did_fail = !OnWriteCompletedNoLock(result, bytes_written); |
| + } |
| + |
| + if (did_fail) |
| + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
| +} |
| + |
| +void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
| + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
| + delegate_->OnFatalError(fatal_error); |
| +} |
| + |
| +bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { |
| + write_lock_.AssertAcquired(); |
| + |
| + DCHECK(!write_stopped_); |
| + DCHECK(!write_buffer_->message_queue_.empty()); |
| + |
| + if (result) { |
| + if (bytes_written < write_buffer_->GetBytesToWrite()) { |
| + // Partial (or no) write. |
| + write_buffer_->offset_ += bytes_written; |
| + } else { |
| + // Complete write. |
| + DCHECK_EQ(bytes_written, write_buffer_->GetBytesToWrite()); |
| + delete write_buffer_->message_queue_.front(); |
| + write_buffer_->message_queue_.pop_front(); |
| + write_buffer_->offset_ = 0; |
| + } |
| + |
| + if (write_buffer_->message_queue_.empty()) |
| + return true; |
| + |
| + // Schedule the next write. |
| + if (ScheduleWriteNoLock() == IO_PENDING) |
| + return true; |
| + } |
| + |
| + write_stopped_ = true; |
| + STLDeleteElements(&write_buffer_->message_queue_); |
|
viettrungluu
2014/02/26 23:03:39
Can't you just do |write_buffer_.reset()| here?
yzshen1
2014/02/27 02:00:30
I was thinking maybe it is good to always give the
|
| + write_buffer_->offset_ = 0; |
| + return false; |
|
viettrungluu
2014/02/26 23:03:39
More importantly, if it's the subclass that calls
yzshen1
2014/02/27 02:00:30
OnWriteCompletedNoLock() is a private helper metho
|
| +} |
| + |
| +} // namespace system |
| +} // namespace mojo |