| Index: mojo/edk/system/data_pipe.cc
|
| diff --git a/mojo/edk/system/data_pipe.cc b/mojo/edk/system/data_pipe.cc
|
| index 7167873266e740e66894892897d7a054f6f2e730..cff209af838e2f3bee4ea4cd18e2b57f2b6f31f9 100644
|
| --- a/mojo/edk/system/data_pipe.cc
|
| +++ b/mojo/edk/system/data_pipe.cc
|
| @@ -290,10 +290,8 @@ MojoResult DataPipe::ProducerSetOptions(uint32_t write_threshold_num_bytes) {
|
| HandleSignalsState old_producer_state =
|
| impl_->ProducerGetHandleSignalsState();
|
| producer_write_threshold_num_bytes_ = write_threshold_num_bytes;
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -328,10 +326,8 @@ MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
|
| impl_->ConsumerGetHandleSignalsState();
|
| MojoResult rv = impl_->ProducerWriteData(
|
| elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| return rv;
|
| }
|
|
|
| @@ -368,6 +364,8 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
|
|
| HandleSignalsState old_consumer_state =
|
| impl_->ConsumerGetHandleSignalsState();
|
| + HandleSignalsState old_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| MojoResult rv;
|
| if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
|
| num_bytes_written % element_num_bytes() != 0) {
|
| @@ -380,14 +378,10 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
|
| DCHECK(!producer_in_two_phase_write_no_lock());
|
| // If we're now writable, we *became* writable (since we weren't writable
|
| // during the two-phase write), so awake producer awakables.
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| return rv;
|
| }
|
|
|
| @@ -491,10 +485,8 @@ MojoResult DataPipe::ConsumerSetOptions(uint32_t read_threshold_num_bytes) {
|
| HandleSignalsState old_consumer_state =
|
| impl_->ConsumerGetHandleSignalsState();
|
| consumer_read_threshold_num_bytes_ = read_threshold_num_bytes;
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -527,10 +519,8 @@ MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
|
| impl_->ProducerGetHandleSignalsState();
|
| MojoResult rv = impl_->ConsumerReadData(
|
| elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read, peek);
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| return rv;
|
| }
|
|
|
| @@ -556,10 +546,8 @@ MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
|
| impl_->ProducerGetHandleSignalsState();
|
| MojoResult rv = impl_->ConsumerDiscardData(
|
| num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| return rv;
|
| }
|
|
|
| @@ -613,14 +601,10 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
|
| DCHECK(!consumer_in_two_phase_read_no_lock());
|
| // If we're now readable, we *became* readable (since we weren't readable
|
| // during the two-phase read), so awake consumer awakables.
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| return rv;
|
| }
|
|
|
| @@ -787,27 +771,31 @@ void DataPipe::SetConsumerClosedNoLock() {
|
| void DataPipe::ProducerCloseNoLock() {
|
| mutex_.AssertHeld();
|
| DCHECK(producer_open_);
|
| + HandleSignalsState old_consumer_state =
|
| + impl_->ConsumerGetHandleSignalsState();
|
| producer_open_ = false;
|
| if (has_local_producer_no_lock()) {
|
| ProducerCancelAllStateNoLock();
|
| producer_awakable_list_.reset();
|
| impl_->ProducerClose();
|
| - AwakeConsumerAwakablesForStateChangeNoLock(
|
| - impl_->ConsumerGetHandleSignalsState());
|
| }
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| }
|
|
|
| void DataPipe::ConsumerCloseNoLock() {
|
| mutex_.AssertHeld();
|
| DCHECK(consumer_open_);
|
| + HandleSignalsState old_producer_state =
|
| + impl_->ProducerGetHandleSignalsState();
|
| consumer_open_ = false;
|
| if (has_local_consumer_no_lock()) {
|
| ConsumerCancelAllStateNoLock();
|
| consumer_awakable_list_.reset();
|
| impl_->ConsumerClose();
|
| - AwakeProducerAwakablesForStateChangeNoLock(
|
| - impl_->ProducerGetHandleSignalsState());
|
| }
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| }
|
|
|
| bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) {
|
| @@ -821,14 +809,10 @@ bool DataPipe::OnReadMessage(unsigned port, MessageInTransit* message) {
|
|
|
| bool rv = impl_->OnReadMessage(port, message);
|
|
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
|
|
| return rv;
|
| }
|
| @@ -844,30 +828,32 @@ void DataPipe::OnDetachFromChannel(unsigned port) {
|
|
|
| impl_->OnDetachFromChannel(port);
|
|
|
| - HandleSignalsState new_producer_state =
|
| - impl_->ProducerGetHandleSignalsState();
|
| - if (!new_producer_state.equals(old_producer_state))
|
| - AwakeProducerAwakablesForStateChangeNoLock(new_producer_state);
|
| - HandleSignalsState new_consumer_state =
|
| - impl_->ConsumerGetHandleSignalsState();
|
| - if (!new_consumer_state.equals(old_consumer_state))
|
| - AwakeConsumerAwakablesForStateChangeNoLock(new_consumer_state);
|
| + OnProducerMaybeStateChange(old_producer_state,
|
| + impl_->ProducerGetHandleSignalsState());
|
| + OnConsumerMaybeStateChange(old_consumer_state,
|
| + impl_->ConsumerGetHandleSignalsState());
|
| }
|
|
|
| -void DataPipe::AwakeProducerAwakablesForStateChangeNoLock(
|
| +void DataPipe::OnProducerMaybeStateChange(
|
| + const HandleSignalsState& old_producer_state,
|
| const HandleSignalsState& new_producer_state) {
|
| mutex_.AssertHeld();
|
| - if (!has_local_producer_no_lock())
|
| - return;
|
| - producer_awakable_list_->AwakeForStateChange(new_producer_state);
|
| + if (!new_producer_state.equals(old_producer_state) &&
|
| + has_local_producer_no_lock()) {
|
| + producer_awakable_list_->OnStateChange(old_producer_state,
|
| + new_producer_state);
|
| + }
|
| }
|
|
|
| -void DataPipe::AwakeConsumerAwakablesForStateChangeNoLock(
|
| +void DataPipe::OnConsumerMaybeStateChange(
|
| + const HandleSignalsState& old_consumer_state,
|
| const HandleSignalsState& new_consumer_state) {
|
| mutex_.AssertHeld();
|
| - if (!has_local_consumer_no_lock())
|
| - return;
|
| - consumer_awakable_list_->AwakeForStateChange(new_consumer_state);
|
| + if (!new_consumer_state.equals(old_consumer_state) &&
|
| + has_local_consumer_no_lock()) {
|
| + consumer_awakable_list_->OnStateChange(old_consumer_state,
|
| + new_consumer_state);
|
| + }
|
| }
|
|
|
| void DataPipe::SetProducerClosed() {
|
|
|