Chromium Code Reviews| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..db18123a781c33975cc02755fd6cb53a2723152b |
| --- /dev/null |
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
| @@ -0,0 +1,360 @@ |
| +// Copyright 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/logging.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "mojo/edk/embedder/embedder_internal.h" |
| +#include "mojo/edk/system/configuration.h" |
| +#include "mojo/edk/system/data_pipe.h" |
| +#include "mojo/edk/system/memory.h" |
| + |
| +namespace mojo { |
| +namespace edk { |
| + |
| +void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) { |
| + if (message_pipe.is_valid()) { |
| + channel_ = RawChannel::Create(message_pipe.Pass()); |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
| + } |
| +} |
| + |
| +void DataPipeProducerDispatcher::InitOnIO() { |
| + base::AutoLock locker(lock()); |
| + if (channel_) |
| + channel_->Init(this); |
| +} |
| + |
| +void DataPipeProducerDispatcher::CloseOnIO() { |
| + base::AutoLock locker(lock()); |
| + if (channel_) { |
| + channel_->Shutdown(); |
| + channel_ = nullptr; |
| + } |
| +} |
| + |
| +Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
| + return Type::DATA_PIPE_PRODUCER; |
| +} |
| + |
| +scoped_refptr<DataPipeProducerDispatcher> |
| +DataPipeProducerDispatcher::Deserialize( |
| + const void* source, |
| + size_t size, |
| + PlatformHandleVector* platform_handles) { |
| + MojoCreateDataPipeOptions options; |
| + ScopedPlatformHandle platform_handle = |
| + DataPipe::Deserialize(source, size, platform_handles, &options, |
| + nullptr, 0); |
| + |
| + scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); |
| + if (platform_handle.is_valid()) |
| + rv->Init(platform_handle.Pass()); |
| + return rv; |
| +} |
| + |
| +DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| + const MojoCreateDataPipeOptions& options) |
| + : options_(options), channel_(nullptr), error_(false) { |
| +} |
| + |
| +DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
| + // |Close()|/|CloseImplNoLock()| should have taken care of the channel. |
| + DCHECK(!channel_); |
| +} |
| + |
| +void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
| + lock().AssertAcquired(); |
| + awakable_list_.CancelAll(); |
| +} |
| + |
| +void DataPipeProducerDispatcher::CloseImplNoLock() { |
| + lock().AssertAcquired(); |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); |
| +} |
| + |
| +scoped_refptr<Dispatcher> |
| +DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| + lock().AssertAcquired(); |
| + |
| + scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); |
| + rv->channel_ = channel_; |
| + channel_ = nullptr; |
| + rv->options_ = options_; |
| + return scoped_refptr<Dispatcher>(rv.get()); |
| +} |
| + |
| +MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
| + UserPointer<const void> elements, |
| + UserPointer<uint32_t> num_bytes, |
| + MojoWriteDataFlags flags) { |
| + lock().AssertAcquired(); |
| + if (InTwoPhaseWrite()) |
| + return MOJO_RESULT_BUSY; |
| + if (error_) |
| + return MOJO_RESULT_FAILED_PRECONDITION; |
| + if (num_bytes.Get() % options_.element_num_bytes != 0) |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| + if (num_bytes.Get() == 0) |
| + return MOJO_RESULT_OK; // Nothing to do. |
| + |
| + // For now, we ignore options.capacity_num_bytes as a total of all pending |
| + // writes (and just treat it per message). We will implement that later if |
| + // we need to. All current uses want all their data to be sent, and it's not |
| + // clear that this backpressure should be done at the mojo layer or at a |
| + // higher application layer. |
| + bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
| + uint32_t min_num_bytes_to_write = all_or_none ? num_bytes.Get() : 0; |
| + if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
| + // Don't return "should wait" since you can't wait for a specified amount of |
| + // data. |
| + return MOJO_RESULT_OUT_OF_RANGE; |
| + } |
| + |
| + size_t num_bytes_to_write = |
| + std::min(static_cast<size_t>(num_bytes.Get()), |
|
Ken Rockot(use gerrit already)
2015/09/23 22:32:17
size_t should be uint32_t
|
| + options_.capacity_num_bytes); |
| + if (num_bytes_to_write == 0) |
| + return MOJO_RESULT_SHOULD_WAIT; |
| + |
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| + |
| + num_bytes.Put(num_bytes_to_write); |
| + WriteDataIntoMessages(elements, num_bytes_to_write); |
| + |
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| + if (!new_state.equals(old_state)) |
| + awakable_list_.AwakeForStateChange(new_state); |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
| + UserPointer<void*> buffer, |
| + UserPointer<uint32_t> buffer_num_bytes, |
| + MojoWriteDataFlags flags) { |
| + lock().AssertAcquired(); |
| + if (InTwoPhaseWrite()) |
| + return MOJO_RESULT_BUSY; |
| + if (error_) |
| + return MOJO_RESULT_FAILED_PRECONDITION; |
| + |
| + bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
| + uint32_t min_num_bytes_to_write = 0; |
| + if (all_or_none) { |
| + min_num_bytes_to_write = buffer_num_bytes.Get(); |
| + if (min_num_bytes_to_write % options_.element_num_bytes != 0) |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| + if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
| + // Don't return "should wait" since you can't wait for a specified amount |
| + // of data. |
| + return MOJO_RESULT_OUT_OF_RANGE; |
| + } |
| + } |
| + |
| + // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. |
| + if (buffer_num_bytes.Get() == 0) |
| + buffer_num_bytes.Put(options_.capacity_num_bytes); |
| + |
| + two_phase_data_.resize(buffer_num_bytes.Get()); |
| + buffer.Put(&two_phase_data_[0]); |
| + |
| + // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes |
| + // we can construct a MessageInTransit here. But then we need to make |
| + // MessageInTransit support changing its data size later. |
| + |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
| + uint32_t num_bytes_written) { |
| + lock().AssertAcquired(); |
| + if (!InTwoPhaseWrite()) |
| + return MOJO_RESULT_FAILED_PRECONDITION; |
| + |
| + // Note: Allow successful completion of the two-phase write even if the other |
| + // side has been closed. |
| + MojoResult rv = MOJO_RESULT_OK; |
| + if (num_bytes_written > two_phase_data_.size() || |
| + num_bytes_written % options_.element_num_bytes != 0) { |
| + rv = MOJO_RESULT_INVALID_ARGUMENT; |
| + } else if (channel_) { |
| + WriteDataIntoMessages( |
| + MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])), |
| + num_bytes_written); |
| + } |
| + |
| + // Two-phase write ended even on failure. |
| + two_phase_data_.clear(); |
| + // If we're now writable, we *became* writable (since we weren't writable |
| + // during the two-phase write), so awake producer awakables. |
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| + if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
| + awakable_list_.AwakeForStateChange(new_state); |
| + |
| + return rv; |
| +} |
| + |
| +HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
| + const { |
| + lock().AssertAcquired(); |
| + |
| + HandleSignalsState rv; |
| + if (!error_) { |
| + if (!InTwoPhaseWrite()) |
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| + } else { |
| + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| + } |
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| + return rv; |
| +} |
| + |
| +MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
| + Awakable* awakable, |
| + MojoHandleSignals signals, |
| + uint32_t context, |
| + HandleSignalsState* signals_state) { |
| + lock().AssertAcquired(); |
| + HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| + if (state.satisfies(signals)) { |
| + if (signals_state) |
| + *signals_state = state; |
| + return MOJO_RESULT_ALREADY_EXISTS; |
| + } |
| + if (!state.can_satisfy(signals)) { |
| + if (signals_state) |
| + *signals_state = state; |
| + return MOJO_RESULT_FAILED_PRECONDITION; |
| + } |
| + |
| + awakable_list_.Add(awakable, signals, context); |
| + return MOJO_RESULT_OK; |
| +} |
| + |
| +void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
| + Awakable* awakable, |
| + HandleSignalsState* signals_state) { |
| + lock().AssertAcquired(); |
| + awakable_list_.Remove(awakable); |
| + if (signals_state) |
| + *signals_state = GetHandleSignalsStateImplNoLock(); |
| +} |
| + |
| +void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
| + size_t* max_size, |
| + size_t* max_platform_handles) { |
| + DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| + |
| + if (channel_) { |
| + std::vector<char> temp; |
| + serialized_platform_handle_ = channel_->ReleaseHandle(&temp); |
| + channel_ = nullptr; |
| + DCHECK(temp.empty()); |
| + } |
| + DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
| + false, max_size, max_platform_handles); |
| +} |
| + |
| +bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
| + void* destination, |
| + size_t* actual_size, |
| + PlatformHandleVector* platform_handles) { |
| + DCHECK(HasOneRef()); // Only one ref => no need to take the lock. |
| + |
| + DataPipe::EndSerialize( |
| + options_, |
| + serialized_platform_handle_.Pass(), |
| + ScopedPlatformHandle(), 0, |
| + destination, actual_size, platform_handles); |
| + CloseImplNoLock(); |
| + return true; |
| +} |
| + |
| +void DataPipeProducerDispatcher::TransportStarted() { |
| + started_transport_.Acquire(); |
| +} |
| + |
| +void DataPipeProducerDispatcher::TransportEnded() { |
| + started_transport_.Release(); |
| +} |
| + |
| +bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
| + lock().AssertAcquired(); |
| + return InTwoPhaseWrite(); |
| +} |
| + |
| +void DataPipeProducerDispatcher::OnReadMessage( |
| + const MessageInTransit::View& message_view, |
| + ScopedPlatformHandleVectorPtr platform_handles) { |
| + NOTREACHED(); |
| +} |
| + |
| +void DataPipeProducerDispatcher::OnError(Error error) { |
| + switch (error) { |
| + case ERROR_READ_SHUTDOWN: |
| + case ERROR_READ_BROKEN: |
| + case ERROR_READ_BAD_MESSAGE: |
| + case ERROR_READ_UNKNOWN: |
| + LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages"; |
| + break; |
| + case ERROR_WRITE: |
| + // Write errors are slightly notable: they probably shouldn't happen under |
| + // normal operation (but maybe the other side crashed). |
| + LOG(WARNING) << "DataPipeProducerDispatcher write error"; |
| + break; |
| + } |
| + |
| + error_ = true; |
| + if (started_transport_.Try()) { |
| + base::AutoLock locker(lock()); |
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| + |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); |
| + channel_ = nullptr; |
| + started_transport_.Release(); |
| + } else { |
| + // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| + } |
| +} |
| + |
| +bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { |
| + return !two_phase_data_.empty(); |
| +} |
| + |
| +bool DataPipeProducerDispatcher::WriteDataIntoMessages( |
| + UserPointer<const void> elements, |
| + size_t num_bytes) { |
| + // The maximum amount of data to send per message (make it a multiple of the |
| + // element size. |
| + size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
| + max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; |
| + DCHECK_GT(max_message_num_bytes, 0u); |
| + |
| + size_t offset = 0; |
| + while (offset < num_bytes) { |
| + size_t message_num_bytes = |
| + std::min(max_message_num_bytes, num_bytes - offset); |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, message_num_bytes, |
| + elements.At(offset))); |
| + if (!channel_->WriteMessage(message.Pass())) { |
| + error_ = true; |
| + return false; |
| + } |
| + |
| + offset += message_num_bytes; |
| + } |
| + |
| + return true; |
| +} |
| + |
| +} // namespace edk |
| +} // namespace mojo |