Index: media/base/pipeline_impl.cc |
=================================================================== |
--- media/base/pipeline_impl.cc (revision 70274) |
+++ media/base/pipeline_impl.cc (working copy) |
@@ -22,7 +22,6 @@ |
scoped_refptr<Demuxer> demuxer_; |
scoped_refptr<AudioDecoder> audio_decoder_; |
scoped_refptr<VideoDecoder> video_decoder_; |
- scoped_refptr<CompositeFilter> composite_; |
}; |
PipelineImpl::PipelineImpl(MessageLoop* message_loop) |
@@ -30,6 +29,7 @@ |
clock_(new ClockImpl(&base::Time::Now)), |
waiting_for_clock_update_(false), |
state_(kCreated), |
+ remaining_transitions_(0), |
current_bytes_(0) { |
ResetState(); |
} |
@@ -317,10 +317,6 @@ |
rendered_mime_types_.clear(); |
} |
-void PipelineImpl::set_state(State next_state) { |
- state_ = next_state; |
-} |
- |
bool PipelineImpl::IsPipelineOk() { |
return PIPELINE_OK == GetError(); |
} |
@@ -576,25 +572,22 @@ |
// Just created, create data source. |
if (state_ == kCreated) { |
- set_state(kInitDataSource); |
+ state_ = kInitDataSource; |
pipeline_init_state_.reset(new PipelineInitState()); |
- pipeline_init_state_->composite_ = new CompositeFilter(message_loop_); |
- pipeline_init_state_->composite_->set_host(this); |
- |
InitializeDataSource(); |
return; |
} |
// Data source created, create demuxer. |
if (state_ == kInitDataSource) { |
- set_state(kInitDemuxer); |
+ state_ = kInitDemuxer; |
InitializeDemuxer(pipeline_init_state_->data_source_); |
return; |
} |
// Demuxer created, create audio decoder. |
if (state_ == kInitDemuxer) { |
- set_state(kInitAudioDecoder); |
+ state_ = kInitAudioDecoder; |
// If this method returns false, then there's no audio stream. |
if (InitializeAudioDecoder(pipeline_init_state_->demuxer_)) |
return; |
@@ -602,7 +595,7 @@ |
// Assuming audio decoder was created, create audio renderer. |
if (state_ == kInitAudioDecoder) { |
- set_state(kInitAudioRenderer); |
+ state_ = kInitAudioRenderer; |
// Returns false if there's no audio stream. |
if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) { |
InsertRenderedMimeType(mime_type::kMajorTypeAudio); |
@@ -613,14 +606,14 @@ |
// Assuming audio renderer was created, create video decoder. |
if (state_ == kInitAudioRenderer) { |
// Then perform the stage of initialization, i.e. initialize video decoder. |
- set_state(kInitVideoDecoder); |
+ state_ = kInitVideoDecoder; |
if (InitializeVideoDecoder(pipeline_init_state_->demuxer_)) |
return; |
} |
// Assuming video decoder was created, create video renderer. |
if (state_ == kInitVideoDecoder) { |
- set_state(kInitVideoRenderer); |
+ state_ = kInitVideoRenderer; |
if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) { |
InsertRenderedMimeType(mime_type::kMajorTypeVideo); |
return; |
@@ -636,8 +629,6 @@ |
// Clear the collection of filters. |
filter_collection_->Clear(); |
- pipeline_filter_ = pipeline_init_state_->composite_; |
- |
// Clear init state since we're done initializing. |
pipeline_init_state_.reset(); |
@@ -646,11 +637,12 @@ |
PlaybackRateChangedTask(GetPlaybackRate()); |
VolumeChangedTask(GetVolume()); |
- // Fire the seek request to get the filters to preroll. |
+ // Fire the initial seek request to get the filters to preroll. |
seek_pending_ = true; |
- set_state(kSeeking); |
+ state_ = kSeeking; |
+ remaining_transitions_ = filters_.size(); |
seek_timestamp_ = base::TimeDelta(); |
- pipeline_filter_->Seek(seek_timestamp_, |
+ filters_.front()->Seek(seek_timestamp_, |
NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} |
} |
@@ -713,7 +705,11 @@ |
AutoLock auto_lock(lock_); |
clock_->SetPlaybackRate(playback_rate); |
} |
- pipeline_filter_->SetPlaybackRate(playback_rate); |
+ for (FilterVector::iterator iter = filters_.begin(); |
+ iter != filters_.end(); |
+ ++iter) { |
+ (*iter)->SetPlaybackRate(playback_rate); |
+ } |
} |
void PipelineImpl::VolumeChangedTask(float volume) { |
@@ -749,9 +745,10 @@ |
// kSeeking (for each filter) |
// kStarting (for each filter) |
// kStarted |
- set_state(kPausing); |
+ state_ = kPausing; |
seek_timestamp_ = time; |
seek_callback_.reset(seek_callback); |
+ remaining_transitions_ = filters_.size(); |
// Kick off seeking! |
{ |
@@ -760,7 +757,7 @@ |
if (!waiting_for_clock_update_) |
clock_->Pause(); |
} |
- pipeline_filter_->Pause( |
+ filters_.front()->Pause( |
NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} |
@@ -793,7 +790,7 @@ |
} |
// Transition to ended, executing the callback if present. |
- set_state(kEnded); |
+ state_ = kEnded; |
if (ended_callback_.get()) { |
ended_callback_->Run(); |
} |
@@ -817,7 +814,11 @@ |
audio_disabled_ = true; |
// Notify all filters of disabled audio renderer. |
- pipeline_filter_->OnAudioRendererDisabled(); |
+ for (FilterVector::iterator iter = filters_.begin(); |
+ iter != filters_.end(); |
+ ++iter) { |
+ (*iter)->OnAudioRendererDisabled(); |
+ } |
} |
void PipelineImpl::FilterStateTransitionTask() { |
@@ -836,31 +837,42 @@ |
// Decrement the number of remaining transitions, making sure to transition |
// to the next state if needed. |
- set_state(FindNextState(state_)); |
- if (state_ == kSeeking) { |
- AutoLock auto_lock(lock_); |
- clock_->SetTime(seek_timestamp_); |
+ DCHECK(remaining_transitions_ <= filters_.size()); |
+ DCHECK(remaining_transitions_ > 0u); |
+ if (--remaining_transitions_ == 0) { |
+ state_ = FindNextState(state_); |
+ if (state_ == kSeeking) { |
+ AutoLock auto_lock(lock_); |
+ clock_->SetTime(seek_timestamp_); |
+ } |
+ |
+ if (TransientState(state_)) { |
+ remaining_transitions_ = filters_.size(); |
+ } |
} |
// Carry out the action for the current state. |
if (TransientState(state_)) { |
+ Filter* filter = filters_[filters_.size() - remaining_transitions_]; |
if (state_ == kPausing) { |
- pipeline_filter_->Pause( |
- NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
+ filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else if (state_ == kFlushing) { |
- pipeline_filter_->Flush( |
- NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
+ // We had to use parallel flushing all filters. |
+ if (remaining_transitions_ == filters_.size()) { |
+ for (size_t i = 0; i < filters_.size(); i++) { |
+ filters_[i]->Flush( |
+ NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
+ } |
+ } |
} else if (state_ == kSeeking) { |
- pipeline_filter_->Seek(seek_timestamp_, |
+ filter->Seek(seek_timestamp_, |
NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else if (state_ == kStarting) { |
- pipeline_filter_->Play( |
- NewCallback(this,&PipelineImpl::OnFilterStateTransition)); |
+ filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else if (state_ == kStopping) { |
- pipeline_filter_->Stop( |
- NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
+ filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else { |
- NOTREACHED() << "Unexpected state: " << state_; |
+ NOTREACHED(); |
} |
} else if (state_ == kStarted) { |
FinishInitialization(); |
@@ -885,7 +897,7 @@ |
} else if (IsPipelineStopped()) { |
FinishDestroyingFiltersTask(); |
} else { |
- NOTREACHED() << "Unexpected state: " << state_; |
+ NOTREACHED(); |
} |
} |
@@ -893,11 +905,24 @@ |
DCHECK_EQ(MessageLoop::current(), message_loop_); |
DCHECK(IsPipelineStopped()); |
+ // Stop every running filter thread. |
+ // |
+ // TODO(scherkus): can we watchdog this section to detect wedged threads? |
+ for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
+ iter != filter_threads_.end(); |
+ ++iter) { |
+ (*iter)->Stop(); |
+ } |
+ |
// Clear renderer references. |
audio_renderer_ = NULL; |
video_renderer_ = NULL; |
- pipeline_filter_ = NULL; |
+ // Reset the pipeline, which will decrement a reference to this object. |
+ // We will get destroyed as soon as the remaining tasks finish executing. |
+ // To be safe, we'll set our pipeline reference to NULL. |
+ filters_.clear(); |
+ STLDeleteElements(&filter_threads_); |
stop_pending_ = false; |
tearing_down_ = false; |
@@ -913,7 +938,7 @@ |
} |
} else { |
// Destroying filters due to SetError(). |
- set_state(kError); |
+ state_ = kError; |
// If our owner has requested to be notified of an error. |
if (error_callback_.get()) { |
error_callback_->Run(); |
@@ -922,12 +947,28 @@ |
} |
bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) { |
- bool ret = pipeline_init_state_->composite_->AddFilter(filter.get()); |
+ DCHECK_EQ(MessageLoop::current(), message_loop_); |
+ DCHECK(IsPipelineOk()); |
- if (!ret) { |
- SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); |
+ // Create a dedicated thread for this filter if applicable. |
+ if (filter->requires_message_loop()) { |
+ scoped_ptr<base::Thread> thread( |
+ new base::Thread(filter->message_loop_name())); |
+ if (!thread.get() || !thread->Start()) { |
+ NOTREACHED() << "Could not start filter thread"; |
+ SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); |
+ return false; |
+ } |
+ |
+ filter->set_message_loop(thread->message_loop()); |
+ filter_threads_.push_back(thread.release()); |
} |
- return ret; |
+ |
+ // Register ourselves as the filter's host. |
+ DCHECK(IsPipelineOk()); |
+ filter->set_host(this); |
+ filters_.push_back(make_scoped_refptr(filter.get())); |
+ return true; |
} |
void PipelineImpl::InitializeDataSource() { |
@@ -1104,21 +1145,23 @@ |
tearing_down_ = true; |
if (IsPipelineInitializing()) { |
- // Make it look like initialization was successful. |
- pipeline_filter_ = pipeline_init_state_->composite_; |
- pipeline_init_state_.reset(); |
+ // Notify the client that starting did not complete, if necessary. |
+ FinishInitialization(); |
+ } |
- set_state(kStopping); |
- pipeline_filter_->Stop(NewCallback( |
- this, &PipelineImpl::OnFilterStateTransition)); |
- |
- FinishInitialization(); |
- } else if (pipeline_filter_.get()) { |
- set_state(kPausing); |
- pipeline_filter_->Pause(NewCallback( |
- this, &PipelineImpl::OnFilterStateTransition)); |
+ remaining_transitions_ = filters_.size(); |
+ if (remaining_transitions_ > 0) { |
+ if (IsPipelineInitializing()) { |
+ state_ = kStopping; |
+ filters_.front()->Stop(NewCallback( |
+ this, &PipelineImpl::OnFilterStateTransition)); |
+ } else { |
+ state_ = kPausing; |
+ filters_.front()->Pause(NewCallback( |
+ this, &PipelineImpl::OnFilterStateTransition)); |
+ } |
} else { |
- set_state(kStopped); |
+ state_ = kStopped; |
message_loop_->PostTask(FROM_HERE, |
NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask)); |
} |