| Index: mojo/system/local_data_pipe.cc
|
| diff --git a/mojo/system/local_data_pipe.cc b/mojo/system/local_data_pipe.cc
|
| index 2e2395a8e7348fe3f51ed0fbc23d492c76ec20bf..2a1549c6b8d2b95fff0302504485cab75fb6a869 100644
|
| --- a/mojo/system/local_data_pipe.cc
|
| +++ b/mojo/system/local_data_pipe.cc
|
| @@ -42,7 +42,6 @@ void LocalDataPipe::ProducerCloseImplNoLock() {
|
| DCHECK(!consumer_in_two_phase_read_no_lock());
|
| DestroyBufferNoLock();
|
| }
|
| - AwakeConsumerWaitersForStateChangeNoLock();
|
| }
|
|
|
| MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
|
| @@ -61,11 +60,10 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
|
| capacity_num_bytes());
|
| if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
|
| // Discard as much as needed (discard oldest first).
|
| - size_t num_bytes_to_discard =
|
| - num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_);
|
| - start_index_ += num_bytes_to_discard;
|
| - start_index_ %= capacity_num_bytes();
|
| - current_num_bytes_ -= num_bytes_to_discard;
|
| + MarkDataAsConsumedNoLock(
|
| + num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_));
|
| + // No need to wake up write waiters, since we're definitely going to leave
|
| + // the buffer full.
|
| }
|
| } else {
|
| if (all_or_none && *num_bytes > capacity_num_bytes() - current_num_bytes_) {
|
| @@ -96,14 +94,8 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
|
| num_bytes_to_write - num_bytes_to_write_first);
|
| }
|
|
|
| - bool was_empty = (current_num_bytes_ == 0);
|
| -
|
| current_num_bytes_ += num_bytes_to_write;
|
| DCHECK_LE(current_num_bytes_, capacity_num_bytes());
|
| -
|
| - if (was_empty && num_bytes_to_write > 0)
|
| - AwakeConsumerWaitersForStateChangeNoLock();
|
| -
|
| *num_bytes = static_cast<uint32_t>(num_bytes_to_write);
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -114,19 +106,33 @@ MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock(
|
| bool all_or_none) {
|
| DCHECK(consumer_open_no_lock());
|
|
|
| + // The index we need to start writing at.
|
| + size_t write_index =
|
| + (start_index_ + current_num_bytes_) % capacity_num_bytes();
|
| +
|
| size_t max_num_bytes_to_write = GetMaxNumBytesToWriteNoLock();
|
| if (all_or_none && *buffer_num_bytes > max_num_bytes_to_write) {
|
| - // Don't return "should wait" since you can't wait for a specified amount of
|
| - // data.
|
| - return MOJO_RESULT_OUT_OF_RANGE;
|
| + // In "may discard" mode, we can always write from the write index to the
|
| + // end of the buffer.
|
| + if (may_discard() &&
|
| + *buffer_num_bytes <= capacity_num_bytes() - write_index) {
|
| + // To do so, we need to discard an appropriate amount of data.
|
| + // We should only reach here if the start index is after the write index!
|
| + DCHECK_GE(start_index_, write_index);
|
| + DCHECK_GT(*buffer_num_bytes - max_num_bytes_to_write, 0u);
|
| + MarkDataAsConsumedNoLock(*buffer_num_bytes - max_num_bytes_to_write);
|
| + max_num_bytes_to_write = *buffer_num_bytes;
|
| + } else {
|
| + // Don't return "should wait" since you can't wait for a specified amount
|
| + // of data.
|
| + return MOJO_RESULT_OUT_OF_RANGE;
|
| + }
|
| }
|
|
|
| // Don't go into a two-phase write if there's no room.
|
| if (max_num_bytes_to_write == 0)
|
| return MOJO_RESULT_SHOULD_WAIT;
|
|
|
| - size_t write_index =
|
| - (start_index_ + current_num_bytes_) % capacity_num_bytes();
|
| EnsureBufferNoLock();
|
| *buffer = buffer_.get() + write_index;
|
| *buffer_num_bytes = static_cast<uint32_t>(max_num_bytes_to_write);
|
| @@ -143,21 +149,17 @@ MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - bool was_empty = (current_num_bytes_ == 0);
|
| -
|
| current_num_bytes_ += num_bytes_written;
|
| DCHECK_LE(current_num_bytes_, capacity_num_bytes());
|
| set_producer_two_phase_max_num_bytes_written_no_lock(0);
|
| -
|
| - if (was_empty && num_bytes_written > 0)
|
| - AwakeConsumerWaitersForStateChangeNoLock();
|
| -
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() {
|
| MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
|
| - if (consumer_open_no_lock() && current_num_bytes_ < capacity_num_bytes())
|
| + if (consumer_open_no_lock() &&
|
| + (may_discard() || current_num_bytes_ < capacity_num_bytes()) &&
|
| + !producer_in_two_phase_write_no_lock())
|
| rv |= MOJO_WAIT_FLAG_WRITABLE;
|
| return rv;
|
| }
|
| @@ -176,7 +178,6 @@ void LocalDataPipe::ConsumerCloseImplNoLock() {
|
| if (!producer_open_no_lock() || !producer_in_two_phase_write_no_lock())
|
| DestroyBufferNoLock();
|
| current_num_bytes_ = 0;
|
| - AwakeProducerWaitersForStateChangeNoLock();
|
| }
|
|
|
| MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
|
| @@ -211,15 +212,7 @@ MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
|
| num_bytes_to_read - num_bytes_to_read_first);
|
| }
|
|
|
| - bool was_full = (current_num_bytes_ == capacity_num_bytes());
|
| -
|
| - start_index_ += num_bytes_to_read;
|
| - start_index_ %= capacity_num_bytes();
|
| - current_num_bytes_ -= num_bytes_to_read;
|
| -
|
| - if (was_full && num_bytes_to_read > 0)
|
| - AwakeProducerWaitersForStateChangeNoLock();
|
| -
|
| + MarkDataAsConsumedNoLock(num_bytes_to_read);
|
| *num_bytes = static_cast<uint32_t>(num_bytes_to_read);
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -242,16 +235,9 @@ MojoResult LocalDataPipe::ConsumerDiscardDataImplNoLock(uint32_t* num_bytes,
|
| MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
|
|
| - bool was_full = (current_num_bytes_ == capacity_num_bytes());
|
| -
|
| size_t num_bytes_to_discard =
|
| std::min(static_cast<size_t>(*num_bytes), current_num_bytes_);
|
| - start_index_ = (start_index_ + num_bytes_to_discard) % capacity_num_bytes();
|
| - current_num_bytes_ -= num_bytes_to_discard;
|
| -
|
| - if (was_full && num_bytes_to_discard > 0)
|
| - AwakeProducerWaitersForStateChangeNoLock();
|
| -
|
| + MarkDataAsConsumedNoLock(num_bytes_to_discard);
|
| *num_bytes = static_cast<uint32_t>(num_bytes_to_discard);
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -295,24 +281,15 @@ MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock(
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - bool was_full = (current_num_bytes_ == capacity_num_bytes());
|
| -
|
| - start_index_ += num_bytes_read;
|
| - DCHECK_LE(start_index_, capacity_num_bytes());
|
| - start_index_ %= capacity_num_bytes();
|
| - DCHECK_LE(num_bytes_read, current_num_bytes_);
|
| - current_num_bytes_ -= num_bytes_read;
|
| + DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes());
|
| + MarkDataAsConsumedNoLock(num_bytes_read);
|
| set_consumer_two_phase_max_num_bytes_read_no_lock(0);
|
| -
|
| - if (was_full && num_bytes_read > 0)
|
| - AwakeProducerWaitersForStateChangeNoLock();
|
| -
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() {
|
| MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
|
| - if (current_num_bytes_ > 0)
|
| + if (current_num_bytes_ > 0 && !consumer_in_two_phase_read_no_lock())
|
| rv |= MOJO_WAIT_FLAG_READABLE;
|
| return rv;
|
| }
|
| @@ -360,5 +337,12 @@ size_t LocalDataPipe::GetMaxNumBytesToReadNoLock() {
|
| return current_num_bytes_;
|
| }
|
|
|
| +void LocalDataPipe::MarkDataAsConsumedNoLock(size_t num_bytes) {
|
| + DCHECK_LE(num_bytes, current_num_bytes_);
|
| + start_index_ += num_bytes;
|
| + start_index_ %= capacity_num_bytes();
|
| + current_num_bytes_ -= num_bytes;
|
| +}
|
| +
|
| } // namespace system
|
| } // namespace mojo
|
|
|