Index: mojo/edk/system/data_pipe.cc |
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc |
index aede49280b922bb2d5e689ecdec5c0f98050f3de..2431754eb92b4895cf95b0fcbf1a3b8b5bd86c50 100644 |
--- a/mojo/edk/system/data_pipe.cc |
+++ b/mojo/edk/system/data_pipe.cc |
@@ -12,6 +12,8 @@ |
#include "base/logging.h" |
#include "mojo/edk/system/awakable_list.h" |
#include "mojo/edk/system/configuration.h" |
+#include "mojo/edk/system/data_pipe_impl.h" |
+#include "mojo/edk/system/local_data_pipe_impl.h" |
#include "mojo/edk/system/memory.h" |
#include "mojo/edk/system/options_validation.h" |
@@ -83,6 +85,13 @@ MojoResult DataPipe::ValidateCreateOptions( |
return MOJO_RESULT_OK; |
} |
+// static |
+DataPipe* DataPipe::CreateLocal( |
+ const MojoCreateDataPipeOptions& validated_options) { |
+ return new DataPipe(true, true, validated_options, |
+ make_scoped_ptr(new LocalDataPipeImpl())); |
+} |
+ |
void DataPipe::ProducerCancelAllAwakables() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
@@ -116,11 +125,11 @@ MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements, |
uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; |
HandleSignalsState old_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ProducerWriteDataImplNoLock( |
+ impl_->ConsumerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ProducerWriteData( |
elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (!new_consumer_state.equals(old_consumer_state)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
return rv; |
@@ -145,8 +154,8 @@ MojoResult DataPipe::ProducerBeginWriteData( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, |
- min_num_bytes_to_write); |
+ MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes, |
+ min_num_bytes_to_write); |
if (rv != MOJO_RESULT_OK) |
return rv; |
// Note: No need to awake producer awakables, even though we're going from |
@@ -167,25 +176,25 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
// consumer has been closed. |
HandleSignalsState old_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
MojoResult rv; |
if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || |
num_bytes_written % element_num_bytes_ != 0) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
producer_two_phase_max_num_bytes_written_ = 0; |
} else { |
- rv = ProducerEndWriteDataImplNoLock(num_bytes_written); |
+ rv = impl_->ProducerEndWriteData(num_bytes_written); |
} |
// Two-phase write ended even on failure. |
DCHECK(!producer_in_two_phase_write_no_lock()); |
// If we're now writable, we *became* writable (since we weren't writable |
// during the two-phase write), so awake producer awakables. |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (!new_consumer_state.equals(old_consumer_state)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
return rv; |
@@ -194,7 +203,7 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- return ProducerGetHandleSignalsStateImplNoLock(); |
+ return impl_->ProducerGetHandleSignalsState(); |
} |
MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
@@ -204,7 +213,7 @@ MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState(); |
if (producer_state.satisfies(signals)) { |
if (signals_state) |
*signals_state = producer_state; |
@@ -226,7 +235,7 @@ void DataPipe::ProducerRemoveAwakable(Awakable* awakable, |
DCHECK(has_local_producer_no_lock()); |
producer_awakable_list_->Remove(awakable); |
if (signals_state) |
- *signals_state = ProducerGetHandleSignalsStateImplNoLock(); |
+ *signals_state = impl_->ProducerGetHandleSignalsState(); |
} |
void DataPipe::ProducerStartSerialize(Channel* channel, |
@@ -234,7 +243,7 @@ void DataPipe::ProducerStartSerialize(Channel* channel, |
size_t* max_platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles); |
+ impl_->ProducerStartSerialize(channel, max_size, max_platform_handles); |
} |
bool DataPipe::ProducerEndSerialize( |
@@ -244,8 +253,8 @@ bool DataPipe::ProducerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- return ProducerEndSerializeImplNoLock(channel, destination, actual_size, |
- platform_handles); |
+ return impl_->ProducerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
} |
bool DataPipe::ProducerIsBusy() const { |
@@ -284,11 +293,11 @@ MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements, |
uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ConsumerReadDataImplNoLock( |
+ impl_->ProducerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ConsumerReadData( |
elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -313,11 +322,11 @@ MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, |
all_or_none ? max_num_bytes_to_discard : 0; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ConsumerDiscardDataImplNoLock( |
+ impl_->ProducerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ConsumerDiscardData( |
num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -331,7 +340,7 @@ MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { |
return MOJO_RESULT_BUSY; |
// Note: Don't need to validate |*num_bytes| for query. |
- return ConsumerQueryDataImplNoLock(num_bytes); |
+ return impl_->ConsumerQueryData(num_bytes); |
} |
MojoResult DataPipe::ConsumerBeginReadData( |
@@ -351,8 +360,8 @@ MojoResult DataPipe::ConsumerBeginReadData( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, |
- min_num_bytes_to_read); |
+ MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes, |
+ min_num_bytes_to_read); |
if (rv != MOJO_RESULT_OK) |
return rv; |
DCHECK(consumer_in_two_phase_read_no_lock()); |
@@ -367,25 +376,25 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
return MOJO_RESULT_FAILED_PRECONDITION; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
MojoResult rv; |
if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || |
num_bytes_read % element_num_bytes_ != 0) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
consumer_two_phase_max_num_bytes_read_ = 0; |
} else { |
- rv = ConsumerEndReadDataImplNoLock(num_bytes_read); |
+ rv = impl_->ConsumerEndReadData(num_bytes_read); |
} |
// Two-phase read ended even on failure. |
DCHECK(!consumer_in_two_phase_read_no_lock()); |
// If we're now readable, we *became* readable (since we weren't readable |
// during the two-phase read), so awake consumer awakables. |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -394,7 +403,7 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- return ConsumerGetHandleSignalsStateImplNoLock(); |
+ return impl_->ConsumerGetHandleSignalsState(); |
} |
MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
@@ -404,7 +413,7 @@ MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState(); |
if (consumer_state.satisfies(signals)) { |
if (signals_state) |
*signals_state = consumer_state; |
@@ -426,7 +435,7 @@ void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, |
DCHECK(has_local_consumer_no_lock()); |
consumer_awakable_list_->Remove(awakable); |
if (signals_state) |
- *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); |
+ *signals_state = impl_->ConsumerGetHandleSignalsState(); |
} |
void DataPipe::ConsumerStartSerialize(Channel* channel, |
@@ -434,7 +443,7 @@ void DataPipe::ConsumerStartSerialize(Channel* channel, |
size_t* max_platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles); |
+ impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles); |
} |
bool DataPipe::ConsumerEndSerialize( |
@@ -444,8 +453,8 @@ bool DataPipe::ConsumerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- return ConsumerEndSerializeImplNoLock(channel, destination, actual_size, |
- platform_handles); |
+ return impl_->ConsumerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
} |
bool DataPipe::ConsumerIsBusy() const { |
@@ -455,7 +464,8 @@ bool DataPipe::ConsumerIsBusy() const { |
DataPipe::DataPipe(bool has_local_producer, |
bool has_local_consumer, |
- const MojoCreateDataPipeOptions& validated_options) |
+ const MojoCreateDataPipeOptions& validated_options, |
+ scoped_ptr<DataPipeImpl> impl) |
: may_discard_((validated_options.flags & |
MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), |
element_num_bytes_(validated_options.element_num_bytes), |
@@ -467,11 +477,16 @@ DataPipe::DataPipe(bool has_local_producer, |
consumer_awakable_list_(has_local_consumer ? new AwakableList() |
: nullptr), |
producer_two_phase_max_num_bytes_written_(0), |
- consumer_two_phase_max_num_bytes_read_(0) { |
+ consumer_two_phase_max_num_bytes_read_(0), |
+ impl_(impl.Pass()) { |
+ impl_->set_owner(this); |
+ |
+#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
// Check that the passed in options actually are validated. |
MojoCreateDataPipeOptions unused = {0}; |
DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
MOJO_RESULT_OK); |
+#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
} |
DataPipe::~DataPipe() { |
@@ -491,9 +506,9 @@ void DataPipe::ProducerCloseNoLock() { |
DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
<< "Producer closed with active two-phase write"; |
producer_two_phase_max_num_bytes_written_ = 0; |
- ProducerCloseImplNoLock(); |
+ impl_->ProducerClose(); |
AwakeConsumerAwakablesForStateChangeNoLock( |
- ConsumerGetHandleSignalsStateImplNoLock()); |
+ impl_->ConsumerGetHandleSignalsState()); |
} |
void DataPipe::ConsumerCloseNoLock() { |
@@ -506,9 +521,9 @@ void DataPipe::ConsumerCloseNoLock() { |
DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
<< "Consumer closed with active two-phase read"; |
consumer_two_phase_max_num_bytes_read_ = 0; |
- ConsumerCloseImplNoLock(); |
+ impl_->ConsumerClose(); |
AwakeProducerAwakablesForStateChangeNoLock( |
- ProducerGetHandleSignalsStateImplNoLock()); |
+ impl_->ProducerGetHandleSignalsState()); |
} |
void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |