Index: media/base/pipeline_impl.cc |
=================================================================== |
--- media/base/pipeline_impl.cc (revision 19701) |
+++ media/base/pipeline_impl.cc (working copy) |
@@ -3,7 +3,7 @@ |
// found in the LICENSE file. |
// |
// TODO(scherkus): clean up PipelineImpl... too many crazy function names, |
-// potential deadlocks, nested message loops, etc... |
+// potential deadlocks, etc... |
#include "base/compiler_specific.h" |
#include "base/condition_variable.h" |
@@ -289,46 +289,37 @@ |
: pipeline_(pipeline), |
thread_("PipelineThread"), |
time_update_callback_scheduled_(false), |
- host_initializing_(NULL) { |
+ state_(kCreated) { |
} |
PipelineThread::~PipelineThread() { |
Stop(); |
+ DCHECK(state_ == kStopped || state_ == kError); |
} |
// This method is called on the client's thread. It starts the pipeline's |
-// dedicated thread and posts a task to call the StartTask method on that |
+// dedicated thread and posts a task to call the StartTask() method on that |
// thread. |
bool PipelineThread::Start(FilterFactory* filter_factory, |
const std::string& url, |
PipelineCallback* init_complete_callback) { |
+ DCHECK_EQ(kCreated, state_); |
if (thread_.Start()) { |
- filter_factory->AddRef(); |
- PostTask(NewRunnableMethod(this, |
- &PipelineThread::StartTask, |
- filter_factory, |
- url, |
- // TODO(ralphl): what happens to this callback? |
- // is it copied by NewRunnableTask? Just pointer |
- // or is the callback itself copied? |
- init_complete_callback)); |
+ filter_factory_ = filter_factory; |
+ url_ = url; |
+ init_callback_.reset(init_complete_callback); |
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
return true; |
} |
return false; |
} |
// Called on the client's thread. If the thread has been started, then posts |
-// a task to call the StopTask method, then waits until the thread has stopped. |
-// There is a critical section that wraps the entire duration of the StartTask |
-// method. This method waits for that Lock to be released so that we know |
-// that the thread is not executing a nested message loop. This way we know |
-// that that Thread::Stop call will quit the appropriate message loop. |
-// |
-// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! |
+// a task to call the StopTask() method, then waits until the thread has |
+// stopped. |
void PipelineThread::Stop() { |
if (thread_.IsRunning()) { |
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
- AutoLock lock_crit(initialization_lock_); |
thread_.Stop(); |
} |
DCHECK(filter_hosts_.empty()); |
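
For context, here is a minimal client-side sketch of the Start()/Stop() contract
described above. The class name PipelineClient, the URL, and OnInitDone() are
illustrative only, and it assumes PipelineCallback is the usual
Callback1<bool>::Type so that NewCallback() can build one; real callers may differ.

  class PipelineClient {
   public:
    // Start() returns false only if the pipeline thread could not be started;
    // otherwise OnInitDone() runs later with the overall initialization result.
    void Begin(PipelineThread* pipeline_thread, FilterFactory* factory) {
      if (!pipeline_thread->Start(
              factory, "file:///tmp/clip.ogv",
              NewCallback(this, &PipelineClient::OnInitDone))) {
        // Handle thread startup failure.
      }
    }

    // Stop() posts StopTask() and then blocks until the pipeline thread joins.
    void End(PipelineThread* pipeline_thread) {
      pipeline_thread->Stop();
    }

    // |success| is true once every required filter has reported
    // InitializationComplete(), false if initialization failed or the
    // pipeline was stopped early.
    void OnInitDone(bool success) {}
  };
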
@@ -352,19 +343,11 @@ |
PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); |
} |
-// May be called on any thread, and therefore we always assume the worst |
-// possible race condition. This could, for example, be called from a filter's |
-// thread just as the pipeline thread is exiting the call to the filter's |
-// Initialize() method. Therefore, we make NO assumptions, and post work |
-// in every case, even the trivial one of a thread calling this method from |
-// within it's Initialize method. This means that we will always run a nested |
-// message loop, and the InitializationCompleteTask will Quit that loop |
-// immediately in the trivial case. |
void PipelineThread::InitializationComplete(FilterHostImpl* host) { |
- DCHECK(host == host_initializing_); |
- PostTask(NewRunnableMethod(this, |
- &PipelineThread::InitializationCompleteTask, |
- host)); |
+ if (IsPipelineOk()) { |
+ // Continue the start task by proceeding to the next stage. |
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
+ } |
} |
// Called from any thread. Updates the pipeline time and schedules a task to |
@@ -377,9 +360,9 @@ |
} |
} |
-// Called from any thread. Sets the pipeline error_ member and schedules a |
+// Called from any thread. Sets the pipeline |error_| member and schedules a |
// task to stop all the filters in the pipeline. Note that the thread will |
-// continue to run until the client calls Pipeline::Stop, but nothing will |
+// continue to run until the client calls Pipeline::Stop(), but nothing will |
// be processed since filters will not be able to post tasks. |
void PipelineThread::Error(PipelineError error) { |
// If this method returns false, then an error has already happened, so no |
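
To make the contract these entry points give individual filters concrete, here
is a hypothetical decoder whose setup completes asynchronously. The host()
accessor and the exact FilterHost method names are assumptions inferred from
the comments in this file, not part of this patch.

  // Sketch only; MyAudioDecoder and OnSetupDone() are made-up names.
  class MyAudioDecoder : public AudioDecoder {
   public:
    virtual bool Initialize(DemuxerStream* stream) {
      // Begin asynchronous setup; returning true means "in progress", not
      // "finished".
      return true;
    }

   private:
    void OnSetupDone(bool ok) {
      if (ok) {
        // Advances the StartTask() state machine to the next stage.
        host()->InitializationComplete();
      } else {
        // Sets |error_| and schedules a task to stop every filter.
        host()->Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
      }
    }
  };
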
@@ -389,67 +372,106 @@ |
} |
} |
-// Called from any thread. Used by FilterHostImpl::PostTask method and used |
-// internally. |
+// This is a helper method to post a task to message_loop(). It is only |
+// called from this class or from Pipeline. |
void PipelineThread::PostTask(Task* task) { |
message_loop()->PostTask(FROM_HERE, task); |
} |
// Main initialization method called on the pipeline thread. This code attempts |
-// to use the specified filter factory to build a pipeline. It starts by |
-// creating a DataSource, connects it to a Demuxer, and then connects the |
-// Demuxer's audio stream to an AudioDecoder which is then connected to an |
-// AudioRenderer. If the media has video, then it connects a VideoDecoder to |
-// the Demuxer's video stream, and then connects the VideoDecoder to a |
-// VideoRenderer. When all required filters have been created and have called |
-// their FilterHost's InitializationComplete method, the pipeline's |
-// initialized_ member is set to true, and, if the client provided an |
-// init_complete_callback, it is called with "true". |
-// If initializatoin fails, the client's callback will still be called, but |
+// to use the specified filter factory to build a pipeline. |
+// The initialization step performed in this method depends on the current |
+// state of this object, indicated by |state_|. After each step, this object |
+// transitions to the next state. It starts by creating a DataSource, |
+// connects it to a Demuxer, and then connects the Demuxer's audio stream to an |
+// AudioDecoder which is then connected to an AudioRenderer. If the media has |
+// video, then it connects a VideoDecoder to the Demuxer's video stream, and |
+// then connects the VideoDecoder to a VideoRenderer. |
+// |
+// When all required filters have been created and have called their |
+// FilterHost's InitializationComplete method, the pipeline's |initialized_| |
+// member is set to true, and, if the client provided an |
+// |init_callback_|, it is called with "true". |
+// |
+// If initialization fails, the client's callback will still be called, but |
// the bool parameter passed to it will be false. |
// |
-// Note that at each step in this process, the initialization of any filter |
-// may require running the pipeline thread's message loop recursively. This is |
-// handled by the CreateFilter method. |
-void PipelineThread::StartTask(FilterFactory* filter_factory, |
- const std::string& url, |
- PipelineCallback* init_complete_callback) { |
- // During the entire StartTask we hold the initialization_lock_ so that |
- // if the client calls the Pipeline::Stop method while we are running a |
- // nested message loop, we can correctly unwind out of it before calling |
- // the Thread::Stop method. |
- AutoLock auto_lock(initialization_lock_); |
+// TODO(hclam): StartTask() now starts the pipeline asynchronously. It |
+// works like a big state change table. If we no longer need to start filters |
+// in order, we need to get rid of all the state changes. |
+void PipelineThread::StartTask() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
- // Add ourselves as a destruction observer of the thread's message loop so |
- // we can delete filters at an appropriate time (when all tasks have been |
- // processed and the thread is about to be destroyed). |
- message_loop()->AddDestructionObserver(this); |
+ // If we have received the stop signal, return immediately. |
+ if (state_ == kStopped) |
+ return; |
- scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); |
- if (PipelineOk()) { |
- scoped_refptr<Demuxer> demuxer = |
- CreateFilter<Demuxer, DataSource>(filter_factory, data_source); |
- if (PipelineOk()) { |
- Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); |
+ DCHECK(state_ == kCreated || IsPipelineInitializing()); |
+ |
+ // Just created, create data source. |
+ if (state_ == kCreated) { |
+ message_loop()->AddDestructionObserver(this); |
+ state_ = kInitDataSource; |
+ CreateDataSource(); |
+ return; |
+ } |
+ |
+ // Data source created, create demuxer. |
+ if (state_ == kInitDataSource) { |
+ state_ = kInitDemuxer; |
+ CreateDemuxer(); |
+ return; |
+ } |
+ |
+ // Demuxer created, create audio decoder. |
+ if (state_ == kInitDemuxer) { |
+ state_ = kInitAudioDecoder; |
+ // If this method returns false, then there's no audio stream. |
+ if (CreateDecoder<AudioDecoder>()) |
+ return; |
+ } |
+ |
+ // Assuming audio decoder was created, create audio renderer. |
+ if (state_ == kInitAudioDecoder) { |
+ state_ = kInitAudioRenderer; |
+ // Returns false if there's no audio stream. |
+ if (CreateRenderer<AudioDecoder, AudioRenderer>()) { |
+ pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type()); |
+ return; |
} |
- if (PipelineOk()) { |
- Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); |
- } |
} |
- if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { |
- Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
+ // Assuming audio renderer was created, create video decoder. |
+ if (state_ == kInitAudioRenderer) { |
+    // Perform the next stage of initialization: create the video decoder. |
+ state_ = kInitVideoDecoder; |
+ if (CreateDecoder<VideoDecoder>()) |
+ return; |
} |
- pipeline_->initialized_ = PipelineOk(); |
+ // Assuming video decoder was created, create video renderer. |
+ if (state_ == kInitVideoDecoder) { |
+ state_ = kInitVideoRenderer; |
+ if (CreateRenderer<VideoDecoder, VideoRenderer>()) { |
+ pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type()); |
+ return; |
+ } |
+ } |
- // No matter what, we're done with the filter factory, and |
- // client callback so get rid of them. |
- filter_factory->Release(); |
- if (init_complete_callback) { |
- init_complete_callback->Run(pipeline_->initialized_); |
- delete init_complete_callback; |
+ if (state_ == kInitVideoRenderer) { |
+ if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { |
+ Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
+ return; |
+ } |
+ |
+ state_ = kStarted; |
+ pipeline_->initialized_ = true; |
+ filter_factory_ = NULL; |
+ if (init_callback_.get()) { |
+ init_callback_->Run(true); |
+ init_callback_.reset(); |
+ } |
} |
} |
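
For reference, the |state_| values that StartTask() steps through imply an enum
along these lines. This is a sketch of what pipeline_impl.h presumably declares
(the header is not part of this diff), so naming and ordering may differ.

  enum State {
    kCreated,            // Constructed; StartTask() has not run yet.
    kInitDataSource,     // Creating the DataSource.
    kInitDemuxer,        // Creating the Demuxer.
    kInitAudioDecoder,   // Creating the audio decoder.
    kInitAudioRenderer,  // Creating the audio renderer.
    kInitVideoDecoder,   // Creating the video decoder.
    kInitVideoRenderer,  // Creating the video renderer.
    kStarted,            // Fully initialized; |initialized_| is true.
    kStopped,            // StopTask() ran without a preceding error.
    kError,              // StopTask() ran after an error during initialization.
  };

Under that assumption, IsPipelineInitializing() would simply check whether
|state_| falls between kInitDataSource and kInitVideoRenderer.
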
@@ -462,7 +484,22 @@ |
// Stop() tasks even if we've already stopped. Perhaps this should no-op for |
// additional calls, however most of this logic will be changing. |
void PipelineThread::StopTask() { |
- if (PipelineOk()) { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ |
+ if (IsPipelineInitializing()) { |
+ // If IsPipelineOk() is true, the pipeline was simply stopped during |
+ // initialization. Otherwise it is a failure. |
+ state_ = IsPipelineOk() ? kStopped : kError; |
+ filter_factory_ = NULL; |
+ if (init_callback_.get()) { |
+ init_callback_->Run(false); |
+ init_callback_.reset(); |
+ } |
+ } else { |
+ state_ = kStopped; |
+ } |
+ |
+ if (IsPipelineOk()) { |
pipeline_->error_ = PIPELINE_STOPPING; |
} |
@@ -518,52 +555,11 @@ |
++iter) { |
(*iter)->Stop(); |
} |
- |
- if (host_initializing_) { |
- host_initializing_ = NULL; |
- message_loop()->Quit(); |
- } |
} |
-template <class Decoder, class Renderer> |
-void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) { |
- DCHECK(PipelineOk()); |
- const std::string major_mime_type = Decoder::major_mime_type(); |
- const int num_outputs = demuxer->GetNumberOfStreams(); |
- for (int i = 0; i < num_outputs; ++i) { |
- scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); |
- std::string value; |
- if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && |
- 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { |
- scoped_refptr<Decoder> decoder = |
- CreateFilter<Decoder, DemuxerStream>(filter_factory, stream); |
- if (PipelineOk()) { |
- DCHECK(decoder); |
- CreateFilter<Renderer, Decoder>(filter_factory, decoder); |
- } |
- if (PipelineOk()) { |
- pipeline_->InsertRenderedMimeType(major_mime_type); |
- } |
- break; |
- } |
- } |
-} |
+void PipelineThread::SetPlaybackRateTask(float rate) { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
- |
-// Task runs as a result of a filter calling InitializationComplete. If for |
-// some reason StopTask has been executed prior to this, the host_initializing_ |
-// member will be NULL, and the message loop will have been quit already, so |
-// we don't want to do it again. |
-void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { |
- if (host == host_initializing_) { |
- host_initializing_ = NULL; |
- message_loop()->Quit(); |
- } else { |
- DCHECK(!host_initializing_); |
- } |
-} |
- |
-void PipelineThread::SetPlaybackRateTask(float rate) { |
pipeline_->InternalSetPlaybackRate(rate); |
for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
iter != filter_hosts_.end(); |
@@ -574,6 +570,8 @@ |
void PipelineThread::SeekTask(base::TimeDelta time, |
PipelineCallback* seek_callback) { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ |
for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
iter != filter_hosts_.end(); |
++iter) { |
@@ -595,6 +593,8 @@ |
} |
void PipelineThread::SetVolumeTask(float volume) { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ |
pipeline_->volume_ = volume; |
scoped_refptr<AudioRenderer> audio_renderer; |
GetFilter(&audio_renderer); |
@@ -604,6 +604,8 @@ |
} |
void PipelineThread::SetTimeTask() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ |
time_update_callback_scheduled_ = false; |
for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
iter != filter_hosts_.end(); |
@@ -614,6 +616,8 @@ |
template <class Filter> |
void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ |
*filter_out = NULL; |
for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); |
iter != filter_hosts_.end() && NULL == *filter_out; |
@@ -623,78 +627,102 @@ |
} |
template <class Filter, class Source> |
-scoped_refptr<Filter> PipelineThread::CreateFilter( |
- FilterFactory* filter_factory, |
- Source source, |
- const MediaFormat& media_format) { |
- DCHECK(PipelineOk()); |
+void PipelineThread::CreateFilter(FilterFactory* filter_factory, |
+ Source source, |
+ const MediaFormat& media_format) { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ DCHECK(IsPipelineOk()); |
+ |
scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); |
if (!filter) { |
Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); |
} else { |
- DCHECK(!host_initializing_); |
- host_initializing_ = new FilterHostImpl(this, filter.get()); |
- if (NULL == host_initializing_) { |
- Error(PIPELINE_ERROR_OUT_OF_MEMORY); |
- } else { |
- // Create a dedicated thread for this filter. |
- if (SupportsSetMessageLoop<Filter>()) { |
- // TODO(scherkus): figure out a way to name these threads so it matches |
- // the filter type. |
- scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
- if (!thread.get() || !thread->Start()) { |
- NOTREACHED() << "Could not start filter thread"; |
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
- } else { |
- filter->SetMessageLoop(thread->message_loop()); |
- filter_threads_.push_back(thread.release()); |
- } |
+ scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); |
+ // Create a dedicated thread for this filter. |
+ if (SupportsSetMessageLoop<Filter>()) { |
+ // TODO(scherkus): figure out a way to name these threads so it matches |
+ // the filter type. |
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
+ if (!thread.get() || !thread->Start()) { |
+ NOTREACHED() << "Could not start filter thread"; |
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
+ } else { |
+ filter->SetMessageLoop(thread->message_loop()); |
+ filter_threads_.push_back(thread.release()); |
} |
+ } |
- // Creating a thread could have failed, verify we're still OK. |
- if (PipelineOk()) { |
- filter_hosts_.push_back(host_initializing_); |
- filter->SetFilterHost(host_initializing_); |
- if (!filter->Initialize(source)) { |
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
- } |
+ // Creating a thread could have failed, verify we're still OK. |
+ if (IsPipelineOk()) { |
+ filter_hosts_.push_back(host.get()); |
+ filter->SetFilterHost(host.release()); |
+ if (!filter->Initialize(source)) { |
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
} |
} |
} |
- if (PipelineOk()) { |
- // Now we run the thread's message loop recursively. We want all |
- // pending tasks to be processed, so we set nestable tasks to be allowed |
- // and then run the loop. The only way we exit the loop is as the result |
- // of a call to FilterHost::InitializationComplete, FilterHost::Error, or |
- // Pipeline::Stop. In each of these cases, the corresponding task method |
- // sets host_initializing_ to NULL to signal that the message loop's Quit |
- // method has already been called, and then calls message_loop()->Quit(). |
- // The setting of |host_initializing_| to NULL in the task prevents a |
- // subsequent task from accidentally quitting the wrong (non-nested) loop. |
- message_loop()->SetNestableTasksAllowed(true); |
- message_loop()->Run(); |
- message_loop()->SetNestableTasksAllowed(false); |
- DCHECK(!host_initializing_); |
- } else { |
- // This could still be set if we never ran the message loop (for example, |
- // if the fiter returned false from it's Initialize() method), so make sure |
- // to reset it. |
- host_initializing_ = NULL; |
- } |
- if (!PipelineOk()) { |
- filter = NULL; |
- } |
- return filter; |
} |
-scoped_refptr<DataSource> PipelineThread::CreateDataSource( |
- FilterFactory* filter_factory, const std::string& url) { |
+void PipelineThread::CreateDataSource() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ DCHECK(IsPipelineOk()); |
+ |
MediaFormat url_format; |
url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); |
- url_format.SetAsString(MediaFormat::kURL, url); |
- return CreateFilter<DataSource>(filter_factory, url, url_format); |
+ url_format.SetAsString(MediaFormat::kURL, url_); |
+ CreateFilter<DataSource>(filter_factory_, url_, url_format); |
} |
+void PipelineThread::CreateDemuxer() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ DCHECK(IsPipelineOk()); |
+ |
+ scoped_refptr<DataSource> data_source; |
+ GetFilter(&data_source); |
+ DCHECK(data_source); |
+ CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); |
+} |
+ |
+template <class Decoder> |
+bool PipelineThread::CreateDecoder() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ DCHECK(IsPipelineOk()); |
+ |
+ scoped_refptr<Demuxer> demuxer; |
+ GetFilter(&demuxer); |
+ DCHECK(demuxer); |
+ |
+ const std::string major_mime_type = Decoder::major_mime_type(); |
+ const int num_outputs = demuxer->GetNumberOfStreams(); |
+ for (int i = 0; i < num_outputs; ++i) { |
+ scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); |
+ std::string value; |
+ if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && |
+ 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { |
+ CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+template <class Decoder, class Renderer> |
+bool PipelineThread::CreateRenderer() { |
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
+ DCHECK(IsPipelineOk()); |
+ |
+ scoped_refptr<Decoder> decoder; |
+ GetFilter(&decoder); |
+ |
+ if (decoder) { |
+    // The decoder was created, so create the matching renderer. |
+ const std::string major_mime_type = Decoder::major_mime_type(); |
+ CreateFilter<Renderer, Decoder>(filter_factory_, decoder); |
+ return true; |
+ } |
+ return false; |
+} |
+ |
// Called as a result of destruction of the thread. |
// |
// TODO(scherkus): this can block the client due to synchronous Stop() API call. |