Index: mojo/system/raw_channel.cc |
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ae25e1f8fe1adde106a0af78f10a8c41a6c3d7a5 |
--- /dev/null |
+++ b/mojo/system/raw_channel.cc |
@@ -0,0 +1,309 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/system/raw_channel.h" |
+ |
+#include <string.h> |
+ |
+#include <algorithm> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
+#include "mojo/system/message_in_transit.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+namespace { |
+ |
+const size_t kReadSize = 4096; |
+ |
+} // namespace |
+ |
+RawChannel::IOBufferPreserver::IOBufferPreserver( |
+ scoped_ptr<std::vector<char> > read_buffer, |
+ scoped_ptr<MessageInTransit> write_buffer) |
+ : read_buffer_(read_buffer.Pass()), |
+ write_buffer_(write_buffer.Pass()) { |
+} |
+ |
+RawChannel::IOBufferPreserver::~IOBufferPreserver() { |
+} |
+ |
+RawChannel::RawChannel(Delegate* delegate, |
+ base::MessageLoopForIO* message_loop_for_io) |
+ : delegate_(delegate), |
+ read_stopped_(false), |
+ read_buffer_(kReadSize), |
+ read_buffer_num_valid_bytes_(0), |
+ message_loop_for_io_(message_loop_for_io), |
+ write_stopped_(false), |
+ write_message_offset_(0), |
+ weak_ptr_factory_(this) { |
+} |
+ |
+RawChannel::~RawChannel() { |
+ DCHECK(write_message_queue_.empty()); |
+ |
+ // No need to take the |write_lock_| here -- if there are still weak pointers |
+ // outstanding, then we're hosed anyway (since we wouldn't be able to |
+ // invalidate them cleanly, since we might not be on the I/O thread). |
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
+} |
+ |
+bool RawChannel::Init() { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ // No need to take the lock. No one should be using us yet. |
+ DCHECK(write_message_queue_.empty()); |
+ DCHECK_EQ(kReadSize, read_buffer_.size()); |
+ DCHECK_EQ(0u, read_buffer_num_valid_bytes_); |
+ |
+ if (!OnInit()) |
+ return false; |
+ |
+ IOResult result = Read(true, &read_buffer_[0], kReadSize, NULL); |
+ return result == IO_PENDING; |
+} |
+ |
+void RawChannel::Shutdown() { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ base::AutoLock locker(write_lock_); |
+ |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
+ |
+ scoped_ptr<std::vector<char> > preserved_read_buffer; |
+ if (!read_stopped_) { |
+ read_stopped_ = true; |
+ preserved_read_buffer.reset(new std::vector<char>()); |
+ preserved_read_buffer->swap(read_buffer_); |
+ } |
+ |
+ scoped_ptr<MessageInTransit> preserved_write_buffer; |
+ if (!write_stopped_) { |
+ write_stopped_ = true; |
+ if (!write_message_queue_.empty()) { |
+ preserved_write_buffer.reset(write_message_queue_.front()); |
+ write_message_queue_.pop_front(); |
+ } |
+ STLDeleteElements(&write_message_queue_); |
+ } else { |
+ DCHECK(write_message_queue_.empty()); |
+ } |
+ |
+ scoped_ptr<IOBufferPreserver> preserver(new IOBufferPreserver( |
+ preserved_read_buffer.Pass(), preserved_write_buffer.Pass())); |
+ |
+ OnShutdownNoLock(preserver.Pass()); |
+} |
+ |
+// Reminder: This must be thread-safe, and takes ownership of |message|. |
+bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
+ base::AutoLock locker(write_lock_); |
+ if (write_stopped_) |
+ return false; |
+ |
+ if (!write_message_queue_.empty()) { |
+ write_message_queue_.push_back(message.release()); |
+ return true; |
+ } |
+ |
+ write_message_queue_.push_front(message.release()); |
+ DCHECK_EQ(write_message_offset_, 0u); |
+ |
+ MessageInTransit* front_message = write_message_queue_.front(); |
+ size_t bytes_written = 0; |
+ IOResult io_result = WriteNoLock( |
+ false, |
+ static_cast<const char*>(front_message->main_buffer()), |
+ front_message->main_buffer_size(), |
+ &bytes_written); |
+ |
+ if (io_result == IO_PENDING) |
+ return true; |
+ |
+ bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, |
+ bytes_written); |
+ if (!result) { |
+ // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
+ // nested context. |
+ message_loop_for_io_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&RawChannel::CallOnFatalError, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ Delegate::FATAL_ERROR_FAILED_WRITE)); |
+ } |
+ |
+ return result; |
+} |
+ |
+void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ if (read_stopped_) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
+ for (;;) { |
+ if (io_result != IO_SUCCEEDED) { |
+ read_stopped_ = true; |
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); |
+ return; |
+ } |
+ |
+ read_buffer_num_valid_bytes_ += bytes_read; |
+ |
+ // Dispatch all the messages that we can. |
+ bool did_dispatch_message = false; |
+ // Tracks the offset of the first undispatched message in |read_buffer_|. |
+ // Currently, we copy data to ensure that this is zero at the beginning. |
+ size_t read_buffer_start = 0; |
+ size_t message_size; |
+ // Note that we rely on short-circuit evaluation here: |
+ // - |read_buffer_start| may be an invalid index into |read_buffer_| if |
+ // |read_buffer_num_valid_bytes_| is zero. |
+ // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
+ // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
+ // next read). |
+ while (read_buffer_num_valid_bytes_ > 0 && |
+ MessageInTransit::GetNextMessageSize( |
+ &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, |
+ &message_size) && |
+ read_buffer_num_valid_bytes_ >= message_size) { |
+ MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, |
+ &read_buffer_[read_buffer_start]); |
+ DCHECK_EQ(message.main_buffer_size(), message_size); |
+ |
+ // Dispatch the message. |
+ delegate_->OnReadMessage(message); |
+ if (read_stopped_) { |
+ // |Shutdown()| was called in |OnReadMessage()|. |
+ // TODO(vtl): Add test for this case. |
+ return; |
+ } |
+ did_dispatch_message = true; |
+ |
+ // Update our state. |
+ read_buffer_start += message_size; |
+ read_buffer_num_valid_bytes_ -= message_size; |
+ } |
+ |
+ // (1) If we dispatched any messages, stop reading for now (and let the |
+ // message loop do its thing for another round). |
+ // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
+ // a single message. Risks: slower, more complex if we want to avoid lots of |
+ // copying. ii. Keep reading until there's no more data and dispatch all the |
+ // messages we can. Risks: starvation of other users of the message loop.) |
+ // (2) If we didn't max out |kReadSize|, stop reading for now. |
+ bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
+ |
+ if (read_buffer_start > 0) { |
+ // Move data back to start. |
+ if (read_buffer_num_valid_bytes_ > 0) { |
+ memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], |
+ read_buffer_num_valid_bytes_); |
+ } |
+ read_buffer_start = 0; |
+ } |
+ |
+ if (read_buffer_.size() - read_buffer_num_valid_bytes_ < kReadSize) { |
+ // Use power-of-2 buffer sizes. |
+ // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
+ // maximum message size to whatever extent necessary). |
+ // TODO(vtl): We may often be able to peek at the header and get the real |
+ // required extra space (which may be much bigger than |kReadSize|). |
+ size_t new_size = std::max(read_buffer_.size(), kReadSize); |
+ while (new_size < read_buffer_num_valid_bytes_ + kReadSize) |
+ new_size *= 2; |
+ |
+ // TODO(vtl): It's suboptimal to zero out the fresh memory. |
+ read_buffer_.resize(new_size, 0); |
+ } |
+ |
+ io_result = Read(schedule_for_later, |
+ &read_buffer_[read_buffer_num_valid_bytes_], |
+ kReadSize, |
+ &bytes_read); |
+ if (io_result == IO_PENDING) |
+ return; |
+ } |
+} |
+ |
+void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ |
+ bool did_fail = false; |
+ { |
+ base::AutoLock locker(write_lock_); |
+ DCHECK_EQ(write_stopped_, write_message_queue_.empty()); |
+ |
+ if (write_stopped_) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ did_fail = !OnWriteCompletedNoLock(result, bytes_written); |
+ } |
+ |
+ if (did_fail) |
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
+} |
+ |
+void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
+ // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
+ delegate_->OnFatalError(fatal_error); |
+} |
+ |
+bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { |
+ write_lock_.AssertAcquired(); |
+ |
+ DCHECK(!write_stopped_); |
+ DCHECK(!write_message_queue_.empty()); |
+ |
+ if (result) { |
+ MessageInTransit* message = write_message_queue_.front(); |
+ DCHECK_LT(write_message_offset_, message->main_buffer_size()); |
+ size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; |
+ |
+ if (bytes_written < bytes_to_write) { |
+ // Partial (or no) write. |
+ write_message_offset_ += bytes_written; |
+ } else { |
+ // Complete write. |
+ DCHECK_EQ(bytes_written, bytes_to_write); |
+ write_message_queue_.pop_front(); |
+ delete message; |
+ write_message_offset_ = 0; |
+ } |
+ |
+ if (write_message_queue_.empty()) |
+ return true; |
+ |
+ // Schedule the next write. |
+ message = write_message_queue_.front(); |
+ bytes_to_write = message->main_buffer_size() - write_message_offset_; |
+ |
+ if (WriteNoLock(true, |
+ static_cast<const char*>(message->main_buffer()) + |
+ write_message_offset_, |
+ bytes_to_write, |
+ NULL) == IO_PENDING) { |
+ return true; |
+ } |
+ } |
+ |
+ write_stopped_ = true; |
+ STLDeleteElements(&write_message_queue_); |
+ return false; |
+} |
+ |
+} // namespace system |
+} // namespace mojo |