Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(322)

Unified Diff: media/blink/multibuffer.cc

Issue 1165903002: Multi reader/writer cache/buffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: lru unit tests Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/blink/multibuffer.cc
diff --git a/media/blink/multibuffer.cc b/media/blink/multibuffer.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9aec4c183cb2764b3148422796b246faa6c00df7
--- /dev/null
+++ b/media/blink/multibuffer.cc
@@ -0,0 +1,329 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/blink/multibuffer.h"
+
+namespace media {
+
+template<class T>
+static int32_t ClosestPreviousEntry(const std::map<int32_t, T>& index,
+ int32_t pos) {
+ typename std::map<int32_t, T>::const_iterator i = index.upper_bound(pos);
+ if (i == index.begin()) {
+ return -1;
+ }
+ --i;
+ return i->first;
+}
+
+MultiBuffer::MultiBuffer(int32_t block_size_shift) :
+ size_(0),
+ max_size_(0),
+ block_size_shift_(block_size_shift) {
+}
+MultiBuffer::~MultiBuffer() {
+ writer_index_.DeleteWaiters();
+}
+
+void MultiBuffer::AddWriter(int32_t pos, Waiter* writer) {
+ writer_index_.WaitFor(pos, writer);
+}
+
+void MultiBuffer::RemoveWriter(int32_t pos, Waiter* writer) {
+ writer_index_.StopWaitFor(pos, writer);
+}
+
+
+void MultiBuffer::WaitFor(int32_t pos, Waiter* reader) {
+ DCHECK(pinned_[pos]);
+ DCHECK(data_.find(pos) == data_.end());
+ reader_index_.WaitFor(pos, reader);
+
+ int32_t closest_writer = ClosestPreviousEntry(writer_index_.map(), pos);
+ int32_t closest_block = ClosestPreviousEntry(data_, pos);
+ if (closest_block >= closest_writer) {
+ closest_writer = -1;
+ }
+ if (closest_writer != -1 &&
+ pos - closest_writer < kMaxWaitForWriterOffset) {
+ // There is already a write available, wake it up.
+ writer_index_.WakeUp(closest_writer);
+ } else {
+ StartWriter(pos);
+ }
+}
+
+void MultiBuffer::DeferredWaitFor(int32_t pos, Waiter* reader) {
+ DCHECK(pinned_[pos]);
+ DCHECK(data_.find(pos) == data_.end());
+ deferred_reader_index_.WaitFor(pos, reader);
+}
+
+void MultiBuffer::StopWaitFor(int32_t pos, Waiter* reader) {
+ reader_index_.StopWaitFor(pos, reader);
+ deferred_reader_index_.StopWaitFor(pos, reader);
+}
+
+bool MultiBuffer::Contains(int32_t pos) const {
+ return data_.find(pos) != data_.end();
+}
+
+bool MultiBuffer::WantNow(int32_t pos) const {
+ return reader_index_.map().find(pos) != reader_index_.map().end();
+}
+
+bool MultiBuffer::WantSoon(int32_t pos) const {
+ return deferred_reader_index_.map().find(pos) !=
+ deferred_reader_index_.map().end();
+}
+
+void MultiBuffer::Prune() {
+ int count = 0;
+ while (size_ > max_size_ && !lru_.Empty()) {
+ int32_t to_free = lru_.Pop();
+ DCHECK(data_[to_free]);
+ DCHECK_EQ(pinned_[to_free], 0);
+ data_.erase(to_free);
+ size_--;
+ if (++count > kMaxFreesPerAdd) break;
+ }
+}
+
+void MultiBuffer::AddData(int32_t pos, scoped_refptr<DataBuffer> data) {
+ if (data_[pos]) {
+ // We already have this data.
+ return;
+ }
+ data_[pos] = data;
+ if (!pinned_[pos]) {
+ lru_.Insert(pos);
+ }
+ size_++;
+ Prune();
+ reader_index_.WakeUp(pos);
+}
+
+void MultiBuffer::PinRange(int32_t from, int32_t to, int32_t howmuch) {
+ pinned_.IncrementRange(from, to, howmuch);
+
+ // Iterate over all the modified ranges and check if
+ // any of them have transitioned in or out of the
+ // unlocked state. If so, we iterate over all buffers
+ // in that range and add/remove them from the LRU as
+ // approperiate. We iterate *backwards* through the
+ // ranges, with the idea that data in a continous range
+ // should be freed from the end first.
+ RangeMap<int32_t, int32_t>::MapType::const_iterator i;
+ i = pinned_.map().lower_bound(to);
+ while (i != pinned_.map().begin()) {
+ if (i->first < from) break;
+ RangeMap<int32_t, int32_t>::MapType::const_iterator j = i;
+ i--;
+ if (i->second == 0 || i->second == howmuch) {
+ bool pin = i->second == howmuch;
+ int32_t begin = std::max(i->first, from);
+ int32_t end = j == pinned_.map().end() ? to : std::min(to, j->first);
+ DataMap::iterator k = data_.lower_bound(end);
+ while (k != data_.begin()) {
+ k--;
+ if (k->first < begin) break;
+ if (pin) {
+ lru_.Remove(k->first);
+ } else {
+ lru_.Insert(k->first);
+ }
+ }
+ }
+ }
+}
+
+void MultiBuffer::IncrementMaxSize(int32_t size) {
+ max_size_ += size;
+ DCHECK_GE(max_size_, 0);
+ // Pruning only happens when blocks are added.
+}
+
+// MultiBufferWriter
+
+MultiBufferWriter::MultiBufferWriter(MultiBuffer* multibuffer, int32_t pos) :
+ multibuffer_(multibuffer),
+ pos_(pos) {
+ multibuffer_->AddWriter(pos_, this);
+}
+
+MultiBufferWriter::~MultiBufferWriter() {
+ multibuffer_->RemoveWriter(pos_, this);
+}
+
+void MultiBufferWriter::Write(scoped_refptr<DataBuffer> data) {
+ multibuffer_->RemoveWriter(pos_, this);
+ if (multibuffer_->Contains(pos_)) {
+ delete this;
+ return;
+ }
+ multibuffer_->AddData(pos_, data);
+ pos_++;
+ multibuffer_->AddWriter(pos_, this);
liberato (no reviews please) 2015/06/09 15:00:53 it seems like there are many calls to AddWriter()
hubbe 2015/06/09 21:23:48 They seem evenly matched to me...
liberato (no reviews please) 2015/06/09 21:50:47 Indeed, you're right. i missed the first line of
+ if (!multibuffer_->Contains(pos_)) {
+ if (multibuffer_->WantNow(pos_)) {
+ return;
+ }
+ if (multibuffer_->WantSoon(pos_)) {
+ SetDeferred(true);
+ return;
+ }
+ }
+ delete this;
liberato (no reviews please) 2015/06/09 15:00:53 who calls Write()? how would they know that it ch
hubbe 2015/06/09 21:23:48 Updated comments in header file, hope it explains.
+}
+
+void MultiBufferWriter::Continue() {
+ SetDeferred(false);
+}
+
+
+MultiBufferReader::MultiBufferReader(MultiBuffer* multibuffer,
+ int64_t start,
+ int64_t end,
+ int64_t preload,
+ int64_t max_buffer_forward,
+ int64_t max_buffer_backward) :
+ multibuffer_(multibuffer),
+ pos_(start),
+ end_(end),
+ preload_(preload),
+ max_buffer_forward_(max_buffer_forward),
+ max_buffer_backward_(max_buffer_backward) {
+ multibuffer_->IncrementMaxSize(
+ block_ciel(max_buffer_forward_ + max_buffer_backward_));
+ multibuffer_->PinRange(
+ block(std::max<int64_t>(0, pos_ - max_buffer_backward_)),
+ block_ciel(std::min(pos_ + max_buffer_forward_, end_)),
+ 1);
+ IncrementPreloadPos();
+}
+
+MultiBufferReader::~MultiBufferReader() {
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ multibuffer_->IncrementMaxSize(
+ -block_ciel(max_buffer_forward_ + max_buffer_backward_));
+ multibuffer_->PinRange(
+ block(std::max<int64_t>(0, pos_ - max_buffer_backward_)),
+ block_ciel(std::min(pos_ + max_buffer_forward_, end_)),
+ -1);
+}
+
+int32_t MultiBufferReader::block(int64_t byte_pos) const {
+ return byte_pos << multibuffer_->block_size_shift();
+}
+
+int32_t MultiBufferReader::block_ciel(int64_t byte_pos) const {
+ return block(byte_pos + (1 << multibuffer_->block_size_shift()) - 1);
+}
+
+void MultiBufferReader::Seek(int64_t pos) {
+ DCHECK(cb_.is_null());
+ RangeMap<int32_t, int32_t> tmp;
+ tmp.IncrementRange(block(std::max<int64_t>(0, pos_ - max_buffer_backward_)),
+ block_ciel(std::min(pos_ + max_buffer_forward_, end_)),
+ -1);
+
+ // If we're seeking to somewhere between pos_ and
+ // preload_pos_, then we don't need to reset preload_pos_.
+ if (pos < pos_ || block(pos) >= preload_pos_) {
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ preload_pos_ = pos_;
+ }
+ pos_ = pos;
+ IncrementPreloadPos();
+
+ tmp.IncrementRange(block(std::max<int64_t>(0, pos_ - max_buffer_backward_)),
+ block_ciel(std::min(pos_ + max_buffer_forward_, end_)),
liberato (no reviews please) 2015/06/09 15:00:53 nit: s/ciel/ceil/
hubbe 2015/06/09 21:23:48 Done.
+ 1);
+ RangeMap<int32_t, int32_t>::MapType::const_iterator i, j;
+ for (i = tmp.map().begin(); i != tmp.map().end(); ++i) {
+ if (i->second != 0) {
+ j = i;
+ ++j;
+ multibuffer_->PinRange(i->first, j->first, i->second);
+ }
+ }
+}
+
+int64_t MultiBufferReader::Available() const {
+ uint64_t preload_byte_pos =
+ preload_pos_ << multibuffer_->block_size_shift();
+ return preload_byte_pos - pos_;
+}
+
+bool MultiBufferReader::TryRead(unsigned char *data, int64_t len) {
+ if (Available() < len) {
+ return false;
+ }
+ const MultiBuffer::DataMap& data_map = multibuffer_->map();
+ MultiBuffer::DataMap::const_iterator i = data_map.find(block(pos_));
+ DCHECK(i != data_map.end());
+ int64_t p = pos_;
+ while (len) {
+ DCHECK_EQ(i->first, block(pos_));
+ size_t offset = p & ((1 << multibuffer_->block_size_shift()) - 1);
+ size_t tocopy = i->second->data_size() - offset;
+ memcpy(data, i->second->data() + offset, tocopy);
+ data += tocopy;
+ len -= tocopy;
+ p += tocopy;
+ ++i;
liberato (no reviews please) 2015/06/09 15:00:53 i'm not sure that this doesn't sometimes walk righ
hubbe 2015/06/09 21:23:48 Hmm, why are you not sure? What can I do to make y
liberato (no reviews please) 2015/06/09 21:50:47 i'm not sure. :) the available() test is suppose
hubbe 2015/06/09 23:14:12 What you're describing should never happen because
+ }
+ Seek(p);
+ return true;
+}
+
+void MultiBufferReader::Wait(int64_t len, base::Closure cb) {
+ DCHECK_LE(Available(), len);
+ DCHECK_LE(len, max_buffer_forward_);
+ current_wait_size_ = len;
+ cb_ = cb;
+ IncrementPreloadPos();
+}
+
+void MultiBufferReader::Continue() {
+ IncrementPreloadPos();
+}
+
+void MultiBufferReader::CheckWait() {
+ if (!cb_.is_null() && Available() >= current_wait_size_) {
+ base::Closure cb;
+ std::swap(cb_, cb);
+ current_wait_size_ = 0;
+ cb.Run();
+ }
+}
+
+void MultiBufferReader::IncrementPreloadPos() {
+ int64_t max_preload = block_ciel(
+ std::min(end_, pos_ + std::max(preload_, current_wait_size_)));
+
+ if (preload_pos_ < max_preload) {
+ MultiBuffer::DataMap::const_iterator i =
+ multibuffer_->map().find(preload_pos_);
+ if (i != multibuffer_->map().end()) {
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ preload_pos_++;
+ ++i;
+ while (preload_pos_ < max_preload &&
+ i != multibuffer_->map().end() &&
+ i->first == preload_pos_) {
+ preload_pos_++;
+ ++i;
+ }
+ }
+ }
+ if (preload_pos_ < max_preload) {
+ multibuffer_->WaitFor(preload_pos_, this);
+ } else if (preload_pos_ < end_) {
+ multibuffer_->DeferredWaitFor(preload_pos_, this);
+ }
+ CheckWait();
+}
+
+} // namespace media

Powered by Google App Engine
This is Rietveld 408576698