| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..8a98b62fb0c3390623a40cece49aafd78d2f104a
|
| --- /dev/null
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -0,0 +1,343 @@
|
| +// Copyright 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "mojo/edk/embedder/embedder_internal.h"
|
| +#include "mojo/edk/system/configuration.h"
|
| +#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/memory.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +void DataPipeProducerDispatcher::Init(
|
| + embedder::ScopedPlatformHandle message_pipe) {
|
| + if (message_pipe.is_valid()) {
|
| + channel_ = RawChannel::Create(message_pipe.Pass());
|
| + mojo::embedder::internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
|
| + }
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::InitOnIO() {
|
| + base::AutoLock locker(lock());
|
| + if (channel_)
|
| + channel_->Init(this);
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::CloseOnIO() {
|
| + base::AutoLock locker(lock());
|
| + if (channel_) {
|
| + channel_->Shutdown();
|
| + channel_ = nullptr;
|
| + }
|
| +}
|
| +
|
| +Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
|
| + return Type::DATA_PIPE_PRODUCER;
|
| +}
|
| +
|
| +scoped_refptr<DataPipeProducerDispatcher>
|
| +DataPipeProducerDispatcher::Deserialize(
|
| + const void* source,
|
| + size_t size,
|
| + embedder::PlatformHandleVector* platform_handles) {
|
| + MojoCreateDataPipeOptions options;
|
| + embedder::ScopedPlatformHandle platform_handle =
|
| + DataPipe::Deserialize(source, size, platform_handles, &options,
|
| + nullptr, 0);
|
| +
|
| + scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
|
| + if (platform_handle.is_valid())
|
| + rv->Init(platform_handle.Pass());
|
| + return rv;
|
| +}
|
| +
|
| +DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| + const MojoCreateDataPipeOptions& options)
|
| + : options_(options), channel_(nullptr), error_(false) {
|
| +}
|
| +
|
| +DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
|
| + // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
|
| + DCHECK(!channel_);
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
|
| + lock().AssertAcquired();
|
| + awakable_list_.CancelAll();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::CloseImplNoLock() {
|
| + lock().AssertAcquired();
|
| + mojo::embedder::internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
|
| +}
|
| +
|
| +scoped_refptr<Dispatcher>
|
| +DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| + lock().AssertAcquired();
|
| +
|
| + scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
|
| + rv->channel_ = channel_;
|
| + channel_ = nullptr;
|
| + rv->options_ = options_;
|
| + return scoped_refptr<Dispatcher>(rv.get());
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
|
| + UserPointer<const void> elements,
|
| + UserPointer<uint32_t> num_bytes,
|
| + MojoWriteDataFlags flags) {
|
| + lock().AssertAcquired();
|
| + if (InTwoPhaseWrite())
|
| + return MOJO_RESULT_BUSY;
|
| + if (error_)
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| + if (num_bytes.Get() % options_.element_num_bytes != 0)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + if (num_bytes.Get() == 0)
|
| + return MOJO_RESULT_OK; // Nothing to do.
|
| +
|
| + // For now, we ignore options.capacity_num_bytes as a total of all pending
|
| + // writes (and just treat this per message). We will implement that later if
|
| + // we need to. All current uses want all their data to be sent, and it's not
|
| + // clear that this backpressure should be done at the mojo layer or at a
|
| + // higher application layer.
|
| + if (num_bytes.Get() > options_.capacity_num_bytes)
|
| + return MOJO_RESULT_OUT_OF_RANGE;
|
| +
|
| + // Ignore MOJO_WRITE_DATA_FLAG_ALL_OR_NONE for now, it's also not clear we
|
| + // need/want the functionality to only write a subset at this layer.
|
| +
|
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| +
|
| + WriteDataIntoMessages(elements, num_bytes);
|
| +
|
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + if (!new_state.equals(old_state))
|
| + awakable_list_.AwakeForStateChange(new_state);
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
|
| + UserPointer<void*> buffer,
|
| + UserPointer<uint32_t> buffer_num_bytes,
|
| + MojoWriteDataFlags flags) {
|
| + lock().AssertAcquired();
|
| + if (InTwoPhaseWrite())
|
| + return MOJO_RESULT_BUSY;
|
| + if (error_)
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| + if (buffer_num_bytes.Get() % options_.element_num_bytes != 0)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + if (buffer_num_bytes.Get() > options_.capacity_num_bytes)
|
| + return MOJO_RESULT_OUT_OF_RANGE;
|
| +
|
| + if (buffer_num_bytes.Get() == 0)
|
| + buffer_num_bytes.Put(options_.capacity_num_bytes);
|
| +
|
| + two_phase_data_.resize(buffer_num_bytes.Get());
|
| + buffer.Put(&two_phase_data_[0]);
|
| +
|
| + // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
|
| + // we can construct a MessageInTransit here. But then we need to make
|
| + // MessageInTransit support changing its data size later.
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
|
| + uint32_t num_bytes_written) {
|
| + lock().AssertAcquired();
|
| + if (!InTwoPhaseWrite())
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| +
|
| + // Note: Allow successful completion of the two-phase write even if the other
|
| + // side has been closed.
|
| + MojoResult rv;
|
| + if (num_bytes_written > two_phase_data_.size() ||
|
| + num_bytes_written % options_.element_num_bytes != 0) {
|
| + rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| + } else {
|
| + WriteDataIntoMessages(
|
| + MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])),
|
| + MakeUserPointer(&num_bytes_written));
|
| + rv = MOJO_RESULT_OK;
|
| + }
|
| +
|
| + // Two-phase write ended even on failure.
|
| + two_phase_data_.clear();
|
| + // If we're now writable, we *became* writable (since we weren't writable
|
| + // during the two-phase write), so awake producer awakables.
|
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| + awakable_list_.AwakeForStateChange(new_state);
|
| +
|
| + return rv;
|
| +}
|
| +
|
| +HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
|
| + const {
|
| + lock().AssertAcquired();
|
| +
|
| + HandleSignalsState rv;
|
| + if (!error_) {
|
| + if (!InTwoPhaseWrite())
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| + } else {
|
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + }
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + return rv;
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
|
| + Awakable* awakable,
|
| + MojoHandleSignals signals,
|
| + uint32_t context,
|
| + HandleSignalsState* signals_state) {
|
| + lock().AssertAcquired();
|
| + HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| + if (state.satisfies(signals)) {
|
| + if (signals_state)
|
| + *signals_state = state;
|
| + return MOJO_RESULT_ALREADY_EXISTS;
|
| + }
|
| + if (!state.can_satisfy(signals)) {
|
| + if (signals_state)
|
| + *signals_state = state;
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| + }
|
| +
|
| + awakable_list_.Add(awakable, signals, context);
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
|
| + Awakable* awakable,
|
| + HandleSignalsState* signals_state) {
|
| + lock().AssertAcquired();
|
| + awakable_list_.Remove(awakable);
|
| + if (signals_state)
|
| + *signals_state = GetHandleSignalsStateImplNoLock();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::StartSerializeImplNoLock(
|
| + size_t* max_size,
|
| + size_t* max_platform_handles) {
|
| + DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
|
| +
|
| + if (channel_) {
|
| + std::vector<char> temp;
|
| + serialized_platform_handle_ = channel_->ReleaseHandle(&temp);
|
| + channel_ = nullptr;
|
| + DCHECK(temp.empty());
|
| + }
|
| + DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
|
| + false, max_size, max_platform_handles);
|
| +}
|
| +
|
| +bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
|
| + void* destination,
|
| + size_t* actual_size,
|
| + embedder::PlatformHandleVector* platform_handles) {
|
| + DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
|
| +
|
| + DataPipe::EndSerialize(
|
| + options_,
|
| + serialized_platform_handle_.Pass(),
|
| + embedder::ScopedPlatformHandle(), 0,
|
| + destination, actual_size, platform_handles);
|
| + CloseImplNoLock();
|
| + return true;
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::TransportStarted() {
|
| + started_transport_.Acquire();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::TransportEnded() {
|
| + started_transport_.Release();
|
| +}
|
| +
|
| +bool DataPipeProducerDispatcher::IsBusyNoLock() const {
|
| + lock().AssertAcquired();
|
| + return InTwoPhaseWrite();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::OnReadMessage(
|
| + const MessageInTransit::View& message_view,
|
| + embedder::ScopedPlatformHandleVectorPtr platform_handles) {
|
| + NOTREACHED();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::OnError(Error error) {
|
| + switch (error) {
|
| + case ERROR_READ_SHUTDOWN:
|
| + case ERROR_READ_BROKEN:
|
| + case ERROR_READ_BAD_MESSAGE:
|
| + case ERROR_READ_UNKNOWN:
|
| + LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages";
|
| + break;
|
| + case ERROR_WRITE:
|
| + // Write errors are slightly notable: they probably shouldn't happen under
|
| + // normal operation (but maybe the other side crashed).
|
| + LOG(WARNING) << "DataPipeProducerDispatcher write error";
|
| + break;
|
| + }
|
| +
|
| + error_ = true;
|
| + if (started_transport_.Try()) {
|
| + base::AutoLock locker(lock());
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| +
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
|
| + channel_ = nullptr;
|
| + started_transport_.Release();
|
| + } else {
|
| + // We must be waiting to call ReleaseHandle. It will call Shutdown.
|
| + }
|
| +}
|
| +
|
| +bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
|
| + return !two_phase_data_.empty();
|
| +}
|
| +
|
| +bool DataPipeProducerDispatcher::WriteDataIntoMessages(
|
| + UserPointer<const void> elements,
|
| + UserPointer<uint32_t> num_bytes) {
|
| + // The maximum amount of data to send per message (make it a multiple of the
|
| + // element size.
|
| + size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| + max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
|
| + DCHECK_GT(max_message_num_bytes, 0u);
|
| +
|
| + size_t offset = 0;
|
| + while (offset < num_bytes.Get()) {
|
| + size_t message_num_bytes =
|
| + std::min(max_message_num_bytes, num_bytes.Get() - offset);
|
| + scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| + MessageInTransit::Type::MESSAGE, message_num_bytes,
|
| + elements.At(offset)));
|
| + if (!channel_->WriteMessage(message.Pass())) {
|
| + error_ = true;
|
| + return false;
|
| + }
|
| +
|
| + offset += message_num_bytes;
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|