Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(324)

Unified Diff: media/base/pipeline_impl.cc

Issue 149123: Asynchronous initialization of media::PipelineThread... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/base/pipeline_impl.cc
===================================================================
--- media/base/pipeline_impl.cc (revision 19701)
+++ media/base/pipeline_impl.cc (working copy)
@@ -3,7 +3,7 @@
// found in the LICENSE file.
//
// TODO(scherkus): clean up PipelineImpl... too many crazy function names,
-// potential deadlocks, nested message loops, etc...
+// potential deadlocks, etc...
#include "base/compiler_specific.h"
#include "base/condition_variable.h"
@@ -289,46 +289,37 @@
: pipeline_(pipeline),
thread_("PipelineThread"),
time_update_callback_scheduled_(false),
- host_initializing_(NULL) {
+ state_(kCreated) {
}
PipelineThread::~PipelineThread() {
Stop();
+ DCHECK(state_ == kStopped || state_ == kError);
}
// This method is called on the client's thread. It starts the pipeline's
-// dedicated thread and posts a task to call the StartTask method on that
+// dedicated thread and posts a task to call the StartTask() method on that
// thread.
bool PipelineThread::Start(FilterFactory* filter_factory,
const std::string& url,
PipelineCallback* init_complete_callback) {
+ DCHECK_EQ(kCreated, state_);
if (thread_.Start()) {
- filter_factory->AddRef();
- PostTask(NewRunnableMethod(this,
- &PipelineThread::StartTask,
- filter_factory,
- url,
- // TODO(ralphl): what happens to this callback?
- // is it copied by NewRunnableTask? Just pointer
- // or is the callback itself copied?
- init_complete_callback));
+ filter_factory_ = filter_factory;
+ url_ = url;
+ init_callback_.reset(init_complete_callback);
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
return true;
}
return false;
}
// Called on the client's thread. If the thread has been started, then posts
-// a task to call the StopTask method, then waits until the thread has stopped.
-// There is a critical section that wraps the entire duration of the StartTask
-// method. This method waits for that Lock to be released so that we know
-// that the thread is not executing a nested message loop. This way we know
-// that that Thread::Stop call will quit the appropriate message loop.
-//
-// TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
+// a task to call the StopTask() method, then waits until the thread has
+// stopped.
void PipelineThread::Stop() {
if (thread_.IsRunning()) {
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
- AutoLock lock_crit(initialization_lock_);
thread_.Stop();
}
DCHECK(filter_hosts_.empty());
@@ -352,19 +343,11 @@
PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
}
-// May be called on any thread, and therefore we always assume the worst
-// possible race condition. This could, for example, be called from a filter's
-// thread just as the pipeline thread is exiting the call to the filter's
-// Initialize() method. Therefore, we make NO assumptions, and post work
-// in every case, even the trivial one of a thread calling this method from
-// within it's Initialize method. This means that we will always run a nested
-// message loop, and the InitializationCompleteTask will Quit that loop
-// immediately in the trivial case.
void PipelineThread::InitializationComplete(FilterHostImpl* host) {
- DCHECK(host == host_initializing_);
- PostTask(NewRunnableMethod(this,
- &PipelineThread::InitializationCompleteTask,
- host));
+ if (IsPipelineOk()) {
+ // Continue the start task by proceeding to the next stage.
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
+ }
}
// Called from any thread. Updates the pipeline time and schedules a task to
@@ -377,9 +360,9 @@
}
}
-// Called from any thread. Sets the pipeline error_ member and schedules a
+// Called from any thread. Sets the pipeline |error_| member and schedules a
// task to stop all the filters in the pipeline. Note that the thread will
-// continue to run until the client calls Pipeline::Stop, but nothing will
+// continue to run until the client calls Pipeline::Stop(), but nothing will
// be processed since filters will not be able to post tasks.
void PipelineThread::Error(PipelineError error) {
// If this method returns false, then an error has already happened, so no
@@ -389,67 +372,106 @@
}
}
-// Called from any thread. Used by FilterHostImpl::PostTask method and used
-// internally.
+// This is a helper method to post task on message_loop(). This method is only
+// called from this class or from Pipeline.
void PipelineThread::PostTask(Task* task) {
message_loop()->PostTask(FROM_HERE, task);
}
// Main initialization method called on the pipeline thread. This code attempts
-// to use the specified filter factory to build a pipeline. It starts by
-// creating a DataSource, connects it to a Demuxer, and then connects the
-// Demuxer's audio stream to an AudioDecoder which is then connected to an
-// AudioRenderer. If the media has video, then it connects a VideoDecoder to
-// the Demuxer's video stream, and then connects the VideoDecoder to a
-// VideoRenderer. When all required filters have been created and have called
-// their FilterHost's InitializationComplete method, the pipeline's
-// initialized_ member is set to true, and, if the client provided an
-// init_complete_callback, it is called with "true".
-// If initializatoin fails, the client's callback will still be called, but
+// to use the specified filter factory to build a pipeline.
+// Initialization step performed in this method depends on current state of this
+// object, indicated by |state_|. After each step of initialization, this
+// object transits to the next stage. It starts by creating a DataSource,
+// connects it to a Demuxer, and then connects the Demuxer's audio stream to an
+// AudioDecoder which is then connected to an AudioRenderer. If the media has
+// video, then it connects a VideoDecoder to the Demuxer's video stream, and
+// then connects the VideoDecoder to a VideoRenderer.
+//
+// When all required filters have been created and have called their
+// FilterHost's InitializationComplete method, the pipeline's |initialized_|
+// member is set to true, and, if the client provided an
+// |init_complete_callback_|, it is called with "true".
+//
+// If initialization fails, the client's callback will still be called, but
// the bool parameter passed to it will be false.
//
-// Note that at each step in this process, the initialization of any filter
-// may require running the pipeline thread's message loop recursively. This is
-// handled by the CreateFilter method.
-void PipelineThread::StartTask(FilterFactory* filter_factory,
- const std::string& url,
- PipelineCallback* init_complete_callback) {
- // During the entire StartTask we hold the initialization_lock_ so that
- // if the client calls the Pipeline::Stop method while we are running a
- // nested message loop, we can correctly unwind out of it before calling
- // the Thread::Stop method.
- AutoLock auto_lock(initialization_lock_);
+// TODO(hclam): StartTask is now starting the pipeline asynchronous. It
awong 2009/07/01 23:39:46 asynchronous -> asynchronously
+// works like a big state change table. If we no longer need to start filters
+// in order, we need to get rid of all the state change.
+void PipelineThread::StartTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
- // Add ourselves as a destruction observer of the thread's message loop so
- // we can delete filters at an appropriate time (when all tasks have been
- // processed and the thread is about to be destroyed).
- message_loop()->AddDestructionObserver(this);
+ // If we have received the stop signal, return immediately.
+ if (state_ == kStopped)
+ return;
- scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url);
- if (PipelineOk()) {
- scoped_refptr<Demuxer> demuxer =
- CreateFilter<Demuxer, DataSource>(filter_factory, data_source);
- if (PipelineOk()) {
- Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer);
+ DCHECK(state_ == kCreated || IsPipelineInitializing());
+
+ // Just created, create data source.
+ if (state_ == kCreated) {
+ message_loop()->AddDestructionObserver(this);
+ state_ = kInitDataSource;
+ CreateDataSource();
+ return;
+ }
+
+ // Data source created, create demuxer.
+ if (state_ == kInitDataSource) {
+ state_ = kInitDemuxer;
+ CreateDemuxer();
+ return;
+ }
+
+ // Demuxer created, create audio decoder.
+ if (state_ == kInitDemuxer) {
+ state_ = kInitAudioDecoder;
+ // If this method returns false, then there's no audio stream.
+ if (CreateDecoder<AudioDecoder>())
+ return;
+ }
+
+ // Assuming audio decoder was created, create audio renderer.
+ if (state_ == kInitAudioDecoder) {
+ state_ = kInitAudioRenderer;
+ // Returns false if there's no audio stream.
+ if (CreateRenderer<AudioDecoder, AudioRenderer>()) {
+ pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type());
+ return;
}
- if (PipelineOk()) {
- Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer);
- }
}
- if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) {
- Error(PIPELINE_ERROR_COULD_NOT_RENDER);
+ // Assuming audio renderer was created, create video decoder.
+ if (state_ == kInitAudioRenderer) {
+ // Then perform the stage of initialization, i.e. initialize video decoder.
+ state_ = kInitVideoDecoder;
+ if (CreateDecoder<VideoDecoder>())
+ return;
}
- pipeline_->initialized_ = PipelineOk();
+ // Assuming video decoder was created, create video renderer.
+ if (state_ == kInitVideoDecoder) {
+ state_ = kInitVideoRenderer;
+ if (CreateRenderer<VideoDecoder, VideoRenderer>()) {
+ pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type());
+ return;
+ }
+ }
- // No matter what, we're done with the filter factory, and
- // client callback so get rid of them.
- filter_factory->Release();
- if (init_complete_callback) {
- init_complete_callback->Run(pipeline_->initialized_);
- delete init_complete_callback;
+ if (state_ == kInitVideoRenderer) {
+ if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) {
+ Error(PIPELINE_ERROR_COULD_NOT_RENDER);
+ return;
+ }
+
+ state_ = kStarted;
+ pipeline_->initialized_ = true;
+ filter_factory_ = NULL;
+ if (init_callback_.get()) {
+ init_callback_->Run(true);
+ init_callback_.reset();
+ }
}
}
@@ -462,7 +484,22 @@
// Stop() tasks even if we've already stopped. Perhaps this should no-op for
// additional calls, however most of this logic will be changing.
void PipelineThread::StopTask() {
- if (PipelineOk()) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
+ if (IsPipelineInitializing()) {
+ // If IsPipelineOk() is true, the pipeline was simply stopped during
+ // initialization. Otherwise it is a failure.
+ state_ = IsPipelineOk() ? kStopped : kError;
+ filter_factory_ = NULL;
+ if (init_callback_.get()) {
+ init_callback_->Run(false);
+ init_callback_.reset();
+ }
+ } else {
+ state_ = kStopped;
+ }
+
+ if (IsPipelineOk()) {
pipeline_->error_ = PIPELINE_STOPPING;
}
@@ -518,52 +555,11 @@
++iter) {
(*iter)->Stop();
}
-
- if (host_initializing_) {
- host_initializing_ = NULL;
- message_loop()->Quit();
- }
}
-template <class Decoder, class Renderer>
-void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) {
- DCHECK(PipelineOk());
- const std::string major_mime_type = Decoder::major_mime_type();
- const int num_outputs = demuxer->GetNumberOfStreams();
- for (int i = 0; i < num_outputs; ++i) {
- scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
- std::string value;
- if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
- 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
- scoped_refptr<Decoder> decoder =
- CreateFilter<Decoder, DemuxerStream>(filter_factory, stream);
- if (PipelineOk()) {
- DCHECK(decoder);
- CreateFilter<Renderer, Decoder>(filter_factory, decoder);
- }
- if (PipelineOk()) {
- pipeline_->InsertRenderedMimeType(major_mime_type);
- }
- break;
- }
- }
-}
+void PipelineThread::SetPlaybackRateTask(float rate) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
-
-// Task runs as a result of a filter calling InitializationComplete. If for
-// some reason StopTask has been executed prior to this, the host_initializing_
-// member will be NULL, and the message loop will have been quit already, so
-// we don't want to do it again.
-void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) {
- if (host == host_initializing_) {
- host_initializing_ = NULL;
- message_loop()->Quit();
- } else {
- DCHECK(!host_initializing_);
- }
-}
-
-void PipelineThread::SetPlaybackRateTask(float rate) {
pipeline_->InternalSetPlaybackRate(rate);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -574,6 +570,8 @@
void PipelineThread::SeekTask(base::TimeDelta time,
PipelineCallback* seek_callback) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
++iter) {
@@ -595,6 +593,8 @@
}
void PipelineThread::SetVolumeTask(float volume) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
pipeline_->volume_ = volume;
scoped_refptr<AudioRenderer> audio_renderer;
GetFilter(&audio_renderer);
@@ -604,6 +604,8 @@
}
void PipelineThread::SetTimeTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
time_update_callback_scheduled_ = false;
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -614,6 +616,8 @@
template <class Filter>
void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
*filter_out = NULL;
for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end() && NULL == *filter_out;
@@ -623,78 +627,102 @@
}
template <class Filter, class Source>
-scoped_refptr<Filter> PipelineThread::CreateFilter(
- FilterFactory* filter_factory,
- Source source,
- const MediaFormat& media_format) {
- DCHECK(PipelineOk());
+void PipelineThread::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat& media_format) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
if (!filter) {
Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
} else {
- DCHECK(!host_initializing_);
- host_initializing_ = new FilterHostImpl(this, filter.get());
- if (NULL == host_initializing_) {
- Error(PIPELINE_ERROR_OUT_OF_MEMORY);
- } else {
- // Create a dedicated thread for this filter.
- if (SupportsSetMessageLoop<Filter>()) {
- // TODO(scherkus): figure out a way to name these threads so it matches
- // the filter type.
- scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
- if (!thread.get() || !thread->Start()) {
- NOTREACHED() << "Could not start filter thread";
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- } else {
- filter->SetMessageLoop(thread->message_loop());
- filter_threads_.push_back(thread.release());
- }
+ scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
+ // Create a dedicated thread for this filter.
+ if (SupportsSetMessageLoop<Filter>()) {
+ // TODO(scherkus): figure out a way to name these threads so it matches
+ // the filter type.
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ } else {
+ filter->SetMessageLoop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
}
+ }
- // Creating a thread could have failed, verify we're still OK.
- if (PipelineOk()) {
- filter_hosts_.push_back(host_initializing_);
- filter->SetFilterHost(host_initializing_);
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- }
+ // Creating a thread could have failed, verify we're still OK.
+ if (IsPipelineOk()) {
+ filter_hosts_.push_back(host.get());
+ filter->SetFilterHost(host.release());
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
}
}
}
- if (PipelineOk()) {
- // Now we run the thread's message loop recursively. We want all
- // pending tasks to be processed, so we set nestable tasks to be allowed
- // and then run the loop. The only way we exit the loop is as the result
- // of a call to FilterHost::InitializationComplete, FilterHost::Error, or
- // Pipeline::Stop. In each of these cases, the corresponding task method
- // sets host_initializing_ to NULL to signal that the message loop's Quit
- // method has already been called, and then calls message_loop()->Quit().
- // The setting of |host_initializing_| to NULL in the task prevents a
- // subsequent task from accidentally quitting the wrong (non-nested) loop.
- message_loop()->SetNestableTasksAllowed(true);
- message_loop()->Run();
- message_loop()->SetNestableTasksAllowed(false);
- DCHECK(!host_initializing_);
- } else {
- // This could still be set if we never ran the message loop (for example,
- // if the fiter returned false from it's Initialize() method), so make sure
- // to reset it.
- host_initializing_ = NULL;
- }
- if (!PipelineOk()) {
- filter = NULL;
- }
- return filter;
}
-scoped_refptr<DataSource> PipelineThread::CreateDataSource(
- FilterFactory* filter_factory, const std::string& url) {
+void PipelineThread::CreateDataSource() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
MediaFormat url_format;
url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
- url_format.SetAsString(MediaFormat::kURL, url);
- return CreateFilter<DataSource>(filter_factory, url, url_format);
+ url_format.SetAsString(MediaFormat::kURL, url_);
+ CreateFilter<DataSource>(filter_factory_, url_, url_format);
}
+void PipelineThread::CreateDemuxer() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<DataSource> data_source;
+ GetFilter(&data_source);
+ DCHECK(data_source);
+ CreateFilter<Demuxer, DataSource>(filter_factory_, data_source);
+}
+
+template <class Decoder>
+bool PipelineThread::CreateDecoder() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<Demuxer> demuxer;
+ GetFilter(&demuxer);
+ DCHECK(demuxer);
+
+ const std::string major_mime_type = Decoder::major_mime_type();
+ const int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
+ std::string value;
+ if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
+ 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
+ CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream);
+ return true;
+ }
+ }
+ return false;
+}
+
+template <class Decoder, class Renderer>
+bool PipelineThread::CreateRenderer() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<Decoder> decoder;
+ GetFilter(&decoder);
+
+ if (decoder) {
+ // If the decoder was created.
+ const std::string major_mime_type = Decoder::major_mime_type();
+ CreateFilter<Renderer, Decoder>(filter_factory_, decoder);
+ return true;
+ }
+ return false;
+}
+
// Called as a result of destruction of the thread.
//
// TODO(scherkus): this can block the client due to synchronous Stop() API call.

Powered by Google App Engine
This is Rietveld 408576698