Index: mojo/system/raw_channel.cc |
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7e255c7d229785f50a70d3bca41e5719771968c7 |
--- /dev/null |
+++ b/mojo/system/raw_channel.cc |
@@ -0,0 +1,312 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/system/raw_channel.h" |
+ |
+#include <string.h> |
+ |
+#include <algorithm> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
+#include "mojo/system/message_in_transit.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+namespace { |
viettrungluu
2014/02/26 23:03:39
Note that this anonymous namespace is redundant. (
yzshen1
2014/02/27 02:00:30
Done.
|
+ |
+const size_t kReadSize = 4096; |
+ |
+} // namespace |
+ |
+ |
+RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
+} |
+ |
+RawChannel::ReadBuffer::~ReadBuffer() {} |
+ |
+char* RawChannel::ReadBuffer::GetPosition() { |
+ DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
+ return &buffer_[0] + num_valid_bytes_; |
+} |
+ |
+size_t RawChannel::ReadBuffer::GetBytesToRead() const { |
+ DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
+ return kReadSize; |
+} |
+ |
+RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} |
+ |
+RawChannel::WriteBuffer::~WriteBuffer() { |
+ STLDeleteElements(&message_queue_); |
+} |
+ |
+const char* RawChannel::WriteBuffer::GetPosition() const { |
+ if (message_queue_.empty()) |
+ return NULL; |
+ |
+ DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); |
+ return static_cast<const char*>(message_queue_.front()->main_buffer()) + |
+ offset_; |
+} |
+ |
+size_t RawChannel::WriteBuffer::GetBytesToWrite() const { |
+ if (message_queue_.empty()) |
+ return 0; |
+ |
+ DCHECK_GT(message_queue_.front()->main_buffer_size(), offset_); |
+ return message_queue_.front()->main_buffer_size() - offset_; |
+} |
+ |
+RawChannel::RawChannel(Delegate* delegate, |
+ base::MessageLoopForIO* message_loop_for_io) |
+ : delegate_(delegate), |
+ read_stopped_(false), |
+ message_loop_for_io_(message_loop_for_io), |
+ write_stopped_(false), |
+ weak_ptr_factory_(this) { |
+} |
+ |
+RawChannel::~RawChannel() { |
+ DCHECK(!read_buffer_); |
+ DCHECK(!write_buffer_); |
+ |
+ // No need to take the |write_lock_| here -- if there are still weak pointers |
+ // outstanding, then we're hosed anyway (since we wouldn't be able to |
+ // invalidate them cleanly, since we might not be on the I/O thread). |
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
+} |
+ |
+bool RawChannel::Init() { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ // No need to take the lock. No one should be using us yet. |
+ DCHECK(!read_buffer_); |
+ read_buffer_.reset(new ReadBuffer); |
+ DCHECK(!write_buffer_); |
+ write_buffer_.reset(new WriteBuffer); |
+ |
+ if (!OnInit()) |
+ return false; |
+ |
+ IOResult result = ScheduleRead(); |
+ return result == IO_PENDING; |
viettrungluu
2014/02/26 23:03:39
nit: I think you may as well write this as |return
yzshen1
2014/02/27 02:00:30
Done.
|
+} |
+ |
+void RawChannel::Shutdown() { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ base::AutoLock locker(write_lock_); |
+ |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
+ |
+ read_stopped_ = true; |
+ write_stopped_ = true; |
+ |
+ OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
+} |
+ |
+// Reminder: This must be thread-safe, and takes ownership of |message|. |
viettrungluu
2014/02/26 23:03:39
You can get rid of the ", and takes ownership ..."
yzshen1
2014/02/27 02:00:30
Done.
|
+bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
+ base::AutoLock locker(write_lock_); |
+ if (write_stopped_) |
+ return false; |
+ |
+ if (!write_buffer_->message_queue_.empty()) { |
+ write_buffer_->message_queue_.push_back(message.release()); |
+ return true; |
+ } |
+ |
+ write_buffer_->message_queue_.push_front(message.release()); |
+ DCHECK_EQ(write_buffer_->offset_, 0u); |
+ |
+ size_t bytes_written = 0; |
+ IOResult io_result = WriteNoLock(&bytes_written); |
+ if (io_result == IO_PENDING) |
+ return true; |
+ |
+ bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, |
+ bytes_written); |
+ if (!result) { |
+ // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
+ // nested context. |
+ message_loop_for_io_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::CallOnFatalError, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ Delegate::FATAL_ERROR_FAILED_WRITE)); |
+ } |
+ |
+ return result; |
+} |
+ |
+RawChannel::ReadBuffer* RawChannel::read_buffer() { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ return read_buffer_.get(); |
+} |
+ |
+RawChannel::WriteBuffer* RawChannel::write_buffer() { |
+ write_lock_.AssertAcquired(); |
+ return write_buffer_.get(); |
+} |
+ |
+void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
viettrungluu
2014/02/26 23:03:39
The implementation of this function could use a co
yzshen1
2014/02/27 02:00:30
Done.
|
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ if (read_stopped_) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
+ for (;;) { |
+ if (io_result != IO_SUCCEEDED) { |
+ read_stopped_ = true; |
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
+ return; |
+ } |
+ |
+ read_buffer_->num_valid_bytes_ += bytes_read; |
+ |
+ // Dispatch all the messages that we can. |
+ bool did_dispatch_message = false; |
+ // Tracks the offset of the first undispatched message in |read_buffer_|. |
+ // Currently, we copy data to ensure that this is zero at the beginning. |
+ size_t read_buffer_start = 0; |
+ size_t remaining_bytes = read_buffer_->num_valid_bytes_; |
+ size_t message_size; |
+ // Note that we rely on short-circuit evaluation here: |
+ // - |read_buffer_start| may be an invalid index into |
+ // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
+ // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
+ // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
+ // next read). |
+ while (remaining_bytes > 0 && |
+ MessageInTransit::GetNextMessageSize( |
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes, |
+ &message_size) && |
+ remaining_bytes >= message_size) { |
+ MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
+ &read_buffer_->buffer_[read_buffer_start]); |
+ DCHECK_EQ(message.main_buffer_size(), message_size); |
+ |
+ // Dispatch the message. |
+ delegate_->OnReadMessage(message); |
+ if (read_stopped_) { |
+ // |Shutdown()| was called in |OnReadMessage()|. |
+ // TODO(vtl): Add test for this case. |
+ return; |
+ } |
+ did_dispatch_message = true; |
+ |
+ // Update our state. |
+ read_buffer_start += message_size; |
+ remaining_bytes -= message_size; |
+ } |
+ |
+ if (read_buffer_start > 0) { |
+ // Move data back to start. |
+ read_buffer_->num_valid_bytes_ = remaining_bytes; |
+ if (read_buffer_->num_valid_bytes_ > 0) { |
+ memmove(&read_buffer_->buffer_[0], |
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
+ } |
+ read_buffer_start = 0; |
+ } |
+ |
+ if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
+ kReadSize) { |
+ // Use power-of-2 buffer sizes. |
+ // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
+ // maximum message size to whatever extent necessary). |
+ // TODO(vtl): We may often be able to peek at the header and get the real |
+ // required extra space (which may be much bigger than |kReadSize|). |
+ size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); |
+ while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) |
+ new_size *= 2; |
+ |
+ // TODO(vtl): It's suboptimal to zero out the fresh memory. |
+ read_buffer_->buffer_.resize(new_size, 0); |
+ } |
+ |
+ // (1) If we dispatched any messages, stop reading for now (and let the |
+ // message loop do its thing for another round). |
+ // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
+ // a single message. Risks: slower, more complex if we want to avoid lots of |
+ // copying. ii. Keep reading until there's no more data and dispatch all the |
+ // messages we can. Risks: starvation of other users of the message loop.) |
+ // (2) If we didn't max out |kReadSize|, stop reading for now. |
+ bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
+ bytes_read = 0; |
+ io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
+ |
+ if (io_result == IO_PENDING) |
viettrungluu
2014/02/26 23:03:39
do { ... } while (io_result != IO_PENDING);
yzshen1
2014/02/27 02:00:30
Done.
|
+ return; |
+ } |
+} |
+ |
+void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ bool did_fail = false; |
+ { |
+ base::AutoLock locker(write_lock_); |
+ DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
+ |
+ if (write_stopped_) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ did_fail = !OnWriteCompletedNoLock(result, bytes_written); |
+ } |
+ |
+ if (did_fail) |
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
+} |
+ |
+void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
+ delegate_->OnFatalError(fatal_error); |
+} |
+ |
+bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { |
+ write_lock_.AssertAcquired(); |
+ |
+ DCHECK(!write_stopped_); |
+ DCHECK(!write_buffer_->message_queue_.empty()); |
+ |
+ if (result) { |
+ if (bytes_written < write_buffer_->GetBytesToWrite()) { |
+ // Partial (or no) write. |
+ write_buffer_->offset_ += bytes_written; |
+ } else { |
+ // Complete write. |
+ DCHECK_EQ(bytes_written, write_buffer_->GetBytesToWrite()); |
+ delete write_buffer_->message_queue_.front(); |
+ write_buffer_->message_queue_.pop_front(); |
+ write_buffer_->offset_ = 0; |
+ } |
+ |
+ if (write_buffer_->message_queue_.empty()) |
+ return true; |
+ |
+ // Schedule the next write. |
+ if (ScheduleWriteNoLock() == IO_PENDING) |
+ return true; |
+ } |
+ |
+ write_stopped_ = true; |
+ STLDeleteElements(&write_buffer_->message_queue_); |
viettrungluu
2014/02/26 23:03:39
Can't you just do |write_buffer_.reset()| here?
yzshen1
2014/02/27 02:00:30
I was thinking maybe it is good to always give the
|
+ write_buffer_->offset_ = 0; |
+ return false; |
viettrungluu
2014/02/26 23:03:39
More importantly, if it's the subclass that calls
yzshen1
2014/02/27 02:00:30
OnWriteCompletedNoLock() is a private helper metho
|
+} |
+ |
+} // namespace system |
+} // namespace mojo |