Chromium Code Reviews| Index: media/blink/multibuffer.cc |
| diff --git a/media/blink/multibuffer.cc b/media/blink/multibuffer.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..28b6bc4bdd0207c039399c57eb6890aa1a472f3e |
| --- /dev/null |
| +++ b/media/blink/multibuffer.cc |
| @@ -0,0 +1,474 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "media/blink/multibuffer.h" |
| + |
| +#include "base/bind.h" |
| + |
| +namespace media { |
| + |
| +// Returns the block ID closest to (but less or equal than) |pos| from |index|. |
| +template<class T> |
| +static MultiBuffer::BlockId ClosestPreviousEntry( |
| + const std::map<MultiBuffer::BlockId, T>& index, |
| + MultiBuffer::BlockId pos) { |
| + auto i = index.upper_bound(pos); |
| + DCHECK(i == index.end() || i->first > pos); |
| + if (i == index.begin()) { |
| + return std::numeric_limits<MultiBufferBlockId>::min(); |
| + } |
| + --i; |
| + DCHECK_LE(i->first, pos); |
| + return i->first; |
| +} |
| + |
// Returns the block ID closest to (but greater than or equal to) |pos|
// from |index|. Returns the largest representable block ID when no such
// entry exists.
template<class T>
static MultiBuffer::BlockId ClosestNextEntry(
    const std::map<MultiBuffer::BlockId, T>& index,
    MultiBuffer::BlockId pos) {
  auto i = index.lower_bound(pos);
  if (i == index.end()) {
    // No entry at or after |pos|; use max() as a "not found" sentinel.
    return std::numeric_limits<MultiBufferBlockId>::max();
  }
  DCHECK_GE(i->first, pos);
  return i->first;
}
| + |
//
// MultiBuffer::GlobalLRU
//
// Starts with no tracked blocks and a zero block budget; the budget is
// established later through IncrementMaxSize().
MultiBuffer::GlobalLRU::GlobalLRU() : max_size_(0), data_size_(0) {}
| + |
MultiBuffer::GlobalLRU::~GlobalLRU() {
  // By the time we're freed, all blocks should have been removed,
  // and our sums should be zero. Each MultiBuffer's destructor is
  // responsible for removing its own blocks and decrementing the sizes.
  DCHECK(lru_.Empty());
  DCHECK_EQ(max_size_, 0);
  DCHECK_EQ(data_size_, 0);
}
| + |
| +void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer, |
| + MultiBufferBlockId block_id) { |
| + GlobalBlockId id(multibuffer, block_id); |
| + lru_.Use(id); |
| +} |
| + |
| +void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer, |
| + MultiBufferBlockId block_id) { |
| + GlobalBlockId id(multibuffer, block_id); |
| + lru_.Insert(id); |
| +} |
| + |
| +void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer, |
| + MultiBufferBlockId block_id) { |
| + GlobalBlockId id(multibuffer, block_id); |
| + lru_.Remove(id); |
| +} |
| + |
| +bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer, |
| + MultiBufferBlockId block_id) { |
| + GlobalBlockId id(multibuffer, block_id); |
| + return lru_.Contains(id); |
| +} |
| + |
// Adjusts the total number of cached blocks across all multibuffers.
// |blocks| may be negative when blocks are released.
void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) {
  data_size_ += blocks;
  DCHECK_GE(data_size_, 0);
}
| + |
// Adjusts the global block budget. |blocks| may be negative when a
// multibuffer shrinks or is destroyed.
void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) {
  max_size_ += blocks;
  DCHECK_GE(max_size_, 0);
}
| + |
| +void MultiBuffer::GlobalLRU::Prune(size_t max_to_free) { |
| + // Group the blocks by multibuffer, then free them. |
| + std::map<MultiBuffer*, std::vector<MultiBufferBlockId> > to_free; |
| + size_t freed = 0; |
| + while (data_size_ - freed > max_size_ && |
| + !lru_.Empty() && |
| + freed < max_to_free) { |
| + GlobalBlockId block_id = lru_.Pop(); |
| + to_free[block_id.first].push_back(block_id.second); |
| + freed--; |
|
DaleCurtis
2015/10/28 23:06:13
How does this work? freed is zero and doesn't seem
hubbe
2015/10/29 17:52:44
OOps, should be ++.
Added test as well.
|
| + } |
| + for (const auto &to_free_pair : to_free) { |
|
DaleCurtis
2015/10/28 23:06:13
auto&
hubbe
2015/10/29 17:52:44
Done.
|
| + to_free_pair.first->ReleaseBlocks(to_free_pair.second); |
| + } |
| +} |
| + |
| +// |
| +// MultiBuffer |
| +// |
| +MultiBuffer::MultiBuffer(int32_t block_size_shift, |
| + scoped_refptr<GlobalLRU> global_lru) : |
|
DaleCurtis
2015/10/28 23:06:13
const&
hubbe
2015/10/29 17:52:44
Done.
|
| + max_size_(0), |
| + block_size_shift_(block_size_shift), |
| + lru_(global_lru) { |
| +} |
| + |
MultiBuffer::~MultiBuffer() {
  // Delete all writers. |writer_index_| holds owning raw pointers.
  for (const auto& i : writer_index_) {
    delete i.second;
  }
  // Remove all blocks from the LRU before the global counters are
  // decremented, so the GlobalLRU never sees untracked blocks.
  for (const auto& i : data_) {
    lru_->Remove(this, i.first);
  }
  // Give back this buffer's contribution to the global data size and
  // block budget; GlobalLRU's destructor DCHECKs that both reach zero.
  lru_->IncrementDataSize(-static_cast<int64_t>(data_.size()));
  lru_->IncrementMaxSize(-max_size_);
}
| + |
// Registers |reader| as waiting for block |pos| and, if nobody is
// already waiting there and the block is not cached, finds or creates a
// DataProvider (writer) that will eventually produce it.
void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
  std::set<Reader*> &set_of_readers = readers_[pos];
  bool already_waited_for = !set_of_readers.empty();
  set_of_readers.insert(reader);

  // If another reader already waits at |pos|, a provider has been
  // arranged previously; if the block is cached, no provider is needed.
  if (already_waited_for || Contains(pos)) {
    return;
  }

  // We may need to create a new data provider to service this request.
  // Look for an existing data provider first.
  DataProvider* provider = NULL;
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);

  // Only reuse a writer that is within kMaxWaitForWriterOffset blocks
  // behind |pos|; waiting on one further away would take too long.
  if (closest_writer > pos - kMaxWaitForWriterOffset) {
    // Find the last missing block before |pos|: if a writer sits after
    // it, that writer will reach |pos| without hitting cached data.
    auto i = present_.find(pos);
    BlockId closest_block;
    if (i.value()) {
      // |pos| itself is present (shouldn't normally get here given the
      // Contains() check above).
      closest_block = pos;
    } else if (i == present_.begin()) {
      // Nothing cached before |pos|.
      closest_block = -1;
    } else {
      closest_block = i.range_begin() - 1;
    }

    // Reuse the writer only if there is no cached data between it and
    // |pos| (otherwise it may stop before reaching |pos|).
    if (closest_writer > closest_block) {
      provider = writer_index_[closest_writer];
      DCHECK(provider);
    }
  }
  if (!provider) {
    DCHECK(writer_index_.find(pos) == writer_index_.end());
    provider = writer_index_[pos] = CreateWriter(pos);
    // base::Unretained is safe assuming this MultiBuffer outlives its
    // providers; the destructor deletes all entries in |writer_index_|.
    provider->SetAvailableCallback(
        base::Bind(&MultiBuffer::DataProviderEvent,
                   base::Unretained(this),
                   base::Unretained(provider)));
  }
  // Make sure the chosen provider is actively loading.
  provider->SetDeferred(false);
}
| + |
| +void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) { |
| + auto i = readers_.find(pos); |
| + if (i == readers_.end()) |
| + return; |
| + i->second.erase(reader); |
| + if (i->second.empty()) { |
| + readers_.erase(i); |
| + } |
| +} |
| + |
// Pokes the writer closest behind |pos| (if it is within
// kMaxWaitForWriterOffset blocks) so that DataProviderEvent() can
// re-evaluate whether it should keep loading, defer, or be destroyed.
void MultiBuffer::CleanupWriters(const BlockId& pos) {
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
  if (closest_writer < 0)
    return;
  if (pos - closest_writer > kMaxWaitForWriterOffset)
    return;
  DCHECK(writer_index_[closest_writer]);
  DataProviderEvent(writer_index_[closest_writer]);
}
| + |
// Returns true if block |pos| is cached. |present_| is a 0/1 range map
// that must stay in sync with |data_|; the DCHECKs verify both
// invariants.
bool MultiBuffer::Contains(const BlockId& pos) const {
  DCHECK(present_[pos] == 0 || present_[pos] == 1)
      << " pos = " << pos
      << " present_[pos] " << present_[pos];
  DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
  return !!present_[pos];
}
| + |
| +MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const { |
| + auto i = present_.find(pos); |
| + if (i.value()) |
| + return i.range_end(); |
| + return pos; |
| +} |
| + |
| +void MultiBuffer::NotifyAvailableRange( |
| + const Range<MultiBufferBlockId>& observer_range, |
| + const Range<MultiBufferBlockId>& new_range) { |
| + std::set<Reader*> tmp; |
| + for (auto i = readers_.lower_bound(observer_range.begin); |
| + i != readers_.end() && i->first < observer_range.end; |
| + ++i) { |
| + tmp.insert(i->second.begin(), i->second.end()); |
| + } |
| + for (Reader* reader: tmp) { |
| + reader->NotifyAvailableRange(new_range); |
| + } |
| +} |
| + |
| +void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId> blocks) { |
| + RangeMap<BlockId, int32_t> freed; |
| + for (MultiBufferBlockId to_free : blocks) { |
|
DaleCurtis
2015/10/28 23:06:13
const auto&?
hubbe
2015/10/29 17:52:44
It's an integer type though, using "const auto&" s
|
| + DCHECK(data_[to_free]); |
| + DCHECK_EQ(pinned_[to_free], 0); |
| + DCHECK_EQ(present_[to_free], 1); |
| + data_.erase(to_free); |
| + freed.IncrementRange(to_free, to_free + 1, 1); |
| + present_.IncrementRange(to_free, to_free + 1, -1); |
| + } |
| + lru_->IncrementDataSize(-blocks.size()); |
| + |
| + for (const auto& freed_range : freed) { |
| + if (freed_range.second) { |
| + // Technically, there shouldn't be any observers in this range |
| + // as all observers really should be pinning the range where it's |
| + // actually observing. |
| + NotifyAvailableRange( |
| + freed_range.first, |
| + // Empty range. |
| + Range<BlockId>(freed_range.first.begin, |
| + freed_range.first.begin)); |
| + |
| + auto i = present_.find(freed_range.first.begin); |
| + DCHECK_EQ(i.value(), 0); |
| + DCHECK_LE(i.range_begin(), freed_range.first.begin); |
| + DCHECK_LE(freed_range.first.end, i.range_end()); |
| + |
| + if (i.range_begin() == freed_range.first.begin) { |
| + // Notify the previous range that it contains fewer blocks. |
| + auto j = i; |
| + --j; |
| + DCHECK_EQ(j.value(), 1); |
| + NotifyAvailableRange(j.range(), j.range()); |
| + } |
| + if (i.range_end() == freed_range.first.end) { |
| + // Notify the following range that it contains fewer blocks. |
| + auto j = i; |
| + ++j; |
| + DCHECK_EQ(j.value(), 1); |
| + NotifyAvailableRange(j.range(), j.range()); |
| + } |
| + } |
| + } |
| +} |
| + |
| +void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) { |
| + // If there is already a provider in the same location, we delete it. |
| + DCHECK(!provider->Available()); |
| + BlockId pos = provider->Tell(); |
| + DataProvider** place = &writer_index_[pos]; |
| + DCHECK_NE(*place, provider.get()); |
| + if (*place) delete *place; |
| + *place = provider.release(); |
| +} |
| + |
// Unregisters |provider| from |writer_index_| and transfers ownership
// of it back to the caller.
scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
    DataProvider *provider) {
  BlockId pos = provider->Tell();
  DCHECK_EQ(writer_index_[pos], provider);
  writer_index_.erase(pos);
  return scoped_ptr<DataProvider>(provider);
}
| + |
// Decides what a provider at |pos| should do next:
//  - ProviderStateLoad: a reader is waiting close enough ahead with no
//    other writer in between — keep loading.
//  - ProviderStateDefer: a reader is close enough behind (it may seek
//    forward soon) and no other writer serves it — pause but keep the
//    provider alive.
//  - ProviderStateDead: nobody nearby needs this provider.
// When the source does not support range requests (!RangeSupported()),
// distance limits are ignored since a provider cannot be recreated at
// an arbitrary position.
MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
    const BlockId& pos) const {
  MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
  if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() &&
      (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) {
    // Check if there is another writer between us and the next reader.
    MultiBufferBlockId next_writer_pos = ClosestNextEntry(
        writer_index_, pos + 1);
    if (next_writer_pos > next_reader_pos) {
      return ProviderStateLoad;
    }
  }

  // No reader ahead worth serving; check for a reader shortly behind us
  // that no earlier writer covers.
  MultiBufferBlockId previous_reader_pos = ClosestPreviousEntry(
      readers_, pos - 1);
  if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() &&
      (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
       !RangeSupported())) {
    MultiBufferBlockId previous_writer_pos =
        ClosestPreviousEntry(writer_index_, pos - 1);
    if (previous_writer_pos < previous_reader_pos) {
      return ProviderStateDefer;
    }
  }

  return ProviderStateDead;
}
| + |
| +bool MultiBuffer::ProviderCollision(const BlockId& id) const { |
| + // If there is a writer at the same location, it is always a collision. |
| + if (writer_index_.find(id) != writer_index_.end()) |
| + return true; |
| + |
| + // Data already exists at providers current position, |
| + // if the URL supports ranges, we can kill the data provider. |
| + if (RangeSupported() && Contains(id)) |
| + return true; |
| + |
| + return false; |
| +} |
| + |
// Asks the shared LRU to evict up to |max_to_free| blocks globally.
void MultiBuffer::Prune(size_t max_to_free) {
  lru_->Prune(max_to_free);
}
| + |
| +void MultiBuffer::DataProviderEvent(DataProvider *provider_tmp) { |
| + scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp)); |
| + BlockId start_pos = provider->Tell(); |
| + BlockId pos = start_pos; |
| + bool eof = false; |
| + int64_t blocks_added = -data_.size(); |
|
DaleCurtis
2015/10/28 23:06:13
-size_t might be problematic?
hubbe
2015/10/29 17:52:44
Assuming unary minus behaves anything like binary
hubbe
2015/10/29 21:45:05
Turns out you're right, - on size_t is problematic
|
| + |
| + while (!ProviderCollision(pos) && !eof) { |
| + if (!provider->Available()) { |
| + AddProvider(provider.Pass()); |
| + break; |
| + } |
| + DCHECK_GE(pos, 0); |
| + data_[pos] = provider->Read(); |
| + eof = data_[pos]->end_of_stream(); |
| + if (!pinned_[pos]) |
| + lru_->Use(this, pos); |
| + ++pos; |
| + } |
| + blocks_added += data_.size(); |
| + |
| + if (pos > start_pos) { |
| + present_.SetRange(start_pos, pos, 1); |
| + Range<BlockId> expanded_range = present_.find(start_pos).range(); |
| + NotifyAvailableRange(expanded_range, expanded_range); |
| + |
| + lru_->IncrementDataSize(blocks_added); |
| + Prune(blocks_added * kMaxFreesPerAdd + 1); |
| + } |
| + |
| + // Check that it's still there before we try to delete it. |
| + auto i = writer_index_.find(pos); |
| + if (i != writer_index_.end() && i->second == provider_tmp) { |
| + switch (SuggestProviderState(pos)) { |
| + case ProviderStateLoad: |
| + // Not sure we actually need to do this |
| + provider_tmp->SetDeferred(false); |
| + break; |
| + case ProviderStateDefer: |
| + provider_tmp->SetDeferred(true); |
| + break; |
| + case ProviderStateDead: |
| + RemoveProvider(provider_tmp); |
| + break; |
| + } |
| + } |
| +} |
| + |
| +void MultiBuffer::MergeFrom(MultiBuffer* other) { |
| + // Import data and update LRU. |
| + for (auto data : other->data_) { |
|
DaleCurtis
2015/10/28 23:06:13
const auto& here and the three below.
hubbe
2015/10/29 17:52:44
Done.
The range iterator used in the two cases be
|
| + if (data_.insert(std::make_pair(data.first, data.second)).second) { |
| + if (!pinned_[data.first]) { |
| + lru_->Insert(this, data.first); |
| + } |
| + } |
| + } |
| + // Update present_ |
| + for (auto r : other->present_) { |
| + if (r.second) { |
| + present_.SetRange(r.first.begin, r.first.end, 1); |
| + } |
| + } |
| + // Notify existing readers. |
| + auto last = present_.begin(); |
| + for (auto r : other->present_) { |
| + if (r.second) { |
| + auto i = present_.find(r.first.begin); |
| + if (i != last) { |
| + NotifyAvailableRange(i.range(), i.range()); |
| + last = i; |
| + } |
| + } |
| + } |
| +} |
| + |
| +void MultiBuffer::PinRange( |
| + const BlockId& from, const BlockId& to, int32_t how_much) { |
| + DCHECK_NE(how_much, 0); |
| + DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much; |
| + pinned_.IncrementRange(from, to, how_much); |
| + Range<BlockId> modified_range(from, to); |
| + |
| + // Iterate over all the modified ranges and check if any of them have |
| + // transitioned in or out of the unlocked state. If so, we iterate over |
| + // all buffers in that range and add/remove them from the LRU as approperiate. |
| + // We iterate *backwards* through the ranges, with the idea that data in a |
| + // continous range should be freed from the end first. |
| + |
| + if (data_.empty()) |
| + return; |
| + |
| + auto range = pinned_.find(to - 1); |
| + while (1) { |
| + if (range.value() == 0 || range.value() == how_much) { |
| + bool pin = range.value() == how_much; |
| + Range<BlockId> transition_range = modified_range.Intersect(range.range()); |
| + BlockId transition_begin = std::max(range.range_begin(), from); |
| + BlockId transition_end = std::min(range.range_end(), to); |
| + if (transition_range.Empty()) |
| + break; |
| + |
| + // For each range that has transitioned to/from a pinned state, |
| + // we iterate over the corresponding ranges in |present_| to find |
| + // the blocks that are actually in the multibuffer. |
| + for (auto present_block_range = present_.find(transition_end - 1); |
|
DaleCurtis
2015/10/28 23:06:13
ditto.
hubbe
2015/10/29 17:52:44
Ditto what? This is an iterator...
DaleCurtis
2015/10/29 17:59:05
Ditto for const auto& unless you need the non-cons
hubbe
2015/10/29 21:45:05
A const iterator is an iterator to a const contain
|
| + present_block_range != present_.begin(); |
| + --present_block_range) { |
| + if (!present_block_range.value()) |
| + continue; |
| + Range<BlockId> present_transitioned_range = |
| + transition_range.Intersect(present_block_range.range()); |
| + if (present_transitioned_range.Empty()) |
| + break; |
| + for (BlockId block = present_transitioned_range.end - 1; |
| + block >= present_transitioned_range.begin; |
| + --block) { |
| + DCHECK_GE(block, 0); |
| + DCHECK(data_.find(block) != data_.end()); |
| + if (pin) { |
| + DCHECK(pinned_[block]); |
| + lru_->Remove(this, block); |
| + } else { |
| + DCHECK(!pinned_[block]); |
| + lru_->Insert(this, block); |
| + } |
| + } |
| + } |
| + } |
| + if (range == pinned_.begin()) break; |
| + --range; |
| + } |
| +} |
| + |
| +void MultiBuffer::PinRanges(const RangeMap<BlockId, int32_t>& ranges) { |
| + for (const auto& r : ranges) { |
| + if (r.second != 0) { |
| + PinRange(r.first.begin, r.first.end, r.second); |
| + } |
| + } |
| +} |
| + |
// Grows (or, with negative |size|, shrinks) this buffer's block budget
// and mirrors the change into the shared LRU's global budget.
void MultiBuffer::IncrementMaxSize(int32_t size) {
  max_size_ += size;
  lru_->IncrementMaxSize(size);
  DCHECK_GE(max_size_, 0);
  // Pruning only happens when blocks are added.
}
| + |
| +} // namespace media |