Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(515)

Unified Diff: media/blink/multibuffer.cc

Issue 1165903002: Multi reader/writer cache/buffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more tests Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/blink/multibuffer.cc
diff --git a/media/blink/multibuffer.cc b/media/blink/multibuffer.cc
new file mode 100644
index 0000000000000000000000000000000000000000..5447aca5dd80d19a3cf72cb98c2a4d2987f70719
--- /dev/null
+++ b/media/blink/multibuffer.cc
@@ -0,0 +1,668 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/blink/multibuffer.h"
+
+#include "base/bind.h"
+#include "base/callback_helpers.h"
+#include "net/base/net_errors.h"
+
+namespace std {
+
+ostream& operator<<(ostream& o, const media::MultiBufferBlockId& id) {
+ if (id.url_id()) {
+ return o << "{" << id.url_id()->url() << ", " << id.block_num() << "}";
+ } else {
+ return o << "{null, " << id.block_num() << "}";
+ }
+}
+
+} // namespace std
+
+namespace media {
+
+MultiBufferBlockId::MultiBufferBlockId() :
+ url_id_(nullptr),
+ block_num_(-1) {
liberato (no reviews please) 2015/10/09 17:36:09 why not MultiBufferBlockId::min()?
hubbe 2015/10/13 23:08:16 I changed it to 0 actually. Also, I added some mor
+}
+MultiBufferBlockId::MultiBufferBlockId(MultiBufferUrlId url_id,
+ MultiBufferBlockNum block_num) :
+ url_id_(url_id),
+ block_num_(block_num) {
+}
+
+MultiBufferBlockId::MultiBufferBlockId(const MultiBufferBlockId& block_id) :
+ url_id_(block_id.url_id()),
+ block_num_(block_id.block_num()) {
+}
+
+MultiBufferBlockId::~MultiBufferBlockId() {}
+
+MultiBufferDataProvider::~MultiBufferDataProvider() {}
+
+template<class T>
+static MultiBuffer::BlockId ClosestPreviousEntry(
liberato (no reviews please) 2015/10/09 17:36:09 previous or equal?
hubbe 2015/10/13 23:08:16 Comment added.
+ const std::map<MultiBuffer::BlockId, T>& index,
+ MultiBuffer::BlockId pos) {
+ auto i = index.upper_bound(pos);
+ DCHECK(i == index.end() || i->first > pos);
+ if (i == index.begin()) {
+ return MultiBufferBlockId(kUnknownUrlId, -1);
+ }
+ --i;
+ if (!i->first.SameUrl(pos)) {
+ return MultiBufferBlockId(kUnknownUrlId, -1);
+ }
+ DCHECK(i->first <= pos);
+ return i->first;
+}
+
+template<class T>
+static MultiBuffer::BlockId ClosestNextEntry(
+ const std::map<MultiBuffer::BlockId, T>& index,
+ MultiBuffer::BlockId pos) {
+ auto i = index.lower_bound(pos);
+ if (i == index.end()) {
+ return MultiBufferBlockId(kUnknownUrlId, -1);
+ }
+ if (!i->first.SameUrl(pos)) {
+ return MultiBufferBlockId(kUnknownUrlId, -1);
+ }
+ DCHECK(i->first <= pos);
+ return i->first;
+}
+
+//
+// ReaderIndex
+//
+ReaderIndex::ReaderIndex() {}
+ReaderIndex::~ReaderIndex() {}
+
+bool ReaderIndex::WaitFor(MultiBufferBlockId pos, Reader* reader) {
+ std::set<Reader*> &set_of_readers = readers_[pos];
+ bool ret = set_of_readers.empty();
+ set_of_readers.insert(reader);
+ return ret;
+}
+
+bool ReaderIndex::StopWaitFor(MultiBufferBlockId pos, Reader* reader) {
+ auto i = readers_.find(pos);
+ if (i != readers_.end()) {
+ i->second.erase(reader);
+ if (i->second.empty()) {
+ readers_.erase(i);
+ return true;
+ }
+ return false;
+ }
+ return true;
+}
+
+int64_t ReaderIndex::DistToNext(MultiBufferBlockId pos) const {
+ auto i = readers_.lower_bound(pos);
+ if (i == readers_.end()) {
+ return std::numeric_limits<int64_t>::max();
+ }
+ DCHECK_GE(i->first, pos);
+ return i->first - pos;
+}
+
+void ReaderIndex::UpdateURLId(MultiBufferUrlId old_url_id,
+ MultiBufferUrlId new_url_id) {
+ MultiBufferBlockId pos(old_url_id, 0);
+ auto i = readers_.lower_bound(pos);
+ while (i != readers_.end() && pos.SameUrl(i->first)) {
+ std::set<Reader*> tmp;
+ tmp.swap(i->second);
+ auto j = i;
+ ++j;
+ readers_.erase(i);
+ i = j;
+ for (Reader* reader: tmp) {
+ reader->UpdateURLId(old_url_id, new_url_id);
+ }
+ }
+}
+
+void ReaderIndex::NotifyAvailableRange(
+ const Range<MultiBufferBlockId>& observer_range,
+ const Range<MultiBufferBlockId>& new_range) {
+ auto i = readers_.lower_bound(observer_range.begin);
+ while (i != readers_.end() && i->first < observer_range.end) {
+ std::set<Reader*> tmp;
+ tmp.swap(i->second);
+ auto j = i;
+ ++j;
+ readers_.erase(i);
+ i = j;
+ for (Reader* reader: tmp) {
+ reader->NotifyAvailableRange(new_range);
+ }
+ }
+}
+
+//
+// MultiBuffer
+//
+MultiBuffer::MultiBuffer(int32_t block_size_shift) :
+ max_size_(0),
+ block_size_shift_(block_size_shift) {
+}
+
+MultiBuffer::~MultiBuffer() {
+ // Delete all writers.
+ for (const auto& i : writer_index_) {
+ delete i.second;
+ }
+}
+
+void MultiBuffer::WaitFor(BlockId pos, Reader* reader) {
+ DCHECK(!Contains(pos));
+ DCHECK(data_.find(pos) == data_.end()) << "pos = " << pos;
+ if (!reader_index_.WaitFor(pos, reader)) {
+ return;
+ }
+
+ MultiBufferDataProvider* provider = NULL;
+ BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
+ if (pos - closest_writer < kMaxWaitForWriterOffset) {
+ BlockId closest_block = ClosestPreviousEntry(data_, pos);
+ if (pos - closest_block > pos - closest_writer) {
+ provider = writer_index_[closest_writer];
+ DCHECK(provider);
+ }
+ }
+ if (!provider) {
+ provider = writer_index_[pos] = CreateWriter(pos);
+ provider->SetAvailableCallback(
+ base::Bind(&MultiBuffer::DataProviderEvent,
+ base::Unretained(this),
+ base::Unretained(provider)));
+ }
+ provider->SetDeferred(false);
+}
+
+void MultiBuffer::DeferredWaitFor(BlockId pos, Reader* reader) {
+ deferred_reader_index_.WaitFor(pos, reader);
+}
+
+void MultiBuffer::StopWaitFor(BlockId pos, Reader* reader) {
+ reader_index_.StopWaitFor(pos, reader);
+ deferred_reader_index_.StopWaitFor(pos, reader);
+}
+
+void MultiBuffer::CleanupWriters(BlockId pos) {
+ BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
+ if (closest_writer == MultiBufferBlockId(kUnknownUrlId, -1))
liberato (no reviews please) 2015/10/09 17:36:08 might want to have an explicit unknown block id fo
hubbe 2015/10/13 23:08:16 Changed it to MultiBufferBlockId()
+ return;
+ if (pos - closest_writer > kMaxWaitForWriterOffset)
+ return;
+ DCHECK(writer_index_[closest_writer]);
+ DataProviderEvent(writer_index_[closest_writer]);
+}
+
+void MultiBuffer::Observe(BlockId pos, Reader* reader) {
+ observer_index_.WaitFor(pos, reader);
+}
+
+void MultiBuffer::StopObserve(BlockId pos, Reader* reader) {
+ observer_index_.StopWaitFor(pos, reader);
+}
+
+bool MultiBuffer::Contains(BlockId pos) const {
+ DCHECK(present_[pos] == 0 || present_[pos] == 1);
+ DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
+ return !!present_[pos];
+}
+
+MultiBufferBlockId MultiBuffer::FindNextUnavailable(BlockId pos) const {
+ auto i = present_.find(pos);
+ if (i.value()) {
+ return i.range_end();
+ } else {
+ return pos;
+ }
+}
+
+void MultiBuffer::Prune(size_t max_to_free) {
+ // Use a rangemap to merge all consequtive frees into
+ // ranges, then notify observers of changes to those ranges.
+ RangeMap<BlockId, int32_t> freed;
+ while (data_.size() > max_size_ && !lru_.Empty() && max_to_free > 0) {
+ BlockId to_free = lru_.Pop();
+ DCHECK(data_[to_free]);
+ DCHECK_EQ(pinned_[to_free], 0);
+ DCHECK_EQ(present_[to_free], 1);
+ data_.erase(to_free);
+ freed.IncrementRange(to_free, to_free + 1, 1);
+ present_.IncrementRange(to_free, to_free + 1, -1);
+ max_to_free--;
+ }
+
+ for (auto freed_iter = freed.first_range();
+ freed_iter != freed.last_range();
+ ++freed_iter) {
+ if (freed_iter.value()) {
+ // Technically, there shouldn't be any observers in this range
+ // as all observers really should be pinning the range where it's
+ // actually observing.
+ observer_index_.NotifyAvailableRange(
+ freed_iter.range(),
+ // Empty range.
+ Range<BlockId>(freed_iter.range_begin(),
+ freed_iter.range_begin()));
+
+ auto i = present_.find(freed_iter.range_begin());
+ DCHECK_EQ(i.value(), 0);
+ DCHECK_LE(i.range_begin(), freed_iter.range_begin());
+ DCHECK_LE(freed_iter.range_end(), i.range_end());
+
+ if (i.range_begin() == freed_iter.range_begin()) {
liberato (no reviews please) 2015/10/09 17:36:09 perhaps comment to the effect of "notify that a ra
hubbe 2015/10/13 23:08:16 Done.
+ auto j = i;
+ --j;
+ DCHECK_EQ(j.value(), 1);
+ observer_index_.NotifyAvailableRange(j.range(), j.range());
+ }
+ if (i.range_end() == freed_iter.range_end()) {
+ auto j = i;
+ ++j;
+ DCHECK_EQ(j.value(), 1);
+ observer_index_.NotifyAvailableRange(j.range(), j.range());
+ }
+ }
+ }
+}
+
+void MultiBuffer::AddProvider(scoped_ptr<MultiBufferDataProvider> provider) {
+ // If there is already a provider in the same location, we delete it.
+ DCHECK(!provider->Available());
+ BlockId pos = provider->Tell();
+ MultiBufferDataProvider** place = &writer_index_[pos];
+ DCHECK_NE(*place, provider.get());
+ if (*place) delete *place;
+ *place = provider.release();
+}
+
+scoped_ptr<MultiBufferDataProvider> MultiBuffer::RemoveProvider(
+ MultiBufferDataProvider *provider) {
+ BlockId pos = provider->Tell();
+ DCHECK_EQ(writer_index_[pos], provider);
+ writer_index_.erase(pos);
+ return scoped_ptr<MultiBufferDataProvider>(provider);
+}
+
+void MultiBuffer::DataProviderEvent(MultiBufferDataProvider *provider_tmp) {
+ scoped_ptr<MultiBufferDataProvider> provider(RemoveProvider(provider_tmp));
+ BlockId start_pos = provider->Tell();
+ BlockId pos = start_pos;
+ bool eof = false;
+
+ while (provider->Available() && !Contains(pos) && !eof) {
+ DCHECK_GE(pos.block_num(), 0);
+ data_[pos] = provider->Read();
+ eof = data_[pos]->end_of_stream();
+ if (!pinned_[pos]) {
+ lru_.Insert(pos);
+ }
+ ++pos;
+ }
+
+ if (!Contains(pos) && !eof) {
+ AddProvider(provider.Pass());
+ }
+
+ if (pos > start_pos) {
+ present_.IncrementRange(start_pos, pos, 1);
+ Range<BlockId> expanded_range = present_.find(start_pos).range();
+ observer_index_.NotifyAvailableRange(expanded_range, expanded_range);
+
+ Prune((pos - start_pos) * kMaxFreesPerAdd + 1);
+ }
+
+ // Check that it's still there before we try to delete it.
+ auto i = writer_index_.find(pos);
+ if (i != writer_index_.end() &&
+ i->second == provider_tmp &&
+ reader_index_.DistToNext(pos) > kMaxWaitForWriterOffset) {
+ if (deferred_reader_index_.DistToNext(pos) > kMaxWaitForWriterOffset) {
+ RemoveProvider(provider_tmp);
+ } else {
+ provider_tmp->SetDeferred(true);
+ }
+ }
+}
+
+void MultiBuffer::UpdateURLId(MultiBufferUrlId old_url_id,
+ MultiBufferUrlId new_url_id) {
+ reader_index_.UpdateURLId(old_url_id, new_url_id);
+ deferred_reader_index_.UpdateURLId(old_url_id, new_url_id);
+ observer_index_.UpdateURLId(old_url_id, new_url_id);
+}
+
+
+void MultiBuffer::PinRange(BlockId from, BlockId to, int32_t howmuch) {
+ DCHECK_NE(howmuch, 0);
+ DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << howmuch;
+ pinned_.IncrementRange(from, to, howmuch);
+
+ // Iterate over all the modified ranges and check if
+ // any of them have transitioned in or out of the
+ // unlocked state. If so, we iterate over all buffers
+ // in that range and add/remove them from the LRU as
+ // approperiate. We iterate *backwards* through the
+ // ranges, with the idea that data in a continous range
+ // should be freed from the end first.
+
+ if (data_.empty())
+ return;
+
+ auto range = pinned_.find(to - 1);
+ while (1) {
+ if (range.value() == 0 || range.value() == howmuch) {
+ bool pin = range.value() == howmuch;
+ BlockId begin = std::max(range.range_begin(), from);
+ BlockId end = std::min(range.range_end(), to);
+ if (begin >= end)
+ break;
+ DataMap::iterator k = data_.lower_bound(end);
+ while (k != data_.begin()) {
+ --k;
+ if (k->first < begin)
+ break;
+ DCHECK(k->second);
+ DCHECK_GE(k->first.block_num(), 0);
+ if (pin) {
+ DCHECK(pinned_[k->first]);
+ lru_.Remove(k->first);
+ } else {
+ DCHECK(!pinned_[k->first]);
+ lru_.Insert(k->first);
+ }
+ }
+ }
+ if (range == pinned_.first_range()) break;
+ --range;
+ }
+}
+
+void MultiBuffer::IncrementMaxSize(int32_t size) {
+ max_size_ += size;
+ DCHECK_GE(max_size_, 0);
+ // Pruning only happens when blocks are added.
+}
+
+MultiBufferReader::MultiBufferReader(
+ MultiBuffer* multibuffer,
+ MultiBufferUrlId url_id,
+ int64_t start,
+ int64_t end,
+ int64_t preload_high,
+ int64_t preload_low,
+ int64_t max_buffer_forward,
+ int64_t max_buffer_backward,
+ base::Callback<void(int64_t, int64_t)> progress_callback) :
+ multibuffer_(multibuffer),
+ url_id_(url_id),
+ end_(end == -1LL ? (1LL << (multibuffer->block_size_shift() + 30)) : end),
+ preload_high_(preload_high),
+ preload_low_(preload_low),
+ preload_pos_(kUnknownUrlId, -1),
+ loading_(true),
+ max_buffer_forward_(max_buffer_forward),
+ max_buffer_backward_(max_buffer_backward),
+ pos_(start),
+ progress_callback_(progress_callback),
+ current_wait_size_(0),
+ clear_on_delete_(nullptr) {
+ // Depends on multibuffer_ being initialized.
+ multibuffer_->IncrementMaxSize(
+ block_ceil(max_buffer_forward_ + max_buffer_backward_).block_num());
+ multibuffer_->PinRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ 1);
+ preload_pos_ = block(pos_);
+ multibuffer_->Observe(preload_pos_, this);
+ IncrementPreloadPos();
+}
+
+MultiBufferReader::~MultiBufferReader() {
+ if (clear_on_delete_) *clear_on_delete_ = false;
+ multibuffer_->StopObserve(preload_pos_, this);
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ multibuffer_->IncrementMaxSize(
+ -block_ceil(max_buffer_forward_ + max_buffer_backward_).block_num());
+ multibuffer_->PinRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ -1);
+ multibuffer_->CleanupWriters(preload_pos_);
+}
+
+MultiBuffer::BlockId MultiBufferReader::block(int64_t byte_pos) const {
+ return MultiBufferBlockId(
+ url_id_,
+ byte_pos >> multibuffer_->block_size_shift());
+}
+
+MultiBuffer::BlockId MultiBufferReader::block_ceil(int64_t byte_pos) const {
+ return block(byte_pos + (1LL << multibuffer_->block_size_shift()) - 1);
+}
+
+void MultiBufferReader::Seek(int64_t pos) {
+ // Use a rangemap to compute the diff in pinning.
+ RangeMap<MultiBuffer::BlockId, int32_t> tmp;
+ tmp.IncrementRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ -1);
+ tmp.IncrementRange(block(pos - max_buffer_backward_),
+ block_ceil(pos + max_buffer_forward_),
+ 1);
+
+ // We know that "last_range" has a zero value, so it is
+ // ok to skip over it.
+ for (auto i = tmp.first_range(); i != tmp.last_range(); ++i) {
liberato (no reviews please) 2015/10/09 17:36:09 nice, though this code appears twice and should pr
hubbe 2015/10/13 23:08:16 Made a MultiBuffer::PinRanges() convenience functi
+ if (i.value() != 0) {
+ multibuffer_->PinRange(i.range_begin(), i.range_end(), i.value());
+ }
+ }
+
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ multibuffer_->StopObserve(preload_pos_, this);
+ MultiBufferBlockId old_preload_pos = preload_pos_;
+ preload_pos_ = block(pos);
+ pos_ = pos;
+ if (IncrementPreloadPos()) {
+ multibuffer_->CleanupWriters(old_preload_pos);
+ }
+}
+
+void MultiBufferReader::SetMaxBuffer(int64_t backward, int64_t forward) {
+ // Use a rangemap to compute the diff in pinning.
+ RangeMap<MultiBuffer::BlockId, int32_t> tmp;
+ tmp.IncrementRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ -1);
+ max_buffer_backward_ = backward;
+ max_buffer_forward_ = forward;
+ tmp.IncrementRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ 1);
+ // We know that "last_range" has a zero value, so it is
+ // ok to skip over it.
+ for (auto i = tmp.first_range(); i != tmp.last_range(); ++i) {
+ if (i.value() != 0) {
+ multibuffer_->PinRange(i.range_begin(), i.range_end(), i.value());
+ }
+ }
+}
+
+
+int64_t MultiBufferReader::Available() const {
+ if (url_id_ == kUnknownUrlId)
+ return -1;
+
+ MultiBufferBlockId current_block = block(pos_);
+ int64_t unavailable_byte_pos =
+ static_cast<int64_t>(
+ multibuffer_->FindNextUnavailable(block(pos_)).block_num()) <<
+ multibuffer_->block_size_shift();
+ return std::max<int64_t>(0, unavailable_byte_pos - pos_);
+}
+
+int64_t MultiBufferReader::TryRead(unsigned char *data, int64_t len) {
+ DCHECK_GT(len, 0);
+ current_wait_size_ = 0;
+ cb_ = base::Closure();
+ DCHECK_LE(pos_ + len, end_);
+ const MultiBuffer::DataMap& data_map = multibuffer_->map();
+ MultiBuffer::DataMap::const_iterator i = data_map.find(block(pos_));
+ int64_t p = pos_;
+ int64_t bytes_read = 0;
+ while (bytes_read < len) {
+ if (i == data_map.end()) break;
+ if (i->first != block(p)) break;
+ if (i->second->end_of_stream()) break;
+ size_t offset = p & ((1LL << multibuffer_->block_size_shift()) - 1);
+ size_t tocopy = std::min<size_t>(len - bytes_read,
+ i->second->data_size() - offset);
+ memcpy(data, i->second->data() + offset, tocopy);
+ data += tocopy;
+ bytes_read += tocopy;
+ p += tocopy;
+ ++i;
+ }
+ Seek(p);
+ return bytes_read;
+}
+
+int MultiBufferReader::Wait(int64_t len, base::Closure cb) {
+ DCHECK_LE(pos_ + len, end_);
+ DCHECK_NE(Available(), -1);
+ DCHECK_LE(len, max_buffer_forward_);
+ current_wait_size_ = len;
+ IncrementPreloadPos();
+ if (Available() >= current_wait_size_) {
+ return net::OK;
+ } else {
+ cb_ = cb;
+ return net::ERR_IO_PENDING;
+ }
+}
+
+void MultiBufferReader::UpdateURLId(MultiBufferUrlId old_url_id,
+ MultiBufferUrlId new_url_id) {
+ DCHECK_EQ(old_url_id, url_id_);
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ multibuffer_->StopObserve(preload_pos_, this);
+ multibuffer_->PinRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ 1);
liberato (no reviews please) 2015/10/09 17:36:09 -1?
hubbe 2015/10/13 23:08:16 Good catch, guess I need more tests.
+ url_id_ = new_url_id;
+ if (url_id_ != kUnknownUrlId)
+ multibuffer_->PinRange(block(pos_ - max_buffer_backward_),
+ block_ceil(pos_ + max_buffer_forward_),
+ 1);
+
+ MultiBufferBlockId old_preload_pos = preload_pos_;
+ multibuffer_->CleanupWriters(old_preload_pos);
liberato (no reviews please) 2015/10/09 17:36:09 would this be better in the multibuffer, maybe in
hubbe 2015/10/13 23:08:16 Unfortunately it's not that easy, because sometime
+
+ preload_pos_ = block(pos_);
+ if (IncrementPreloadPos()) {
+ if (!progress_callback_.is_null()) {
+ MultiBuffer::BlockId unavailable_pos =
+ multibuffer_->FindNextUnavailable(preload_pos_);
+ progress_callback_.Run(
+ pos_,
+ static_cast<int64_t>(unavailable_pos.block_num()) <<
+ multibuffer_->block_size_shift());
+ }
+ }
+}
+
+void MultiBufferReader::SetPreload(int64_t preload_high, int64_t preload_low) {
+ multibuffer_->StopWaitFor(preload_pos_, this);
+ multibuffer_->StopObserve(preload_pos_, this);
+ preload_pos_ = block(pos_);
+ preload_high_ = preload_high;
+ preload_low_ = preload_low;
+ IncrementPreloadPos();
+}
+
+bool MultiBufferReader::IsLoading() const {
+ return loading_;
+}
+
+bool MultiBufferReader::CheckWait() {
+ if (!cb_.is_null() && (Available() >= current_wait_size_ ||
+ Available() == -1)) {
+ current_wait_size_ = 0;
+ bool still_alive = true;
+ clear_on_delete_ = &still_alive;
+ base::ResetAndReturn(&cb_).Run();
+ if (still_alive) {
+ clear_on_delete_ = NULL;
+ } else {
+ return false;
+ }
+ }
+ return true;
+}
+
+void MultiBufferReader::NotifyAvailableRange(
+ const Range<MultiBufferBlockId>& range) {
+
+ // Update end_ if we can.
+ if (range.end > range.begin) {
+ auto i = multibuffer_->map().find(range.end - 1);
+ DCHECK(i != multibuffer_->map().end());
+ if (i->second->end_of_stream()) {
+ // This is an upper limit because the last-to-one block is allowed
+ // to be smaller than the rest of the blocks.
+ int64_t size_upper_limit =
+ static_cast<int64_t>(range.end.block_num()) <<
+ multibuffer_->block_size_shift();
+ end_ = std::min(end_, size_upper_limit);
+ }
+ }
+ if (IncrementPreloadPos()) {
+ if (!progress_callback_.is_null()) {
+ progress_callback_.Run(static_cast<int64_t>(range.begin.block_num()) <<
+ multibuffer_->block_size_shift(),
+ static_cast<int64_t>(range.end.block_num()) <<
+ multibuffer_->block_size_shift());
+ }
+ }
+}
+
+bool MultiBufferReader::IncrementPreloadPos() {
+ int64_t effective_preload = loading_ ? preload_high_ : preload_low_;
+
+ loading_ = false;
+ if (url_id_ != kUnknownUrlId) {
+ MultiBuffer::BlockId max_preload = block_ceil(
+ std::min(end_, pos_ + std::max(effective_preload, current_wait_size_)));
+
+ multibuffer_->StopWaitFor(preload_pos_, this);
+
+ preload_pos_ = multibuffer_->FindNextUnavailable(preload_pos_);
+
+ DVLOG(4) << "IncrementPreloadPos"
+ << " pp = " << preload_pos_
+ << " block_ceil(end_) = " << block_ceil(end_)
+ << " end_ = " << end_
+ << " max_preload " << max_preload;
+
+ multibuffer_->Observe(preload_pos_, this);
+ if (preload_pos_ < block_ceil(end_)) {
+ if (preload_pos_ < max_preload) {
+ loading_ = true;
+ multibuffer_->WaitFor(preload_pos_, this);
+ } else {
+ multibuffer_->DeferredWaitFor(preload_pos_, this);
+ }
+ }
+ }
+ return CheckWait();
+}
+
+} // namespace media

Powered by Google App Engine
This is Rietveld 408576698