Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(973)

Side by Side Diff: media/blink/multibuffer.cc

Issue 1165903002: Multi reader/writer cache/buffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: refactor Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/blink/multibuffer.h"
6
7 #include "base/bind.h"
8
9 namespace media {
10
11 // Returns the block ID closest to (but less or equal than) |pos| from |index|.
12 template<class T>
13 static MultiBuffer::BlockId ClosestPreviousEntry(
14 const std::map<MultiBuffer::BlockId, T>& index,
15 MultiBuffer::BlockId pos) {
16 auto i = index.upper_bound(pos);
17 DCHECK(i == index.end() || i->first > pos);
18 if (i == index.begin()) {
19 return std::numeric_limits<MultiBufferBlockId>::min();
20 }
21 --i;
22 DCHECK_LE(i->first, pos);
23 return i->first;
24 }
25
26 // Returns the block ID closest to (but greter than or equal to) |pos|
27 // from |index|.
28 template<class T>
29 static MultiBuffer::BlockId ClosestNextEntry(
30 const std::map<MultiBuffer::BlockId, T>& index,
31 MultiBuffer::BlockId pos) {
32 auto i = index.lower_bound(pos);
33 if (i == index.end()) {
34 return std::numeric_limits<MultiBufferBlockId>::max();
35 }
36 DCHECK_GE(i->first, pos);
37 return i->first;
38 }
39
40 //
41 // MultiBuffer::GlobalLRU
42 //
43 MultiBuffer::GlobalLRU::GlobalLRU() : max_size_(0), data_size_(0) {}
44
45 MultiBuffer::GlobalLRU::~GlobalLRU() {
46 // By the time we're freed, all blocks should have been removed,
47 // and our sums should be zero.
48 DCHECK(lru_.Empty());
49 DCHECK_EQ(max_size_, 0);
50 DCHECK_EQ(data_size_, 0);
51 }
52
53 void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer,
54 MultiBufferBlockId block_id) {
55 GlobalBlockId id(multibuffer, block_id);
56 lru_.Use(id);
57 }
58
59 void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer,
60 MultiBufferBlockId block_id) {
61 GlobalBlockId id(multibuffer, block_id);
62 lru_.Insert(id);
63 }
64
65 void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer,
66 MultiBufferBlockId block_id) {
67 GlobalBlockId id(multibuffer, block_id);
68 lru_.Remove(id);
69 }
70
71 bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer,
72 MultiBufferBlockId block_id) {
73 GlobalBlockId id(multibuffer, block_id);
74 return lru_.Contains(id);
75 }
76
77 void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) {
78 data_size_ += blocks;
79 DCHECK_GE(data_size_, 0);
80 }
81
82 void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) {
83 max_size_ += blocks;
84 DCHECK_GE(max_size_, 0);
85 }
86
87 void MultiBuffer::GlobalLRU::Prune(size_t max_to_free) {
88 // Group the blocks by multibuffer, then free them.
89 std::map<MultiBuffer*, std::vector<MultiBufferBlockId> > to_free;
90 size_t freed = 0;
91 while (data_size_ - freed > max_size_ &&
92 !lru_.Empty() &&
93 freed < max_to_free) {
94 GlobalBlockId block_id = lru_.Pop();
95 to_free[block_id.first].push_back(block_id.second);
96 freed--;
DaleCurtis 2015/10/28 23:06:13 How does this work? freed is zero and doesn't seem
hubbe 2015/10/29 17:52:44 OOps, should be ++. Added test as well.
97 }
98 for (const auto &to_free_pair : to_free) {
DaleCurtis 2015/10/28 23:06:13 auto&
hubbe 2015/10/29 17:52:44 Done.
99 to_free_pair.first->ReleaseBlocks(to_free_pair.second);
100 }
101 }
102
103 //
104 // MultiBuffer
105 //
106 MultiBuffer::MultiBuffer(int32_t block_size_shift,
107 scoped_refptr<GlobalLRU> global_lru) :
DaleCurtis 2015/10/28 23:06:13 const&
hubbe 2015/10/29 17:52:44 Done.
108 max_size_(0),
109 block_size_shift_(block_size_shift),
110 lru_(global_lru) {
111 }
112
113 MultiBuffer::~MultiBuffer() {
114 // Delete all writers.
115 for (const auto& i : writer_index_) {
116 delete i.second;
117 }
118 // Remove all blocks from the LRU.
119 for (const auto& i : data_) {
120 lru_->Remove(this, i.first);
121 }
122 lru_->IncrementDataSize(-static_cast<int64_t>(data_.size()));
123 lru_->IncrementMaxSize(-max_size_);
124 }
125
126 void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
127 std::set<Reader*> &set_of_readers = readers_[pos];
128 bool already_waited_for = !set_of_readers.empty();
129 set_of_readers.insert(reader);
130
131 if (already_waited_for || Contains(pos)) {
132 return;
133 }
134
135 // We may need to create a new data provider to service this request.
136 // Look for an existing data provider first.
137 DataProvider* provider = NULL;
138 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
139
140 if (closest_writer > pos - kMaxWaitForWriterOffset) {
141 auto i = present_.find(pos);
142 BlockId closest_block;
143 if (i.value()) {
144 closest_block = pos;
145 } else if (i == present_.begin()) {
146 closest_block = -1;
147 } else {
148 closest_block = i.range_begin() - 1;
149 }
150
151 if (closest_writer > closest_block) {
152 provider = writer_index_[closest_writer];
153 DCHECK(provider);
154 }
155 }
156 if (!provider) {
157 DCHECK(writer_index_.find(pos) == writer_index_.end());
158 provider = writer_index_[pos] = CreateWriter(pos);
159 provider->SetAvailableCallback(
160 base::Bind(&MultiBuffer::DataProviderEvent,
161 base::Unretained(this),
162 base::Unretained(provider)));
163 }
164 provider->SetDeferred(false);
165 }
166
167 void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) {
168 auto i = readers_.find(pos);
169 if (i == readers_.end())
170 return;
171 i->second.erase(reader);
172 if (i->second.empty()) {
173 readers_.erase(i);
174 }
175 }
176
177 void MultiBuffer::CleanupWriters(const BlockId& pos) {
178 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
179 if (closest_writer < 0)
180 return;
181 if (pos - closest_writer > kMaxWaitForWriterOffset)
182 return;
183 DCHECK(writer_index_[closest_writer]);
184 DataProviderEvent(writer_index_[closest_writer]);
185 }
186
187 bool MultiBuffer::Contains(const BlockId& pos) const {
188 DCHECK(present_[pos] == 0 || present_[pos] == 1)
189 << " pos = " << pos
190 << " present_[pos] " << present_[pos];
191 DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
192 return !!present_[pos];
193 }
194
195 MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const {
196 auto i = present_.find(pos);
197 if (i.value())
198 return i.range_end();
199 return pos;
200 }
201
202 void MultiBuffer::NotifyAvailableRange(
203 const Range<MultiBufferBlockId>& observer_range,
204 const Range<MultiBufferBlockId>& new_range) {
205 std::set<Reader*> tmp;
206 for (auto i = readers_.lower_bound(observer_range.begin);
207 i != readers_.end() && i->first < observer_range.end;
208 ++i) {
209 tmp.insert(i->second.begin(), i->second.end());
210 }
211 for (Reader* reader: tmp) {
212 reader->NotifyAvailableRange(new_range);
213 }
214 }
215
216 void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId> blocks) {
217 RangeMap<BlockId, int32_t> freed;
218 for (MultiBufferBlockId to_free : blocks) {
DaleCurtis 2015/10/28 23:06:13 const auto&?
hubbe 2015/10/29 17:52:44 It's an integer type though, using "const auto&" s
219 DCHECK(data_[to_free]);
220 DCHECK_EQ(pinned_[to_free], 0);
221 DCHECK_EQ(present_[to_free], 1);
222 data_.erase(to_free);
223 freed.IncrementRange(to_free, to_free + 1, 1);
224 present_.IncrementRange(to_free, to_free + 1, -1);
225 }
226 lru_->IncrementDataSize(-blocks.size());
227
228 for (const auto& freed_range : freed) {
229 if (freed_range.second) {
230 // Technically, there shouldn't be any observers in this range
231 // as all observers really should be pinning the range where it's
232 // actually observing.
233 NotifyAvailableRange(
234 freed_range.first,
235 // Empty range.
236 Range<BlockId>(freed_range.first.begin,
237 freed_range.first.begin));
238
239 auto i = present_.find(freed_range.first.begin);
240 DCHECK_EQ(i.value(), 0);
241 DCHECK_LE(i.range_begin(), freed_range.first.begin);
242 DCHECK_LE(freed_range.first.end, i.range_end());
243
244 if (i.range_begin() == freed_range.first.begin) {
245 // Notify the previous range that it contains fewer blocks.
246 auto j = i;
247 --j;
248 DCHECK_EQ(j.value(), 1);
249 NotifyAvailableRange(j.range(), j.range());
250 }
251 if (i.range_end() == freed_range.first.end) {
252 // Notify the following range that it contains fewer blocks.
253 auto j = i;
254 ++j;
255 DCHECK_EQ(j.value(), 1);
256 NotifyAvailableRange(j.range(), j.range());
257 }
258 }
259 }
260 }
261
262 void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) {
263 // If there is already a provider in the same location, we delete it.
264 DCHECK(!provider->Available());
265 BlockId pos = provider->Tell();
266 DataProvider** place = &writer_index_[pos];
267 DCHECK_NE(*place, provider.get());
268 if (*place) delete *place;
269 *place = provider.release();
270 }
271
272 scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
273 DataProvider *provider) {
274 BlockId pos = provider->Tell();
275 DCHECK_EQ(writer_index_[pos], provider);
276 writer_index_.erase(pos);
277 return scoped_ptr<DataProvider>(provider);
278 }
279
280 MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
281 const BlockId& pos) const {
282 MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
283 if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() &&
284 (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) {
285 // Check if there is another writer between us and the next reader.
286 MultiBufferBlockId next_writer_pos = ClosestNextEntry(
287 writer_index_, pos + 1);
288 if (next_writer_pos > next_reader_pos) {
289 return ProviderStateLoad;
290 }
291 }
292
293 MultiBufferBlockId previous_reader_pos = ClosestPreviousEntry(
294 readers_, pos - 1);
295 if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() &&
296 (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
297 !RangeSupported())) {
298 MultiBufferBlockId previous_writer_pos =
299 ClosestPreviousEntry(writer_index_, pos - 1);
300 if (previous_writer_pos < previous_reader_pos) {
301 return ProviderStateDefer;
302 }
303 }
304
305 return ProviderStateDead;
306 }
307
308 bool MultiBuffer::ProviderCollision(const BlockId& id) const {
309 // If there is a writer at the same location, it is always a collision.
310 if (writer_index_.find(id) != writer_index_.end())
311 return true;
312
313 // Data already exists at providers current position,
314 // if the URL supports ranges, we can kill the data provider.
315 if (RangeSupported() && Contains(id))
316 return true;
317
318 return false;
319 }
320
321 void MultiBuffer::Prune(size_t max_to_free) {
322 lru_->Prune(max_to_free);
323 }
324
325 void MultiBuffer::DataProviderEvent(DataProvider *provider_tmp) {
326 scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp));
327 BlockId start_pos = provider->Tell();
328 BlockId pos = start_pos;
329 bool eof = false;
330 int64_t blocks_added = -data_.size();
DaleCurtis 2015/10/28 23:06:13 -size_t might be problematic?
hubbe 2015/10/29 17:52:44 Assuming unary minus behaves anything like binary
hubbe 2015/10/29 21:45:05 Turns out you're right, - on size_t is problematic
331
332 while (!ProviderCollision(pos) && !eof) {
333 if (!provider->Available()) {
334 AddProvider(provider.Pass());
335 break;
336 }
337 DCHECK_GE(pos, 0);
338 data_[pos] = provider->Read();
339 eof = data_[pos]->end_of_stream();
340 if (!pinned_[pos])
341 lru_->Use(this, pos);
342 ++pos;
343 }
344 blocks_added += data_.size();
345
346 if (pos > start_pos) {
347 present_.SetRange(start_pos, pos, 1);
348 Range<BlockId> expanded_range = present_.find(start_pos).range();
349 NotifyAvailableRange(expanded_range, expanded_range);
350
351 lru_->IncrementDataSize(blocks_added);
352 Prune(blocks_added * kMaxFreesPerAdd + 1);
353 }
354
355 // Check that it's still there before we try to delete it.
356 auto i = writer_index_.find(pos);
357 if (i != writer_index_.end() && i->second == provider_tmp) {
358 switch (SuggestProviderState(pos)) {
359 case ProviderStateLoad:
360 // Not sure we actually need to do this
361 provider_tmp->SetDeferred(false);
362 break;
363 case ProviderStateDefer:
364 provider_tmp->SetDeferred(true);
365 break;
366 case ProviderStateDead:
367 RemoveProvider(provider_tmp);
368 break;
369 }
370 }
371 }
372
373 void MultiBuffer::MergeFrom(MultiBuffer* other) {
374 // Import data and update LRU.
375 for (auto data : other->data_) {
DaleCurtis 2015/10/28 23:06:13 const auto& here and the three below.
hubbe 2015/10/29 17:52:44 Done. The range iterator used in the two cases be
376 if (data_.insert(std::make_pair(data.first, data.second)).second) {
377 if (!pinned_[data.first]) {
378 lru_->Insert(this, data.first);
379 }
380 }
381 }
382 // Update present_
383 for (auto r : other->present_) {
384 if (r.second) {
385 present_.SetRange(r.first.begin, r.first.end, 1);
386 }
387 }
388 // Notify existing readers.
389 auto last = present_.begin();
390 for (auto r : other->present_) {
391 if (r.second) {
392 auto i = present_.find(r.first.begin);
393 if (i != last) {
394 NotifyAvailableRange(i.range(), i.range());
395 last = i;
396 }
397 }
398 }
399 }
400
401 void MultiBuffer::PinRange(
402 const BlockId& from, const BlockId& to, int32_t how_much) {
403 DCHECK_NE(how_much, 0);
404 DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much;
405 pinned_.IncrementRange(from, to, how_much);
406 Range<BlockId> modified_range(from, to);
407
408 // Iterate over all the modified ranges and check if any of them have
409 // transitioned in or out of the unlocked state. If so, we iterate over
410 // all buffers in that range and add/remove them from the LRU as approperiate.
411 // We iterate *backwards* through the ranges, with the idea that data in a
412 // continous range should be freed from the end first.
413
414 if (data_.empty())
415 return;
416
417 auto range = pinned_.find(to - 1);
418 while (1) {
419 if (range.value() == 0 || range.value() == how_much) {
420 bool pin = range.value() == how_much;
421 Range<BlockId> transition_range = modified_range.Intersect(range.range());
422 BlockId transition_begin = std::max(range.range_begin(), from);
423 BlockId transition_end = std::min(range.range_end(), to);
424 if (transition_range.Empty())
425 break;
426
427 // For each range that has transitioned to/from a pinned state,
428 // we iterate over the corresponding ranges in |present_| to find
429 // the blocks that are actually in the multibuffer.
430 for (auto present_block_range = present_.find(transition_end - 1);
DaleCurtis 2015/10/28 23:06:13 ditto.
hubbe 2015/10/29 17:52:44 Ditto what? This is an iterator...
DaleCurtis 2015/10/29 17:59:05 Ditto for const auto& unless you need the non-cons
hubbe 2015/10/29 21:45:05 A const iterator is an iterator to a const contain
431 present_block_range != present_.begin();
432 --present_block_range) {
433 if (!present_block_range.value())
434 continue;
435 Range<BlockId> present_transitioned_range =
436 transition_range.Intersect(present_block_range.range());
437 if (present_transitioned_range.Empty())
438 break;
439 for (BlockId block = present_transitioned_range.end - 1;
440 block >= present_transitioned_range.begin;
441 --block) {
442 DCHECK_GE(block, 0);
443 DCHECK(data_.find(block) != data_.end());
444 if (pin) {
445 DCHECK(pinned_[block]);
446 lru_->Remove(this, block);
447 } else {
448 DCHECK(!pinned_[block]);
449 lru_->Insert(this, block);
450 }
451 }
452 }
453 }
454 if (range == pinned_.begin()) break;
455 --range;
456 }
457 }
458
459 void MultiBuffer::PinRanges(const RangeMap<BlockId, int32_t>& ranges) {
460 for (const auto& r : ranges) {
461 if (r.second != 0) {
462 PinRange(r.first.begin, r.first.end, r.second);
463 }
464 }
465 }
466
467 void MultiBuffer::IncrementMaxSize(int32_t size) {
468 max_size_ += size;
469 lru_->IncrementMaxSize(size);
470 DCHECK_GE(max_size_, 0);
471 // Pruning only happens when blocks are added.
472 }
473
474 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698