Index: mojo/system/raw_channel.cc
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..5d8da3bd631fbe2c5f33cba3273e476a29a0ea98
--- /dev/null
+++ b/mojo/system/raw_channel.cc
@@ -0,0 +1,344 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <string.h>
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
+#include "mojo/system/message_in_transit.h"
+
+namespace mojo {
+namespace system {
+
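+// Each read is asked to fill at most |kReadSize| bytes; the read buffer is
+// grown so that at least this much free space is always available.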
+const size_t kReadSize = 4096;
+
+RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
+}
+
+RawChannel::ReadBuffer::~ReadBuffer() {}
+
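+// Provides a pointer to |kReadSize| bytes of free space immediately following
+// the valid data already in the buffer.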
+void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
+  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
+  *addr = &buffer_[0] + num_valid_bytes_;
+  *size = kReadSize;
+}
+
+RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
+
+RawChannel::WriteBuffer::~WriteBuffer() {
+  STLDeleteElements(&message_queue_);
+}
+
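+// Fills |*buffers| with the still-unwritten portion of the message at the
+// front of the queue, which may span the message's main and secondary buffers.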
+void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
+  buffers->clear();
+
+  size_t bytes_to_write = GetTotalBytesToWrite();
+  if (bytes_to_write == 0)
+    return;
+
+  MessageInTransit* message = message_queue_.front();
+  if (!message->secondary_buffer_size()) {
+    // Only write from the main buffer.
+    DCHECK_LT(offset_, message->main_buffer_size());
+    DCHECK_LE(bytes_to_write, message->main_buffer_size());
+    Buffer buffer = {
+        static_cast<const char*>(message->main_buffer()) + offset_,
+        bytes_to_write};
+    buffers->push_back(buffer);
+    return;
+  }
+
+  if (offset_ >= message->main_buffer_size()) {
+    // Only write from the secondary buffer.
+    DCHECK_LT(offset_ - message->main_buffer_size(),
+              message->secondary_buffer_size());
+    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
+    Buffer buffer = {
+        static_cast<const char*>(message->secondary_buffer()) +
+            (offset_ - message->main_buffer_size()),
+        bytes_to_write};
+    buffers->push_back(buffer);
+    return;
+  }
+
+  // Write from both buffers.
+  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
+                                message->secondary_buffer_size());
+  Buffer buffer1 = {
+      static_cast<const char*>(message->main_buffer()) + offset_,
+      message->main_buffer_size() - offset_};
+  buffers->push_back(buffer1);
+  Buffer buffer2 = {
+      static_cast<const char*>(message->secondary_buffer()),
+      message->secondary_buffer_size()};
+  buffers->push_back(buffer2);
+}
+
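+// Returns the number of bytes that remain to be written for the message at
+// the front of the queue (zero if the queue is empty).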
+size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
+  if (message_queue_.empty())
+    return 0;
+
+  MessageInTransit* message = message_queue_.front();
+  DCHECK_LT(offset_, message->total_size());
+  return message->total_size() - offset_;
+}
+
+RawChannel::RawChannel(Delegate* delegate,
+                       base::MessageLoopForIO* message_loop_for_io)
+    : delegate_(delegate),
+      message_loop_for_io_(message_loop_for_io),
+      read_stopped_(false),
+      write_stopped_(false),
+      weak_ptr_factory_(this) {
+}
+
+RawChannel::~RawChannel() {
+  DCHECK(!read_buffer_);
+  DCHECK(!write_buffer_);
+
+  // No need to take the |write_lock_| here -- if there are still weak pointers
+  // outstanding, then we're hosed anyway (since we wouldn't be able to
+  // invalidate them cleanly, since we might not be on the I/O thread).
+  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+}
+
+bool RawChannel::Init() {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  // No need to take the lock. No one should be using us yet.
+  DCHECK(!read_buffer_);
+  read_buffer_.reset(new ReadBuffer);
+  DCHECK(!write_buffer_);
+  write_buffer_.reset(new WriteBuffer);
+
+  if (!OnInit())
+    return false;
+
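+  // Schedule the initial read; initialization succeeds only if that read is
+  // successfully pending.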
+  return ScheduleRead() == IO_PENDING;
+}
+
+void RawChannel::Shutdown() {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  base::AutoLock locker(write_lock_);
+
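+  // Invalidate weak pointers so that tasks already posted against this object
+  // (e.g., |CallOnFatalError()|) do not run after shutdown.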
+  weak_ptr_factory_.InvalidateWeakPtrs();
+
+  read_stopped_ = true;
+  write_stopped_ = true;
+
+  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
+}
+
+// Reminder: This must be thread-safe.
+bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
+  base::AutoLock locker(write_lock_);
+  if (write_stopped_)
+    return false;
+
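+  // If a write is already in progress for an earlier message, just enqueue
+  // this one; it will be written once the earlier messages have completed.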
+  if (!write_buffer_->message_queue_.empty()) {
+    write_buffer_->message_queue_.push_back(message.release());
+    return true;
+  }
+
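+  // No write is in progress, so enqueue the message and try to write it
+  // immediately.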
+  write_buffer_->message_queue_.push_front(message.release());
+  DCHECK_EQ(write_buffer_->offset_, 0u);
+
+  size_t bytes_written = 0;
+  IOResult io_result = WriteNoLock(&bytes_written);
+  if (io_result == IO_PENDING)
+    return true;
+
+  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
+                                       bytes_written);
+  if (!result) {
+    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
+    // nested context.
+    message_loop_for_io_->PostTask(
+        FROM_HERE,
+        base::Bind(&RawChannel::CallOnFatalError,
+                   weak_ptr_factory_.GetWeakPtr(),
+                   Delegate::FATAL_ERROR_FAILED_WRITE));
+  }
+
+  return result;
+}
+
+RawChannel::ReadBuffer* RawChannel::read_buffer() {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+  return read_buffer_.get();
+}
+
+RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
+  write_lock_.AssertAcquired();
+  return write_buffer_.get();
+}
+
+void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  if (read_stopped_) {
+    NOTREACHED();
+    return;
+  }
+
+  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
+
+  // Keep reading data in a loop, and dispatch messages as enough data is
+  // received. Exit the loop if any of the following happens:
+  //   - one or more messages were dispatched;
+  //   - the last read failed, was a partial read, or would block;
+  //   - |Shutdown()| was called.
+  do {
+    if (io_result != IO_SUCCEEDED) {
+      read_stopped_ = true;
+      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
+      return;
+    }
+
+    read_buffer_->num_valid_bytes_ += bytes_read;
+
+    // Dispatch all the messages that we can.
+    bool did_dispatch_message = false;
+    // Tracks the offset of the first undispatched message in |read_buffer_|.
+    // Currently, we copy data to ensure that this is zero at the beginning.
+    size_t read_buffer_start = 0;
+    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
+    size_t message_size;
+    // Note that we rely on short-circuit evaluation here:
+    //   - |read_buffer_start| may be an invalid index into
+    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
+    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
+    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
+    // next read).
+    while (remaining_bytes > 0 &&
+           MessageInTransit::GetNextMessageSize(
+               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
+               &message_size) &&
+           remaining_bytes >= message_size) {
+      // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
+      // some sort of "view" abstraction.
+      MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
+                               &read_buffer_->buffer_[read_buffer_start]);
+      DCHECK_EQ(message.total_size(), message_size);
+
+      // Dispatch the message.
+      delegate_->OnReadMessage(message);
+      if (read_stopped_) {
+        // |Shutdown()| was called in |OnReadMessage()|.
+        // TODO(vtl): Add test for this case.
+        return;
+      }
+      did_dispatch_message = true;
+
+      // Update our state.
+      read_buffer_start += message_size;
+      remaining_bytes -= message_size;
+    }
+
+    if (read_buffer_start > 0) {
+      // Move data back to start.
+      read_buffer_->num_valid_bytes_ = remaining_bytes;
+      if (read_buffer_->num_valid_bytes_ > 0) {
+        memmove(&read_buffer_->buffer_[0],
+                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
+      }
+      read_buffer_start = 0;
+    }
+
+    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
+        kReadSize) {
+      // Use power-of-2 buffer sizes.
+      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
+      // maximum message size to whatever extent necessary).
+      // TODO(vtl): We may often be able to peek at the header and get the real
+      // required extra space (which may be much bigger than |kReadSize|).
+      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
+      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
+        new_size *= 2;
+
+      // TODO(vtl): It's suboptimal to zero out the fresh memory.
+      read_buffer_->buffer_.resize(new_size, 0);
+    }
+
+    // (1) If we dispatched any messages, stop reading for now (and let the
+    // message loop do its thing for another round).
+    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
+    // a single message. Risks: slower, more complex if we want to avoid lots of
+    // copying. ii. Keep reading until there's no more data and dispatch all the
+    // messages we can. Risks: starvation of other users of the message loop.)
+    // (2) If we didn't max out |kReadSize|, stop reading for now.
+    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
+    bytes_read = 0;
+    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
+  } while (io_result != IO_PENDING);
+}
+
+void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
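+  // Handle the completion while holding |write_lock_|, but report any fatal
+  // error only after the lock has been released.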
+  bool did_fail = false;
+  {
+    base::AutoLock locker(write_lock_);
+    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
+
+    if (write_stopped_) {
+      NOTREACHED();
+      return;
+    }
+
+    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
+  }
+
+  if (did_fail)
+    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+}
+
+void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+  delegate_->OnFatalError(fatal_error);
+}
+
+bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
+  write_lock_.AssertAcquired();
+
+  DCHECK(!write_stopped_);
+  DCHECK(!write_buffer_->message_queue_.empty());
+
+  if (result) {
+    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
+      // Partial (or no) write.
+      write_buffer_->offset_ += bytes_written;
+    } else {
+      // Complete write.
+      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
+      delete write_buffer_->message_queue_.front();
+      write_buffer_->message_queue_.pop_front();
+      write_buffer_->offset_ = 0;
+    }
+
+    if (write_buffer_->message_queue_.empty())
+      return true;
+
+    // Schedule the next write.
+    if (ScheduleWriteNoLock() == IO_PENDING)
+      return true;
+  }
+
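+  // The write failed (or the next write could not be scheduled): stop writing
+  // and discard any queued messages.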
+  write_stopped_ = true;
+  STLDeleteElements(&write_buffer_->message_queue_);
+  write_buffer_->offset_ = 0;
+  return false;
+}
+
+}  // namespace system
+}  // namespace mojo