Chromium Code Reviews

Side by Side Diff: media/blink/multibuffer.cc

Issue 1165903002: Multi reader/writer cache/buffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: one more compile fix Created 5 years, 2 months ago
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/blink/multibuffer.h"
6
7 #include "base/bind.h"
8
9 namespace std {
10
11 ostream& operator<<(ostream& o, const media::MultiBufferBlockId& id) {
12 if (id.url_data()) {
13 return o << "{" << id.url_data()->url() << ", " << id.block_num() << "}";
14 } else {
15 return o << "{null, " << id.block_num() << "}";
16 }
17 }
18
19 } // namespace std
20
21 namespace media {
22
23 MultiBufferBlockId::MultiBufferBlockId() :
24 url_data_(nullptr),
25 block_num_(0) {
26 }
27 MultiBufferBlockId::MultiBufferBlockId(MultiBufferUrlData url_data,
28 MultiBufferBlockNum block_num) :
29 url_data_(url_data),
30 block_num_(block_num) {
31 }
32
33 MultiBufferBlockId::MultiBufferBlockId(const MultiBufferBlockId& block_id) :
34 url_data_(block_id.url_data()),
35 block_num_(block_id.block_num()) {
36 }
37
38 MultiBufferBlockId::~MultiBufferBlockId() {}
39
40 // Returns the block ID closest to (but less than or equal to) |pos| from |index|.
41 template<class T>
42 static MultiBuffer::BlockId ClosestPreviousEntry(
43 const std::map<MultiBuffer::BlockId, T>& index,
44 MultiBuffer::BlockId pos) {
45 auto i = index.upper_bound(pos);
46 DCHECK(i == index.end() || i->first > pos);
47 if (i == index.begin()) {
48 return MultiBufferBlockId();
49 }
50 --i;
51 if (!i->first.SameUrl(pos)) {
52 return MultiBufferBlockId();
53 }
54 DCHECK_LE(i->first, pos);
55 return i->first;
56 }
57
58 // Returns the block ID closest to (but greater than or equal to) |pos|
59 // from |index|.
60 template<class T>
61 static MultiBuffer::BlockId ClosestNextEntry(
62 const std::map<MultiBuffer::BlockId, T>& index,
63 MultiBuffer::BlockId pos) {
64 auto i = index.lower_bound(pos);
65 if (i == index.end()) {
66 return MultiBufferBlockId();
67 }
68 if (!i->first.SameUrl(pos)) {
69 return MultiBufferBlockId();
70 }
71 DCHECK_GE(i->first, pos);
72 return i->first;
73 }
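
A minimal standalone sketch of the lookup pattern shared by the two helpers above, with plain int64_t keys standing in for MultiBufferBlockId and -1 standing in for the default-constructed "not found" ID (the SameUrl() check is dropped for brevity; all names here are illustrative):

#include <cstdint>
#include <map>

// upper_bound() yields the first key strictly greater than |pos|, so the
// element just before it (if any) is the largest key <= |pos|. The
// companion helper gets the smallest key >= |pos| directly from
// lower_bound().
template <class T>
int64_t ClosestPreviousEntrySketch(const std::map<int64_t, T>& index,
                                   int64_t pos) {
  auto i = index.upper_bound(pos);
  if (i == index.begin())
    return -1;  // No entry at or before |pos|.
  --i;
  return i->first;
}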
74
75 //
76 // MultiBuffer
77 //
78 MultiBuffer::MultiBuffer(int32_t block_size_shift) :
79 max_size_(0),
80 block_size_shift_(block_size_shift) {
81 }
82
83 MultiBuffer::~MultiBuffer() {
84 // Delete all writers.
85 for (const auto& i : writer_index_) {
86 delete i.second;
87 }
88 }
89
90 void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
91 std::set<Reader*>& set_of_readers = readers_[pos];
92 bool already_waited_for = !set_of_readers.empty();
93 set_of_readers.insert(reader);
94
95 if (already_waited_for || Contains(pos)) {
96 return;
97 }
98
99 // We may need to create a new data provider to service this request.
100 // Look for an existing data provider first.
101 DataProvider* provider = nullptr;
102 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
103
104 if (pos - closest_writer < kMaxWaitForWriterOffset) {
105 BlockId closest_block = ClosestPreviousEntry(data_, pos);
106 if (pos - closest_block > pos - closest_writer) {
107 provider = writer_index_[closest_writer];
108 DCHECK(provider);
109 }
110 }
111 if (!provider) {
112 provider = writer_index_[pos] = CreateWriter(pos);
113 provider->SetAvailableCallback(
114 base::Bind(&MultiBuffer::DataProviderEvent,
115 base::Unretained(this),
116 base::Unretained(provider)));
117 }
118 provider->SetDeferred(false);
119 }
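
The provider-reuse decision above reduces to a small predicate: an existing writer at most kMaxWaitForWriterOffset blocks behind |pos| is reused only when it sits closer to |pos| than the nearest cached block does. A sketch of that predicate with int64_t positions (the constant's real value lives in the header; 16 below is only an assumed example):

#include <cstdint>

bool ShouldReuseWriterSketch(int64_t pos,
                             int64_t closest_writer,
                             int64_t closest_block) {
  // Assumed example value, not the constant from multibuffer.h.
  const int64_t kMaxWaitForWriterOffsetExample = 16;
  return pos - closest_writer < kMaxWaitForWriterOffsetExample &&
         pos - closest_block > pos - closest_writer;
}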
120
121 void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) {
122 auto i = readers_.find(pos);
123 if (i == readers_.end())
124 return;
125 i->second.erase(reader);
126 if (i->second.empty()) {
127 readers_.erase(i);
128 }
129 }
130
131 void MultiBuffer::CleanupWriters(const BlockId& pos) {
132 BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);
133 if (closest_writer == BlockId())
134 return;
135 if (pos - closest_writer > kMaxWaitForWriterOffset)
136 return;
137 DCHECK(writer_index_[closest_writer]);
138 DataProviderEvent(writer_index_[closest_writer]);
139 }
140
141 bool MultiBuffer::Contains(const BlockId& pos) const {
142 DCHECK(present_[pos] == 0 || present_[pos] == 1)
143 << " pos = " << pos
144 << " present_[pos] " << present_[pos];
145 DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
146 return !!present_[pos];
147 }
148
149 MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const {
150 auto i = present_.find(pos);
151 if (i.value()) {
152 return i.range_end();
153 } else {
DaleCurtis 2015/10/19 21:45:25 Don't use else w/ return. Ternary?
hubbe 2015/10/20 00:31:39 I'm not a big fan of ternary. Is this better?
154 return pos;
155 }
156 }
157
158 void MultiBuffer::NotifyAvailableRange(
159 const Range<MultiBufferBlockId>& observer_range,
160 const Range<MultiBufferBlockId>& new_range) {
161 std::set<Reader*> tmp;
162 for (auto i = readers_.lower_bound(observer_range.begin);
163 i != readers_.end() && i->first < observer_range.end;
164 ++i) {
165 tmp.insert(i->second.begin(), i->second.end());
166 }
167 for (Reader* reader: tmp) {
168 reader->NotifyAvailableRange(new_range);
169 }
170 }
171
172 void MultiBuffer::Prune(size_t max_to_free) {
173 // Use a rangemap to merge all consecutive frees into
174 // ranges, then notify observers of changes to those ranges.
175 RangeMap<BlockId, int32_t> freed;
176 while (static_cast<int64_t>(data_.size()) > max_size_ &&
177 !lru_.Empty() &&
178 max_to_free > 0) {
179 BlockId to_free = lru_.Pop();
180 DCHECK(data_[to_free]);
181 DCHECK_EQ(pinned_[to_free], 0);
182 DCHECK_EQ(present_[to_free], 1);
183 data_.erase(to_free);
184 freed.IncrementRange(to_free, to_free + 1, 1);
185 present_.IncrementRange(to_free, to_free + 1, -1);
186 max_to_free--;
187 }
188
189 for (auto freed_iter = freed.first_range();
190 freed_iter != freed.last_range();
191 ++freed_iter) {
192 if (freed_iter.value()) {
193 // Technically, there shouldn't be any observers in this range,
194 // as all observers should be pinning the ranges they are
195 // actually observing.
196 NotifyAvailableRange(
197 freed_iter.range(),
198 // Empty range.
199 Range<BlockId>(freed_iter.range_begin(),
200 freed_iter.range_begin()));
201
202 auto i = present_.find(freed_iter.range_begin());
203 DCHECK_EQ(i.value(), 0);
204 DCHECK_LE(i.range_begin(), freed_iter.range_begin());
205 DCHECK_LE(freed_iter.range_end(), i.range_end());
206
207 if (i.range_begin() == freed_iter.range_begin()) {
208 // Notify the previous range that it contains fewer blocks.
209 auto j = i;
210 --j;
211 DCHECK_EQ(j.value(), 1);
212 NotifyAvailableRange(j.range(), j.range());
213 }
214 if (i.range_end() == freed_iter.range_end()) {
215 // Notify the following range that it contains fewer blocks.
216 auto j = i;
217 ++j;
218 DCHECK_EQ(j.value(), 1);
219 NotifyAvailableRange(j.range(), j.range());
220 }
221 }
222 }
223 }
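
The RangeMap of frees is what collapses a run of single-block evictions into one range, so observers receive one notification per contiguous run rather than one per block. A standalone sketch of the same merging idea, without the RangeMap type:

#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Freeing blocks 10, 11 and 12 yields the single range [10, 13), so one
// empty-range notification covers all three.
std::vector<std::pair<int64_t, int64_t>> MergeFreedBlocks(
    const std::set<int64_t>& freed_blocks) {
  std::vector<std::pair<int64_t, int64_t>> ranges;
  for (int64_t block : freed_blocks) {
    if (!ranges.empty() && ranges.back().second == block) {
      ++ranges.back().second;  // Extend the current run.
    } else {
      ranges.emplace_back(block, block + 1);  // Start a new run.
    }
  }
  return ranges;
}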
224
225 void MultiBuffer::AddProvider(scoped_ptr<DataProvider> provider) {
226 // If there is already a provider in the same location, we delete it.
227 DCHECK(!provider->Available());
228 BlockId pos = provider->Tell();
229 DataProvider** place = &writer_index_[pos];
230 DCHECK_NE(*place, provider.get());
231 if (*place) delete *place;
232 *place = provider.release();
233 }
234
235 scoped_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
236 DataProvider* provider) {
237 BlockId pos = provider->Tell();
238 DCHECK_EQ(writer_index_[pos], provider);
239 writer_index_.erase(pos);
240 return scoped_ptr<DataProvider>(provider);
241 }
242
243 MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
244 const BlockId& pos) const {
245 MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
246 if (next_reader_pos != MultiBufferBlockId() &&
247 (next_reader_pos - pos <= kMaxWaitForWriterOffset ||
248 !pos.url_data()->range_supported())) {
249 // Check if there is another writer between us and the next reader.
250 MultiBufferBlockId next_writer_pos = ClosestNextEntry(
251 writer_index_, pos + 1);
252 if (next_writer_pos == MultiBufferBlockId() ||
253 next_writer_pos > next_reader_pos) {
254 return ProviderStateLoad;
255 }
256 }
257
258 MultiBufferBlockId previous_reader_pos = ClosestPreviousEntry(
259 readers_, pos - 1);
260 if (previous_reader_pos != MultiBufferBlockId() &&
261 (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
262 !pos.url_data()->range_supported())) {
263 MultiBufferBlockId previous_writer_pos =
264 ClosestPreviousEntry(writer_index_, pos - 1);
265 if (previous_writer_pos < previous_reader_pos) {
266 return ProviderStateDefer;
267 }
268 }
269
270 return ProviderStateDead;
271 }
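
A worked example of the three suggestions, with purely illustrative block numbers and assuming both kMaxWaitFor*Offset constants allow a distance of 5: a writer at block 100 with a reader waiting at block 103 and no other writer between them should keep loading (ProviderStateLoad); if the only nearby reader is instead behind at block 98, the writer is worth keeping but can be deferred (ProviderStateDefer) in case that reader advances; with no reader within range on either side it is dead (ProviderStateDead) and the caller tears it down.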
272
273 bool MultiBuffer::ProviderCollision(const BlockId& id) const {
274 // If there is a writer at the same location, it is always a collision.
275 if (writer_index_.find(id) != writer_index_.end())
276 return true;
277
278 // Data already exists at the provider's current position;
279 // if the URL supports ranges, we can kill the data provider.
280 if (id.url_data()->range_supported() && Contains(id))
281 return true;
282
283 return false;
284 }
285
286 void MultiBuffer::DataProviderEvent(DataProvider* provider_tmp) {
287 scoped_ptr<DataProvider> provider(RemoveProvider(provider_tmp));
288 BlockId start_pos = provider->Tell();
289 BlockId pos = start_pos;
290 bool eof = false;
291
292 while (!ProviderCollision(pos) && !eof) {
293 if (!provider->Available()) {
294 AddProvider(provider.Pass());
295 break;
296 }
297 DCHECK_GE(pos.block_num(), 0);
298 data_[pos] = provider->Read();
299 eof = data_[pos]->end_of_stream();
300 if (!pinned_[pos])
301 lru_.Use(pos);
302 ++pos;
303 }
304
305 if (pos > start_pos) {
306 present_.SetRange(start_pos, pos, 1);
307 Range<BlockId> expanded_range = present_.find(start_pos).range();
308 NotifyAvailableRange(expanded_range, expanded_range);
309
310 Prune((pos - start_pos) * kMaxFreesPerAdd + 1);
311 }
312
313 // Check that it's still there before we try to delete it.
314 auto i = writer_index_.find(pos);
315 if (i != writer_index_.end() && i->second == provider_tmp) {
316 switch (SuggestProviderState(pos)) {
317 case ProviderStateLoad:
318 // Not sure we actually need to do this
319 provider_tmp->SetDeferred(false);
320 break;
321 case ProviderStateDefer:
322 provider_tmp->SetDeferred(true);
323 break;
324 case ProviderStateDead:
325 RemoveProvider(provider_tmp);
326 break;
327 }
328 }
329 }
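
An illustrative trace of the event handler, with assumed numbers: a provider at block 50 with three blocks buffered writes blocks 50-52 into data_, marks [50, 53) present, notifies readers observing that range, prunes at most 3 * kMaxFreesPerAdd + 1 blocks, and is then kept loading, deferred, or deleted according to SuggestProviderState at block 53.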
330
331 void MultiBuffer::UpdateUrlData(const MultiBufferUrlData& old_url_data,
332 const MultiBufferUrlData& new_url_data) {
DaleCurtis 2015/10/19 21:45:25 Alignment?
hubbe 2015/10/20 00:31:39 Done.
333 MultiBufferBlockId pos(old_url_data, 0);
334 auto i = readers_.lower_bound(pos);
335 while (i != readers_.end() && pos.SameUrl(i->first)) {
336 std::set<Reader*> tmp;
337 tmp.swap(i->second);
338 auto j = i;
339 ++j;
340 readers_.erase(i);
341 i = j;
342 for (Reader* reader: tmp) {
343 reader->UpdateUrlData(old_url_data, new_url_data);
344 }
345 }
346 }
347
348
349 void MultiBuffer::PinRange(
350 const BlockId& from, const BlockId& to, int32_t howmuch) {
DaleCurtis 2015/10/19 21:45:25 how_much
hubbe 2015/10/20 00:31:39 Done.
351 DCHECK_NE(howmuch, 0);
352 DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << howmuch;
353 pinned_.IncrementRange(from, to, howmuch);
354
355 // Iterate over all the modified ranges and check if
356 // any of them have transitioned in or out of the
357 // unlocked state. If so, we iterate over all buffers
358 // in that range and add/remove them from the LRU as
359 // approperiate. We iterate *backwards* through the
360 // ranges, with the idea that data in a continous range
361 // should be freed from the end first.
362
363 if (data_.empty())
364 return;
365
366 auto range = pinned_.find(to - 1);
367 while (1) {
368 if (range.value() == 0 || range.value() == howmuch) {
369 bool pin = range.value() == howmuch;
370 BlockId begin = std::max(range.range_begin(), from);
371 BlockId end = std::min(range.range_end(), to);
372 if (begin >= end)
373 break;
374 DataMap::iterator k = data_.lower_bound(end);
375 while (k != data_.begin()) {
376 --k;
377 if (k->first < begin)
378 break;
379 DCHECK(k->second);
380 DCHECK_GE(k->first.block_num(), 0);
381 if (pin) {
382 DCHECK(pinned_[k->first]);
383 lru_.Remove(k->first);
384 } else {
385 DCHECK(!pinned_[k->first]);
386 lru_.Insert(k->first);
387 }
388 }
389 }
390 if (range == pinned_.first_range()) break;
391 --range;
392 }
393 }
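
As a concrete illustration with assumed block numbers: PinRange(10, 20, 1) removes every cached block in [10, 20) from the LRU so none of them can be evicted, and a later PinRange(10, 20, -1) re-inserts them. Because the walk runs from block 19 down to block 10, eviction later frees the tail of the contiguous range first, which is the ordering the comment above aims for.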
394
395 void MultiBuffer::PinRanges(const RangeMap<BlockId, int32_t>& ranges) {
396 // We know that "last_range" has a zero value, so it is
397 // ok to skip over it.
398 for (auto i = ranges.first_range(); i != ranges.last_range(); ++i) {
399 if (i.value() != 0) {
400 PinRange(i.range_begin(), i.range_end(), i.value());
401 }
402 }
403 }
404
405 void MultiBuffer::IncrementMaxSize(int32_t size) {
406 max_size_ += size;
407 DCHECK_GE(max_size_, 0);
408 // Pruning only happens when blocks are added.
409 }
410
411 } // namespace media
