OLD | NEW |
(Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "chromecast/media/cma/ipc/media_message_fifo.h" |
| 6 |
| 7 #include "base/atomicops.h" |
| 8 #include "base/bind.h" |
| 9 #include "base/location.h" |
| 10 #include "base/logging.h" |
| 11 #include "base/message_loop/message_loop_proxy.h" |
| 12 #include "chromecast/media/cma/base/cma_logging.h" |
| 13 #include "chromecast/media/cma/ipc/media_memory_chunk.h" |
| 14 #include "chromecast/media/cma/ipc/media_message.h" |
| 15 #include "chromecast/media/cma/ipc/media_message_type.h" |
| 16 |
| 17 namespace chromecast { |
| 18 namespace media { |
| 19 |
| 20 class MediaMessageFlag |
| 21 : public base::RefCountedThreadSafe<MediaMessageFlag> { |
| 22 public: |
| 23 // |offset| is the offset in the fifo of the media message. |
| 24 explicit MediaMessageFlag(size_t offset); |
| 25 |
| 26 bool IsValid() const; |
| 27 |
| 28 void Invalidate(); |
| 29 |
| 30 size_t offset() const { return offset_; } |
| 31 |
| 32 private: |
| 33 friend class base::RefCountedThreadSafe<MediaMessageFlag>; |
| 34 virtual ~MediaMessageFlag(); |
| 35 |
| 36 const size_t offset_; |
| 37 bool flag_; |
| 38 |
| 39 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag); |
| 40 }; |
| 41 |
| 42 MediaMessageFlag::MediaMessageFlag(size_t offset) |
| 43 : offset_(offset), |
| 44 flag_(true) { |
| 45 } |
| 46 |
| 47 MediaMessageFlag::~MediaMessageFlag() { |
| 48 } |
| 49 |
| 50 bool MediaMessageFlag::IsValid() const { |
| 51 return flag_; |
| 52 } |
| 53 |
| 54 void MediaMessageFlag::Invalidate() { |
| 55 flag_ = false; |
| 56 } |
| 57 |
| 58 class FifoOwnedMemory : public MediaMemoryChunk { |
| 59 public: |
| 60 FifoOwnedMemory(void* data, size_t size, |
| 61 const scoped_refptr<MediaMessageFlag>& flag, |
| 62 const base::Closure& release_msg_cb); |
| 63 virtual ~FifoOwnedMemory(); |
| 64 |
| 65 // MediaMemoryChunk implementation. |
| 66 virtual void* data() const OVERRIDE { return data_; } |
| 67 virtual size_t size() const OVERRIDE { return size_; } |
| 68 virtual bool valid() const OVERRIDE { return flag_->IsValid(); } |
| 69 |
| 70 private: |
| 71 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 72 base::Closure release_msg_cb_; |
| 73 |
| 74 void* const data_; |
| 75 const size_t size_; |
| 76 scoped_refptr<MediaMessageFlag> flag_; |
| 77 |
| 78 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory); |
| 79 }; |
| 80 |
| 81 FifoOwnedMemory::FifoOwnedMemory( |
| 82 void* data, size_t size, |
| 83 const scoped_refptr<MediaMessageFlag>& flag, |
| 84 const base::Closure& release_msg_cb) |
| 85 : task_runner_(base::MessageLoopProxy::current()), |
| 86 release_msg_cb_(release_msg_cb), |
| 87 data_(data), |
| 88 size_(size), |
| 89 flag_(flag) { |
| 90 } |
| 91 |
| 92 FifoOwnedMemory::~FifoOwnedMemory() { |
| 93 // Release the flag before notifying that the message has been released. |
| 94 flag_ = scoped_refptr<MediaMessageFlag>(); |
| 95 if (!release_msg_cb_.is_null()) { |
| 96 if (task_runner_->BelongsToCurrentThread()) { |
| 97 release_msg_cb_.Run(); |
| 98 } else { |
| 99 task_runner_->PostTask(FROM_HERE, release_msg_cb_); |
| 100 } |
| 101 } |
| 102 } |
| 103 |
| 104 MediaMessageFifo::MediaMessageFifo( |
| 105 scoped_ptr<MediaMemoryChunk> mem, bool init) |
| 106 : mem_(mem.Pass()), |
| 107 weak_factory_(this) { |
| 108 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor), |
| 109 0u); |
| 110 CHECK_GE(mem_->size(), sizeof(Descriptor)); |
| 111 Descriptor* desc = static_cast<Descriptor*>(mem_->data()); |
| 112 base_ = static_cast<void*>(&desc->first_item); |
| 113 |
| 114 // TODO(damienv): remove cast when atomic size_t is defined in Chrome. |
| 115 // Currently, the sign differs. |
| 116 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset)); |
| 117 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset)); |
| 118 |
| 119 size_t max_size = mem_->size() - |
| 120 (static_cast<char*>(base_) - static_cast<char*>(mem_->data())); |
| 121 if (init) { |
| 122 size_ = max_size; |
| 123 desc->size = size_; |
| 124 internal_rd_offset_ = 0; |
| 125 internal_wr_offset_ = 0; |
| 126 base::subtle::Acquire_Store(rd_offset_, 0); |
| 127 base::subtle::Acquire_Store(wr_offset_, 0); |
| 128 } else { |
| 129 size_ = desc->size; |
| 130 CHECK_LE(size_, max_size); |
| 131 internal_rd_offset_ = current_rd_offset(); |
| 132 internal_wr_offset_ = current_wr_offset(); |
| 133 } |
| 134 CMALOG(kLogControl) |
| 135 << "MediaMessageFifo:" << " init=" << init << " size=" << size_; |
| 136 CHECK_GT(size_, 0) << size_; |
| 137 |
| 138 weak_this_ = weak_factory_.GetWeakPtr(); |
| 139 thread_checker_.DetachFromThread(); |
| 140 } |
| 141 |
| 142 MediaMessageFifo::~MediaMessageFifo() { |
| 143 DCHECK(thread_checker_.CalledOnValidThread()); |
| 144 } |
| 145 |
| 146 void MediaMessageFifo::ObserveReadActivity( |
| 147 const base::Closure& read_event_cb) { |
| 148 read_event_cb_ = read_event_cb; |
| 149 } |
| 150 |
| 151 void MediaMessageFifo::ObserveWriteActivity( |
| 152 const base::Closure& write_event_cb) { |
| 153 write_event_cb_ = write_event_cb; |
| 154 } |
| 155 |
| 156 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory( |
| 157 size_t size_to_reserve) { |
| 158 DCHECK(thread_checker_.CalledOnValidThread()); |
| 159 |
| 160 // Capture first both the read and write offsets. |
| 161 // and exit right away if not enough free space. |
| 162 size_t wr_offset = internal_wr_offset(); |
| 163 size_t rd_offset = current_rd_offset(); |
| 164 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; |
| 165 size_t free_size = size_ - 1 - allocated_size; |
| 166 if (free_size < size_to_reserve) |
| 167 return scoped_ptr<MediaMemoryChunk>(); |
| 168 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve); |
| 169 |
| 170 // Note: in the next 2 conditions, we have: |
| 171 // trailing_byte_count < size_to_reserve |
| 172 // and since at this stage: size_to_reserve <= free_size |
| 173 // we also have trailing_byte_count <= free_size |
| 174 // which means that all the trailing bytes are free space in the fifo. |
| 175 size_t trailing_byte_count = size_ - wr_offset; |
| 176 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { |
| 177 // If there is no space to even write the smallest message, |
| 178 // skip the trailing bytes and come back to the beginning of the fifo. |
| 179 // (no way to insert a padding message). |
| 180 if (free_size < trailing_byte_count) |
| 181 return scoped_ptr<MediaMemoryChunk>(); |
| 182 wr_offset = 0; |
| 183 CommitInternalWrite(wr_offset); |
| 184 |
| 185 } else if (trailing_byte_count < size_to_reserve) { |
| 186 // At this point, we know we have at least the space to write a message. |
| 187 // However, to avoid splitting a message, a padding message is needed. |
| 188 scoped_ptr<MediaMemoryChunk> mem( |
| 189 ReserveMemoryNoCheck(trailing_byte_count)); |
| 190 scoped_ptr<MediaMessage> padding_message( |
| 191 MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass())); |
| 192 } |
| 193 |
| 194 // Recalculate the free size and exit if not enough free space. |
| 195 wr_offset = internal_wr_offset(); |
| 196 allocated_size = (size_ + wr_offset - rd_offset) % size_; |
| 197 free_size = size_ - 1 - allocated_size; |
| 198 if (free_size < size_to_reserve) |
| 199 return scoped_ptr<MediaMemoryChunk>(); |
| 200 |
| 201 return ReserveMemoryNoCheck(size_to_reserve); |
| 202 } |
| 203 |
| 204 scoped_ptr<MediaMessage> MediaMessageFifo::Pop() { |
| 205 DCHECK(thread_checker_.CalledOnValidThread()); |
| 206 |
| 207 // Capture the read and write offsets. |
| 208 size_t rd_offset = internal_rd_offset(); |
| 209 size_t wr_offset = current_wr_offset(); |
| 210 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; |
| 211 |
| 212 if (allocated_size < MediaMessage::minimum_msg_size()) |
| 213 return scoped_ptr<MediaMessage>(); |
| 214 |
| 215 size_t trailing_byte_count = size_ - rd_offset; |
| 216 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { |
| 217 // If there is no space to even have the smallest message, |
| 218 // skip the trailing bytes and come back to the beginning of the fifo. |
| 219 // Note: all the trailing bytes correspond to allocated bytes since: |
| 220 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size |
| 221 rd_offset = 0; |
| 222 allocated_size -= trailing_byte_count; |
| 223 trailing_byte_count = size_; |
| 224 CommitInternalRead(rd_offset); |
| 225 } |
| 226 |
| 227 // The message should not be longer than the allocated size |
| 228 // but since a message is a contiguous area of memory, it should also be |
| 229 // smaller than |trailing_byte_count|. |
| 230 size_t max_msg_size = std::min(allocated_size, trailing_byte_count); |
| 231 if (max_msg_size < MediaMessage::minimum_msg_size()) |
| 232 return scoped_ptr<MediaMessage>(); |
| 233 void* msg_src = static_cast<uint8*>(base_) + rd_offset; |
| 234 |
| 235 // Create a flag to protect the serialized structure of the message |
| 236 // from being overwritten. |
| 237 // The serialized structure starts at offset |rd_offset|. |
| 238 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset)); |
| 239 rd_flags_.push_back(rd_flag); |
| 240 scoped_ptr<MediaMemoryChunk> mem( |
| 241 new FifoOwnedMemory( |
| 242 msg_src, max_msg_size, rd_flag, |
| 243 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_))); |
| 244 |
| 245 // Create the message which wraps its the serialized structure. |
| 246 scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass())); |
| 247 CHECK(message); |
| 248 |
| 249 // Update the internal read pointer. |
| 250 rd_offset = (rd_offset + message->size()) % size_; |
| 251 CommitInternalRead(rd_offset); |
| 252 |
| 253 return message.Pass(); |
| 254 } |
| 255 |
| 256 void MediaMessageFifo::Flush() { |
| 257 DCHECK(thread_checker_.CalledOnValidThread()); |
| 258 |
| 259 size_t wr_offset = current_wr_offset(); |
| 260 |
| 261 // Invalidate every memory region before flushing. |
| 262 while (!rd_flags_.empty()) { |
| 263 CMALOG(kLogControl) << "Invalidate flag"; |
| 264 rd_flags_.front()->Invalidate(); |
| 265 rd_flags_.pop_front(); |
| 266 } |
| 267 |
| 268 // Flush by setting the read pointer to the value of the write pointer. |
| 269 // Update first the internal read pointer then the public one. |
| 270 CommitInternalRead(wr_offset); |
| 271 CommitRead(wr_offset); |
| 272 } |
| 273 |
| 274 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck( |
| 275 size_t size_to_reserve) { |
| 276 size_t wr_offset = internal_wr_offset(); |
| 277 |
| 278 // Memory block corresponding to the serialized structure of the message. |
| 279 void* msg_start = static_cast<uint8*>(base_) + wr_offset; |
| 280 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset)); |
| 281 wr_flags_.push_back(wr_flag); |
| 282 scoped_ptr<MediaMemoryChunk> mem( |
| 283 new FifoOwnedMemory( |
| 284 msg_start, size_to_reserve, wr_flag, |
| 285 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_))); |
| 286 |
| 287 // Update the internal write pointer. |
| 288 wr_offset = (wr_offset + size_to_reserve) % size_; |
| 289 CommitInternalWrite(wr_offset); |
| 290 |
| 291 return mem.Pass(); |
| 292 } |
| 293 |
| 294 void MediaMessageFifo::OnWrMemoryReleased() { |
| 295 DCHECK(thread_checker_.CalledOnValidThread()); |
| 296 |
| 297 if (wr_flags_.empty()) { |
| 298 // Sanity check: when there is no protected memory area, |
| 299 // the external write offset has no reason to be different from |
| 300 // the internal write offset. |
| 301 DCHECK_EQ(current_wr_offset(), internal_wr_offset()); |
| 302 return; |
| 303 } |
| 304 |
| 305 // Update the external write offset. |
| 306 while (!wr_flags_.empty() && |
| 307 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) { |
| 308 // TODO(damienv): Could add a sanity check to make sure the offset is |
| 309 // between the external write offset and the read offset (not included). |
| 310 wr_flags_.pop_front(); |
| 311 } |
| 312 |
| 313 // Update the read offset to the first locked memory area |
| 314 // or to the internal read pointer if nothing prevents it. |
| 315 size_t external_wr_offset = internal_wr_offset(); |
| 316 if (!wr_flags_.empty()) |
| 317 external_wr_offset = wr_flags_.front()->offset(); |
| 318 CommitWrite(external_wr_offset); |
| 319 } |
| 320 |
| 321 void MediaMessageFifo::OnRdMemoryReleased() { |
| 322 DCHECK(thread_checker_.CalledOnValidThread()); |
| 323 |
| 324 if (rd_flags_.empty()) { |
| 325 // Sanity check: when there is no protected memory area, |
| 326 // the external read offset has no reason to be different from |
| 327 // the internal read offset. |
| 328 DCHECK_EQ(current_rd_offset(), internal_rd_offset()); |
| 329 return; |
| 330 } |
| 331 |
| 332 // Update the external read offset. |
| 333 while (!rd_flags_.empty() && |
| 334 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) { |
| 335 // TODO(damienv): Could add a sanity check to make sure the offset is |
| 336 // between the external read offset and the write offset. |
| 337 rd_flags_.pop_front(); |
| 338 } |
| 339 |
| 340 // Update the read offset to the first locked memory area |
| 341 // or to the internal read pointer if nothing prevents it. |
| 342 size_t external_rd_offset = internal_rd_offset(); |
| 343 if (!rd_flags_.empty()) |
| 344 external_rd_offset = rd_flags_.front()->offset(); |
| 345 CommitRead(external_rd_offset); |
| 346 } |
| 347 |
| 348 size_t MediaMessageFifo::current_rd_offset() const { |
| 349 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); |
| 350 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_); |
| 351 CHECK_LT(rd_offset, size_); |
| 352 return rd_offset; |
| 353 } |
| 354 |
| 355 size_t MediaMessageFifo::current_wr_offset() const { |
| 356 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); |
| 357 |
| 358 // When the fifo consumer acquires the write offset, |
| 359 // we have to make sure that any possible following reads are actually |
| 360 // returning results at least inline with the memory snapshot taken |
| 361 // when the write offset was sampled. |
| 362 // That's why an Acquire_Load is used here. |
| 363 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_); |
| 364 CHECK_LT(wr_offset, size_); |
| 365 return wr_offset; |
| 366 } |
| 367 |
| 368 void MediaMessageFifo::CommitRead(size_t new_rd_offset) { |
| 369 // Add a memory fence to ensure the message content is completely read |
| 370 // before updating the read offset. |
| 371 base::subtle::Release_Store(rd_offset_, new_rd_offset); |
| 372 |
| 373 // Make sure the read pointer has been updated before sending a notification. |
| 374 if (!read_event_cb_.is_null()) { |
| 375 base::subtle::MemoryBarrier(); |
| 376 read_event_cb_.Run(); |
| 377 } |
| 378 } |
| 379 |
| 380 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) { |
| 381 // Add a memory fence to ensure the message content is written |
| 382 // before updating the write offset. |
| 383 base::subtle::Release_Store(wr_offset_, new_wr_offset); |
| 384 |
| 385 // Make sure the write pointer has been updated before sending a notification. |
| 386 if (!write_event_cb_.is_null()) { |
| 387 base::subtle::MemoryBarrier(); |
| 388 write_event_cb_.Run(); |
| 389 } |
| 390 } |
| 391 |
| 392 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) { |
| 393 internal_rd_offset_ = new_rd_offset; |
| 394 } |
| 395 |
| 396 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) { |
| 397 internal_wr_offset_ = new_wr_offset; |
| 398 } |
| 399 |
| 400 } // namespace media |
| 401 } // namespace chromecast |
OLD | NEW |