| Index: third_party/mojo/src/mojo/edk/system/data_pipe.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe.cc b/third_party/mojo/src/mojo/edk/system/data_pipe.cc
|
| index d5e9db95266b49f20ac2f308314258895f02b190..c46de2cc1e0ff5c3b23f94d0c0f8885273b4702d 100644
|
| --- a/third_party/mojo/src/mojo/edk/system/data_pipe.cc
|
| +++ b/third_party/mojo/src/mojo/edk/system/data_pipe.cc
|
| @@ -10,12 +10,17 @@
|
| #include <limits>
|
|
|
| #include "base/logging.h"
|
| +#include "base/memory/aligned_memory.h"
|
| #include "mojo/edk/system/awakable_list.h"
|
| +#include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/configuration.h"
|
| #include "mojo/edk/system/data_pipe_impl.h"
|
| +#include "mojo/edk/system/incoming_endpoint.h"
|
| #include "mojo/edk/system/local_data_pipe_impl.h"
|
| #include "mojo/edk/system/memory.h"
|
| #include "mojo/edk/system/options_validation.h"
|
| +#include "mojo/edk/system/remote_consumer_data_pipe_impl.h"
|
| +#include "mojo/edk/system/remote_producer_data_pipe_impl.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
| @@ -36,7 +41,7 @@ MojoResult DataPipe::ValidateCreateOptions(
|
| UserPointer<const MojoCreateDataPipeOptions> in_options,
|
| MojoCreateDataPipeOptions* out_options) {
|
| const MojoCreateDataPipeOptionsFlags kKnownFlags =
|
| - MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
|
|
|
| *out_options = GetDefaultCreateOptions();
|
| if (in_options.IsNull())
|
| @@ -92,6 +97,170 @@ DataPipe* DataPipe::CreateLocal(
|
| make_scoped_ptr(new LocalDataPipeImpl()));
|
| }
|
|
|
| +// static
|
| +DataPipe* DataPipe::CreateRemoteProducerFromExisting(
|
| + const MojoCreateDataPipeOptions& validated_options,
|
| + MessageInTransitQueue* message_queue,
|
| + ChannelEndpoint* channel_endpoint) {
|
| + scoped_ptr<char, base::AlignedFreeDeleter> buffer;
|
| + size_t buffer_num_bytes = 0;
|
| + if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint(
|
| + validated_options, message_queue, &buffer, &buffer_num_bytes))
|
| + return nullptr;
|
| +
|
| + // Important: This is called under |IncomingEndpoint|'s (which is a
|
| + // |ChannelEndpointClient|) lock, in particular from
|
| + // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that
|
| + // lock, it will reset its |endpoint_| member, which makes any later or
|
| + // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will
|
| + // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()|
|
| + // is called.
|
| + DataPipe* data_pipe =
|
| + new DataPipe(false, true, validated_options,
|
| + make_scoped_ptr(new RemoteProducerDataPipeImpl(
|
| + channel_endpoint, buffer.Pass(), 0, buffer_num_bytes)));
|
| + if (channel_endpoint) {
|
| + if (!channel_endpoint->ReplaceClient(data_pipe, 0))
|
| + data_pipe->OnDetachFromChannel(0);
|
| + } else {
|
| + data_pipe->SetProducerClosed();
|
| + }
|
| + return data_pipe;
|
| +}
|
| +
|
| +// static
|
| +DataPipe* DataPipe::CreateRemoteConsumerFromExisting(
|
| + const MojoCreateDataPipeOptions& validated_options,
|
| + size_t consumer_num_bytes,
|
| + MessageInTransitQueue* message_queue,
|
| + ChannelEndpoint* channel_endpoint) {
|
| + if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint(
|
| + validated_options, &consumer_num_bytes, message_queue))
|
| + return nullptr;
|
| +
|
| + // Important: This is called under |IncomingEndpoint|'s (which is a
|
| + // |ChannelEndpointClient|) lock, in particular from
|
| + // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that
|
| + // lock, it will reset its |endpoint_| member, which makes any later or
|
| + // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will
|
| + // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()|
|
| + // is called.
|
| + DataPipe* data_pipe =
|
| + new DataPipe(true, false, validated_options,
|
| + make_scoped_ptr(new RemoteConsumerDataPipeImpl(
|
| + channel_endpoint, consumer_num_bytes)));
|
| + if (channel_endpoint) {
|
| + if (!channel_endpoint->ReplaceClient(data_pipe, 0))
|
| + data_pipe->OnDetachFromChannel(0);
|
| + } else {
|
| + data_pipe->SetConsumerClosed();
|
| + }
|
| + return data_pipe;
|
| +}
|
| +
|
| +// static
|
| +bool DataPipe::ProducerDeserialize(Channel* channel,
|
| + const void* source,
|
| + size_t size,
|
| + scoped_refptr<DataPipe>* data_pipe) {
|
| + DCHECK(!*data_pipe); // Not technically wrong, but unlikely.
|
| +
|
| + bool consumer_open = false;
|
| + if (size == sizeof(SerializedDataPipeProducerDispatcher)) {
|
| + consumer_open = false;
|
| + } else if (size ==
|
| + sizeof(SerializedDataPipeProducerDispatcher) +
|
| + channel->GetSerializedEndpointSize()) {
|
| + consumer_open = true;
|
| + } else {
|
| + LOG(ERROR) << "Invalid serialized data pipe producer";
|
| + return false;
|
| + }
|
| +
|
| + const SerializedDataPipeProducerDispatcher* s =
|
| + static_cast<const SerializedDataPipeProducerDispatcher*>(source);
|
| + MojoCreateDataPipeOptions revalidated_options = {};
|
| + if (ValidateCreateOptions(MakeUserPointer(&s->validated_options),
|
| + &revalidated_options) != MOJO_RESULT_OK) {
|
| + LOG(ERROR) << "Invalid serialized data pipe producer (bad options)";
|
| + return false;
|
| + }
|
| +
|
| + if (!consumer_open) {
|
| + if (s->consumer_num_bytes != static_cast<size_t>(-1)) {
|
| + LOG(ERROR)
|
| + << "Invalid serialized data pipe producer (bad consumer_num_bytes)";
|
| + return false;
|
| + }
|
| +
|
| + *data_pipe = new DataPipe(
|
| + true, false, revalidated_options,
|
| + make_scoped_ptr(new RemoteConsumerDataPipeImpl(nullptr, 0)));
|
| + (*data_pipe)->SetConsumerClosed();
|
| +
|
| + return true;
|
| + }
|
| +
|
| + if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes ||
|
| + s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) {
|
| + LOG(ERROR)
|
| + << "Invalid serialized data pipe producer (bad consumer_num_bytes)";
|
| + return false;
|
| + }
|
| +
|
| + const void* endpoint_source = static_cast<const char*>(source) +
|
| + sizeof(SerializedDataPipeProducerDispatcher);
|
| + scoped_refptr<IncomingEndpoint> incoming_endpoint =
|
| + channel->DeserializeEndpoint(endpoint_source);
|
| + if (!incoming_endpoint)
|
| + return false;
|
| +
|
| + *data_pipe = incoming_endpoint->ConvertToDataPipeProducer(
|
| + revalidated_options, s->consumer_num_bytes);
|
| + if (!*data_pipe)
|
| + return false;
|
| +
|
| + return true;
|
| +}
|
| +
|
| +// static
|
| +bool DataPipe::ConsumerDeserialize(Channel* channel,
|
| + const void* source,
|
| + size_t size,
|
| + scoped_refptr<DataPipe>* data_pipe) {
|
| + DCHECK(!*data_pipe); // Not technically wrong, but unlikely.
|
| +
|
| + if (size !=
|
| + sizeof(SerializedDataPipeConsumerDispatcher) +
|
| + channel->GetSerializedEndpointSize()) {
|
| + LOG(ERROR) << "Invalid serialized data pipe consumer";
|
| + return false;
|
| + }
|
| +
|
| + const SerializedDataPipeConsumerDispatcher* s =
|
| + static_cast<const SerializedDataPipeConsumerDispatcher*>(source);
|
| + MojoCreateDataPipeOptions revalidated_options = {};
|
| + if (ValidateCreateOptions(MakeUserPointer(&s->validated_options),
|
| + &revalidated_options) != MOJO_RESULT_OK) {
|
| + LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)";
|
| + return false;
|
| + }
|
| +
|
| + const void* endpoint_source = static_cast<const char*>(source) +
|
| + sizeof(SerializedDataPipeConsumerDispatcher);
|
| + scoped_refptr<IncomingEndpoint> incoming_endpoint =
|
| + channel->DeserializeEndpoint(endpoint_source);
|
| + if (!incoming_endpoint)
|
| + return false;
|
| +
|
| + *data_pipe =
|
| + incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options);
|
| + if (!*data_pipe)
|
| + return false;
|
| +
|
| + return true;
|
| +}
|
| +
|
| void DataPipe::ProducerCancelAllAwakables() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| @@ -253,8 +422,24 @@ bool DataPipe::ProducerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - return impl_->ProducerEndSerialize(channel, destination, actual_size,
|
| - platform_handles);
|
| + // Warning: After |ProducerEndSerialize()|, quite probably |impl_| has
|
| + // changed.
|
| + bool rv = impl_->ProducerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| +
|
| + // TODO(vtl): The code below is similar to, but not quite the same as,
|
| + // |ProducerCloseNoLock()|.
|
| + DCHECK(has_local_producer_no_lock());
|
| + producer_awakable_list_->CancelAll();
|
| + producer_awakable_list_.reset();
|
| + // Not a bug, except possibly in "user" code.
|
| + DVLOG_IF(2, producer_in_two_phase_write_no_lock())
|
| + << "Producer transferred with active two-phase write";
|
| + producer_two_phase_max_num_bytes_written_ = 0;
|
| + if (!has_local_consumer_no_lock())
|
| + producer_open_ = false;
|
| +
|
| + return rv;
|
| }
|
|
|
| bool DataPipe::ProducerIsBusy() const {
|
| @@ -453,8 +638,23 @@ bool DataPipe::ConsumerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - return impl_->ConsumerEndSerialize(channel, destination, actual_size,
|
| - platform_handles);
|
| + // Warning: After |ConsumerEndSerialize()|, quite probably |impl_| has
|
| + // changed.
|
| + bool rv = impl_->ConsumerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| +
|
| + // TODO(vtl): The code below is similar to, but not quite the same as,
|
| + // |ConsumerCloseNoLock()|.
|
| + consumer_awakable_list_->CancelAll();
|
| + consumer_awakable_list_.reset();
|
| + // Not a bug, except possibly in "user" code.
|
| + DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
|
| + << "Consumer transferred with active two-phase read";
|
| + consumer_two_phase_max_num_bytes_read_ = 0;
|
| + if (!has_local_producer_no_lock())
|
| + consumer_open_ = false;
|
| +
|
| + return rv;
|
| }
|
|
|
| bool DataPipe::ConsumerIsBusy() const {
|
| @@ -480,7 +680,7 @@ DataPipe::DataPipe(bool has_local_producer,
|
|
|
| #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| // Check that the passed in options actually are validated.
|
| - MojoCreateDataPipeOptions unused = {0};
|
| + MojoCreateDataPipeOptions unused = {};
|
| DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
|
| MOJO_RESULT_OK);
|
| #endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| @@ -493,34 +693,106 @@ DataPipe::~DataPipe() {
|
| DCHECK(!consumer_awakable_list_);
|
| }
|
|
|
| +scoped_ptr<DataPipeImpl> DataPipe::ReplaceImplNoLock(
|
| + scoped_ptr<DataPipeImpl> new_impl) {
|
| + lock_.AssertAcquired();
|
| + DCHECK(new_impl);
|
| +
|
| + impl_->set_owner(nullptr);
|
| + scoped_ptr<DataPipeImpl> rv(impl_.Pass());
|
| + impl_ = new_impl.Pass();
|
| + impl_->set_owner(this);
|
| + return rv.Pass();
|
| +}
|
| +
|
| +void DataPipe::SetProducerClosedNoLock() {
|
| + lock_.AssertAcquired();
|
| + DCHECK(!has_local_producer_no_lock());
|
| + DCHECK(producer_open_);
|
| + producer_open_ = false;
|
| +}
|
| +
|
| +void DataPipe::SetConsumerClosedNoLock() {
|
| + lock_.AssertAcquired();
|
| + DCHECK(!has_local_consumer_no_lock());
|
| + DCHECK(consumer_open_);
|
| + consumer_open_ = false;
|
| +}
|
| +
|
| void DataPipe::ProducerCloseNoLock() {
|
| lock_.AssertAcquired();
|
| DCHECK(producer_open_);
|
| producer_open_ = false;
|
| - DCHECK(has_local_producer_no_lock());
|
| - producer_awakable_list_.reset();
|
| - // Not a bug, except possibly in "user" code.
|
| - DVLOG_IF(2, producer_in_two_phase_write_no_lock())
|
| - << "Producer closed with active two-phase write";
|
| - producer_two_phase_max_num_bytes_written_ = 0;
|
| - impl_->ProducerClose();
|
| - AwakeConsumerAwakablesForStateChangeNoLock(
|
| - impl_->ConsumerGetHandleSignalsState());
|
| + if (has_local_producer_no_lock()) {
|
| + producer_awakable_list_.reset();
|
| + // Not a bug, except possibly in "user" code.
|
| + DVLOG_IF(2, producer_in_two_phase_write_no_lock())
|
| + << "Producer closed with active two-phase write";
|
| + producer_two_phase_max_num_bytes_written_ = 0;
|
| + impl_->ProducerClose();
|
| + AwakeConsumerAwakablesForStateChangeNoLock(
|
| + impl_->ConsumerGetHandleSignalsState());
|
| + }
|
| }
|
|
|
| void DataPipe::ConsumerCloseNoLock() {
|
| lock_.AssertAcquired();
|
| DCHECK(consumer_open_);
|
| consumer_open_ = false;
|
| - DCHECK(has_local_consumer_no_lock());
|
| - consumer_awakable_list_.reset();
|
| - // Not a bug, except possibly in "user" code.
|
| - DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
|
| - << "Consumer closed with active two-phase read";
|
| - consumer_two_phase_max_num_bytes_read_ = 0;
|
| - impl_->ConsumerClose();
|
| - AwakeProducerAwakablesForStateChangeNoLock(
|
| - impl_->ProducerGetHandleSignalsState());
|
| + if (has_local_consumer_no_lock()) {
|
| + consumer_awakable_list_.reset();
|
| + // Not a bug, except possibly in "user" code.
|
| + DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
|
| + << "Consumer closed with active two-phase read";
|
| + consumer_two_phase_max_num_bytes_read_ = 0;
|
| + impl_->ConsumerClose();
|
| + AwakeProducerAwakablesForStateChangeNoLock(
|
| + impl_->ProducerGetHandleSignalsState());
|
| + }
|
| +}
|
| +
|
| +bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) {
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock());
|
| +
|
| + HandleSignalsState old_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| + HandleSignalsState old_consumer_state =
|
| + impl_->ConsumerGetHandleSignalsState();
|
| +
|
| + bool rv = impl_->OnReadMessage(port, message);
|
| +
|
| + HandleSignalsState new_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| + if (!new_producer_state.equals(old_producer_state))
|
| + AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + HandleSignalsState new_consumer_state =
|
| + impl_->ConsumerGetHandleSignalsState();
|
| + if (!new_consumer_state.equals(old_consumer_state))
|
| + AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| +
|
| + return rv;
|
| +}
|
| +
|
| +void DataPipe::OnDetachFromChannel(unsigned port) {
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(!has_local_producer_no_lock() || !has_local_consumer_no_lock());
|
| +
|
| + HandleSignalsState old_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| + HandleSignalsState old_consumer_state =
|
| + impl_->ConsumerGetHandleSignalsState();
|
| +
|
| + impl_->OnDetachFromChannel(port);
|
| +
|
| + HandleSignalsState new_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| + if (!new_producer_state.equals(old_producer_state))
|
| + AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + HandleSignalsState new_consumer_state =
|
| + impl_->ConsumerGetHandleSignalsState();
|
| + if (!new_consumer_state.equals(old_consumer_state))
|
| + AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| }
|
|
|
| void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
|
| @@ -539,5 +811,15 @@ void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock(
|
| consumer_awakable_list_->AwakeForStateChange(new_consumer_state);
|
| }
|
|
|
| +void DataPipe::SetProducerClosed() {
|
| + base::AutoLock locker(lock_);
|
| + SetProducerClosedNoLock();
|
| +}
|
| +
|
| +void DataPipe::SetConsumerClosed() {
|
| + base::AutoLock locker(lock_);
|
| + SetConsumerClosedNoLock();
|
| +}
|
| +
|
| } // namespace system
|
| } // namespace mojo
|
|
|