Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1430)

Unified Diff: mojo/edk/system/raw_channel.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/raw_channel.cc
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
deleted file mode 100644
index aff11103a9daba8c591742dd0dff41432c32759f..0000000000000000000000000000000000000000
--- a/mojo/edk/system/raw_channel.cc
+++ /dev/null
@@ -1,514 +0,0 @@
-// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "mojo/edk/system/raw_channel.h"
-
-#include <string.h>
-
-#include <algorithm>
-
-#include "base/bind.h"
-#include "base/location.h"
-#include "base/logging.h"
-#include "base/message_loop/message_loop.h"
-#include "base/stl_util.h"
-#include "mojo/edk/system/message_in_transit.h"
-#include "mojo/edk/system/transport_data.h"
-
-namespace mojo {
-namespace system {
-
-const size_t kReadSize = 4096;
-
-// RawChannel::ReadBuffer ------------------------------------------------------
-
-RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
-}
-
-RawChannel::ReadBuffer::~ReadBuffer() {
-}
-
-void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
- DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
- *addr = &buffer_[0] + num_valid_bytes_;
- *size = kReadSize;
-}
-
-// RawChannel::WriteBuffer -----------------------------------------------------
-
-RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
- : serialized_platform_handle_size_(serialized_platform_handle_size),
- platform_handles_offset_(0),
- data_offset_(0) {
-}
-
-RawChannel::WriteBuffer::~WriteBuffer() {
- STLDeleteElements(&message_queue_);
-}
-
-bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
- if (message_queue_.empty())
- return false;
-
- const TransportData* transport_data =
- message_queue_.front()->transport_data();
- if (!transport_data)
- return false;
-
- const embedder::PlatformHandleVector* all_platform_handles =
- transport_data->platform_handles();
- if (!all_platform_handles) {
- DCHECK_EQ(platform_handles_offset_, 0u);
- return false;
- }
- if (platform_handles_offset_ >= all_platform_handles->size()) {
- DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
- return false;
- }
-
- return true;
-}
-
-void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
- size_t* num_platform_handles,
- embedder::PlatformHandle** platform_handles,
- void** serialization_data) {
- DCHECK(HavePlatformHandlesToSend());
-
- MessageInTransit* message = message_queue_.front();
- TransportData* transport_data = message->transport_data();
- embedder::PlatformHandleVector* all_platform_handles =
- transport_data->platform_handles();
- *num_platform_handles =
- all_platform_handles->size() - platform_handles_offset_;
- *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
-
- if (serialized_platform_handle_size_ > 0) {
- size_t serialization_data_offset =
- transport_data->platform_handle_table_offset();
- DCHECK_GT(serialization_data_offset, 0u);
- serialization_data_offset +=
- platform_handles_offset_ * serialized_platform_handle_size_;
- *serialization_data = static_cast<char*>(transport_data->buffer()) +
- serialization_data_offset;
- } else {
- *serialization_data = nullptr;
- }
-}
-
-void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
- buffers->clear();
-
- if (message_queue_.empty())
- return;
-
- MessageInTransit* message = message_queue_.front();
- DCHECK_LT(data_offset_, message->total_size());
- size_t bytes_to_write = message->total_size() - data_offset_;
-
- size_t transport_data_buffer_size =
- message->transport_data() ? message->transport_data()->buffer_size() : 0;
-
- if (!transport_data_buffer_size) {
- // Only write from the main buffer.
- DCHECK_LT(data_offset_, message->main_buffer_size());
- DCHECK_LE(bytes_to_write, message->main_buffer_size());
- Buffer buffer = {
- static_cast<const char*>(message->main_buffer()) + data_offset_,
- bytes_to_write};
- buffers->push_back(buffer);
- return;
- }
-
- if (data_offset_ >= message->main_buffer_size()) {
- // Only write from the transport data buffer.
- DCHECK_LT(data_offset_ - message->main_buffer_size(),
- transport_data_buffer_size);
- DCHECK_LE(bytes_to_write, transport_data_buffer_size);
- Buffer buffer = {
- static_cast<const char*>(message->transport_data()->buffer()) +
- (data_offset_ - message->main_buffer_size()),
- bytes_to_write};
- buffers->push_back(buffer);
- return;
- }
-
- // TODO(vtl): We could actually send out buffers from multiple messages, with
- // the "stopping" condition being reaching a message with platform handles
- // attached.
-
- // Write from both buffers.
- DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
- transport_data_buffer_size);
- Buffer buffer1 = {
- static_cast<const char*>(message->main_buffer()) + data_offset_,
- message->main_buffer_size() - data_offset_};
- buffers->push_back(buffer1);
- Buffer buffer2 = {
- static_cast<const char*>(message->transport_data()->buffer()),
- transport_data_buffer_size};
- buffers->push_back(buffer2);
-}
-
-// RawChannel ------------------------------------------------------------------
-
-RawChannel::RawChannel()
- : message_loop_for_io_(nullptr),
- delegate_(nullptr),
- read_stopped_(false),
- write_stopped_(false),
- weak_ptr_factory_(this) {
-}
-
-RawChannel::~RawChannel() {
- DCHECK(!read_buffer_);
- DCHECK(!write_buffer_);
-
- // No need to take the |write_lock_| here -- if there are still weak pointers
- // outstanding, then we're hosed anyway (since we wouldn't be able to
- // invalidate them cleanly, since we might not be on the I/O thread).
- DCHECK(!weak_ptr_factory_.HasWeakPtrs());
-}
-
-void RawChannel::Init(Delegate* delegate) {
- DCHECK(delegate);
-
- DCHECK(!delegate_);
- delegate_ = delegate;
-
- CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
- DCHECK(!message_loop_for_io_);
- message_loop_for_io_ =
- static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
-
- // No need to take the lock. No one should be using us yet.
- DCHECK(!read_buffer_);
- read_buffer_.reset(new ReadBuffer);
- DCHECK(!write_buffer_);
- write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
-
- OnInit();
-
- IOResult io_result = ScheduleRead();
- if (io_result != IO_PENDING) {
- // This will notify the delegate about the read failure. Although we're on
- // the I/O thread, don't call it in the nested context.
- message_loop_for_io_->PostTask(
- FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
- weak_ptr_factory_.GetWeakPtr(), io_result, 0));
- }
- // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
- // the delegate), not an initialization failure.
-}
-
-void RawChannel::Shutdown() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
-
- base::AutoLock locker(write_lock_);
-
- LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
- << "Shutting down RawChannel with write buffer nonempty";
-
- // Reset the delegate so that it won't receive further calls.
- delegate_ = nullptr;
- read_stopped_ = true;
- write_stopped_ = true;
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
-}
-
-// Reminder: This must be thread-safe.
-bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
- DCHECK(message);
-
- base::AutoLock locker(write_lock_);
- if (write_stopped_)
- return false;
-
- if (!write_buffer_->message_queue_.empty()) {
- EnqueueMessageNoLock(message.Pass());
- return true;
- }
-
- EnqueueMessageNoLock(message.Pass());
- DCHECK_EQ(write_buffer_->data_offset_, 0u);
-
- size_t platform_handles_written = 0;
- size_t bytes_written = 0;
- IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
- if (io_result == IO_PENDING)
- return true;
-
- bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
- bytes_written);
- if (!result) {
- // Even if we're on the I/O thread, don't call |OnError()| in the nested
- // context.
- message_loop_for_io_->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
- Delegate::ERROR_WRITE));
- }
-
- return result;
-}
-
-// Reminder: This must be thread-safe.
-bool RawChannel::IsWriteBufferEmpty() {
- base::AutoLock locker(write_lock_);
- return write_buffer_->message_queue_.empty();
-}
-
-void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
-
- if (read_stopped_) {
- NOTREACHED();
- return;
- }
-
- // Keep reading data in a loop, and dispatch messages if enough data is
- // received. Exit the loop if any of the following happens:
- // - one or more messages were dispatched;
- // - the last read failed, was a partial read or would block;
- // - |Shutdown()| was called.
- do {
- switch (io_result) {
- case IO_SUCCEEDED:
- break;
- case IO_FAILED_SHUTDOWN:
- case IO_FAILED_BROKEN:
- case IO_FAILED_UNKNOWN:
- read_stopped_ = true;
- CallOnError(ReadIOResultToError(io_result));
- return;
- case IO_PENDING:
- NOTREACHED();
- return;
- }
-
- read_buffer_->num_valid_bytes_ += bytes_read;
-
- // Dispatch all the messages that we can.
- bool did_dispatch_message = false;
- // Tracks the offset of the first undispatched message in |read_buffer_|.
- // Currently, we copy data to ensure that this is zero at the beginning.
- size_t read_buffer_start = 0;
- size_t remaining_bytes = read_buffer_->num_valid_bytes_;
- size_t message_size;
- // Note that we rely on short-circuit evaluation here:
- // - |read_buffer_start| may be an invalid index into
- // |read_buffer_->buffer_| if |remaining_bytes| is zero.
- // - |message_size| is only valid if |GetNextMessageSize()| returns true.
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
- // next read).
- // TODO(vtl): Validate that |message_size| is sane.
- while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
- &read_buffer_->buffer_[read_buffer_start],
- remaining_bytes, &message_size) &&
- remaining_bytes >= message_size) {
- MessageInTransit::View message_view(
- message_size, &read_buffer_->buffer_[read_buffer_start]);
- DCHECK_EQ(message_view.total_size(), message_size);
-
- const char* error_message = nullptr;
- if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
- &error_message)) {
- DCHECK(error_message);
- LOG(ERROR) << "Received invalid message: " << error_message;
- read_stopped_ = true;
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return;
- }
-
- if (message_view.type() == MessageInTransit::kTypeRawChannel) {
- if (!OnReadMessageForRawChannel(message_view)) {
- read_stopped_ = true;
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return;
- }
- } else {
- embedder::ScopedPlatformHandleVectorPtr platform_handles;
- if (message_view.transport_data_buffer()) {
- size_t num_platform_handles;
- const void* platform_handle_table;
- TransportData::GetPlatformHandleTable(
- message_view.transport_data_buffer(), &num_platform_handles,
- &platform_handle_table);
-
- if (num_platform_handles > 0) {
- platform_handles =
- GetReadPlatformHandles(num_platform_handles,
- platform_handle_table).Pass();
- if (!platform_handles) {
- LOG(ERROR) << "Invalid number of platform handles received";
- read_stopped_ = true;
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- return;
- }
- }
- }
-
- // TODO(vtl): In the case that we aren't expecting any platform handles,
- // for the POSIX implementation, we should confirm that none are stored.
-
- // Dispatch the message.
- DCHECK(delegate_);
- delegate_->OnReadMessage(message_view, platform_handles.Pass());
- if (read_stopped_) {
- // |Shutdown()| was called in |OnReadMessage()|.
- // TODO(vtl): Add test for this case.
- return;
- }
- }
-
- did_dispatch_message = true;
-
- // Update our state.
- read_buffer_start += message_size;
- remaining_bytes -= message_size;
- }
-
- if (read_buffer_start > 0) {
- // Move data back to start.
- read_buffer_->num_valid_bytes_ = remaining_bytes;
- if (read_buffer_->num_valid_bytes_ > 0) {
- memmove(&read_buffer_->buffer_[0],
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
- }
- read_buffer_start = 0;
- }
-
- if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
- kReadSize) {
- // Use power-of-2 buffer sizes.
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
- // maximum message size to whatever extent necessary).
- // TODO(vtl): We may often be able to peek at the header and get the real
- // required extra space (which may be much bigger than |kReadSize|).
- size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
- while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
- new_size *= 2;
-
- // TODO(vtl): It's suboptimal to zero out the fresh memory.
- read_buffer_->buffer_.resize(new_size, 0);
- }
-
- // (1) If we dispatched any messages, stop reading for now (and let the
- // message loop do its thing for another round).
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
- // a single message. Risks: slower, more complex if we want to avoid lots of
- // copying. ii. Keep reading until there's no more data and dispatch all the
- // messages we can. Risks: starvation of other users of the message loop.)
- // (2) If we didn't max out |kReadSize|, stop reading for now.
- bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
- bytes_read = 0;
- io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
- } while (io_result != IO_PENDING);
-}
-
-void RawChannel::OnWriteCompleted(IOResult io_result,
- size_t platform_handles_written,
- size_t bytes_written) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- DCHECK_NE(io_result, IO_PENDING);
-
- bool did_fail = false;
- {
- base::AutoLock locker(write_lock_);
- DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
-
- if (write_stopped_) {
- NOTREACHED();
- return;
- }
-
- did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
- bytes_written);
- }
-
- if (did_fail)
- CallOnError(Delegate::ERROR_WRITE);
-}
-
-void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
- write_lock_.AssertAcquired();
- write_buffer_->message_queue_.push_back(message.release());
-}
-
-bool RawChannel::OnReadMessageForRawChannel(
- const MessageInTransit::View& message_view) {
- // No non-implementation specific |RawChannel| control messages.
- LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
- << ")";
- return false;
-}
-
-// static
-RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
- IOResult io_result) {
- switch (io_result) {
- case IO_FAILED_SHUTDOWN:
- return Delegate::ERROR_READ_SHUTDOWN;
- case IO_FAILED_BROKEN:
- return Delegate::ERROR_READ_BROKEN;
- case IO_FAILED_UNKNOWN:
- return Delegate::ERROR_READ_UNKNOWN;
- case IO_SUCCEEDED:
- case IO_PENDING:
- NOTREACHED();
- break;
- }
- return Delegate::ERROR_READ_UNKNOWN;
-}
-
-void RawChannel::CallOnError(Delegate::Error error) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
- if (delegate_)
- delegate_->OnError(error);
-}
-
-bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
- size_t platform_handles_written,
- size_t bytes_written) {
- write_lock_.AssertAcquired();
-
- DCHECK(!write_stopped_);
- DCHECK(!write_buffer_->message_queue_.empty());
-
- if (io_result == IO_SUCCEEDED) {
- write_buffer_->platform_handles_offset_ += platform_handles_written;
- write_buffer_->data_offset_ += bytes_written;
-
- MessageInTransit* message = write_buffer_->message_queue_.front();
- if (write_buffer_->data_offset_ >= message->total_size()) {
- // Complete write.
- CHECK_EQ(write_buffer_->data_offset_, message->total_size());
- write_buffer_->message_queue_.pop_front();
- delete message;
- write_buffer_->platform_handles_offset_ = 0;
- write_buffer_->data_offset_ = 0;
-
- if (write_buffer_->message_queue_.empty())
- return true;
- }
-
- // Schedule the next write.
- io_result = ScheduleWriteNoLock();
- if (io_result == IO_PENDING)
- return true;
- DCHECK_NE(io_result, IO_SUCCEEDED);
- }
-
- write_stopped_ = true;
- STLDeleteElements(&write_buffer_->message_queue_);
- write_buffer_->platform_handles_offset_ = 0;
- write_buffer_->data_offset_ = 0;
- return false;
-}
-
-} // namespace system
-} // namespace mojo
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698