OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "media/blink/multibuffer.h" | |
6 | |
7 #include "base/bind.h" | |
8 | |
9 namespace media { | |
10 | |
11 // Returns the block ID closest to (but less or equal than) |pos| from |index|. | |
12 template <class T> | |
13 static MultiBuffer::BlockId ClosestPreviousEntry( | |
14 const std::map<MultiBuffer::BlockId, T>& index, | |
15 MultiBuffer::BlockId pos) { | |
16 auto i = index.upper_bound(pos); | |
17 DCHECK(i == index.end() || i->first > pos); | |
18 if (i == index.begin()) { | |
19 return std::numeric_limits<MultiBufferBlockId>::min(); | |
20 } | |
21 --i; | |
22 DCHECK_LE(i->first, pos); | |
23 return i->first; | |
24 } | |
25 | |
26 // Returns the block ID closest to (but greter than or equal to) |pos| | |
27 // from |index|. | |
28 template <class T> | |
29 static MultiBuffer::BlockId ClosestNextEntry( | |
30 const std::map<MultiBuffer::BlockId, T>& index, | |
31 MultiBuffer::BlockId pos) { | |
32 auto i = index.lower_bound(pos); | |
33 if (i == index.end()) { | |
34 return std::numeric_limits<MultiBufferBlockId>::max(); | |
35 } | |
36 DCHECK_GE(i->first, pos); | |
37 return i->first; | |
38 } | |
39 | |
40 // | |
41 // MultiBuffer::GlobalLRU | |
42 // | |
43 MultiBuffer::GlobalLRU::GlobalLRU() : max_size_(0), data_size_(0) {} | |
44 | |
45 MultiBuffer::GlobalLRU::~GlobalLRU() { | |
46 // By the time we're freed, all blocks should have been removed, | |
47 // and our sums should be zero. | |
48 DCHECK(lru_.Empty()); | |
49 DCHECK_EQ(max_size_, 0); | |
50 DCHECK_EQ(data_size_, 0); | |
51 } | |
52 | |
53 void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer, | |
54 MultiBufferBlockId block_id) { | |
55 GlobalBlockId id(multibuffer, block_id); | |
56 lru_.Use(id); | |
57 } | |
58 | |
59 void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer, | |
60 MultiBufferBlockId block_id) { | |
61 GlobalBlockId id(multibuffer, block_id); | |
62 lru_.Insert(id); | |
63 } | |
64 | |
65 void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer, | |
66 MultiBufferBlockId block_id) { | |
67 GlobalBlockId id(multibuffer, block_id); | |
68 lru_.Remove(id); | |
69 } | |
70 | |
71 bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer, | |
72 MultiBufferBlockId block_id) { | |
73 GlobalBlockId id(multibuffer, block_id); | |
74 return lru_.Contains(id); | |
75 } | |
76 | |
77 void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) { | |
78 data_size_ += blocks; | |
79 DCHECK_GE(data_size_, 0); | |
80 } | |
81 | |
82 void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) { | |
83 max_size_ += blocks; | |
84 DCHECK_GE(max_size_, 0); | |
85 } | |
86 | |
87 void MultiBuffer::GlobalLRU::Prune(size_t max_to_free) { | |
88 // Group the blocks by multibuffer, then free them. | |
89 std::map<MultiBuffer*, std::vector<MultiBufferBlockId>> to_free; | |
90 size_t freed = 0; | |
91 while (data_size_ - freed > max_size_ && !lru_.Empty() && | |
92 freed < max_to_free) { | |
93 GlobalBlockId block_id = lru_.Pop(); | |
94 to_free[block_id.first].push_back(block_id.second); | |
95 freed++; | |
96 } | |
97 for (const auto& to_free_pair : to_free) { | |
98 to_free_pair.first->ReleaseBlocks(to_free_pair.second); | |
99 } | |
100 } | |
101 | |
102 size_t MultiBuffer::GlobalLRU::Size() const { | |
103 return lru_.Size(); | |
104 } | |
105 | |
106 // | |
107 // MultiBuffer | |
108 // | |
109 MultiBuffer::MultiBuffer(int32_t block_size_shift, | |
110 const scoped_refptr<GlobalLRU>& global_lru) | |
111 : max_size_(0), block_size_shift_(block_size_shift), lru_(global_lru) {} | |
112 | |
113 MultiBuffer::~MultiBuffer() { | |
114 // Delete all writers. | |
115 for (const auto& i : writer_index_) { | |
116 delete i.second; | |
117 } | |
118 // Remove all blocks from the LRU. | |
119 for (const auto& i : data_) { | |
120 lru_->Remove(this, i.first); | |
121 } | |
122 lru_->IncrementDataSize(-static_cast<int64_t>(data_.size())); | |
123 lru_->IncrementMaxSize(-max_size_); | |
124 } | |
125 | |
126 void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) { | |
127 std::set<Reader*>& set_of_readers = readers_[pos]; | |
128 bool already_waited_for = !set_of_readers.empty(); | |
129 set_of_readers.insert(reader); | |
130 | |
131 if (already_waited_for || Contains(pos)) { | |
132 return; | |
133 } | |
134 | |
135 // We may need to create a new data provider to service this request. | |
136 // Look for an existing data provider first. | |
137 DataProvider* provider = NULL; | |
138 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos); | |
139 | |
140 if (closest_writer > pos - kMaxWaitForWriterOffset) { | |
141 auto i = present_.find(pos); | |
142 BlockId closest_block; | |
143 if (i.value()) { | |
144 closest_block = pos; | |
145 } else if (i == present_.begin()) { | |
146 closest_block = -1; | |
147 } else { | |
148 closest_block = i.range_begin() - 1; | |
149 } | |
150 | |
151 if (closest_writer > closest_block) { | |
152 provider = writer_index_[closest_writer]; | |
153 DCHECK(provider); | |
154 } | |
155 } | |
156 if (!provider) { | |
157 DCHECK(writer_index_.find(pos) == writer_index_.end()); | |
158 provider = writer_index_[pos] = CreateWriter(pos); | |
159 provider->SetAvailableCallback(base::Bind(&MultiBuffer::DataProviderEvent, | |
160 base::Unretained(this), | |
161 base::Unretained(provider))); | |
162 } | |
163 provider->SetDeferred(false); | |
164 } | |
165 | |
166 void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) { | |
167 auto i = readers_.find(pos); | |
168 if (i == readers_.end()) | |
169 return; | |
170 i->second.erase(reader); | |
171 if (i->second.empty()) { | |
172 readers_.erase(i); | |
173 } | |
174 } | |
175 | |
176 void MultiBuffer::CleanupWriters(const BlockId& pos) { | |
177 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos); | |
178 if (closest_writer < 0) | |
179 return; | |
180 if (pos - closest_writer > kMaxWaitForWriterOffset) | |
181 return; | |
182 DCHECK(writer_index_[closest_writer]); | |
183 DataProviderEvent(writer_index_[closest_writer]); | |
184 } | |
185 | |
186 bool MultiBuffer::Contains(const BlockId& pos) const { | |
187 DCHECK(present_[pos] == 0 || present_[pos] == 1) | |
188 << " pos = " << pos << " present_[pos] " << present_[pos]; | |
189 DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0); | |
190 return !!present_[pos]; | |
191 } | |
192 | |
193 MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const { | |
194 auto i = present_.find(pos); | |
195 if (i.value()) | |
196 return i.range_end(); | |
197 return pos; | |
198 } | |
199 | |
200 void MultiBuffer::NotifyAvailableRange( | |
201 const Range<MultiBufferBlockId>& observer_range, | |
202 const Range<MultiBufferBlockId>& new_range) { | |
203 std::set<Reader*> tmp; | |
204 for (auto i = readers_.lower_bound(observer_range.begin); | |
205 i != readers_.end() && i->first < observer_range.end; ++i) { | |
206 tmp.insert(i->second.begin(), i->second.end()); | |
207 } | |
208 for (Reader* reader : tmp) { | |
209 reader->NotifyAvailableRange(new_range); | |
210 } | |
211 } | |
212 | |
213 void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId> blocks) { | |
214 RangeMap<BlockId, int32_t> freed; | |
215 for (MultiBufferBlockId to_free : blocks) { | |
216 DCHECK(data_[to_free]); | |
217 DCHECK_EQ(pinned_[to_free], 0); | |
218 DCHECK_EQ(present_[to_free], 1); | |
219 data_.erase(to_free); | |
220 freed.IncrementRange(to_free, to_free + 1, 1); | |
221 present_.IncrementRange(to_free, to_free + 1, -1); | |
222 } | |
223 lru_->IncrementDataSize(-blocks.size()); | |
224 | |
225 for (const auto& freed_range : freed) { | |
226 if (freed_range.second) { | |
227 // Technically, there shouldn't be any observers in this range | |
228 // as all observers really should be pinning the range where it's | |
229 // actually observing. | |
230 NotifyAvailableRange( | |
231 freed_range.first, | |
232 // Empty range. | |
233 Range<BlockId>(freed_range.first.begin, freed_range.first.begin)); | |
234 | |
235 auto i = present_.find(freed_range.first.begin); | |
236 DCHECK_EQ(i.value(), 0); | |
237 DCHECK_LE(i.range_begin(), freed_range.first.begin); | |
238 DCHECK_LE(freed_range.first.end, i.range_end()); | |
239 | |
240 if (i.range_begin() == freed_range.first.begin) { | |
241 // Notify the previous range that it contains fewer blocks. | |
242 auto j = i; | |
243 --j; | |
244 DCHECK_EQ(j.value(), 1); | |
245 NotifyAvailableRange(j.range(), j.range()); | |
246 } | |
247 if (i.range_end() == freed_range.first.end) { | |
248 // Notify the following range that it contains fewer blocks. | |
249 auto j = i; | |
250 ++j; | |
251 DCHECK_EQ(j.value(), 1); | |
252 NotifyAvailableRange(j.range(), j.range()); | |
253 } | |
254 } | |
255 } | |
256 } | |
257 | |
258 void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) { | |
259 // If there is already a provider in the same location, we delete it. | |
260 DCHECK(!provider->Available()); | |
261 BlockId pos = provider->Tell(); | |
262 DataProvider** place = &writer_index_[pos]; | |
263 DCHECK_NE(*place, provider.get()); | |
264 if (*place) | |
265 delete *place; | |
266 *place = provider.release(); | |
267 } | |
268 | |
269 scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider( | |
270 DataProvider* provider) { | |
271 BlockId pos = provider->Tell(); | |
272 DCHECK_EQ(writer_index_[pos], provider); | |
273 writer_index_.erase(pos); | |
274 return scoped_ptr<DataProvider>(provider); | |
275 } | |
276 | |
277 MultiBuffer::ProviderState MultiBuffer::SuggestProviderState( | |
278 const BlockId& pos) const { | |
279 MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos); | |
280 if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() && | |
281 (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) { | |
282 // Check if there is another writer between us and the next reader. | |
283 MultiBufferBlockId next_writer_pos = | |
284 ClosestNextEntry(writer_index_, pos + 1); | |
285 if (next_writer_pos > next_reader_pos) { | |
286 return ProviderStateLoad; | |
287 } | |
288 } | |
289 | |
290 MultiBufferBlockId previous_reader_pos = | |
291 ClosestPreviousEntry(readers_, pos - 1); | |
292 if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() && | |
293 (pos - previous_reader_pos <= kMaxWaitForReaderOffset || | |
294 !RangeSupported())) { | |
295 MultiBufferBlockId previous_writer_pos = | |
296 ClosestPreviousEntry(writer_index_, pos - 1); | |
297 if (previous_writer_pos < previous_reader_pos) { | |
298 return ProviderStateDefer; | |
299 } | |
300 } | |
301 | |
302 return ProviderStateDead; | |
303 } | |
304 | |
305 bool MultiBuffer::ProviderCollision(const BlockId& id) const { | |
306 // If there is a writer at the same location, it is always a collision. | |
307 if (writer_index_.find(id) != writer_index_.end()) | |
308 return true; | |
309 | |
310 // Data already exists at providers current position, | |
311 // if the URL supports ranges, we can kill the data provider. | |
312 if (RangeSupported() && Contains(id)) | |
313 return true; | |
314 | |
315 return false; | |
316 } | |
317 | |
318 void MultiBuffer::Prune(size_t max_to_free) { | |
319 lru_->Prune(max_to_free); | |
320 } | |
321 | |
322 void MultiBuffer::DataProviderEvent(DataProvider* provider_tmp) { | |
323 scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp)); | |
324 BlockId start_pos = provider->Tell(); | |
325 BlockId pos = start_pos; | |
326 bool eof = false; | |
327 int64_t blocks_before = data_.size(); | |
328 | |
329 while (!ProviderCollision(pos) && !eof) { | |
330 if (!provider->Available()) { | |
331 AddProvider(provider.Pass()); | |
332 break; | |
333 } | |
334 DCHECK_GE(pos, 0); | |
335 data_[pos] = provider->Read(); | |
336 eof = data_[pos]->end_of_stream(); | |
liberato (no reviews please)
2015/10/29 18:44:00
eof = (data_[pos] = provider->Read()); to save a l
hubbe
2015/10/29 21:45:06
Saved the lookup slightly differently, hope that's
| |
337 if (!pinned_[pos]) | |
338 lru_->Use(this, pos); | |
339 ++pos; | |
340 } | |
341 int64_t blocks_after = data_.size(); | |
342 int64_t blocks_added = blocks_after - blocks_before; | |
343 | |
344 if (pos > start_pos) { | |
345 present_.SetRange(start_pos, pos, 1); | |
346 Range<BlockId> expanded_range = present_.find(start_pos).range(); | |
347 NotifyAvailableRange(expanded_range, expanded_range); | |
348 | |
349 lru_->IncrementDataSize(blocks_added); | |
350 Prune(blocks_added * kMaxFreesPerAdd + 1); | |
351 } | |
352 | |
353 // Check that it's still there before we try to delete it. | |
354 auto i = writer_index_.find(pos); | |
liberato (no reviews please)
2015/10/29 18:44:00
how is it removed if we AddProvider() it above? i
hubbe
2015/10/29 21:45:06
Comment added.
| |
355 if (i != writer_index_.end() && i->second == provider_tmp) { | |
356 switch (SuggestProviderState(pos)) { | |
357 case ProviderStateLoad: | |
358 // Not sure we actually need to do this | |
359 provider_tmp->SetDeferred(false); | |
360 break; | |
361 case ProviderStateDefer: | |
362 provider_tmp->SetDeferred(true); | |
363 break; | |
364 case ProviderStateDead: | |
365 RemoveProvider(provider_tmp); | |
366 break; | |
367 } | |
368 } | |
369 } | |
370 | |
371 void MultiBuffer::MergeFrom(MultiBuffer* other) { | |
372 // Import data and update LRU. | |
373 for (const auto& data : other->data_) { | |
374 if (data_.insert(std::make_pair(data.first, data.second)).second) { | |
375 if (!pinned_[data.first]) { | |
376 lru_->Insert(this, data.first); | |
377 } | |
378 } | |
379 } | |
380 // Update present_ | |
381 for (const auto& r : other->present_) { | |
382 if (r.second) { | |
383 present_.SetRange(r.first.begin, r.first.end, 1); | |
384 } | |
385 } | |
386 // Notify existing readers. | |
387 auto last = present_.begin(); | |
388 for (const auto& r : other->present_) { | |
389 if (r.second) { | |
390 auto i = present_.find(r.first.begin); | |
391 if (i != last) { | |
392 NotifyAvailableRange(i.range(), i.range()); | |
393 last = i; | |
394 } | |
395 } | |
396 } | |
397 } | |
398 | |
399 void MultiBuffer::PinRange(const BlockId& from, | |
400 const BlockId& to, | |
401 int32_t how_much) { | |
402 DCHECK_NE(how_much, 0); | |
403 DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much; | |
404 pinned_.IncrementRange(from, to, how_much); | |
405 Range<BlockId> modified_range(from, to); | |
406 | |
407 // Iterate over all the modified ranges and check if any of them have | |
408 // transitioned in or out of the unlocked state. If so, we iterate over | |
409 // all buffers in that range and add/remove them from the LRU as approperiate. | |
410 // We iterate *backwards* through the ranges, with the idea that data in a | |
411 // continous range should be freed from the end first. | |
412 | |
413 if (data_.empty()) | |
414 return; | |
415 | |
416 auto range = pinned_.find(to - 1); | |
417 while (1) { | |
418 if (range.value() == 0 || range.value() == how_much) { | |
419 bool pin = range.value() == how_much; | |
420 Range<BlockId> transition_range = modified_range.Intersect(range.range()); | |
421 BlockId transition_begin = std::max(range.range_begin(), from); | |
422 BlockId transition_end = std::min(range.range_end(), to); | |
423 if (transition_range.Empty()) | |
424 break; | |
425 | |
426 // For each range that has transitioned to/from a pinned state, | |
427 // we iterate over the corresponding ranges in |present_| to find | |
428 // the blocks that are actually in the multibuffer. | |
429 for (auto present_block_range = present_.find(transition_end - 1); | |
430 present_block_range != present_.begin(); --present_block_range) { | |
431 if (!present_block_range.value()) | |
432 continue; | |
433 Range<BlockId> present_transitioned_range = | |
434 transition_range.Intersect(present_block_range.range()); | |
435 if (present_transitioned_range.Empty()) | |
436 break; | |
437 for (BlockId block = present_transitioned_range.end - 1; | |
438 block >= present_transitioned_range.begin; --block) { | |
439 DCHECK_GE(block, 0); | |
440 DCHECK(data_.find(block) != data_.end()); | |
441 if (pin) { | |
442 DCHECK(pinned_[block]); | |
443 lru_->Remove(this, block); | |
444 } else { | |
445 DCHECK(!pinned_[block]); | |
446 lru_->Insert(this, block); | |
447 } | |
448 } | |
449 } | |
450 } | |
451 if (range == pinned_.begin()) | |
452 break; | |
453 --range; | |
454 } | |
455 } | |
456 | |
457 void MultiBuffer::PinRanges(const RangeMap<BlockId, int32_t>& ranges) { | |
458 for (const auto& r : ranges) { | |
459 if (r.second != 0) { | |
460 PinRange(r.first.begin, r.first.end, r.second); | |
461 } | |
462 } | |
463 } | |
464 | |
465 void MultiBuffer::IncrementMaxSize(int32_t size) { | |
466 max_size_ += size; | |
467 lru_->IncrementMaxSize(size); | |
468 DCHECK_GE(max_size_, 0); | |
469 // Pruning only happens when blocks are added. | |
470 } | |
471 | |
472 } // namespace media | |
OLD | NEW |