Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(331)

Side by Side Diff: chromecast/media/cma/ipc/media_message_fifo.cc

Issue 2300993003: CmaRenderer is dead. Long live MojoRenderer. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chromecast/media/cma/ipc/media_message_fifo.h"
6
7 #include <utility>
8
9 #include "base/atomicops.h"
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/threading/thread_task_runner_handle.h"
16 #include "chromecast/media/cma/base/cma_logging.h"
17 #include "chromecast/media/cma/ipc/media_memory_chunk.h"
18 #include "chromecast/media/cma/ipc/media_message.h"
19 #include "chromecast/media/cma/ipc/media_message_type.h"
20
21 namespace chromecast {
22 namespace media {
23
24 class MediaMessageFlag
25 : public base::RefCountedThreadSafe<MediaMessageFlag> {
26 public:
27 // |offset| is the offset in the fifo of the media message.
28 explicit MediaMessageFlag(size_t offset);
29
30 bool IsValid() const;
31
32 void Invalidate();
33
34 size_t offset() const { return offset_; }
35
36 private:
37 friend class base::RefCountedThreadSafe<MediaMessageFlag>;
38 virtual ~MediaMessageFlag();
39
40 const size_t offset_;
41 bool flag_;
42
43 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
44 };
45
46 MediaMessageFlag::MediaMessageFlag(size_t offset)
47 : offset_(offset),
48 flag_(true) {
49 }
50
51 MediaMessageFlag::~MediaMessageFlag() {
52 }
53
54 bool MediaMessageFlag::IsValid() const {
55 return flag_;
56 }
57
58 void MediaMessageFlag::Invalidate() {
59 flag_ = false;
60 }
61
62 class FifoOwnedMemory : public MediaMemoryChunk {
63 public:
64 FifoOwnedMemory(void* data, size_t size,
65 const scoped_refptr<MediaMessageFlag>& flag,
66 const base::Closure& release_msg_cb);
67 ~FifoOwnedMemory() override;
68
69 // MediaMemoryChunk implementation.
70 void* data() const override { return data_; }
71 size_t size() const override { return size_; }
72 bool valid() const override { return flag_->IsValid(); }
73
74 private:
75 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
76 base::Closure release_msg_cb_;
77
78 void* const data_;
79 const size_t size_;
80 scoped_refptr<MediaMessageFlag> flag_;
81
82 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
83 };
84
85 FifoOwnedMemory::FifoOwnedMemory(void* data,
86 size_t size,
87 const scoped_refptr<MediaMessageFlag>& flag,
88 const base::Closure& release_msg_cb)
89 : task_runner_(base::ThreadTaskRunnerHandle::Get()),
90 release_msg_cb_(release_msg_cb),
91 data_(data),
92 size_(size),
93 flag_(flag) {
94 }
95
96 FifoOwnedMemory::~FifoOwnedMemory() {
97 // Release the flag before notifying that the message has been released.
98 flag_ = scoped_refptr<MediaMessageFlag>();
99 if (!release_msg_cb_.is_null()) {
100 if (task_runner_->BelongsToCurrentThread()) {
101 release_msg_cb_.Run();
102 } else {
103 task_runner_->PostTask(FROM_HERE, release_msg_cb_);
104 }
105 }
106 }
107
108 MediaMessageFifo::MediaMessageFifo(std::unique_ptr<MediaMemoryChunk> mem,
109 bool init)
110 : mem_(std::move(mem)), weak_factory_(this) {
111 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
112 0u);
113 CHECK_GE(mem_->size(), sizeof(Descriptor));
114 Descriptor* desc = static_cast<Descriptor*>(mem_->data());
115 base_ = static_cast<void*>(&desc->first_item);
116
117 // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
118 // Currently, the sign differs.
119 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
120 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
121
122 size_t max_size = mem_->size() -
123 (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
124 if (init) {
125 size_ = max_size;
126 desc->size = size_;
127 internal_rd_offset_ = 0;
128 internal_wr_offset_ = 0;
129 base::subtle::Release_Store(rd_offset_, 0);
130 base::subtle::Release_Store(wr_offset_, 0);
131 } else {
132 size_ = desc->size;
133 CHECK_LE(size_, max_size);
134 internal_rd_offset_ = current_rd_offset();
135 internal_wr_offset_ = current_wr_offset();
136 }
137 CMALOG(kLogControl)
138 << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
139 CHECK_GT(size_, 0u) << size_;
140
141 weak_this_ = weak_factory_.GetWeakPtr();
142 thread_checker_.DetachFromThread();
143 }
144
145 MediaMessageFifo::~MediaMessageFifo() {
146 DCHECK(thread_checker_.CalledOnValidThread());
147 }
148
149 void MediaMessageFifo::ObserveReadActivity(
150 const base::Closure& read_event_cb) {
151 read_event_cb_ = read_event_cb;
152 }
153
154 void MediaMessageFifo::ObserveWriteActivity(
155 const base::Closure& write_event_cb) {
156 write_event_cb_ = write_event_cb;
157 }
158
159 std::unique_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
160 size_t size_to_reserve) {
161 DCHECK(thread_checker_.CalledOnValidThread());
162
163 // Capture first both the read and write offsets.
164 // and exit right away if not enough free space.
165 size_t wr_offset = internal_wr_offset();
166 size_t rd_offset = current_rd_offset();
167 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
168 size_t free_size = size_ - 1 - allocated_size;
169 if (free_size < size_to_reserve)
170 return std::unique_ptr<MediaMemoryChunk>();
171 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
172
173 // Note: in the next 2 conditions, we have:
174 // trailing_byte_count < size_to_reserve
175 // and since at this stage: size_to_reserve <= free_size
176 // we also have trailing_byte_count <= free_size
177 // which means that all the trailing bytes are free space in the fifo.
178 size_t trailing_byte_count = size_ - wr_offset;
179 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
180 // If there is no space to even write the smallest message,
181 // skip the trailing bytes and come back to the beginning of the fifo.
182 // (no way to insert a padding message).
183 if (free_size < trailing_byte_count)
184 return std::unique_ptr<MediaMemoryChunk>();
185 wr_offset = 0;
186 CommitInternalWrite(wr_offset);
187
188 } else if (trailing_byte_count < size_to_reserve) {
189 // At this point, we know we have at least the space to write a message.
190 // However, to avoid splitting a message, a padding message is needed.
191 std::unique_ptr<MediaMemoryChunk> mem(
192 ReserveMemoryNoCheck(trailing_byte_count));
193 std::unique_ptr<MediaMessage> padding_message(
194 MediaMessage::CreateMessage(PaddingMediaMsg, std::move(mem)));
195 }
196
197 // Recalculate the free size and exit if not enough free space.
198 wr_offset = internal_wr_offset();
199 allocated_size = (size_ + wr_offset - rd_offset) % size_;
200 free_size = size_ - 1 - allocated_size;
201 if (free_size < size_to_reserve)
202 return std::unique_ptr<MediaMemoryChunk>();
203
204 return ReserveMemoryNoCheck(size_to_reserve);
205 }
206
207 std::unique_ptr<MediaMessage> MediaMessageFifo::Pop() {
208 DCHECK(thread_checker_.CalledOnValidThread());
209
210 // Capture the read and write offsets.
211 size_t rd_offset = internal_rd_offset();
212 size_t wr_offset = current_wr_offset();
213 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
214
215 if (allocated_size < MediaMessage::minimum_msg_size())
216 return std::unique_ptr<MediaMessage>();
217
218 size_t trailing_byte_count = size_ - rd_offset;
219 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
220 // If there is no space to even have the smallest message,
221 // skip the trailing bytes and come back to the beginning of the fifo.
222 // Note: all the trailing bytes correspond to allocated bytes since:
223 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
224 rd_offset = 0;
225 allocated_size -= trailing_byte_count;
226 trailing_byte_count = size_;
227 CommitInternalRead(rd_offset);
228 }
229
230 // The message should not be longer than the allocated size
231 // but since a message is a contiguous area of memory, it should also be
232 // smaller than |trailing_byte_count|.
233 size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
234 if (max_msg_size < MediaMessage::minimum_msg_size())
235 return std::unique_ptr<MediaMessage>();
236 void* msg_src = static_cast<uint8_t*>(base_) + rd_offset;
237
238 // Create a flag to protect the serialized structure of the message
239 // from being overwritten.
240 // The serialized structure starts at offset |rd_offset|.
241 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
242 rd_flags_.push_back(rd_flag);
243 std::unique_ptr<MediaMemoryChunk> mem(new FifoOwnedMemory(
244 msg_src, max_msg_size, rd_flag,
245 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
246
247 // Create the message which wraps its the serialized structure.
248 std::unique_ptr<MediaMessage> message(
249 MediaMessage::MapMessage(std::move(mem)));
250 CHECK(message);
251
252 // Update the internal read pointer.
253 rd_offset = (rd_offset + message->size()) % size_;
254 CommitInternalRead(rd_offset);
255
256 return message;
257 }
258
259 void MediaMessageFifo::Flush() {
260 DCHECK(thread_checker_.CalledOnValidThread());
261
262 size_t wr_offset = current_wr_offset();
263
264 // Invalidate every memory region before flushing.
265 while (!rd_flags_.empty()) {
266 CMALOG(kLogControl) << "Invalidate flag";
267 rd_flags_.front()->Invalidate();
268 rd_flags_.pop_front();
269 }
270
271 // Flush by setting the read pointer to the value of the write pointer.
272 // Update first the internal read pointer then the public one.
273 CommitInternalRead(wr_offset);
274 CommitRead(wr_offset);
275 }
276
277 std::unique_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
278 size_t size_to_reserve) {
279 size_t wr_offset = internal_wr_offset();
280
281 // Memory block corresponding to the serialized structure of the message.
282 void* msg_start = static_cast<uint8_t*>(base_) + wr_offset;
283 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
284 wr_flags_.push_back(wr_flag);
285 std::unique_ptr<MediaMemoryChunk> mem(new FifoOwnedMemory(
286 msg_start, size_to_reserve, wr_flag,
287 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
288
289 // Update the internal write pointer.
290 wr_offset = (wr_offset + size_to_reserve) % size_;
291 CommitInternalWrite(wr_offset);
292
293 return mem;
294 }
295
296 void MediaMessageFifo::OnWrMemoryReleased() {
297 DCHECK(thread_checker_.CalledOnValidThread());
298
299 if (wr_flags_.empty()) {
300 // Sanity check: when there is no protected memory area,
301 // the external write offset has no reason to be different from
302 // the internal write offset.
303 DCHECK_EQ(current_wr_offset(), internal_wr_offset());
304 return;
305 }
306
307 // Update the external write offset.
308 while (!wr_flags_.empty() &&
309 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
310 // TODO(damienv): Could add a sanity check to make sure the offset is
311 // between the external write offset and the read offset (not included).
312 wr_flags_.pop_front();
313 }
314
315 // Update the read offset to the first locked memory area
316 // or to the internal read pointer if nothing prevents it.
317 size_t external_wr_offset = internal_wr_offset();
318 if (!wr_flags_.empty())
319 external_wr_offset = wr_flags_.front()->offset();
320 CommitWrite(external_wr_offset);
321 }
322
323 void MediaMessageFifo::OnRdMemoryReleased() {
324 DCHECK(thread_checker_.CalledOnValidThread());
325
326 if (rd_flags_.empty()) {
327 // Sanity check: when there is no protected memory area,
328 // the external read offset has no reason to be different from
329 // the internal read offset.
330 DCHECK_EQ(current_rd_offset(), internal_rd_offset());
331 return;
332 }
333
334 // Update the external read offset.
335 while (!rd_flags_.empty() &&
336 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
337 // TODO(damienv): Could add a sanity check to make sure the offset is
338 // between the external read offset and the write offset.
339 rd_flags_.pop_front();
340 }
341
342 // Update the read offset to the first locked memory area
343 // or to the internal read pointer if nothing prevents it.
344 size_t external_rd_offset = internal_rd_offset();
345 if (!rd_flags_.empty())
346 external_rd_offset = rd_flags_.front()->offset();
347 CommitRead(external_rd_offset);
348 }
349
350 size_t MediaMessageFifo::current_rd_offset() const {
351 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
352 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
353 CHECK_LT(rd_offset, size_);
354 return rd_offset;
355 }
356
357 size_t MediaMessageFifo::current_wr_offset() const {
358 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
359
360 // When the fifo consumer acquires the write offset,
361 // we have to make sure that any possible following reads are actually
362 // returning results at least inline with the memory snapshot taken
363 // when the write offset was sampled.
364 // That's why an Acquire_Load is used here.
365 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
366 CHECK_LT(wr_offset, size_);
367 return wr_offset;
368 }
369
370 void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
371 // Add a memory fence to ensure the message content is completely read
372 // before updating the read offset.
373 base::subtle::Release_Store(rd_offset_, new_rd_offset);
374
375 // Since rd_offset_ is updated by a release_store above, any thread that
376 // does acquire_load is guaranteed to see the new rd_offset_ set above.
377 // So it is safe to send the notification.
378 if (!read_event_cb_.is_null()) {
379 read_event_cb_.Run();
380 }
381 }
382
383 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
384 // Add a memory fence to ensure the message content is written
385 // before updating the write offset.
386 base::subtle::Release_Store(wr_offset_, new_wr_offset);
387
388 // Since wr_offset_ is updated by a release_store above, any thread that
389 // does acquire_load is guaranteed to see the new wr_offset_ set above.
390 // So it is safe to send the notification.
391 if (!write_event_cb_.is_null()) {
392 write_event_cb_.Run();
393 }
394 }
395
396 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
397 internal_rd_offset_ = new_rd_offset;
398 }
399
400 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
401 internal_wr_offset_ = new_wr_offset;
402 }
403
404 } // namespace media
405 } // namespace chromecast
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698