| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..f26e7e28c38c5831b7ee84e29defe2b38b9d9cd0 100644
|
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -11,136 +11,99 @@
|
|
|
| #include "base/bind.h"
|
| #include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/embedder/platform_shared_buffer.h"
|
| #include "mojo/edk/embedder/platform_support.h"
|
| #include "mojo/edk/system/configuration.h"
|
| -#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/data_pipe_control_message.h"
|
| +#include "mojo/edk/system/node_controller.h"
|
| +#include "mojo/edk/system/ports_message.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
|
|
| -void DataPipeProducerDispatcher::Init(
|
| - ScopedPlatformHandle message_pipe,
|
| - char* serialized_write_buffer, size_t serialized_write_buffer_size) {
|
| - if (message_pipe.is_valid()) {
|
| - channel_ = RawChannel::Create(std::move(message_pipe));
|
| - channel_->SetSerializedData(
|
| - nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
|
| - nullptr, nullptr);
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
|
| - } else {
|
| - error_ = true;
|
| - }
|
| -}
|
| +namespace {
|
|
|
| -void DataPipeProducerDispatcher::InitOnIO() {
|
| - base::AutoLock locker(lock());
|
| - if (channel_)
|
| - channel_->Init(this);
|
| -}
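|
| +// State attached to a serialized producer handle; written by EndSerialize()
|
| +// and read back by Deserialize() on the receiving end.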
|
| +struct SerializedState {
|
| + MojoCreateDataPipeOptions options;
|
| + uint64_t pipe_id;
|
| + bool peer_closed;
|
| + uint32_t write_offset;
|
| + uint32_t available_capacity;
|
| +};
|
|
|
| -void DataPipeProducerDispatcher::CloseOnIO() {
|
| - base::AutoLock locker(lock());
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| -}
|
| +} // namespace
|
|
|
| -Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
|
| - return Type::DATA_PIPE_PRODUCER;
|
| -}
|
| +// A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
|
| +// reference to the dispatcher to ensure it lives as long as the observed port.
|
| +class DataPipeProducerDispatcher::PortObserverThunk
|
| + : public NodeController::PortObserver {
|
| + public:
|
| + explicit PortObserverThunk(
|
| + scoped_refptr<DataPipeProducerDispatcher> dispatcher)
|
| + : dispatcher_(dispatcher) {}
|
|
|
| -scoped_refptr<DataPipeProducerDispatcher>
|
| -DataPipeProducerDispatcher::Deserialize(
|
| - const void* source,
|
| - size_t size,
|
| - PlatformHandleVector* platform_handles) {
|
| - MojoCreateDataPipeOptions options;
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = 0;
|
| - ScopedPlatformHandle platform_handle =
|
| - DataPipe::Deserialize(source, size, platform_handles, &options,
|
| - &shared_memory_handle, &shared_memory_size);
|
| -
|
| - scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
|
| -
|
| - char* serialized_write_buffer = nullptr;
|
| - size_t serialized_write_buffer_size = 0;
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer;
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping;
|
| - if (shared_memory_size) {
|
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
|
| - shared_memory_size, std::move(shared_memory_handle));
|
| - mapping = shared_buffer->Map(0, shared_memory_size);
|
| - serialized_write_buffer = static_cast<char*>(mapping->GetBase());
|
| - serialized_write_buffer_size = shared_memory_size;
|
| - }
|
| + private:
|
| + ~PortObserverThunk() override {}
|
|
|
| - rv->Init(std::move(platform_handle), serialized_write_buffer,
|
| - serialized_write_buffer_size);
|
| - return rv;
|
| -}
|
| + // NodeController::PortObserver:
|
| + void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
|
|
|
| -DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| - const MojoCreateDataPipeOptions& options)
|
| - : options_(options), channel_(nullptr), error_(false), serialized_(false) {
|
| -}
|
| + scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
|
|
|
| -DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
|
| - // See comment in ~MessagePipeDispatcher.
|
| - if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| - channel_->Shutdown();
|
| - else
|
| - DCHECK(!channel_);
|
| -}
|
| + DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
|
| +};
|
|
|
| -void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
|
| - lock().AssertAcquired();
|
| - awakable_list_.CancelAll();
|
| +DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| + NodeController* node_controller,
|
| + const ports::PortRef& control_port,
|
| + scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
|
| + const MojoCreateDataPipeOptions& options,
|
| + bool initialized,
|
| + uint64_t pipe_id)
|
| + : options_(options),
|
| + node_controller_(node_controller),
|
| + control_port_(control_port),
|
| + pipe_id_(pipe_id),
|
| + shared_ring_buffer_(shared_ring_buffer),
|
| + available_capacity_(options_.capacity_num_bytes) {
|
| + if (initialized) {
|
| + base::AutoLock lock(lock_);
|
| + InitializeNoLock();
|
| + }
|
| }
|
|
|
| -void DataPipeProducerDispatcher::CloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
|
| +Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
|
| + return Type::DATA_PIPE_PRODUCER;
|
| }
|
|
|
| -scoped_refptr<Dispatcher>
|
| -DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| - lock().AssertAcquired();
|
| -
|
| - SerializeInternal();
|
| -
|
| - scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
|
| - serialized_write_buffer_.swap(rv->serialized_write_buffer_);
|
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
|
| - rv->serialized_ = true;
|
| - return scoped_refptr<Dispatcher>(rv.get());
|
| +MojoResult DataPipeProducerDispatcher::Close() {
|
| + base::AutoLock lock(lock_);
|
| + DVLOG(1) << "Closing data pipe producer " << pipe_id_;
|
| + return CloseNoLock();
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
|
| - const void* elements,
|
| - uint32_t* num_bytes,
|
| - MojoWriteDataFlags flags) {
|
| - lock().AssertAcquired();
|
| - if (InTwoPhaseWrite())
|
| +MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| + uint32_t* num_bytes,
|
| + MojoWriteDataFlags flags) {
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| - if (error_)
|
| +
|
| + if (peer_closed_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
| +
|
| if (*num_bytes % options_.element_num_bytes != 0)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| if (*num_bytes == 0)
|
| return MOJO_RESULT_OK; // Nothing to do.
|
|
|
| - // For now, we ignore options.capacity_num_bytes as a total of all pending
|
| - // writes (and just treat it per message). We will implement that later if
|
| - // we need to. All current uses want all their data to be sent, and it's not
|
| - // clear that this backpressure should be done at the mojo layer or at a
|
| - // higher application layer.
|
| bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
|
| uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
|
| if (min_num_bytes_to_write > options_.capacity_num_bytes) {
|
| @@ -149,98 +112,135 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
|
| return MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
|
|
| - uint32_t num_bytes_to_write =
|
| - std::min(*num_bytes, options_.capacity_num_bytes);
|
| + DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
|
| + uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
|
| if (num_bytes_to_write == 0)
|
| return MOJO_RESULT_SHOULD_WAIT;
|
|
|
| - HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState old_state = GetHandleSignalsStateNoLock();
|
|
|
| *num_bytes = num_bytes_to_write;
|
| - WriteDataIntoMessages(elements, num_bytes_to_write);
|
|
|
| - HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + CHECK(ring_buffer_mapping_);
|
| + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
|
| + CHECK(data);
|
| +
|
| + const uint8_t* source = static_cast<const uint8_t*>(elements);
|
| + CHECK(source);
|
| +
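|
| + // Copy into the ring buffer: fill from |write_offset_| to the end of the
|
| + // buffer first, then wrap any remaining bytes around to the front.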
|
| + DCHECK_LE(write_offset_, options_.capacity_num_bytes);
|
| + uint32_t tail_bytes_to_write =
|
| + std::min(options_.capacity_num_bytes - write_offset_,
|
| + num_bytes_to_write);
|
| + uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
|
| +
|
| + DCHECK_GT(tail_bytes_to_write, 0u);
|
| + memcpy(data + write_offset_, source, tail_bytes_to_write);
|
| + if (head_bytes_to_write > 0)
|
| + memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
|
| +
|
| + DCHECK_LE(num_bytes_to_write, available_capacity_);
|
| + available_capacity_ -= num_bytes_to_write;
|
| + write_offset_ = (write_offset_ + num_bytes_to_write) %
|
| + options_.capacity_num_bytes;
|
| +
|
| + HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| +
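|
| + // Release |lock_| before sending the DATA_WAS_WRITTEN notification.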
|
| + base::AutoUnlock unlock(lock_);
|
| + NotifyWrite(num_bytes_to_write);
|
| +
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
|
| +MojoResult DataPipeProducerDispatcher::BeginWriteData(
|
| void** buffer,
|
| uint32_t* buffer_num_bytes,
|
| MojoWriteDataFlags flags) {
|
| - lock().AssertAcquired();
|
| - if (InTwoPhaseWrite())
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| - if (error_)
|
| + if (peer_closed_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| - // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
|
| - if (*buffer_num_bytes == 0)
|
| - *buffer_num_bytes = options_.capacity_num_bytes;
|
| + if (available_capacity_ == 0) {
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
|
|
| - two_phase_data_.resize(*buffer_num_bytes);
|
| - *buffer = &two_phase_data_[0];
|
| + in_two_phase_write_ = true;
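|
| + // Expose only the contiguous span starting at |write_offset_|; a two-phase
|
| + // write never wraps around the end of the ring buffer.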
|
| + *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
|
| + available_capacity_);
|
| + DCHECK_GT(*buffer_num_bytes, 0u);
|
|
|
| - // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
|
| - // we can construct a MessageInTransit here. But then we need to make
|
| - // MessageInTransit support changing its data size later.
|
| + CHECK(ring_buffer_mapping_);
|
| + uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
|
| + *buffer = data + write_offset_;
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
|
| +MojoResult DataPipeProducerDispatcher::EndWriteData(
|
| uint32_t num_bytes_written) {
|
| - lock().AssertAcquired();
|
| - if (!InTwoPhaseWrite())
|
| + base::AutoLock lock(lock_);
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + if (!in_two_phase_write_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| + DCHECK(shared_ring_buffer_);
|
| + DCHECK(ring_buffer_mapping_);
|
| +
|
| // Note: Allow successful completion of the two-phase write even if the other
|
| // side has been closed.
|
| MojoResult rv = MOJO_RESULT_OK;
|
| - if (num_bytes_written > two_phase_data_.size() ||
|
| - num_bytes_written % options_.element_num_bytes != 0) {
|
| + if (num_bytes_written > available_capacity_ ||
|
| + num_bytes_written % options_.element_num_bytes != 0 ||
|
| + write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| - } else if (channel_) {
|
| - WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
|
| + } else {
|
| + DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
|
| + available_capacity_ -= num_bytes_written;
|
| + write_offset_ = (write_offset_ + num_bytes_written) %
|
| + options_.capacity_num_bytes;
|
| +
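|
| + // As in WriteData(), notify the consumer outside of |lock_|.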
|
| + base::AutoUnlock unlock(lock_);
|
| + NotifyWrite(num_bytes_written);
|
| }
|
|
|
| - // Two-phase write ended even on failure.
|
| - two_phase_data_.clear();
|
| + in_two_phase_write_ = false;
|
| +
|
| // If we're now writable, we *became* writable (since we weren't writable
|
| // during the two-phase write), so awake producer awakables.
|
| - HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| awakable_list_.AwakeForStateChange(new_state);
|
|
|
| return rv;
|
| }
|
|
|
| -HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
|
| - const {
|
| - lock().AssertAcquired();
|
| -
|
| - HandleSignalsState rv;
|
| - if (!error_) {
|
| - if (!InTwoPhaseWrite())
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| - } else {
|
| - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - }
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| - return rv;
|
| +HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
|
| + base::AutoLock lock(lock_);
|
| + return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
|
| +MojoResult DataPipeProducerDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| - HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| + base::AutoLock lock(lock_);
|
| + if (!shared_ring_buffer_ || in_transit_) {
|
| + if (signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + }
|
| + UpdateSignalsStateNoLock();
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = state;
|
| @@ -256,154 +256,256 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| -void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
|
| +void DataPipeProducerDispatcher::RemoveAwakable(
|
| Awakable* awakable,
|
| HandleSignalsState* signals_state) {
|
| - lock().AssertAcquired();
|
| + base::AutoLock lock(lock_);
|
| + if ((!shared_ring_buffer_ || in_transit_) && signals_state)
|
| + *signals_state = HandleSignalsState();
|
| + else if (signals_state)
|
| + *signals_state = GetHandleSignalsStateNoLock();
|
| awakable_list_.Remove(awakable);
|
| - if (signals_state)
|
| - *signals_state = GetHandleSignalsStateImplNoLock();
|
| }
|
|
|
| -void DataPipeProducerDispatcher::StartSerializeImplNoLock(
|
| - size_t* max_size,
|
| - size_t* max_platform_handles) {
|
| - if (!serialized_)
|
| - SerializeInternal();
|
| -
|
| - DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
|
| - !serialized_write_buffer_.empty(), max_size,
|
| - max_platform_handles);
|
| +void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
|
| + uint32_t* num_ports,
|
| + uint32_t* num_handles) {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + *num_bytes = sizeof(SerializedState);
|
| + *num_ports = 1;
|
| + *num_handles = 1;
|
| }
|
|
|
| -bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
|
| +bool DataPipeProducerDispatcher::EndSerialize(
|
| void* destination,
|
| - size_t* actual_size,
|
| - PlatformHandleVector* platform_handles) {
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = serialized_write_buffer_.size();
|
| - if (shared_memory_size) {
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer(
|
| - internal::g_platform_support->CreateSharedBuffer(
|
| - shared_memory_size));
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping(
|
| - shared_buffer->Map(0, shared_memory_size));
|
| - memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
|
| - shared_memory_size);
|
| - shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
|
| - }
|
| + ports::PortName* ports,
|
| + PlatformHandle* platform_handles) {
|
| + SerializedState* state = static_cast<SerializedState*>(destination);
|
| + memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
|
| +
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + state->pipe_id = pipe_id_;
|
| + state->peer_closed = peer_closed_;
|
| + state->write_offset = write_offset_;
|
| + state->available_capacity = available_capacity_;
|
| +
|
| + ports[0] = control_port_.name();
|
| +
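|
| + // Duplicate the ring buffer handle for transit; this dispatcher keeps its
|
| + // own handle until the transfer completes or is cancelled.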
|
| + buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
|
| + platform_handles[0] = buffer_handle_for_transit_.get();
|
|
|
| - DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
|
| - std::move(shared_memory_handle), shared_memory_size,
|
| - destination, actual_size, platform_handles);
|
| - CloseImplNoLock();
|
| return true;
|
| }
|
|
|
| -void DataPipeProducerDispatcher::TransportStarted() {
|
| - started_transport_.Acquire();
|
| +bool DataPipeProducerDispatcher::BeginTransit() {
|
| + base::AutoLock lock(lock_);
|
| + if (in_transit_)
|
| + return false;
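|
| + // The handle can't be transferred while a two-phase write is in progress.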
|
| + in_transit_ = !in_two_phase_write_;
|
| + return in_transit_;
|
| }
|
|
|
| -void DataPipeProducerDispatcher::TransportEnded() {
|
| - started_transport_.Release();
|
| -}
|
| +void DataPipeProducerDispatcher::CompleteTransitAndClose() {
|
| + node_controller_->SetPortObserver(control_port_, nullptr);
|
|
|
| -bool DataPipeProducerDispatcher::IsBusyNoLock() const {
|
| - lock().AssertAcquired();
|
| - return InTwoPhaseWrite();
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + transferred_ = true;
|
| + in_transit_ = false;
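|
| + // The duplicated ring buffer handle was passed along with the serialized
|
| + // message, so release it here rather than closing it.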
|
| + ignore_result(buffer_handle_for_transit_.release());
|
| + CloseNoLock();
|
| }
|
|
|
| -void DataPipeProducerDispatcher::OnReadMessage(
|
| - const MessageInTransit::View& message_view,
|
| - ScopedPlatformHandleVectorPtr platform_handles) {
|
| - CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages.";
|
| +void DataPipeProducerDispatcher::CancelTransit() {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(in_transit_);
|
| + in_transit_ = false;
|
| + buffer_handle_for_transit_.reset();
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
|
|
| -void DataPipeProducerDispatcher::OnError(Error error) {
|
| - switch (error) {
|
| - case ERROR_READ_BROKEN:
|
| - case ERROR_READ_BAD_MESSAGE:
|
| - case ERROR_READ_UNKNOWN:
|
| - LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
|
| - break;
|
| - case ERROR_READ_SHUTDOWN:
|
| - // The other side was cleanly closed, so this isn't actually an error.
|
| - DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
|
| - break;
|
| - case ERROR_WRITE:
|
| - // Write errors are slightly notable: they probably shouldn't happen under
|
| - // normal operation (but maybe the other side crashed).
|
| - LOG(WARNING) << "DataPipeProducerDispatcher write error";
|
| - break;
|
| +// static
|
| +scoped_refptr<DataPipeProducerDispatcher>
|
| +DataPipeProducerDispatcher::Deserialize(const void* data,
|
| + size_t num_bytes,
|
| + const ports::PortName* ports,
|
| + size_t num_ports,
|
| + PlatformHandle* handles,
|
| + size_t num_handles) {
|
| + if (num_ports != 1 || num_handles != 1 ||
|
| + num_bytes != sizeof(SerializedState)) {
|
| + return nullptr;
|
| }
|
|
|
| - error_ = true;
|
| - if (started_transport_.Try()) {
|
| - base::AutoLock locker(lock());
|
| - // We can get two OnError callbacks before the post task below completes.
|
| - // Although RawChannel still has a pointer to this object until Shutdown is
|
| - // called, that is safe since this class always does a PostTask to the IO
|
| - // thread to self destruct.
|
| - if (channel_) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| - started_transport_.Release();
|
| - } else {
|
| - // We must be waiting to call ReleaseHandle. It will call Shutdown.
|
| + const SerializedState* state = static_cast<const SerializedState*>(data);
|
| +
|
| + NodeController* node_controller = internal::g_core->GetNodeController();
|
| + ports::PortRef port;
|
| + if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
|
| + return nullptr;
|
| +
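|
| + // Take ownership of the ring buffer handle from |handles|.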
|
| + PlatformHandle buffer_handle;
|
| + std::swap(buffer_handle, handles[0]);
|
| + scoped_refptr<PlatformSharedBuffer> ring_buffer =
|
| + internal::g_platform_support->CreateSharedBufferFromHandle(
|
| + state->options.capacity_num_bytes,
|
| + ScopedPlatformHandle(buffer_handle));
|
| + if (!ring_buffer) {
|
| + DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
|
| + return nullptr;
|
| }
|
| +
|
| + scoped_refptr<DataPipeProducerDispatcher> dispatcher =
|
| + new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
|
| + state->options, false /* initialized */,
|
| + state->pipe_id);
|
| +
|
| + {
|
| + base::AutoLock lock(dispatcher->lock_);
|
| + dispatcher->peer_closed_ = state->peer_closed;
|
| + dispatcher->write_offset_ = state->write_offset;
|
| + dispatcher->available_capacity_ = state->available_capacity;
|
| + dispatcher->InitializeNoLock();
|
| + }
|
| +
|
| + return dispatcher;
|
| }
|
|
|
| -bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
|
| - return !two_phase_data_.empty();
|
| +DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
|
| + DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
|
| + !ring_buffer_mapping_);
|
| }
|
|
|
| -bool DataPipeProducerDispatcher::WriteDataIntoMessages(
|
| - const void* elements,
|
| - uint32_t num_bytes) {
|
| - // The maximum amount of data to send per message (make it a multiple of the
|
| - // element size.
|
| - size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| - max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
|
| - DCHECK_GT(max_message_num_bytes, 0u);
|
| -
|
| - uint32_t offset = 0;
|
| - while (offset < num_bytes) {
|
| - uint32_t message_num_bytes =
|
| - std::min(static_cast<uint32_t>(max_message_num_bytes),
|
| - num_bytes - offset);
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::MESSAGE, message_num_bytes,
|
| - static_cast<const char*>(elements) + offset));
|
| - if (!channel_->WriteMessage(std::move(message))) {
|
| - error_ = true;
|
| - return false;
|
| +void DataPipeProducerDispatcher::InitializeNoLock() {
|
| + lock_.AssertAcquired();
|
| +
|
| + if (shared_ring_buffer_) {
|
| + ring_buffer_mapping_ =
|
| + shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
|
| + if (!ring_buffer_mapping_) {
|
| + DLOG(ERROR) << "Failed to map shared buffer.";
|
| + shared_ring_buffer_ = nullptr;
|
| }
|
| + }
|
|
|
| - offset += message_num_bytes;
|
| + base::AutoUnlock unlock(lock_);
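|
| + // Observe the control port so the dispatcher hears about consumer reads
|
| + // and peer closure.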
|
| + node_controller_->SetPortObserver(
|
| + control_port_,
|
| + make_scoped_refptr(new PortObserverThunk(this)));
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::CloseNoLock() {
|
| + lock_.AssertAcquired();
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + is_closed_ = true;
|
| + ring_buffer_mapping_.reset();
|
| + shared_ring_buffer_ = nullptr;
|
| +
|
| + awakable_list_.CancelAll();
|
| + if (!transferred_) {
|
| + base::AutoUnlock unlock(lock_);
|
| + node_controller_->ClosePort(control_port_);
|
| }
|
|
|
| - return true;
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| -void DataPipeProducerDispatcher::SerializeInternal() {
|
| - // We need to stop watching handle immediately, even though not on IO thread,
|
| - // so that other messages aren't read after this.
|
| - if (channel_) {
|
| - std::vector<char> serialized_read_buffer;
|
| - std::vector<int> fds;
|
| - bool write_error = false;
|
| - serialized_platform_handle_ = channel_->ReleaseHandle(
|
| - &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
|
| - &write_error);
|
| - CHECK(serialized_read_buffer.empty());
|
| - CHECK(fds.empty());
|
| - if (write_error)
|
| - serialized_platform_handle_.reset();
|
| - channel_ = nullptr;
|
| +HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
|
| + const {
|
| + lock_.AssertAcquired();
|
| + HandleSignalsState rv;
|
| + if (!peer_closed_) {
|
| + if (!in_two_phase_write_ && shared_ring_buffer_ &&
|
| + available_capacity_ > 0)
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + } else {
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + }
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + return rv;
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
|
| + DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
|
| + << num_bytes << " bytes written. [control_port="
|
| + << control_port_.name() << "]";
|
| +
|
| + SendDataPipeControlMessage(node_controller_, control_port_,
|
| + DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::OnPortStatusChanged() {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + // We stop observing the control port as soon as it's transferred, but this
|
| + // can race with events which are raised right before that happens. This is
|
| + // fine to ignore.
|
| + if (transferred_)
|
| + return;
|
| +
|
| + DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
|
| +
|
| + UpdateSignalsStateNoLock();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
|
| + lock_.AssertAcquired();
|
| +
|
| + bool was_peer_closed = peer_closed_;
|
| + size_t previous_capacity = available_capacity_;
|
| +
|
| + ports::PortStatus port_status;
|
| + if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
|
| + ports::OK ||
|
| + !port_status.receiving_messages) {
|
| + DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
|
| + << " [control_port=" << control_port_.name() << "]";
|
| +
|
| + peer_closed_ = true;
|
| + }
|
| +
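|
| + // Drain any DATA_WAS_READ messages from the consumer and reclaim the
|
| + // capacity they report.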
|
| + if (port_status.has_messages && !in_transit_) {
|
| + ports::ScopedMessage message;
|
| + do {
|
| + int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
|
| + &message);
|
| + if (rv != ports::OK)
|
| + peer_closed_ = true;
|
| + if (message) {
|
| + PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
|
| + const DataPipeControlMessage* m =
|
| + static_cast<const DataPipeControlMessage*>(
|
| + ports_message->payload_bytes());
|
| +
|
| + if (m->command != DataPipeCommand::DATA_WAS_READ) {
|
| + DLOG(ERROR) << "Unexpected message from consumer.";
|
| + peer_closed_ = true;
|
| + break;
|
| + }
|
| +
|
| + if (static_cast<size_t>(available_capacity_) + m->num_bytes >
|
| + options_.capacity_num_bytes) {
|
| + DLOG(ERROR) << "Consumer claims to have read too many bytes.";
|
| + break;
|
| + }
|
| +
|
| + DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
|
| + << m->num_bytes << " bytes were read. [control_port="
|
| + << control_port_.name() << "]";
|
| +
|
| + available_capacity_ += m->num_bytes;
|
| + }
|
| + } while (message);
|
| + }
|
| +
|
| + if (peer_closed_ != was_peer_closed ||
|
| + available_capacity_ != previous_capacity) {
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
| - serialized_ = true;
|
| }
|
|
|
| } // namespace edk
|
|
|