Index: mojo/edk/system/data_pipe.cc |
diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc |
index aede49280b922bb2d5e689ecdec5c0f98050f3de..e5ef3f85d434ca054478096e381efbd728053584 100644 |
--- a/mojo/edk/system/data_pipe.cc |
+++ b/mojo/edk/system/data_pipe.cc |
@@ -12,12 +12,40 @@ |
#include "base/logging.h" |
#include "mojo/edk/system/awakable_list.h" |
#include "mojo/edk/system/configuration.h" |
+#include "mojo/edk/system/data_pipe_impl.h" |
#include "mojo/edk/system/memory.h" |
#include "mojo/edk/system/options_validation.h" |
namespace mojo { |
namespace system { |
+DataPipe::DataPipe(bool has_local_producer, |
+ bool has_local_consumer, |
+ const MojoCreateDataPipeOptions& validated_options, |
+ scoped_ptr<DataPipeImpl> impl) |
+ : may_discard_((validated_options.flags & |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), |
+ element_num_bytes_(validated_options.element_num_bytes), |
+ capacity_num_bytes_(validated_options.capacity_num_bytes), |
+ producer_open_(true), |
+ consumer_open_(true), |
+ producer_awakable_list_(has_local_producer ? new AwakableList() |
+ : nullptr), |
+ consumer_awakable_list_(has_local_consumer ? new AwakableList() |
+ : nullptr), |
+ producer_two_phase_max_num_bytes_written_(0), |
+ consumer_two_phase_max_num_bytes_read_(0), |
+ impl_(impl.Pass()) { |
+ impl_->set_owner(this); |
+ |
+#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
+ // Check that the passed in options actually are validated. |
+ MojoCreateDataPipeOptions unused = {0}; |
+ DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
+ MOJO_RESULT_OK); |
+#endif // !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) |
+} |
+ |
// static |
MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
MojoCreateDataPipeOptions result = { |
@@ -116,11 +144,11 @@ MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements, |
uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; |
HandleSignalsState old_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ProducerWriteDataImplNoLock( |
+ impl_->ConsumerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ProducerWriteData( |
elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (!new_consumer_state.equals(old_consumer_state)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
return rv; |
@@ -145,8 +173,8 @@ MojoResult DataPipe::ProducerBeginWriteData( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, buffer_num_bytes, |
- min_num_bytes_to_write); |
+ MojoResult rv = impl_->ProducerBeginWriteData(buffer, buffer_num_bytes, |
+ min_num_bytes_to_write); |
if (rv != MOJO_RESULT_OK) |
return rv; |
// Note: No need to awake producer awakables, even though we're going from |
@@ -167,25 +195,25 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
// consumer has been closed. |
HandleSignalsState old_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
MojoResult rv; |
if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || |
num_bytes_written % element_num_bytes_ != 0) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
producer_two_phase_max_num_bytes_written_ = 0; |
} else { |
- rv = ProducerEndWriteDataImplNoLock(num_bytes_written); |
+ rv = impl_->ProducerEndWriteData(num_bytes_written); |
} |
// Two-phase write ended even on failure. |
DCHECK(!producer_in_two_phase_write_no_lock()); |
// If we're now writable, we *became* writable (since we weren't writable |
// during the two-phase write), so awake producer awakables. |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (!new_consumer_state.equals(old_consumer_state)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
return rv; |
@@ -194,7 +222,7 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { |
HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- return ProducerGetHandleSignalsStateImplNoLock(); |
+ return impl_->ProducerGetHandleSignalsState(); |
} |
MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
@@ -204,7 +232,7 @@ MojoResult DataPipe::ProducerAddAwakable(Awakable* awakable, |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState producer_state = impl_->ProducerGetHandleSignalsState(); |
if (producer_state.satisfies(signals)) { |
if (signals_state) |
*signals_state = producer_state; |
@@ -226,7 +254,7 @@ void DataPipe::ProducerRemoveAwakable(Awakable* awakable, |
DCHECK(has_local_producer_no_lock()); |
producer_awakable_list_->Remove(awakable); |
if (signals_state) |
- *signals_state = ProducerGetHandleSignalsStateImplNoLock(); |
+ *signals_state = impl_->ProducerGetHandleSignalsState(); |
} |
void DataPipe::ProducerStartSerialize(Channel* channel, |
@@ -234,7 +262,7 @@ void DataPipe::ProducerStartSerialize(Channel* channel, |
size_t* max_platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- ProducerStartSerializeImplNoLock(channel, max_size, max_platform_handles); |
+ impl_->ProducerStartSerialize(channel, max_size, max_platform_handles); |
} |
bool DataPipe::ProducerEndSerialize( |
@@ -244,8 +272,8 @@ bool DataPipe::ProducerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_producer_no_lock()); |
- return ProducerEndSerializeImplNoLock(channel, destination, actual_size, |
- platform_handles); |
+ return impl_->ProducerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
} |
bool DataPipe::ProducerIsBusy() const { |
@@ -284,11 +312,11 @@ MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements, |
uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ConsumerReadDataImplNoLock( |
+ impl_->ProducerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ConsumerReadData( |
elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -313,11 +341,11 @@ MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, |
all_or_none ? max_num_bytes_to_discard : 0; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
- MojoResult rv = ConsumerDiscardDataImplNoLock( |
+ impl_->ProducerGetHandleSignalsState(); |
+ MojoResult rv = impl_->ConsumerDiscardData( |
num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -331,7 +359,7 @@ MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { |
return MOJO_RESULT_BUSY; |
// Note: Don't need to validate |*num_bytes| for query. |
- return ConsumerQueryDataImplNoLock(num_bytes); |
+ return impl_->ConsumerQueryData(num_bytes); |
} |
MojoResult DataPipe::ConsumerBeginReadData( |
@@ -351,8 +379,8 @@ MojoResult DataPipe::ConsumerBeginReadData( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, buffer_num_bytes, |
- min_num_bytes_to_read); |
+ MojoResult rv = impl_->ConsumerBeginReadData(buffer, buffer_num_bytes, |
+ min_num_bytes_to_read); |
if (rv != MOJO_RESULT_OK) |
return rv; |
DCHECK(consumer_in_two_phase_read_no_lock()); |
@@ -367,25 +395,25 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
return MOJO_RESULT_FAILED_PRECONDITION; |
HandleSignalsState old_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
MojoResult rv; |
if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || |
num_bytes_read % element_num_bytes_ != 0) { |
rv = MOJO_RESULT_INVALID_ARGUMENT; |
consumer_two_phase_max_num_bytes_read_ = 0; |
} else { |
- rv = ConsumerEndReadDataImplNoLock(num_bytes_read); |
+ rv = impl_->ConsumerEndReadData(num_bytes_read); |
} |
// Two-phase read ended even on failure. |
DCHECK(!consumer_in_two_phase_read_no_lock()); |
// If we're now readable, we *became* readable (since we weren't readable |
// during the two-phase read), so awake consumer awakables. |
HandleSignalsState new_consumer_state = |
- ConsumerGetHandleSignalsStateImplNoLock(); |
+ impl_->ConsumerGetHandleSignalsState(); |
if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) |
AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state); |
HandleSignalsState new_producer_state = |
- ProducerGetHandleSignalsStateImplNoLock(); |
+ impl_->ProducerGetHandleSignalsState(); |
if (!new_producer_state.equals(old_producer_state)) |
AwakeProducerAwakablesForStateChangeNoLock(new_producer_state); |
return rv; |
@@ -394,7 +422,7 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { |
HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- return ConsumerGetHandleSignalsStateImplNoLock(); |
+ return impl_->ConsumerGetHandleSignalsState(); |
} |
MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
@@ -404,7 +432,7 @@ MojoResult DataPipe::ConsumerAddAwakable(Awakable* awakable, |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); |
+ HandleSignalsState consumer_state = impl_->ConsumerGetHandleSignalsState(); |
if (consumer_state.satisfies(signals)) { |
if (signals_state) |
*signals_state = consumer_state; |
@@ -426,7 +454,7 @@ void DataPipe::ConsumerRemoveAwakable(Awakable* awakable, |
DCHECK(has_local_consumer_no_lock()); |
consumer_awakable_list_->Remove(awakable); |
if (signals_state) |
- *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); |
+ *signals_state = impl_->ConsumerGetHandleSignalsState(); |
} |
void DataPipe::ConsumerStartSerialize(Channel* channel, |
@@ -434,7 +462,7 @@ void DataPipe::ConsumerStartSerialize(Channel* channel, |
size_t* max_platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- ConsumerStartSerializeImplNoLock(channel, max_size, max_platform_handles); |
+ impl_->ConsumerStartSerialize(channel, max_size, max_platform_handles); |
} |
bool DataPipe::ConsumerEndSerialize( |
@@ -444,8 +472,8 @@ bool DataPipe::ConsumerEndSerialize( |
embedder::PlatformHandleVector* platform_handles) { |
base::AutoLock locker(lock_); |
DCHECK(has_local_consumer_no_lock()); |
- return ConsumerEndSerializeImplNoLock(channel, destination, actual_size, |
- platform_handles); |
+ return impl_->ConsumerEndSerialize(channel, destination, actual_size, |
+ platform_handles); |
} |
bool DataPipe::ConsumerIsBusy() const { |
@@ -453,27 +481,6 @@ bool DataPipe::ConsumerIsBusy() const { |
return consumer_in_two_phase_read_no_lock(); |
} |
-DataPipe::DataPipe(bool has_local_producer, |
- bool has_local_consumer, |
- const MojoCreateDataPipeOptions& validated_options) |
- : may_discard_((validated_options.flags & |
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), |
- element_num_bytes_(validated_options.element_num_bytes), |
- capacity_num_bytes_(validated_options.capacity_num_bytes), |
- producer_open_(true), |
- consumer_open_(true), |
- producer_awakable_list_(has_local_producer ? new AwakableList() |
- : nullptr), |
- consumer_awakable_list_(has_local_consumer ? new AwakableList() |
- : nullptr), |
- producer_two_phase_max_num_bytes_written_(0), |
- consumer_two_phase_max_num_bytes_read_(0) { |
- // Check that the passed in options actually are validated. |
- MojoCreateDataPipeOptions unused = {0}; |
- DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), |
- MOJO_RESULT_OK); |
-} |
- |
DataPipe::~DataPipe() { |
DCHECK(!producer_open_); |
DCHECK(!consumer_open_); |
@@ -491,9 +498,9 @@ void DataPipe::ProducerCloseNoLock() { |
DVLOG_IF(2, producer_in_two_phase_write_no_lock()) |
<< "Producer closed with active two-phase write"; |
producer_two_phase_max_num_bytes_written_ = 0; |
- ProducerCloseImplNoLock(); |
+ impl_->ProducerClose(); |
AwakeConsumerAwakablesForStateChangeNoLock( |
- ConsumerGetHandleSignalsStateImplNoLock()); |
+ impl_->ConsumerGetHandleSignalsState()); |
} |
void DataPipe::ConsumerCloseNoLock() { |
@@ -506,9 +513,9 @@ void DataPipe::ConsumerCloseNoLock() { |
DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) |
<< "Consumer closed with active two-phase read"; |
consumer_two_phase_max_num_bytes_read_ = 0; |
- ConsumerCloseImplNoLock(); |
+ impl_->ConsumerClose(); |
AwakeProducerAwakablesForStateChangeNoLock( |
- ProducerGetHandleSignalsStateImplNoLock()); |
+ impl_->ProducerGetHandleSignalsState()); |
} |
void DataPipe::AwakeProducerAwakablesForStateChangeNoLock( |