Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(956)

Unified Diff: media/base/pipeline_impl.cc

Issue 2101015: Change MediaFilter::Stop() to accept a callback so that Stop() is asynchronous. (Closed) Base URL: http://src.chromium.org/git/chromium.git
Patch Set: Revert filter changes. Going to make that another patch. Created 10 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: media/base/pipeline_impl.cc
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 532236d14c1b1e85c7ff313cc2c4912147c87822..d7f153d5b009b862ecf71aaf161085fe597f5043 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -349,9 +349,28 @@ bool PipelineImpl::IsPipelineInitializing() {
state_ == kInitVideoRenderer;
}
+bool PipelineImpl::IsPipelineStopped() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ return state_ == kStopped || state_ == kError;
+}
+
+void PipelineImpl::FinishInitialization() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ // Execute the seek callback, if present. Note that this might be the
+ // initial callback passed into Start().
+ if (seek_callback_.get()) {
+ seek_callback_->Run();
+ seek_callback_.reset();
+ }
+ filter_factory_ = NULL;
+}
+
// static
-bool PipelineImpl::StateTransitionsToStarted(State state) {
- return state == kPausing || state == kSeeking || state == kStarting;
+bool PipelineImpl::TransientState(State state) {
+ return state == kPausing ||
+ state == kSeeking ||
+ state == kStarting ||
+ state == kStopping;
}
// static
@@ -363,6 +382,8 @@ PipelineImpl::State PipelineImpl::FindNextState(State current) {
return kStarting;
if (current == kStarting)
return kStarted;
+ if (current == kStopping)
+ return kStopped;
return current;
}
@@ -371,8 +392,6 @@ void PipelineImpl::SetError(PipelineError error) {
DCHECK(error != PIPELINE_OK) << "PIPELINE_OK isn't an error!";
LOG(INFO) << "Media pipeline error: " << error;
- AutoLock auto_lock(lock_);
- error_ = error;
message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &PipelineImpl::ErrorChangedTask, error));
}
@@ -531,7 +550,7 @@ void PipelineImpl::InitializeTask() {
DCHECK_EQ(MessageLoop::current(), message_loop_);
// If we have received the stop or error signal, return immediately.
- if (state_ == kStopped || state_ == kError)
+ if (state_ == kStopping || IsPipelineStopped())
return;
DCHECK(state_ == kCreated || IsPipelineInitializing());
@@ -591,10 +610,6 @@ void PipelineImpl::InitializeTask() {
return;
}
- // We've successfully created and initialized every filter, so we no longer
- // need the filter factory.
- filter_factory_ = NULL;
-
// Initialization was successful, we are now considered paused, so it's safe
// to set the initial playback rate and volume.
PlaybackRateChangedTask(GetPlaybackRate());
@@ -610,69 +625,57 @@ void PipelineImpl::InitializeTask() {
}
// This method is called as a result of the client calling Pipeline::Stop() or
-// as the result of an error condition. If there is no error, then set the
-// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the
-// reverse order.
+// as the result of an error condition.
+// We stop the filters in the reverse order.
//
// TODO(scherkus): beware! this can get posted multiple times since we post
// Stop() tasks even if we've already stopped. Perhaps this should no-op for
// additional calls, however most of this logic will be changing.
void PipelineImpl::StopTask(PipelineCallback* stop_callback) {
DCHECK_EQ(MessageLoop::current(), message_loop_);
- stop_callback_.reset(stop_callback);
+ PipelineError error = GetError();
- // If we've already stopped, return immediately.
- if (state_ == kStopped) {
+ if (state_ == kStopped || (state_ == kStopping && error == PIPELINE_OK)) {
scherkus (not reviewing) 2010/06/02 02:13:05 this bit of logic is confusing me... so if we're
+ // If we are already stopped or stopping normally, return immediately.
+ delete stop_callback;
return;
+ } else if (state_ == kError ||
scherkus (not reviewing) 2010/06/02 02:13:05 nit: we return, don't need else
+ (state_ == kStopping && error != PIPELINE_OK)) {
+ // If we are stopping due to SetError(), stop normally instead of
+ // going to error state.
+ AutoLock auto_lock(lock_);
+ error_ = PIPELINE_OK;
}
- // Carry out setting the error, notifying the client and destroying filters.
- ErrorChangedTask(PIPELINE_STOPPING);
-
- // We no longer need to examine our previous state, set it to stopped.
- state_ = kStopped;
-
- // Reset the pipeline.
- ResetState();
+ stop_callback_.reset(stop_callback);
- // Notify the client that stopping has finished.
- if (stop_callback_.get()) {
- stop_callback_->Run();
- stop_callback_.reset();
+ if (IsPipelineInitializing()) {
+ FinishInitialization();
}
+
+ StartDestroyingFilters();
}
void PipelineImpl::ErrorChangedTask(PipelineError error) {
DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
- // Suppress executing additional error logic.
- // TODO(hclam): Remove the condition for kStopped. It is there only because
- // FFmpegDemuxer submits a read error while reading after it is called to
- // stop. After FFmpegDemuxer is cleaned up we should remove this condition
- // and add an extra assert.
- if (state_ == kError || state_ == kStopped) {
+ // Suppress executing additional error logic. Note that if we are currently
+ // performing a normal stop, then we return immediately and continue the
+ // normal stop.
+ if (IsPipelineStopped() || state_ == kStopping) {
return;
}
+ AutoLock auto_lock(lock_);
+ error_ = error;
+
// Notify the client that starting did not complete, if necessary.
- if (IsPipelineInitializing() && seek_callback_.get()) {
- seek_callback_->Run();
+ if (IsPipelineInitializing()) {
+ FinishInitialization();
}
- seek_callback_.reset();
- filter_factory_ = NULL;
-
- // We no longer need to examine our previous state, set it to stopped.
- state_ = kError;
- // Destroy every filter and reset the pipeline as well.
- DestroyFilters();
-
- // If our owner has requested to be notified of an error, execute
- // |error_callback_| unless we have a "good" error.
- if (error_callback_.get() && error != PIPELINE_STOPPING) {
- error_callback_->Run();
- }
+ StartDestroyingFilters();
}
void PipelineImpl::PlaybackRateChangedTask(float playback_rate) {
@@ -790,11 +793,11 @@ void PipelineImpl::FilterStateTransitionTask() {
DCHECK_EQ(MessageLoop::current(), message_loop_);
// No reason transitioning if we've errored or have stopped.
- if (state_ == kError || state_ == kStopped) {
+ if (IsPipelineStopped()) {
return;
}
- if (!StateTransitionsToStarted(state_)) {
+ if (!TransientState(state_)) {
NOTREACHED() << "Invalid current state: " << state_;
SetError(PIPELINE_ERROR_ABORT);
return;
@@ -811,13 +814,13 @@ void PipelineImpl::FilterStateTransitionTask() {
clock_.SetTime(seek_timestamp_);
}
- if (StateTransitionsToStarted(state_)) {
+ if (TransientState(state_)) {
remaining_transitions_ = filters_.size();
}
}
// Carry out the action for the current state.
- if (StateTransitionsToStarted(state_)) {
+ if (TransientState(state_)) {
MediaFilter* filter = filters_[filters_.size() - remaining_transitions_];
if (state_ == kPausing) {
filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
@@ -826,16 +829,13 @@ void PipelineImpl::FilterStateTransitionTask() {
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStarting) {
filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ } else if (state_ == kStopping) {
+ filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else {
NOTREACHED();
}
} else if (state_ == kStarted) {
- // Execute the seek callback, if present. Note that this might be the
- // initial callback passed into Start().
- if (seek_callback_.get()) {
- seek_callback_->Run();
- seek_callback_.reset();
- }
+ FinishInitialization();
// Finally, reset our seeking timestamp back to zero.
seek_timestamp_ = base::TimeDelta();
@@ -848,11 +848,52 @@ void PipelineImpl::FilterStateTransitionTask() {
rendered_mime_types_.end();
if (!waiting_for_clock_update_)
clock_.Play();
+ } else if (IsPipelineStopped()) {
+ FinishDestroyingFiltersTask();
} else {
NOTREACHED();
}
}
+void PipelineImpl::FinishDestroyingFiltersTask() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK(IsPipelineStopped());
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Reset the pipeline, which will decrement a reference to this object.
+ // We will get destroyed as soon as the remaining tasks finish executing.
+ // To be safe, we'll set our pipeline reference to NULL.
+ filters_.clear();
+ filter_types_.clear();
+ STLDeleteElements(&filter_threads_);
+
+ if (PIPELINE_OK == GetError()) {
+ // Destroying filters due to Stop().
+ ResetState();
+
+ // Notify the client that stopping has finished.
+ if (stop_callback_.get()) {
+ stop_callback_->Run();
+ stop_callback_.reset();
+ }
+ } else {
+ // Destroying filters due to SetError().
+ state_ = kError;
+ // If our owner has requested to be notified of an error.
+ if (error_callback_.get()) {
+ error_callback_->Run();
+ }
+ }
+}
+
template <class Filter, class Source>
void PipelineImpl::CreateFilter(FilterFactory* filter_factory,
Source source,
@@ -965,54 +1006,24 @@ void PipelineImpl::GetFilter(scoped_refptr<Filter>* filter_out) const {
}
}
-void PipelineImpl::DestroyFilters() {
- // Stop every filter.
- for (FilterVector::iterator iter = filters_.begin();
- iter != filters_.end();
- ++iter) {
- (*iter)->Stop();
- }
-
- // Crude blocking counter implementation.
- Lock lock;
- ConditionVariable wait_for_zero(&lock);
- int count = filter_threads_.size();
-
- // Post a task to every filter's thread to ensure that they've completed their
- // stopping logic before stopping the threads themselves.
- //
- // TODO(scherkus): again, Stop() should either be synchronous or we should
- // receive a signal from filters that they have indeed stopped.
- for (FilterThreadVector::iterator iter = filter_threads_.begin();
- iter != filter_threads_.end();
- ++iter) {
- (*iter)->message_loop()->PostTask(FROM_HERE,
- NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
- }
+void PipelineImpl::StartDestroyingFilters() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK_NE(kStopped, state_);
- // Wait on our "blocking counter".
- {
- AutoLock auto_lock(lock);
- while (count > 0) {
- wait_for_zero.Wait();
- }
+ if (state_ == kStopping) {
+ return; // Do not call Stop() on filters twice.
scherkus (not reviewing) 2010/06/02 02:13:05 nit: two spaces between ; and //
}
- // Stop every running filter thread.
- //
- // TODO(scherkus): can we watchdog this section to detect wedged threads?
- for (FilterThreadVector::iterator iter = filter_threads_.begin();
- iter != filter_threads_.end();
- ++iter) {
- (*iter)->Stop();
+ remaining_transitions_ = filters_.size();
+ if (remaining_transitions_ > 0) {
+ state_ = kStopping;
+ filters_.front()->Stop(NewCallback(
+ this, &PipelineImpl::OnFilterStateTransition));
+ } else {
+ state_ = kStopped;
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask));
}
-
- // Reset the pipeline, which will decrement a reference to this object.
- // We will get destroyed as soon as the remaining tasks finish executing.
- // To be safe, we'll set our pipeline reference to NULL.
- filters_.clear();
- filter_types_.clear();
- STLDeleteElements(&filter_threads_);
}
} // namespace media
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698