Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(327)

Unified Diff: mojo/edk/system/raw_channel.cc

Issue 1649633002: Remove files that are no longer used in the Port EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/raw_channel.cc
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
deleted file mode 100644
index 93c2d05d61f4a97c7050d26cdbc9ffc196c65806..0000000000000000000000000000000000000000
--- a/mojo/edk/system/raw_channel.cc
+++ /dev/null
@@ -1,739 +0,0 @@
-// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "mojo/edk/system/raw_channel.h"
-
-#include <stddef.h>
-#include <stdint.h>
-#include <string.h>
-#include <algorithm>
-#include <utility>
-
-#include "base/bind.h"
-#include "base/location.h"
-#include "base/logging.h"
-#include "base/message_loop/message_loop.h"
-#include "mojo/edk/embedder/embedder_internal.h"
-#include "mojo/edk/system/configuration.h"
-#include "mojo/edk/system/message_in_transit.h"
-#include "mojo/edk/system/transport_data.h"
-
-namespace mojo {
-namespace edk {
-
-const size_t kReadSize = 4096;
-
-// RawChannel::ReadBuffer ------------------------------------------------------
-
-RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
-}
-
-RawChannel::ReadBuffer::~ReadBuffer() {
-}
-
-void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
- DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
- *addr = &buffer_[0] + num_valid_bytes_;
- *size = kReadSize;
-}
-
-// RawChannel::WriteBuffer -----------------------------------------------------
-
-RawChannel::WriteBuffer::WriteBuffer()
- : serialized_platform_handle_size_(0),
- platform_handles_offset_(0),
- data_offset_(0) {
-}
-
-RawChannel::WriteBuffer::~WriteBuffer() {
- message_queue_.Clear();
-}
-
-bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
- if (message_queue_.IsEmpty())
- return false;
-
- const TransportData* transport_data =
- message_queue_.PeekMessage()->transport_data();
- if (!transport_data)
- return false;
-
- const PlatformHandleVector* all_platform_handles =
- transport_data->platform_handles();
- if (!all_platform_handles) {
- DCHECK_EQ(platform_handles_offset_, 0u);
- return false;
- }
- if (platform_handles_offset_ >= all_platform_handles->size()) {
- DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
- return false;
- }
-
- return true;
-}
-
-void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
- size_t* num_platform_handles,
- PlatformHandle** platform_handles,
- void** serialization_data) {
- DCHECK(HavePlatformHandlesToSend());
-
- MessageInTransit* message = message_queue_.PeekMessage();
- TransportData* transport_data = message->transport_data();
- PlatformHandleVector* all_platform_handles =
- transport_data->platform_handles();
- *num_platform_handles =
- all_platform_handles->size() - platform_handles_offset_;
- *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
-
- if (serialized_platform_handle_size_ > 0) {
- size_t serialization_data_offset =
- transport_data->platform_handle_table_offset();
- serialization_data_offset +=
- platform_handles_offset_ * serialized_platform_handle_size_;
- *serialization_data = static_cast<char*>(transport_data->buffer()) +
- serialization_data_offset;
- } else {
- *serialization_data = nullptr;
- }
-}
-
-void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) {
- buffers->clear();
-
- if (message_queue_.IsEmpty())
- return;
-
- const MessageInTransit* message = message_queue_.PeekMessage();
- if (message->type() == MessageInTransit::Type::RAW_MESSAGE) {
- // These are already-serialized messages so we don't want to write another
- // header as they include that.
- if (data_offset_ == 0) {
- size_t header_size = message->total_size() - message->num_bytes();
- data_offset_ = header_size;
- }
- }
-
- DCHECK_LT(data_offset_, message->total_size());
- size_t bytes_to_write = message->total_size() - data_offset_;
-
- size_t transport_data_buffer_size =
- message->transport_data() ? message->transport_data()->buffer_size() : 0;
-
- if (!transport_data_buffer_size) {
- // Only write from the main buffer.
- DCHECK_LT(data_offset_, message->main_buffer_size());
- DCHECK_LE(bytes_to_write, message->main_buffer_size());
- Buffer buffer = {
- static_cast<const char*>(message->main_buffer()) + data_offset_,
- bytes_to_write};
-
- buffers->push_back(buffer);
- return;
- }
-
- if (data_offset_ >= message->main_buffer_size()) {
- // Only write from the transport data buffer.
- DCHECK_LT(data_offset_ - message->main_buffer_size(),
- transport_data_buffer_size);
- DCHECK_LE(bytes_to_write, transport_data_buffer_size);
- Buffer buffer = {
- static_cast<const char*>(message->transport_data()->buffer()) +
- (data_offset_ - message->main_buffer_size()),
- bytes_to_write};
-
- buffers->push_back(buffer);
- return;
- }
-
- // TODO(vtl): We could actually send out buffers from multiple messages, with
- // the "stopping" condition being reaching a message with platform handles
- // attached.
-
- // Write from both buffers.
- DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
- transport_data_buffer_size);
- Buffer buffer1 = {
- static_cast<const char*>(message->main_buffer()) + data_offset_,
- message->main_buffer_size() - data_offset_};
- buffers->push_back(buffer1);
- Buffer buffer2 = {
- static_cast<const char*>(message->transport_data()->buffer()),
- transport_data_buffer_size};
- buffers->push_back(buffer2);
-}
-
-// RawChannel ------------------------------------------------------------------
-
-RawChannel::RawChannel()
- : delegate_(nullptr),
- error_occurred_(false),
- calling_delegate_(false),
- write_ready_(false),
- write_stopped_(false),
- pending_write_error_(false),
- initialized_(false),
- weak_ptr_factory_(this) {
- read_buffer_.reset(new ReadBuffer);
- write_buffer_.reset(new WriteBuffer());
-}
-
-RawChannel::~RawChannel() {
- DCHECK(!read_buffer_);
- DCHECK(!write_buffer_);
-}
-
-void RawChannel::Init(Delegate* delegate) {
- DCHECK(delegate);
-
- base::AutoLock read_locker(read_lock_);
- // Solves race where initialiing on io thread while main thread is serializing
- // this channel and releases handle.
- base::AutoLock locker(write_lock_);
-
- DCHECK(!delegate_);
- delegate_ = delegate;
-
- if (read_buffer_->num_valid_bytes_ ||
- !write_buffer_->message_queue_.IsEmpty()) {
- LazyInitialize();
- }
-}
-
-void RawChannel::EnsureLazyInitialized() {
- {
- base::AutoLock locker(write_lock_);
- if (initialized_)
- return;
- }
-
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::LockAndCallLazyInitialize,
- weak_ptr_factory_.GetWeakPtr()));
-}
-
-void RawChannel::LockAndCallLazyInitialize() {
- base::AutoLock read_locker(read_lock_);
- base::AutoLock locker(write_lock_);
- LazyInitialize();
-}
-
-void RawChannel::LazyInitialize() {
- read_lock_.AssertAcquired();
- write_lock_.AssertAcquired();
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
- if (initialized_)
- return;
- initialized_ = true;
- base::MessageLoop::current()->AddDestructionObserver(this);
-
- OnInit();
-
- if (read_buffer_->num_valid_bytes_) {
- // We had serialized read buffer data through SetSerializedData call.
- // Make sure we read messages out of it now, otherwise the delegate won't
- // get notified if no other data gets written to the pipe.
- // Although this means that we can call back synchronously into the caller,
- // that's easier than posting a task to do this. That is because if we post
- // a task, a pending read could have started and we wouldn't be able to move
- // the read buffer since it can be in use by the OS in an async operation.
- bool did_dispatch_message = false;
- bool stop_dispatching = false;
- DispatchMessages(&did_dispatch_message, &stop_dispatching);
- }
-
- IOResult io_result = ScheduleRead();
- if (io_result != IO_PENDING) {
- // This will notify the delegate about the read failure. Although we're on
- // the I/O thread, don't call it in the nested context.
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted,
- weak_ptr_factory_.GetWeakPtr(), io_result, 0));
- }
- // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
- // the delegate), not an initialization failure.
-
- write_ready_ = true;
- write_buffer_->serialized_platform_handle_size_ =
- GetSerializedPlatformHandleSize();
- if (!write_buffer_->message_queue_.IsEmpty())
- SendQueuedMessagesNoLock();
-}
-
-void RawChannel::Shutdown() {
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
-
- weak_ptr_factory_.InvalidateWeakPtrs();
- // Reset the delegate so that it won't receive further calls.
- delegate_ = nullptr;
- if (calling_delegate_) {
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
- return;
- }
-
- bool empty = false;
- {
- base::AutoLock locker(write_lock_);
- empty = write_buffer_->message_queue_.IsEmpty();
- }
-
- // Normally, we want to flush any pending writes before shutting down. This
- // doesn't apply when 1) we don't have a handle (for obvious reasons),
- // 2) we have a read or write error before (doesn't matter which), or 3) when
- // there are no pending messages to be written.
- if (!IsHandleValid() || error_occurred_ || empty) {
- {
- base::AutoLock read_locker(read_lock_);
- base::AutoLock locker(write_lock_);
- OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_));
- if (initialized_)
- base::MessageLoop::current()->RemoveDestructionObserver(this);
- }
-
- delete this;
- return;
- }
-
- base::AutoLock read_locker(read_lock_);
- base::AutoLock locker(write_lock_);
- DCHECK(read_buffer_->IsEmpty()) <<
- "RawChannel::Shutdown called but there is pending data to be read";
-
- write_stopped_ = true;
-}
-
-ScopedPlatformHandle RawChannel::ReleaseHandle(
- std::vector<char>* serialized_read_buffer,
- std::vector<char>* serialized_write_buffer,
- std::vector<int>* serialized_read_fds,
- std::vector<int>* serialized_write_fds,
- bool* write_error) {
- ScopedPlatformHandle rv;
- *write_error = false;
- {
- base::AutoLock read_locker(read_lock_);
- base::AutoLock locker(write_lock_);
- rv = ReleaseHandleNoLock(serialized_read_buffer,
- serialized_write_buffer,
- serialized_read_fds,
- serialized_write_fds,
- write_error);
- delegate_ = nullptr;
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
- }
-
- return rv;
-}
-
-// Reminder: This must be thread-safe.
-bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
- DCHECK(message);
- EnsureLazyInitialized();
- base::AutoLock locker(write_lock_);
- if (write_stopped_)
- return false;
-
- bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
- EnqueueMessageNoLock(std::move(message));
- if (queue_was_empty && write_ready_)
- return SendQueuedMessagesNoLock();
-
- return true;
-}
-
-bool RawChannel::SendQueuedMessagesNoLock() {
- DCHECK_EQ(write_buffer_->data_offset_, 0u);
-
- size_t platform_handles_written = 0;
- size_t bytes_written = 0;
- IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
- if (io_result == IO_PENDING)
- return true;
-
- bool result = OnWriteCompletedInternalNoLock(
- io_result, platform_handles_written, bytes_written);
- if (!result) {
- // Even if we're on the I/O thread, don't call |OnError()| in the nested
- // context.
- pending_write_error_ = true;
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::LockAndCallOnError,
- weak_ptr_factory_.GetWeakPtr(),
- Delegate::ERROR_WRITE));
- }
-
- return result;
-}
-
-void RawChannel::SetSerializedData(
- char* serialized_read_buffer, size_t serialized_read_buffer_size,
- char* serialized_write_buffer, size_t serialized_write_buffer_size,
- std::vector<int>* serialized_read_fds,
- std::vector<int>* serialized_write_fds) {
- base::AutoLock locker(read_lock_);
-
-#if defined(OS_POSIX)
- SetSerializedFDs(serialized_read_fds, serialized_write_fds);
-#endif
-
- if (serialized_read_buffer_size) {
- // TODO(jam): copy power of 2 algorithm below? or share.
- read_buffer_->buffer_.resize(serialized_read_buffer_size + kReadSize);
- memcpy(&read_buffer_->buffer_[0], serialized_read_buffer,
- serialized_read_buffer_size);
- read_buffer_->num_valid_bytes_ = serialized_read_buffer_size;
- }
-
- if (serialized_write_buffer_size) {
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
-
- uint32_t offset = 0;
- while (offset < serialized_write_buffer_size) {
- uint32_t message_num_bytes =
- std::min(static_cast<uint32_t>(max_message_num_bytes),
- static_cast<uint32_t>(serialized_write_buffer_size) -
- offset);
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::Type::RAW_MESSAGE, message_num_bytes,
- static_cast<const char*>(serialized_write_buffer) + offset));
- write_buffer_->message_queue_.AddMessage(std::move(message));
- offset += message_num_bytes;
- }
- }
-}
-
-void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
- read_lock_.AssertAcquired();
- // Keep reading data in a loop, and dispatch messages if enough data is
- // received. Exit the loop if any of the following happens:
- // - one or more messages were dispatched;
- // - the last read failed, was a partial read or would block;
- // - |Shutdown()| was called.
- do {
- switch (io_result) {
- case IO_SUCCEEDED:
- break;
- case IO_FAILED_SHUTDOWN:
- case IO_FAILED_BROKEN:
- case IO_FAILED_UNKNOWN:
- CallOnError(ReadIOResultToError(io_result));
- return; // |this| may have been destroyed in |CallOnError()|.
- case IO_PENDING:
- NOTREACHED();
- return;
- }
-
- read_buffer_->num_valid_bytes_ += bytes_read;
-
- // Dispatch all the messages that we can.
- bool did_dispatch_message = false;
- bool stop_dispatching = false;
- DispatchMessages(&did_dispatch_message, &stop_dispatching);
- if (stop_dispatching)
- return;
-
- if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
- kReadSize) {
- // Use power-of-2 buffer sizes.
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
- // maximum message size to whatever extent necessary).
- // TODO(vtl): We may often be able to peek at the header and get the real
- // required extra space (which may be much bigger than |kReadSize|).
- size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
- while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
- new_size *= 2;
-
- // TODO(vtl): It's suboptimal to zero out the fresh memory.
- read_buffer_->buffer_.resize(new_size, 0);
- }
-
- // (1) If we dispatched any messages, stop reading for now (and let the
- // message loop do its thing for another round).
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
- // a single message. Risks: slower, more complex if we want to avoid lots of
- // copying. ii. Keep reading until there's no more data and dispatch all the
- // messages we can. Risks: starvation of other users of the message loop.)
- // (2) If we didn't max out |kReadSize|, stop reading for now.
- bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
- bytes_read = 0;
- io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
- } while (io_result != IO_PENDING);
-}
-
-void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
- size_t platform_handles_written,
- size_t bytes_written) {
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
- write_lock_.AssertAcquired();
- DCHECK_NE(io_result, IO_PENDING);
-
- bool did_fail = !OnWriteCompletedInternalNoLock(
- io_result, platform_handles_written, bytes_written);
- if (did_fail) {
- // Don't want to call the delegate with the current callstack for two
- // reasons:
- // 1) We already have write_lock_ acquired, and calling the delegate means
- // also acquiring the read lock. We need to acquire read and then write to
- // avoid deadlocks.
- // 2) We shouldn't call the delegate with write_lock acquired, since the
- // delegate could be calling WriteMessage and that can cause deadlocks.
- pending_write_error_ = true;
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::LockAndCallOnError,
- weak_ptr_factory_.GetWeakPtr(),
- Delegate::ERROR_WRITE));
- }
-}
-
-void RawChannel::SerializeReadBuffer(size_t additional_bytes_read,
- std::vector<char>* buffer) {
- read_lock_.AssertAcquired();
- read_buffer_->num_valid_bytes_ += additional_bytes_read;
- read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_);
- read_buffer_->buffer_.swap(*buffer);
- read_buffer_->num_valid_bytes_ = 0;
-}
-
-void RawChannel::SerializeWriteBuffer(
- size_t additional_bytes_written,
- size_t additional_platform_handles_written,
- std::vector<char>* buffer,
- std::vector<int>* fds) {
- write_lock_.AssertAcquired();
- if (write_buffer_->IsEmpty()) {
- DCHECK_EQ(0u, additional_bytes_written);
- DCHECK_EQ(0u, additional_platform_handles_written);
- return;
- }
-
- UpdateWriteBuffer(
- additional_platform_handles_written, additional_bytes_written);
- while (!write_buffer_->message_queue_.IsEmpty()) {
- SerializePlatformHandles(fds);
- std::vector<WriteBuffer::Buffer> buffers;
- write_buffer_no_lock()->GetBuffers(&buffers);
- for (size_t i = 0; i < buffers.size(); ++i) {
- buffer->insert(buffer->end(), buffers[i].addr,
- buffers[i].addr + buffers[i].size);
- }
- write_buffer_->message_queue_.DiscardMessage();
- }
-}
-
-void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
- write_lock_.AssertAcquired();
- write_buffer_->message_queue_.AddMessage(std::move(message));
-}
-
-bool RawChannel::OnReadMessageForRawChannel(
- const MessageInTransit::View& message_view) {
- LOG(ERROR) << "Invalid control message (type " << message_view.type()
- << ")";
- return false;
-}
-
-RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
- IOResult io_result) {
- switch (io_result) {
- case IO_FAILED_SHUTDOWN:
- return Delegate::ERROR_READ_SHUTDOWN;
- case IO_FAILED_BROKEN:
- return Delegate::ERROR_READ_BROKEN;
- case IO_FAILED_UNKNOWN:
- return Delegate::ERROR_READ_UNKNOWN;
- case IO_SUCCEEDED:
- case IO_PENDING:
- NOTREACHED();
- break;
- }
- return Delegate::ERROR_READ_UNKNOWN;
-}
-
-void RawChannel::CallOnError(Delegate::Error error) {
- DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
- read_lock_.AssertAcquired();
- error_occurred_ = true;
- if (delegate_) {
- DCHECK(!calling_delegate_);
- calling_delegate_ = true;
- delegate_->OnError(error);
- calling_delegate_ = false;
- } else {
- // We depend on delegate to delete since it could be waiting to call
- // ReleaseHandle.
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
- }
-}
-
-void RawChannel::LockAndCallOnError(Delegate::Error error) {
- base::AutoLock locker(read_lock_);
- CallOnError(error);
-}
-
-bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result,
- size_t platform_handles_written,
- size_t bytes_written) {
- write_lock_.AssertAcquired();
-
- DCHECK(!write_buffer_->message_queue_.IsEmpty());
-
- if (io_result == IO_SUCCEEDED) {
- UpdateWriteBuffer(platform_handles_written, bytes_written);
- if (write_buffer_->message_queue_.IsEmpty()) {
- if (!delegate_) {
- // Shutdown must have been called and we were waiting to flush all
- // pending writes. Now we're done.
- internal::g_io_thread_task_runner->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
- }
- return true;
- }
-
- // Schedule the next write.
- io_result = ScheduleWriteNoLock();
- if (io_result == IO_PENDING)
- return true;
- DCHECK_NE(io_result, IO_SUCCEEDED);
- }
-
- write_stopped_ = true;
- write_buffer_->message_queue_.Clear();
- write_buffer_->platform_handles_offset_ = 0;
- write_buffer_->data_offset_ = 0;
- return false;
-}
-
-void RawChannel::DispatchMessages(bool* did_dispatch_message,
- bool* stop_dispatching) {
- *did_dispatch_message = false;
- *stop_dispatching = false;
- // Tracks the offset of the first undispatched message in |read_buffer_|.
- // Currently, we copy data to ensure that this is zero at the beginning.
- size_t read_buffer_start = 0;
- size_t remaining_bytes = read_buffer_->num_valid_bytes_;
- size_t message_size;
- // Note that we rely on short-circuit evaluation here:
- // - |read_buffer_start| may be an invalid index into
- // |read_buffer_->buffer_| if |remaining_bytes| is zero.
- // - |message_size| is only valid if |GetNextMessageSize()| returns true.
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
- // next read).
- // TODO(vtl): Validate that |message_size| is sane.
- while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
- &read_buffer_->buffer_[read_buffer_start],
- remaining_bytes, &message_size) &&
- remaining_bytes >= message_size) {
- MessageInTransit::View message_view(
- message_size, &read_buffer_->buffer_[read_buffer_start]);
- DCHECK_EQ(message_view.total_size(), message_size);
-
- const char* error_message = nullptr;
- if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
- &error_message)) {
- DCHECK(error_message);
- LOG(ERROR) << "Received invalid message: " << error_message;
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- *stop_dispatching = true;
- return; // |this| may have been destroyed in |CallOnError()|.
- }
-
- if (message_view.type() != MessageInTransit::Type::MESSAGE &&
- message_view.type() != MessageInTransit::Type::QUIT_MESSAGE) {
- if (!OnReadMessageForRawChannel(message_view)) {
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- *stop_dispatching = true;
- return; // |this| may have been destroyed in |CallOnError()|.
- }
- } else {
- ScopedPlatformHandleVectorPtr platform_handles;
- if (message_view.transport_data_buffer()) {
- size_t num_platform_handles;
- const void* platform_handle_table;
- TransportData::GetPlatformHandleTable(
- message_view.transport_data_buffer(), &num_platform_handles,
- &platform_handle_table);
-
- if (num_platform_handles > 0) {
- platform_handles = GetReadPlatformHandles(num_platform_handles,
- platform_handle_table);
- if (!platform_handles) {
- LOG(ERROR) << "Invalid number of platform handles received";
- CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
- *stop_dispatching = true;
- return; // |this| may have been destroyed in |CallOnError()|.
- }
- }
- }
-
- // TODO(vtl): In the case that we aren't expecting any platform handles,
- // for the POSIX implementation, we should confirm that none are stored.
- if (delegate_) {
- DCHECK(!calling_delegate_);
- calling_delegate_ = true;
- delegate_->OnReadMessage(message_view, std::move(platform_handles));
- calling_delegate_ = false;
- }
- }
-
- *did_dispatch_message = true;
-
- // Update our state.
- read_buffer_start += message_size;
- remaining_bytes -= message_size;
- }
-
- if (read_buffer_start > 0) {
- // Move data back to start.
- read_buffer_->num_valid_bytes_ = remaining_bytes;
- if (read_buffer_->num_valid_bytes_ > 0) {
- memmove(&read_buffer_->buffer_[0],
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
- }
- read_buffer_start = 0;
- }
-}
-
-void RawChannel::UpdateWriteBuffer(size_t platform_handles_written,
- size_t bytes_written) {
- write_buffer_->platform_handles_offset_ += platform_handles_written;
- write_buffer_->data_offset_ += bytes_written;
-
- MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
- if (write_buffer_->data_offset_ >= message->total_size()) {
- // Complete write.
- CHECK_EQ(write_buffer_->data_offset_, message->total_size());
- write_buffer_->message_queue_.DiscardMessage();
- write_buffer_->platform_handles_offset_ = 0;
- write_buffer_->data_offset_ = 0;
- }
-}
-
-void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) {
- base::AutoLock locker(read_lock_);
- OnReadCompletedNoLock(io_result, bytes_read);
-}
-
-void RawChannel::WillDestroyCurrentMessageLoop() {
- {
- base::AutoLock locker(read_lock_);
- OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0);
- }
- // The PostTask inside Shutdown() will never be called, so manually call it
- // here to avoid leaks in LSAN builds.
- Shutdown();
-}
-
-} // namespace edk
-} // namespace mojo
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698