| Index: mojo/edk/system/data_pipe.cc
|
| diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
|
| index aede49280b922bb2d5e689ecdec5c0f98050f3de..2431754eb92b4895cf95b0fcbf1a3b8b5bd86c50 100644
|
| --- a/mojo/edk/system/data_pipe.cc
|
| +++ b/mojo/edk/system/data_pipe.cc
|
| @@ -12,6 +12,8 @@
|
| #include "base/logging.h"
|
| #include "mojo/edk/system/awakable_list.h"
|
| #include "mojo/edk/system/configuration.h"
|
| +#include "mojo/edk/system/data_pipe_impl.h"
|
| +#include "mojo/edk/system/local_data_pipe_impl.h"
|
| #include "mojo/edk/system/memory.h"
|
| #include "mojo/edk/system/options_validation.h"
|
|
|
| @@ -83,6 +85,13 @@ MojoResult DataPipe::ValidateCreateOptions(
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| +// static
|
| +DataPipe* DataPipe::CreateLocal(
|
| + const MojoCreateDataPipeOptions& validated_options) {
|
| + return new DataPipe(true, true, validated_options,
|
| + make_scoped_ptr(new LocalDataPipeImpl()));
|
| +}
|
| +
|
| void DataPipe::ProducerCancelAllAwakables() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| @@ -116,11 +125,11 @@ MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
|
| uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
|
|
|
| HandleSignalsState old_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ProducerWriteDataImplNoLock(
|
| + impl_->ConsumerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ProducerWriteData(
|
| elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (!new_consumer_state.equals(old_consumer_state))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| return rv;
|
| @@ -145,8 +154,8 @@ MojoResult DataPipe::ProducerBeginWriteData(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes,
|
| - min_num_bytes_to_write);
|
| + MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes,
|
| + min_num_bytes_to_write);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| // Note: No need to awake producer awakables, even though we're going from
|
| @@ -167,25 +176,25 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| // consumer has been closed.
|
|
|
| HandleSignalsState old_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| MojoResult rv;
|
| if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
|
| num_bytes_written % element_num_bytes_ != 0) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| producer_two_phase_max_num_bytes_written_ = 0;
|
| } else {
|
| - rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
|
| + rv = impl_->ProducerEndWriteData(num_bytes_written);
|
| }
|
| // Two-phase write ended even on failure.
|
| DCHECK(!producer_in_two_phase_write_no_lock());
|
| // If we're now writable, we *became* writable (since we weren't writable
|
| // during the two-phase write), so awake producer awakables.
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (!new_consumer_state.equals(old_consumer_state))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| return rv;
|
| @@ -194,7 +203,7 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - return ProducerGetHandleSignalsStateImplNoLock();
|
| + return impl_->ProducerGetHandleSignalsState();
|
| }
|
|
|
| MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
|
| @@ -204,7 +213,7 @@ MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable,
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
|
|
| - HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState();
|
| if (producer_state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = producer_state;
|
| @@ -226,7 +235,7 @@ void DataPipe::ProducerRemoveAwakable(Awakable* awakable,
|
| DCHECK(has_local_producer_no_lock());
|
| producer_awakable_list_->Remove(awakable);
|
| if (signals_state)
|
| - *signals_state = ProducerGetHandleSignalsStateImplNoLock();
|
| + *signals_state = impl_->ProducerGetHandleSignalsState();
|
| }
|
|
|
| void DataPipe::ProducerStartSerialize(Channel* channel,
|
| @@ -234,7 +243,7 @@ void DataPipe::ProducerStartSerialize(Channel* channel,
|
| size_t* max_platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles);
|
| + impl_->ProducerStartSerialize(channel, max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipe::ProducerEndSerialize(
|
| @@ -244,8 +253,8 @@ bool DataPipe::ProducerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_producer_no_lock());
|
| - return ProducerEndSerializeImplNoLock(channel, destination, actual_size,
|
| - platform_handles);
|
| + return impl_->ProducerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| }
|
|
|
| bool DataPipe::ProducerIsBusy() const {
|
| @@ -284,11 +293,11 @@ MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
|
| uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ConsumerReadDataImplNoLock(
|
| + impl_->ProducerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ConsumerReadData(
|
| elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -313,11 +322,11 @@ MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
|
| all_or_none ? max_num_bytes_to_discard : 0;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| - MojoResult rv = ConsumerDiscardDataImplNoLock(
|
| + impl_->ProducerGetHandleSignalsState();
|
| + MojoResult rv = impl_->ConsumerDiscardData(
|
| num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -331,7 +340,7 @@ MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
|
| return MOJO_RESULT_BUSY;
|
|
|
| // Note: Don't need to validate |*num_bytes| for query.
|
| - return ConsumerQueryDataImplNoLock(num_bytes);
|
| + return impl_->ConsumerQueryData(num_bytes);
|
| }
|
|
|
| MojoResult DataPipe::ConsumerBeginReadData(
|
| @@ -351,8 +360,8 @@ MojoResult DataPipe::ConsumerBeginReadData(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes,
|
| - min_num_bytes_to_read);
|
| + MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes,
|
| + min_num_bytes_to_read);
|
| if (rv != MOJO_RESULT_OK)
|
| return rv;
|
| DCHECK(consumer_in_two_phase_read_no_lock());
|
| @@ -367,25 +376,25 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| HandleSignalsState old_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| MojoResult rv;
|
| if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
|
| num_bytes_read % element_num_bytes_ != 0) {
|
| rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| consumer_two_phase_max_num_bytes_read_ = 0;
|
| } else {
|
| - rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
|
| + rv = impl_->ConsumerEndReadData(num_bytes_read);
|
| }
|
| // Two-phase read ended even on failure.
|
| DCHECK(!consumer_in_two_phase_read_no_lock());
|
| // If we're now readable, we *became* readable (since we weren't readable
|
| // during the two-phase read), so awake consumer awakables.
|
| HandleSignalsState new_consumer_state =
|
| - ConsumerGetHandleSignalsStateImplNoLock();
|
| + impl_->ConsumerGetHandleSignalsState();
|
| if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
|
| AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| HandleSignalsState new_producer_state =
|
| - ProducerGetHandleSignalsStateImplNoLock();
|
| + impl_->ProducerGetHandleSignalsState();
|
| if (!new_producer_state.equals(old_producer_state))
|
| AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| return rv;
|
| @@ -394,7 +403,7 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - return ConsumerGetHandleSignalsStateImplNoLock();
|
| + return impl_->ConsumerGetHandleSignalsState();
|
| }
|
|
|
| MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
|
| @@ -404,7 +413,7 @@ MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable,
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
|
|
| - HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
|
| + HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState();
|
| if (consumer_state.satisfies(signals)) {
|
| if (signals_state)
|
| *signals_state = consumer_state;
|
| @@ -426,7 +435,7 @@ void DataPipe::ConsumerRemoveAwakable(Awakable* awakable,
|
| DCHECK(has_local_consumer_no_lock());
|
| consumer_awakable_list_->Remove(awakable);
|
| if (signals_state)
|
| - *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
|
| + *signals_state = impl_->ConsumerGetHandleSignalsState();
|
| }
|
|
|
| void DataPipe::ConsumerStartSerialize(Channel* channel,
|
| @@ -434,7 +443,7 @@ void DataPipe::ConsumerStartSerialize(Channel* channel,
|
| size_t* max_platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles);
|
| + impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipe::ConsumerEndSerialize(
|
| @@ -444,8 +453,8 @@ bool DataPipe::ConsumerEndSerialize(
|
| embedder::PlatformHandleVector* platform_handles) {
|
| base::AutoLock locker(lock_);
|
| DCHECK(has_local_consumer_no_lock());
|
| - return ConsumerEndSerializeImplNoLock(channel, destination, actual_size,
|
| - platform_handles);
|
| + return impl_->ConsumerEndSerialize(channel, destination, actual_size,
|
| + platform_handles);
|
| }
|
|
|
| bool DataPipe::ConsumerIsBusy() const {
|
| @@ -455,7 +464,8 @@ bool DataPipe::ConsumerIsBusy() const {
|
|
|
| DataPipe::DataPipe(bool has_local_producer,
|
| bool has_local_consumer,
|
| - const MojoCreateDataPipeOptions& validated_options)
|
| + const MojoCreateDataPipeOptions& validated_options,
|
| + scoped_ptr<DataPipeImpl> impl)
|
| : may_discard_((validated_options.flags &
|
| MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
|
| element_num_bytes_(validated_options.element_num_bytes),
|
| @@ -467,11 +477,16 @@ DataPipe::DataPipe(bool has_local_producer,
|
| consumer_awakable_list_(has_local_consumer ? new AwakableList()
|
| : nullptr),
|
| producer_two_phase_max_num_bytes_written_(0),
|
| - consumer_two_phase_max_num_bytes_read_(0) {
|
| + consumer_two_phase_max_num_bytes_read_(0),
|
| + impl_(impl.Pass()) {
|
| + impl_->set_owner(this);
|
| +
|
| +#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| // Check that the passed in options actually are validated.
|
| MojoCreateDataPipeOptions unused = {0};
|
| DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
|
| MOJO_RESULT_OK);
|
| +#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
|
| }
|
|
|
| DataPipe::~DataPipe() {
|
| @@ -491,9 +506,9 @@ void DataPipe::ProducerCloseNoLock() {
|
| DVLOG_IF(2, producer_in_two_phase_write_no_lock())
|
| << "Producer closed with active two-phase write";
|
| producer_two_phase_max_num_bytes_written_ = 0;
|
| - ProducerCloseImplNoLock();
|
| + impl_->ProducerClose();
|
| AwakeConsumerAwakablesForStateChangeNoLock(
|
| - ConsumerGetHandleSignalsStateImplNoLock());
|
| + impl_->ConsumerGetHandleSignalsState());
|
| }
|
|
|
| void DataPipe::ConsumerCloseNoLock() {
|
| @@ -506,9 +521,9 @@ void DataPipe::ConsumerCloseNoLock() {
|
| DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
|
| << "Consumer closed with active two-phase read";
|
| consumer_two_phase_max_num_bytes_read_ = 0;
|
| - ConsumerCloseImplNoLock();
|
| + impl_->ConsumerClose();
|
| AwakeProducerAwakablesForStateChangeNoLock(
|
| - ProducerGetHandleSignalsStateImplNoLock());
|
| + impl_->ProducerGetHandleSignalsState());
|
| }
|
|
|
| void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
|
|
|