| Index: media/base/pipeline_impl.cc
|
| diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
|
| index 68d509c6eb7445dd08a77953fc91572167a70793..56a71a6e25375e7628579434db3feee6edb12f08 100644
|
| --- a/media/base/pipeline_impl.cc
|
| +++ b/media/base/pipeline_impl.cc
|
| @@ -31,24 +31,6 @@ bool SupportsSetMessageLoop() {
|
| }
|
| }
|
|
|
| -// Small helper function to help us name filter threads for debugging.
|
| -//
|
| -// TODO(scherkus): figure out a cleaner way to derive the filter thread name.
|
| -template <class Filter>
|
| -const char* GetThreadName() {
|
| - DCHECK(SupportsSetMessageLoop<Filter>());
|
| - switch (Filter::filter_type()) {
|
| - case FILTER_DEMUXER:
|
| - return "DemuxerThread";
|
| - case FILTER_AUDIO_DECODER:
|
| - return "AudioDecoderThread";
|
| - case FILTER_VIDEO_DECODER:
|
| - return "VideoDecoderThread";
|
| - default:
|
| - return "FilterThread";
|
| - }
|
| -}
|
| -
|
| // Helper function used with NewRunnableMethod to implement a (very) crude
|
| // blocking counter.
|
| //
|
| @@ -64,61 +46,62 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
|
|
|
| } // namespace
|
|
|
| -PipelineImpl::PipelineImpl(MessageLoop* message_loop)
|
| - : message_loop_(message_loop) {
|
| +PipelineImpl::PipelineImpl() {
|
| ResetState();
|
| }
|
|
|
| PipelineImpl::~PipelineImpl() {
|
| - DCHECK(!pipeline_internal_)
|
| - << "Stop() must complete before destroying object";
|
| + Stop();
|
| }
|
|
|
| -// Creates the PipelineInternal and calls it's start method.
|
| +// Creates the PipelineThread and calls it's start method.
|
| bool PipelineImpl::Start(FilterFactory* factory,
|
| const std::string& url,
|
| - PipelineCallback* start_callback) {
|
| - DCHECK(!pipeline_internal_);
|
| + PipelineCallback* init_complete_callback) {
|
| + DCHECK(!pipeline_thread_);
|
| DCHECK(factory);
|
| - if (pipeline_internal_ || !factory) {
|
| - return false;
|
| - }
|
| -
|
| - // Create and start the PipelineInternal.
|
| - pipeline_internal_ = new PipelineInternal(this, message_loop_);
|
| - if (!pipeline_internal_) {
|
| - NOTREACHED() << "Could not create PipelineInternal";
|
| - return false;
|
| + DCHECK(!initialized_);
|
| + DCHECK(!IsPipelineThread());
|
| + if (!pipeline_thread_ && factory) {
|
| + pipeline_thread_ = new PipelineThread(this);
|
| + if (pipeline_thread_) {
|
| + // TODO(ralphl): Does the callback get copied by these fancy templates?
|
| + // if so, then do I want to always delete it here???
|
| + if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
|
| + return true;
|
| + }
|
| + pipeline_thread_ = NULL; // Releases reference to destroy thread
|
| + }
|
| }
|
| - pipeline_internal_->Start(factory, url, start_callback);
|
| - return true;
|
| + delete init_complete_callback;
|
| + return false;
|
| }
|
|
|
| -// Stop the PipelineInternal who will NULL our reference to it and reset our
|
| -// state to a newly created PipelineImpl object.
|
| -void PipelineImpl::Stop(PipelineCallback* stop_callback) {
|
| - if (pipeline_internal_) {
|
| - pipeline_internal_->Stop(stop_callback);
|
| +// Stop the PipelineThread and return to a state identical to that of a newly
|
| +// created PipelineImpl object.
|
| +void PipelineImpl::Stop() {
|
| + DCHECK(!IsPipelineThread());
|
| +
|
| + if (pipeline_thread_) {
|
| + pipeline_thread_->Stop();
|
| }
|
| + ResetState();
|
| }
|
|
|
| void PipelineImpl::Seek(base::TimeDelta time,
|
| PipelineCallback* seek_callback) {
|
| + DCHECK(!IsPipelineThread());
|
| +
|
| if (IsPipelineOk()) {
|
| - pipeline_internal_->Seek(time, seek_callback);
|
| + pipeline_thread_->Seek(time, seek_callback);
|
| } else {
|
| NOTREACHED();
|
| }
|
| }
|
|
|
| -bool PipelineImpl::IsRunning() const {
|
| - AutoLock auto_lock(const_cast<Lock&>(lock_));
|
| - return pipeline_internal_ != NULL;
|
| -}
|
| -
|
| bool PipelineImpl::IsInitialized() const {
|
| AutoLock auto_lock(lock_);
|
| - return pipeline_internal_ && pipeline_internal_->IsInitialized();
|
| + return initialized_;
|
| }
|
|
|
| bool PipelineImpl::IsRendered(const std::string& major_mime_type) const {
|
| @@ -134,8 +117,10 @@ float PipelineImpl::GetPlaybackRate() const {
|
| }
|
|
|
| void PipelineImpl::SetPlaybackRate(float rate) {
|
| + DCHECK(!IsPipelineThread());
|
| +
|
| if (IsPipelineOk() && rate >= 0.0f) {
|
| - pipeline_internal_->SetPlaybackRate(rate);
|
| + pipeline_thread_->SetPlaybackRate(rate);
|
| } else {
|
| // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped.
|
| DCHECK(rate == 0.0f && playback_rate_ == 0.0f);
|
| @@ -148,8 +133,10 @@ float PipelineImpl::GetVolume() const {
|
| }
|
|
|
| void PipelineImpl::SetVolume(float volume) {
|
| + DCHECK(!IsPipelineThread());
|
| +
|
| if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) {
|
| - pipeline_internal_->SetVolume(volume);
|
| + pipeline_thread_->SetVolume(volume);
|
| } else {
|
| NOTREACHED();
|
| }
|
| @@ -195,7 +182,8 @@ PipelineError PipelineImpl::GetError() const {
|
|
|
| void PipelineImpl::ResetState() {
|
| AutoLock auto_lock(lock_);
|
| - pipeline_internal_ = NULL;
|
| + pipeline_thread_ = NULL;
|
| + initialized_ = false;
|
| duration_ = base::TimeDelta();
|
| buffered_time_ = base::TimeDelta();
|
| buffered_bytes_ = 0;
|
| @@ -210,7 +198,12 @@ void PipelineImpl::ResetState() {
|
| }
|
|
|
| bool PipelineImpl::IsPipelineOk() const {
|
| - return pipeline_internal_ && PIPELINE_OK == error_;
|
| + return pipeline_thread_ && initialized_ && PIPELINE_OK == error_;
|
| +}
|
| +
|
| +bool PipelineImpl::IsPipelineThread() const {
|
| + return pipeline_thread_ &&
|
| + PlatformThread::CurrentId() == pipeline_thread_->thread_id();
|
| }
|
|
|
| void PipelineImpl::SetDuration(base::TimeDelta duration) {
|
| @@ -270,86 +263,100 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) {
|
|
|
| //-----------------------------------------------------------------------------
|
|
|
| -PipelineInternal::PipelineInternal(PipelineImpl* pipeline,
|
| - MessageLoop* message_loop)
|
| +PipelineThread::PipelineThread(PipelineImpl* pipeline)
|
| : pipeline_(pipeline),
|
| - message_loop_(message_loop),
|
| + thread_("PipelineThread"),
|
| state_(kCreated) {
|
| }
|
|
|
| -PipelineInternal::~PipelineInternal() {
|
| - DCHECK(state_ == kCreated || state_ == kStopped);
|
| +PipelineThread::~PipelineThread() {
|
| + Stop();
|
| + DCHECK(state_ == kStopped || state_ == kError);
|
| }
|
|
|
| -// Called on client's thread.
|
| -void PipelineInternal::Start(FilterFactory* filter_factory,
|
| - const std::string& url,
|
| - PipelineCallback* start_callback) {
|
| - DCHECK(filter_factory);
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url,
|
| - start_callback));
|
| +// This method is called on the client's thread. It starts the pipeline's
|
| +// dedicated thread and posts a task to call the StartTask() method on that
|
| +// thread.
|
| +bool PipelineThread::Start(FilterFactory* filter_factory,
|
| + const std::string& url,
|
| + PipelineCallback* init_complete_callback) {
|
| + DCHECK_EQ(kCreated, state_);
|
| + if (thread_.Start()) {
|
| + filter_factory_ = filter_factory;
|
| + url_ = url;
|
| + init_callback_.reset(init_complete_callback);
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::StartTask));
|
| + return true;
|
| + }
|
| + return false;
|
| }
|
|
|
| -// Called on client's thread.
|
| -void PipelineInternal::Stop(PipelineCallback* stop_callback) {
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback));
|
| +// Called on the client's thread. If the thread has been started, then posts
|
| +// a task to call the StopTask() method, then waits until the thread has
|
| +// stopped.
|
| +void PipelineThread::Stop() {
|
| + if (thread_.IsRunning()) {
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::StopTask));
|
| + thread_.Stop();
|
| + }
|
| + DCHECK(filter_hosts_.empty());
|
| + DCHECK(filter_threads_.empty());
|
| }
|
|
|
| // Called on client's thread.
|
| -void PipelineInternal::Seek(base::TimeDelta time,
|
| +void PipelineThread::Seek(base::TimeDelta time,
|
| PipelineCallback* seek_callback) {
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::SeekTask, time,
|
| - seek_callback));
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback));
|
| }
|
|
|
| // Called on client's thread.
|
| -void PipelineInternal::SetPlaybackRate(float rate) {
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate));
|
| +void PipelineThread::SetPlaybackRate(float rate) {
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
|
| }
|
|
|
| // Called on client's thread.
|
| -void PipelineInternal::SetVolume(float volume) {
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume));
|
| +void PipelineThread::SetVolume(float volume) {
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
|
| }
|
|
|
| // Called from any thread.
|
| -void PipelineInternal::InitializationComplete(FilterHostImpl* host) {
|
| +void PipelineThread::InitializationComplete(FilterHostImpl* host) {
|
| if (IsPipelineOk()) {
|
| - // Continue the initialize task by proceeding to the next stage.
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::InitializeTask));
|
| + // Continue the start task by proceeding to the next stage.
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::StartTask));
|
| }
|
| }
|
|
|
| // Called from any thread. Updates the pipeline time.
|
| -void PipelineInternal::SetTime(base::TimeDelta time) {
|
| - // TODO(scherkus): why not post a task?
|
| - pipeline_->SetTime(time);
|
| +void PipelineThread::SetTime(base::TimeDelta time) {
|
| + pipeline()->SetTime(time);
|
| }
|
|
|
| -// Called from any thread. Sets the pipeline |error_| member and destroys all
|
| -// filters.
|
| -void PipelineInternal::Error(PipelineError error) {
|
| - message_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &PipelineInternal::ErrorTask, error));
|
| +// Called from any thread. Sets the pipeline |error_| member and schedules a
|
| +// task to stop all the filters in the pipeline. Note that the thread will
|
| +// continue to run until the client calls Pipeline::Stop(), but nothing will
|
| +// be processed since filters will not be able to post tasks.
|
| +void PipelineThread::Error(PipelineError error) {
|
| + // If this method returns false, then an error has already happened, so no
|
| + // reason to run the StopTask again. It's going to happen.
|
| + if (pipeline()->InternalSetError(error)) {
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &PipelineThread::StopTask));
|
| + }
|
| }
|
|
|
| -void PipelineInternal::StartTask(FilterFactory* filter_factory,
|
| - const std::string& url,
|
| - PipelineCallback* start_callback) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| - DCHECK_EQ(kCreated, state_);
|
| - filter_factory_ = filter_factory;
|
| - url_ = url;
|
| - start_callback_.reset(start_callback);
|
| -
|
| - // Kick off initialization.
|
| - InitializeTask();
|
| +// Called as a result of destruction of the thread.
|
| +//
|
| +// TODO(scherkus): this can block the client due to synchronous Stop() API call.
|
| +void PipelineThread::WillDestroyCurrentMessageLoop() {
|
| + STLDeleteElements(&filter_hosts_);
|
| + STLDeleteElements(&filter_threads_);
|
| }
|
|
|
| // Main initialization method called on the pipeline thread. This code attempts
|
| @@ -363,17 +370,18 @@ void PipelineInternal::StartTask(FilterFactory* filter_factory,
|
| // then connects the VideoDecoder to a VideoRenderer.
|
| //
|
| // When all required filters have been created and have called their
|
| -// FilterHost's InitializationComplete() method, the pipeline will update its
|
| -// state to kStarted and |init_callback_|, will be executed.
|
| +// FilterHost's InitializationComplete method, the pipeline's |initialized_|
|
| +// member is set to true, and, if the client provided an
|
| +// |init_complete_callback_|, it is called with "true".
|
| //
|
| // If initialization fails, the client's callback will still be called, but
|
| // the bool parameter passed to it will be false.
|
| //
|
| -// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It
|
| +// TODO(hclam): StartTask is now starting the pipeline asynchronously. It
|
| // works like a big state change table. If we no longer need to start filters
|
| // in order, we need to get rid of all the state change.
|
| -void PipelineInternal::InitializeTask() {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::StartTask() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| // If we have received the stop signal, return immediately.
|
| if (state_ == kStopped)
|
| @@ -383,6 +391,7 @@ void PipelineInternal::InitializeTask() {
|
|
|
| // Just created, create data source.
|
| if (state_ == kCreated) {
|
| + message_loop()->AddDestructionObserver(this);
|
| state_ = kInitDataSource;
|
| CreateDataSource();
|
| return;
|
| @@ -437,77 +446,99 @@ void PipelineInternal::InitializeTask() {
|
| }
|
|
|
| state_ = kStarted;
|
| + pipeline_->initialized_ = true;
|
| filter_factory_ = NULL;
|
| - if (start_callback_.get()) {
|
| - start_callback_->Run(true);
|
| - start_callback_.reset();
|
| + if (init_callback_.get()) {
|
| + init_callback_->Run(true);
|
| + init_callback_.reset();
|
| }
|
| }
|
| }
|
|
|
| // This method is called as a result of the client calling Pipeline::Stop() or
|
| // as the result of an error condition. If there is no error, then set the
|
| -// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the
|
| +// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
|
| // reverse order.
|
| //
|
| // TODO(scherkus): beware! this can get posted multiple times since we post
|
| // Stop() tasks even if we've already stopped. Perhaps this should no-op for
|
| // additional calls, however most of this logic will be changing.
|
| -void PipelineInternal::StopTask(PipelineCallback* stop_callback) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| - stop_callback_.reset(stop_callback);
|
| +void PipelineThread::StopTask() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| - // If we've already stopped, return immediately.
|
| - if (state_ == kStopped) {
|
| - return;
|
| + if (IsPipelineInitializing()) {
|
| + // If IsPipelineOk() is true, the pipeline was simply stopped during
|
| + // initialization. Otherwise it is a failure.
|
| + state_ = IsPipelineOk() ? kStopped : kError;
|
| + filter_factory_ = NULL;
|
| + if (init_callback_.get()) {
|
| + init_callback_->Run(false);
|
| + init_callback_.reset();
|
| + }
|
| + } else {
|
| + state_ = kStopped;
|
| }
|
|
|
| - // Carry out setting the error, notifying the client and destroying filters.
|
| - ErrorTask(PIPELINE_STOPPING);
|
| -
|
| - // We no longer need to examine our previous state, set it to stopped.
|
| - state_ = kStopped;
|
| -
|
| - // Reset the pipeline and set our reference to NULL so we don't accidentally
|
| - // modify the pipeline. Once remaining tasks execute we will be destroyed.
|
| - pipeline_->ResetState();
|
| - pipeline_ = NULL;
|
| -
|
| - // Notify the client that stopping has finished.
|
| - if (stop_callback_.get()) {
|
| - stop_callback_->Run(true);
|
| - stop_callback_.reset();
|
| + if (IsPipelineOk()) {
|
| + pipeline_->error_ = PIPELINE_STOPPING;
|
| }
|
| -}
|
|
|
| -void PipelineInternal::ErrorTask(PipelineError error) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| - DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
|
| + // Stop every filter.
|
| + for (FilterHostVector::iterator iter = filter_hosts_.begin();
|
| + iter != filter_hosts_.end();
|
| + ++iter) {
|
| + (*iter)->Stop();
|
| + }
|
|
|
| - // Suppress executing additional error logic.
|
| - if (state_ == kError) {
|
| - return;
|
| + // Figure out how many threads we have to stop.
|
| + //
|
| + // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
|
| + FilterThreadVector running_threads;
|
| + for (FilterThreadVector::iterator iter = filter_threads_.begin();
|
| + iter != filter_threads_.end();
|
| + ++iter) {
|
| + if ((*iter)->IsRunning()) {
|
| + running_threads.push_back(*iter);
|
| + }
|
| }
|
|
|
| - // Update our error code first in case we execute the start callback.
|
| - pipeline_->error_ = error;
|
| + // Crude blocking counter implementation.
|
| + Lock lock;
|
| + ConditionVariable wait_for_zero(&lock);
|
| + int count = running_threads.size();
|
|
|
| - // Notify the client that starting did not complete, if necessary.
|
| - if (IsPipelineInitializing() && start_callback_.get()) {
|
| - start_callback_->Run(false);
|
| + // Post a task to every filter's thread to ensure that they've completed their
|
| + // stopping logic before stopping the threads themselves.
|
| + //
|
| + // TODO(scherkus): again, Stop() should either be synchronous or we should
|
| + // receive a signal from filters that they have indeed stopped.
|
| + for (FilterThreadVector::iterator iter = running_threads.begin();
|
| + iter != running_threads.end();
|
| + ++iter) {
|
| + (*iter)->message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
|
| }
|
| - start_callback_.reset();
|
| - filter_factory_ = NULL;
|
|
|
| - // We no longer need to examine our previous state, set it to stopped.
|
| - state_ = kError;
|
| + // Wait on our "blocking counter".
|
| + {
|
| + AutoLock auto_lock(lock);
|
| + while (count > 0) {
|
| + wait_for_zero.Wait();
|
| + }
|
| + }
|
|
|
| - // Destroy every filter and reset the pipeline as well.
|
| - DestroyFilters();
|
| + // Stop every running filter thread.
|
| + //
|
| + // TODO(scherkus): can we watchdog this section to detect wedged threads?
|
| + for (FilterThreadVector::iterator iter = running_threads.begin();
|
| + iter != running_threads.end();
|
| + ++iter) {
|
| + (*iter)->Stop();
|
| + }
|
| }
|
|
|
| -void PipelineInternal::SetPlaybackRateTask(float rate) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::SetPlaybackRateTask(float rate) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| pipeline_->InternalSetPlaybackRate(rate);
|
| for (FilterHostVector::iterator iter = filter_hosts_.begin();
|
| @@ -517,10 +548,9 @@ void PipelineInternal::SetPlaybackRateTask(float rate) {
|
| }
|
| }
|
|
|
| -void PipelineInternal::SeekTask(base::TimeDelta time,
|
| - PipelineCallback* seek_callback) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| - seek_callback_.reset(seek_callback);
|
| +void PipelineThread::SeekTask(base::TimeDelta time,
|
| + PipelineCallback* seek_callback) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| for (FilterHostVector::iterator iter = filter_hosts_.begin();
|
| iter != filter_hosts_.end();
|
| @@ -536,14 +566,14 @@ void PipelineInternal::SeekTask(base::TimeDelta time,
|
| // immediately here or we set time and do callback when we have new
|
| // frames/packets.
|
| SetTime(time);
|
| - if (seek_callback_.get()) {
|
| - seek_callback_->Run(true);
|
| - seek_callback_.reset();
|
| + if (seek_callback) {
|
| + seek_callback->Run(true);
|
| + delete seek_callback;
|
| }
|
| }
|
|
|
| -void PipelineInternal::SetVolumeTask(float volume) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::SetVolumeTask(float volume) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| pipeline_->volume_ = volume;
|
| scoped_refptr<AudioRenderer> audio_renderer;
|
| @@ -554,46 +584,44 @@ void PipelineInternal::SetVolumeTask(float volume) {
|
| }
|
|
|
| template <class Filter, class Source>
|
| -void PipelineInternal::CreateFilter(FilterFactory* filter_factory,
|
| - Source source,
|
| - const MediaFormat& media_format) {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::CreateFilter(FilterFactory* filter_factory,
|
| + Source source,
|
| + const MediaFormat& media_format) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
| DCHECK(IsPipelineOk());
|
|
|
| - // Create the filter.
|
| scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
|
| if (!filter) {
|
| Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
|
| - return;
|
| - }
|
| -
|
| - // Create a dedicated thread for this filter if applicable.
|
| - if (SupportsSetMessageLoop<Filter>()) {
|
| - scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>()));
|
| - if (!thread.get() || !thread->Start()) {
|
| - NOTREACHED() << "Could not start filter thread";
|
| - Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
|
| - return;
|
| + } else {
|
| + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
|
| + // Create a dedicated thread for this filter.
|
| + if (SupportsSetMessageLoop<Filter>()) {
|
| + // TODO(scherkus): figure out a way to name these threads so it matches
|
| + // the filter type.
|
| + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
|
| + if (!thread.get() || !thread->Start()) {
|
| + NOTREACHED() << "Could not start filter thread";
|
| + Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
|
| + } else {
|
| + filter->set_message_loop(thread->message_loop());
|
| + filter_threads_.push_back(thread.release());
|
| + }
|
| }
|
|
|
| - filter->set_message_loop(thread->message_loop());
|
| - filter_threads_.push_back(thread.release());
|
| - }
|
| -
|
| - // Create the filter's host.
|
| - DCHECK(IsPipelineOk());
|
| - scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
|
| - filter->set_host(host.get());
|
| - filter_hosts_.push_back(host.release());
|
| -
|
| - // Now initialize the filter.
|
| - if (!filter->Initialize(source)) {
|
| - Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
|
| + // Creating a thread could have failed, verify we're still OK.
|
| + if (IsPipelineOk()) {
|
| + filter_hosts_.push_back(host.get());
|
| + filter->set_host(host.release());
|
| + if (!filter->Initialize(source)) {
|
| + Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
|
| + }
|
| + }
|
| }
|
| }
|
|
|
| -void PipelineInternal::CreateDataSource() {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::CreateDataSource() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
| DCHECK(IsPipelineOk());
|
|
|
| MediaFormat url_format;
|
| @@ -602,8 +630,8 @@ void PipelineInternal::CreateDataSource() {
|
| CreateFilter<DataSource>(filter_factory_, url_, url_format);
|
| }
|
|
|
| -void PipelineInternal::CreateDemuxer() {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::CreateDemuxer() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
| DCHECK(IsPipelineOk());
|
|
|
| scoped_refptr<DataSource> data_source;
|
| @@ -613,8 +641,8 @@ void PipelineInternal::CreateDemuxer() {
|
| }
|
|
|
| template <class Decoder>
|
| -bool PipelineInternal::CreateDecoder() {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +bool PipelineThread::CreateDecoder() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
| DCHECK(IsPipelineOk());
|
|
|
| scoped_refptr<Demuxer> demuxer;
|
| @@ -636,8 +664,8 @@ bool PipelineInternal::CreateDecoder() {
|
| }
|
|
|
| template <class Decoder, class Renderer>
|
| -bool PipelineInternal::CreateRenderer() {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +bool PipelineThread::CreateRenderer() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
| DCHECK(IsPipelineOk());
|
|
|
| scoped_refptr<Decoder> decoder;
|
| @@ -653,8 +681,8 @@ bool PipelineInternal::CreateRenderer() {
|
| }
|
|
|
| template <class Filter>
|
| -void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const {
|
| - DCHECK_EQ(MessageLoop::current(), message_loop_);
|
| +void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
|
|
|
| *filter_out = NULL;
|
| for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
|
| @@ -664,53 +692,4 @@ void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const {
|
| }
|
| }
|
|
|
| -void PipelineInternal::DestroyFilters() {
|
| - // Stop every filter.
|
| - for (FilterHostVector::iterator iter = filter_hosts_.begin();
|
| - iter != filter_hosts_.end();
|
| - ++iter) {
|
| - (*iter)->Stop();
|
| - }
|
| -
|
| - // Crude blocking counter implementation.
|
| - Lock lock;
|
| - ConditionVariable wait_for_zero(&lock);
|
| - int count = filter_threads_.size();
|
| -
|
| - // Post a task to every filter's thread to ensure that they've completed their
|
| - // stopping logic before stopping the threads themselves.
|
| - //
|
| - // TODO(scherkus): again, Stop() should either be synchronous or we should
|
| - // receive a signal from filters that they have indeed stopped.
|
| - for (FilterThreadVector::iterator iter = filter_threads_.begin();
|
| - iter != filter_threads_.end();
|
| - ++iter) {
|
| - (*iter)->message_loop()->PostTask(FROM_HERE,
|
| - NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
|
| - }
|
| -
|
| - // Wait on our "blocking counter".
|
| - {
|
| - AutoLock auto_lock(lock);
|
| - while (count > 0) {
|
| - wait_for_zero.Wait();
|
| - }
|
| - }
|
| -
|
| - // Stop every running filter thread.
|
| - //
|
| - // TODO(scherkus): can we watchdog this section to detect wedged threads?
|
| - for (FilterThreadVector::iterator iter = filter_threads_.begin();
|
| - iter != filter_threads_.end();
|
| - ++iter) {
|
| - (*iter)->Stop();
|
| - }
|
| -
|
| - // Reset the pipeline, which will decrement a reference to this object.
|
| - // We will get destroyed as soon as the remaining tasks finish executing.
|
| - // To be safe, we'll set our pipeline reference to NULL.
|
| - STLDeleteElements(&filter_hosts_);
|
| - STLDeleteElements(&filter_threads_);
|
| -}
|
| -
|
| } // namespace media
|
|
|