Index: media/blink/multibuffer.cc |
diff --git a/media/blink/multibuffer.cc b/media/blink/multibuffer.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ef0d97ccc81f2f1877ede971a9bd078d5350af88 |
--- /dev/null |
+++ b/media/blink/multibuffer.cc |
@@ -0,0 +1,411 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "media/blink/multibuffer.h" |
+ |
+#include "base/bind.h" |
+ |
+namespace std { |
+ |
+ostream& operator<<(ostream& o, const media::MultiBufferBlockId& id) { |
+ if (id.url_data()) { |
+ return o << "{" << id.url_data()->url() << ", " << id.block_num() << "}"; |
+ } else { |
+ return o << "{null, " << id.block_num() << "}"; |
+ } |
+} |
+ |
+} // namespace std |
+ |
+namespace media { |
+ |
+MultiBufferBlockId::MultiBufferBlockId() : |
+ url_data_(nullptr), |
+ block_num_(0) { |
+} |
+MultiBufferBlockId::MultiBufferBlockId(MultiBufferUrlData url_data, |
+ MultiBufferBlockNum block_num) : |
+ url_data_(url_data), |
+ block_num_(block_num) { |
+} |
+ |
+MultiBufferBlockId::MultiBufferBlockId(const MultiBufferBlockId& block_id) : |
+ url_data_(block_id.url_data()), |
+ block_num_(block_id.block_num()) { |
+} |
+ |
+MultiBufferBlockId::~MultiBufferBlockId() {} |
+ |
+// Returns the block ID closest to (but less or equal than) |pos| from |index|. |
+template<class T> |
+static MultiBuffer::BlockId ClosestPreviousEntry( |
+ const std::map<MultiBuffer::BlockId, T>& index, |
+ MultiBuffer::BlockId pos) { |
+ auto i = index.upper_bound(pos); |
+ DCHECK(i == index.end() || i->first > pos); |
+ if (i == index.begin()) { |
+ return MultiBufferBlockId(); |
+ } |
+ --i; |
+ if (!i->first.SameUrl(pos)) { |
+ return MultiBufferBlockId(); |
+ } |
+ DCHECK_LE(i->first, pos); |
+ return i->first; |
+} |
+ |
+// Returns the block ID closest to (but greter than or equal to) |pos| |
+// from |index|. |
+template<class T> |
+static MultiBuffer::BlockId ClosestNextEntry( |
+ const std::map<MultiBuffer::BlockId, T>& index, |
+ MultiBuffer::BlockId pos) { |
+ auto i = index.lower_bound(pos); |
+ if (i == index.end()) { |
+ return MultiBufferBlockId(); |
+ } |
+ if (!i->first.SameUrl(pos)) { |
+ return MultiBufferBlockId(); |
+ } |
+ DCHECK_GE(i->first, pos); |
+ return i->first; |
+} |
+ |
+// |
+// MultiBuffer |
+// |
+MultiBuffer::MultiBuffer(int32_t block_size_shift) : |
+ max_size_(0), |
+ block_size_shift_(block_size_shift) { |
+} |
+ |
+MultiBuffer::~MultiBuffer() { |
+ // Delete all writers. |
+ for (const auto& i : writer_index_) { |
+ delete i.second; |
+ } |
+} |
+ |
+void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) { |
+ std::set<Reader*> &set_of_readers = readers_[pos]; |
+ bool already_waited_for = !set_of_readers.empty(); |
+ set_of_readers.insert(reader); |
+ |
+ if (already_waited_for || Contains(pos)) { |
+ return; |
+ } |
+ |
+ // We may need to create a new data provider to service this request. |
+ // Look for an existing data provider first. |
+ DataProvider* provider = NULL; |
+ BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos); |
+ |
+ if (pos - closest_writer < kMaxWaitForWriterOffset) { |
+ BlockId closest_block = ClosestPreviousEntry(data_, pos); |
+ if (pos - closest_block > pos - closest_writer) { |
+ provider = writer_index_[closest_writer]; |
+ DCHECK(provider); |
+ } |
+ } |
+ if (!provider) { |
+ provider = writer_index_[pos] = CreateWriter(pos); |
+ provider->SetAvailableCallback( |
+ base::Bind(&MultiBuffer::DataProviderEvent, |
+ base::Unretained(this), |
+ base::Unretained(provider))); |
+ } |
+ provider->SetDeferred(false); |
+} |
+ |
+void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) { |
+ auto i = readers_.find(pos); |
+ if (i == readers_.end()) |
+ return; |
+ i->second.erase(reader); |
+ if (i->second.empty()) { |
+ readers_.erase(i); |
+ } |
+} |
+ |
+void MultiBuffer::CleanupWriters(const BlockId& pos) { |
+ BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos); |
+ if (closest_writer == BlockId()) |
+ return; |
+ if (pos - closest_writer > kMaxWaitForWriterOffset) |
+ return; |
+ DCHECK(writer_index_[closest_writer]); |
+ DataProviderEvent(writer_index_[closest_writer]); |
+} |
+ |
+bool MultiBuffer::Contains(const BlockId& pos) const { |
+ DCHECK(present_[pos] == 0 || present_[pos] == 1) |
+ << " pos = " << pos |
+ << " present_[pos] " << present_[pos]; |
+ DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0); |
+ return !!present_[pos]; |
+} |
+ |
+MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const { |
+ auto i = present_.find(pos); |
+ if (i.value()) { |
+ return i.range_end(); |
+ } else { |
DaleCurtis
2015/10/19 21:45:25
Don't use else w/ return. Ternary?
hubbe
2015/10/20 00:31:39
I'm not a big fan of terniary.
Is this better?
|
+ return pos; |
+ } |
+} |
+ |
+void MultiBuffer::NotifyAvailableRange( |
+ const Range<MultiBufferBlockId>& observer_range, |
+ const Range<MultiBufferBlockId>& new_range) { |
+ std::set<Reader*> tmp; |
+ for (auto i = readers_.lower_bound(observer_range.begin); |
+ i != readers_.end() && i->first < observer_range.end; |
+ ++i) { |
+ tmp.insert(i->second.begin(), i->second.end()); |
+ } |
+ for (Reader* reader: tmp) { |
+ reader->NotifyAvailableRange(new_range); |
+ } |
+} |
+ |
+void MultiBuffer::Prune(size_t max_to_free) { |
+ // Use a rangemap to merge all consequtive frees into |
+ // ranges, then notify observers of changes to those ranges. |
+ RangeMap<BlockId, int32_t> freed; |
+ while (static_cast<int64_t>(data_.size()) > max_size_ && |
+ !lru_.Empty() && |
+ max_to_free > 0) { |
+ BlockId to_free = lru_.Pop(); |
+ DCHECK(data_[to_free]); |
+ DCHECK_EQ(pinned_[to_free], 0); |
+ DCHECK_EQ(present_[to_free], 1); |
+ data_.erase(to_free); |
+ freed.IncrementRange(to_free, to_free + 1, 1); |
+ present_.IncrementRange(to_free, to_free + 1, -1); |
+ max_to_free--; |
+ } |
+ |
+ for (auto freed_iter = freed.first_range(); |
+ freed_iter != freed.last_range(); |
+ ++freed_iter) { |
+ if (freed_iter.value()) { |
+ // Technically, there shouldn't be any observers in this range |
+ // as all observers really should be pinning the range where it's |
+ // actually observing. |
+ NotifyAvailableRange( |
+ freed_iter.range(), |
+ // Empty range. |
+ Range<BlockId>(freed_iter.range_begin(), |
+ freed_iter.range_begin())); |
+ |
+ auto i = present_.find(freed_iter.range_begin()); |
+ DCHECK_EQ(i.value(), 0); |
+ DCHECK_LE(i.range_begin(), freed_iter.range_begin()); |
+ DCHECK_LE(freed_iter.range_end(), i.range_end()); |
+ |
+ if (i.range_begin() == freed_iter.range_begin()) { |
+ // Notify the previous range that it contains fewer blocks. |
+ auto j = i; |
+ --j; |
+ DCHECK_EQ(j.value(), 1); |
+ NotifyAvailableRange(j.range(), j.range()); |
+ } |
+ if (i.range_end() == freed_iter.range_end()) { |
+ // Notify the following range that it contains fewer blocks. |
+ auto j = i; |
+ ++j; |
+ DCHECK_EQ(j.value(), 1); |
+ NotifyAvailableRange(j.range(), j.range()); |
+ } |
+ } |
+ } |
+} |
+ |
+void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) { |
+ // If there is already a provider in the same location, we delete it. |
+ DCHECK(!provider->Available()); |
+ BlockId pos = provider->Tell(); |
+ DataProvider** place = &writer_index_[pos]; |
+ DCHECK_NE(*place, provider.get()); |
+ if (*place) delete *place; |
+ *place = provider.release(); |
+} |
+ |
+scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider( |
+ DataProvider *provider) { |
+ BlockId pos = provider->Tell(); |
+ DCHECK_EQ(writer_index_[pos], provider); |
+ writer_index_.erase(pos); |
+ return scoped_ptr<DataProvider>(provider); |
+} |
+ |
+MultiBuffer::ProviderState MultiBuffer::SuggestProviderState( |
+ const BlockId& pos) const { |
+ MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos); |
+ if (next_reader_pos != MultiBufferBlockId() && |
+ (next_reader_pos - pos <= kMaxWaitForWriterOffset || |
+ !pos.url_data()->range_supported())) { |
+ // Check if there is another writer between us and the next reader. |
+ MultiBufferBlockId next_writer_pos = ClosestNextEntry( |
+ writer_index_, pos + 1); |
+ if (next_writer_pos == MultiBufferBlockId() || |
+ next_writer_pos > next_reader_pos) { |
+ return ProviderStateLoad; |
+ } |
+ } |
+ |
+ MultiBufferBlockId previous_reader_pos = ClosestPreviousEntry( |
+ readers_, pos - 1); |
+ if (previous_reader_pos != MultiBufferBlockId() && |
+ (pos - previous_reader_pos <= kMaxWaitForReaderOffset || |
+ !pos.url_data()->range_supported())) { |
+ MultiBufferBlockId previous_writer_pos = |
+ ClosestPreviousEntry(writer_index_, pos - 1); |
+ if (previous_writer_pos < previous_reader_pos) { |
+ return ProviderStateDefer; |
+ } |
+ } |
+ |
+ return ProviderStateDead; |
+} |
+ |
+bool MultiBuffer::ProviderCollision(const BlockId& id) const { |
+ // If there is a writer at the same location, it is always a collision. |
+ if (writer_index_.find(id) != writer_index_.end()) |
+ return true; |
+ |
+ // Data already exists at providers current position, |
+ // if the URL supports ranges, we can kill the data provider. |
+ if (id.url_data()->range_supported() && Contains(id)) |
+ return true; |
+ |
+ return false; |
+} |
+ |
+void MultiBuffer::DataProviderEvent(DataProvider *provider_tmp) { |
+ scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp)); |
+ BlockId start_pos = provider->Tell(); |
+ BlockId pos = start_pos; |
+ bool eof = false; |
+ |
+ while (!ProviderCollision(pos) && !eof) { |
+ if (!provider->Available()) { |
+ AddProvider(provider.Pass()); |
+ break; |
+ } |
+ DCHECK_GE(pos.block_num(), 0); |
+ data_[pos] = provider->Read(); |
+ eof = data_[pos]->end_of_stream(); |
+ if (!pinned_[pos]) |
+ lru_.Use(pos); |
+ ++pos; |
+ } |
+ |
+ if (pos > start_pos) { |
+ present_.SetRange(start_pos, pos, 1); |
+ Range<BlockId> expanded_range = present_.find(start_pos).range(); |
+ NotifyAvailableRange(expanded_range, expanded_range); |
+ |
+ Prune((pos - start_pos) * kMaxFreesPerAdd + 1); |
+ } |
+ |
+ // Check that it's still there before we try to delete it. |
+ auto i = writer_index_.find(pos); |
+ if (i != writer_index_.end() && i->second == provider_tmp) { |
+ switch (SuggestProviderState(pos)) { |
+ case ProviderStateLoad: |
+ // Not sure we actually need to do this |
+ provider_tmp->SetDeferred(false); |
+ break; |
+ case ProviderStateDefer: |
+ provider_tmp->SetDeferred(true); |
+ break; |
+ case ProviderStateDead: |
+ RemoveProvider(provider_tmp); |
+ break; |
+ } |
+ } |
+} |
+ |
+void MultiBuffer::UpdateUrlData(const MultiBufferUrlData& old_url_data, |
+ const MultiBufferUrlData& new_url_data) { |
DaleCurtis
2015/10/19 21:45:25
Alignment?
hubbe
2015/10/20 00:31:39
Done.
|
+ MultiBufferBlockId pos(old_url_data, 0); |
+ auto i = readers_.lower_bound(pos); |
+ while (i != readers_.end() && pos.SameUrl(i->first)) { |
+ std::set<Reader*> tmp; |
+ tmp.swap(i->second); |
+ auto j = i; |
+ ++j; |
+ readers_.erase(i); |
+ i = j; |
+ for (Reader* reader: tmp) { |
+ reader->UpdateUrlData(old_url_data, new_url_data); |
+ } |
+ } |
+} |
+ |
+ |
+void MultiBuffer::PinRange( |
+ const BlockId& from, const BlockId& to, int32_t howmuch) { |
DaleCurtis
2015/10/19 21:45:25
how_much
hubbe
2015/10/20 00:31:39
Done.
|
+ DCHECK_NE(howmuch, 0); |
+ DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << howmuch; |
+ pinned_.IncrementRange(from, to, howmuch); |
+ |
+ // Iterate over all the modified ranges and check if |
+ // any of them have transitioned in or out of the |
+ // unlocked state. If so, we iterate over all buffers |
+ // in that range and add/remove them from the LRU as |
+ // approperiate. We iterate *backwards* through the |
+ // ranges, with the idea that data in a continous range |
+ // should be freed from the end first. |
+ |
+ if (data_.empty()) |
+ return; |
+ |
+ auto range = pinned_.find(to - 1); |
+ while (1) { |
+ if (range.value() == 0 || range.value() == howmuch) { |
+ bool pin = range.value() == howmuch; |
+ BlockId begin = std::max(range.range_begin(), from); |
+ BlockId end = std::min(range.range_end(), to); |
+ if (begin >= end) |
+ break; |
+ DataMap::iterator k = data_.lower_bound(end); |
+ while (k != data_.begin()) { |
+ --k; |
+ if (k->first < begin) |
+ break; |
+ DCHECK(k->second); |
+ DCHECK_GE(k->first.block_num(), 0); |
+ if (pin) { |
+ DCHECK(pinned_[k->first]); |
+ lru_.Remove(k->first); |
+ } else { |
+ DCHECK(!pinned_[k->first]); |
+ lru_.Insert(k->first); |
+ } |
+ } |
+ } |
+ if (range == pinned_.first_range()) break; |
+ --range; |
+ } |
+} |
+ |
+void MultiBuffer::PinRanges(const RangeMap<BlockId, int32_t>& ranges) { |
+ // We know that "last_range" has a zero value, so it is |
+ // ok to skip over it. |
+ for (auto i = ranges.first_range(); i != ranges.last_range(); ++i) { |
+ if (i.value() != 0) { |
+ PinRange(i.range_begin(), i.range_end(), i.value()); |
+ } |
+ } |
+} |
+ |
+void MultiBuffer::IncrementMaxSize(int32_t size) { |
+ max_size_ += size; |
+ DCHECK_GE(max_size_, 0); |
+ // Pruning only happens when blocks are added. |
+} |
+ |
+} // namespace media |