OLD | NEW |
---|---|
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 // | 4 // |
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, | 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, |
6 // potential deadlocks, etc... | 6 // potential deadlocks, etc... |
7 | 7 |
8 #include "base/callback.h" | 8 #include "base/callback.h" |
9 #include "base/compiler_specific.h" | 9 #include "base/compiler_specific.h" |
10 #include "base/condition_variable.h" | 10 #include "base/condition_variable.h" |
(...skipping 331 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
342 bool PipelineImpl::IsPipelineInitializing() { | 342 bool PipelineImpl::IsPipelineInitializing() { |
343 DCHECK_EQ(MessageLoop::current(), message_loop_); | 343 DCHECK_EQ(MessageLoop::current(), message_loop_); |
344 return state_ == kInitDataSource || | 344 return state_ == kInitDataSource || |
345 state_ == kInitDemuxer || | 345 state_ == kInitDemuxer || |
346 state_ == kInitAudioDecoder || | 346 state_ == kInitAudioDecoder || |
347 state_ == kInitAudioRenderer || | 347 state_ == kInitAudioRenderer || |
348 state_ == kInitVideoDecoder || | 348 state_ == kInitVideoDecoder || |
349 state_ == kInitVideoRenderer; | 349 state_ == kInitVideoRenderer; |
350 } | 350 } |
351 | 351 |
352 // static | 352 bool PipelineImpl::IsPipelineStopped() { |
353 bool PipelineImpl::StateTransitionsToStarted(State state) { | 353 DCHECK_EQ(MessageLoop::current(), message_loop_); |
354 return state == kPausing || state == kSeeking || state == kStarting; | 354 return state_ == kStopped || state_ == kError; |
355 } | |
356 | |
357 void PipelineImpl::FinishInitialization() { | |
358 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
359 // Execute the seek callback, if present. Note that this might be the | |
360 // initial callback passed into Start(). | |
361 if (seek_callback_.get()) { | |
362 seek_callback_->Run(); | |
363 seek_callback_.reset(); | |
364 } | |
365 filter_factory_ = NULL; | |
355 } | 366 } |
356 | 367 |
357 // static | 368 // static |
369 bool PipelineImpl::TransientState(State state) { | |
370 return state == kPausing || | |
371 state == kSeeking || | |
372 state == kStarting || | |
373 state == kStopping; | |
374 } | |
375 | |
376 // static | |
358 PipelineImpl::State PipelineImpl::FindNextState(State current) { | 377 PipelineImpl::State PipelineImpl::FindNextState(State current) { |
359 // TODO(scherkus): refactor InitializeTask() to make use of this function. | 378 // TODO(scherkus): refactor InitializeTask() to make use of this function. |
360 if (current == kPausing) | 379 if (current == kPausing) |
361 return kSeeking; | 380 return kSeeking; |
362 if (current == kSeeking) | 381 if (current == kSeeking) |
363 return kStarting; | 382 return kStarting; |
364 if (current == kStarting) | 383 if (current == kStarting) |
365 return kStarted; | 384 return kStarted; |
385 if (current == kStopping) | |
386 return kStopped; | |
366 return current; | 387 return current; |
367 } | 388 } |
368 | 389 |
369 void PipelineImpl::SetError(PipelineError error) { | 390 void PipelineImpl::SetError(PipelineError error) { |
370 DCHECK(IsRunning()); | 391 DCHECK(IsRunning()); |
371 DCHECK(error != PIPELINE_OK) << "PIPELINE_OK isn't an error!"; | 392 DCHECK(error != PIPELINE_OK) << "PIPELINE_OK isn't an error!"; |
372 LOG(INFO) << "Media pipeline error: " << error; | 393 LOG(INFO) << "Media pipeline error: " << error; |
373 | 394 |
374 AutoLock auto_lock(lock_); | |
375 error_ = error; | |
376 message_loop_->PostTask(FROM_HERE, | 395 message_loop_->PostTask(FROM_HERE, |
377 NewRunnableMethod(this, &PipelineImpl::ErrorChangedTask, error)); | 396 NewRunnableMethod(this, &PipelineImpl::ErrorChangedTask, error)); |
378 } | 397 } |
379 | 398 |
380 base::TimeDelta PipelineImpl::GetTime() const { | 399 base::TimeDelta PipelineImpl::GetTime() const { |
381 DCHECK(IsRunning()); | 400 DCHECK(IsRunning()); |
382 return GetCurrentTime(); | 401 return GetCurrentTime(); |
383 } | 402 } |
384 | 403 |
385 base::TimeDelta PipelineImpl::GetDuration() const { | 404 base::TimeDelta PipelineImpl::GetDuration() const { |
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
524 // FilterHost's InitializationComplete() method, the pipeline will update its | 543 // FilterHost's InitializationComplete() method, the pipeline will update its |
525 // state to kStarted and |init_callback_|, will be executed. | 544 // state to kStarted and |init_callback_|, will be executed. |
526 // | 545 // |
527 // TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It | 546 // TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It |
528 // works like a big state change table. If we no longer need to start filters | 547 // works like a big state change table. If we no longer need to start filters |
529 // in order, we need to get rid of all the state change. | 548 // in order, we need to get rid of all the state change. |
530 void PipelineImpl::InitializeTask() { | 549 void PipelineImpl::InitializeTask() { |
531 DCHECK_EQ(MessageLoop::current(), message_loop_); | 550 DCHECK_EQ(MessageLoop::current(), message_loop_); |
532 | 551 |
533 // If we have received the stop or error signal, return immediately. | 552 // If we have received the stop or error signal, return immediately. |
534 if (state_ == kStopped || state_ == kError) | 553 if (state_ == kStopping || IsPipelineStopped()) |
535 return; | 554 return; |
536 | 555 |
537 DCHECK(state_ == kCreated || IsPipelineInitializing()); | 556 DCHECK(state_ == kCreated || IsPipelineInitializing()); |
538 | 557 |
539 // Just created, create data source. | 558 // Just created, create data source. |
540 if (state_ == kCreated) { | 559 if (state_ == kCreated) { |
541 state_ = kInitDataSource; | 560 state_ = kInitDataSource; |
542 CreateDataSource(); | 561 CreateDataSource(); |
543 return; | 562 return; |
544 } | 563 } |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
584 return; | 603 return; |
585 } | 604 } |
586 } | 605 } |
587 | 606 |
588 if (state_ == kInitVideoRenderer) { | 607 if (state_ == kInitVideoRenderer) { |
589 if (!IsPipelineOk() || !HasRenderedMimeTypes()) { | 608 if (!IsPipelineOk() || !HasRenderedMimeTypes()) { |
590 SetError(PIPELINE_ERROR_COULD_NOT_RENDER); | 609 SetError(PIPELINE_ERROR_COULD_NOT_RENDER); |
591 return; | 610 return; |
592 } | 611 } |
593 | 612 |
594 // We've successfully created and initialized every filter, so we no longer | |
595 // need the filter factory. | |
596 filter_factory_ = NULL; | |
597 | |
598 // Initialization was successful, we are now considered paused, so it's safe | 613 // Initialization was successful, we are now considered paused, so it's safe |
599 // to set the initial playback rate and volume. | 614 // to set the initial playback rate and volume. |
600 PlaybackRateChangedTask(GetPlaybackRate()); | 615 PlaybackRateChangedTask(GetPlaybackRate()); |
601 VolumeChangedTask(GetVolume()); | 616 VolumeChangedTask(GetVolume()); |
602 | 617 |
603 // Fire the initial seek request to get the filters to preroll. | 618 // Fire the initial seek request to get the filters to preroll. |
604 state_ = kSeeking; | 619 state_ = kSeeking; |
605 remaining_transitions_ = filters_.size(); | 620 remaining_transitions_ = filters_.size(); |
606 seek_timestamp_ = base::TimeDelta(); | 621 seek_timestamp_ = base::TimeDelta(); |
607 filters_.front()->Seek(seek_timestamp_, | 622 filters_.front()->Seek(seek_timestamp_, |
608 NewCallback(this, &PipelineImpl::OnFilterStateTransition)); | 623 NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
609 } | 624 } |
610 } | 625 } |
611 | 626 |
612 // This method is called as a result of the client calling Pipeline::Stop() or | 627 // This method is called as a result of the client calling Pipeline::Stop() or |
613 // as the result of an error condition. If there is no error, then set the | 628 // as the result of an error condition. |
614 // pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the | 629 // We stop the filters in the reverse order. |
615 // reverse order. | |
616 // | 630 // |
617 // TODO(scherkus): beware! this can get posted multiple times since we post | 631 // TODO(scherkus): beware! this can get posted multiple times since we post |
618 // Stop() tasks even if we've already stopped. Perhaps this should no-op for | 632 // Stop() tasks even if we've already stopped. Perhaps this should no-op for |
619 // additional calls, however most of this logic will be changing. | 633 // additional calls, however most of this logic will be changing. |
620 void PipelineImpl::StopTask(PipelineCallback* stop_callback) { | 634 void PipelineImpl::StopTask(PipelineCallback* stop_callback) { |
621 DCHECK_EQ(MessageLoop::current(), message_loop_); | 635 DCHECK_EQ(MessageLoop::current(), message_loop_); |
636 PipelineError error = GetError(); | |
637 | |
638 if (state_ == kStopped || (state_ == kStopping && error == PIPELINE_OK)) { | |
scherkus (not reviewing)
2010/06/02 02:13:05
this bit of logic is confusing me...
so if we're
| |
639 // If we are already stopped or stopping normally, return immediately. | |
640 delete stop_callback; | |
641 return; | |
642 } else if (state_ == kError || | |
scherkus (not reviewing)
2010/06/02 02:13:05
nit: we return, don't need else
| |
643 (state_ == kStopping && error != PIPELINE_OK)) { | |
644 // If we are stopping due to SetError(), stop normally instead of | |
645 // going to error state. | |
646 AutoLock auto_lock(lock_); | |
647 error_ = PIPELINE_OK; | |
648 } | |
649 | |
622 stop_callback_.reset(stop_callback); | 650 stop_callback_.reset(stop_callback); |
623 | 651 |
624 // If we've already stopped, return immediately. | 652 if (IsPipelineInitializing()) { |
625 if (state_ == kStopped) { | 653 FinishInitialization(); |
626 return; | |
627 } | 654 } |
628 | 655 |
629 // Carry out setting the error, notifying the client and destroying filters. | 656 StartDestroyingFilters(); |
630 ErrorChangedTask(PIPELINE_STOPPING); | |
631 | |
632 // We no longer need to examine our previous state, set it to stopped. | |
633 state_ = kStopped; | |
634 | |
635 // Reset the pipeline. | |
636 ResetState(); | |
637 | |
638 // Notify the client that stopping has finished. | |
639 if (stop_callback_.get()) { | |
640 stop_callback_->Run(); | |
641 stop_callback_.reset(); | |
642 } | |
643 } | 657 } |
644 | 658 |
645 void PipelineImpl::ErrorChangedTask(PipelineError error) { | 659 void PipelineImpl::ErrorChangedTask(PipelineError error) { |
646 DCHECK_EQ(MessageLoop::current(), message_loop_); | 660 DCHECK_EQ(MessageLoop::current(), message_loop_); |
647 DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; | 661 DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; |
648 | 662 |
649 // Suppress executing additional error logic. | 663 // Suppress executing additional error logic. Note that if we are currently |
650 // TODO(hclam): Remove the condition for kStopped. It is there only because | 664 // performing a normal stop, then we return immediately and continue the |
651 // FFmpegDemuxer submits a read error while reading after it is called to | 665 // normal stop. |
652 // stop. After FFmpegDemuxer is cleaned up we should remove this condition | 666 if (IsPipelineStopped() || state_ == kStopping) { |
653 // and add an extra assert. | |
654 if (state_ == kError || state_ == kStopped) { | |
655 return; | 667 return; |
656 } | 668 } |
657 | 669 |
670 AutoLock auto_lock(lock_); | |
671 error_ = error; | |
672 | |
658 // Notify the client that starting did not complete, if necessary. | 673 // Notify the client that starting did not complete, if necessary. |
659 if (IsPipelineInitializing() && seek_callback_.get()) { | 674 if (IsPipelineInitializing()) { |
660 seek_callback_->Run(); | 675 FinishInitialization(); |
661 } | 676 } |
662 seek_callback_.reset(); | |
663 filter_factory_ = NULL; | |
664 | 677 |
665 // We no longer need to examine our previous state, set it to stopped. | 678 StartDestroyingFilters(); |
666 state_ = kError; | |
667 | |
668 // Destroy every filter and reset the pipeline as well. | |
669 DestroyFilters(); | |
670 | |
671 // If our owner has requested to be notified of an error, execute | |
672 // |error_callback_| unless we have a "good" error. | |
673 if (error_callback_.get() && error != PIPELINE_STOPPING) { | |
674 error_callback_->Run(); | |
675 } | |
676 } | 679 } |
677 | 680 |
678 void PipelineImpl::PlaybackRateChangedTask(float playback_rate) { | 681 void PipelineImpl::PlaybackRateChangedTask(float playback_rate) { |
679 DCHECK_EQ(MessageLoop::current(), message_loop_); | 682 DCHECK_EQ(MessageLoop::current(), message_loop_); |
680 { | 683 { |
681 AutoLock auto_lock(lock_); | 684 AutoLock auto_lock(lock_); |
682 clock_.SetPlaybackRate(playback_rate); | 685 clock_.SetPlaybackRate(playback_rate); |
683 } | 686 } |
684 for (FilterVector::iterator iter = filters_.begin(); | 687 for (FilterVector::iterator iter = filters_.begin(); |
685 iter != filters_.end(); | 688 iter != filters_.end(); |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
783 iter != filters_.end(); | 786 iter != filters_.end(); |
784 ++iter) { | 787 ++iter) { |
785 (*iter)->OnAudioRendererDisabled(); | 788 (*iter)->OnAudioRendererDisabled(); |
786 } | 789 } |
787 } | 790 } |
788 | 791 |
789 void PipelineImpl::FilterStateTransitionTask() { | 792 void PipelineImpl::FilterStateTransitionTask() { |
790 DCHECK_EQ(MessageLoop::current(), message_loop_); | 793 DCHECK_EQ(MessageLoop::current(), message_loop_); |
791 | 794 |
792 // No reason transitioning if we've errored or have stopped. | 795 // No reason transitioning if we've errored or have stopped. |
793 if (state_ == kError || state_ == kStopped) { | 796 if (IsPipelineStopped()) { |
794 return; | 797 return; |
795 } | 798 } |
796 | 799 |
797 if (!StateTransitionsToStarted(state_)) { | 800 if (!TransientState(state_)) { |
798 NOTREACHED() << "Invalid current state: " << state_; | 801 NOTREACHED() << "Invalid current state: " << state_; |
799 SetError(PIPELINE_ERROR_ABORT); | 802 SetError(PIPELINE_ERROR_ABORT); |
800 return; | 803 return; |
801 } | 804 } |
802 | 805 |
803 // Decrement the number of remaining transitions, making sure to transition | 806 // Decrement the number of remaining transitions, making sure to transition |
804 // to the next state if needed. | 807 // to the next state if needed. |
805 CHECK(remaining_transitions_ <= filters_.size()); | 808 CHECK(remaining_transitions_ <= filters_.size()); |
806 CHECK(remaining_transitions_ > 0u); | 809 CHECK(remaining_transitions_ > 0u); |
807 if (--remaining_transitions_ == 0) { | 810 if (--remaining_transitions_ == 0) { |
808 state_ = FindNextState(state_); | 811 state_ = FindNextState(state_); |
809 if (state_ == kSeeking) { | 812 if (state_ == kSeeking) { |
810 AutoLock auto_lock(lock_); | 813 AutoLock auto_lock(lock_); |
811 clock_.SetTime(seek_timestamp_); | 814 clock_.SetTime(seek_timestamp_); |
812 } | 815 } |
813 | 816 |
814 if (StateTransitionsToStarted(state_)) { | 817 if (TransientState(state_)) { |
815 remaining_transitions_ = filters_.size(); | 818 remaining_transitions_ = filters_.size(); |
816 } | 819 } |
817 } | 820 } |
818 | 821 |
819 // Carry out the action for the current state. | 822 // Carry out the action for the current state. |
820 if (StateTransitionsToStarted(state_)) { | 823 if (TransientState(state_)) { |
821 MediaFilter* filter = filters_[filters_.size() - remaining_transitions_]; | 824 MediaFilter* filter = filters_[filters_.size() - remaining_transitions_]; |
822 if (state_ == kPausing) { | 825 if (state_ == kPausing) { |
823 filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); | 826 filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
824 } else if (state_ == kSeeking) { | 827 } else if (state_ == kSeeking) { |
825 filter->Seek(seek_timestamp_, | 828 filter->Seek(seek_timestamp_, |
826 NewCallback(this, &PipelineImpl::OnFilterStateTransition)); | 829 NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
827 } else if (state_ == kStarting) { | 830 } else if (state_ == kStarting) { |
828 filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); | 831 filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); |
832 } else if (state_ == kStopping) { | |
833 filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); | |
829 } else { | 834 } else { |
830 NOTREACHED(); | 835 NOTREACHED(); |
831 } | 836 } |
832 } else if (state_ == kStarted) { | 837 } else if (state_ == kStarted) { |
833 // Execute the seek callback, if present. Note that this might be the | 838 FinishInitialization(); |
834 // initial callback passed into Start(). | |
835 if (seek_callback_.get()) { | |
836 seek_callback_->Run(); | |
837 seek_callback_.reset(); | |
838 } | |
839 | 839 |
840 // Finally, reset our seeking timestamp back to zero. | 840 // Finally, reset our seeking timestamp back to zero. |
841 seek_timestamp_ = base::TimeDelta(); | 841 seek_timestamp_ = base::TimeDelta(); |
842 | 842 |
843 AutoLock auto_lock(lock_); | 843 AutoLock auto_lock(lock_); |
844 // We use audio stream to update the clock. So if there is such a stream, | 844 // We use audio stream to update the clock. So if there is such a stream, |
845 // we pause the clock until we receive a valid timestamp. | 845 // we pause the clock until we receive a valid timestamp. |
846 waiting_for_clock_update_ = | 846 waiting_for_clock_update_ = |
847 rendered_mime_types_.find(mime_type::kMajorTypeAudio) != | 847 rendered_mime_types_.find(mime_type::kMajorTypeAudio) != |
848 rendered_mime_types_.end(); | 848 rendered_mime_types_.end(); |
849 if (!waiting_for_clock_update_) | 849 if (!waiting_for_clock_update_) |
850 clock_.Play(); | 850 clock_.Play(); |
851 } else if (IsPipelineStopped()) { | |
852 FinishDestroyingFiltersTask(); | |
851 } else { | 853 } else { |
852 NOTREACHED(); | 854 NOTREACHED(); |
853 } | 855 } |
854 } | 856 } |
855 | 857 |
858 void PipelineImpl::FinishDestroyingFiltersTask() { | |
859 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
860 DCHECK(IsPipelineStopped()); | |
861 | |
862 // Stop every running filter thread. | |
863 // | |
864 // TODO(scherkus): can we watchdog this section to detect wedged threads? | |
865 for (FilterThreadVector::iterator iter = filter_threads_.begin(); | |
866 iter != filter_threads_.end(); | |
867 ++iter) { | |
868 (*iter)->Stop(); | |
869 } | |
870 | |
871 // Reset the pipeline, which will decrement a reference to this object. | |
872 // We will get destroyed as soon as the remaining tasks finish executing. | |
873 // To be safe, we'll set our pipeline reference to NULL. | |
874 filters_.clear(); | |
875 filter_types_.clear(); | |
876 STLDeleteElements(&filter_threads_); | |
877 | |
878 if (PIPELINE_OK == GetError()) { | |
879 // Destroying filters due to Stop(). | |
880 ResetState(); | |
881 | |
882 // Notify the client that stopping has finished. | |
883 if (stop_callback_.get()) { | |
884 stop_callback_->Run(); | |
885 stop_callback_.reset(); | |
886 } | |
887 } else { | |
888 // Destroying filters due to SetError(). | |
889 state_ = kError; | |
890 // If our owner has requested to be notified of an error. | |
891 if (error_callback_.get()) { | |
892 error_callback_->Run(); | |
893 } | |
894 } | |
895 } | |
896 | |
856 template <class Filter, class Source> | 897 template <class Filter, class Source> |
857 void PipelineImpl::CreateFilter(FilterFactory* filter_factory, | 898 void PipelineImpl::CreateFilter(FilterFactory* filter_factory, |
858 Source source, | 899 Source source, |
859 const MediaFormat& media_format) { | 900 const MediaFormat& media_format) { |
860 DCHECK_EQ(MessageLoop::current(), message_loop_); | 901 DCHECK_EQ(MessageLoop::current(), message_loop_); |
861 DCHECK(IsPipelineOk()); | 902 DCHECK(IsPipelineOk()); |
862 | 903 |
863 // Create the filter. | 904 // Create the filter. |
864 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); | 905 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); |
865 if (!filter) { | 906 if (!filter) { |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
958 DCHECK_EQ(MessageLoop::current(), message_loop_); | 999 DCHECK_EQ(MessageLoop::current(), message_loop_); |
959 | 1000 |
960 FilterTypeMap::const_iterator ft = filter_types_.find(Filter::filter_type()); | 1001 FilterTypeMap::const_iterator ft = filter_types_.find(Filter::filter_type()); |
961 if (ft == filter_types_.end()) { | 1002 if (ft == filter_types_.end()) { |
962 *filter_out = NULL; | 1003 *filter_out = NULL; |
963 } else { | 1004 } else { |
964 *filter_out = reinterpret_cast<Filter*>(ft->second.get()); | 1005 *filter_out = reinterpret_cast<Filter*>(ft->second.get()); |
965 } | 1006 } |
966 } | 1007 } |
967 | 1008 |
968 void PipelineImpl::DestroyFilters() { | 1009 void PipelineImpl::StartDestroyingFilters() { |
969 // Stop every filter. | 1010 DCHECK_EQ(MessageLoop::current(), message_loop_); |
970 for (FilterVector::iterator iter = filters_.begin(); | 1011 DCHECK_NE(kStopped, state_); |
971 iter != filters_.end(); | 1012 |
972 ++iter) { | 1013 if (state_ == kStopping) { |
973 (*iter)->Stop(); | 1014 return; // Do not call Stop() on filters twice. |
scherkus (not reviewing)
2010/06/02 02:13:05
nit: two spaces between ; and //
| |
974 } | 1015 } |
975 | 1016 |
976 // Crude blocking counter implementation. | 1017 remaining_transitions_ = filters_.size(); |
977 Lock lock; | 1018 if (remaining_transitions_ > 0) { |
978 ConditionVariable wait_for_zero(&lock); | 1019 state_ = kStopping; |
979 int count = filter_threads_.size(); | 1020 filters_.front()->Stop(NewCallback( |
980 | 1021 this, &PipelineImpl::OnFilterStateTransition)); |
981 // Post a task to every filter's thread to ensure that they've completed their | 1022 } else { |
982 // stopping logic before stopping the threads themselves. | 1023 state_ = kStopped; |
983 // | 1024 message_loop_->PostTask(FROM_HERE, |
984 // TODO(scherkus): again, Stop() should either be synchronous or we should | 1025 NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask)); |
985 // receive a signal from filters that they have indeed stopped. | |
986 for (FilterThreadVector::iterator iter = filter_threads_.begin(); | |
987 iter != filter_threads_.end(); | |
988 ++iter) { | |
989 (*iter)->message_loop()->PostTask(FROM_HERE, | |
990 NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); | |
991 } | 1026 } |
992 | |
993 // Wait on our "blocking counter". | |
994 { | |
995 AutoLock auto_lock(lock); | |
996 while (count > 0) { | |
997 wait_for_zero.Wait(); | |
998 } | |
999 } | |
1000 | |
1001 // Stop every running filter thread. | |
1002 // | |
1003 // TODO(scherkus): can we watchdog this section to detect wedged threads? | |
1004 for (FilterThreadVector::iterator iter = filter_threads_.begin(); | |
1005 iter != filter_threads_.end(); | |
1006 ++iter) { | |
1007 (*iter)->Stop(); | |
1008 } | |
1009 | |
1010 // Reset the pipeline, which will decrement a reference to this object. | |
1011 // We will get destroyed as soon as the remaining tasks finish executing. | |
1012 // To be safe, we'll set our pipeline reference to NULL. | |
1013 filters_.clear(); | |
1014 filter_types_.clear(); | |
1015 STLDeleteElements(&filter_threads_); | |
1016 } | 1027 } |
1017 | 1028 |
1018 } // namespace media | 1029 } // namespace media |
OLD | NEW |