Chromium Code Reviews| Index: media/base/pipeline_impl.cc |
| =================================================================== |
| --- media/base/pipeline_impl.cc (revision 19701) |
| +++ media/base/pipeline_impl.cc (working copy) |
| @@ -3,7 +3,7 @@ |
| // found in the LICENSE file. |
| // |
| // TODO(scherkus): clean up PipelineImpl... too many crazy function names, |
| -// potential deadlocks, nested message loops, etc... |
| +// potential deadlocks, etc... |
| #include "base/compiler_specific.h" |
| #include "base/condition_variable.h" |
| @@ -289,46 +289,37 @@ |
| : pipeline_(pipeline), |
| thread_("PipelineThread"), |
| time_update_callback_scheduled_(false), |
| - host_initializing_(NULL) { |
| + state_(kCreated) { |
| } |
| PipelineThread::~PipelineThread() { |
| Stop(); |
| + DCHECK(state_ == kStopped || state_ == kError); |
| } |
| // This method is called on the client's thread. It starts the pipeline's |
| -// dedicated thread and posts a task to call the StartTask method on that |
| +// dedicated thread and posts a task to call the StartTask() method on that |
| // thread. |
| bool PipelineThread::Start(FilterFactory* filter_factory, |
| const std::string& url, |
| PipelineCallback* init_complete_callback) { |
| + DCHECK_EQ(kCreated, state_); |
| if (thread_.Start()) { |
| - filter_factory->AddRef(); |
| - PostTask(NewRunnableMethod(this, |
| - &PipelineThread::StartTask, |
| - filter_factory, |
| - url, |
| - // TODO(ralphl): what happens to this callback? |
| - // is it copied by NewRunnableTask? Just pointer |
| - // or is the callback itself copied? |
| - init_complete_callback)); |
| + filter_factory_ = filter_factory; |
| + url_ = url; |
| + init_callback_.reset(init_complete_callback); |
| + PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
| return true; |
| } |
| return false; |
| } |
| // Called on the client's thread. If the thread has been started, then posts |
| -// a task to call the StopTask method, then waits until the thread has stopped. |
| -// There is a critical section that wraps the entire duration of the StartTask |
| -// method. This method waits for that Lock to be released so that we know |
| -// that the thread is not executing a nested message loop. This way we know |
| -// that that Thread::Stop call will quit the appropriate message loop. |
| -// |
| -// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! |
| +// a task to call the StopTask() method, then waits until the thread has |
| +// stopped. |
| void PipelineThread::Stop() { |
| if (thread_.IsRunning()) { |
| PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
| - AutoLock lock_crit(initialization_lock_); |
| thread_.Stop(); |
| } |
| DCHECK(filter_hosts_.empty()); |
| @@ -352,19 +343,11 @@ |
| PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); |
| } |
| -// May be called on any thread, and therefore we always assume the worst |
| -// possible race condition. This could, for example, be called from a filter's |
| -// thread just as the pipeline thread is exiting the call to the filter's |
| -// Initialize() method. Therefore, we make NO assumptions, and post work |
| -// in every case, even the trivial one of a thread calling this method from |
| -// within it's Initialize method. This means that we will always run a nested |
| -// message loop, and the InitializationCompleteTask will Quit that loop |
| -// immediately in the trivial case. |
| void PipelineThread::InitializationComplete(FilterHostImpl* host) { |
| - DCHECK(host == host_initializing_); |
| - PostTask(NewRunnableMethod(this, |
| - &PipelineThread::InitializationCompleteTask, |
| - host)); |
| + if (IsPipelineOk()) { |
| + // Continue the start task by proceeding to the next stage. |
| + PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
| + } |
| } |
| // Called from any thread. Updates the pipeline time and schedules a task to |
| @@ -377,9 +360,9 @@ |
| } |
| } |
| -// Called from any thread. Sets the pipeline error_ member and schedules a |
| +// Called from any thread. Sets the pipeline |error_| member and schedules a |
| // task to stop all the filters in the pipeline. Note that the thread will |
| -// continue to run until the client calls Pipeline::Stop, but nothing will |
| +// continue to run until the client calls Pipeline::Stop(), but nothing will |
| // be processed since filters will not be able to post tasks. |
| void PipelineThread::Error(PipelineError error) { |
| // If this method returns false, then an error has already happened, so no |
| @@ -389,67 +372,106 @@ |
| } |
| } |
| -// Called from any thread. Used by FilterHostImpl::PostTask method and used |
| -// internally. |
| +// This is a helper method to post task on message_loop(). This method is only |
| +// called from this class or from Pipeline. |
| void PipelineThread::PostTask(Task* task) { |
| message_loop()->PostTask(FROM_HERE, task); |
| } |
| // Main initialization method called on the pipeline thread. This code attempts |
| -// to use the specified filter factory to build a pipeline. It starts by |
| -// creating a DataSource, connects it to a Demuxer, and then connects the |
| -// Demuxer's audio stream to an AudioDecoder which is then connected to an |
| -// AudioRenderer. If the media has video, then it connects a VideoDecoder to |
| -// the Demuxer's video stream, and then connects the VideoDecoder to a |
| -// VideoRenderer. When all required filters have been created and have called |
| -// their FilterHost's InitializationComplete method, the pipeline's |
| -// initialized_ member is set to true, and, if the client provided an |
| -// init_complete_callback, it is called with "true". |
| -// If initializatoin fails, the client's callback will still be called, but |
| +// to use the specified filter factory to build a pipeline. |
| +// Initialization step performed in this method depends on current state of this |
| +// object, indicated by |state_|. After each step of initialization, this |
| +// object transits to the next stage. It starts by creating a DataSource, |
| +// connects it to a Demuxer, and then connects the Demuxer's audio stream to an |
| +// AudioDecoder which is then connected to an AudioRenderer. If the media has |
| +// video, then it connects a VideoDecoder to the Demuxer's video stream, and |
| +// then connects the VideoDecoder to a VideoRenderer. |
| +// |
| +// When all required filters have been created and have called their |
| +// FilterHost's InitializationComplete method, the pipeline's |initialized_| |
| +// member is set to true, and, if the client provided an |
| +// |init_complete_callback_|, it is called with "true". |
| +// |
| +// If initialization fails, the client's callback will still be called, but |
| // the bool parameter passed to it will be false. |
| // |
| -// Note that at each step in this process, the initialization of any filter |
| -// may require running the pipeline thread's message loop recursively. This is |
| -// handled by the CreateFilter method. |
| -void PipelineThread::StartTask(FilterFactory* filter_factory, |
| - const std::string& url, |
| - PipelineCallback* init_complete_callback) { |
| - // During the entire StartTask we hold the initialization_lock_ so that |
| - // if the client calls the Pipeline::Stop method while we are running a |
| - // nested message loop, we can correctly unwind out of it before calling |
| - // the Thread::Stop method. |
| - AutoLock auto_lock(initialization_lock_); |
| +// TODO(hclam): StartTask is now starting the pipeline asynchronous. It |
|
awong
2009/07/01 23:39:46
asynchronous -> asynchronously
|
| +// works like a big state change table. If we no longer need to start filters |
| +// in order, we need to get rid of all the state change. |
| +void PipelineThread::StartTask() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| - // Add ourselves as a destruction observer of the thread's message loop so |
| - // we can delete filters at an appropriate time (when all tasks have been |
| - // processed and the thread is about to be destroyed). |
| - message_loop()->AddDestructionObserver(this); |
| + // If we have received the stop signal, return immediately. |
| + if (state_ == kStopped) |
| + return; |
| - scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); |
| - if (PipelineOk()) { |
| - scoped_refptr<Demuxer> demuxer = |
| - CreateFilter<Demuxer, DataSource>(filter_factory, data_source); |
| - if (PipelineOk()) { |
| - Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); |
| + DCHECK(state_ == kCreated || IsPipelineInitializing()); |
| + |
| + // Just created, create data source. |
| + if (state_ == kCreated) { |
| + message_loop()->AddDestructionObserver(this); |
| + state_ = kInitDataSource; |
| + CreateDataSource(); |
| + return; |
| + } |
| + |
| + // Data source created, create demuxer. |
| + if (state_ == kInitDataSource) { |
| + state_ = kInitDemuxer; |
| + CreateDemuxer(); |
| + return; |
| + } |
| + |
| + // Demuxer created, create audio decoder. |
| + if (state_ == kInitDemuxer) { |
| + state_ = kInitAudioDecoder; |
| + // If this method returns false, then there's no audio stream. |
| + if (CreateDecoder<AudioDecoder>()) |
| + return; |
| + } |
| + |
| + // Assuming audio decoder was created, create audio renderer. |
| + if (state_ == kInitAudioDecoder) { |
| + state_ = kInitAudioRenderer; |
| + // Returns false if there's no audio stream. |
| + if (CreateRenderer<AudioDecoder, AudioRenderer>()) { |
| + pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type()); |
| + return; |
| } |
| - if (PipelineOk()) { |
| - Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); |
| - } |
| } |
| - if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { |
| - Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
| + // Assuming audio renderer was created, create video decoder. |
| + if (state_ == kInitAudioRenderer) { |
| + // Then perform the stage of initialization, i.e. initialize video decoder. |
| + state_ = kInitVideoDecoder; |
| + if (CreateDecoder<VideoDecoder>()) |
| + return; |
| } |
| - pipeline_->initialized_ = PipelineOk(); |
| + // Assuming video decoder was created, create video renderer. |
| + if (state_ == kInitVideoDecoder) { |
| + state_ = kInitVideoRenderer; |
| + if (CreateRenderer<VideoDecoder, VideoRenderer>()) { |
| + pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type()); |
| + return; |
| + } |
| + } |
| - // No matter what, we're done with the filter factory, and |
| - // client callback so get rid of them. |
| - filter_factory->Release(); |
| - if (init_complete_callback) { |
| - init_complete_callback->Run(pipeline_->initialized_); |
| - delete init_complete_callback; |
| + if (state_ == kInitVideoRenderer) { |
| + if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { |
| + Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
| + return; |
| + } |
| + |
| + state_ = kStarted; |
| + pipeline_->initialized_ = true; |
| + filter_factory_ = NULL; |
| + if (init_callback_.get()) { |
| + init_callback_->Run(true); |
| + init_callback_.reset(); |
| + } |
| } |
| } |
| @@ -462,7 +484,22 @@ |
| // Stop() tasks even if we've already stopped. Perhaps this should no-op for |
| // additional calls, however most of this logic will be changing. |
| void PipelineThread::StopTask() { |
| - if (PipelineOk()) { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + |
| + if (IsPipelineInitializing()) { |
| + // If IsPipelineOk() is true, the pipeline was simply stopped during |
| + // initialization. Otherwise it is a failure. |
| + state_ = IsPipelineOk() ? kStopped : kError; |
| + filter_factory_ = NULL; |
| + if (init_callback_.get()) { |
| + init_callback_->Run(false); |
| + init_callback_.reset(); |
| + } |
| + } else { |
| + state_ = kStopped; |
| + } |
| + |
| + if (IsPipelineOk()) { |
| pipeline_->error_ = PIPELINE_STOPPING; |
| } |
| @@ -518,52 +555,11 @@ |
| ++iter) { |
| (*iter)->Stop(); |
| } |
| - |
| - if (host_initializing_) { |
| - host_initializing_ = NULL; |
| - message_loop()->Quit(); |
| - } |
| } |
| -template <class Decoder, class Renderer> |
| -void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) { |
| - DCHECK(PipelineOk()); |
| - const std::string major_mime_type = Decoder::major_mime_type(); |
| - const int num_outputs = demuxer->GetNumberOfStreams(); |
| - for (int i = 0; i < num_outputs; ++i) { |
| - scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); |
| - std::string value; |
| - if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && |
| - 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { |
| - scoped_refptr<Decoder> decoder = |
| - CreateFilter<Decoder, DemuxerStream>(filter_factory, stream); |
| - if (PipelineOk()) { |
| - DCHECK(decoder); |
| - CreateFilter<Renderer, Decoder>(filter_factory, decoder); |
| - } |
| - if (PipelineOk()) { |
| - pipeline_->InsertRenderedMimeType(major_mime_type); |
| - } |
| - break; |
| - } |
| - } |
| -} |
| +void PipelineThread::SetPlaybackRateTask(float rate) { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| - |
| -// Task runs as a result of a filter calling InitializationComplete. If for |
| -// some reason StopTask has been executed prior to this, the host_initializing_ |
| -// member will be NULL, and the message loop will have been quit already, so |
| -// we don't want to do it again. |
| -void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { |
| - if (host == host_initializing_) { |
| - host_initializing_ = NULL; |
| - message_loop()->Quit(); |
| - } else { |
| - DCHECK(!host_initializing_); |
| - } |
| -} |
| - |
| -void PipelineThread::SetPlaybackRateTask(float rate) { |
| pipeline_->InternalSetPlaybackRate(rate); |
| for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| iter != filter_hosts_.end(); |
| @@ -574,6 +570,8 @@ |
| void PipelineThread::SeekTask(base::TimeDelta time, |
| PipelineCallback* seek_callback) { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + |
| for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| iter != filter_hosts_.end(); |
| ++iter) { |
| @@ -595,6 +593,8 @@ |
| } |
| void PipelineThread::SetVolumeTask(float volume) { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + |
| pipeline_->volume_ = volume; |
| scoped_refptr<AudioRenderer> audio_renderer; |
| GetFilter(&audio_renderer); |
| @@ -604,6 +604,8 @@ |
| } |
| void PipelineThread::SetTimeTask() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + |
| time_update_callback_scheduled_ = false; |
| for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| iter != filter_hosts_.end(); |
| @@ -614,6 +616,8 @@ |
| template <class Filter> |
| void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + |
| *filter_out = NULL; |
| for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); |
| iter != filter_hosts_.end() && NULL == *filter_out; |
| @@ -623,78 +627,102 @@ |
| } |
| template <class Filter, class Source> |
| -scoped_refptr<Filter> PipelineThread::CreateFilter( |
| - FilterFactory* filter_factory, |
| - Source source, |
| - const MediaFormat& media_format) { |
| - DCHECK(PipelineOk()); |
| +void PipelineThread::CreateFilter(FilterFactory* filter_factory, |
| + Source source, |
| + const MediaFormat& media_format) { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + DCHECK(IsPipelineOk()); |
| + |
| scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); |
| if (!filter) { |
| Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); |
| } else { |
| - DCHECK(!host_initializing_); |
| - host_initializing_ = new FilterHostImpl(this, filter.get()); |
| - if (NULL == host_initializing_) { |
| - Error(PIPELINE_ERROR_OUT_OF_MEMORY); |
| - } else { |
| - // Create a dedicated thread for this filter. |
| - if (SupportsSetMessageLoop<Filter>()) { |
| - // TODO(scherkus): figure out a way to name these threads so it matches |
| - // the filter type. |
| - scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
| - if (!thread.get() || !thread->Start()) { |
| - NOTREACHED() << "Could not start filter thread"; |
| - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| - } else { |
| - filter->SetMessageLoop(thread->message_loop()); |
| - filter_threads_.push_back(thread.release()); |
| - } |
| + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); |
| + // Create a dedicated thread for this filter. |
| + if (SupportsSetMessageLoop<Filter>()) { |
| + // TODO(scherkus): figure out a way to name these threads so it matches |
| + // the filter type. |
| + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
| + if (!thread.get() || !thread->Start()) { |
| + NOTREACHED() << "Could not start filter thread"; |
| + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| + } else { |
| + filter->SetMessageLoop(thread->message_loop()); |
| + filter_threads_.push_back(thread.release()); |
| } |
| + } |
| - // Creating a thread could have failed, verify we're still OK. |
| - if (PipelineOk()) { |
| - filter_hosts_.push_back(host_initializing_); |
| - filter->SetFilterHost(host_initializing_); |
| - if (!filter->Initialize(source)) { |
| - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| - } |
| + // Creating a thread could have failed, verify we're still OK. |
| + if (IsPipelineOk()) { |
| + filter_hosts_.push_back(host.get()); |
| + filter->SetFilterHost(host.release()); |
| + if (!filter->Initialize(source)) { |
| + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| } |
| } |
| } |
| - if (PipelineOk()) { |
| - // Now we run the thread's message loop recursively. We want all |
| - // pending tasks to be processed, so we set nestable tasks to be allowed |
| - // and then run the loop. The only way we exit the loop is as the result |
| - // of a call to FilterHost::InitializationComplete, FilterHost::Error, or |
| - // Pipeline::Stop. In each of these cases, the corresponding task method |
| - // sets host_initializing_ to NULL to signal that the message loop's Quit |
| - // method has already been called, and then calls message_loop()->Quit(). |
| - // The setting of |host_initializing_| to NULL in the task prevents a |
| - // subsequent task from accidentally quitting the wrong (non-nested) loop. |
| - message_loop()->SetNestableTasksAllowed(true); |
| - message_loop()->Run(); |
| - message_loop()->SetNestableTasksAllowed(false); |
| - DCHECK(!host_initializing_); |
| - } else { |
| - // This could still be set if we never ran the message loop (for example, |
| - // if the fiter returned false from it's Initialize() method), so make sure |
| - // to reset it. |
| - host_initializing_ = NULL; |
| - } |
| - if (!PipelineOk()) { |
| - filter = NULL; |
| - } |
| - return filter; |
| } |
| -scoped_refptr<DataSource> PipelineThread::CreateDataSource( |
| - FilterFactory* filter_factory, const std::string& url) { |
| +void PipelineThread::CreateDataSource() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + DCHECK(IsPipelineOk()); |
| + |
| MediaFormat url_format; |
| url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); |
| - url_format.SetAsString(MediaFormat::kURL, url); |
| - return CreateFilter<DataSource>(filter_factory, url, url_format); |
| + url_format.SetAsString(MediaFormat::kURL, url_); |
| + CreateFilter<DataSource>(filter_factory_, url_, url_format); |
| } |
| +void PipelineThread::CreateDemuxer() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + DCHECK(IsPipelineOk()); |
| + |
| + scoped_refptr<DataSource> data_source; |
| + GetFilter(&data_source); |
| + DCHECK(data_source); |
| + CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); |
| +} |
| + |
| +template <class Decoder> |
| +bool PipelineThread::CreateDecoder() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + DCHECK(IsPipelineOk()); |
| + |
| + scoped_refptr<Demuxer> demuxer; |
| + GetFilter(&demuxer); |
| + DCHECK(demuxer); |
| + |
| + const std::string major_mime_type = Decoder::major_mime_type(); |
| + const int num_outputs = demuxer->GetNumberOfStreams(); |
| + for (int i = 0; i < num_outputs; ++i) { |
| + scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); |
| + std::string value; |
| + if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && |
| + 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { |
| + CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); |
| + return true; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| +template <class Decoder, class Renderer> |
| +bool PipelineThread::CreateRenderer() { |
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| + DCHECK(IsPipelineOk()); |
| + |
| + scoped_refptr<Decoder> decoder; |
| + GetFilter(&decoder); |
| + |
| + if (decoder) { |
| + // If the decoder was created. |
| + const std::string major_mime_type = Decoder::major_mime_type(); |
| + CreateFilter<Renderer, Decoder>(filter_factory_, decoder); |
| + return true; |
| + } |
| + return false; |
| +} |
| + |
| // Called as a result of destruction of the thread. |
| // |
| // TODO(scherkus): this can block the client due to synchronous Stop() API call. |