| Index: mojo/edk/system/data_pipe.cc
|
| diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
|
| index aede49280b922bb2d5e689ecdec5c0f98050f3de..e5ef3f85d434ca054478096e381efbd728053584 100644
|
| --- a/mojo/edk/system/data_pipe.cc
|
| +++ b/mojo/edk/system/data_pipe.cc
|
| @@ -12,12 +12,40 @@
|
| #include "base/logging.h"
|
| #include "mojo/edk/system/awakable_list.h"
|
| #include "mojo/edk/system/configuration.h"
|
| +#include "mojo/edk/system/data_pipe_impl.h"
|
| #include "mojo/edk/system/memory.h"
|
| #include "mojo/edk/system/options_validation.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
|
|
| +DataPipe::DataPipe(bool has_local_producer,
|
| + bool has_local_consumer,
|
| + const MojoCreateDataPipeOptions& validated_options,
|
| + scoped_ptr<DataPipeImpl> impl)
|
| + : may_discard_((validated_options.flags &
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
|
| + element_num_bytes_(validated_options.element_num_bytes),
|
| + capacity_num_bytes_(validated_options.capacity_num_bytes),
|
| + producer_open_(true),
|
| + consumer_open_(true),
|
| + producer_awakable_list_(has_local_producer ? new AwakableList()
|
| + : nullptr),
|
| + consumer_awakable_list_(has_local_consumer ? new AwakableList()
|
| + : nullptr),
|
| + producer_two_phase_max_num_bytes_written_(0),
|
| + consumer_two_phase_max_num_bytes_read_(0),
|
| + impl_(impl.Pass()) {
|
| + impl_->set_owner(this);
|
| +
|
| +#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| + // Check that the passed in options actually are validated.
|
| + MojoCreateDataPipeOptions unused = {0};
|
| + DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
|
| + MOJO_RESULT_OK);
|
| +#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| +}
|
| +
|
| // static
|
| MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() {
|
| MojoCreateDataPipeOptions result = {
|
| @@ -116,11 +144,11 @@ MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
|
| uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
|
|
|
| HandleSignalsState old_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ProducerWriteDataImplNoLock(
|
| + impl_->ConsumerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ProducerWriteData(
|
| elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (!new_consumer_state.equals(old_consumer_state))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| return rv;
|
| @@ -145,8 +173,8 @@ MojoResult DataPipe::ProducerBeginWriteData(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
|
| - min_num_bytes_to_write);
|
| + MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes,
|
| + min_num_bytes_to_write);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| // Note: No need to awake producer awakables, even though we're going from
|
| @@ -167,25 +195,25 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| // consumer has been closed.
|
|
|
| HandleSignalsState old_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| MojoResult rv;
|
| if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
|
| num_bytes_written % element_num_bytes_ != 0) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| producer_two_phase_max_num_bytes_written_ = 0;
|
| } else {
|
| - rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
|
| + rv = impl_->ProducerEndWriteData(num_bytes_written);
|
| }
|
| // Two-phase write ended even on failure.
|
| DCHECK(!producer_in_two_phase_write_no_lock());
|
| // If we're now writable, we *became* writable (since we weren't writable
|
| // during the two-phase write), so awake producer awakables.
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (!new_consumer_state.equals(old_consumer_state))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| return rv;
|
| @@ -194,7 +222,7 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - return ProducerGetHandleSignalsStateImplNoLock();
|
| + return impl_->ProducerGetHandleSignalsState();
|
| }
|
|
|
| MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
|
| @@ -204,7 +232,7 @@ MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
|
|
| - HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState();
|
| if (producer_state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = producer_state;
|
| @@ -226,7 +254,7 @@ void DataPipe::ProducerRemoveAwakable(Awakable* awakable,
|
| DCHECK(has_local_producer_no_lock());
|
| producer_awakable_list_->Remove(awakable);
|
| if (signals_state)
|
| - *signals_state = ProducerGetHandleSignalsStateImplNoLock();
|
| + *signals_state = impl_->ProducerGetHandleSignalsState();
|
| }
|
|
|
| void DataPipe::ProducerStartSerialize(Channel* channel,
|
| @@ -234,7 +262,7 @@ void DataPipe::ProducerStartSerialize(Channel* channel,
|
| size_t* max_platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles);
|
| + impl_->ProducerStartSerialize(channel, max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipe::ProducerEndSerialize(
|
| @@ -244,8 +272,8 @@ bool DataPipe::ProducerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - return ProducerEndSerializeImplNoLock(channel, destination, actual_size,
|
| - platform_handles);
|
| + return impl_->ProducerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| }
|
|
|
| bool DataPipe::ProducerIsBusy() const {
|
| @@ -284,11 +312,11 @@ MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
|
| uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ConsumerReadDataImplNoLock(
|
| + impl_->ProducerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ConsumerReadData(
|
| elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -313,11 +341,11 @@ MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
|
| all_or_none ? max_num_bytes_to_discard : 0;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ConsumerDiscardDataImplNoLock(
|
| + impl_->ProducerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ConsumerDiscardData(
|
| num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -331,7 +359,7 @@ MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
|
| return MOJO_RESULT_BUSY;
|
|
|
| // Note: Don't need to validate |*num_bytes| for query.
|
| - return ConsumerQueryDataImplNoLock(num_bytes);
|
| + return impl_->ConsumerQueryData(num_bytes);
|
| }
|
|
|
| MojoResult DataPipe::ConsumerBeginReadData(
|
| @@ -351,8 +379,8 @@ MojoResult DataPipe::ConsumerBeginReadData(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
|
| - min_num_bytes_to_read);
|
| + MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes,
|
| + min_num_bytes_to_read);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| DCHECK(consumer_in_two_phase_read_no_lock());
|
| @@ -367,25 +395,25 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| MojoResult rv;
|
| if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
|
| num_bytes_read % element_num_bytes_ != 0) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| consumer_two_phase_max_num_bytes_read_ = 0;
|
| } else {
|
| - rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
|
| + rv = impl_->ConsumerEndReadData(num_bytes_read);
|
| }
|
| // Two-phase read ended even on failure.
|
| DCHECK(!consumer_in_two_phase_read_no_lock());
|
| // If we're now readable, we *became* readable (since we weren't readable
|
| // during the two-phase read), so awake consumer awakables.
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -394,7 +422,7 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - return ConsumerGetHandleSignalsStateImplNoLock();
|
| + return impl_->ConsumerGetHandleSignalsState();
|
| }
|
|
|
| MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
|
| @@ -404,7 +432,7 @@ MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
|
|
| - HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState();
|
| if (consumer_state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = consumer_state;
|
| @@ -426,7 +454,7 @@ void DataPipe::ConsumerRemoveAwakable(Awakable* awakable,
|
| DCHECK(has_local_consumer_no_lock());
|
| consumer_awakable_list_->Remove(awakable);
|
| if (signals_state)
|
| - *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
|
| + *signals_state = impl_->ConsumerGetHandleSignalsState();
|
| }
|
|
|
| void DataPipe::ConsumerStartSerialize(Channel* channel,
|
| @@ -434,7 +462,7 @@ void DataPipe::ConsumerStartSerialize(Channel* channel,
|
| size_t* max_platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles);
|
| + impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipe::ConsumerEndSerialize(
|
| @@ -444,8 +472,8 @@ bool DataPipe::ConsumerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - return ConsumerEndSerializeImplNoLock(channel, destination, actual_size,
|
| - platform_handles);
|
| + return impl_->ConsumerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| }
|
|
|
| bool DataPipe::ConsumerIsBusy() const {
|
| @@ -453,27 +481,6 @@ bool DataPipe::ConsumerIsBusy() const {
|
| return consumer_in_two_phase_read_no_lock();
|
| }
|
|
|
| -DataPipe::DataPipe(bool has_local_producer,
|
| - bool has_local_consumer,
|
| - const MojoCreateDataPipeOptions& validated_options)
|
| - : may_discard_((validated_options.flags &
|
| - MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
|
| - element_num_bytes_(validated_options.element_num_bytes),
|
| - capacity_num_bytes_(validated_options.capacity_num_bytes),
|
| - producer_open_(true),
|
| - consumer_open_(true),
|
| - producer_awakable_list_(has_local_producer ? new AwakableList()
|
| - : nullptr),
|
| - consumer_awakable_list_(has_local_consumer ? new AwakableList()
|
| - : nullptr),
|
| - producer_two_phase_max_num_bytes_written_(0),
|
| - consumer_two_phase_max_num_bytes_read_(0) {
|
| - // Check that the passed in options actually are validated.
|
| - MojoCreateDataPipeOptions unused = {0};
|
| - DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
|
| - MOJO_RESULT_OK);
|
| -}
|
| -
|
| DataPipe::~DataPipe() {
|
| DCHECK(!producer_open_);
|
| DCHECK(!consumer_open_);
|
| @@ -491,9 +498,9 @@ void DataPipe::ProducerCloseNoLock() {
|
| DVLOG_IF(2, producer_in_two_phase_write_no_lock())
|
| << "Producer closed with active two-phase write";
|
| producer_two_phase_max_num_bytes_written_ = 0;
|
| - ProducerCloseImplNoLock();
|
| + impl_->ProducerClose();
|
| AwakeConsumerAwakablesForStateChangeNoLock(
|
| - ConsumerGetHandleSignalsStateImplNoLock());
|
| + impl_->ConsumerGetHandleSignalsState());
|
| }
|
|
|
| void DataPipe::ConsumerCloseNoLock() {
|
| @@ -506,9 +513,9 @@ void DataPipe::ConsumerCloseNoLock() {
|
| DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
|
| << "Consumer closed with active two-phase read";
|
| consumer_two_phase_max_num_bytes_read_ = 0;
|
| - ConsumerCloseImplNoLock();
|
| + impl_->ConsumerClose();
|
| AwakeProducerAwakablesForStateChangeNoLock(
|
| - ProducerGetHandleSignalsStateImplNoLock());
|
| + impl_->ProducerGetHandleSignalsState());
|
| }
|
|
|
| void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
|
|
|