Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: media/blink/multibuffer.cc

Issue 1165903002: Multi reader/writer cache/buffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: formatted Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/blink/multibuffer.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/blink/multibuffer.h"
6
7 #include "base/bind.h"
8
9 namespace media {
10
11 // Returns the block ID closest to (but less or equal than) |pos| from |index|.
12 template <class T>
13 static MultiBuffer::BlockId ClosestPreviousEntry(
14 const std::map<MultiBuffer::BlockId, T>& index,
15 MultiBuffer::BlockId pos) {
16 auto i = index.upper_bound(pos);
17 DCHECK(i == index.end() || i->first > pos);
18 if (i == index.begin()) {
19 return std::numeric_limits<MultiBufferBlockId>::min();
20 }
21 --i;
22 DCHECK_LE(i->first, pos);
23 return i->first;
24 }
25
26 // Returns the block ID closest to (but greter than or equal to) |pos|
27 // from |index|.
28 template <class T>
29 static MultiBuffer::BlockId ClosestNextEntry(
30 const std::map<MultiBuffer::BlockId, T>& index,
31 MultiBuffer::BlockId pos) {
32 auto i = index.lower_bound(pos);
33 if (i == index.end()) {
34 return std::numeric_limits<MultiBufferBlockId>::max();
35 }
36 DCHECK_GE(i->first, pos);
37 return i->first;
38 }
39
40 //
41 // MultiBuffer::GlobalLRU
42 //
43 MultiBuffer::GlobalLRU::GlobalLRU() : max_size_(0), data_size_(0) {}
44
45 MultiBuffer::GlobalLRU::~GlobalLRU() {
46 // By the time we're freed, all blocks should have been removed,
47 // and our sums should be zero.
48 DCHECK(lru_.Empty());
49 DCHECK_EQ(max_size_, 0);
50 DCHECK_EQ(data_size_, 0);
51 }
52
53 void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer,
54 MultiBufferBlockId block_id) {
55 GlobalBlockId id(multibuffer, block_id);
56 lru_.Use(id);
57 }
58
59 void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer,
60 MultiBufferBlockId block_id) {
61 GlobalBlockId id(multibuffer, block_id);
62 lru_.Insert(id);
63 }
64
65 void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer,
66 MultiBufferBlockId block_id) {
67 GlobalBlockId id(multibuffer, block_id);
68 lru_.Remove(id);
69 }
70
71 bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer,
72 MultiBufferBlockId block_id) {
73 GlobalBlockId id(multibuffer, block_id);
74 return lru_.Contains(id);
75 }
76
77 void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) {
78 data_size_ += blocks;
79 DCHECK_GE(data_size_, 0);
80 }
81
82 void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) {
83 max_size_ += blocks;
84 DCHECK_GE(max_size_, 0);
85 }
86
87 void MultiBuffer::GlobalLRU::Prune(int64_t max_to_free) {
88 // We group the blocks by multibuffer so that we can free as many blocks as
89 // possible in one call. This reduces the number of callbacks to clients
90 // when their available ranges change.
91 std::map<MultiBuffer*, std::vector<MultiBufferBlockId>> to_free;
92 int64_t freed = 0;
93 while (data_size_ - freed > max_size_ && !lru_.Empty() &&
94 freed < max_to_free) {
95 GlobalBlockId block_id = lru_.Pop();
96 to_free[block_id.first].push_back(block_id.second);
97 freed++;
98 }
99 for (const auto& to_free_pair : to_free) {
100 to_free_pair.first->ReleaseBlocks(to_free_pair.second);
101 }
102 }
103
104 int64_t MultiBuffer::GlobalLRU::Size() const {
105 return lru_.Size();
106 }
107
108 //
109 // MultiBuffer
110 //
111 MultiBuffer::MultiBuffer(int32_t block_size_shift,
112 const scoped_refptr<GlobalLRU>& global_lru)
113 : max_size_(0), block_size_shift_(block_size_shift), lru_(global_lru) {}
114
115 MultiBuffer::~MultiBuffer() {
116 // Delete all writers.
117 for (const auto& i : writer_index_) {
118 delete i.second;
119 }
120 // Remove all blocks from the LRU.
121 for (const auto& i : data_) {
122 lru_->Remove(this, i.first);
123 }
124 lru_->IncrementDataSize(-static_cast<int64_t>(data_.size()));
125 lru_->IncrementMaxSize(-max_size_);
126 }
127
128 void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
129 std::set<Reader*>* set_of_readers = &readers_[pos];
130 bool already_waited_for = !set_of_readers->empty();
131 set_of_readers->insert(reader);
132
133 if (already_waited_for || Contains(pos)) {
134 return;
135 }
136
137 // We may need to create a new data provider to service this request.
138 // Look for an existing data provider first.
139 DataProvider* provider = nullptr;
140 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
141
142 if (closest_writer > pos - kMaxWaitForWriterOffset) {
143 auto i = present_.find(pos);
144 BlockId closest_block;
145 if (i.value()) {
146 // Shouldn't happen, we already tested that Contains(pos) is true.
147 NOTREACHED();
148 closest_block = pos;
149 } else if (i == present_.begin()) {
150 closest_block = -1;
151 } else {
152 closest_block = i.interval_begin() - 1;
153 }
154
155 // Make sure that there are no present blocks between the writer and
156 // the requested position, as that will cause the writer to quit.
157 if (closest_writer > closest_block) {
158 provider = writer_index_[closest_writer];
159 DCHECK(provider);
160 }
161 }
162 if (!provider) {
163 DCHECK(writer_index_.find(pos) == writer_index_.end());
164 provider = writer_index_[pos] = CreateWriter(pos);
165 provider->SetAvailableCallback(base::Bind(
166 &MultiBuffer::DataProviderEvent, base::Unretained(this), provider));
167 }
168 provider->SetDeferred(false);
169 }
170
171 void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) {
172 auto i = readers_.find(pos);
173 if (i == readers_.end())
174 return;
175 i->second.erase(reader);
176 if (i->second.empty()) {
177 readers_.erase(i);
178 }
179 }
180
181 void MultiBuffer::CleanupWriters(const BlockId& pos) {
182 BlockId p2 = pos + kMaxWaitForReaderOffset;
183 BlockId closest_writer = ClosestPreviousEntry(writer_index_, p2);
184 while (closest_writer > pos - kMaxWaitForWriterOffset) {
185 DCHECK(writer_index_[closest_writer]);
186 DataProviderEvent(writer_index_[closest_writer]);
187 closest_writer = ClosestPreviousEntry(writer_index_, closest_writer - 1);
188 }
189 }
190
191 bool MultiBuffer::Contains(const BlockId& pos) const {
192 DCHECK(present_[pos] == 0 || present_[pos] == 1)
193 << " pos = " << pos << " present_[pos] " << present_[pos];
194 DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
195 return !!present_[pos];
196 }
197
198 MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const {
199 auto i = present_.find(pos);
200 if (i.value())
201 return i.interval_end();
202 return pos;
203 }
204
205 void MultiBuffer::NotifyAvailableRange(
206 const Interval<MultiBufferBlockId>& observer_range,
207 const Interval<MultiBufferBlockId>& new_range) {
208 std::set<Reader*> tmp;
209 for (auto i = readers_.lower_bound(observer_range.begin);
210 i != readers_.end() && i->first < observer_range.end; ++i) {
211 tmp.insert(i->second.begin(), i->second.end());
212 }
213 for (Reader* reader : tmp) {
214 reader->NotifyAvailableRange(new_range);
215 }
216 }
217
218 void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId>& blocks) {
219 IntervalMap<BlockId, int32_t> freed;
220 for (MultiBufferBlockId to_free : blocks) {
221 DCHECK(data_[to_free]);
222 DCHECK_EQ(pinned_[to_free], 0);
223 DCHECK_EQ(present_[to_free], 1);
224 data_.erase(to_free);
225 freed.IncrementInterval(to_free, to_free + 1, 1);
226 present_.IncrementInterval(to_free, to_free + 1, -1);
227 }
228 lru_->IncrementDataSize(-static_cast<int64_t>(blocks.size()));
229
230 for (const auto& freed_range : freed) {
231 if (freed_range.second) {
232 // Technically, there shouldn't be any observers in this range
233 // as all observers really should be pinning the range where it's
234 // actually observing.
235 NotifyAvailableRange(
236 freed_range.first,
237 // Empty range.
238 Interval<BlockId>(freed_range.first.begin, freed_range.first.begin));
239
240 auto i = present_.find(freed_range.first.begin);
241 DCHECK_EQ(i.value(), 0);
242 DCHECK_LE(i.interval_begin(), freed_range.first.begin);
243 DCHECK_LE(freed_range.first.end, i.interval_end());
244
245 if (i.interval_begin() == freed_range.first.begin) {
246 // Notify the previous range that it contains fewer blocks.
247 auto j = i;
248 --j;
249 DCHECK_EQ(j.value(), 1);
250 NotifyAvailableRange(j.interval(), j.interval());
251 }
252 if (i.interval_end() == freed_range.first.end) {
253 // Notify the following range that it contains fewer blocks.
254 auto j = i;
255 ++j;
256 DCHECK_EQ(j.value(), 1);
257 NotifyAvailableRange(j.interval(), j.interval());
258 }
259 }
260 }
261 }
262
263 void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) {
264 // If there is already a provider in the same location, we delete it.
265 DCHECK(!provider->Available());
266 BlockId pos = provider->Tell();
267 DataProvider** place = &writer_index_[pos];
268 DCHECK_NE(*place, provider.get());
269 if (*place)
270 delete *place;
271 *place = provider.release();
272 }
273
274 scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
275 DataProvider* provider) {
276 BlockId pos = provider->Tell();
277 DCHECK_EQ(writer_index_[pos], provider);
278 writer_index_.erase(pos);
279 return scoped_ptr<DataProvider>(provider);
280 }
281
282 MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
283 const BlockId& pos) const {
284 MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
285 if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() &&
286 (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) {
287 // Check if there is another writer between us and the next reader.
288 MultiBufferBlockId next_writer_pos =
289 ClosestNextEntry(writer_index_, pos + 1);
290 if (next_writer_pos > next_reader_pos) {
291 return ProviderStateLoad;
292 }
293 }
294
295 MultiBufferBlockId previous_reader_pos =
296 ClosestPreviousEntry(readers_, pos - 1);
297 if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() &&
298 (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
299 !RangeSupported())) {
300 MultiBufferBlockId previous_writer_pos =
301 ClosestPreviousEntry(writer_index_, pos - 1);
302 if (previous_writer_pos < previous_reader_pos) {
303 return ProviderStateDefer;
304 }
305 }
306
307 return ProviderStateDead;
308 }
309
310 bool MultiBuffer::ProviderCollision(const BlockId& id) const {
311 // If there is a writer at the same location, it is always a collision.
312 if (writer_index_.find(id) != writer_index_.end())
313 return true;
314
315 // Data already exists at providers current position,
316 // if the URL supports ranges, we can kill the data provider.
317 if (RangeSupported() && Contains(id))
318 return true;
319
320 return false;
321 }
322
323 void MultiBuffer::Prune(size_t max_to_free) {
324 lru_->Prune(max_to_free);
325 }
326
327 void MultiBuffer::DataProviderEvent(DataProvider* provider_tmp) {
328 scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp));
329 BlockId start_pos = provider->Tell();
330 BlockId pos = start_pos;
331 bool eof = false;
332 int64_t blocks_before = data_.size();
333
334 while (!ProviderCollision(pos) && !eof) {
335 if (!provider->Available()) {
336 AddProvider(provider.Pass());
337 break;
338 }
339 DCHECK_GE(pos, 0);
340 scoped_refptr<DataBuffer> data = provider->Read();
341 data_[pos] = data;
342 eof = data->end_of_stream();
343 if (!pinned_[pos])
344 lru_->Use(this, pos);
345 ++pos;
346 }
347 int64_t blocks_after = data_.size();
348 int64_t blocks_added = blocks_after - blocks_before;
349
350 if (pos > start_pos) {
351 present_.SetInterval(start_pos, pos, 1);
352 Interval<BlockId> expanded_range = present_.find(start_pos).interval();
353 NotifyAvailableRange(expanded_range, expanded_range);
354
355 lru_->IncrementDataSize(blocks_added);
356 Prune(blocks_added * kMaxFreesPerAdd + 1);
357 }
358
359 // Check that it's still there before we try to delete it.
360 // In case of EOF or a collision, we might not have called AddProvider above.
361 // Even if we did call AddProvider, calling NotifyAvailableRange can cause
362 // readers to seek or self-destruct and clean up any associated writers.
363 auto i = writer_index_.find(pos);
364 if (i != writer_index_.end() && i->second == provider_tmp) {
365 switch (SuggestProviderState(pos)) {
366 case ProviderStateLoad:
367 // Not sure we actually need to do this
368 provider_tmp->SetDeferred(false);
369 break;
370 case ProviderStateDefer:
371 provider_tmp->SetDeferred(true);
372 break;
373 case ProviderStateDead:
374 RemoveProvider(provider_tmp);
375 break;
376 }
377 }
378 }
379
380 void MultiBuffer::MergeFrom(MultiBuffer* other) {
381 // Import data and update LRU.
382 for (const auto& data : other->data_) {
383 if (data_.insert(std::make_pair(data.first, data.second)).second) {
384 if (!pinned_[data.first]) {
385 lru_->Insert(this, data.first);
386 }
387 }
388 }
389 // Update present_
390 for (const auto& r : other->present_) {
391 if (r.second) {
392 present_.SetInterval(r.first.begin, r.first.end, 1);
393 }
394 }
395 // Notify existing readers.
396 auto last = present_.begin();
397 for (const auto& r : other->present_) {
398 if (r.second) {
399 auto i = present_.find(r.first.begin);
400 if (i != last) {
401 NotifyAvailableRange(i.interval(), i.interval());
402 last = i;
403 }
404 }
405 }
406 }
407
408 void MultiBuffer::PinRange(const BlockId& from,
409 const BlockId& to,
410 int32_t how_much) {
411 DCHECK_NE(how_much, 0);
412 DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much;
413 pinned_.IncrementInterval(from, to, how_much);
414 Interval<BlockId> modified_range(from, to);
415
416 // Iterate over all the modified ranges and check if any of them have
417 // transitioned in or out of the unlocked state. If so, we iterate over
418 // all buffers in that range and add/remove them from the LRU as approperiate.
419 // We iterate *backwards* through the ranges, with the idea that data in a
420 // continous range should be freed from the end first.
421
422 if (data_.empty())
423 return;
424
425 auto range = pinned_.find(to - 1);
426 while (1) {
427 if (range.value() == 0 || range.value() == how_much) {
428 bool pin = range.value() == how_much;
429 Interval<BlockId> transition_range =
430 modified_range.Intersect(range.interval());
431 if (transition_range.Empty())
432 break;
433
434 // For each range that has transitioned to/from a pinned state,
435 // we iterate over the corresponding ranges in |present_| to find
436 // the blocks that are actually in the multibuffer.
437 for (auto present_block_range = present_.find(transition_range.end - 1);
438 present_block_range != present_.begin(); --present_block_range) {
439 if (!present_block_range.value())
440 continue;
441 Interval<BlockId> present_transitioned_range =
442 transition_range.Intersect(present_block_range.interval());
443 if (present_transitioned_range.Empty())
444 break;
445 for (BlockId block = present_transitioned_range.end - 1;
446 block >= present_transitioned_range.begin; --block) {
447 DCHECK_GE(block, 0);
448 DCHECK(data_.find(block) != data_.end());
449 if (pin) {
450 DCHECK(pinned_[block]);
451 lru_->Remove(this, block);
452 } else {
453 DCHECK(!pinned_[block]);
454 lru_->Insert(this, block);
455 }
456 }
457 }
458 }
459 if (range == pinned_.begin())
460 break;
461 --range;
462 }
463 }
464
465 void MultiBuffer::PinRanges(const IntervalMap<BlockId, int32_t>& ranges) {
466 for (const auto& r : ranges) {
467 if (r.second != 0) {
468 PinRange(r.first.begin, r.first.end, r.second);
469 }
470 }
471 }
472
473 void MultiBuffer::IncrementMaxSize(int32_t size) {
474 max_size_ += size;
475 lru_->IncrementMaxSize(size);
476 DCHECK_GE(max_size_, 0);
477 // Pruning only happens when blocks are added.
478 }
479
480 } // namespace media
OLDNEW
« no previous file with comments | « media/blink/multibuffer.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698