Index: mojo/system/local_data_pipe.cc |
diff --git a/mojo/system/local_data_pipe.cc b/mojo/system/local_data_pipe.cc |
index 2e2395a8e7348fe3f51ed0fbc23d492c76ec20bf..2a1549c6b8d2b95fff0302504485cab75fb6a869 100644 |
--- a/mojo/system/local_data_pipe.cc |
+++ b/mojo/system/local_data_pipe.cc |
@@ -42,7 +42,6 @@ void LocalDataPipe::ProducerCloseImplNoLock() { |
DCHECK(!consumer_in_two_phase_read_no_lock()); |
DestroyBufferNoLock(); |
} |
- AwakeConsumerWaitersForStateChangeNoLock(); |
} |
MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements, |
@@ -61,11 +60,10 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements, |
capacity_num_bytes()); |
if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) { |
// Discard as much as needed (discard oldest first). |
- size_t num_bytes_to_discard = |
- num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_); |
- start_index_ += num_bytes_to_discard; |
- start_index_ %= capacity_num_bytes(); |
- current_num_bytes_ -= num_bytes_to_discard; |
+ MarkDataAsConsumedNoLock( |
+ num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_)); |
+ // No need to wake up write waiters, since we're definitely going to leave |
+ // the buffer full. |
} |
} else { |
if (all_or_none && *num_bytes > capacity_num_bytes() - current_num_bytes_) { |
@@ -96,14 +94,8 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements, |
num_bytes_to_write - num_bytes_to_write_first); |
} |
- bool was_empty = (current_num_bytes_ == 0); |
- |
current_num_bytes_ += num_bytes_to_write; |
DCHECK_LE(current_num_bytes_, capacity_num_bytes()); |
- |
- if (was_empty && num_bytes_to_write > 0) |
- AwakeConsumerWaitersForStateChangeNoLock(); |
- |
*num_bytes = static_cast<uint32_t>(num_bytes_to_write); |
return MOJO_RESULT_OK; |
} |
@@ -114,19 +106,33 @@ MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock( |
bool all_or_none) { |
DCHECK(consumer_open_no_lock()); |
+ // The index we need to start writing at. |
+ size_t write_index = |
+ (start_index_ + current_num_bytes_) % capacity_num_bytes(); |
+ |
size_t max_num_bytes_to_write = GetMaxNumBytesToWriteNoLock(); |
if (all_or_none && *buffer_num_bytes > max_num_bytes_to_write) { |
- // Don't return "should wait" since you can't wait for a specified amount of |
- // data. |
- return MOJO_RESULT_OUT_OF_RANGE; |
+ // In "may discard" mode, we can always write from the write index to the |
+ // end of the buffer. |
+ if (may_discard() && |
+ *buffer_num_bytes <= capacity_num_bytes() - write_index) { |
+ // To do so, we need to discard an appropriate amount of data. |
+ // We should only reach here if the start index is after the write index! |
+ DCHECK_GE(start_index_, write_index); |
+ DCHECK_GT(*buffer_num_bytes - max_num_bytes_to_write, 0u); |
+ MarkDataAsConsumedNoLock(*buffer_num_bytes - max_num_bytes_to_write); |
+ max_num_bytes_to_write = *buffer_num_bytes; |
+ } else { |
+ // Don't return "should wait" since you can't wait for a specified amount |
+ // of data. |
+ return MOJO_RESULT_OUT_OF_RANGE; |
+ } |
} |
// Don't go into a two-phase write if there's no room. |
if (max_num_bytes_to_write == 0) |
return MOJO_RESULT_SHOULD_WAIT; |
- size_t write_index = |
- (start_index_ + current_num_bytes_) % capacity_num_bytes(); |
EnsureBufferNoLock(); |
*buffer = buffer_.get() + write_index; |
*buffer_num_bytes = static_cast<uint32_t>(max_num_bytes_to_write); |
@@ -143,21 +149,17 @@ MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- bool was_empty = (current_num_bytes_ == 0); |
- |
current_num_bytes_ += num_bytes_written; |
DCHECK_LE(current_num_bytes_, capacity_num_bytes()); |
set_producer_two_phase_max_num_bytes_written_no_lock(0); |
- |
- if (was_empty && num_bytes_written > 0) |
- AwakeConsumerWaitersForStateChangeNoLock(); |
- |
return MOJO_RESULT_OK; |
} |
MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() { |
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; |
- if (consumer_open_no_lock() && current_num_bytes_ < capacity_num_bytes()) |
+ if (consumer_open_no_lock() && |
+ (may_discard() || current_num_bytes_ < capacity_num_bytes()) && |
+ !producer_in_two_phase_write_no_lock()) |
rv |= MOJO_WAIT_FLAG_WRITABLE; |
return rv; |
} |
@@ -176,7 +178,6 @@ void LocalDataPipe::ConsumerCloseImplNoLock() { |
if (!producer_open_no_lock() || !producer_in_two_phase_write_no_lock()) |
DestroyBufferNoLock(); |
current_num_bytes_ = 0; |
- AwakeProducerWaitersForStateChangeNoLock(); |
} |
MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements, |
@@ -211,15 +212,7 @@ MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements, |
num_bytes_to_read - num_bytes_to_read_first); |
} |
- bool was_full = (current_num_bytes_ == capacity_num_bytes()); |
- |
- start_index_ += num_bytes_to_read; |
- start_index_ %= capacity_num_bytes(); |
- current_num_bytes_ -= num_bytes_to_read; |
- |
- if (was_full && num_bytes_to_read > 0) |
- AwakeProducerWaitersForStateChangeNoLock(); |
- |
+ MarkDataAsConsumedNoLock(num_bytes_to_read); |
*num_bytes = static_cast<uint32_t>(num_bytes_to_read); |
return MOJO_RESULT_OK; |
} |
@@ -242,16 +235,9 @@ MojoResult LocalDataPipe::ConsumerDiscardDataImplNoLock(uint32_t* num_bytes, |
MOJO_RESULT_FAILED_PRECONDITION; |
} |
- bool was_full = (current_num_bytes_ == capacity_num_bytes()); |
- |
size_t num_bytes_to_discard = |
std::min(static_cast<size_t>(*num_bytes), current_num_bytes_); |
- start_index_ = (start_index_ + num_bytes_to_discard) % capacity_num_bytes(); |
- current_num_bytes_ -= num_bytes_to_discard; |
- |
- if (was_full && num_bytes_to_discard > 0) |
- AwakeProducerWaitersForStateChangeNoLock(); |
- |
+ MarkDataAsConsumedNoLock(num_bytes_to_discard); |
*num_bytes = static_cast<uint32_t>(num_bytes_to_discard); |
return MOJO_RESULT_OK; |
} |
@@ -295,24 +281,15 @@ MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock( |
return MOJO_RESULT_INVALID_ARGUMENT; |
} |
- bool was_full = (current_num_bytes_ == capacity_num_bytes()); |
- |
- start_index_ += num_bytes_read; |
- DCHECK_LE(start_index_, capacity_num_bytes()); |
- start_index_ %= capacity_num_bytes(); |
- DCHECK_LE(num_bytes_read, current_num_bytes_); |
- current_num_bytes_ -= num_bytes_read; |
+ DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes()); |
+ MarkDataAsConsumedNoLock(num_bytes_read); |
set_consumer_two_phase_max_num_bytes_read_no_lock(0); |
- |
- if (was_full && num_bytes_read > 0) |
- AwakeProducerWaitersForStateChangeNoLock(); |
- |
return MOJO_RESULT_OK; |
} |
MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() { |
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; |
- if (current_num_bytes_ > 0) |
+ if (current_num_bytes_ > 0 && !consumer_in_two_phase_read_no_lock()) |
rv |= MOJO_WAIT_FLAG_READABLE; |
return rv; |
} |
@@ -360,5 +337,12 @@ size_t LocalDataPipe::GetMaxNumBytesToReadNoLock() { |
return current_num_bytes_; |
} |
+void LocalDataPipe::MarkDataAsConsumedNoLock(size_t num_bytes) { |
+ DCHECK_LE(num_bytes, current_num_bytes_); |
+ start_index_ += num_bytes; |
+ start_index_ %= capacity_num_bytes(); |
+ current_num_bytes_ -= num_bytes; |
+} |
+ |
} // namespace system |
} // namespace mojo |