| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "chromecast/media/cma/ipc/media_message_fifo.h" | |
| 6 | |
| 7 #include <utility> | |
| 8 | |
| 9 #include "base/atomicops.h" | |
| 10 #include "base/bind.h" | |
| 11 #include "base/location.h" | |
| 12 #include "base/logging.h" | |
| 13 #include "base/macros.h" | |
| 14 #include "base/single_thread_task_runner.h" | |
| 15 #include "base/threading/thread_task_runner_handle.h" | |
| 16 #include "chromecast/media/cma/base/cma_logging.h" | |
| 17 #include "chromecast/media/cma/ipc/media_memory_chunk.h" | |
| 18 #include "chromecast/media/cma/ipc/media_message.h" | |
| 19 #include "chromecast/media/cma/ipc/media_message_type.h" | |
| 20 | |
| 21 namespace chromecast { | |
| 22 namespace media { | |
| 23 | |
| 24 class MediaMessageFlag | |
| 25 : public base::RefCountedThreadSafe<MediaMessageFlag> { | |
| 26 public: | |
| 27 // |offset| is the offset in the fifo of the media message. | |
| 28 explicit MediaMessageFlag(size_t offset); | |
| 29 | |
| 30 bool IsValid() const; | |
| 31 | |
| 32 void Invalidate(); | |
| 33 | |
| 34 size_t offset() const { return offset_; } | |
| 35 | |
| 36 private: | |
| 37 friend class base::RefCountedThreadSafe<MediaMessageFlag>; | |
| 38 virtual ~MediaMessageFlag(); | |
| 39 | |
| 40 const size_t offset_; | |
| 41 bool flag_; | |
| 42 | |
| 43 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag); | |
| 44 }; | |
| 45 | |
| 46 MediaMessageFlag::MediaMessageFlag(size_t offset) | |
| 47 : offset_(offset), | |
| 48 flag_(true) { | |
| 49 } | |
| 50 | |
| 51 MediaMessageFlag::~MediaMessageFlag() { | |
| 52 } | |
| 53 | |
| 54 bool MediaMessageFlag::IsValid() const { | |
| 55 return flag_; | |
| 56 } | |
| 57 | |
| 58 void MediaMessageFlag::Invalidate() { | |
| 59 flag_ = false; | |
| 60 } | |
| 61 | |
| 62 class FifoOwnedMemory : public MediaMemoryChunk { | |
| 63 public: | |
| 64 FifoOwnedMemory(void* data, size_t size, | |
| 65 const scoped_refptr<MediaMessageFlag>& flag, | |
| 66 const base::Closure& release_msg_cb); | |
| 67 ~FifoOwnedMemory() override; | |
| 68 | |
| 69 // MediaMemoryChunk implementation. | |
| 70 void* data() const override { return data_; } | |
| 71 size_t size() const override { return size_; } | |
| 72 bool valid() const override { return flag_->IsValid(); } | |
| 73 | |
| 74 private: | |
| 75 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | |
| 76 base::Closure release_msg_cb_; | |
| 77 | |
| 78 void* const data_; | |
| 79 const size_t size_; | |
| 80 scoped_refptr<MediaMessageFlag> flag_; | |
| 81 | |
| 82 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory); | |
| 83 }; | |
| 84 | |
| 85 FifoOwnedMemory::FifoOwnedMemory(void* data, | |
| 86 size_t size, | |
| 87 const scoped_refptr<MediaMessageFlag>& flag, | |
| 88 const base::Closure& release_msg_cb) | |
| 89 : task_runner_(base::ThreadTaskRunnerHandle::Get()), | |
| 90 release_msg_cb_(release_msg_cb), | |
| 91 data_(data), | |
| 92 size_(size), | |
| 93 flag_(flag) { | |
| 94 } | |
| 95 | |
| 96 FifoOwnedMemory::~FifoOwnedMemory() { | |
| 97 // Release the flag before notifying that the message has been released. | |
| 98 flag_ = scoped_refptr<MediaMessageFlag>(); | |
| 99 if (!release_msg_cb_.is_null()) { | |
| 100 if (task_runner_->BelongsToCurrentThread()) { | |
| 101 release_msg_cb_.Run(); | |
| 102 } else { | |
| 103 task_runner_->PostTask(FROM_HERE, release_msg_cb_); | |
| 104 } | |
| 105 } | |
| 106 } | |
| 107 | |
| 108 MediaMessageFifo::MediaMessageFifo(std::unique_ptr<MediaMemoryChunk> mem, | |
| 109 bool init) | |
| 110 : mem_(std::move(mem)), weak_factory_(this) { | |
| 111 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor), | |
| 112 0u); | |
| 113 CHECK_GE(mem_->size(), sizeof(Descriptor)); | |
| 114 Descriptor* desc = static_cast<Descriptor*>(mem_->data()); | |
| 115 base_ = static_cast<void*>(&desc->first_item); | |
| 116 | |
| 117 // TODO(damienv): remove cast when atomic size_t is defined in Chrome. | |
| 118 // Currently, the sign differs. | |
| 119 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset)); | |
| 120 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset)); | |
| 121 | |
| 122 size_t max_size = mem_->size() - | |
| 123 (static_cast<char*>(base_) - static_cast<char*>(mem_->data())); | |
| 124 if (init) { | |
| 125 size_ = max_size; | |
| 126 desc->size = size_; | |
| 127 internal_rd_offset_ = 0; | |
| 128 internal_wr_offset_ = 0; | |
| 129 base::subtle::Release_Store(rd_offset_, 0); | |
| 130 base::subtle::Release_Store(wr_offset_, 0); | |
| 131 } else { | |
| 132 size_ = desc->size; | |
| 133 CHECK_LE(size_, max_size); | |
| 134 internal_rd_offset_ = current_rd_offset(); | |
| 135 internal_wr_offset_ = current_wr_offset(); | |
| 136 } | |
| 137 CMALOG(kLogControl) | |
| 138 << "MediaMessageFifo:" << " init=" << init << " size=" << size_; | |
| 139 CHECK_GT(size_, 0u) << size_; | |
| 140 | |
| 141 weak_this_ = weak_factory_.GetWeakPtr(); | |
| 142 thread_checker_.DetachFromThread(); | |
| 143 } | |
| 144 | |
| 145 MediaMessageFifo::~MediaMessageFifo() { | |
| 146 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 147 } | |
| 148 | |
| 149 void MediaMessageFifo::ObserveReadActivity( | |
| 150 const base::Closure& read_event_cb) { | |
| 151 read_event_cb_ = read_event_cb; | |
| 152 } | |
| 153 | |
| 154 void MediaMessageFifo::ObserveWriteActivity( | |
| 155 const base::Closure& write_event_cb) { | |
| 156 write_event_cb_ = write_event_cb; | |
| 157 } | |
| 158 | |
| 159 std::unique_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory( | |
| 160 size_t size_to_reserve) { | |
| 161 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 162 | |
| 163 // Capture first both the read and write offsets. | |
| 164 // and exit right away if not enough free space. | |
| 165 size_t wr_offset = internal_wr_offset(); | |
| 166 size_t rd_offset = current_rd_offset(); | |
| 167 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; | |
| 168 size_t free_size = size_ - 1 - allocated_size; | |
| 169 if (free_size < size_to_reserve) | |
| 170 return std::unique_ptr<MediaMemoryChunk>(); | |
| 171 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve); | |
| 172 | |
| 173 // Note: in the next 2 conditions, we have: | |
| 174 // trailing_byte_count < size_to_reserve | |
| 175 // and since at this stage: size_to_reserve <= free_size | |
| 176 // we also have trailing_byte_count <= free_size | |
| 177 // which means that all the trailing bytes are free space in the fifo. | |
| 178 size_t trailing_byte_count = size_ - wr_offset; | |
| 179 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { | |
| 180 // If there is no space to even write the smallest message, | |
| 181 // skip the trailing bytes and come back to the beginning of the fifo. | |
| 182 // (no way to insert a padding message). | |
| 183 if (free_size < trailing_byte_count) | |
| 184 return std::unique_ptr<MediaMemoryChunk>(); | |
| 185 wr_offset = 0; | |
| 186 CommitInternalWrite(wr_offset); | |
| 187 | |
| 188 } else if (trailing_byte_count < size_to_reserve) { | |
| 189 // At this point, we know we have at least the space to write a message. | |
| 190 // However, to avoid splitting a message, a padding message is needed. | |
| 191 std::unique_ptr<MediaMemoryChunk> mem( | |
| 192 ReserveMemoryNoCheck(trailing_byte_count)); | |
| 193 std::unique_ptr<MediaMessage> padding_message( | |
| 194 MediaMessage::CreateMessage(PaddingMediaMsg, std::move(mem))); | |
| 195 } | |
| 196 | |
| 197 // Recalculate the free size and exit if not enough free space. | |
| 198 wr_offset = internal_wr_offset(); | |
| 199 allocated_size = (size_ + wr_offset - rd_offset) % size_; | |
| 200 free_size = size_ - 1 - allocated_size; | |
| 201 if (free_size < size_to_reserve) | |
| 202 return std::unique_ptr<MediaMemoryChunk>(); | |
| 203 | |
| 204 return ReserveMemoryNoCheck(size_to_reserve); | |
| 205 } | |
| 206 | |
| 207 std::unique_ptr<MediaMessage> MediaMessageFifo::Pop() { | |
| 208 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 209 | |
| 210 // Capture the read and write offsets. | |
| 211 size_t rd_offset = internal_rd_offset(); | |
| 212 size_t wr_offset = current_wr_offset(); | |
| 213 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; | |
| 214 | |
| 215 if (allocated_size < MediaMessage::minimum_msg_size()) | |
| 216 return std::unique_ptr<MediaMessage>(); | |
| 217 | |
| 218 size_t trailing_byte_count = size_ - rd_offset; | |
| 219 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { | |
| 220 // If there is no space to even have the smallest message, | |
| 221 // skip the trailing bytes and come back to the beginning of the fifo. | |
| 222 // Note: all the trailing bytes correspond to allocated bytes since: | |
| 223 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size | |
| 224 rd_offset = 0; | |
| 225 allocated_size -= trailing_byte_count; | |
| 226 trailing_byte_count = size_; | |
| 227 CommitInternalRead(rd_offset); | |
| 228 } | |
| 229 | |
| 230 // The message should not be longer than the allocated size | |
| 231 // but since a message is a contiguous area of memory, it should also be | |
| 232 // smaller than |trailing_byte_count|. | |
| 233 size_t max_msg_size = std::min(allocated_size, trailing_byte_count); | |
| 234 if (max_msg_size < MediaMessage::minimum_msg_size()) | |
| 235 return std::unique_ptr<MediaMessage>(); | |
| 236 void* msg_src = static_cast<uint8_t*>(base_) + rd_offset; | |
| 237 | |
| 238 // Create a flag to protect the serialized structure of the message | |
| 239 // from being overwritten. | |
| 240 // The serialized structure starts at offset |rd_offset|. | |
| 241 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset)); | |
| 242 rd_flags_.push_back(rd_flag); | |
| 243 std::unique_ptr<MediaMemoryChunk> mem(new FifoOwnedMemory( | |
| 244 msg_src, max_msg_size, rd_flag, | |
| 245 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_))); | |
| 246 | |
| 247 // Create the message which wraps its the serialized structure. | |
| 248 std::unique_ptr<MediaMessage> message( | |
| 249 MediaMessage::MapMessage(std::move(mem))); | |
| 250 CHECK(message); | |
| 251 | |
| 252 // Update the internal read pointer. | |
| 253 rd_offset = (rd_offset + message->size()) % size_; | |
| 254 CommitInternalRead(rd_offset); | |
| 255 | |
| 256 return message; | |
| 257 } | |
| 258 | |
| 259 void MediaMessageFifo::Flush() { | |
| 260 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 261 | |
| 262 size_t wr_offset = current_wr_offset(); | |
| 263 | |
| 264 // Invalidate every memory region before flushing. | |
| 265 while (!rd_flags_.empty()) { | |
| 266 CMALOG(kLogControl) << "Invalidate flag"; | |
| 267 rd_flags_.front()->Invalidate(); | |
| 268 rd_flags_.pop_front(); | |
| 269 } | |
| 270 | |
| 271 // Flush by setting the read pointer to the value of the write pointer. | |
| 272 // Update first the internal read pointer then the public one. | |
| 273 CommitInternalRead(wr_offset); | |
| 274 CommitRead(wr_offset); | |
| 275 } | |
| 276 | |
| 277 std::unique_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck( | |
| 278 size_t size_to_reserve) { | |
| 279 size_t wr_offset = internal_wr_offset(); | |
| 280 | |
| 281 // Memory block corresponding to the serialized structure of the message. | |
| 282 void* msg_start = static_cast<uint8_t*>(base_) + wr_offset; | |
| 283 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset)); | |
| 284 wr_flags_.push_back(wr_flag); | |
| 285 std::unique_ptr<MediaMemoryChunk> mem(new FifoOwnedMemory( | |
| 286 msg_start, size_to_reserve, wr_flag, | |
| 287 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_))); | |
| 288 | |
| 289 // Update the internal write pointer. | |
| 290 wr_offset = (wr_offset + size_to_reserve) % size_; | |
| 291 CommitInternalWrite(wr_offset); | |
| 292 | |
| 293 return mem; | |
| 294 } | |
| 295 | |
| 296 void MediaMessageFifo::OnWrMemoryReleased() { | |
| 297 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 298 | |
| 299 if (wr_flags_.empty()) { | |
| 300 // Sanity check: when there is no protected memory area, | |
| 301 // the external write offset has no reason to be different from | |
| 302 // the internal write offset. | |
| 303 DCHECK_EQ(current_wr_offset(), internal_wr_offset()); | |
| 304 return; | |
| 305 } | |
| 306 | |
| 307 // Update the external write offset. | |
| 308 while (!wr_flags_.empty() && | |
| 309 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) { | |
| 310 // TODO(damienv): Could add a sanity check to make sure the offset is | |
| 311 // between the external write offset and the read offset (not included). | |
| 312 wr_flags_.pop_front(); | |
| 313 } | |
| 314 | |
| 315 // Update the read offset to the first locked memory area | |
| 316 // or to the internal read pointer if nothing prevents it. | |
| 317 size_t external_wr_offset = internal_wr_offset(); | |
| 318 if (!wr_flags_.empty()) | |
| 319 external_wr_offset = wr_flags_.front()->offset(); | |
| 320 CommitWrite(external_wr_offset); | |
| 321 } | |
| 322 | |
| 323 void MediaMessageFifo::OnRdMemoryReleased() { | |
| 324 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 325 | |
| 326 if (rd_flags_.empty()) { | |
| 327 // Sanity check: when there is no protected memory area, | |
| 328 // the external read offset has no reason to be different from | |
| 329 // the internal read offset. | |
| 330 DCHECK_EQ(current_rd_offset(), internal_rd_offset()); | |
| 331 return; | |
| 332 } | |
| 333 | |
| 334 // Update the external read offset. | |
| 335 while (!rd_flags_.empty() && | |
| 336 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) { | |
| 337 // TODO(damienv): Could add a sanity check to make sure the offset is | |
| 338 // between the external read offset and the write offset. | |
| 339 rd_flags_.pop_front(); | |
| 340 } | |
| 341 | |
| 342 // Update the read offset to the first locked memory area | |
| 343 // or to the internal read pointer if nothing prevents it. | |
| 344 size_t external_rd_offset = internal_rd_offset(); | |
| 345 if (!rd_flags_.empty()) | |
| 346 external_rd_offset = rd_flags_.front()->offset(); | |
| 347 CommitRead(external_rd_offset); | |
| 348 } | |
| 349 | |
| 350 size_t MediaMessageFifo::current_rd_offset() const { | |
| 351 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); | |
| 352 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_); | |
| 353 CHECK_LT(rd_offset, size_); | |
| 354 return rd_offset; | |
| 355 } | |
| 356 | |
| 357 size_t MediaMessageFifo::current_wr_offset() const { | |
| 358 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); | |
| 359 | |
| 360 // When the fifo consumer acquires the write offset, | |
| 361 // we have to make sure that any possible following reads are actually | |
| 362 // returning results at least inline with the memory snapshot taken | |
| 363 // when the write offset was sampled. | |
| 364 // That's why an Acquire_Load is used here. | |
| 365 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_); | |
| 366 CHECK_LT(wr_offset, size_); | |
| 367 return wr_offset; | |
| 368 } | |
| 369 | |
| 370 void MediaMessageFifo::CommitRead(size_t new_rd_offset) { | |
| 371 // Add a memory fence to ensure the message content is completely read | |
| 372 // before updating the read offset. | |
| 373 base::subtle::Release_Store(rd_offset_, new_rd_offset); | |
| 374 | |
| 375 // Since rd_offset_ is updated by a release_store above, any thread that | |
| 376 // does acquire_load is guaranteed to see the new rd_offset_ set above. | |
| 377 // So it is safe to send the notification. | |
| 378 if (!read_event_cb_.is_null()) { | |
| 379 read_event_cb_.Run(); | |
| 380 } | |
| 381 } | |
| 382 | |
| 383 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) { | |
| 384 // Add a memory fence to ensure the message content is written | |
| 385 // before updating the write offset. | |
| 386 base::subtle::Release_Store(wr_offset_, new_wr_offset); | |
| 387 | |
| 388 // Since wr_offset_ is updated by a release_store above, any thread that | |
| 389 // does acquire_load is guaranteed to see the new wr_offset_ set above. | |
| 390 // So it is safe to send the notification. | |
| 391 if (!write_event_cb_.is_null()) { | |
| 392 write_event_cb_.Run(); | |
| 393 } | |
| 394 } | |
| 395 | |
| 396 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) { | |
| 397 internal_rd_offset_ = new_rd_offset; | |
| 398 } | |
| 399 | |
| 400 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) { | |
| 401 internal_wr_offset_ = new_wr_offset; | |
| 402 } | |
| 403 | |
| 404 } // namespace media | |
| 405 } // namespace chromecast | |
| OLD | NEW |