| Index: chromecast/media/cma/ipc/media_message_fifo.cc
|
| diff --git a/chromecast/media/cma/ipc/media_message_fifo.cc b/chromecast/media/cma/ipc/media_message_fifo.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d3dad3b7c9f0a7f0ae4b6d37a82a7bf2c2fa7691
|
| --- /dev/null
|
| +++ b/chromecast/media/cma/ipc/media_message_fifo.cc
|
| @@ -0,0 +1,395 @@
|
| +// Copyright 2014 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "chromecast/media/cma/ipc/media_message_fifo.h"
|
| +
|
| +#include "base/atomicops.h"
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop_proxy.h"
|
| +#include "chromecast/media/cma/base/cma_logging.h"
|
| +#include "chromecast/media/cma/ipc/media_memory_chunk.h"
|
| +#include "chromecast/media/cma/ipc/media_message.h"
|
| +#include "chromecast/media/cma/ipc/media_message_type.h"
|
| +
|
| +namespace chromecast {
|
| +namespace media {
|
| +
|
| +class MediaMessageFlag
|
| + : public base::RefCountedThreadSafe<MediaMessageFlag> {
|
| + public:
|
| + // |offset| is the offset in the fifo of the media message.
|
| + explicit MediaMessageFlag(size_t offset);
|
| +
|
| + bool IsValid() const;
|
| +
|
| + void Invalidate();
|
| +
|
| + size_t offset() const { return offset_; }
|
| +
|
| + private:
|
| + friend class base::RefCountedThreadSafe<MediaMessageFlag>;
|
| + virtual ~MediaMessageFlag();
|
| +
|
| + const size_t offset_;
|
| + bool flag_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
|
| +};
|
| +
|
| +MediaMessageFlag::MediaMessageFlag(size_t offset)
|
| + : offset_(offset),
|
| + flag_(true) {
|
| +}
|
| +
|
| +MediaMessageFlag::~MediaMessageFlag() {
|
| +}
|
| +
|
| +bool MediaMessageFlag::IsValid() const {
|
| + return flag_;
|
| +}
|
| +
|
| +void MediaMessageFlag::Invalidate() {
|
| + flag_ = false;
|
| +}
|
| +
|
| +class FifoOwnedMemory : public MediaMemoryChunk {
|
| + public:
|
| + FifoOwnedMemory(void* data, size_t size,
|
| + const scoped_refptr<MediaMessageFlag>& flag,
|
| + const base::Closure& release_msg_cb);
|
| + virtual ~FifoOwnedMemory();
|
| +
|
| + // MediaMemoryChunk implementation.
|
| + virtual void* data() const OVERRIDE { return data_; }
|
| + virtual size_t size() const OVERRIDE { return size_; }
|
| + virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
|
| +
|
| + private:
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| + base::Closure release_msg_cb_;
|
| +
|
| + void* const data_;
|
| + const size_t size_;
|
| + scoped_refptr<MediaMessageFlag> flag_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
|
| +};
|
| +
|
| +FifoOwnedMemory::FifoOwnedMemory(
|
| + void* data, size_t size,
|
| + const scoped_refptr<MediaMessageFlag>& flag,
|
| + const base::Closure& release_msg_cb)
|
| + : task_runner_(base::MessageLoopProxy::current()),
|
| + release_msg_cb_(release_msg_cb),
|
| + data_(data),
|
| + size_(size),
|
| + flag_(flag) {
|
| +}
|
| +
|
| +FifoOwnedMemory::~FifoOwnedMemory() {
|
| + // Release the flag before notifying that the message has been released.
|
| + flag_ = scoped_refptr<MediaMessageFlag>();
|
| + if (!release_msg_cb_.is_null()) {
|
| + if (task_runner_->BelongsToCurrentThread()) {
|
| + release_msg_cb_.Run();
|
| + } else {
|
| + task_runner_->PostTask(FROM_HERE, release_msg_cb_);
|
| + }
|
| + }
|
| +}
|
| +
|
| +MediaMessageFifo::MediaMessageFifo(
|
| + scoped_ptr<MediaMemoryChunk> mem, bool init)
|
| + : mem_(mem.Pass()),
|
| + weak_factory_(this) {
|
| + CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
|
| + 0u);
|
| + CHECK_GE(mem_->size(), sizeof(Descriptor));
|
| + Descriptor* desc = static_cast<Descriptor*>(mem_->data());
|
| + base_ = static_cast<void*>(&desc->first_item);
|
| +
|
| + // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
|
| + // Currently, the sign differs.
|
| + rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
|
| + wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
|
| +
|
| + size_t max_size = mem_->size() -
|
| + (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
|
| + if (init) {
|
| + size_ = max_size;
|
| + desc->size = size_;
|
| + internal_rd_offset_ = 0;
|
| + internal_wr_offset_ = 0;
|
| + base::subtle::Acquire_Store(rd_offset_, 0);
|
| + base::subtle::Acquire_Store(wr_offset_, 0);
|
| + } else {
|
| + size_ = desc->size;
|
| + CHECK_LE(size_, max_size);
|
| + internal_rd_offset_ = current_rd_offset();
|
| + internal_wr_offset_ = current_wr_offset();
|
| + }
|
| + CMALOG(kLogControl)
|
| + << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
|
| + CHECK_GT(size_, 0) << size_;
|
| +
|
| + weak_this_ = weak_factory_.GetWeakPtr();
|
| + thread_checker_.DetachFromThread();
|
| +}
|
| +
|
| +MediaMessageFifo::~MediaMessageFifo() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +}
|
| +
|
| +void MediaMessageFifo::ObserveReadActivity(
|
| + const base::Closure& read_event_cb) {
|
| + read_event_cb_ = read_event_cb;
|
| +}
|
| +
|
| +void MediaMessageFifo::ObserveWriteActivity(
|
| + const base::Closure& write_event_cb) {
|
| + write_event_cb_ = write_event_cb;
|
| +}
|
| +
|
| +scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
|
| + size_t size_to_reserve) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // Capture first both the read and write offsets.
|
| + // and exit right away if not enough free space.
|
| + size_t wr_offset = internal_wr_offset();
|
| + size_t rd_offset = current_rd_offset();
|
| + size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
|
| + size_t free_size = size_ - 1 - allocated_size;
|
| + if (free_size < size_to_reserve)
|
| + return scoped_ptr<MediaMemoryChunk>();
|
| + CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
|
| +
|
| + // Note: in the next 2 conditions, we have:
|
| + // trailing_byte_count < size_to_reserve
|
| + // and since at this stage: size_to_reserve <= free_size
|
| + // we also have trailing_byte_count <= free_size
|
| + // which means that all the trailing bytes are free space in the fifo.
|
| + size_t trailing_byte_count = size_ - wr_offset;
|
| + if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
|
| + // If there is no space to even write the smallest message,
|
| + // skip the trailing bytes and come back to the beginning of the fifo.
|
| + // (no way to insert a padding message).
|
| + if (free_size < trailing_byte_count)
|
| + return scoped_ptr<MediaMemoryChunk>();
|
| + wr_offset = 0;
|
| + CommitInternalWrite(wr_offset);
|
| +
|
| + } else if (trailing_byte_count < size_to_reserve) {
|
| + // At this point, we know we have at least the space to write a message.
|
| + // However, to avoid splitting a message, a padding message is needed.
|
| + scoped_ptr<MediaMemoryChunk> mem(
|
| + ReserveMemoryNoCheck(trailing_byte_count));
|
| + scoped_ptr<MediaMessage> padding_message(
|
| + MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
|
| + }
|
| +
|
| + // Recalculate the free size and exit if not enough free space.
|
| + wr_offset = internal_wr_offset();
|
| + allocated_size = (size_ + wr_offset - rd_offset) % size_;
|
| + free_size = size_ - 1 - allocated_size;
|
| + if (free_size < size_to_reserve)
|
| + return scoped_ptr<MediaMemoryChunk>();
|
| +
|
| + return ReserveMemoryNoCheck(size_to_reserve);
|
| +}
|
| +
|
| +scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // Capture the read and write offsets.
|
| + size_t rd_offset = internal_rd_offset();
|
| + size_t wr_offset = current_wr_offset();
|
| + size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
|
| +
|
| + if (allocated_size < MediaMessage::minimum_msg_size())
|
| + return scoped_ptr<MediaMessage>();
|
| +
|
| + size_t trailing_byte_count = size_ - rd_offset;
|
| + if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
|
| + // If there is no space to even have the smallest message,
|
| + // skip the trailing bytes and come back to the beginning of the fifo.
|
| + // Note: all the trailing bytes correspond to allocated bytes since:
|
| + // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
|
| + rd_offset = 0;
|
| + allocated_size -= trailing_byte_count;
|
| + trailing_byte_count = size_;
|
| + CommitInternalRead(rd_offset);
|
| + }
|
| +
|
| + // The message should not be longer than the allocated size
|
| + // but since a message is a contiguous area of memory, it should also be
|
| + // smaller than |trailing_byte_count|.
|
| + size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
|
| + if (max_msg_size < MediaMessage::minimum_msg_size())
|
| + return scoped_ptr<MediaMessage>();
|
| + void* msg_src = static_cast<uint8*>(base_) + rd_offset;
|
| +
|
| + // Create a flag to protect the serialized structure of the message
|
| + // from being overwritten.
|
| + // The serialized structure starts at offset |rd_offset|.
|
| + scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
|
| + rd_flags_.push_back(rd_flag);
|
| + scoped_ptr<MediaMemoryChunk> mem(
|
| + new FifoOwnedMemory(
|
| + msg_src, max_msg_size, rd_flag,
|
| + base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
|
| +
|
| + // Create the message which wraps its the serialized structure.
|
| + scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
|
| + CHECK(message);
|
| +
|
| + // Update the internal read pointer.
|
| + rd_offset = (rd_offset + message->size()) % size_;
|
| + CommitInternalRead(rd_offset);
|
| +
|
| + return message.Pass();
|
| +}
|
| +
|
| +void MediaMessageFifo::Flush() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + size_t wr_offset = current_wr_offset();
|
| +
|
| + // Invalidate every memory region before flushing.
|
| + while (!rd_flags_.empty()) {
|
| + CMALOG(kLogControl) << "Invalidate flag";
|
| + rd_flags_.front()->Invalidate();
|
| + rd_flags_.pop_front();
|
| + }
|
| +
|
| + // Flush by setting the read pointer to the value of the write pointer.
|
| + // Update first the internal read pointer then the public one.
|
| + CommitInternalRead(wr_offset);
|
| + CommitRead(wr_offset);
|
| +}
|
| +
|
| +scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
|
| + size_t size_to_reserve) {
|
| + size_t wr_offset = internal_wr_offset();
|
| +
|
| + // Memory block corresponding to the serialized structure of the message.
|
| + void* msg_start = static_cast<uint8*>(base_) + wr_offset;
|
| + scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
|
| + wr_flags_.push_back(wr_flag);
|
| + scoped_ptr<MediaMemoryChunk> mem(
|
| + new FifoOwnedMemory(
|
| + msg_start, size_to_reserve, wr_flag,
|
| + base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
|
| +
|
| + // Update the internal write pointer.
|
| + wr_offset = (wr_offset + size_to_reserve) % size_;
|
| + CommitInternalWrite(wr_offset);
|
| +
|
| + return mem.Pass();
|
| +}
|
| +
|
| +void MediaMessageFifo::OnWrMemoryReleased() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + if (wr_flags_.empty()) {
|
| + // Sanity check: when there is no protected memory area,
|
| + // the external write offset has no reason to be different from
|
| + // the internal write offset.
|
| + DCHECK_EQ(current_wr_offset(), internal_wr_offset());
|
| + return;
|
| + }
|
| +
|
| + // Update the external write offset.
|
| + while (!wr_flags_.empty() &&
|
| + (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
|
| + // TODO(damienv): Could add a sanity check to make sure the offset is
|
| + // between the external write offset and the read offset (not included).
|
| + wr_flags_.pop_front();
|
| + }
|
| +
|
| + // Update the read offset to the first locked memory area
|
| + // or to the internal read pointer if nothing prevents it.
|
| + size_t external_wr_offset = internal_wr_offset();
|
| + if (!wr_flags_.empty())
|
| + external_wr_offset = wr_flags_.front()->offset();
|
| + CommitWrite(external_wr_offset);
|
| +}
|
| +
|
| +void MediaMessageFifo::OnRdMemoryReleased() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + if (rd_flags_.empty()) {
|
| + // Sanity check: when there is no protected memory area,
|
| + // the external read offset has no reason to be different from
|
| + // the internal read offset.
|
| + DCHECK_EQ(current_rd_offset(), internal_rd_offset());
|
| + return;
|
| + }
|
| +
|
| + // Update the external read offset.
|
| + while (!rd_flags_.empty() &&
|
| + (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
|
| + // TODO(damienv): Could add a sanity check to make sure the offset is
|
| + // between the external read offset and the write offset.
|
| + rd_flags_.pop_front();
|
| + }
|
| +
|
| + // Update the read offset to the first locked memory area
|
| + // or to the internal read pointer if nothing prevents it.
|
| + size_t external_rd_offset = internal_rd_offset();
|
| + if (!rd_flags_.empty())
|
| + external_rd_offset = rd_flags_.front()->offset();
|
| + CommitRead(external_rd_offset);
|
| +}
|
| +
|
| +size_t MediaMessageFifo::current_rd_offset() const {
|
| + DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
|
| + size_t rd_offset = base::subtle::NoBarrier_Load(rd_offset_);
|
| + CHECK_LT(rd_offset, size_);
|
| + return rd_offset;
|
| +}
|
| +
|
| +size_t MediaMessageFifo::current_wr_offset() const {
|
| + DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
|
| + size_t wr_offset = base::subtle::NoBarrier_Load(wr_offset_);
|
| + CHECK_LT(wr_offset, size_);
|
| + return wr_offset;
|
| +}
|
| +
|
| +void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
|
| + // Add a memory fence to ensure the message content is completely read
|
| + // before updating the read offset.
|
| + base::subtle::Release_Store(rd_offset_, new_rd_offset);
|
| +
|
| + // Make sure the read pointer has been updated before sending a notification.
|
| + if (!read_event_cb_.is_null()) {
|
| + base::subtle::MemoryBarrier();
|
| + read_event_cb_.Run();
|
| + }
|
| +}
|
| +
|
| +void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
|
| + // Add a memory fence to ensure the message content is written
|
| + // before updating the write offset.
|
| + base::subtle::Release_Store(wr_offset_, new_wr_offset);
|
| +
|
| + // Make sure the write pointer has been updated before sending a notification.
|
| + if (!write_event_cb_.is_null()) {
|
| + base::subtle::MemoryBarrier();
|
| + write_event_cb_.Run();
|
| + }
|
| +}
|
| +
|
| +void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
|
| + internal_rd_offset_ = new_rd_offset;
|
| +}
|
| +
|
| +void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
|
| + internal_wr_offset_ = new_wr_offset;
|
| +}
|
| +
|
| +} // namespace media
|
| +} // namespace chromecast
|
|
|