Index: third_party/mojo/src/mojo/edk/system/data_pipe.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe.cc b/third_party/mojo/src/mojo/edk/system/data_pipe.cc |
deleted file mode 100644 |
index 396a1603e99bc0e27d675ef75ebd58e661a3bddc..0000000000000000000000000000000000000000 |
--- a/third_party/mojo/src/mojo/edk/system/data_pipe.cc |
+++ /dev/null |
@@ -1,809 +0,0 @@ |
-// Copyright 2013 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-#include "third_party/mojo/src/mojo/edk/system/data_pipe.h" |
- |
-#include <string.h> |
-#include <algorithm> |
-#include <limits> |
-#include <utility> |
- |
-#include "base/logging.h" |
-#include "base/memory/aligned_memory.h" |
-#include "third_party/mojo/src/mojo/edk/system/awakable_list.h" |
-#include "third_party/mojo/src/mojo/edk/system/channel.h" |
-#include "third_party/mojo/src/mojo/edk/system/configuration.h" |
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_impl.h" |
-#include "third_party/mojo/src/mojo/edk/system/incoming_endpoint.h" |
-#include "third_party/mojo/src/mojo/edk/system/local_data_pipe_impl.h" |
-#include "third_party/mojo/src/mojo/edk/system/memory.h" |
-#include "third_party/mojo/src/mojo/edk/system/options_validation.h" |
-#include "third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.h" |
-#include "third_party/mojo/src/mojo/edk/system/remote_producer_data_pipe_impl.h" |
- |
-namespace mojo { |
-namespace system { |
- |
-// static |
-MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
- MojoCreateDataPipeOptions result = { |
- static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
- 1u, |
- static_cast<uint32_t>( |
- GetConfiguration().default_data_pipe_capacity_bytes)}; |
- return result; |
-} |
- |
-// static |
-MojoResult DataPipe::ValidateCreateOptions( |
- UserPointer<const MojoCreateDataPipeOptions> in_options, |
- MojoCreateDataPipeOptions* out_options) { |
- const MojoCreateDataPipeOptionsFlags kKnownFlags = |
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
- |
- *out_options = GetDefaultCreateOptions(); |
- if (in_options.IsNull()) |
- return MOJO_RESULT_OK; |
- |
- UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options); |
- if (!reader.is_valid()) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader)) |
- return MOJO_RESULT_OK; |
- if ((reader.options().flags & ~kKnownFlags)) |
- return MOJO_RESULT_UNIMPLEMENTED; |
- out_options->flags = reader.options().flags; |
- |
- // Checks for fields beyond |flags|: |
- |
- if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, element_num_bytes, |
- reader)) |
- return MOJO_RESULT_OK; |
- if (reader.options().element_num_bytes == 0) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- out_options->element_num_bytes = reader.options().element_num_bytes; |
- |
- if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes, |
- reader) || |
- reader.options().capacity_num_bytes == 0) { |
- // Round the default capacity down to a multiple of the element size (but at |
- // least one element). |
- size_t default_data_pipe_capacity_bytes = |
- GetConfiguration().default_data_pipe_capacity_bytes; |
- out_options->capacity_num_bytes = |
- std::max(static_cast<uint32_t>(default_data_pipe_capacity_bytes - |
- (default_data_pipe_capacity_bytes % |
- out_options->element_num_bytes)), |
- out_options->element_num_bytes); |
- return MOJO_RESULT_OK; |
- } |
- if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- if (reader.options().capacity_num_bytes > |
- GetConfiguration().max_data_pipe_capacity_bytes) |
- return MOJO_RESULT_RESOURCE_EXHAUSTED; |
- out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
- |
- return MOJO_RESULT_OK; |
-} |
- |
-// static |
-DataPipe* DataPipe::CreateLocal( |
- const MojoCreateDataPipeOptions& validated_options) { |
- return new DataPipe(true, true, validated_options, |
- make_scoped_ptr(new LocalDataPipeImpl())); |
-} |
- |
-// static |
-DataPipe* DataPipe::CreateRemoteProducerFromExisting( |
- const MojoCreateDataPipeOptions& validated_options, |
- MessageInTransitQueue* message_queue, |
- ChannelEndpoint* channel_endpoint) { |
- scoped_ptr<char, base::AlignedFreeDeleter> buffer; |
- size_t buffer_num_bytes = 0; |
- if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
- validated_options, message_queue, &buffer, &buffer_num_bytes)) |
- return nullptr; |
- |
- // Important: This is called under |IncomingEndpoint|'s (which is a |
- // |ChannelEndpointClient|) lock, in particular from |
- // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that |
- // lock, it will reset its |endpoint_| member, which makes any later or |
- // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
- // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
- // is called. |
- DataPipe* data_pipe = new DataPipe( |
- false, true, validated_options, |
- make_scoped_ptr(new RemoteProducerDataPipeImpl( |
- channel_endpoint, std::move(buffer), 0, buffer_num_bytes))); |
- if (channel_endpoint) { |
- if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
- data_pipe->OnDetachFromChannel(0); |
- } else { |
- data_pipe->SetProducerClosed(); |
- } |
- return data_pipe; |
-} |
- |
-// static |
-DataPipe* DataPipe::CreateRemoteConsumerFromExisting( |
- const MojoCreateDataPipeOptions& validated_options, |
- size_t consumer_num_bytes, |
- MessageInTransitQueue* message_queue, |
- ChannelEndpoint* channel_endpoint) { |
- if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
- validated_options, &consumer_num_bytes, message_queue)) |
- return nullptr; |
- |
- // Important: This is called under |IncomingEndpoint|'s (which is a |
- // |ChannelEndpointClient|) lock, in particular from |
- // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that |
- // lock, it will reset its |endpoint_| member, which makes any later or |
- // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
- // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
- // is called. |
- DataPipe* data_pipe = |
- new DataPipe(true, false, validated_options, |
- make_scoped_ptr(new RemoteConsumerDataPipeImpl( |
- channel_endpoint, consumer_num_bytes, nullptr, 0))); |
- if (channel_endpoint) { |
- if (!channel_endpoint->ReplaceClient(data_pipe, 0)) |
- data_pipe->OnDetachFromChannel(0); |
- } else { |
- data_pipe->SetConsumerClosed(); |
- } |
- return data_pipe; |
-} |
- |
-// static |
-bool DataPipe::ProducerDeserialize(Channel* channel, |
- const void* source, |
- size_t size, |
- scoped_refptr<DataPipe>* data_pipe) { |
- DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
- |
- bool consumer_open = false; |
- if (size == sizeof(SerializedDataPipeProducerDispatcher)) { |
- consumer_open = false; |
- } else if (size == |
- sizeof(SerializedDataPipeProducerDispatcher) + |
- channel->GetSerializedEndpointSize()) { |
- consumer_open = true; |
- } else { |
- LOG(ERROR) << "Invalid serialized data pipe producer"; |
- return false; |
- } |
- |
- const SerializedDataPipeProducerDispatcher* s = |
- static_cast<const SerializedDataPipeProducerDispatcher*>(source); |
- MojoCreateDataPipeOptions revalidated_options = {}; |
- if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
- &revalidated_options) != MOJO_RESULT_OK) { |
- LOG(ERROR) << "Invalid serialized data pipe producer (bad options)"; |
- return false; |
- } |
- |
- if (!consumer_open) { |
- if (s->consumer_num_bytes != static_cast<uint32_t>(-1)) { |
- LOG(ERROR) |
- << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
- return false; |
- } |
- |
- *data_pipe = new DataPipe(true, false, revalidated_options, |
- make_scoped_ptr(new RemoteConsumerDataPipeImpl( |
- nullptr, 0, nullptr, 0))); |
- (*data_pipe)->SetConsumerClosed(); |
- |
- return true; |
- } |
- |
- if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || |
- s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { |
- LOG(ERROR) |
- << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
- return false; |
- } |
- |
- const void* endpoint_source = static_cast<const char*>(source) + |
- sizeof(SerializedDataPipeProducerDispatcher); |
- scoped_refptr<IncomingEndpoint> incoming_endpoint = |
- channel->DeserializeEndpoint(endpoint_source); |
- if (!incoming_endpoint) |
- return false; |
- |
- *data_pipe = incoming_endpoint->ConvertToDataPipeProducer( |
- revalidated_options, s->consumer_num_bytes); |
- if (!*data_pipe) |
- return false; |
- |
- return true; |
-} |
- |
-// static |
-bool DataPipe::ConsumerDeserialize(Channel* channel, |
- const void* source, |
- size_t size, |
- scoped_refptr<DataPipe>* data_pipe) { |
- DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
- |
- if (size != |
- sizeof(SerializedDataPipeConsumerDispatcher) + |
- channel->GetSerializedEndpointSize()) { |
- LOG(ERROR) << "Invalid serialized data pipe consumer"; |
- return false; |
- } |
- |
- const SerializedDataPipeConsumerDispatcher* s = |
- static_cast<const SerializedDataPipeConsumerDispatcher*>(source); |
- MojoCreateDataPipeOptions revalidated_options = {}; |
- if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
- &revalidated_options) != MOJO_RESULT_OK) { |
- LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)"; |
- return false; |
- } |
- |
- const void* endpoint_source = static_cast<const char*>(source) + |
- sizeof(SerializedDataPipeConsumerDispatcher); |
- scoped_refptr<IncomingEndpoint> incoming_endpoint = |
- channel->DeserializeEndpoint(endpoint_source); |
- if (!incoming_endpoint) |
- return false; |
- |
- *data_pipe = |
- incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options); |
- if (!*data_pipe) |
- return false; |
- |
- return true; |
-} |
- |
-void DataPipe::ProducerCancelAllAwakables() { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- producer_awakable_list_->CancelAll(); |
-} |
- |
-void DataPipe::ProducerClose() { |
- base::AutoLock locker(lock_); |
- ProducerCloseNoLock(); |
-} |
- |
-MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements, |
- UserPointer<uint32_t> num_bytes, |
- bool all_or_none) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- |
- if (producer_in_two_phase_write_no_lock()) |
- return MOJO_RESULT_BUSY; |
- if (!consumer_open_no_lock()) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- |
- // Returning "busy" takes priority over "invalid argument". |
- uint32_t max_num_bytes_to_write = num_bytes.Get(); |
- if (max_num_bytes_to_write % element_num_bytes() != 0) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- if (max_num_bytes_to_write == 0) |
- return MOJO_RESULT_OK; // Nothing to do. |
- |
- uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; |
- |
- HandleSignalsState old_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- MojoResult rv = impl_->ProducerWriteData( |
- elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); |
- HandleSignalsState new_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- if (!new_consumer_state.equals(old_consumer_state)) |
- AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
- return rv; |
-} |
- |
-MojoResult DataPipe::ProducerBeginWriteData( |
- UserPointer<void*> buffer, |
- UserPointer<uint32_t> buffer_num_bytes) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- |
- if (producer_in_two_phase_write_no_lock()) |
- return MOJO_RESULT_BUSY; |
- if (!consumer_open_no_lock()) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- |
- MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes); |
- if (rv != MOJO_RESULT_OK) |
- return rv; |
- // Note: No need to awake producer awakables, even though we're going from |
- // writable to non-writable (since you can't wait on non-writability). |
- // Similarly, though this may have discarded data (in "may discard" mode), |
- // making it non-readable, there's still no need to awake consumer awakables. |
- DCHECK(producer_in_two_phase_write_no_lock()); |
- return MOJO_RESULT_OK; |
-} |
- |
-MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- |
- if (!producer_in_two_phase_write_no_lock()) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- // Note: Allow successful completion of the two-phase write even if the |
- // consumer has been closed. |
- |
- HandleSignalsState old_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- MojoResult rv; |
- if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || |
- num_bytes_written % element_num_bytes() != 0) { |
- rv = MOJO_RESULT_INVALID_ARGUMENT; |
- producer_two_phase_max_num_bytes_written_ = 0; |
- } else { |
- rv = impl_->ProducerEndWriteData(num_bytes_written); |
- } |
- // Two-phase write ended even on failure. |
- DCHECK(!producer_in_two_phase_write_no_lock()); |
- // If we're now writable, we *became* writable (since we weren't writable |
- // during the two-phase write), so awake producer awakables. |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- HandleSignalsState new_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- if (!new_consumer_state.equals(old_consumer_state)) |
- AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
- return rv; |
-} |
- |
-HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- return impl_->ProducerGetHandleSignalsState(); |
-} |
- |
-MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
- MojoHandleSignals signals, |
- uintptr_t context, |
- HandleSignalsState* signals_state) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- |
- HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState(); |
- if (producer_state.satisfies(signals)) { |
- if (signals_state) |
- *signals_state = producer_state; |
- return MOJO_RESULT_ALREADY_EXISTS; |
- } |
- if (!producer_state.can_satisfy(signals)) { |
- if (signals_state) |
- *signals_state = producer_state; |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- } |
- |
- producer_awakable_list_->Add(awakable, signals, context); |
- return MOJO_RESULT_OK; |
-} |
- |
-void DataPipe::ProducerRemoveAwakable(Awakable* awakable, |
- HandleSignalsState* signals_state) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- producer_awakable_list_->Remove(awakable); |
- if (signals_state) |
- *signals_state = impl_->ProducerGetHandleSignalsState(); |
-} |
- |
-void DataPipe::ProducerStartSerialize(Channel* channel, |
- size_t* max_size, |
- size_t* max_platform_handles) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- impl_->ProducerStartSerialize(channel, max_size, max_platform_handles); |
-} |
- |
-bool DataPipe::ProducerEndSerialize( |
- Channel* channel, |
- void* destination, |
- size_t* actual_size, |
- embedder::PlatformHandleVector* platform_handles) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_producer_no_lock()); |
- // Warning: After |ProducerEndSerialize()|, quite probably |impl_| has |
- // changed. |
- bool rv = impl_->ProducerEndSerialize(channel, destination, actual_size, |
- platform_handles); |
- |
- // TODO(vtl): The code below is similar to, but not quite the same as, |
- // |ProducerCloseNoLock()|. |
- DCHECK(has_local_producer_no_lock()); |
- producer_awakable_list_->CancelAll(); |
- producer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
- << "Producer transferred with active two-phase write"; |
- producer_two_phase_max_num_bytes_written_ = 0; |
- if (!has_local_consumer_no_lock()) |
- producer_open_ = false; |
- |
- return rv; |
-} |
- |
-bool DataPipe::ProducerIsBusy() const { |
- base::AutoLock locker(lock_); |
- return producer_in_two_phase_write_no_lock(); |
-} |
- |
-void DataPipe::ConsumerCancelAllAwakables() { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- consumer_awakable_list_->CancelAll(); |
-} |
- |
-void DataPipe::ConsumerClose() { |
- base::AutoLock locker(lock_); |
- ConsumerCloseNoLock(); |
-} |
- |
-MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements, |
- UserPointer<uint32_t> num_bytes, |
- bool all_or_none, |
- bool peek) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- if (consumer_in_two_phase_read_no_lock()) |
- return MOJO_RESULT_BUSY; |
- |
- uint32_t max_num_bytes_to_read = num_bytes.Get(); |
- if (max_num_bytes_to_read % element_num_bytes() != 0) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- if (max_num_bytes_to_read == 0) |
- return MOJO_RESULT_OK; // Nothing to do. |
- |
- uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
- |
- HandleSignalsState old_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- MojoResult rv = impl_->ConsumerReadData( |
- elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (!new_producer_state.equals(old_producer_state)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- return rv; |
-} |
- |
-MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, |
- bool all_or_none) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- if (consumer_in_two_phase_read_no_lock()) |
- return MOJO_RESULT_BUSY; |
- |
- uint32_t max_num_bytes_to_discard = num_bytes.Get(); |
- if (max_num_bytes_to_discard % element_num_bytes() != 0) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- if (max_num_bytes_to_discard == 0) |
- return MOJO_RESULT_OK; // Nothing to do. |
- |
- uint32_t min_num_bytes_to_discard = |
- all_or_none ? max_num_bytes_to_discard : 0; |
- |
- HandleSignalsState old_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- MojoResult rv = impl_->ConsumerDiscardData( |
- num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (!new_producer_state.equals(old_producer_state)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- return rv; |
-} |
- |
-MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- if (consumer_in_two_phase_read_no_lock()) |
- return MOJO_RESULT_BUSY; |
- |
- // Note: Don't need to validate |*num_bytes| for query. |
- return impl_->ConsumerQueryData(num_bytes); |
-} |
- |
-MojoResult DataPipe::ConsumerBeginReadData( |
- UserPointer<const void*> buffer, |
- UserPointer<uint32_t> buffer_num_bytes) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- if (consumer_in_two_phase_read_no_lock()) |
- return MOJO_RESULT_BUSY; |
- |
- MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes); |
- if (rv != MOJO_RESULT_OK) |
- return rv; |
- DCHECK(consumer_in_two_phase_read_no_lock()); |
- return MOJO_RESULT_OK; |
-} |
- |
-MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- if (!consumer_in_two_phase_read_no_lock()) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- |
- HandleSignalsState old_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- HandleSignalsState old_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- MojoResult rv; |
- if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || |
- num_bytes_read % element_num_bytes() != 0) { |
- rv = MOJO_RESULT_INVALID_ARGUMENT; |
- consumer_two_phase_max_num_bytes_read_ = 0; |
- } else { |
- rv = impl_->ConsumerEndReadData(num_bytes_read); |
- } |
- // Two-phase read ended even on failure. |
- DCHECK(!consumer_in_two_phase_read_no_lock()); |
- // If we're now readable, we *became* readable (since we weren't readable |
- // during the two-phase read), so awake consumer awakables. |
- HandleSignalsState new_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- if (!new_consumer_state.equals(old_consumer_state)) |
- AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (!new_producer_state.equals(old_producer_state)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- return rv; |
-} |
- |
-HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- return impl_->ConsumerGetHandleSignalsState(); |
-} |
- |
-MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
- MojoHandleSignals signals, |
- uintptr_t context, |
- HandleSignalsState* signals_state) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- |
- HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState(); |
- if (consumer_state.satisfies(signals)) { |
- if (signals_state) |
- *signals_state = consumer_state; |
- return MOJO_RESULT_ALREADY_EXISTS; |
- } |
- if (!consumer_state.can_satisfy(signals)) { |
- if (signals_state) |
- *signals_state = consumer_state; |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- } |
- |
- consumer_awakable_list_->Add(awakable, signals, context); |
- return MOJO_RESULT_OK; |
-} |
- |
-void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, |
- HandleSignalsState* signals_state) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- consumer_awakable_list_->Remove(awakable); |
- if (signals_state) |
- *signals_state = impl_->ConsumerGetHandleSignalsState(); |
-} |
- |
-void DataPipe::ConsumerStartSerialize(Channel* channel, |
- size_t* max_size, |
- size_t* max_platform_handles) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles); |
-} |
- |
-bool DataPipe::ConsumerEndSerialize( |
- Channel* channel, |
- void* destination, |
- size_t* actual_size, |
- embedder::PlatformHandleVector* platform_handles) { |
- base::AutoLock locker(lock_); |
- DCHECK(has_local_consumer_no_lock()); |
- // Warning: After |ConsumerEndSerialize()|, quite probably |impl_| has |
- // changed. |
- bool rv = impl_->ConsumerEndSerialize(channel, destination, actual_size, |
- platform_handles); |
- |
- // TODO(vtl): The code below is similar to, but not quite the same as, |
- // |ConsumerCloseNoLock()|. |
- consumer_awakable_list_->CancelAll(); |
- consumer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
- << "Consumer transferred with active two-phase read"; |
- consumer_two_phase_max_num_bytes_read_ = 0; |
- if (!has_local_producer_no_lock()) |
- consumer_open_ = false; |
- |
- return rv; |
-} |
- |
-bool DataPipe::ConsumerIsBusy() const { |
- base::AutoLock locker(lock_); |
- return consumer_in_two_phase_read_no_lock(); |
-} |
- |
-DataPipe::DataPipe(bool has_local_producer, |
- bool has_local_consumer, |
- const MojoCreateDataPipeOptions& validated_options, |
- scoped_ptr<DataPipeImpl> impl) |
- : validated_options_(validated_options), |
- producer_open_(true), |
- consumer_open_(true), |
- producer_awakable_list_(has_local_producer ? new AwakableList() |
- : nullptr), |
- consumer_awakable_list_(has_local_consumer ? new AwakableList() |
- : nullptr), |
- producer_two_phase_max_num_bytes_written_(0), |
- consumer_two_phase_max_num_bytes_read_(0), |
- impl_(std::move(impl)) { |
- impl_->set_owner(this); |
- |
-#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
- // Check that the passed in options actually are validated. |
- MojoCreateDataPipeOptions unused = {}; |
- DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
- MOJO_RESULT_OK); |
-#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
-} |
- |
-DataPipe::~DataPipe() { |
- DCHECK(!producer_open_); |
- DCHECK(!consumer_open_); |
- DCHECK(!producer_awakable_list_); |
- DCHECK(!consumer_awakable_list_); |
-} |
- |
-scoped_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock( |
- scoped_ptr<DataPipeImpl> new_impl) { |
- lock_.AssertAcquired(); |
- DCHECK(new_impl); |
- |
- impl_->set_owner(nullptr); |
- scoped_ptr<DataPipeImpl> rv(std::move(impl_)); |
- impl_ = std::move(new_impl); |
- impl_->set_owner(this); |
- return rv; |
-} |
- |
-void DataPipe::SetProducerClosedNoLock() { |
- lock_.AssertAcquired(); |
- DCHECK(!has_local_producer_no_lock()); |
- DCHECK(producer_open_); |
- producer_open_ = false; |
-} |
- |
-void DataPipe::SetConsumerClosedNoLock() { |
- lock_.AssertAcquired(); |
- DCHECK(!has_local_consumer_no_lock()); |
- DCHECK(consumer_open_); |
- consumer_open_ = false; |
-} |
- |
-void DataPipe::ProducerCloseNoLock() { |
- lock_.AssertAcquired(); |
- DCHECK(producer_open_); |
- producer_open_ = false; |
- if (has_local_producer_no_lock()) { |
- producer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
- << "Producer closed with active two-phase write"; |
- producer_two_phase_max_num_bytes_written_ = 0; |
- impl_->ProducerClose(); |
- AwakeConsumerAwakablesForStateChangeNoLock( |
- impl_->ConsumerGetHandleSignalsState()); |
- } |
-} |
- |
-void DataPipe::ConsumerCloseNoLock() { |
- lock_.AssertAcquired(); |
- DCHECK(consumer_open_); |
- consumer_open_ = false; |
- if (has_local_consumer_no_lock()) { |
- consumer_awakable_list_.reset(); |
- // Not a bug, except possibly in "user" code. |
- DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
- << "Consumer closed with active two-phase read"; |
- consumer_two_phase_max_num_bytes_read_ = 0; |
- impl_->ConsumerClose(); |
- AwakeProducerAwakablesForStateChangeNoLock( |
- impl_->ProducerGetHandleSignalsState()); |
- } |
-} |
- |
-bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
- base::AutoLock locker(lock_); |
- DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
- |
- HandleSignalsState old_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- HandleSignalsState old_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- |
- bool rv = impl_->OnReadMessage(port, message); |
- |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (!new_producer_state.equals(old_producer_state)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- HandleSignalsState new_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- if (!new_consumer_state.equals(old_consumer_state)) |
- AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
- |
- return rv; |
-} |
- |
-void DataPipe::OnDetachFromChannel(unsigned port) { |
- base::AutoLock locker(lock_); |
- DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock()); |
- |
- HandleSignalsState old_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- HandleSignalsState old_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- |
- impl_->OnDetachFromChannel(port); |
- |
- HandleSignalsState new_producer_state = |
- impl_->ProducerGetHandleSignalsState(); |
- if (!new_producer_state.equals(old_producer_state)) |
- AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
- HandleSignalsState new_consumer_state = |
- impl_->ConsumerGetHandleSignalsState(); |
- if (!new_consumer_state.equals(old_consumer_state)) |
- AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
-} |
- |
-void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |
- const HandleSignalsState& new_producer_state) { |
- lock_.AssertAcquired(); |
- if (!has_local_producer_no_lock()) |
- return; |
- producer_awakable_list_->AwakeForStateChange(new_producer_state); |
-} |
- |
-void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock( |
- const HandleSignalsState& new_consumer_state) { |
- lock_.AssertAcquired(); |
- if (!has_local_consumer_no_lock()) |
- return; |
- consumer_awakable_list_->AwakeForStateChange(new_consumer_state); |
-} |
- |
-void DataPipe::SetProducerClosed() { |
- base::AutoLock locker(lock_); |
- SetProducerClosedNoLock(); |
-} |
- |
-void DataPipe::SetConsumerClosed() { |
- base::AutoLock locker(lock_); |
- SetConsumerClosedNoLock(); |
-} |
- |
-} // namespace system |
-} // namespace mojo |