| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| index 68c9e0e038875f75385c383b3bb018b985d020f2..386ab14b82e95ad1a7c2d12dfb1144ac763cc383 100644
|
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| @@ -8,158 +8,92 @@
|
| #include <stdint.h>
|
|
|
| #include <algorithm>
|
| +#include <limits>
|
| #include <utility>
|
|
|
| #include "base/bind.h"
|
| #include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/embedder/platform_shared_buffer.h"
|
| #include "mojo/edk/embedder/platform_support.h"
|
| -#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/data_pipe_control_message.h"
|
| +#include "mojo/edk/system/node_controller.h"
|
| +#include "mojo/edk/system/ports_message.h"
|
| +#include "mojo/public/c/system/data_pipe.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
|
|
| -struct SharedMemoryHeader {
|
| - uint32_t data_size;
|
| - uint32_t read_buffer_size;
|
| +namespace {
|
| +
|
| +struct MOJO_ALIGNAS(8) SerializedState {
|
| + MojoCreateDataPipeOptions options;
|
| + uint64_t pipe_id;
|
| + bool peer_closed;
|
| + uint32_t read_offset;
|
| + uint32_t bytes_available;
|
| };
|
|
|
| -void DataPipeConsumerDispatcher::Init(
|
| - ScopedPlatformHandle message_pipe,
|
| - char* serialized_read_buffer, size_t serialized_read_buffer_size) {
|
| - if (message_pipe.is_valid()) {
|
| - channel_ = RawChannel::Create(std::move(message_pipe));
|
| - channel_->SetSerializedData(
|
| - serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u,
|
| - nullptr, nullptr);
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
|
| - } else {
|
| - // The data pipe consumer could have read all the data and the producer
|
| - // closed its end subsequently (before the consumer was sent). In that case
|
| - // when we deserialize the consumer we must make sure to set error_ or
|
| - // otherwise the peer-closed signal will never be satisfied.
|
| - error_ = true;
|
| - }
|
| -}
|
| +} // namespace
|
|
|
| -void DataPipeConsumerDispatcher::InitOnIO() {
|
| - base::AutoLock locker(lock());
|
| - calling_init_ = true;
|
| - if (channel_)
|
| - channel_->Init(this);
|
| - calling_init_ = false;
|
| -}
|
| +// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
|
| +// reference to the dispatcher to ensure it lives as long as the observed port.
|
| +class DataPipeConsumerDispatcher::PortObserverThunk
|
| + : public NodeController::PortObserver {
|
| + public:
|
| + explicit PortObserverThunk(
|
| + scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
|
| + : dispatcher_(dispatcher) {}
|
|
|
| -void DataPipeConsumerDispatcher::CloseOnIO() {
|
| - base::AutoLock locker(lock());
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| -}
|
| + private:
|
| + ~PortObserverThunk() override {}
|
|
|
| -Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
|
| - return Type::DATA_PIPE_CONSUMER;
|
| -}
|
| + // NodeController::PortObserver:
|
| + void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
|
|
|
| -scoped_refptr<DataPipeConsumerDispatcher>
|
| -DataPipeConsumerDispatcher::Deserialize(
|
| - const void* source,
|
| - size_t size,
|
| - PlatformHandleVector* platform_handles) {
|
| - MojoCreateDataPipeOptions options;
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = 0;
|
| -
|
| - ScopedPlatformHandle platform_handle =
|
| - DataPipe::Deserialize(source, size, platform_handles, &options,
|
| - &shared_memory_handle, &shared_memory_size);
|
| -
|
| - scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
|
| -
|
| - char* serialized_read_buffer = nullptr;
|
| - size_t serialized_read_buffer_size = 0;
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer;
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping;
|
| - if (shared_memory_size) {
|
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
|
| - shared_memory_size, std::move(shared_memory_handle));
|
| - mapping = shared_buffer->Map(0, shared_memory_size);
|
| - char* buffer = static_cast<char*>(mapping->GetBase());
|
| - SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
|
| - buffer += sizeof(SharedMemoryHeader);
|
| - if (header->data_size) {
|
| - rv->data_.assign(buffer, buffer + header->data_size);
|
| - buffer += header->data_size;
|
| - }
|
| + scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
|
|
|
| - if (header->read_buffer_size) {
|
| - serialized_read_buffer = buffer;
|
| - serialized_read_buffer_size = header->read_buffer_size;
|
| - buffer += header->read_buffer_size;
|
| - }
|
| - }
|
| -
|
| - rv->Init(std::move(platform_handle), serialized_read_buffer,
|
| - serialized_read_buffer_size);
|
| - return rv;
|
| -}
|
| + DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
|
| +};
|
|
|
| DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
|
| - const MojoCreateDataPipeOptions& options)
|
| + NodeController* node_controller,
|
| + const ports::PortRef& control_port,
|
| + scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
|
| + const MojoCreateDataPipeOptions& options,
|
| + bool initialized,
|
| + uint64_t pipe_id)
|
| : options_(options),
|
| - channel_(nullptr),
|
| - calling_init_(false),
|
| - in_two_phase_read_(false),
|
| - two_phase_max_bytes_read_(0),
|
| - error_(false),
|
| - serialized_(false) {
|
| -}
|
| -
|
| -DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
|
| - // See comment in ~MessagePipeDispatcher.
|
| - if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| - channel_->Shutdown();
|
| - else
|
| - DCHECK(!channel_);
|
| + node_controller_(node_controller),
|
| + control_port_(control_port),
|
| + pipe_id_(pipe_id),
|
| + shared_ring_buffer_(shared_ring_buffer) {
|
| + if (initialized) {
|
| + base::AutoLock lock(lock_);
|
| + InitializeNoLock();
|
| + }
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
|
| - lock().AssertAcquired();
|
| - awakable_list_.CancelAll();
|
| +Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
|
| + return Type::DATA_PIPE_CONSUMER;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::CloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
|
| +MojoResult DataPipeConsumerDispatcher::Close() {
|
| + base::AutoLock lock(lock_);
|
| + DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
|
| + return CloseNoLock();
|
| }
|
|
|
| -scoped_refptr<Dispatcher>
|
| -DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| -
|
| - SerializeInternal();
|
| -
|
| - scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
|
| - data_.swap(rv->data_);
|
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_);
|
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
|
| - rv->serialized_ = true;
|
| -
|
| - return scoped_refptr<Dispatcher>(rv.get());
|
| -}
|
| +MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| + uint32_t* num_bytes,
|
| + MojoReadDataFlags flags) {
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| -MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
|
| - void* elements,
|
| - uint32_t* num_bytes,
|
| - MojoReadDataFlags flags) {
|
| - lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
|
|
| @@ -170,7 +104,7 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
|
| DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
|
| DVLOG_IF(2, elements)
|
| << "Query mode: ignoring non-null |elements|";
|
| - *num_bytes = static_cast<uint32_t>(data_.size());
|
| + *num_bytes = static_cast<uint32_t>(bytes_available_);
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -192,32 +126,54 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
|
| uint32_t min_num_bytes_to_read =
|
| all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| - if (min_num_bytes_to_read > data_.size())
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
|
| + if (min_num_bytes_to_read > bytes_available_) {
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_OUT_OF_RANGE;
|
| + }
|
|
|
| - uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
|
| - static_cast<uint32_t>(data_.size()));
|
| - if (bytes_to_read == 0)
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
|
| + uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
|
| + if (bytes_to_read == 0) {
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
|
|
| - if (!discard)
|
| - memcpy(elements, &data_[0], bytes_to_read);
|
| + if (!discard) {
|
| + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
|
| + CHECK(data);
|
| +
|
| + uint8_t* destination = static_cast<uint8_t*>(elements);
|
| + CHECK(destination);
|
| +
|
| + DCHECK_LE(read_offset_, options_.capacity_num_bytes);
|
| + uint32_t tail_bytes_to_copy =
|
| + std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
|
| + uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
|
| + if (tail_bytes_to_copy > 0)
|
| + memcpy(destination, data + read_offset_, tail_bytes_to_copy);
|
| + if (head_bytes_to_copy > 0)
|
| + memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
|
| + }
|
| *num_bytes = bytes_to_read;
|
|
|
| bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
|
| - if (discard || !peek)
|
| - data_.erase(data_.begin(), data_.begin() + bytes_to_read);
|
| + if (discard || !peek) {
|
| + read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
|
| + bytes_available_ -= bytes_to_read;
|
| +
|
| + base::AutoUnlock unlock(lock_);
|
| + NotifyRead(bytes_to_read);
|
| + }
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
|
| - const void** buffer,
|
| - uint32_t* buffer_num_bytes,
|
| - MojoReadDataFlags flags) {
|
| - lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| +MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
|
| + uint32_t* buffer_num_bytes,
|
| + MojoReadDataFlags flags) {
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
|
|
| @@ -227,81 +183,82 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
|
| (flags & MOJO_READ_DATA_FLAG_PEEK))
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size());
|
| - if (max_num_bytes_to_read == 0)
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
|
| + if (bytes_available_ == 0) {
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
| +
|
| + DCHECK_LT(read_offset_, options_.capacity_num_bytes);
|
| + uint32_t bytes_to_read = std::min(bytes_available_,
|
| + options_.capacity_num_bytes - read_offset_);
|
| +
|
| + CHECK(ring_buffer_mapping_);
|
| + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
|
| + CHECK(data);
|
|
|
| in_two_phase_read_ = true;
|
| - *buffer = &data_[0];
|
| - *buffer_num_bytes = max_num_bytes_to_read;
|
| - two_phase_max_bytes_read_ = max_num_bytes_to_read;
|
| + *buffer = data + read_offset_;
|
| + *buffer_num_bytes = bytes_to_read;
|
| + two_phase_max_bytes_read_ = bytes_to_read;
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
|
| - uint32_t num_bytes_read) {
|
| - lock().AssertAcquired();
|
| +MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
|
| + base::AutoLock lock(lock_);
|
| if (!in_two_phase_read_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| - HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + if (in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + CHECK(shared_ring_buffer_);
|
| +
|
| + HandleSignalsState old_state = GetHandleSignalsStateNoLock();
|
| MojoResult rv;
|
| if (num_bytes_read > two_phase_max_bytes_read_ ||
|
| num_bytes_read % options_.element_num_bytes != 0) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| } else {
|
| rv = MOJO_RESULT_OK;
|
| - data_.erase(data_.begin(), data_.begin() + num_bytes_read);
|
| + read_offset_ =
|
| + (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
|
| +
|
| + DCHECK_GE(bytes_available_, num_bytes_read);
|
| + bytes_available_ -= num_bytes_read;
|
| +
|
| + base::AutoUnlock unlock(lock_);
|
| + NotifyRead(num_bytes_read);
|
| }
|
|
|
| in_two_phase_read_ = false;
|
| two_phase_max_bytes_read_ = 0;
|
| - if (!data_received_during_two_phase_read_.empty()) {
|
| - if (data_.empty()) {
|
| - data_received_during_two_phase_read_.swap(data_);
|
| - } else {
|
| - data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
|
| - data_received_during_two_phase_read_.end());
|
| - data_received_during_two_phase_read_.clear();
|
| - }
|
| - }
|
|
|
| - HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
|
|
| return rv;
|
| }
|
|
|
| -HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
|
| - const {
|
| - lock().AssertAcquired();
|
| -
|
| - HandleSignalsState rv;
|
| - if (!data_.empty()) {
|
| - if (!in_two_phase_read_)
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - } else if (!error_) {
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - }
|
| -
|
| - if (error_)
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - return rv;
|
| +HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
|
| + base::AutoLock lock(lock_);
|
| + return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| -MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
|
| +MojoResult DataPipeConsumerDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| - HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_) {
|
| + if (signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + }
|
| + UpdateSignalsStateNoLock();
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = state;
|
| @@ -317,186 +274,260 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
|
| +void DataPipeConsumerDispatcher::RemoveAwakable(
|
| Awakable* awakable,
|
| HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| + base::AutoLock lock(lock_);
|
| + if ((!shared_ring_buffer_ || in_transit_) && signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + else if (signals_state)
|
| + *signals_state = GetHandleSignalsStateNoLock();
|
| awakable_list_.Remove(awakable);
|
| - if (signals_state)
|
| - *signals_state = GetHandleSignalsStateImplNoLock();
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
|
| - size_t* max_size,
|
| - size_t* max_platform_handles) {
|
| - if (!serialized_) {
|
| - // Handles the case where we have messages read off RawChannel but not ready
|
| - // by MojoReadMessage.
|
| - SerializeInternal();
|
| - }
|
| -
|
| - DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
|
| - !data_.empty() || !serialized_read_buffer_.empty(),
|
| - max_size, max_platform_handles);
|
| +void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
|
| + uint32_t* num_ports,
|
| + uint32_t* num_handles) {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
|
| + *num_ports = 1;
|
| + *num_handles = 1;
|
| }
|
|
|
| -bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
|
| +bool DataPipeConsumerDispatcher::EndSerialize(
|
| void* destination,
|
| - size_t* actual_size,
|
| - PlatformHandleVector* platform_handles) {
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
|
| - if (shared_memory_size) {
|
| - shared_memory_size += sizeof(SharedMemoryHeader);
|
| - SharedMemoryHeader header;
|
| - header.data_size = static_cast<uint32_t>(data_.size());
|
| - header.read_buffer_size =
|
| - static_cast<uint32_t>(serialized_read_buffer_.size());
|
| -
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer(
|
| - internal::g_platform_support->CreateSharedBuffer(
|
| - shared_memory_size));
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping(
|
| - shared_buffer->Map(0, shared_memory_size));
|
| -
|
| - char* start = static_cast<char*>(mapping->GetBase());
|
| - memcpy(start, &header, sizeof(SharedMemoryHeader));
|
| - start += sizeof(SharedMemoryHeader);
|
| -
|
| - if (!data_.empty()) {
|
| - memcpy(start, &data_[0], data_.size());
|
| - start += data_.size();
|
| - }
|
| + ports::PortName* ports,
|
| + PlatformHandle* platform_handles) {
|
| + SerializedState* state = static_cast<SerializedState*>(destination);
|
| + memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
|
|
|
| - if (!serialized_read_buffer_.empty()) {
|
| - memcpy(start, &serialized_read_buffer_[0],
|
| - serialized_read_buffer_.size());
|
| - start += serialized_read_buffer_.size();
|
| - }
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + state->pipe_id = pipe_id_;
|
| + state->peer_closed = peer_closed_;
|
| + state->read_offset = read_offset_;
|
| + state->bytes_available = bytes_available_;
|
|
|
| - shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
|
| - }
|
| + ports[0] = control_port_.name();
|
| +
|
| + buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
|
| + platform_handles[0] = buffer_handle_for_transit_.get();
|
|
|
| - DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
|
| - std::move(shared_memory_handle), shared_memory_size,
|
| - destination, actual_size, platform_handles);
|
| - CloseImplNoLock();
|
| return true;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::TransportStarted() {
|
| - started_transport_.Acquire();
|
| +bool DataPipeConsumerDispatcher::BeginTransit() {
|
| + base::AutoLock lock(lock_);
|
| + if (in_transit_)
|
| + return false;
|
| + in_transit_ = !in_two_phase_read_;
|
| + return in_transit_;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::TransportEnded() {
|
| - started_transport_.Release();
|
| +void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
|
| + node_controller_->SetPortObserver(control_port_, nullptr);
|
|
|
| - base::AutoLock locker(lock());
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + in_transit_ = false;
|
| + transferred_ = true;
|
| + ignore_result(buffer_handle_for_transit_.release());
|
| + CloseNoLock();
|
| +}
|
|
|
| - // If transporting of DP failed, we might have got more data and didn't awake
|
| - // for.
|
| - // TODO(jam): should we care about only alerting if it was empty before
|
| - // TransportStarted?
|
| - if (!data_.empty())
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| +void DataPipeConsumerDispatcher::CancelTransit() {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + in_transit_ = false;
|
| + buffer_handle_for_transit_.reset();
|
| + UpdateSignalsStateNoLock();
|
| }
|
|
|
| -bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
|
| - lock().AssertAcquired();
|
| - return in_two_phase_read_;
|
| +// static
|
| +scoped_refptr<DataPipeConsumerDispatcher>
|
| +DataPipeConsumerDispatcher::Deserialize(const void* data,
|
| + size_t num_bytes,
|
| + const ports::PortName* ports,
|
| + size_t num_ports,
|
| + PlatformHandle* handles,
|
| + size_t num_handles) {
|
| + if (num_ports != 1 || num_handles != 1 ||
|
| + num_bytes != sizeof(SerializedState)) {
|
| + return nullptr;
|
| + }
|
| +
|
| + const SerializedState* state = static_cast<const SerializedState*>(data);
|
| +
|
| + NodeController* node_controller = internal::g_core->GetNodeController();
|
| + ports::PortRef port;
|
| + if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
|
| + return nullptr;
|
| +
|
| + PlatformHandle buffer_handle;
|
| + std::swap(buffer_handle, handles[0]);
|
| + scoped_refptr<PlatformSharedBuffer> ring_buffer =
|
| + internal::g_platform_support->CreateSharedBufferFromHandle(
|
| + state->options.capacity_num_bytes,
|
| + ScopedPlatformHandle(buffer_handle));
|
| + if (!ring_buffer) {
|
| + DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
|
| + return nullptr;
|
| + }
|
| +
|
| + scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
|
| + new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
|
| + state->options, false /* initialized */,
|
| + state->pipe_id);
|
| +
|
| + {
|
| + base::AutoLock lock(dispatcher->lock_);
|
| + dispatcher->peer_closed_ = state->peer_closed;
|
| + dispatcher->read_offset_ = state->read_offset;
|
| + dispatcher->bytes_available_ = state->bytes_available;
|
| + dispatcher->InitializeNoLock();
|
| + }
|
| +
|
| + return dispatcher;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::OnReadMessage(
|
| - const MessageInTransit::View& message_view,
|
| - ScopedPlatformHandleVectorPtr platform_handles) {
|
| - const char* bytes_start = static_cast<const char*>(message_view.bytes());
|
| - const char* bytes_end = bytes_start + message_view.num_bytes();
|
| - if (started_transport_.Try()) {
|
| - // We're not in the middle of being sent.
|
| -
|
| - // Can get synchronously called back in Init if there was initial data.
|
| - scoped_ptr<base::AutoLock> locker;
|
| - if (!calling_init_) {
|
| - locker.reset(new base::AutoLock(lock()));
|
| - }
|
| +DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
|
| + DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
|
| + !in_transit_);
|
| +}
|
|
|
| - if (in_two_phase_read_) {
|
| - data_received_during_two_phase_read_.insert(
|
| - data_received_during_two_phase_read_.end(), bytes_start, bytes_end);
|
| - } else {
|
| - bool was_empty = data_.empty();
|
| - data_.insert(data_.end(), bytes_start, bytes_end);
|
| - if (was_empty)
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| +void DataPipeConsumerDispatcher::InitializeNoLock() {
|
| + lock_.AssertAcquired();
|
| +
|
| + if (shared_ring_buffer_) {
|
| + DCHECK(!ring_buffer_mapping_);
|
| + ring_buffer_mapping_ =
|
| + shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
|
| + if (!ring_buffer_mapping_) {
|
| + DLOG(ERROR) << "Failed to map shared buffer.";
|
| + shared_ring_buffer_ = nullptr;
|
| }
|
| - started_transport_.Release();
|
| - } else {
|
| - // See comment in MessagePipeDispatcher about why we can't and don't need
|
| - // to lock here.
|
| - data_.insert(data_.end(), bytes_start, bytes_end);
|
| }
|
| +
|
| + base::AutoUnlock unlock(lock_);
|
| + node_controller_->SetPortObserver(
|
| + control_port_,
|
| + make_scoped_refptr(new PortObserverThunk(this)));
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::OnError(Error error) {
|
| - switch (error) {
|
| - case ERROR_READ_SHUTDOWN:
|
| - // The other side was cleanly closed, so this isn't actually an error.
|
| - DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
|
| - break;
|
| - case ERROR_READ_BROKEN:
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
|
| - break;
|
| - case ERROR_READ_BAD_MESSAGE:
|
| - // Receiving a bad message means either a bug, data corruption, or
|
| - // malicious attack (probably due to some other bug).
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
|
| - << "message)";
|
| - break;
|
| - case ERROR_READ_UNKNOWN:
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
|
| - break;
|
| - case ERROR_WRITE:
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
|
| - break;
|
| +MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
|
| + lock_.AssertAcquired();
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + is_closed_ = true;
|
| + ring_buffer_mapping_.reset();
|
| + shared_ring_buffer_ = nullptr;
|
| +
|
| + awakable_list_.CancelAll();
|
| + if (!transferred_) {
|
| + base::AutoUnlock unlock(lock_);
|
| + node_controller_->ClosePort(control_port_);
|
| }
|
|
|
| - error_ = true;
|
| - if (started_transport_.Try()) {
|
| - base::AutoLock locker(lock());
|
| - // We can get two OnError callbacks before the post task below completes.
|
| - // Although RawChannel still has a pointer to this object until Shutdown is
|
| - // called, that is safe since this class always does a PostTask to the IO
|
| - // thread to self destruct.
|
| - if (channel_) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| - started_transport_.Release();
|
| - } else {
|
| - // We must be waiting to call ReleaseHandle. It will call Shutdown.
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +HandleSignalsState
|
| +DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
|
| + lock_.AssertAcquired();
|
| +
|
| + HandleSignalsState rv;
|
| + if (shared_ring_buffer_ && bytes_available_) {
|
| + if (!in_two_phase_read_)
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + } else if (!peer_closed_ && shared_ring_buffer_) {
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| }
|
| +
|
| + if (peer_closed_)
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + return rv;
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::SerializeInternal() {
|
| - DCHECK(!in_two_phase_read_);
|
| - // We need to stop watching handle immediately, even though not on IO thread,
|
| - // so that other messages aren't read after this.
|
| - if (channel_) {
|
| - std::vector<char> serialized_write_buffer;
|
| - std::vector<int> fds;
|
| - bool write_error = false;
|
| - serialized_platform_handle_ = channel_->ReleaseHandle(
|
| - &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds,
|
| - &write_error);
|
| - CHECK(serialized_write_buffer.empty());
|
| - CHECK(fds.empty());
|
| - CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write.";
|
| -
|
| - channel_ = nullptr;
|
| +void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
|
| + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
|
| + << num_bytes << " bytes read. [control_port="
|
| + << control_port_.name() << "]";
|
| +
|
| + SendDataPipeControlMessage(node_controller_, control_port_,
|
| + DataPipeCommand::DATA_WAS_READ, num_bytes);
|
| +}
|
| +
|
| +void DataPipeConsumerDispatcher::OnPortStatusChanged() {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + // We stop observing the control port as soon it's transferred, but this can
|
| + // race with events which are raised right before that happens. This is fine
|
| + // to ignore.
|
| + if (transferred_)
|
| + return;
|
| +
|
| + DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
|
| +
|
| + UpdateSignalsStateNoLock();
|
| +}
|
| +
|
| +void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
|
| + lock_.AssertAcquired();
|
| +
|
| + bool was_peer_closed = peer_closed_;
|
| + size_t previous_bytes_available = bytes_available_;
|
| +
|
| + ports::PortStatus port_status;
|
| + if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
|
| + ports::OK ||
|
| + !port_status.receiving_messages) {
|
| + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
|
| + << " [control_port=" << control_port_.name() << "]";
|
| +
|
| + peer_closed_ = true;
|
| }
|
|
|
| - serialized_ = true;
|
| + if (port_status.has_messages && !in_transit_) {
|
| + ports::ScopedMessage message;
|
| + do {
|
| + int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
|
| + &message);
|
| + if (rv != ports::OK)
|
| + peer_closed_ = true;
|
| + if (message) {
|
| + const DataPipeControlMessage* m =
|
| + static_cast<const DataPipeControlMessage*>(
|
| + message->payload_bytes());
|
| +
|
| + if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
|
| + DLOG(ERROR) << "Unexpected control message from producer.";
|
| + peer_closed_ = true;
|
| + break;
|
| + }
|
| +
|
| + if (static_cast<size_t>(bytes_available_) + m->num_bytes >
|
| + options_.capacity_num_bytes) {
|
| + DLOG(ERROR) << "Producer claims to have written too many bytes.";
|
| + peer_closed_ = true;
|
| + break;
|
| + }
|
| +
|
| + DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
|
| + << m->num_bytes << " bytes were written. [control_port="
|
| + << control_port_.name() << "]";
|
| +
|
| + bytes_available_ += m->num_bytes;
|
| + }
|
| + } while (message);
|
| + }
|
| +
|
| + if (peer_closed_ != was_peer_closed ||
|
| + bytes_available_ != previous_bytes_available) {
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + }
|
| }
|
|
|
| } // namespace edk
|
|
|