Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(699)

Unified Diff: mojo/system/local_data_pipe.cc

Issue 129163003: Mojo: DataPipe: Implement "may discard" for two-phase writes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: oops Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/system/local_data_pipe.h ('k') | mojo/system/local_data_pipe_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/system/local_data_pipe.cc
diff --git a/mojo/system/local_data_pipe.cc b/mojo/system/local_data_pipe.cc
index 2e2395a8e7348fe3f51ed0fbc23d492c76ec20bf..2a1549c6b8d2b95fff0302504485cab75fb6a869 100644
--- a/mojo/system/local_data_pipe.cc
+++ b/mojo/system/local_data_pipe.cc
@@ -42,7 +42,6 @@ void LocalDataPipe::ProducerCloseImplNoLock() {
DCHECK(!consumer_in_two_phase_read_no_lock());
DestroyBufferNoLock();
}
- AwakeConsumerWaitersForStateChangeNoLock();
}
MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
@@ -61,11 +60,10 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
capacity_num_bytes());
if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
// Discard as much as needed (discard oldest first).
- size_t num_bytes_to_discard =
- num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_);
- start_index_ += num_bytes_to_discard;
- start_index_ %= capacity_num_bytes();
- current_num_bytes_ -= num_bytes_to_discard;
+ MarkDataAsConsumedNoLock(
+ num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_));
+ // No need to wake up write waiters, since we're definitely going to leave
+ // the buffer full.
}
} else {
if (all_or_none && *num_bytes > capacity_num_bytes() - current_num_bytes_) {
@@ -96,14 +94,8 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
num_bytes_to_write - num_bytes_to_write_first);
}
- bool was_empty = (current_num_bytes_ == 0);
-
current_num_bytes_ += num_bytes_to_write;
DCHECK_LE(current_num_bytes_, capacity_num_bytes());
-
- if (was_empty && num_bytes_to_write > 0)
- AwakeConsumerWaitersForStateChangeNoLock();
-
*num_bytes = static_cast<uint32_t>(num_bytes_to_write);
return MOJO_RESULT_OK;
}
@@ -114,19 +106,33 @@ MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock(
bool all_or_none) {
DCHECK(consumer_open_no_lock());
+ // The index we need to start writing at.
+ size_t write_index =
+ (start_index_ + current_num_bytes_) % capacity_num_bytes();
+
size_t max_num_bytes_to_write = GetMaxNumBytesToWriteNoLock();
if (all_or_none && *buffer_num_bytes > max_num_bytes_to_write) {
- // Don't return "should wait" since you can't wait for a specified amount of
- // data.
- return MOJO_RESULT_OUT_OF_RANGE;
+ // In "may discard" mode, we can always write from the write index to the
+ // end of the buffer.
+ if (may_discard() &&
+ *buffer_num_bytes <= capacity_num_bytes() - write_index) {
+ // To do so, we need to discard an appropriate amount of data.
+ // We should only reach here if the start index is after the write index!
+ DCHECK_GE(start_index_, write_index);
+ DCHECK_GT(*buffer_num_bytes - max_num_bytes_to_write, 0u);
+ MarkDataAsConsumedNoLock(*buffer_num_bytes - max_num_bytes_to_write);
+ max_num_bytes_to_write = *buffer_num_bytes;
+ } else {
+ // Don't return "should wait" since you can't wait for a specified amount
+ // of data.
+ return MOJO_RESULT_OUT_OF_RANGE;
+ }
}
// Don't go into a two-phase write if there's no room.
if (max_num_bytes_to_write == 0)
return MOJO_RESULT_SHOULD_WAIT;
- size_t write_index =
- (start_index_ + current_num_bytes_) % capacity_num_bytes();
EnsureBufferNoLock();
*buffer = buffer_.get() + write_index;
*buffer_num_bytes = static_cast<uint32_t>(max_num_bytes_to_write);
@@ -143,21 +149,17 @@ MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock(
return MOJO_RESULT_INVALID_ARGUMENT;
}
- bool was_empty = (current_num_bytes_ == 0);
-
current_num_bytes_ += num_bytes_written;
DCHECK_LE(current_num_bytes_, capacity_num_bytes());
set_producer_two_phase_max_num_bytes_written_no_lock(0);
-
- if (was_empty && num_bytes_written > 0)
- AwakeConsumerWaitersForStateChangeNoLock();
-
return MOJO_RESULT_OK;
}
MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() {
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
- if (consumer_open_no_lock() && current_num_bytes_ < capacity_num_bytes())
+ if (consumer_open_no_lock() &&
+ (may_discard() || current_num_bytes_ < capacity_num_bytes()) &&
+ !producer_in_two_phase_write_no_lock())
rv |= MOJO_WAIT_FLAG_WRITABLE;
return rv;
}
@@ -176,7 +178,6 @@ void LocalDataPipe::ConsumerCloseImplNoLock() {
if (!producer_open_no_lock() || !producer_in_two_phase_write_no_lock())
DestroyBufferNoLock();
current_num_bytes_ = 0;
- AwakeProducerWaitersForStateChangeNoLock();
}
MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
@@ -211,15 +212,7 @@ MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
num_bytes_to_read - num_bytes_to_read_first);
}
- bool was_full = (current_num_bytes_ == capacity_num_bytes());
-
- start_index_ += num_bytes_to_read;
- start_index_ %= capacity_num_bytes();
- current_num_bytes_ -= num_bytes_to_read;
-
- if (was_full && num_bytes_to_read > 0)
- AwakeProducerWaitersForStateChangeNoLock();
-
+ MarkDataAsConsumedNoLock(num_bytes_to_read);
*num_bytes = static_cast<uint32_t>(num_bytes_to_read);
return MOJO_RESULT_OK;
}
@@ -242,16 +235,9 @@ MojoResult LocalDataPipe::ConsumerDiscardDataImplNoLock(uint32_t* num_bytes,
MOJO_RESULT_FAILED_PRECONDITION;
}
- bool was_full = (current_num_bytes_ == capacity_num_bytes());
-
size_t num_bytes_to_discard =
std::min(static_cast<size_t>(*num_bytes), current_num_bytes_);
- start_index_ = (start_index_ + num_bytes_to_discard) % capacity_num_bytes();
- current_num_bytes_ -= num_bytes_to_discard;
-
- if (was_full && num_bytes_to_discard > 0)
- AwakeProducerWaitersForStateChangeNoLock();
-
+ MarkDataAsConsumedNoLock(num_bytes_to_discard);
*num_bytes = static_cast<uint32_t>(num_bytes_to_discard);
return MOJO_RESULT_OK;
}
@@ -295,24 +281,15 @@ MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock(
return MOJO_RESULT_INVALID_ARGUMENT;
}
- bool was_full = (current_num_bytes_ == capacity_num_bytes());
-
- start_index_ += num_bytes_read;
- DCHECK_LE(start_index_, capacity_num_bytes());
- start_index_ %= capacity_num_bytes();
- DCHECK_LE(num_bytes_read, current_num_bytes_);
- current_num_bytes_ -= num_bytes_read;
+ DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes());
+ MarkDataAsConsumedNoLock(num_bytes_read);
set_consumer_two_phase_max_num_bytes_read_no_lock(0);
-
- if (was_full && num_bytes_read > 0)
- AwakeProducerWaitersForStateChangeNoLock();
-
return MOJO_RESULT_OK;
}
MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() {
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
- if (current_num_bytes_ > 0)
+ if (current_num_bytes_ > 0 && !consumer_in_two_phase_read_no_lock())
rv |= MOJO_WAIT_FLAG_READABLE;
return rv;
}
@@ -360,5 +337,12 @@ size_t LocalDataPipe::GetMaxNumBytesToReadNoLock() {
return current_num_bytes_;
}
+void LocalDataPipe::MarkDataAsConsumedNoLock(size_t num_bytes) {
+ DCHECK_LE(num_bytes, current_num_bytes_);
+ start_index_ += num_bytes;
+ start_index_ %= capacity_num_bytes();
+ current_num_bytes_ -= num_bytes;
+}
+
} // namespace system
} // namespace mojo
« no previous file with comments | « mojo/system/local_data_pipe.h ('k') | mojo/system/local_data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698