Chromium Code Reviews| Index: media/filters/adaptive_demuxer.cc |
| diff --git a/media/filters/adaptive_demuxer.cc b/media/filters/adaptive_demuxer.cc |
| index 5599b15c3b0eb9b7d61d28f1c40d9b6a8f71cc85..03c115163857f36f90131f519adae66140b810c1 100644 |
| --- a/media/filters/adaptive_demuxer.cc |
| +++ b/media/filters/adaptive_demuxer.cc |
| @@ -4,20 +4,158 @@ |
| #include "base/bind.h" |
| #include "base/logging.h" |
| +#include "base/message_loop.h" |
| #include "base/string_number_conversions.h" |
| #include "base/string_split.h" |
| #include "base/string_util.h" |
| +#include "media/ffmpeg/ffmpeg_common.h" |
| #include "media/filters/adaptive_demuxer.h" |
| namespace media { |
| +static const int64 kSwitchTimerPeriod = 5000; // In milliseconds. |
| + |
| +class StreamSwitchManager |
| + : public base::RefCountedThreadSafe<StreamSwitchManager> { |
| + |
| + public: |
| + StreamSwitchManager(); |
| + ~StreamSwitchManager(); |
| + |
| + void Init(AdaptiveDemuxer* demuxer); |
| + |
| + void Play(); |
| + void Pause(); |
| + void Stop(); |
| + |
| + private: |
| + void OnSwitchTimer(); |
| + void OnSwitchDone(PipelineStatus status); |
| + void StartSwitchTimer_Locked(); |
| + |
| + AdaptiveDemuxer* demuxer_; |
| + |
| + // Guards the members below. Only held for simple variable reads/writes, not |
| + // during async operation. |
| + base::Lock lock_; |
| + bool playing_; |
| + bool switch_pending_; |
| + bool switch_timer_running_; |
| + AdaptiveDemuxer::StreamIdVector video_ids_; |
| + int current_id_index_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(StreamSwitchManager); |
| +}; |
| + |
| +StreamSwitchManager::StreamSwitchManager() |
| + : demuxer_(NULL), |
| + playing_(false), |
| + switch_pending_(false), |
| + switch_timer_running_(false), |
| + current_id_index_(-1) { |
| +} |
| + |
| +StreamSwitchManager::~StreamSwitchManager() {} |
| + |
| +void StreamSwitchManager::Init(AdaptiveDemuxer* demuxer) { |
| + DCHECK(demuxer); |
| + demuxer_ = demuxer; |
| + video_ids_ = demuxer->GetVideoIds(); |
| + current_id_index_ = -1; |
| + |
| + if (video_ids_.size() > 0) { |
| + int current_id = demuxer->GetCurrentVideoId(); |
| + current_id_index_ = 0; |
| + for (int i = 0; i < video_ids_.size(); i++) { |
| + if (current_id == video_ids_[i]) { |
| + current_id_index_ = i; |
| + break; |
| + } |
| + } |
| + } |
| +} |
| + |
| +void StreamSwitchManager::Play() { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + DCHECK(!playing_); |
| + |
| + playing_ = true; |
| + |
| + if (!switch_timer_running_) { |
| + StartSwitchTimer_Locked(); |
| + } |
| +} |
| + |
| +void StreamSwitchManager::Pause() { |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK(playing_); |
| + playing_ = false; |
| +} |
| + |
| +void StreamSwitchManager::Stop() { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + DCHECK(!playing_); |
| + demuxer_ = NULL; |
| +} |
| + |
| + |
| + |
| +void StreamSwitchManager::OnSwitchTimer() { |
| + int new_stream_id = -1; |
| + AdaptiveDemuxer* demuxer = NULL; |
| + SeekHelper seek_helper; |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + switch_timer_running_ = false; |
| + |
| + if (!switch_pending_) { |
| + int new_id_index = (current_id_index_ + 1) % video_ids_.size(); |
| + |
| + if (new_id_index != current_id_index_) { |
| + current_id_index_ = new_id_index; |
| + switch_pending_ = true; |
| + demuxer = demuxer_; |
| + new_stream_id = video_ids_[new_id_index]; |
| + } |
| + } |
| + |
| + if (playing_) |
| + StartSwitchTimer_Locked(); |
| + } |
| + |
| + if (demuxer && new_stream_id != -1) { |
| + demuxer->ChangeVideoStream(new_stream_id, |
| + base::Bind(&StreamSwitchManager::OnSwitchDone, |
| + this)); |
| + } |
| +} |
| + |
| +void StreamSwitchManager::OnSwitchDone(PipelineStatus status) { |
| + base::AutoLock auto_lock(lock_); |
| + switch_pending_ = false; |
| +} |
| + |
| +void StreamSwitchManager::StartSwitchTimer_Locked() { |
| + lock_.AssertAcquired(); |
| + |
| + switch_timer_running_ = true; |
| + MessageLoop::current()->PostDelayedTask( |
| + FROM_HERE, NewRunnableMethod(this, &StreamSwitchManager::OnSwitchTimer), |
| + kSwitchTimerPeriod); |
| +} |
| + |
| // |
| // AdaptiveDemuxerStream |
| // |
| AdaptiveDemuxerStream::AdaptiveDemuxerStream( |
| StreamVector const& streams, int initial_stream) |
| : streams_(streams), current_stream_index_(initial_stream), |
| - bitstream_converter_enabled_(false) { |
| + bitstream_converter_enabled_(false), |
| + pending_reads_(0), |
| + switch_index_(-1), |
| + last_buffer_timestamp_(kNoTimestamp) { |
| DCheckSanity(); |
| } |
| @@ -25,7 +163,8 @@ void AdaptiveDemuxerStream::DCheckSanity() { |
| // We only carry out sanity checks in debug mode. |
| if (!logging::DEBUG_MODE) |
| return; |
| - DCHECK(streams_[current_stream_index_].get()); |
| + DCHECK(streams_[current_stream_index_].get()) |
| + << "current_stream_index_ : " << current_stream_index_; |
| bool non_null_stream_seen = false; |
| Type type = DemuxerStream::UNKNOWN; |
| @@ -57,7 +196,27 @@ DemuxerStream* AdaptiveDemuxerStream::current_stream() { |
| } |
| void AdaptiveDemuxerStream::Read(const ReadCallback& read_callback) { |
| - current_stream()->Read(read_callback); |
| + DemuxerStream* stream = NULL; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + read_cb_queue_.push_back(read_callback); |
| + |
| + // Check to make sure we aren't doing a stream switch. We only want to |
| + // make calls on | streams_[current_stream_index_] | when we aren't |
| + // in the middle of a stream switch. Since the callback is stored in |
| + // | read_cb_queue_ | we will issue the Read() on the new stream once |
| + // the switch has completed. |
| + if (!IsSwitchPending_Locked()) { |
| + stream = streams_[current_stream_index_]; |
| + |
| + pending_reads_++; |
| + } |
| + } |
| + |
| + if (stream) |
| + stream->Read(base::Bind(&AdaptiveDemuxerStream::OnReadDone, this)); |
| } |
| AVStream* AdaptiveDemuxerStream::GetAVStream() { |
| @@ -80,16 +239,156 @@ void AdaptiveDemuxerStream::EnableBitstreamConverter() { |
| current_stream()->EnableBitstreamConverter(); |
| } |
| -void AdaptiveDemuxerStream::ChangeCurrentStream(int index) { |
| - bool needs_bitstream_converter_enabled; |
| +void AdaptiveDemuxerStream::OnAdaptiveDemuxerSeek() { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + last_buffer_timestamp_ = kNoTimestamp; |
| + |
| + // XXAJC Figure out what to do if this happens during a stream switch. |
|
Ami GONE FROM CHROMIUM
2011/05/19 20:27:37
Boom.
|
| +} |
| + |
| +void AdaptiveDemuxerStream::ChangeCurrentStream(int index, |
| + const SeekHelper& seek_helper, |
| + const PipelineStatusCB& cb) { |
| + DCHECK_GE(index, 0); |
| + |
| + PipelineStatusCB error_cb; |
| + PipelineStatus error_status = PIPELINE_OK; |
| + |
| + bool start_switch = false; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + DCHECK_LE(index, streams_.size()); |
| + DCHECK(streams_[index].get()); |
| + DCHECK(!IsSwitchPending_Locked()); |
| + |
| + // XXAJC - Still need to handle the case where the stream has ended. |
| + if (index == current_stream_index_) { |
| + error_cb = cb; |
| + } else { |
| + switch_cb_ = cb; |
| + switch_index_ = index; |
| + switch_seek_helper_ = seek_helper; |
| + start_switch = (pending_reads_ == 0); |
| + } |
| + } |
| + |
| + if (!error_cb.is_null()) { |
| + error_cb.Run(error_status); |
| + return; |
| + } |
| + |
| + if (start_switch) |
| + StartSwitch(); |
| +} |
| + |
| +void AdaptiveDemuxerStream::OnReadDone(Buffer* buffer) { |
| + ReadCallback read_cb; |
| + bool start_switch = false; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + pending_reads_--; |
| + |
| + DCHECK_GE(pending_reads_, 0); |
| + DCHECK_GE(read_cb_queue_.size(), 0); |
| + |
| + read_cb = read_cb_queue_.front(); |
| + read_cb_queue_.pop_front(); |
| + |
| + if (buffer && !buffer->IsEndOfStream()) |
| + last_buffer_timestamp_ = buffer->GetTimestamp(); |
| + |
| + start_switch = (pending_reads_ == 0 && IsSwitchPending_Locked()); |
| + } |
| + |
| + if (!read_cb.is_null()) |
| + read_cb.Run(buffer); |
| + |
| + if (start_switch) |
| + StartSwitch(); |
| +} |
| + |
| +bool AdaptiveDemuxerStream::IsSwitchPending_Locked() const { |
| + lock_.AssertAcquired(); |
| + return !switch_cb_.is_null(); |
| +} |
| + |
| +void AdaptiveDemuxerStream::StartSwitch() { |
| + SeekHelper seek_helper; |
| + base::TimeDelta seek_point; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + DCHECK(IsSwitchPending_Locked()); |
| + DCHECK_EQ(pending_reads_, 0); |
| + DCHECK_GE(switch_index_, 0); |
| + |
| + seek_point = last_buffer_timestamp_; |
| + seek_helper = switch_seek_helper_; |
| + |
| + // XXAJX add code to call switch_cb_ if we are at the end of the stream now. |
| + } |
| + |
| + if (seek_point == kNoTimestamp) { |
| + // We haven't seen a buffer so there is no need to seek. Just move on to |
| + // the next stage in the switch process. |
| + OnSwitchSeekDone(PIPELINE_OK, kNoTimestamp); |
| + return; |
| + } |
| + |
| + DCHECK(!seek_helper.is_null()); |
| + seek_helper.Run(seek_point, |
| + base::Bind(&AdaptiveDemuxerStream::OnSwitchSeekDone, this)); |
| +} |
| + |
| +void AdaptiveDemuxerStream::OnSwitchSeekDone(PipelineStatus status, |
| + const base::TimeDelta& seek_time) { |
| + DemuxerStream* stream = NULL; |
| + PipelineStatusCB switch_cb; |
| + int reads_to_request = 0; |
| + bool needs_bitstream_converter_enabled = false; |
| + |
| { |
| base::AutoLock auto_lock(lock_); |
| - current_stream_index_ = index; |
| - DCHECK(streams_[current_stream_index_]); |
| - needs_bitstream_converter_enabled = bitstream_converter_enabled_; |
| + |
| + if (status == PIPELINE_OK) { |
| + DCHECK(streams_[switch_index_]); |
| + |
| + current_stream_index_ = switch_index_; |
| + needs_bitstream_converter_enabled = bitstream_converter_enabled_; |
| + } |
| + |
| + // Clear stream switch state. |
| + switch_cb = switch_cb_; |
| + switch_cb_.Reset(); |
| + switch_index_ = -1; |
| + switch_seek_helper_.Reset(); |
| + |
| + // Get the number of outstanding Read()s on this object. |
| + reads_to_request = read_cb_queue_.size(); |
| + |
| + DCHECK_EQ(pending_reads_, 0); |
| + pending_reads_ = reads_to_request; |
| + |
| + stream = streams_[current_stream_index_]; |
| } |
| + |
| if (needs_bitstream_converter_enabled) |
| EnableBitstreamConverter(); |
| + |
| + if (stream) { |
| + // Make the Read() calls that were deferred during the stream switch. |
| + for(;reads_to_request > 0; --reads_to_request) |
| + stream->Read(base::Bind(&AdaptiveDemuxerStream::OnReadDone, this)); |
| + } |
| + |
| + // Signal that the stream switch has completed. |
| + if (!switch_cb.is_null()) |
| + switch_cb.Run(status); |
| } |
| // |
| @@ -101,7 +400,10 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers, |
| int initial_video_demuxer_index) |
| : demuxers_(demuxers), |
| current_audio_demuxer_index_(initial_audio_demuxer_index), |
| - current_video_demuxer_index_(initial_video_demuxer_index) { |
| + current_video_demuxer_index_(initial_video_demuxer_index), |
| + playback_rate_(0), |
| + switch_pending_(false), |
| + stream_switch_manager_(new StreamSwitchManager()){ |
| DCHECK(!demuxers_.empty()); |
| DCHECK_GE(current_audio_demuxer_index_, -1); |
| DCHECK_GE(current_video_demuxer_index_, -1); |
| @@ -123,6 +425,8 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers, |
| video_streams, current_video_demuxer_index_); |
| } |
| + stream_switch_manager_->Init(this); |
| + |
| // TODO(fischman): any streams in the underlying demuxers that aren't being |
| // consumed currently need to be sent to /dev/null or else FFmpegDemuxer will |
| // hold data for them in memory, waiting for them to get drained by a |
| @@ -131,17 +435,81 @@ AdaptiveDemuxer::AdaptiveDemuxer(DemuxerVector const& demuxers, |
| AdaptiveDemuxer::~AdaptiveDemuxer() {} |
| -void AdaptiveDemuxer::ChangeCurrentDemuxer(int audio_index, int video_index) { |
| +void AdaptiveDemuxer::ChangeVideoStream(int video_index, |
| + const PipelineStatusCB& done_cb) { |
| // TODO(fischman): this is currently broken because when a new Demuxer is to |
| // be used we need to set_host(host()) it, and we need to set_host(NULL) the |
| // current Demuxer if it's no longer being used. |
| + |
| + AdaptiveDemuxerStream* stream = NULL; |
| + PipelineStatus error_status = PIPELINE_OK; |
| + bool run_callback_with_ok = false; |
| + SeekHelper seek_helper; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + if (!video_stream_) { |
| + error_status = PIPELINE_ERROR_INVALID_STATE; |
| + } else if (switch_pending_) { |
| + error_status = PIPELINE_ERROR_OPERATION_PENDING; |
| + } else if (current_video_demuxer_index_ == video_index) { |
| + run_callback_with_ok = true; |
| + } else { |
| + stream = video_stream_; |
| + switch_pending_ = true; |
| + seek_helper = base::Bind(&AdaptiveDemuxer::StartStreamSwitchSeek, |
| + this, |
| + DemuxerStream::VIDEO, |
| + video_index); |
| + } |
| + } |
| + |
| + if (run_callback_with_ok || error_status != PIPELINE_OK) { |
| + done_cb.Run(error_status); |
| + return; |
| + } |
| + |
| + DCHECK(stream); |
| + stream->ChangeCurrentStream( |
| + video_index, seek_helper, |
| + base::Bind(&AdaptiveDemuxer::ChangeVideoStreamDone, this, video_index, |
| + done_cb)); |
| +} |
| + |
| +int AdaptiveDemuxer::GetCurrentVideoId() const { |
| base::AutoLock auto_lock(lock_); |
| - current_audio_demuxer_index_ = audio_index; |
| - current_video_demuxer_index_ = video_index; |
| - if (audio_stream_) |
| - audio_stream_->ChangeCurrentStream(audio_index); |
| - if (video_stream_) |
| - video_stream_->ChangeCurrentStream(video_index); |
| + return current_video_demuxer_index_; |
| +} |
| + |
| +AdaptiveDemuxer::StreamIdVector AdaptiveDemuxer::GetVideoIds() const { |
| + StreamIdVector ids; |
| + base::AutoLock auto_lock(lock_); |
| + |
| + for (int i = 0; i < demuxers_.size(); i++) { |
| + if (demuxers_[i]->GetStream(DemuxerStream::VIDEO)) |
| + ids.push_back(i); |
| + } |
| + |
| + return ids; |
| +} |
| + |
| +void AdaptiveDemuxer::ChangeVideoStreamDone(int new_stream_index, |
| + const PipelineStatusCB& done_cb, |
| + PipelineStatus status) { |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + switch_pending_ = false; |
| + |
| + if (status == PIPELINE_OK) { |
| + demuxers_[current_video_demuxer_index_]->SetPlaybackRate(0); |
| + current_video_demuxer_index_ = new_stream_index; |
| + demuxers_[current_video_demuxer_index_]->SetPlaybackRate(playback_rate_); |
| + } |
| + } |
| + |
| + done_cb.Run(status); |
| } |
| Demuxer* AdaptiveDemuxer::current_demuxer(DemuxerStream::Type type) { |
| @@ -241,6 +609,8 @@ class CountingStatusCB : public base::RefCountedThreadSafe<CountingStatusCB> { |
| }; |
| void AdaptiveDemuxer::Stop(FilterCallback* callback) { |
| + stream_switch_manager_->Stop(); |
| + |
| // Stop() must be called on all of the demuxers even though only one demuxer |
| // is actively delivering audio and another one is delivering video. This |
| // just satisfies the contract that all demuxers must have Stop() called on |
| @@ -253,6 +623,13 @@ void AdaptiveDemuxer::Stop(FilterCallback* callback) { |
| } |
| void AdaptiveDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) { |
| + |
| + if (audio_stream_) |
| + audio_stream_->OnAdaptiveDemuxerSeek(); |
| + |
| + if (video_stream_) |
| + video_stream_->OnAdaptiveDemuxerSeek(); |
| + |
| Demuxer* audio = current_demuxer(DemuxerStream::AUDIO); |
| Demuxer* video = current_demuxer(DemuxerStream::VIDEO); |
| int count = (audio ? 1 : 0) + (video && audio != video ? 1 : 0); |
| @@ -276,6 +653,17 @@ void AdaptiveDemuxer::set_host(FilterHost* filter_host) { |
| } |
| void AdaptiveDemuxer::SetPlaybackRate(float playback_rate) { |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + if (playback_rate_ == 0 && playback_rate > 0) { |
| + stream_switch_manager_->Play(); |
| + } else if (playback_rate_ > 0 && playback_rate == 0) { |
| + stream_switch_manager_->Pause(); |
| + } |
| + |
| + playback_rate_ = playback_rate; |
| + } |
| + |
| Demuxer* audio = current_demuxer(DemuxerStream::AUDIO); |
| Demuxer* video = current_demuxer(DemuxerStream::VIDEO); |
| if (audio) audio->SetPlaybackRate(playback_rate); |
| @@ -300,6 +688,94 @@ scoped_refptr<DemuxerStream> AdaptiveDemuxer::GetStream( |
| } |
| } |
| +void AdaptiveDemuxer::StartStreamSwitchSeek(DemuxerStream::Type type, |
| + int stream_index, |
| + const base::TimeDelta& seek_time, |
| + const SeekHelperCB& seek_cb) { |
| + DCHECK(!seek_cb.is_null()); |
| + |
| + Demuxer* demuxer = NULL; |
| + base::TimeDelta seek_point; |
| + FilterStatusCB filter_cb; |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + demuxer = demuxers_[stream_index]; |
| + |
| + if (GetSeekTimeAfter(demuxer->GetStream(type)->GetAVStream(), seek_time, |
| + &seek_point)) { |
| + // We found a seek point. |
| + filter_cb = base::Bind(&AdaptiveDemuxer::OnStreamSeekDone, this, |
| + seek_point, seek_cb); |
| + } else { |
| + // We didn't find a seek point. Assume we don't have index data for it |
| + // yet. Seek to the specified time to force index data to be loaded. |
| + seek_point = seek_time; |
| + filter_cb = base::Bind(&AdaptiveDemuxer::OnIndexSeekDone, this, |
| + type, stream_index, seek_time, seek_cb); |
| + } |
| + } |
| + |
| + if (!demuxer) { |
| + seek_cb.Run(PIPELINE_ERROR_INVALID_STATE, base::TimeDelta()); |
| + return; |
| + } |
| + |
| + demuxer->Seek(seek_point, filter_cb); |
| +} |
| + |
| +void AdaptiveDemuxer::OnIndexSeekDone(DemuxerStream::Type type, |
| + int stream_index, |
| + const base::TimeDelta& seek_time, |
| + const SeekHelperCB& seek_cb, |
| + PipelineStatus status) { |
| + base::TimeDelta seek_point; |
| + FilterStatusCB filter_cb; |
| + |
| + Demuxer* demuxer = NULL; |
| + |
| + if (status != PIPELINE_OK) { |
| + seek_cb.Run(status, base::TimeDelta()); |
| + return; |
| + } |
| + |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + demuxer = demuxers_[stream_index]; |
| + |
| + // Look for a seek point now that we have index data. |
| + if (GetSeekTimeAfter(demuxer->GetStream(type)->GetAVStream(), seek_time, |
| + &seek_point)) { |
| + filter_cb = base::Bind(&AdaptiveDemuxer::OnStreamSeekDone, this, |
| + seek_point, seek_cb); |
| + } else { |
| + demuxer = NULL; |
| + } |
| + } |
| + |
| + if (demuxer) { |
| + demuxer->Seek(seek_point, filter_cb); |
| + return; |
| + } |
| + |
| + // No seek point for specified seek_time. |
| + seek_cb.Run(PIPELINE_ERROR_INITIALIZATION_FAILED, base::TimeDelta()); |
| +} |
| + |
| +void AdaptiveDemuxer::OnStreamSeekDone(const base::TimeDelta& seek_point, |
| + const SeekHelperCB& seek_cb, |
| + PipelineStatus status) { |
| + if (status != PIPELINE_OK) { |
| + seek_cb.Run(status, base::TimeDelta()); |
| + return; |
| + } |
| + |
| + seek_cb.Run(PIPELINE_OK, seek_point); |
| +} |
| + |
| + |
| // |
| // AdaptiveDemuxerFactory |
| // |