Index: media/base/pipeline_impl.cc |
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc |
index 532236d14c1b1e85c7ff313cc2c4912147c87822..d7f153d5b009b862ecf71aaf161085fe597f5043 100644 |
--- a/media/base/pipeline_impl.cc |
+++ b/media/base/pipeline_impl.cc |
@@ -349,9 +349,28 @@ bool PipelineImpl::IsPipelineInitializing() { |
state_ == kInitVideoRenderer; |
} |
+bool PipelineImpl::IsPipelineStopped() { |
+ DCHECK_EQ(MessageLoop::current(), message_loop_); |
+ return state_ == kStopped || state_ == kError; |
+} |
+ |
+void PipelineImpl::FinishInitialization() { |
+ DCHECK_EQ(MessageLoop::current(), message_loop_); |
+ // Execute the seek callback, if present. Note that this might be the |
+ // initial callback passed into Start(). |
+ if (seek_callback_.get()) { |
+ seek_callback_->Run(); |
+ seek_callback_.reset(); |
+ } |
+ filter_factory_ = NULL; |
+} |
+ |
// static |
-bool PipelineImpl::StateTransitionsToStarted(State state) { |
- return state == kPausing || state == kSeeking || state == kStarting; |
+bool PipelineImpl::TransientState(State state) { |
+ return state == kPausing || |
+ state == kSeeking || |
+ state == kStarting || |
+ state == kStopping; |
} |
// static |
@@ -363,6 +382,8 @@ PipelineImpl::State PipelineImpl::FindNextState(State current) { |
return kStarting; |
if (current == kStarting) |
return kStarted; |
+ if (current == kStopping) |
+ return kStopped; |
return current; |
} |
@@ -371,8 +392,6 @@ void PipelineImpl::SetError(PipelineError error) { |
DCHECK(error != PIPELINE_OK) << "PIPELINE_OK isn't an error!"; |
LOG(INFO) << "Media pipeline error: " << error; |
- AutoLock auto_lock(lock_); |
- error_ = error; |
message_loop_->PostTask(FROM_HERE, |
NewRunnableMethod(this, &PipelineImpl::ErrorChangedTask, error)); |
} |
@@ -531,7 +550,7 @@ void PipelineImpl::InitializeTask() { |
DCHECK_EQ(MessageLoop::current(), message_loop_); |
// If we have received the stop or error signal, return immediately. |
- if (state_ == kStopped || state_ == kError) |
+ if (state_ == kStopping || IsPipelineStopped()) |
return; |
DCHECK(state_ == kCreated || IsPipelineInitializing()); |
@@ -591,10 +610,6 @@ void PipelineImpl::InitializeTask() { |
return; |
} |
- // We've successfully created and initialized every filter, so we no longer |
- // need the filter factory. |
- filter_factory_ = NULL; |
- |
// Initialization was successful, we are now considered paused, so it's safe |
// to set the initial playback rate and volume. |
PlaybackRateChangedTask(GetPlaybackRate()); |
@@ -610,69 +625,57 @@ void PipelineImpl::InitializeTask() { |
} |
// This method is called as a result of the client calling Pipeline::Stop() or |
-// as the result of an error condition. If there is no error, then set the |
-// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the |
-// reverse order. |
+// as the result of an error condition. |
+// We stop the filters in the reverse order. |
// |
// TODO(scherkus): beware! this can get posted multiple times since we post |
// Stop() tasks even if we've already stopped. Perhaps this should no-op for |
// additional calls, however most of this logic will be changing. |
void PipelineImpl::StopTask(PipelineCallback* stop_callback) { |
DCHECK_EQ(MessageLoop::current(), message_loop_); |
- stop_callback_.reset(stop_callback); |
+ PipelineError error = GetError(); |
- // If we've already stopped, return immediately. |
- if (state_ == kStopped) { |
+ if (state_ == kStopped || (state_ == kStopping && error == PIPELINE_OK)) { |
scherkus (not reviewing)
2010/06/02 02:13:05
this bit of logic is confusing me...
so if we're
|
+ // If we are already stopped or stopping normally, return immediately. |
+ delete stop_callback; |
return; |
+ } else if (state_ == kError || |
scherkus (not reviewing)
2010/06/02 02:13:05
nit: we return, don't need else
|
+ (state_ == kStopping && error != PIPELINE_OK)) { |
+ // If we are stopping due to SetError(), stop normally instead of |
+ // going to error state. |
+ AutoLock auto_lock(lock_); |
+ error_ = PIPELINE_OK; |
} |
- // Carry out setting the error, notifying the client and destroying filters. |
- ErrorChangedTask(PIPELINE_STOPPING); |
- |
- // We no longer need to examine our previous state, set it to stopped. |
- state_ = kStopped; |
- |
- // Reset the pipeline. |
- ResetState(); |
+ stop_callback_.reset(stop_callback); |
- // Notify the client that stopping has finished. |
- if (stop_callback_.get()) { |
- stop_callback_->Run(); |
- stop_callback_.reset(); |
+ if (IsPipelineInitializing()) { |
+ FinishInitialization(); |
} |
+ |
+ StartDestroyingFilters(); |
} |
void PipelineImpl::ErrorChangedTask(PipelineError error) { |
DCHECK_EQ(MessageLoop::current(), message_loop_); |
DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; |
- // Suppress executing additional error logic. |
- // TODO(hclam): Remove the condition for kStopped. It is there only because |
- // FFmpegDemuxer submits a read error while reading after it is called to |
- // stop. After FFmpegDemuxer is cleaned up we should remove this condition |
- // and add an extra assert. |
- if (state_ == kError || state_ == kStopped) { |
+ // Suppress executing additional error logic. Note that if we are currently |
+ // performing a normal stop, then we return immediately and continue the |
+ // normal stop. |
+ if (IsPipelineStopped() || state_ == kStopping) { |
return; |
} |
+ AutoLock auto_lock(lock_); |
+ error_ = error; |
+ |
// Notify the client that starting did not complete, if necessary. |
- if (IsPipelineInitializing() && seek_callback_.get()) { |
- seek_callback_->Run(); |
+ if (IsPipelineInitializing()) { |
+ FinishInitialization(); |
} |
- seek_callback_.reset(); |
- filter_factory_ = NULL; |
- |
- // We no longer need to examine our previous state, set it to stopped. |
- state_ = kError; |
- // Destroy every filter and reset the pipeline as well. |
- DestroyFilters(); |
- |
- // If our owner has requested to be notified of an error, execute |
- // |error_callback_| unless we have a "good" error. |
- if (error_callback_.get() && error != PIPELINE_STOPPING) { |
- error_callback_->Run(); |
- } |
+ StartDestroyingFilters(); |
} |
void PipelineImpl::PlaybackRateChangedTask(float playback_rate) { |
@@ -790,11 +793,11 @@ void PipelineImpl::FilterStateTransitionTask() { |
DCHECK_EQ(MessageLoop::current(), message_loop_); |
// No reason transitioning if we've errored or have stopped. |
- if (state_ == kError || state_ == kStopped) { |
+ if (IsPipelineStopped()) { |
return; |
} |
- if (!StateTransitionsToStarted(state_)) { |
+ if (!TransientState(state_)) { |
NOTREACHED() << "Invalid current state: " << state_; |
SetError(PIPELINE_ERROR_ABORT); |
return; |
@@ -811,13 +814,13 @@ void PipelineImpl::FilterStateTransitionTask() { |
clock_.SetTime(seek_timestamp_); |
} |
- if (StateTransitionsToStarted(state_)) { |
+ if (TransientState(state_)) { |
remaining_transitions_ = filters_.size(); |
} |
} |
// Carry out the action for the current state. |
- if (StateTransitionsToStarted(state_)) { |
+ if (TransientState(state_)) { |
MediaFilter* filter = filters_[filters_.size() - remaining_transitions_]; |
if (state_ == kPausing) { |
filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
@@ -826,16 +829,13 @@ void PipelineImpl::FilterStateTransitionTask() { |
NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else if (state_ == kStarting) { |
filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
+ } else if (state_ == kStopping) { |
+ filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
} else { |
NOTREACHED(); |
} |
} else if (state_ == kStarted) { |
- // Execute the seek callback, if present. Note that this might be the |
- // initial callback passed into Start(). |
- if (seek_callback_.get()) { |
- seek_callback_->Run(); |
- seek_callback_.reset(); |
- } |
+ FinishInitialization(); |
// Finally, reset our seeking timestamp back to zero. |
seek_timestamp_ = base::TimeDelta(); |
@@ -848,11 +848,52 @@ void PipelineImpl::FilterStateTransitionTask() { |
rendered_mime_types_.end(); |
if (!waiting_for_clock_update_) |
clock_.Play(); |
+ } else if (IsPipelineStopped()) { |
+ FinishDestroyingFiltersTask(); |
} else { |
NOTREACHED(); |
} |
} |
+void PipelineImpl::FinishDestroyingFiltersTask() { |
+ DCHECK_EQ(MessageLoop::current(), message_loop_); |
+ DCHECK(IsPipelineStopped()); |
+ |
+ // Stop every running filter thread. |
+ // |
+ // TODO(scherkus): can we watchdog this section to detect wedged threads? |
+ for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
+ iter != filter_threads_.end(); |
+ ++iter) { |
+ (*iter)->Stop(); |
+ } |
+ |
+ // Reset the pipeline, which will decrement a reference to this object. |
+ // We will get destroyed as soon as the remaining tasks finish executing. |
+ // To be safe, we'll set our pipeline reference to NULL. |
+ filters_.clear(); |
+ filter_types_.clear(); |
+ STLDeleteElements(&filter_threads_); |
+ |
+ if (PIPELINE_OK == GetError()) { |
+ // Destroying filters due to Stop(). |
+ ResetState(); |
+ |
+ // Notify the client that stopping has finished. |
+ if (stop_callback_.get()) { |
+ stop_callback_->Run(); |
+ stop_callback_.reset(); |
+ } |
+ } else { |
+ // Destroying filters due to SetError(). |
+ state_ = kError; |
+ // If our owner has requested to be notified of an error. |
+ if (error_callback_.get()) { |
+ error_callback_->Run(); |
+ } |
+ } |
+} |
+ |
template <class Filter, class Source> |
void PipelineImpl::CreateFilter(FilterFactory* filter_factory, |
Source source, |
@@ -965,54 +1006,24 @@ void PipelineImpl::GetFilter(scoped_refptr<Filter>* filter_out) const { |
} |
} |
-void PipelineImpl::DestroyFilters() { |
- // Stop every filter. |
- for (FilterVector::iterator iter = filters_.begin(); |
- iter != filters_.end(); |
- ++iter) { |
- (*iter)->Stop(); |
- } |
- |
- // Crude blocking counter implementation. |
- Lock lock; |
- ConditionVariable wait_for_zero(&lock); |
- int count = filter_threads_.size(); |
- |
- // Post a task to every filter's thread to ensure that they've completed their |
- // stopping logic before stopping the threads themselves. |
- // |
- // TODO(scherkus): again, Stop() should either be synchronous or we should |
- // receive a signal from filters that they have indeed stopped. |
- for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
- iter != filter_threads_.end(); |
- ++iter) { |
- (*iter)->message_loop()->PostTask(FROM_HERE, |
- NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); |
- } |
+void PipelineImpl::StartDestroyingFilters() { |
+ DCHECK_EQ(MessageLoop::current(), message_loop_); |
+ DCHECK_NE(kStopped, state_); |
- // Wait on our "blocking counter". |
- { |
- AutoLock auto_lock(lock); |
- while (count > 0) { |
- wait_for_zero.Wait(); |
- } |
+ if (state_ == kStopping) { |
+ return; // Do not call Stop() on filters twice. |
scherkus (not reviewing)
2010/06/02 02:13:05
nit: two spaces between ; and //
|
} |
- // Stop every running filter thread. |
- // |
- // TODO(scherkus): can we watchdog this section to detect wedged threads? |
- for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
- iter != filter_threads_.end(); |
- ++iter) { |
- (*iter)->Stop(); |
+ remaining_transitions_ = filters_.size(); |
+ if (remaining_transitions_ > 0) { |
+ state_ = kStopping; |
+ filters_.front()->Stop(NewCallback( |
+ this, &PipelineImpl::OnFilterStateTransition)); |
+ } else { |
+ state_ = kStopped; |
+ message_loop_->PostTask(FROM_HERE, |
+ NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask)); |
} |
- |
- // Reset the pipeline, which will decrement a reference to this object. |
- // We will get destroyed as soon as the remaining tasks finish executing. |
- // To be safe, we'll set our pipeline reference to NULL. |
- filters_.clear(); |
- filter_types_.clear(); |
- STLDeleteElements(&filter_threads_); |
} |
} // namespace media |