// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/blink/multibuffer.h"

#include "base/bind.h"

namespace media {

// Returns the block ID closest to (but less than or equal to) |pos| from
// |index|.
template <class T>
static MultiBuffer::BlockId ClosestPreviousEntry(
    const std::map<MultiBuffer::BlockId, T>& index,
    MultiBuffer::BlockId pos) {
  auto i = index.upper_bound(pos);
  DCHECK(i == index.end() || i->first > pos);
  if (i == index.begin()) {
    return std::numeric_limits<MultiBufferBlockId>::min();
  }
  --i;
  DCHECK_LE(i->first, pos);
  return i->first;
}

// Returns the block ID closest to (but greater than or equal to) |pos|
// from |index|.
template <class T>
static MultiBuffer::BlockId ClosestNextEntry(
    const std::map<MultiBuffer::BlockId, T>& index,
    MultiBuffer::BlockId pos) {
  auto i = index.lower_bound(pos);
  if (i == index.end()) {
    return std::numeric_limits<MultiBufferBlockId>::max();
  }
  DCHECK_GE(i->first, pos);
  return i->first;
}
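
// Example (illustrative only, not part of the original file): with an index
// holding entries at blocks {10, 20},
//
//   std::map<MultiBuffer::BlockId, int> index = {{10, 0}, {20, 0}};
//   ClosestPreviousEntry(index, 15);  // 10
//   ClosestPreviousEntry(index, 5);   // numeric_limits<...>::min() sentinel
//   ClosestNextEntry(index, 15);      // 20
//   ClosestNextEntry(index, 25);      // numeric_limits<...>::max() sentinel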

//
// MultiBuffer::GlobalLRU
//
MultiBuffer::GlobalLRU::GlobalLRU() : max_size_(0), data_size_(0) {}

MultiBuffer::GlobalLRU::~GlobalLRU() {
  // By the time we're freed, all blocks should have been removed,
  // and our sums should be zero.
  DCHECK(lru_.Empty());
  DCHECK_EQ(max_size_, 0);
  DCHECK_EQ(data_size_, 0);
}

void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer,
                                 MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Use(id);
}

void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer,
                                    MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Insert(id);
}

void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer,
                                    MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Remove(id);
}

bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer,
                                      MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  return lru_.Contains(id);
}

void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) {
  data_size_ += blocks;
  DCHECK_GE(data_size_, 0);
}

void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) {
  max_size_ += blocks;
  DCHECK_GE(max_size_, 0);
}

void MultiBuffer::GlobalLRU::Prune(int64_t max_to_free) {
  // We group the blocks by multibuffer so that we can free as many blocks as
  // possible in one call. This reduces the number of callbacks to clients
  // when their available ranges change.
  std::map<MultiBuffer*, std::vector<MultiBufferBlockId>> to_free;
  int64_t freed = 0;
  while (data_size_ - freed > max_size_ && !lru_.Empty() &&
         freed < max_to_free) {
    GlobalBlockId block_id = lru_.Pop();
    to_free[block_id.first].push_back(block_id.second);
    freed++;
  }
  for (const auto& to_free_pair : to_free) {
    to_free_pair.first->ReleaseBlocks(to_free_pair.second);
  }
}

int64_t MultiBuffer::GlobalLRU::Size() const {
  return lru_.Size();
}
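
// Usage sketch (hedged; the wiring shown is an assumption, not code from this
// file): a single GlobalLRU is meant to be shared by several MultiBuffers,
// which is why LRU entries are (MultiBuffer*, block) pairs. Roughly:
//
//   scoped_refptr<MultiBuffer::GlobalLRU> lru(new MultiBuffer::GlobalLRU());
//   // Pass |lru| to each MultiBuffer; unpinned blocks from all of them then
//   // compete for the same max_size_ budget when Prune() runs.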

//
// MultiBuffer
//
MultiBuffer::MultiBuffer(int32_t block_size_shift,
                         const scoped_refptr<GlobalLRU>& global_lru)
    : max_size_(0), block_size_shift_(block_size_shift), lru_(global_lru) {}
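
// Note (added commentary, not in the original): |block_size_shift| encodes the
// block size as a power of two, i.e. one block covers (1 << block_size_shift)
// bytes. A shift of 15, for example, would mean 32 KiB blocks, and converting
// a byte offset to a BlockId is then just |byte_pos >> block_size_shift_|.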

MultiBuffer::~MultiBuffer() {
  // Delete all writers.
  for (const auto& i : writer_index_) {
    delete i.second;
  }
  // Remove all blocks from the LRU.
  for (const auto& i : data_) {
    lru_->Remove(this, i.first);
  }
  lru_->IncrementDataSize(-static_cast<int64_t>(data_.size()));
  lru_->IncrementMaxSize(-max_size_);
}

void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
  std::set<Reader*>* set_of_readers = &readers_[pos];
  bool already_waited_for = !set_of_readers->empty();
  set_of_readers->insert(reader);

  if (already_waited_for || Contains(pos)) {
    return;
  }

  // We may need to create a new data provider to service this request.
  // Look for an existing data provider first.
  DataProvider* provider = nullptr;
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);

  if (closest_writer > pos - kMaxWaitForWriterOffset) {
    auto i = present_.find(pos);
    BlockId closest_block;
    if (i.value()) {
      // Shouldn't happen; we already checked that Contains(pos) is false.
      NOTREACHED();
      closest_block = pos;
    } else if (i == present_.begin()) {
      closest_block = -1;
    } else {
      closest_block = i.interval_begin() - 1;
    }

    // Make sure that there are no present blocks between the writer and
    // the requested position, as that will cause the writer to quit.
    if (closest_writer > closest_block) {
      provider = writer_index_[closest_writer];
      DCHECK(provider);
    }
  }
  if (!provider) {
    DCHECK(writer_index_.find(pos) == writer_index_.end());
    provider = writer_index_[pos] = CreateWriter(pos);
    provider->SetAvailableCallback(base::Bind(
        &MultiBuffer::DataProviderEvent, base::Unretained(this), provider));
  }
  provider->SetDeferred(false);
}
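
// Walk-through of AddReader (added commentary, not normative): a new reader at
// |pos| first piggybacks on readers already waiting at the same block. Failing
// that, it adopts the closest writer at or before |pos|, but only when that
// writer is within kMaxWaitForWriterOffset blocks and no already-present block
// sits between the writer and |pos| (a present block there would make the
// writer quit). Otherwise, a fresh writer is created at |pos| and undeferred.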

void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) {
  auto i = readers_.find(pos);
  if (i == readers_.end())
    return;
  i->second.erase(reader);
  if (i->second.empty()) {
    readers_.erase(i);
  }
}

void MultiBuffer::CleanupWriters(const BlockId& pos) {
  BlockId p2 = pos + kMaxWaitForReaderOffset;
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, p2);
  while (closest_writer > pos - kMaxWaitForWriterOffset) {
    DCHECK(writer_index_[closest_writer]);
    DataProviderEvent(writer_index_[closest_writer]);
    closest_writer = ClosestPreviousEntry(writer_index_, closest_writer - 1);
  }
}

bool MultiBuffer::Contains(const BlockId& pos) const {
  DCHECK(present_[pos] == 0 || present_[pos] == 1)
      << " pos = " << pos << " present_[pos] " << present_[pos];
  DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
  return !!present_[pos];
}

MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const {
  auto i = present_.find(pos);
  if (i.value())
    return i.interval_end();
  return pos;
}
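
// Example (illustrative only): if blocks [0, 10) are present and |pos| is 5,
// FindNextUnavailable returns 10; if block |pos| itself is absent, it returns
// |pos| unchanged.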

void MultiBuffer::NotifyAvailableRange(
    const Interval<MultiBufferBlockId>& observer_range,
    const Interval<MultiBufferBlockId>& new_range) {
  std::set<Reader*> tmp;
  for (auto i = readers_.lower_bound(observer_range.begin);
       i != readers_.end() && i->first < observer_range.end; ++i) {
    tmp.insert(i->second.begin(), i->second.end());
  }
  for (Reader* reader : tmp) {
    reader->NotifyAvailableRange(new_range);
  }
}
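
// Note (added commentary): readers are gathered into a std::set first so that
// a reader registered at multiple positions inside |observer_range| is
// notified exactly once.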

void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId>& blocks) {
  IntervalMap<BlockId, int32_t> freed;
  for (MultiBufferBlockId to_free : blocks) {
    DCHECK(data_[to_free]);
    DCHECK_EQ(pinned_[to_free], 0);
    DCHECK_EQ(present_[to_free], 1);
    data_.erase(to_free);
    freed.IncrementInterval(to_free, to_free + 1, 1);
    present_.IncrementInterval(to_free, to_free + 1, -1);
  }
  lru_->IncrementDataSize(-static_cast<int64_t>(blocks.size()));

  for (const auto& freed_range : freed) {
    if (freed_range.second) {
      // Technically, there shouldn't be any observers in this range, as all
      // observers really should be pinning the range that they are actually
      // observing.
      NotifyAvailableRange(
          freed_range.first,
          // Empty range.
          Interval<BlockId>(freed_range.first.begin, freed_range.first.begin));

      auto i = present_.find(freed_range.first.begin);
      DCHECK_EQ(i.value(), 0);
      DCHECK_LE(i.interval_begin(), freed_range.first.begin);
      DCHECK_LE(freed_range.first.end, i.interval_end());

      if (i.interval_begin() == freed_range.first.begin) {
        // Notify the previous range that it contains fewer blocks.
        auto j = i;
        --j;
        DCHECK_EQ(j.value(), 1);
        NotifyAvailableRange(j.interval(), j.interval());
      }
      if (i.interval_end() == freed_range.first.end) {
        // Notify the following range that it contains fewer blocks.
        auto j = i;
        ++j;
        DCHECK_EQ(j.value(), 1);
        NotifyAvailableRange(j.interval(), j.interval());
      }
    }
  }
}
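
// Worked example (illustrative only): if present_ is 1 over [10, 20) and block
// 15 is released, present_ becomes 1 over [10, 15), 0 over [15, 16), and 1
// over [16, 20). Observers of the freed block see an empty range, and both
// neighbors are re-notified because the zero gap starts and ends exactly at
// the freed range's bounds; if blocks 16 and 17 had already been absent, only
// the [10, 15) side would be re-notified.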

void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) {
  // If there is already a provider in the same location, we delete it.
  DCHECK(!provider->Available());
  BlockId pos = provider->Tell();
  DataProvider** place = &writer_index_[pos];
  DCHECK_NE(*place, provider.get());
  if (*place)
    delete *place;
  *place = provider.release();
}

scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
    DataProvider* provider) {
  BlockId pos = provider->Tell();
  DCHECK_EQ(writer_index_[pos], provider);
  writer_index_.erase(pos);
  return scoped_ptr<DataProvider>(provider);
}

MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
    const BlockId& pos) const {
  MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
  if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() &&
      (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) {
    // Check if there is another writer between us and the next reader.
    MultiBufferBlockId next_writer_pos =
        ClosestNextEntry(writer_index_, pos + 1);
    if (next_writer_pos > next_reader_pos) {
      return ProviderStateLoad;
    }
  }

  MultiBufferBlockId previous_reader_pos =
      ClosestPreviousEntry(readers_, pos - 1);
  if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() &&
      (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
       !RangeSupported())) {
    MultiBufferBlockId previous_writer_pos =
        ClosestPreviousEntry(writer_index_, pos - 1);
    if (previous_writer_pos < previous_reader_pos) {
      return ProviderStateDefer;
    }
  }

  return ProviderStateDead;
}
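
// Decision summary (added commentary): Load when a reader waits no more than
// kMaxWaitForWriterOffset blocks ahead (or ranges are unsupported) and no
// other writer sits between this position and that reader; Defer when a reader
// trails within kMaxWaitForReaderOffset behind under the same no-closer-writer
// condition; otherwise Dead, since no reader is close enough to justify
// keeping the provider alive.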

bool MultiBuffer::ProviderCollision(const BlockId& id) const {
  // If there is a writer at the same location, it is always a collision.
  if (writer_index_.find(id) != writer_index_.end())
    return true;

  // If data already exists at the provider's current position and the URL
  // supports ranges, we can kill the data provider.
  if (RangeSupported() && Contains(id))
    return true;

  return false;
}

void MultiBuffer::Prune(size_t max_to_free) {
  lru_->Prune(max_to_free);
}

void MultiBuffer::DataProviderEvent(DataProvider* provider_tmp) {
  scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp));
  BlockId start_pos = provider->Tell();
  BlockId pos = start_pos;
  bool eof = false;
  int64_t blocks_before = data_.size();

  while (!ProviderCollision(pos) && !eof) {
    if (!provider->Available()) {
      AddProvider(provider.Pass());
      break;
    }
    DCHECK_GE(pos, 0);
    scoped_refptr<DataBuffer> data = provider->Read();
    data_[pos] = data;
    eof = data->end_of_stream();
    if (!pinned_[pos])
      lru_->Use(this, pos);
    ++pos;
  }
  int64_t blocks_after = data_.size();
  int64_t blocks_added = blocks_after - blocks_before;

  if (pos > start_pos) {
    present_.SetInterval(start_pos, pos, 1);
    Interval<BlockId> expanded_range = present_.find(start_pos).interval();
    NotifyAvailableRange(expanded_range, expanded_range);

    lru_->IncrementDataSize(blocks_added);
    Prune(blocks_added * kMaxFreesPerAdd + 1);
  }

  // Check that the provider is still in the writer index before we touch it.
  // In case of EOF or a collision, we might not have called AddProvider above.
  // Even if we did call AddProvider, calling NotifyAvailableRange can cause
  // readers to seek or self-destruct and clean up any associated writers.
  auto i = writer_index_.find(pos);
  if (i != writer_index_.end() && i->second == provider_tmp) {
    switch (SuggestProviderState(pos)) {
      case ProviderStateLoad:
        // Not sure we actually need to do this.
        provider_tmp->SetDeferred(false);
        break;
      case ProviderStateDefer:
        provider_tmp->SetDeferred(true);
        break;
      case ProviderStateDead:
        RemoveProvider(provider_tmp);
        break;
    }
  }
}
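
// A note on the Prune() bound above (added reasoning, not from the original
// comments): freeing at most blocks_added * kMaxFreesPerAdd + 1 blocks per
// event amortizes eviction work against insertion work, so a single event can
// never trigger an unbounded eviction pass, while the "+ 1" still allows some
// progress when a write only overwrote already-present blocks and
// blocks_added is therefore zero.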

void MultiBuffer::MergeFrom(MultiBuffer* other) {
  // Import data and update the LRU.
  for (const auto& data : other->data_) {
    if (data_.insert(std::make_pair(data.first, data.second)).second) {
      if (!pinned_[data.first]) {
        lru_->Insert(this, data.first);
      }
    }
  }
  // Update present_.
  for (const auto& r : other->present_) {
    if (r.second) {
      present_.SetInterval(r.first.begin, r.first.end, 1);
    }
  }
  // Notify existing readers.
  auto last = present_.begin();
  for (const auto& r : other->present_) {
    if (r.second) {
      auto i = present_.find(r.first.begin);
      if (i != last) {
        NotifyAvailableRange(i.interval(), i.interval());
        last = i;
      }
    }
  }
}

void MultiBuffer::PinRange(const BlockId& from,
                           const BlockId& to,
                           int32_t how_much) {
  DCHECK_NE(how_much, 0);
  DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much;
  pinned_.IncrementInterval(from, to, how_much);
  Interval<BlockId> modified_range(from, to);

  // Iterate over all the modified ranges and check if any of them have
  // transitioned in or out of the unlocked state. If so, we iterate over
  // all buffers in that range and add/remove them from the LRU as appropriate.
  // We iterate *backwards* through the ranges, with the idea that data in a
  // continuous range should be freed from the end first.

  if (data_.empty())
    return;

  auto range = pinned_.find(to - 1);
  while (1) {
    if (range.value() == 0 || range.value() == how_much) {
      bool pin = range.value() == how_much;
      Interval<BlockId> transition_range =
          modified_range.Intersect(range.interval());
      if (transition_range.Empty())
        break;

      // For each range that has transitioned to/from a pinned state,
      // we iterate over the corresponding ranges in |present_| to find
      // the blocks that are actually in the multibuffer.
      for (auto present_block_range = present_.find(transition_range.end - 1);
           present_block_range != present_.begin(); --present_block_range) {
        if (!present_block_range.value())
          continue;
        Interval<BlockId> present_transitioned_range =
            transition_range.Intersect(present_block_range.interval());
        if (present_transitioned_range.Empty())
          break;
        for (BlockId block = present_transitioned_range.end - 1;
             block >= present_transitioned_range.begin; --block) {
          DCHECK_GE(block, 0);
          DCHECK(data_.find(block) != data_.end());
          if (pin) {
            DCHECK(pinned_[block]);
            lru_->Remove(this, block);
          } else {
            DCHECK(!pinned_[block]);
            lru_->Insert(this, block);
          }
        }
      }
    }
    if (range == pinned_.begin())
      break;
    --range;
  }
}
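
// Example (illustrative only): PinRange(10, 20, 1) takes pinned_ from 0 to 1
// over [10, 20), so every present block in that range is removed from the LRU
// and becomes ineligible for eviction; PinRange(10, 20, -1) later drops the
// count back to 0 and re-inserts those blocks. Pins stack, and a block is
// LRU-managed only while its pin count is exactly zero.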

void MultiBuffer::PinRanges(const IntervalMap<BlockId, int32_t>& ranges) {
  for (const auto& r : ranges) {
    if (r.second != 0) {
      PinRange(r.first.begin, r.first.end, r.second);
    }
  }
}

void MultiBuffer::IncrementMaxSize(int32_t size) {
  max_size_ += size;
  lru_->IncrementMaxSize(size);
  DCHECK_GE(max_size_, 0);
  // Pruning only happens when blocks are added.
}

}  // namespace media