Chromium Code Reviews

Unified Diff: mojo/system/raw_channel.cc

Issue 169723004: RawChannel refactoring (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years, 10 months ago
Index: mojo/system/raw_channel.cc
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ae25e1f8fe1adde106a0af78f10a8c41a6c3d7a5
--- /dev/null
+++ b/mojo/system/raw_channel.cc
@@ -0,0 +1,309 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <string.h>
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
+#include "mojo/system/message_in_transit.h"
+
+namespace mojo {
+namespace system {
+
+namespace {
+
+const size_t kReadSize = 4096;
+
+} // namespace
+
+RawChannel::IOBufferPreserver::IOBufferPreserver(
+    scoped_ptr<std::vector<char> > read_buffer,
+    scoped_ptr<MessageInTransit> write_buffer)
+    : read_buffer_(read_buffer.Pass()),
+      write_buffer_(write_buffer.Pass()) {
+}
+
+RawChannel::IOBufferPreserver::~IOBufferPreserver() {
+}
+
+RawChannel::RawChannel(Delegate* delegate,
+                       base::MessageLoopForIO* message_loop_for_io)
+    : delegate_(delegate),
+      read_stopped_(false),
+      read_buffer_(kReadSize),
+      read_buffer_num_valid_bytes_(0),
+      message_loop_for_io_(message_loop_for_io),
+      write_stopped_(false),
+      write_message_offset_(0),
+      weak_ptr_factory_(this) {
+}
+
+RawChannel::~RawChannel() {
+  DCHECK(write_message_queue_.empty());
+
+  // No need to take the |write_lock_| here -- if there are still weak pointers
+  // outstanding, then we're hosed anyway (since we wouldn't be able to
+  // invalidate them cleanly, since we might not be on the I/O thread).
+  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+}
+
+bool RawChannel::Init() {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  // No need to take the lock. No one should be using us yet.
+  DCHECK(write_message_queue_.empty());
+  DCHECK_EQ(kReadSize, read_buffer_.size());
+  DCHECK_EQ(0u, read_buffer_num_valid_bytes_);
+
+  if (!OnInit())
+    return false;
+
+  IOResult result = Read(true, &read_buffer_[0], kReadSize, NULL);
+  return result == IO_PENDING;
+}
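
Taken together with the thread-safety note on |WriteMessage()| below, the DCHECKs here pin down the intended lifecycle: construct and |Init()| on the I/O thread (which kicks off the first read), write from any thread, and |Shutdown()| back on the I/O thread before destruction. A minimal sketch of that wiring, where CreatePlatformRawChannel() is a hypothetical factory standing in for whatever the platform-specific part of this patch provides:

// Sketch only: CreatePlatformRawChannel() is a hypothetical factory, not an
// API introduced by this change.
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "mojo/system/raw_channel.h"

using mojo::system::RawChannel;

RawChannel* CreatePlatformRawChannel(RawChannel::Delegate* delegate,
                                     base::MessageLoopForIO* io_loop);

void StartChannel(RawChannel::Delegate* delegate,
                  base::MessageLoopForIO* io_message_loop) {
  scoped_ptr<RawChannel> channel(
      CreatePlatformRawChannel(delegate, io_message_loop));
  CHECK(channel->Init());  // On the I/O thread; issues the initial read.
  // ... WriteMessage() may now be called from any thread ...
  channel->Shutdown();     // Back on the I/O thread, before destruction.
}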
+
+void RawChannel::Shutdown() {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  base::AutoLock locker(write_lock_);
+
+  weak_ptr_factory_.InvalidateWeakPtrs();
+
+  scoped_ptr<std::vector<char> > preserved_read_buffer;
+  if (!read_stopped_) {
+    read_stopped_ = true;
+    preserved_read_buffer.reset(new std::vector<char>());
+    preserved_read_buffer->swap(read_buffer_);
+  }
+
+  scoped_ptr<MessageInTransit> preserved_write_buffer;
+  if (!write_stopped_) {
+    write_stopped_ = true;
+    if (!write_message_queue_.empty()) {
+      preserved_write_buffer.reset(write_message_queue_.front());
+      write_message_queue_.pop_front();
+    }
+    STLDeleteElements(&write_message_queue_);
+  } else {
+    DCHECK(write_message_queue_.empty());
+  }
+
+  scoped_ptr<IOBufferPreserver> preserver(new IOBufferPreserver(
+      preserved_read_buffer.Pass(), preserved_write_buffer.Pass()));
+
+  OnShutdownNoLock(preserver.Pass());
+}
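
The |IOBufferPreserver| handed to |OnShutdownNoLock()| carries the read buffer and the possibly in-flight front write message out of |Shutdown()|, presumably so that asynchronous I/O still referencing them can complete safely. A hypothetical platform subclass might simply hold it until its pending operations drain; the class and member names below are placeholders, not code from this patch:

// Hypothetical override in a platform-specific subclass (sketch only; the
// RawChannelPosixSketch class and |preserved_io_buffers_| member are
// placeholders, not code from this patch).
void RawChannelPosixSketch::OnShutdownNoLock(
    scoped_ptr<IOBufferPreserver> io_buffers) {
  // Hold on to the preserved buffers until any outstanding asynchronous
  // reads/writes have completed or been cancelled; resetting the member
  // later deletes them.
  preserved_io_buffers_ = io_buffers.Pass();
}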
+
+// Reminder: This must be thread-safe, and takes ownership of |message|.
+bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
+  base::AutoLock locker(write_lock_);
+  if (write_stopped_)
+    return false;
+
+  if (!write_message_queue_.empty()) {
+    write_message_queue_.push_back(message.release());
+    return true;
+  }
+
+  write_message_queue_.push_front(message.release());
+  DCHECK_EQ(write_message_offset_, 0u);
+
+  MessageInTransit* front_message = write_message_queue_.front();
+  size_t bytes_written = 0;
+  IOResult io_result = WriteNoLock(
+      false,
+      static_cast<const char*>(front_message->main_buffer()),
+      front_message->main_buffer_size(),
+      &bytes_written);
+
+  if (io_result == IO_PENDING)
+    return true;
+
+  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
+                                       bytes_written);
+  if (!result) {
+    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
+    // nested context.
+    message_loop_for_io_->PostTask(
+        FROM_HERE,
+        base::Bind(&RawChannel::CallOnFatalError,
+                   weak_ptr_factory_.GetWeakPtr(),
+                   Delegate::FATAL_ERROR_FAILED_WRITE));
+  }
+
+  return result;
+}
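
In other words, only the first message placed into an empty queue triggers an immediate |WriteNoLock()|; anything queued behind it is drained later from |OnWriteCompletedNoLock()|. A minimal caller-side sketch, with CreateTestMessage() as a hypothetical helper for building a MessageInTransit (not something this file defines):

// Sketch only: CreateTestMessage() is a hypothetical helper that returns an
// owned MessageInTransit*; it is not defined in this patch.
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/raw_channel.h"

using mojo::system::MessageInTransit;
using mojo::system::RawChannel;

MessageInTransit* CreateTestMessage(int index);

void SendAll(RawChannel* channel, int count) {
  // Safe from any thread: WriteMessage() serializes on |write_lock_|.
  for (int i = 0; i < count; ++i) {
    scoped_ptr<MessageInTransit> message(CreateTestMessage(i));
    if (!channel->WriteMessage(message.Pass())) {
      // Either the channel was already shut down or an earlier write failed.
      LOG(WARNING) << "WriteMessage() refused message " << i;
      return;
    }
  }
}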
+
+void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  if (read_stopped_) {
+    NOTREACHED();
+    return;
+  }
+
+  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
+  for (;;) {
+    if (io_result != IO_SUCCEEDED) {
+      read_stopped_ = true;
+      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
+      return;
+    }
+
+    read_buffer_num_valid_bytes_ += bytes_read;
+
+    // Dispatch all the messages that we can.
+    bool did_dispatch_message = false;
+    // Tracks the offset of the first undispatched message in |read_buffer_|.
+    // Currently, we copy data to ensure that this is zero at the beginning.
+    size_t read_buffer_start = 0;
+    size_t message_size;
+    // Note that we rely on short-circuit evaluation here:
+    //   - |read_buffer_start| may be an invalid index into |read_buffer_| if
+    //     |read_buffer_num_valid_bytes_| is zero.
+    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
+    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
+    // next read).
+    while (read_buffer_num_valid_bytes_ > 0 &&
+           MessageInTransit::GetNextMessageSize(
+               &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
+               &message_size) &&
+           read_buffer_num_valid_bytes_ >= message_size) {
+      MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
+                               &read_buffer_[read_buffer_start]);
+      DCHECK_EQ(message.main_buffer_size(), message_size);
+
+      // Dispatch the message.
+      delegate_->OnReadMessage(message);
+      if (read_stopped_) {
+        // |Shutdown()| was called in |OnReadMessage()|.
+        // TODO(vtl): Add test for this case.
+        return;
+      }
+      did_dispatch_message = true;
+
+      // Update our state.
+      read_buffer_start += message_size;
+      read_buffer_num_valid_bytes_ -= message_size;
+    }
+
+    // (1) If we dispatched any messages, schedule the next read for later
+    // rather than reading again immediately, and let the message loop do its
+    // thing for another round.
+    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
+    // a single message. Risks: slower, more complex if we want to avoid lots
+    // of copying. ii. Keep reading until there's no more data and dispatch all
+    // the messages we can. Risks: starvation of other users of the message
+    // loop.)
+    // (2) Likewise schedule for later if the last read returned fewer than
+    // |kReadSize| bytes, since there is probably no more data waiting right
+    // now.
+    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
+
+    if (read_buffer_start > 0) {
+      // Move data back to start.
+      if (read_buffer_num_valid_bytes_ > 0) {
+        memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
+                read_buffer_num_valid_bytes_);
+      }
+      read_buffer_start = 0;
+    }
+
+    if (read_buffer_.size() - read_buffer_num_valid_bytes_ < kReadSize) {
+      // Use power-of-2 buffer sizes.
+      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
+      // maximum message size to whatever extent necessary).
+      // TODO(vtl): We may often be able to peek at the header and get the real
+      // required extra space (which may be much bigger than |kReadSize|).
+      size_t new_size = std::max(read_buffer_.size(), kReadSize);
+      while (new_size < read_buffer_num_valid_bytes_ + kReadSize)
+        new_size *= 2;
+
+      // TODO(vtl): It's suboptimal to zero out the fresh memory.
+      read_buffer_.resize(new_size, 0);
+    }
+
+    io_result = Read(schedule_for_later,
+                     &read_buffer_[read_buffer_num_valid_bytes_],
+                     kReadSize,
+                     &bytes_read);
+    if (io_result == IO_PENDING)
+      return;
+  }
+}
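
The growth rule above guarantees at least |kReadSize| bytes of free space by doubling from the current buffer size. A self-contained illustration of the arithmetic, with arbitrary example values rather than code from this patch:

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t kReadSize = 4096;
  std::size_t buffer_size = 8192;  // Plays the role of |read_buffer_.size()|.
  std::size_t valid_bytes = 5000;  // |read_buffer_num_valid_bytes_|.
  if (buffer_size - valid_bytes < kReadSize) {
    std::size_t new_size = std::max(buffer_size, kReadSize);
    while (new_size < valid_bytes + kReadSize)
      new_size *= 2;  // Doubles 8192 -> 16384 in this example.
    buffer_size = new_size;
  }
  std::printf("new buffer size: %zu\n", buffer_size);  // Prints 16384.
  return 0;
}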
+
+void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+  bool did_fail = false;
+  {
+    base::AutoLock locker(write_lock_);
+    DCHECK_EQ(write_stopped_, write_message_queue_.empty());
+
+    if (write_stopped_) {
+      NOTREACHED();
+      return;
+    }
+
+    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
+  }
+
+  if (did_fail)
+    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+}
+
+void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
+  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+  delegate_->OnFatalError(fatal_error);
+}
+
+bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
+  write_lock_.AssertAcquired();
+
+  DCHECK(!write_stopped_);
+  DCHECK(!write_message_queue_.empty());
+
+  if (result) {
+    MessageInTransit* message = write_message_queue_.front();
+    DCHECK_LT(write_message_offset_, message->main_buffer_size());
+    size_t bytes_to_write = message->main_buffer_size() - write_message_offset_;
+
+    if (bytes_written < bytes_to_write) {
+      // Partial (or no) write.
+      write_message_offset_ += bytes_written;
+    } else {
+      // Complete write.
+      DCHECK_EQ(bytes_written, bytes_to_write);
+      write_message_queue_.pop_front();
+      delete message;
+      write_message_offset_ = 0;
+    }
+
+    if (write_message_queue_.empty())
+      return true;
+
+    // Schedule the next write.
+    message = write_message_queue_.front();
+    bytes_to_write = message->main_buffer_size() - write_message_offset_;
+
+    if (WriteNoLock(true,
+                    static_cast<const char*>(message->main_buffer()) +
+                        write_message_offset_,
+                    bytes_to_write,
+                    NULL) == IO_PENDING) {
+      return true;
+    }
+  }
+
+  write_stopped_ = true;
+  STLDeleteElements(&write_message_queue_);
+  return false;
+}
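
The partial-write bookkeeping above can be traced with a small standalone simulation: with a 10,000-byte message and an OS that accepts at most 4,096 bytes per write (numbers chosen arbitrarily, not from this patch), |write_message_offset_| advances to 4096 and then 8192 before the final exact-size write completes the message:

#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t kMessageSize = 10000;  // Hypothetical message size.
  const std::size_t kMaxChunk = 4096;      // Pretend the OS writes at most this.
  std::size_t offset = 0;                  // Plays the role of |write_message_offset_|.
  for (;;) {
    std::size_t bytes_to_write = kMessageSize - offset;
    std::size_t bytes_written =
        bytes_to_write < kMaxChunk ? bytes_to_write : kMaxChunk;
    if (bytes_written < bytes_to_write) {
      // Partial write: keep the message at the front of the queue.
      offset += bytes_written;
      std::printf("partial write, offset now %zu\n", offset);
    } else {
      // Complete write: the message would be popped and deleted.
      offset = 0;
      std::printf("message complete\n");
      break;
    }
  }
  return 0;
}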
+
+} // namespace system
+} // namespace mojo
