Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(98)

Unified Diff: media/filters/adaptive_demuxer.cc

Issue 7044008: Initial implementation of stream switching in AdaptiveDemuxer. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: _ Created 9 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/filters/adaptive_demuxer.cc
diff --git a/media/filters/adaptive_demuxer.cc b/media/filters/adaptive_demuxer.cc
index 5599b15c3b0eb9b7d61d28f1c40d9b6a8f71cc85..03c115163857f36f90131f519adae66140b810c1 100644
--- a/media/filters/adaptive_demuxer.cc
+++ b/media/filters/adaptive_demuxer.cc
@@ -4,20 +4,158 @@
#include "base/bind.h"
#include "base/logging.h"
+#include "base/message_loop.h"
#include "base/string_number_conversions.h"
#include "base/string_split.h"
#include "base/string_util.h"
+#include "media/ffmpeg/ffmpeg_common.h"
#include "media/filters/adaptive_demuxer.h"
namespace media {
+static const int64 kSwitchTimerPeriod = 5000; // In milliseconds.
+
+class StreamSwitchManager
+ : public base::RefCountedThreadSafe<StreamSwitchManager> {
+
+ public:
+ StreamSwitchManager();
+ ~StreamSwitchManager();
+
+ void Init(AdaptiveDemuxer* demuxer);
+
+ void Play();
+ void Pause();
+ void Stop();
+
+ private:
+ void OnSwitchTimer();
+ void OnSwitchDone(PipelineStatus status);
+ void StartSwitchTimer_Locked();
+
+ AdaptiveDemuxer* demuxer_;
+
+ // Guards the members below. Only held for simple variable reads/writes, not
+ // during async operation.
+ base::Lock lock_;
+ bool playing_;
+ bool switch_pending_;
+ bool switch_timer_running_;
+ AdaptiveDemuxer::StreamIdVector video_ids_;
+ int current_id_index_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamSwitchManager);
+};
+
+StreamSwitchManager::StreamSwitchManager()
+ : demuxer_(NULL),
+ playing_(false),
+ switch_pending_(false),
+ switch_timer_running_(false),
+ current_id_index_(-1) {
+}
+
+StreamSwitchManager::~StreamSwitchManager() {}
+
+void StreamSwitchManager::Init(AdaptiveDemuxer* demuxer) {
+ DCHECK(demuxer);
+ demuxer_ = demuxer;
+ video_ids_ = demuxer->GetVideoIds();
+ current_id_index_ = -1;
+
+ if (video_ids_.size() > 0) {
+ int current_id = demuxer->GetCurrentVideoId();
+ current_id_index_ = 0;
+ for (int i = 0; i < video_ids_.size(); i++) {
+ if (current_id == video_ids_[i]) {
+ current_id_index_ = i;
+ break;
+ }
+ }
+ }
+}
+
+void StreamSwitchManager::Play() {
+ base::AutoLock auto_lock(lock_);
+
+ DCHECK(!playing_);
+
+ playing_ = true;
+
+ if (!switch_timer_running_) {
+ StartSwitchTimer_Locked();
+ }
+}
+
+void StreamSwitchManager::Pause() {
+ base::AutoLock auto_lock(lock_);
+ DCHECK(playing_);
+ playing_ = false;
+}
+
+void StreamSwitchManager::Stop() {
+ base::AutoLock auto_lock(lock_);
+
+ DCHECK(!playing_);
+ demuxer_ = NULL;
+}
+
+
+
+void StreamSwitchManager::OnSwitchTimer() {
+ int new_stream_id = -1;
+ AdaptiveDemuxer* demuxer = NULL;
+ SeekHelper seek_helper;
+ {
+ base::AutoLock auto_lock(lock_);
+ switch_timer_running_ = false;
+
+ if (!switch_pending_) {
+ int new_id_index = (current_id_index_ + 1) % video_ids_.size();
+
+ if (new_id_index != current_id_index_) {
+ current_id_index_ = new_id_index;
+ switch_pending_ = true;
+ demuxer = demuxer_;
+ new_stream_id = video_ids_[new_id_index];
+ }
+ }
+
+ if (playing_)
+ StartSwitchTimer_Locked();
+ }
+
+ if (demuxer && new_stream_id != -1) {
+ demuxer->ChangeVideoStream(new_stream_id,
+ base::Bind(&StreamSwitchManager::OnSwitchDone,
+ this));
+ }
+}
+
+void StreamSwitchManager::OnSwitchDone(PipelineStatus status) {
+ base::AutoLock auto_lock(lock_);
+ switch_pending_ = false;
+}
+
+void StreamSwitchManager::StartSwitchTimer_Locked() {
+ lock_.AssertAcquired();
+
+ switch_timer_running_ = true;
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE, NewRunnableMethod(this, &StreamSwitchManager::OnSwitchTimer),
+ kSwitchTimerPeriod);
+}
+
//
// AdaptiveDemuxerStream
//
AdaptiveDemuxerStream::AdaptiveDemuxerStream(
StreamVector const& streams, int initial_stream)
: streams_(streams), current_stream_index_(initial_stream),
- bitstream_converter_enabled_(false) {
+ bitstream_converter_enabled_(false),
+ pending_reads_(0),
+ switch_index_(-1),
+ last_buffer_timestamp_(kNoTimestamp) {
DCheckSanity();
}
@@ -25,7 +163,8 @@ void AdaptiveDemuxerStream::DCheckSanity() {
// We only carry out sanity checks in debug mode.
if (!logging::DEBUG_MODE)
return;
- DCHECK(streams_[current_stream_index_].get());
+ DCHECK(streams_[current_stream_index_].get())
+ << "current_stream_index_ : " << current_stream_index_;
bool non_null_stream_seen = false;
Type type = DemuxerStream::UNKNOWN;
@@ -57,7 +196,27 @@ DemuxerStream* AdaptiveDemuxerStream::current_stream() {
}
void AdaptiveDemuxerStream::Read(const ReadCallback& read_callback) {
- current_stream()->Read(read_callback);
+ DemuxerStream* stream = NULL;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ read_cb_queue_.push_back(read_callback);
+
+ // Check to make sure we aren't doing a stream switch. We only want to
+ // make calls on | streams_[current_stream_index_] | when we aren't
+ // in the middle of a stream switch. Since the callback is stored in
+ // | read_cb_queue_ | we will issue the Read() on the new stream once
+ // the switch has completed.
+ if (!IsSwitchPending_Locked()) {
+ stream = streams_[current_stream_index_];
+
+ pending_reads_++;
+ }
+ }
+
+ if (stream)
+ stream->Read(base::Bind(&AdaptiveDemuxerStream::OnReadDone, this));
}
AVStream* AdaptiveDemuxerStream::GetAVStream() {
@@ -80,16 +239,156 @@ void AdaptiveDemuxerStream::EnableBitstreamConverter() {
current_stream()->EnableBitstreamConverter();
}
-void AdaptiveDemuxerStream::ChangeCurrentStream(int index) {
- bool needs_bitstream_converter_enabled;
+void AdaptiveDemuxerStream::OnAdaptiveDemuxerSeek() {
+ base::AutoLock auto_lock(lock_);
+
+ last_buffer_timestamp_ = kNoTimestamp;
+
+ // XXAJC Figure out what to do if this happens during a stream switch.
Ami GONE FROM CHROMIUM 2011/05/19 20:27:37 Boom.
+}
+
+void AdaptiveDemuxerStream::ChangeCurrentStream(int index,
+ const SeekHelper& seek_helper,
+ const PipelineStatusCB& cb) {
+ DCHECK_GE(index, 0);
+
+ PipelineStatusCB error_cb;
+ PipelineStatus error_status = PIPELINE_OK;
+
+ bool start_switch = false;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ DCHECK_LE(index, streams_.size());
+ DCHECK(streams_[index].get());
+ DCHECK(!IsSwitchPending_Locked());
+
+ // XXAJC - Still need to handle the case where the stream has ended.
+ if (index == current_stream_index_) {
+ error_cb = cb;
+ } else {
+ switch_cb_ = cb;
+ switch_index_ = index;
+ switch_seek_helper_ = seek_helper;
+ start_switch = (pending_reads_ == 0);
+ }
+ }
+
+ if (!error_cb.is_null()) {
+ error_cb.Run(error_status);
+ return;
+ }
+
+ if (start_switch)
+ StartSwitch();
+}
+
+void AdaptiveDemuxerStream::OnReadDone(Buffer* buffer) {
+ ReadCallback read_cb;
+ bool start_switch = false;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ pending_reads_--;
+
+ DCHECK_GE(pending_reads_, 0);
+ DCHECK_GE(read_cb_queue_.size(), 0);
+
+ read_cb = read_cb_queue_.front();
+ read_cb_queue_.pop_front();
+
+ if (buffer && !buffer->IsEndOfStream())
+ last_buffer_timestamp_ = buffer->GetTimestamp();
+
+ start_switch = (pending_reads_ == 0 && IsSwitchPending_Locked());
+ }
+
+ if (!read_cb.is_null())
+ read_cb.Run(buffer);
+
+ if (start_switch)
+ StartSwitch();
+}
+
+bool AdaptiveDemuxerStream::IsSwitchPending_Locked() const {
+ lock_.AssertAcquired();
+ return !switch_cb_.is_null();
+}
+
+void AdaptiveDemuxerStream::StartSwitch() {
+ SeekHelper seek_helper;
+ base::TimeDelta seek_point;
+
+ {
+ base::AutoLock auto_lock(lock_);
+ DCHECK(IsSwitchPending_Locked());
+ DCHECK_EQ(pending_reads_, 0);
+ DCHECK_GE(switch_index_, 0);
+
+ seek_point = last_buffer_timestamp_;
+ seek_helper = switch_seek_helper_;
+
+ // XXAJX add code to call switch_cb_ if we are at the end of the stream now.
+ }
+
+ if (seek_point == kNoTimestamp) {
+ // We haven't seen a buffer so there is no need to seek. Just move on to
+ // the next stage in the switch process.
+ OnSwitchSeekDone(PIPELINE_OK, kNoTimestamp);
+ return;
+ }
+
+ DCHECK(!seek_helper.is_null());
+ seek_helper.Run(seek_point,
+ base::Bind(&AdaptiveDemuxerStream::OnSwitchSeekDone, this));
+}
+
+void AdaptiveDemuxerStream::OnSwitchSeekDone(PipelineStatus status,
+ const base::TimeDelta& seek_time) {
+ DemuxerStream* stream = NULL;
+ PipelineStatusCB switch_cb;
+ int reads_to_request = 0;
+ bool needs_bitstream_converter_enabled = false;
+
{
base::AutoLock auto_lock(lock_);
- current_stream_index_ = index;
- DCHECK(streams_[current_stream_index_]);
- needs_bitstream_converter_enabled = bitstream_converter_enabled_;
+
+ if (status == PIPELINE_OK) {
+ DCHECK(streams_[switch_index_]);
+
+ current_stream_index_ = switch_index_;
+ needs_bitstream_converter_enabled = bitstream_converter_enabled_;
+ }
+
+ // Clear stream switch state.
+ switch_cb = switch_cb_;
+ switch_cb_.Reset();
+ switch_index_ = -1;
+ switch_seek_helper_.Reset();
+
+ // Get the number of outstanding Read()s on this object.
+ reads_to_request = read_cb_queue_.size();
+
+ DCHECK_EQ(pending_reads_, 0);
+ pending_reads_ = reads_to_request;
+
+ stream = streams_[current_stream_index_];
}
+
if (needs_bitstream_converter_enabled)
EnableBitstreamConverter();
+
+ if (stream) {
+ // Make the Read() calls that were deferred during the stream switch.
+ for(;reads_to_request > 0; --reads_to_request)
+ stream->Read(base::Bind(&AdaptiveDemuxerStream::OnReadDone, this));
+ }
+
+ // Signal that the stream switch has completed.
+ if (!switch_cb.is_null())
+ switch_cb.Run(status);
}
//
@@ -101,7 +400,10 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers,
int initial_video_demuxer_index)
: demuxers_(demuxers),
current_audio_demuxer_index_(initial_audio_demuxer_index),
- current_video_demuxer_index_(initial_video_demuxer_index) {
+ current_video_demuxer_index_(initial_video_demuxer_index),
+ playback_rate_(0),
+ switch_pending_(false),
+ stream_switch_manager_(new StreamSwitchManager()){
DCHECK(!demuxers_.empty());
DCHECK_GE(current_audio_demuxer_index_, -1);
DCHECK_GE(current_video_demuxer_index_, -1);
@@ -123,6 +425,8 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers,
video_streams, current_video_demuxer_index_);
}
+ stream_switch_manager_->Init(this);
+
// TODO(fischman): any streams in the underlying demuxers that aren't being
// consumed currently need to be sent to /dev/null or else FFmpegDemuxer will
// hold data for them in memory, waiting for them to get drained by a
@@ -131,17 +435,81 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers,
AdaptiveDemuxer::~AdaptiveDemuxer() {}
-void AdaptiveDemuxer::ChangeCurrentDemuxer(int audio_index, int video_index) {
+void AdaptiveDemuxer::ChangeVideoStream(int video_index,
+ const PipelineStatusCB& done_cb) {
// TODO(fischman): this is currently broken because when a new Demuxer is to
// be used we need to set_host(host()) it, and we need to set_host(NULL) the
// current Demuxer if it's no longer being used.
+
+ AdaptiveDemuxerStream* stream = NULL;
+ PipelineStatus error_status = PIPELINE_OK;
+ bool run_callback_with_ok = false;
+ SeekHelper seek_helper;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ if (!video_stream_) {
+ error_status = PIPELINE_ERROR_INVALID_STATE;
+ } else if (switch_pending_) {
+ error_status = PIPELINE_ERROR_OPERATION_PENDING;
+ } else if (current_video_demuxer_index_ == video_index) {
+ run_callback_with_ok = true;
+ } else {
+ stream = video_stream_;
+ switch_pending_ = true;
+ seek_helper = base::Bind(&AdaptiveDemuxer::StartStreamSwitchSeek,
+ this,
+ DemuxerStream::VIDEO,
+ video_index);
+ }
+ }
+
+ if (run_callback_with_ok || error_status != PIPELINE_OK) {
+ done_cb.Run(error_status);
+ return;
+ }
+
+ DCHECK(stream);
+ stream->ChangeCurrentStream(
+ video_index, seek_helper,
+ base::Bind(&AdaptiveDemuxer::ChangeVideoStreamDone, this, video_index,
+ done_cb));
+}
+
+int AdaptiveDemuxer::GetCurrentVideoId() const {
base::AutoLock auto_lock(lock_);
- current_audio_demuxer_index_ = audio_index;
- current_video_demuxer_index_ = video_index;
- if (audio_stream_)
- audio_stream_->ChangeCurrentStream(audio_index);
- if (video_stream_)
- video_stream_->ChangeCurrentStream(video_index);
+ return current_video_demuxer_index_;
+}
+
+AdaptiveDemuxer::StreamIdVector AdaptiveDemuxer::GetVideoIds() const {
+ StreamIdVector ids;
+ base::AutoLock auto_lock(lock_);
+
+ for (int i = 0; i < demuxers_.size(); i++) {
+ if (demuxers_[i]->GetStream(DemuxerStream::VIDEO))
+ ids.push_back(i);
+ }
+
+ return ids;
+}
+
+void AdaptiveDemuxer::ChangeVideoStreamDone(int new_stream_index,
+ const PipelineStatusCB& done_cb,
+ PipelineStatus status) {
+ {
+ base::AutoLock auto_lock(lock_);
+
+ switch_pending_ = false;
+
+ if (status == PIPELINE_OK) {
+ demuxers_[current_video_demuxer_index_]->SetPlaybackRate(0);
+ current_video_demuxer_index_ = new_stream_index;
+ demuxers_[current_video_demuxer_index_]->SetPlaybackRate(playback_rate_);
+ }
+ }
+
+ done_cb.Run(status);
}
Demuxer* AdaptiveDemuxer::current_demuxer(DemuxerStream::Type type) {
@@ -241,6 +609,8 @@ class CountingStatusCB : public base::RefCountedThreadSafe<CountingStatusCB> {
};
void AdaptiveDemuxer::Stop(FilterCallback* callback) {
+ stream_switch_manager_->Stop();
+
// Stop() must be called on all of the demuxers even though only one demuxer
// is actively delivering audio and another one is delivering video. This
// just satisfies the contract that all demuxers must have Stop() called on
@@ -253,6 +623,13 @@ void AdaptiveDemuxer::Stop(FilterCallback* callback) {
}
void AdaptiveDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) {
+
+ if (audio_stream_)
+ audio_stream_->OnAdaptiveDemuxerSeek();
+
+ if (video_stream_)
+ video_stream_->OnAdaptiveDemuxerSeek();
+
Demuxer* audio = current_demuxer(DemuxerStream::AUDIO);
Demuxer* video = current_demuxer(DemuxerStream::VIDEO);
int count = (audio ? 1 : 0) + (video && audio != video ? 1 : 0);
@@ -276,6 +653,17 @@ void AdaptiveDemuxer::set_host(FilterHost* filter_host) {
}
void AdaptiveDemuxer::SetPlaybackRate(float playback_rate) {
+ {
+ base::AutoLock auto_lock(lock_);
+ if (playback_rate_ == 0 && playback_rate > 0) {
+ stream_switch_manager_->Play();
+ } else if (playback_rate_ > 0 && playback_rate == 0) {
+ stream_switch_manager_->Pause();
+ }
+
+ playback_rate_ = playback_rate;
+ }
+
Demuxer* audio = current_demuxer(DemuxerStream::AUDIO);
Demuxer* video = current_demuxer(DemuxerStream::VIDEO);
if (audio) audio->SetPlaybackRate(playback_rate);
@@ -300,6 +688,94 @@ scoped_refptr<DemuxerStream> AdaptiveDemuxer::GetStream(
}
}
+void AdaptiveDemuxer::StartStreamSwitchSeek(DemuxerStream::Type type,
+ int stream_index,
+ const base::TimeDelta& seek_time,
+ const SeekHelperCB& seek_cb) {
+ DCHECK(!seek_cb.is_null());
+
+ Demuxer* demuxer = NULL;
+ base::TimeDelta seek_point;
+ FilterStatusCB filter_cb;
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ demuxer = demuxers_[stream_index];
+
+ if (GetSeekTimeAfter(demuxer->GetStream(type)->GetAVStream(), seek_time,
+ &seek_point)) {
+ // We found a seek point.
+ filter_cb = base::Bind(&AdaptiveDemuxer::OnStreamSeekDone, this,
+ seek_point, seek_cb);
+ } else {
+ // We didn't find a seek point. Assume we don't have index data for it
+ // yet. Seek to the specified time to force index data to be loaded.
+ seek_point = seek_time;
+ filter_cb = base::Bind(&AdaptiveDemuxer::OnIndexSeekDone, this,
+ type, stream_index, seek_time, seek_cb);
+ }
+ }
+
+ if (!demuxer) {
+ seek_cb.Run(PIPELINE_ERROR_INVALID_STATE, base::TimeDelta());
+ return;
+ }
+
+ demuxer->Seek(seek_point, filter_cb);
+}
+
+void AdaptiveDemuxer::OnIndexSeekDone(DemuxerStream::Type type,
+ int stream_index,
+ const base::TimeDelta& seek_time,
+ const SeekHelperCB& seek_cb,
+ PipelineStatus status) {
+ base::TimeDelta seek_point;
+ FilterStatusCB filter_cb;
+
+ Demuxer* demuxer = NULL;
+
+ if (status != PIPELINE_OK) {
+ seek_cb.Run(status, base::TimeDelta());
+ return;
+ }
+
+ {
+ base::AutoLock auto_lock(lock_);
+
+ demuxer = demuxers_[stream_index];
+
+ // Look for a seek point now that we have index data.
+ if (GetSeekTimeAfter(demuxer->GetStream(type)->GetAVStream(), seek_time,
+ &seek_point)) {
+ filter_cb = base::Bind(&AdaptiveDemuxer::OnStreamSeekDone, this,
+ seek_point, seek_cb);
+ } else {
+ demuxer = NULL;
+ }
+ }
+
+ if (demuxer) {
+ demuxer->Seek(seek_point, filter_cb);
+ return;
+ }
+
+ // No seek point for specified seek_time.
+ seek_cb.Run(PIPELINE_ERROR_INITIALIZATION_FAILED, base::TimeDelta());
+}
+
+void AdaptiveDemuxer::OnStreamSeekDone(const base::TimeDelta& seek_point,
+ const SeekHelperCB& seek_cb,
+ PipelineStatus status) {
+ if (status != PIPELINE_OK) {
+ seek_cb.Run(status, base::TimeDelta());
+ return;
+ }
+
+ seek_cb.Run(PIPELINE_OK, seek_point);
+}
+
+
//
// AdaptiveDemuxerFactory
//

Powered by Google App Engine
This is Rietveld 408576698