| Index: mojo/edk/system/raw_channel.cc
|
| diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
|
| deleted file mode 100644
|
| index 93c2d05d61f4a97c7050d26cdbc9ffc196c65806..0000000000000000000000000000000000000000
|
| --- a/mojo/edk/system/raw_channel.cc
|
| +++ /dev/null
|
| @@ -1,739 +0,0 @@
|
| -// Copyright 2014 The Chromium Authors. All rights reserved.
|
| -// Use of this source code is governed by a BSD-style license that can be
|
| -// found in the LICENSE file.
|
| -
|
| -#include "mojo/edk/system/raw_channel.h"
|
| -
|
| -#include <stddef.h>
|
| -#include <stdint.h>
|
| -#include <string.h>
|
| -#include <algorithm>
|
| -#include <utility>
|
| -
|
| -#include "base/bind.h"
|
| -#include "base/location.h"
|
| -#include "base/logging.h"
|
| -#include "base/message_loop/message_loop.h"
|
| -#include "mojo/edk/embedder/embedder_internal.h"
|
| -#include "mojo/edk/system/configuration.h"
|
| -#include "mojo/edk/system/message_in_transit.h"
|
| -#include "mojo/edk/system/transport_data.h"
|
| -
|
| -namespace mojo {
|
| -namespace edk {
|
| -
|
| -const size_t kReadSize = 4096;
|
| -
|
| -// RawChannel::ReadBuffer ------------------------------------------------------
|
| -
|
| -RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
|
| -}
|
| -
|
| -RawChannel::ReadBuffer::~ReadBuffer() {
|
| -}
|
| -
|
| -void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
|
| - DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
|
| - *addr = &buffer_[0] + num_valid_bytes_;
|
| - *size = kReadSize;
|
| -}
|
| -
|
| -// RawChannel::WriteBuffer -----------------------------------------------------
|
| -
|
| -RawChannel::WriteBuffer::WriteBuffer()
|
| - : serialized_platform_handle_size_(0),
|
| - platform_handles_offset_(0),
|
| - data_offset_(0) {
|
| -}
|
| -
|
| -RawChannel::WriteBuffer::~WriteBuffer() {
|
| - message_queue_.Clear();
|
| -}
|
| -
|
| -bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
|
| - if (message_queue_.IsEmpty())
|
| - return false;
|
| -
|
| - const TransportData* transport_data =
|
| - message_queue_.PeekMessage()->transport_data();
|
| - if (!transport_data)
|
| - return false;
|
| -
|
| - const PlatformHandleVector* all_platform_handles =
|
| - transport_data->platform_handles();
|
| - if (!all_platform_handles) {
|
| - DCHECK_EQ(platform_handles_offset_, 0u);
|
| - return false;
|
| - }
|
| - if (platform_handles_offset_ >= all_platform_handles->size()) {
|
| - DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
|
| - return false;
|
| - }
|
| -
|
| - return true;
|
| -}
|
| -
|
| -void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
|
| - size_t* num_platform_handles,
|
| - PlatformHandle** platform_handles,
|
| - void** serialization_data) {
|
| - DCHECK(HavePlatformHandlesToSend());
|
| -
|
| - MessageInTransit* message = message_queue_.PeekMessage();
|
| - TransportData* transport_data = message->transport_data();
|
| - PlatformHandleVector* all_platform_handles =
|
| - transport_data->platform_handles();
|
| - *num_platform_handles =
|
| - all_platform_handles->size() - platform_handles_offset_;
|
| - *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
|
| -
|
| - if (serialized_platform_handle_size_ > 0) {
|
| - size_t serialization_data_offset =
|
| - transport_data->platform_handle_table_offset();
|
| - serialization_data_offset +=
|
| - platform_handles_offset_ * serialized_platform_handle_size_;
|
| - *serialization_data = static_cast<char*>(transport_data->buffer()) +
|
| - serialization_data_offset;
|
| - } else {
|
| - *serialization_data = nullptr;
|
| - }
|
| -}
|
| -
|
| -void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) {
|
| - buffers->clear();
|
| -
|
| - if (message_queue_.IsEmpty())
|
| - return;
|
| -
|
| - const MessageInTransit* message = message_queue_.PeekMessage();
|
| - if (message->type() == MessageInTransit::Type::RAW_MESSAGE) {
|
| - // These are already-serialized messages so we don't want to write another
|
| - // header as they include that.
|
| - if (data_offset_ == 0) {
|
| - size_t header_size = message->total_size() - message->num_bytes();
|
| - data_offset_ = header_size;
|
| - }
|
| - }
|
| -
|
| - DCHECK_LT(data_offset_, message->total_size());
|
| - size_t bytes_to_write = message->total_size() - data_offset_;
|
| -
|
| - size_t transport_data_buffer_size =
|
| - message->transport_data() ? message->transport_data()->buffer_size() : 0;
|
| -
|
| - if (!transport_data_buffer_size) {
|
| - // Only write from the main buffer.
|
| - DCHECK_LT(data_offset_, message->main_buffer_size());
|
| - DCHECK_LE(bytes_to_write, message->main_buffer_size());
|
| - Buffer buffer = {
|
| - static_cast<const char*>(message->main_buffer()) + data_offset_,
|
| - bytes_to_write};
|
| -
|
| - buffers->push_back(buffer);
|
| - return;
|
| - }
|
| -
|
| - if (data_offset_ >= message->main_buffer_size()) {
|
| - // Only write from the transport data buffer.
|
| - DCHECK_LT(data_offset_ - message->main_buffer_size(),
|
| - transport_data_buffer_size);
|
| - DCHECK_LE(bytes_to_write, transport_data_buffer_size);
|
| - Buffer buffer = {
|
| - static_cast<const char*>(message->transport_data()->buffer()) +
|
| - (data_offset_ - message->main_buffer_size()),
|
| - bytes_to_write};
|
| -
|
| - buffers->push_back(buffer);
|
| - return;
|
| - }
|
| -
|
| - // TODO(vtl): We could actually send out buffers from multiple messages, with
|
| - // the "stopping" condition being reaching a message with platform handles
|
| - // attached.
|
| -
|
| - // Write from both buffers.
|
| - DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
|
| - transport_data_buffer_size);
|
| - Buffer buffer1 = {
|
| - static_cast<const char*>(message->main_buffer()) + data_offset_,
|
| - message->main_buffer_size() - data_offset_};
|
| - buffers->push_back(buffer1);
|
| - Buffer buffer2 = {
|
| - static_cast<const char*>(message->transport_data()->buffer()),
|
| - transport_data_buffer_size};
|
| - buffers->push_back(buffer2);
|
| -}
|
| -
|
| -// RawChannel ------------------------------------------------------------------
|
| -
|
| -RawChannel::RawChannel()
|
| - : delegate_(nullptr),
|
| - error_occurred_(false),
|
| - calling_delegate_(false),
|
| - write_ready_(false),
|
| - write_stopped_(false),
|
| - pending_write_error_(false),
|
| - initialized_(false),
|
| - weak_ptr_factory_(this) {
|
| - read_buffer_.reset(new ReadBuffer);
|
| - write_buffer_.reset(new WriteBuffer());
|
| -}
|
| -
|
| -RawChannel::~RawChannel() {
|
| - DCHECK(!read_buffer_);
|
| - DCHECK(!write_buffer_);
|
| -}
|
| -
|
| -void RawChannel::Init(Delegate* delegate) {
|
| - DCHECK(delegate);
|
| -
|
| - base::AutoLock read_locker(read_lock_);
|
| - // Solves race where initialiing on io thread while main thread is serializing
|
| - // this channel and releases handle.
|
| - base::AutoLock locker(write_lock_);
|
| -
|
| - DCHECK(!delegate_);
|
| - delegate_ = delegate;
|
| -
|
| - if (read_buffer_->num_valid_bytes_ ||
|
| - !write_buffer_->message_queue_.IsEmpty()) {
|
| - LazyInitialize();
|
| - }
|
| -}
|
| -
|
| -void RawChannel::EnsureLazyInitialized() {
|
| - {
|
| - base::AutoLock locker(write_lock_);
|
| - if (initialized_)
|
| - return;
|
| - }
|
| -
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::LockAndCallLazyInitialize,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| -}
|
| -
|
| -void RawChannel::LockAndCallLazyInitialize() {
|
| - base::AutoLock read_locker(read_lock_);
|
| - base::AutoLock locker(write_lock_);
|
| - LazyInitialize();
|
| -}
|
| -
|
| -void RawChannel::LazyInitialize() {
|
| - read_lock_.AssertAcquired();
|
| - write_lock_.AssertAcquired();
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| - if (initialized_)
|
| - return;
|
| - initialized_ = true;
|
| - base::MessageLoop::current()->AddDestructionObserver(this);
|
| -
|
| - OnInit();
|
| -
|
| - if (read_buffer_->num_valid_bytes_) {
|
| - // We had serialized read buffer data through SetSerializedData call.
|
| - // Make sure we read messages out of it now, otherwise the delegate won't
|
| - // get notified if no other data gets written to the pipe.
|
| - // Although this means that we can call back synchronously into the caller,
|
| - // that's easier than posting a task to do this. That is because if we post
|
| - // a task, a pending read could have started and we wouldn't be able to move
|
| - // the read buffer since it can be in use by the OS in an async operation.
|
| - bool did_dispatch_message = false;
|
| - bool stop_dispatching = false;
|
| - DispatchMessages(&did_dispatch_message, &stop_dispatching);
|
| - }
|
| -
|
| - IOResult io_result = ScheduleRead();
|
| - if (io_result != IO_PENDING) {
|
| - // This will notify the delegate about the read failure. Although we're on
|
| - // the I/O thread, don't call it in the nested context.
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted,
|
| - weak_ptr_factory_.GetWeakPtr(), io_result, 0));
|
| - }
|
| - // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
|
| - // the delegate), not an initialization failure.
|
| -
|
| - write_ready_ = true;
|
| - write_buffer_->serialized_platform_handle_size_ =
|
| - GetSerializedPlatformHandleSize();
|
| - if (!write_buffer_->message_queue_.IsEmpty())
|
| - SendQueuedMessagesNoLock();
|
| -}
|
| -
|
| -void RawChannel::Shutdown() {
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| -
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| - // Reset the delegate so that it won't receive further calls.
|
| - delegate_ = nullptr;
|
| - if (calling_delegate_) {
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
|
| - return;
|
| - }
|
| -
|
| - bool empty = false;
|
| - {
|
| - base::AutoLock locker(write_lock_);
|
| - empty = write_buffer_->message_queue_.IsEmpty();
|
| - }
|
| -
|
| - // Normally, we want to flush any pending writes before shutting down. This
|
| - // doesn't apply when 1) we don't have a handle (for obvious reasons),
|
| - // 2) we have a read or write error before (doesn't matter which), or 3) when
|
| - // there are no pending messages to be written.
|
| - if (!IsHandleValid() || error_occurred_ || empty) {
|
| - {
|
| - base::AutoLock read_locker(read_lock_);
|
| - base::AutoLock locker(write_lock_);
|
| - OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_));
|
| - if (initialized_)
|
| - base::MessageLoop::current()->RemoveDestructionObserver(this);
|
| - }
|
| -
|
| - delete this;
|
| - return;
|
| - }
|
| -
|
| - base::AutoLock read_locker(read_lock_);
|
| - base::AutoLock locker(write_lock_);
|
| - DCHECK(read_buffer_->IsEmpty()) <<
|
| - "RawChannel::Shutdown called but there is pending data to be read";
|
| -
|
| - write_stopped_ = true;
|
| -}
|
| -
|
| -ScopedPlatformHandle RawChannel::ReleaseHandle(
|
| - std::vector<char>* serialized_read_buffer,
|
| - std::vector<char>* serialized_write_buffer,
|
| - std::vector<int>* serialized_read_fds,
|
| - std::vector<int>* serialized_write_fds,
|
| - bool* write_error) {
|
| - ScopedPlatformHandle rv;
|
| - *write_error = false;
|
| - {
|
| - base::AutoLock read_locker(read_lock_);
|
| - base::AutoLock locker(write_lock_);
|
| - rv = ReleaseHandleNoLock(serialized_read_buffer,
|
| - serialized_write_buffer,
|
| - serialized_read_fds,
|
| - serialized_write_fds,
|
| - write_error);
|
| - delegate_ = nullptr;
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| -
|
| - return rv;
|
| -}
|
| -
|
| -// Reminder: This must be thread-safe.
|
| -bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
|
| - DCHECK(message);
|
| - EnsureLazyInitialized();
|
| - base::AutoLock locker(write_lock_);
|
| - if (write_stopped_)
|
| - return false;
|
| -
|
| - bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
|
| - EnqueueMessageNoLock(std::move(message));
|
| - if (queue_was_empty && write_ready_)
|
| - return SendQueuedMessagesNoLock();
|
| -
|
| - return true;
|
| -}
|
| -
|
| -bool RawChannel::SendQueuedMessagesNoLock() {
|
| - DCHECK_EQ(write_buffer_->data_offset_, 0u);
|
| -
|
| - size_t platform_handles_written = 0;
|
| - size_t bytes_written = 0;
|
| - IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
|
| - if (io_result == IO_PENDING)
|
| - return true;
|
| -
|
| - bool result = OnWriteCompletedInternalNoLock(
|
| - io_result, platform_handles_written, bytes_written);
|
| - if (!result) {
|
| - // Even if we're on the I/O thread, don't call |OnError()| in the nested
|
| - // context.
|
| - pending_write_error_ = true;
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::LockAndCallOnError,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - Delegate::ERROR_WRITE));
|
| - }
|
| -
|
| - return result;
|
| -}
|
| -
|
| -void RawChannel::SetSerializedData(
|
| - char* serialized_read_buffer, size_t serialized_read_buffer_size,
|
| - char* serialized_write_buffer, size_t serialized_write_buffer_size,
|
| - std::vector<int>* serialized_read_fds,
|
| - std::vector<int>* serialized_write_fds) {
|
| - base::AutoLock locker(read_lock_);
|
| -
|
| -#if defined(OS_POSIX)
|
| - SetSerializedFDs(serialized_read_fds, serialized_write_fds);
|
| -#endif
|
| -
|
| - if (serialized_read_buffer_size) {
|
| - // TODO(jam): copy power of 2 algorithm below? or share.
|
| - read_buffer_->buffer_.resize(serialized_read_buffer_size + kReadSize);
|
| - memcpy(&read_buffer_->buffer_[0], serialized_read_buffer,
|
| - serialized_read_buffer_size);
|
| - read_buffer_->num_valid_bytes_ = serialized_read_buffer_size;
|
| - }
|
| -
|
| - if (serialized_write_buffer_size) {
|
| - size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| -
|
| - uint32_t offset = 0;
|
| - while (offset < serialized_write_buffer_size) {
|
| - uint32_t message_num_bytes =
|
| - std::min(static_cast<uint32_t>(max_message_num_bytes),
|
| - static_cast<uint32_t>(serialized_write_buffer_size) -
|
| - offset);
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::RAW_MESSAGE, message_num_bytes,
|
| - static_cast<const char*>(serialized_write_buffer) + offset));
|
| - write_buffer_->message_queue_.AddMessage(std::move(message));
|
| - offset += message_num_bytes;
|
| - }
|
| - }
|
| -}
|
| -
|
| -void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| - read_lock_.AssertAcquired();
|
| - // Keep reading data in a loop, and dispatch messages if enough data is
|
| - // received. Exit the loop if any of the following happens:
|
| - // - one or more messages were dispatched;
|
| - // - the last read failed, was a partial read or would block;
|
| - // - |Shutdown()| was called.
|
| - do {
|
| - switch (io_result) {
|
| - case IO_SUCCEEDED:
|
| - break;
|
| - case IO_FAILED_SHUTDOWN:
|
| - case IO_FAILED_BROKEN:
|
| - case IO_FAILED_UNKNOWN:
|
| - CallOnError(ReadIOResultToError(io_result));
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - case IO_PENDING:
|
| - NOTREACHED();
|
| - return;
|
| - }
|
| -
|
| - read_buffer_->num_valid_bytes_ += bytes_read;
|
| -
|
| - // Dispatch all the messages that we can.
|
| - bool did_dispatch_message = false;
|
| - bool stop_dispatching = false;
|
| - DispatchMessages(&did_dispatch_message, &stop_dispatching);
|
| - if (stop_dispatching)
|
| - return;
|
| -
|
| - if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
|
| - kReadSize) {
|
| - // Use power-of-2 buffer sizes.
|
| - // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
|
| - // maximum message size to whatever extent necessary).
|
| - // TODO(vtl): We may often be able to peek at the header and get the real
|
| - // required extra space (which may be much bigger than |kReadSize|).
|
| - size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
|
| - while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
|
| - new_size *= 2;
|
| -
|
| - // TODO(vtl): It's suboptimal to zero out the fresh memory.
|
| - read_buffer_->buffer_.resize(new_size, 0);
|
| - }
|
| -
|
| - // (1) If we dispatched any messages, stop reading for now (and let the
|
| - // message loop do its thing for another round).
|
| - // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
|
| - // a single message. Risks: slower, more complex if we want to avoid lots of
|
| - // copying. ii. Keep reading until there's no more data and dispatch all the
|
| - // messages we can. Risks: starvation of other users of the message loop.)
|
| - // (2) If we didn't max out |kReadSize|, stop reading for now.
|
| - bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
|
| - bytes_read = 0;
|
| - io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
|
| - } while (io_result != IO_PENDING);
|
| -}
|
| -
|
| -void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
|
| - size_t platform_handles_written,
|
| - size_t bytes_written) {
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| - write_lock_.AssertAcquired();
|
| - DCHECK_NE(io_result, IO_PENDING);
|
| -
|
| - bool did_fail = !OnWriteCompletedInternalNoLock(
|
| - io_result, platform_handles_written, bytes_written);
|
| - if (did_fail) {
|
| - // Don't want to call the delegate with the current callstack for two
|
| - // reasons:
|
| - // 1) We already have write_lock_ acquired, and calling the delegate means
|
| - // also acquiring the read lock. We need to acquire read and then write to
|
| - // avoid deadlocks.
|
| - // 2) We shouldn't call the delegate with write_lock acquired, since the
|
| - // delegate could be calling WriteMessage and that can cause deadlocks.
|
| - pending_write_error_ = true;
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::LockAndCallOnError,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - Delegate::ERROR_WRITE));
|
| - }
|
| -}
|
| -
|
| -void RawChannel::SerializeReadBuffer(size_t additional_bytes_read,
|
| - std::vector<char>* buffer) {
|
| - read_lock_.AssertAcquired();
|
| - read_buffer_->num_valid_bytes_ += additional_bytes_read;
|
| - read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_);
|
| - read_buffer_->buffer_.swap(*buffer);
|
| - read_buffer_->num_valid_bytes_ = 0;
|
| -}
|
| -
|
| -void RawChannel::SerializeWriteBuffer(
|
| - size_t additional_bytes_written,
|
| - size_t additional_platform_handles_written,
|
| - std::vector<char>* buffer,
|
| - std::vector<int>* fds) {
|
| - write_lock_.AssertAcquired();
|
| - if (write_buffer_->IsEmpty()) {
|
| - DCHECK_EQ(0u, additional_bytes_written);
|
| - DCHECK_EQ(0u, additional_platform_handles_written);
|
| - return;
|
| - }
|
| -
|
| - UpdateWriteBuffer(
|
| - additional_platform_handles_written, additional_bytes_written);
|
| - while (!write_buffer_->message_queue_.IsEmpty()) {
|
| - SerializePlatformHandles(fds);
|
| - std::vector<WriteBuffer::Buffer> buffers;
|
| - write_buffer_no_lock()->GetBuffers(&buffers);
|
| - for (size_t i = 0; i < buffers.size(); ++i) {
|
| - buffer->insert(buffer->end(), buffers[i].addr,
|
| - buffers[i].addr + buffers[i].size);
|
| - }
|
| - write_buffer_->message_queue_.DiscardMessage();
|
| - }
|
| -}
|
| -
|
| -void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
|
| - write_lock_.AssertAcquired();
|
| - write_buffer_->message_queue_.AddMessage(std::move(message));
|
| -}
|
| -
|
| -bool RawChannel::OnReadMessageForRawChannel(
|
| - const MessageInTransit::View& message_view) {
|
| - LOG(ERROR) << "Invalid control message (type " << message_view.type()
|
| - << ")";
|
| - return false;
|
| -}
|
| -
|
| -RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
|
| - IOResult io_result) {
|
| - switch (io_result) {
|
| - case IO_FAILED_SHUTDOWN:
|
| - return Delegate::ERROR_READ_SHUTDOWN;
|
| - case IO_FAILED_BROKEN:
|
| - return Delegate::ERROR_READ_BROKEN;
|
| - case IO_FAILED_UNKNOWN:
|
| - return Delegate::ERROR_READ_UNKNOWN;
|
| - case IO_SUCCEEDED:
|
| - case IO_PENDING:
|
| - NOTREACHED();
|
| - break;
|
| - }
|
| - return Delegate::ERROR_READ_UNKNOWN;
|
| -}
|
| -
|
| -void RawChannel::CallOnError(Delegate::Error error) {
|
| - DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
|
| - read_lock_.AssertAcquired();
|
| - error_occurred_ = true;
|
| - if (delegate_) {
|
| - DCHECK(!calling_delegate_);
|
| - calling_delegate_ = true;
|
| - delegate_->OnError(error);
|
| - calling_delegate_ = false;
|
| - } else {
|
| - // We depend on delegate to delete since it could be waiting to call
|
| - // ReleaseHandle.
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| -}
|
| -
|
| -void RawChannel::LockAndCallOnError(Delegate::Error error) {
|
| - base::AutoLock locker(read_lock_);
|
| - CallOnError(error);
|
| -}
|
| -
|
| -bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result,
|
| - size_t platform_handles_written,
|
| - size_t bytes_written) {
|
| - write_lock_.AssertAcquired();
|
| -
|
| - DCHECK(!write_buffer_->message_queue_.IsEmpty());
|
| -
|
| - if (io_result == IO_SUCCEEDED) {
|
| - UpdateWriteBuffer(platform_handles_written, bytes_written);
|
| - if (write_buffer_->message_queue_.IsEmpty()) {
|
| - if (!delegate_) {
|
| - // Shutdown must have been called and we were waiting to flush all
|
| - // pending writes. Now we're done.
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| - return true;
|
| - }
|
| -
|
| - // Schedule the next write.
|
| - io_result = ScheduleWriteNoLock();
|
| - if (io_result == IO_PENDING)
|
| - return true;
|
| - DCHECK_NE(io_result, IO_SUCCEEDED);
|
| - }
|
| -
|
| - write_stopped_ = true;
|
| - write_buffer_->message_queue_.Clear();
|
| - write_buffer_->platform_handles_offset_ = 0;
|
| - write_buffer_->data_offset_ = 0;
|
| - return false;
|
| -}
|
| -
|
| -void RawChannel::DispatchMessages(bool* did_dispatch_message,
|
| - bool* stop_dispatching) {
|
| - *did_dispatch_message = false;
|
| - *stop_dispatching = false;
|
| - // Tracks the offset of the first undispatched message in |read_buffer_|.
|
| - // Currently, we copy data to ensure that this is zero at the beginning.
|
| - size_t read_buffer_start = 0;
|
| - size_t remaining_bytes = read_buffer_->num_valid_bytes_;
|
| - size_t message_size;
|
| - // Note that we rely on short-circuit evaluation here:
|
| - // - |read_buffer_start| may be an invalid index into
|
| - // |read_buffer_->buffer_| if |remaining_bytes| is zero.
|
| - // - |message_size| is only valid if |GetNextMessageSize()| returns true.
|
| - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
|
| - // next read).
|
| - // TODO(vtl): Validate that |message_size| is sane.
|
| - while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
|
| - &read_buffer_->buffer_[read_buffer_start],
|
| - remaining_bytes, &message_size) &&
|
| - remaining_bytes >= message_size) {
|
| - MessageInTransit::View message_view(
|
| - message_size, &read_buffer_->buffer_[read_buffer_start]);
|
| - DCHECK_EQ(message_view.total_size(), message_size);
|
| -
|
| - const char* error_message = nullptr;
|
| - if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
|
| - &error_message)) {
|
| - DCHECK(error_message);
|
| - LOG(ERROR) << "Received invalid message: " << error_message;
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - *stop_dispatching = true;
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| -
|
| - if (message_view.type() != MessageInTransit::Type::MESSAGE &&
|
| - message_view.type() != MessageInTransit::Type::QUIT_MESSAGE) {
|
| - if (!OnReadMessageForRawChannel(message_view)) {
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - *stop_dispatching = true;
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| - } else {
|
| - ScopedPlatformHandleVectorPtr platform_handles;
|
| - if (message_view.transport_data_buffer()) {
|
| - size_t num_platform_handles;
|
| - const void* platform_handle_table;
|
| - TransportData::GetPlatformHandleTable(
|
| - message_view.transport_data_buffer(), &num_platform_handles,
|
| - &platform_handle_table);
|
| -
|
| - if (num_platform_handles > 0) {
|
| - platform_handles = GetReadPlatformHandles(num_platform_handles,
|
| - platform_handle_table);
|
| - if (!platform_handles) {
|
| - LOG(ERROR) << "Invalid number of platform handles received";
|
| - CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
|
| - *stop_dispatching = true;
|
| - return; // |this| may have been destroyed in |CallOnError()|.
|
| - }
|
| - }
|
| - }
|
| -
|
| - // TODO(vtl): In the case that we aren't expecting any platform handles,
|
| - // for the POSIX implementation, we should confirm that none are stored.
|
| - if (delegate_) {
|
| - DCHECK(!calling_delegate_);
|
| - calling_delegate_ = true;
|
| - delegate_->OnReadMessage(message_view, std::move(platform_handles));
|
| - calling_delegate_ = false;
|
| - }
|
| - }
|
| -
|
| - *did_dispatch_message = true;
|
| -
|
| - // Update our state.
|
| - read_buffer_start += message_size;
|
| - remaining_bytes -= message_size;
|
| - }
|
| -
|
| - if (read_buffer_start > 0) {
|
| - // Move data back to start.
|
| - read_buffer_->num_valid_bytes_ = remaining_bytes;
|
| - if (read_buffer_->num_valid_bytes_ > 0) {
|
| - memmove(&read_buffer_->buffer_[0],
|
| - &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
|
| - }
|
| - read_buffer_start = 0;
|
| - }
|
| -}
|
| -
|
| -void RawChannel::UpdateWriteBuffer(size_t platform_handles_written,
|
| - size_t bytes_written) {
|
| - write_buffer_->platform_handles_offset_ += platform_handles_written;
|
| - write_buffer_->data_offset_ += bytes_written;
|
| -
|
| - MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
|
| - if (write_buffer_->data_offset_ >= message->total_size()) {
|
| - // Complete write.
|
| - CHECK_EQ(write_buffer_->data_offset_, message->total_size());
|
| - write_buffer_->message_queue_.DiscardMessage();
|
| - write_buffer_->platform_handles_offset_ = 0;
|
| - write_buffer_->data_offset_ = 0;
|
| - }
|
| -}
|
| -
|
| -void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) {
|
| - base::AutoLock locker(read_lock_);
|
| - OnReadCompletedNoLock(io_result, bytes_read);
|
| -}
|
| -
|
| -void RawChannel::WillDestroyCurrentMessageLoop() {
|
| - {
|
| - base::AutoLock locker(read_lock_);
|
| - OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0);
|
| - }
|
| - // The PostTask inside Shutdown() will never be called, so manually call it
|
| - // here to avoid leaks in LSAN builds.
|
| - Shutdown();
|
| -}
|
| -
|
| -} // namespace edk
|
| -} // namespace mojo
|
|
|