Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6659)

Unified Diff: chromecast/media/cma/ipc/media_message_fifo.cc

Issue 529223003: IPC to pass media data using a lock free circular fifo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Add more comments. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chromecast/media/cma/ipc/media_message_fifo.cc
diff --git a/chromecast/media/cma/ipc/media_message_fifo.cc b/chromecast/media/cma/ipc/media_message_fifo.cc
new file mode 100644
index 0000000000000000000000000000000000000000..d3dad3b7c9f0a7f0ae4b6d37a82a7bf2c2fa7691
--- /dev/null
+++ b/chromecast/media/cma/ipc/media_message_fifo.cc
@@ -0,0 +1,395 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chromecast/media/cma/ipc/media_message_fifo.h"
+
+#include "base/atomicops.h"
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "chromecast/media/cma/base/cma_logging.h"
+#include "chromecast/media/cma/ipc/media_memory_chunk.h"
+#include "chromecast/media/cma/ipc/media_message.h"
+#include "chromecast/media/cma/ipc/media_message_type.h"
+
+namespace chromecast {
+namespace media {
+
+class MediaMessageFlag
+ : public base::RefCountedThreadSafe<MediaMessageFlag> {
+ public:
+ // |offset| is the offset in the fifo of the media message.
+ explicit MediaMessageFlag(size_t offset);
+
+ bool IsValid() const;
+
+ void Invalidate();
+
+ size_t offset() const { return offset_; }
+
+ private:
+ friend class base::RefCountedThreadSafe<MediaMessageFlag>;
+ virtual ~MediaMessageFlag();
+
+ const size_t offset_;
+ bool flag_;
+
+ DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
+};
+
+MediaMessageFlag::MediaMessageFlag(size_t offset)
+ : offset_(offset),
+ flag_(true) {
+}
+
+MediaMessageFlag::~MediaMessageFlag() {
+}
+
+bool MediaMessageFlag::IsValid() const {
+ return flag_;
+}
+
+void MediaMessageFlag::Invalidate() {
+ flag_ = false;
+}
+
+class FifoOwnedMemory : public MediaMemoryChunk {
+ public:
+ FifoOwnedMemory(void* data, size_t size,
+ const scoped_refptr<MediaMessageFlag>& flag,
+ const base::Closure& release_msg_cb);
+ virtual ~FifoOwnedMemory();
+
+ // MediaMemoryChunk implementation.
+ virtual void* data() const OVERRIDE { return data_; }
+ virtual size_t size() const OVERRIDE { return size_; }
+ virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
+
+ private:
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ base::Closure release_msg_cb_;
+
+ void* const data_;
+ const size_t size_;
+ scoped_refptr<MediaMessageFlag> flag_;
+
+ DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
+};
+
+FifoOwnedMemory::FifoOwnedMemory(
+ void* data, size_t size,
+ const scoped_refptr<MediaMessageFlag>& flag,
+ const base::Closure& release_msg_cb)
+ : task_runner_(base::MessageLoopProxy::current()),
+ release_msg_cb_(release_msg_cb),
+ data_(data),
+ size_(size),
+ flag_(flag) {
+}
+
+FifoOwnedMemory::~FifoOwnedMemory() {
+ // Release the flag before notifying that the message has been released.
+ flag_ = scoped_refptr<MediaMessageFlag>();
+ if (!release_msg_cb_.is_null()) {
+ if (task_runner_->BelongsToCurrentThread()) {
+ release_msg_cb_.Run();
+ } else {
+ task_runner_->PostTask(FROM_HERE, release_msg_cb_);
+ }
+ }
+}
+
+MediaMessageFifo::MediaMessageFifo(
+ scoped_ptr<MediaMemoryChunk> mem, bool init)
+ : mem_(mem.Pass()),
+ weak_factory_(this) {
+ CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
+ 0u);
+ CHECK_GE(mem_->size(), sizeof(Descriptor));
+ Descriptor* desc = static_cast<Descriptor*>(mem_->data());
+ base_ = static_cast<void*>(&desc->first_item);
+
+ // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
+ // Currently, the sign differs.
+ rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
+ wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
+
+ size_t max_size = mem_->size() -
+ (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
+ if (init) {
+ size_ = max_size;
+ desc->size = size_;
+ internal_rd_offset_ = 0;
+ internal_wr_offset_ = 0;
+ base::subtle::Acquire_Store(rd_offset_, 0);
+ base::subtle::Acquire_Store(wr_offset_, 0);
+ } else {
+ size_ = desc->size;
+ CHECK_LE(size_, max_size);
+ internal_rd_offset_ = current_rd_offset();
+ internal_wr_offset_ = current_wr_offset();
+ }
+ CMALOG(kLogControl)
+ << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
+ CHECK_GT(size_, 0) << size_;
+
+ weak_this_ = weak_factory_.GetWeakPtr();
+ thread_checker_.DetachFromThread();
+}
+
+MediaMessageFifo::~MediaMessageFifo() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+}
+
+void MediaMessageFifo::ObserveReadActivity(
+ const base::Closure& read_event_cb) {
+ read_event_cb_ = read_event_cb;
+}
+
+void MediaMessageFifo::ObserveWriteActivity(
+ const base::Closure& write_event_cb) {
+ write_event_cb_ = write_event_cb;
+}
+
+scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
+ size_t size_to_reserve) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // Capture first both the read and write offsets.
+ // and exit right away if not enough free space.
+ size_t wr_offset = internal_wr_offset();
+ size_t rd_offset = current_rd_offset();
+ size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
+ size_t free_size = size_ - 1 - allocated_size;
+ if (free_size < size_to_reserve)
+ return scoped_ptr<MediaMemoryChunk>();
+ CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
+
+ // Note: in the next 2 conditions, we have:
+ // trailing_byte_count < size_to_reserve
+ // and since at this stage: size_to_reserve <= free_size
+ // we also have trailing_byte_count <= free_size
+ // which means that all the trailing bytes are free space in the fifo.
+ size_t trailing_byte_count = size_ - wr_offset;
+ if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
+ // If there is no space to even write the smallest message,
+ // skip the trailing bytes and come back to the beginning of the fifo.
+ // (no way to insert a padding message).
+ if (free_size < trailing_byte_count)
+ return scoped_ptr<MediaMemoryChunk>();
+ wr_offset = 0;
+ CommitInternalWrite(wr_offset);
+
+ } else if (trailing_byte_count < size_to_reserve) {
+ // At this point, we know we have at least the space to write a message.
+ // However, to avoid splitting a message, a padding message is needed.
+ scoped_ptr<MediaMemoryChunk> mem(
+ ReserveMemoryNoCheck(trailing_byte_count));
+ scoped_ptr<MediaMessage> padding_message(
+ MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
+ }
+
+ // Recalculate the free size and exit if not enough free space.
+ wr_offset = internal_wr_offset();
+ allocated_size = (size_ + wr_offset - rd_offset) % size_;
+ free_size = size_ - 1 - allocated_size;
+ if (free_size < size_to_reserve)
+ return scoped_ptr<MediaMemoryChunk>();
+
+ return ReserveMemoryNoCheck(size_to_reserve);
+}
+
+scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // Capture the read and write offsets.
+ size_t rd_offset = internal_rd_offset();
+ size_t wr_offset = current_wr_offset();
+ size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
+
+ if (allocated_size < MediaMessage::minimum_msg_size())
+ return scoped_ptr<MediaMessage>();
+
+ size_t trailing_byte_count = size_ - rd_offset;
+ if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
+ // If there is no space to even have the smallest message,
+ // skip the trailing bytes and come back to the beginning of the fifo.
+ // Note: all the trailing bytes correspond to allocated bytes since:
+ // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
+ rd_offset = 0;
+ allocated_size -= trailing_byte_count;
+ trailing_byte_count = size_;
+ CommitInternalRead(rd_offset);
+ }
+
+ // The message should not be longer than the allocated size
+ // but since a message is a contiguous area of memory, it should also be
+ // smaller than |trailing_byte_count|.
+ size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
+ if (max_msg_size < MediaMessage::minimum_msg_size())
+ return scoped_ptr<MediaMessage>();
+ void* msg_src = static_cast<uint8*>(base_) + rd_offset;
+
+ // Create a flag to protect the serialized structure of the message
+ // from being overwritten.
+ // The serialized structure starts at offset |rd_offset|.
+ scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
+ rd_flags_.push_back(rd_flag);
+ scoped_ptr<MediaMemoryChunk> mem(
+ new FifoOwnedMemory(
+ msg_src, max_msg_size, rd_flag,
+ base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
+
+ // Create the message which wraps its the serialized structure.
+ scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
+ CHECK(message);
+
+ // Update the internal read pointer.
+ rd_offset = (rd_offset + message->size()) % size_;
+ CommitInternalRead(rd_offset);
+
+ return message.Pass();
+}
+
+void MediaMessageFifo::Flush() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ size_t wr_offset = current_wr_offset();
+
+ // Invalidate every memory region before flushing.
+ while (!rd_flags_.empty()) {
+ CMALOG(kLogControl) << "Invalidate flag";
+ rd_flags_.front()->Invalidate();
+ rd_flags_.pop_front();
+ }
+
+ // Flush by setting the read pointer to the value of the write pointer.
+ // Update first the internal read pointer then the public one.
+ CommitInternalRead(wr_offset);
+ CommitRead(wr_offset);
+}
+
+scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
+ size_t size_to_reserve) {
+ size_t wr_offset = internal_wr_offset();
+
+ // Memory block corresponding to the serialized structure of the message.
+ void* msg_start = static_cast<uint8*>(base_) + wr_offset;
+ scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
+ wr_flags_.push_back(wr_flag);
+ scoped_ptr<MediaMemoryChunk> mem(
+ new FifoOwnedMemory(
+ msg_start, size_to_reserve, wr_flag,
+ base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
+
+ // Update the internal write pointer.
+ wr_offset = (wr_offset + size_to_reserve) % size_;
+ CommitInternalWrite(wr_offset);
+
+ return mem.Pass();
+}
+
+void MediaMessageFifo::OnWrMemoryReleased() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (wr_flags_.empty()) {
+ // Sanity check: when there is no protected memory area,
+ // the external write offset has no reason to be different from
+ // the internal write offset.
+ DCHECK_EQ(current_wr_offset(), internal_wr_offset());
+ return;
+ }
+
+ // Update the external write offset.
+ while (!wr_flags_.empty() &&
+ (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
+ // TODO(damienv): Could add a sanity check to make sure the offset is
+ // between the external write offset and the read offset (not included).
+ wr_flags_.pop_front();
+ }
+
+ // Update the read offset to the first locked memory area
+ // or to the internal read pointer if nothing prevents it.
+ size_t external_wr_offset = internal_wr_offset();
+ if (!wr_flags_.empty())
+ external_wr_offset = wr_flags_.front()->offset();
+ CommitWrite(external_wr_offset);
+}
+
+void MediaMessageFifo::OnRdMemoryReleased() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (rd_flags_.empty()) {
+ // Sanity check: when there is no protected memory area,
+ // the external read offset has no reason to be different from
+ // the internal read offset.
+ DCHECK_EQ(current_rd_offset(), internal_rd_offset());
+ return;
+ }
+
+ // Update the external read offset.
+ while (!rd_flags_.empty() &&
+ (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
+ // TODO(damienv): Could add a sanity check to make sure the offset is
+ // between the external read offset and the write offset.
+ rd_flags_.pop_front();
+ }
+
+ // Update the read offset to the first locked memory area
+ // or to the internal read pointer if nothing prevents it.
+ size_t external_rd_offset = internal_rd_offset();
+ if (!rd_flags_.empty())
+ external_rd_offset = rd_flags_.front()->offset();
+ CommitRead(external_rd_offset);
+}
+
+size_t MediaMessageFifo::current_rd_offset() const {
+ DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
+ size_t rd_offset = base::subtle::NoBarrier_Load(rd_offset_);
+ CHECK_LT(rd_offset, size_);
+ return rd_offset;
+}
+
+size_t MediaMessageFifo::current_wr_offset() const {
+ DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
+ size_t wr_offset = base::subtle::NoBarrier_Load(wr_offset_);
+ CHECK_LT(wr_offset, size_);
+ return wr_offset;
+}
+
+void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
+ // Add a memory fence to ensure the message content is completely read
+ // before updating the read offset.
+ base::subtle::Release_Store(rd_offset_, new_rd_offset);
+
+ // Make sure the read pointer has been updated before sending a notification.
+ if (!read_event_cb_.is_null()) {
+ base::subtle::MemoryBarrier();
+ read_event_cb_.Run();
+ }
+}
+
+void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
+ // Add a memory fence to ensure the message content is written
+ // before updating the write offset.
+ base::subtle::Release_Store(wr_offset_, new_wr_offset);
+
+ // Make sure the write pointer has been updated before sending a notification.
+ if (!write_event_cb_.is_null()) {
+ base::subtle::MemoryBarrier();
+ write_event_cb_.Run();
+ }
+}
+
+void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
+ internal_rd_offset_ = new_rd_offset;
+}
+
+void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
+ internal_wr_offset_ = new_wr_offset;
+}
+
+} // namespace media
+} // namespace chromecast

Powered by Google App Engine
This is Rietveld 408576698