Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Unified Diff: media/filters/chunk_demuxer.cc

Issue 7203002: Adding ChunkDemuxer implementation. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Addressed CR comments & split code out into separate files. Created 9 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/filters/chunk_demuxer.cc
diff --git a/media/filters/chunk_demuxer.cc b/media/filters/chunk_demuxer.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2c23c799b6812a96baf22d0e0d8d405c8f257cb6
--- /dev/null
+++ b/media/filters/chunk_demuxer.cc
@@ -0,0 +1,578 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Implements a Demuxer that can switch among different data sources mid-stream.
+// Uses FFmpegDemuxer under the covers, so see the caveats at the top of
+// ffmpeg_demuxer.h.
+
+#include "media/filters/chunk_demuxer.h"
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "media/base/filter_host.h"
+#include "media/base/data_buffer.h"
+#include "media/ffmpeg/ffmpeg_common.h"
+#include "media/filters/in_memory_url_protocol.h"
scherkus (not reviewing) 2011/06/24 18:27:37 f > i
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
+#include "media/filters/ffmpeg_glue.h"
+#include "media/webm/webm_cluster_parser.h"
+#include "media/webm/webm_constants.h"
+#include "media/webm/webm_info_parser.h"
+#include "media/webm/webm_tracks_parser.h"
+
+namespace media {
+
+// WebM File Header. This is prepended to the INFO & TRACKS
+// data passed to Init() before handing it to FFmpeg. Essentially
+// we are making the INFO & TRACKS data look like a small WebM
+// file so we can use FFmpeg to initialize the AVFormatContext.
+//
+// TODO(acolwell): Remove this once GetAVStream() has been removed from
+// the DemuxerStream interface.
+static const uint8 kWebMHeader[] = {
+ 0x1A, 0x45, 0xDF, 0xA3, 0x9F, // EBML (size = 0x1f)
scherkus (not reviewing) 2011/06/24 18:27:37 nit: two spaces between code + comments
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
+ 0x42, 0x86, 0x81, 0x01, // EBMLVersion = 1
+ 0x42, 0xF7, 0x81, 0x01, // EBMLReadVersion = 1
+ 0x42, 0xF2, 0x81, 0x04, // EBMLMaxIDLength = 4
+ 0x42, 0xF3, 0x81, 0x08, // EBMLMaxSizeLength = 8
+ 0x42, 0x82, 0x84, 0x77, 0x65, 0x62, 0x6D, // DocType = "webm"
+ 0x42, 0x87, 0x81, 0x02, // DocTypeVersion = 2
+ 0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion = 2
+ // EBML end
+ 0x18, 0x53, 0x80, 0x67, // Segment
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // segment(size = 0)
+ // INFO goes here.
+};
+
+// Offset of the segment size field in kWebMHeader. Used to update
+// the segment size field before handing the buffer to FFmpeg.
+static const int kSegmentSizeOffset = sizeof(kWebMHeader) - 8;
+
+static const uint8 kEmptyCluster[] = {
+ 0x1F, 0x43, 0xB6, 0x75, 0x80 // CLUSTER (size = 0)
scherkus (not reviewing) 2011/06/24 18:27:37 ditto
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
+};
+
+static Buffer* CreateBuffer(const uint8* data, size_t size) {
+ scoped_array<uint8> buf(new uint8[size]);
+ memcpy(buf.get(), data, size);
+ return new DataBuffer(buf.release(), size);
+}
+
+class ChunkDemuxerStream : public DemuxerStream {
+ public:
+ typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
+ typedef std::deque<ReadCallback> ReadCBQueue;
+
+ ChunkDemuxerStream(Type type, AVStream* stream);
+ virtual ~ChunkDemuxerStream();
+
+ void Flush();
+ void AddBuffers(const BufferQueue& buffers);
+ void Shutdown();
+
+ bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const;
+
+ // DemuxerStream methods.
+ virtual void Read(const ReadCallback& read_callback);
+ virtual Type type();
+ virtual const MediaFormat& media_format();
+ virtual void EnableBitstreamConverter();
+ virtual AVStream* GetAVStream();
+
+ private:
+ static void RunCallback(ReadCallback cb, scoped_refptr<Buffer> buffer);
+
+ Type type_;
+ MediaFormat media_format_;
+ AVStream* av_stream_;
+
+ mutable base::Lock lock_;
+ ReadCBQueue read_cbs_;
+ BufferQueue buffers_;
+ bool shutdown_called_;
+
+ DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream);
+};
+
+ChunkDemuxerStream::ChunkDemuxerStream(Type type, AVStream* stream)
+ : type_(type),
+ av_stream_(stream),
+ shutdown_called_(false) {
+}
+
+ChunkDemuxerStream::~ChunkDemuxerStream() {}
+
+void ChunkDemuxerStream::Flush() {
+ VLOG(1) << "Flush()";
+ base::AutoLock auto_lock(lock_);
+ buffers_.clear();
+}
+
+void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) {
+ std::deque<base::Closure> callbacks;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ for (BufferQueue::const_iterator itr = buffers.begin();
+ itr != buffers.end(); itr++) {
+ buffers_.push_back(*itr);
+ }
+
+ while (!buffers_.empty() && !read_cbs_.empty()) {
+ callbacks.push_back(base::Bind(&ChunkDemuxerStream::RunCallback,
+ read_cbs_.front(),
+ buffers_.front()));
+ buffers_.pop_front();
+ read_cbs_.pop_front();
+ }
+ }
+
+ while (!callbacks.empty()) {
+ callbacks.front().Run();
+ callbacks.pop_front();
+ }
+}
+
+void ChunkDemuxerStream::Shutdown() {
+ std::deque<ReadCallback> callbacks;
+ {
+ base::AutoLock auto_lock(lock_);
+ shutdown_called_ = true;
+
+ // Collect all the pending Read() callbacks.
+ while (!read_cbs_.empty()) {
+ callbacks.push_back(read_cbs_.front());
+ read_cbs_.pop_front();
+ }
+ }
+
+ // Pass NULL to all callbacks to signify read failure.
+ while (!callbacks.empty()) {
+ callbacks.front().Run(NULL);
+ callbacks.pop_front();
+ }
+}
+
+bool ChunkDemuxerStream::GetLastBufferTimestamp(
+ base::TimeDelta* timestamp) const {
+ base::AutoLock auto_lock(lock_);
+
+ if (buffers_.empty())
+ return false;
+
+ *timestamp = buffers_.back()->GetTimestamp();
+ return true;
+}
+
+// Helper function used to make Closures for ReadCallbacks.
+//static
+void ChunkDemuxerStream::RunCallback(ReadCallback cb,
+ scoped_refptr<Buffer> buffer) {
+ cb.Run(buffer);
+}
+
+// Helper function that makes sure |read_callback| runs on |message_loop|.
+static void RunOnMessageLoop(const DemuxerStream::ReadCallback& read_callback,
+ MessageLoop* message_loop,
+ scoped_refptr<Buffer> buffer) {
+ if (MessageLoop::current() != message_loop) {
+ message_loop->PostTask(FROM_HERE,
+ NewRunnableFunction(&RunOnMessageLoop,
+ read_callback,
+ message_loop,
+ buffer));
+ return;
+ }
+
+ read_callback.Run(buffer);
+}
+
+// DemuxerStream methods.
+void ChunkDemuxerStream::Read(const ReadCallback& read_callback) {
+ scoped_refptr<Buffer> buffer;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ if (!shutdown_called_) {
+ if (buffers_.empty()) {
+ // Wrap & store |read_callback| so that it will
+ // get called on the current MessageLoop.
+ read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
+ read_callback,
+ MessageLoop::current()));
+ return;
+ }
+
+ if (!read_cbs_.empty()) {
+ // Wrap & store |read_callback| so that it will
+ // get called on the current MessageLoop.
+ read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
+ read_callback,
+ MessageLoop::current()));
+ return;
+ }
+
+ buffer = buffers_.front();
+ buffers_.pop_front();
+ }
+ }
+
+ read_callback.Run(buffer);
+}
+
+DemuxerStream::Type ChunkDemuxerStream::type() { return type_; }
+
+const MediaFormat& ChunkDemuxerStream::media_format() { return media_format_; }
+
+void ChunkDemuxerStream::EnableBitstreamConverter() {}
+
+AVStream* ChunkDemuxerStream::GetAVStream() { return av_stream_; }
+
+ChunkDemuxer::ChunkDemuxer()
+ : state_(WAITING_FOR_INIT),
+ format_context_(NULL),
+ buffered_bytes_(0),
+ first_seek_(true) {
+}
+
+ChunkDemuxer::~ChunkDemuxer() {
+ DCHECK_NE(state_, INITIALIZED);
+
+ if (!format_context_)
+ return;
+
+ DestroyAVFormatContext(format_context_);
+ format_context_ = NULL;
+}
+
+bool ChunkDemuxer::Init(const uint8* data, int size) {
+ DCHECK(data);
+ DCHECK_GT(size, 0);
+ VLOG(1) << "Init(" << size << ")";
+
+ base::AutoLock auto_lock(lock_);
+ DCHECK_EQ(state_, WAITING_FOR_INIT);
+
+ const uint8* cur = data;
+ int cur_size = size;
+ WebMInfoParser info_parser;
+ int res = info_parser.Parse(cur, cur_size);
+
+ if (res <= 0) {
+ ChangeState(INIT_ERROR);
+ return false;
+ }
+
+ cur += res;
+ cur_size -= res;
+
+ WebMTracksParser tracks_parser(info_parser.timecode_scale());
+ res = tracks_parser.Parse(cur, cur_size);
+
+ if (res <= 0) {
+ ChangeState(INIT_ERROR);
+ return false;
+ }
+
+ double mult = info_parser.timecode_scale() / 1000.0;
+ duration_ = base::TimeDelta::FromMicroseconds(info_parser.duration() * mult);
+
+ cluster_parser_.reset(new WebMClusterParser(
+ info_parser.timecode_scale(),
+ tracks_parser.audio_track_num(),
+ tracks_parser.audio_default_duration(),
+ tracks_parser.video_track_num(),
+ tracks_parser.video_default_duration()));
+
+ format_context_ = CreateFormatContext(data, size);
+
+ if (!format_context_ || !SetupStreams() || !ParsePendingBuffers()) {
+ ChangeState(INIT_ERROR);
+ return false;
+ }
+
+ ChangeState(INITIALIZED);
+ return true;
+}
+
+// Filter implementation.
+void ChunkDemuxer::set_host(FilterHost* filter_host) {
+ Demuxer::set_host(filter_host);
+ filter_host->SetDuration(duration_);
+ filter_host->SetCurrentReadPosition(0);
+}
+
+void ChunkDemuxer::Stop(FilterCallback* callback) {
+ VLOG(1) << "Stop()";
+
+ callback->Run();
+ delete callback;
+}
+
+void ChunkDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) {
+ VLOG(1) << "Seek(" << time.InSecondsF() << ")";
+
+ bool run_callback = false;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ if (first_seek_) {
+ first_seek_ = false;
+ run_callback = true;
+ } else {
+ seek_cb_ = cb;
+ seek_time_ = time;
+ }
+ }
+
+ if (run_callback)
+ cb.Run(PIPELINE_OK);
+}
+
+void ChunkDemuxer::OnAudioRendererDisabled() {
+ base::AutoLock auto_lock(lock_);
+ audio_ = NULL;
+}
+
+void ChunkDemuxer::SetPreload(Preload preload) {}
scherkus (not reviewing) 2011/06/24 18:27:37 NOTIMPLEMENTED?
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 No. This gets called, but we can't really do anyth
+
+// Demuxer implementation.
+scoped_refptr<DemuxerStream> ChunkDemuxer::GetStream(
+ DemuxerStream::Type type) {
+ VLOG(1) << "GetStream(" << type << ")";
+
+ if (type == DemuxerStream::VIDEO)
+ return video_;
+
+ if (type == DemuxerStream::AUDIO)
+ return audio_;
+
+ return NULL;
+}
+
+base::TimeDelta ChunkDemuxer::GetStartTime() const {
+ VLOG(1) << "GetStartTime()";
+ // TODO(acolwell) : Fix this so it uses the time on the first packet.
+ return base::TimeDelta();
+}
+
+void ChunkDemuxer::FlushData() {
+ base::AutoLock auto_lock(lock_);
+ if (audio_.get())
+ audio_->Flush();
+
+ if (video_.get())
+ video_->Flush();
+
+ pending_buffers_.clear();
+}
+
+bool ChunkDemuxer::AddData(const uint8* data, unsigned length) {
+ VLOG(1) << "AddData(" << length << ")";
+
+ int64 buffered_bytes = 0;
+ base::TimeDelta buffered_ts = base::TimeDelta::FromSeconds(-1);
+
+ FilterStatusCB cb;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ switch(state_) {
+ case WAITING_FOR_INIT:
+ pending_buffers_.push_back(CreateBuffer(data, length));
+ return false;
+ break;
+
+ case INITIALIZED:
+ if (!ParseAndAddData_Locked(data, length)) {
+ VLOG(1) << "AddData(): parsing data failed";
+ return false;
+ }
+ break;
+
+ case INIT_ERROR:
+ case SHUTDOWN:
+ VLOG(1) << "AddData(): called in unexpected state " << state_;
+ return false;
+ break;
+ }
+
+ base::TimeDelta tmp;
+ if (audio_.get() && audio_->GetLastBufferTimestamp(&tmp) &&
+ tmp > buffered_ts) {
+ buffered_ts = tmp;
+ }
+
+ if (video_.get() && video_->GetLastBufferTimestamp(&tmp) &&
+ tmp > buffered_ts) {
+ buffered_ts = tmp;
+ }
+
+ buffered_bytes = buffered_bytes_;
+
+ if (!seek_cb_.is_null())
+ std::swap(cb, seek_cb_);
+ }
+
+ // Notify the host of 'network activity' because we got data.
+ if (host()) {
+ host()->SetBufferedBytes(buffered_bytes);
+
+ if (buffered_ts.InSeconds() >= 0) {
+ host()->SetBufferedTime(buffered_ts);
+ }
+
+ host()->SetNetworkActivity(true);
+ }
+
+ if (!cb.is_null())
+ cb.Run(PIPELINE_OK);
+
+ return true;
+}
+
+void ChunkDemuxer::Shutdown() {
+ FilterStatusCB cb;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ std::swap(cb, seek_cb_);
+
+ if (audio_.get())
+ audio_->Shutdown();
+
+ if (video_.get())
+ video_->Shutdown();
+
+ ChangeState(SHUTDOWN);
+ }
+
+ if (!cb.is_null())
+ cb.Run(PIPELINE_OK);
+}
+
+void ChunkDemuxer::ChangeState(State new_state) {
+ lock_.AssertAcquired();
+ state_ = new_state;
+}
+
+AVFormatContext* ChunkDemuxer::CreateFormatContext(const uint8* data,
+ int size) const {
+ int segment_size = size + sizeof(kEmptyCluster);
+ int buf_size = sizeof(kWebMHeader) + segment_size;
+ scoped_array<uint8> buf(new uint8[buf_size]);
+ memcpy(buf.get(), kWebMHeader, sizeof(kWebMHeader));
+ memcpy(buf.get() + sizeof(kWebMHeader), data, size);
+ memcpy(buf.get() + sizeof(kWebMHeader) + size, kEmptyCluster,
+ sizeof(kEmptyCluster));
+
+ // Update the segment size in the buffer.
+ int64 tmp = (segment_size & GG_LONGLONG(0x00FFFFFFFFFFFFFF)) |
+ GG_LONGLONG(0x0100000000000000);
+ for (int i = 0; i < 8; i++) {
+ buf[kSegmentSizeOffset + i] = (tmp >> (8 * (7 - i))) & 0xff;
+ }
+
+ InMemoryUrlProtocol imup(buf.get(), buf_size, true);
+ std::string key = FFmpegGlue::GetInstance()->AddProtocol(&imup);
+
+ // Open FFmpeg AVFormatContext.
+ AVFormatContext* context = NULL;
+ int result = av_open_input_file(&context, key.c_str(), NULL, 0, NULL);
+
+ // Remove ourself from protocol list.
+ FFmpegGlue::GetInstance()->RemoveProtocol(&imup);
+
+ if (result < 0)
+ return NULL;
+
+ return context;
+}
+
+bool ChunkDemuxer::SetupStreams() {
+ int result = av_find_stream_info(format_context_);
+
+ if (result < 0)
+ return false;
+
+ bool no_supported_streams = true;
+ for (size_t i = 0; i < format_context_->nb_streams; ++i) {
+ AVStream* stream = format_context_->streams[i];
+ AVCodecContext* codec_context = stream->codec;
+ CodecType codec_type = codec_context->codec_type;
+
+ if (codec_type == CODEC_TYPE_AUDIO &&
+ stream->codec->codec_id == CODEC_ID_VORBIS &&
+ !audio_.get()) {
+ audio_ = new ChunkDemuxerStream(DemuxerStream::AUDIO, stream);
+ no_supported_streams = false;
+ continue;
+ }
+
+ if (codec_type == CODEC_TYPE_VIDEO &&
+ stream->codec->codec_id == CODEC_ID_VP8 &&
+ !video_.get()) {
+ video_ = new ChunkDemuxerStream(DemuxerStream::VIDEO, stream);
+ no_supported_streams = false;
+ continue;
+ }
+ }
+
+ return !no_supported_streams;
+}
+
+bool ChunkDemuxer::ParsePendingBuffers() {
+ // Handle any buffers that came in between the time the pipeline was
+ // started and Init() was called.
+ while(!pending_buffers_.empty()) {
+ scoped_refptr<media::Buffer> buf = pending_buffers_.front();
+ pending_buffers_.pop_front();
+
+ if (!ParseAndAddData_Locked(buf->GetData(), buf->GetDataSize())) {
+ pending_buffers_.clear();
+ ChangeState(INIT_ERROR);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool ChunkDemuxer::ParseAndAddData_Locked(const uint8* data, int length) {
+ if (!cluster_parser_.get())
+ return false;
+
+ const uint8* cur = data;
+ int cur_size = length;
+
+ while (cur_size > 0) {
+ cluster_parser_->Reset();
+ int res = cluster_parser_->Parse(cur, cur_size);
+
+ if (res <= 0) {
+ VLOG(1) << "ParseAndAddData_Locked() : cluster parsing failed.";
+ return false;
+ }
+
+ if (audio_.get())
+ audio_->AddBuffers(cluster_parser_->audio_buffers());
+
+ if (video_.get())
+ video_->AddBuffers(cluster_parser_->video_buffers());
+
+ cur += res;
+ cur_size -= res;
+ }
+
+ cluster_parser_->Reset();
+
+ // TODO(acolwell) : make this more representative of what is actually
+ // buffered.
+ buffered_bytes_ += length;
+
+ return true;
+}
+
+} // namespace media

Powered by Google App Engine
This is Rietveld 408576698