Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(315)

Side by Side Diff: media/base/pipeline_impl.cc

Issue 7484054: Migrate Pipeline & PipelineImpl to PipelineStatusCB. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 9 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 // 4 //
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names,
6 // potential deadlocks, etc... 6 // potential deadlocks, etc...
7 7
8 #include "media/base/pipeline_impl.h" 8 #include "media/base/pipeline_impl.h"
9 9
10 #include <algorithm> 10 #include <algorithm>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/callback.h" 13 #include "base/callback.h"
14 #include "base/compiler_specific.h" 14 #include "base/compiler_specific.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "base/string_util.h" 16 #include "base/string_util.h"
17 #include "base/synchronization/condition_variable.h" 17 #include "base/synchronization/condition_variable.h"
18 #include "media/base/clock.h" 18 #include "media/base/clock.h"
19 #include "media/base/filter_collection.h" 19 #include "media/base/filter_collection.h"
20 20
21 namespace media { 21 namespace media {
22 22
23 const char kRawMediaScheme[] = "x-raw-media"; 23 const char kRawMediaScheme[] = "x-raw-media";
24 24
25 PipelineStatusNotification::PipelineStatusNotification() 25 PipelineStatusNotification::PipelineStatusNotification()
26 : cv_(&lock_), status_(PIPELINE_OK), notified_(false) { 26 : cv_(&lock_), status_(PIPELINE_OK), notified_(false) {
27 callback_.reset(NewCallback(this, &PipelineStatusNotification::Notify));
28 } 27 }
29 28
30 PipelineStatusNotification::~PipelineStatusNotification() { 29 PipelineStatusNotification::~PipelineStatusNotification() {
31 DCHECK(notified_); 30 DCHECK(notified_);
32 } 31 }
33 32
34 media::PipelineStatusCallback* PipelineStatusNotification::Callback() { 33 PipelineStatusCB PipelineStatusNotification::Callback() {
35 return callback_.release(); 34 return base::Bind(&PipelineStatusNotification::Notify,
35 base::Unretained(this));
36 } 36 }
37 37
38 void PipelineStatusNotification::Notify(media::PipelineStatus status) { 38 void PipelineStatusNotification::Notify(media::PipelineStatus status) {
39 base::AutoLock auto_lock(lock_); 39 base::AutoLock auto_lock(lock_);
40 DCHECK(!notified_); 40 DCHECK(!notified_);
41 notified_ = true; 41 notified_ = true;
42 status_ = status; 42 status_ = status;
43 cv_.Signal(); 43 cv_.Signal();
44 } 44 }
45 45
(...skipping 25 matching lines...) Expand all
71 ResetState(); 71 ResetState();
72 } 72 }
73 73
74 PipelineImpl::~PipelineImpl() { 74 PipelineImpl::~PipelineImpl() {
75 base::AutoLock auto_lock(lock_); 75 base::AutoLock auto_lock(lock_);
76 DCHECK(!running_) << "Stop() must complete before destroying object"; 76 DCHECK(!running_) << "Stop() must complete before destroying object";
77 DCHECK(!stop_pending_); 77 DCHECK(!stop_pending_);
78 DCHECK(!seek_pending_); 78 DCHECK(!seek_pending_);
79 } 79 }
80 80
81 void PipelineImpl::Init(PipelineStatusCallback* ended_callback, 81 void PipelineImpl::Init(const PipelineStatusCB& ended_callback,
82 PipelineStatusCallback* error_callback, 82 const PipelineStatusCB& error_callback,
83 PipelineStatusCallback* network_callback) { 83 const PipelineStatusCB& network_callback) {
84 DCHECK(!IsRunning()) 84 DCHECK(!IsRunning())
85 << "Init() should be called before the pipeline has started"; 85 << "Init() should be called before the pipeline has started";
86 ended_callback_.reset(ended_callback); 86 ended_callback_ = ended_callback;
87 error_callback_.reset(error_callback); 87 error_callback_ = error_callback;
88 network_callback_.reset(network_callback); 88 network_callback_ = network_callback;
89 } 89 }
90 90
91 // Creates the PipelineInternal and calls it's start method. 91 // Creates the PipelineInternal and calls it's start method.
92 bool PipelineImpl::Start(FilterCollection* collection, 92 bool PipelineImpl::Start(FilterCollection* collection,
93 const std::string& url, 93 const std::string& url,
94 PipelineStatusCallback* start_callback) { 94 const PipelineStatusCB& start_callback) {
95 base::AutoLock auto_lock(lock_); 95 base::AutoLock auto_lock(lock_);
96 scoped_ptr<PipelineStatusCallback> callback(start_callback);
97 scoped_ptr<FilterCollection> filter_collection(collection); 96 scoped_ptr<FilterCollection> filter_collection(collection);
98 97
99 if (running_) { 98 if (running_) {
100 VLOG(1) << "Media pipeline is already running"; 99 VLOG(1) << "Media pipeline is already running";
101 return false; 100 return false;
102 } 101 }
103 102
104 if (collection->IsEmpty()) { 103 if (collection->IsEmpty()) {
105 return false; 104 return false;
106 } 105 }
107 106
108 // Kick off initialization! 107 // Kick off initialization!
109 running_ = true; 108 running_ = true;
110 message_loop_->PostTask( 109 message_loop_->PostTask(
111 FROM_HERE, 110 FROM_HERE,
112 NewRunnableMethod(this, 111 NewRunnableMethod(this,
113 &PipelineImpl::StartTask, 112 &PipelineImpl::StartTask,
114 filter_collection.release(), 113 filter_collection.release(),
115 url, 114 url,
116 callback.release())); 115 start_callback));
117 return true; 116 return true;
118 } 117 }
119 118
120 void PipelineImpl::Stop(PipelineStatusCallback* stop_callback) { 119 void PipelineImpl::Stop(const PipelineStatusCB& stop_callback) {
121 base::AutoLock auto_lock(lock_); 120 base::AutoLock auto_lock(lock_);
122 scoped_ptr<PipelineStatusCallback> callback(stop_callback);
123 if (!running_) { 121 if (!running_) {
124 VLOG(1) << "Media pipeline has already stopped"; 122 VLOG(1) << "Media pipeline has already stopped";
125 return; 123 return;
126 } 124 }
127 125
128 // Stop the pipeline, which will set |running_| to false on behalf. 126 // Stop the pipeline, which will set |running_| to false on behalf.
129 message_loop_->PostTask(FROM_HERE, 127 message_loop_->PostTask(FROM_HERE,
130 NewRunnableMethod(this, &PipelineImpl::StopTask, callback.release())); 128 NewRunnableMethod(this, &PipelineImpl::StopTask, stop_callback));
131 } 129 }
132 130
133 void PipelineImpl::Seek(base::TimeDelta time, 131 void PipelineImpl::Seek(base::TimeDelta time,
134 PipelineStatusCallback* seek_callback) { 132 const PipelineStatusCB& seek_callback) {
135 base::AutoLock auto_lock(lock_); 133 base::AutoLock auto_lock(lock_);
136 scoped_ptr<PipelineStatusCallback> callback(seek_callback);
137 if (!running_) { 134 if (!running_) {
138 VLOG(1) << "Media pipeline must be running"; 135 VLOG(1) << "Media pipeline must be running";
139 return; 136 return;
140 } 137 }
141 138
142 message_loop_->PostTask(FROM_HERE, 139 message_loop_->PostTask(FROM_HERE,
143 NewRunnableMethod(this, &PipelineImpl::SeekTask, time, 140 NewRunnableMethod(this, &PipelineImpl::SeekTask, time,
scherkus (not reviewing) 2011/07/23 01:51:32 nit: fit on one line?
acolwell GONE FROM CHROMIUM 2011/07/27 16:17:04 Done.
144 callback.release())); 141 seek_callback));
145 } 142 }
146 143
147 bool PipelineImpl::IsRunning() const { 144 bool PipelineImpl::IsRunning() const {
148 base::AutoLock auto_lock(lock_); 145 base::AutoLock auto_lock(lock_);
149 return running_; 146 return running_;
150 } 147 }
151 148
152 bool PipelineImpl::IsInitialized() const { 149 bool PipelineImpl::IsInitialized() const {
153 // TODO(scherkus): perhaps replace this with a bool that is set/get under the 150 // TODO(scherkus): perhaps replace this with a bool that is set/get under the
154 // lock, because this is breaching the contract that |state_| is only accessed 151 // lock, because this is breaching the contract that |state_| is only accessed
(...skipping 257 matching lines...) Expand 10 before | Expand all | Expand 10 after
412 DCHECK(kSeeking == state_ || kPausing == state_ || 409 DCHECK(kSeeking == state_ || kPausing == state_ ||
413 kFlushing == state_ || kStarting == state_) 410 kFlushing == state_ || kStarting == state_)
414 << "Current state : " << state_; 411 << "Current state : " << state_;
415 return true; 412 return true;
416 } 413 }
417 414
418 void PipelineImpl::FinishInitialization() { 415 void PipelineImpl::FinishInitialization() {
419 DCHECK_EQ(MessageLoop::current(), message_loop_); 416 DCHECK_EQ(MessageLoop::current(), message_loop_);
420 // Execute the seek callback, if present. Note that this might be the 417 // Execute the seek callback, if present. Note that this might be the
421 // initial callback passed into Start(). 418 // initial callback passed into Start().
422 if (seek_callback_.get()) { 419 if (!seek_callback_.is_null()) {
423 seek_callback_->Run(status_); 420 seek_callback_.Run(status_);
424 seek_callback_.reset(); 421 seek_callback_.Reset();
425 } 422 }
426 } 423 }
427 424
428 // static 425 // static
429 bool PipelineImpl::TransientState(State state) { 426 bool PipelineImpl::TransientState(State state) {
430 return state == kPausing || 427 return state == kPausing ||
431 state == kFlushing || 428 state == kFlushing ||
432 state == kSeeking || 429 state == kSeeking ||
433 state == kStarting || 430 state == kStarting ||
434 state == kStopping; 431 state == kStopping;
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after
599 void PipelineImpl::OnUpdateStatistics(const PipelineStatistics& stats) { 596 void PipelineImpl::OnUpdateStatistics(const PipelineStatistics& stats) {
600 base::AutoLock auto_lock(lock_); 597 base::AutoLock auto_lock(lock_);
601 statistics_.audio_bytes_decoded += stats.audio_bytes_decoded; 598 statistics_.audio_bytes_decoded += stats.audio_bytes_decoded;
602 statistics_.video_bytes_decoded += stats.video_bytes_decoded; 599 statistics_.video_bytes_decoded += stats.video_bytes_decoded;
603 statistics_.video_frames_decoded += stats.video_frames_decoded; 600 statistics_.video_frames_decoded += stats.video_frames_decoded;
604 statistics_.video_frames_dropped += stats.video_frames_dropped; 601 statistics_.video_frames_dropped += stats.video_frames_dropped;
605 } 602 }
606 603
607 void PipelineImpl::StartTask(FilterCollection* filter_collection, 604 void PipelineImpl::StartTask(FilterCollection* filter_collection,
608 const std::string& url, 605 const std::string& url,
609 PipelineStatusCallback* start_callback) { 606 const PipelineStatusCB& start_callback) {
610 DCHECK_EQ(MessageLoop::current(), message_loop_); 607 DCHECK_EQ(MessageLoop::current(), message_loop_);
611 DCHECK_EQ(kCreated, state_); 608 DCHECK_EQ(kCreated, state_);
612 filter_collection_.reset(filter_collection); 609 filter_collection_.reset(filter_collection);
613 url_ = url; 610 url_ = url;
614 seek_callback_.reset(start_callback); 611 seek_callback_ = start_callback;
615 612
616 // Kick off initialization. 613 // Kick off initialization.
617 pipeline_init_state_.reset(new PipelineInitState()); 614 pipeline_init_state_.reset(new PipelineInitState());
618 pipeline_init_state_->composite_ = new CompositeFilter(message_loop_); 615 pipeline_init_state_->composite_ = new CompositeFilter(message_loop_);
619 pipeline_init_state_->composite_->set_host(this); 616 pipeline_init_state_->composite_->set_host(this);
620 617
621 bool raw_media = (base::strncasecmp(url.c_str(), kRawMediaScheme, 618 bool raw_media = (base::strncasecmp(url.c_str(), kRawMediaScheme,
622 strlen(kRawMediaScheme)) == 0); 619 strlen(kRawMediaScheme)) == 0);
623 if (raw_media) { 620 if (raw_media) {
624 set_state(kInitVideoDecoder); 621 set_state(kInitVideoDecoder);
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
737 } 734 }
738 } 735 }
739 736
740 // This method is called as a result of the client calling Pipeline::Stop() or 737 // This method is called as a result of the client calling Pipeline::Stop() or
741 // as the result of an error condition. 738 // as the result of an error condition.
742 // We stop the filters in the reverse order. 739 // We stop the filters in the reverse order.
743 // 740 //
744 // TODO(scherkus): beware! this can get posted multiple times since we post 741 // TODO(scherkus): beware! this can get posted multiple times since we post
745 // Stop() tasks even if we've already stopped. Perhaps this should no-op for 742 // Stop() tasks even if we've already stopped. Perhaps this should no-op for
746 // additional calls, however most of this logic will be changing. 743 // additional calls, however most of this logic will be changing.
747 void PipelineImpl::StopTask(PipelineStatusCallback* stop_callback) { 744 void PipelineImpl::StopTask(const PipelineStatusCB& stop_callback) {
748 DCHECK_EQ(MessageLoop::current(), message_loop_); 745 DCHECK_EQ(MessageLoop::current(), message_loop_);
749 DCHECK(!IsPipelineStopPending()); 746 DCHECK(!IsPipelineStopPending());
750 DCHECK_NE(state_, kStopped); 747 DCHECK_NE(state_, kStopped);
751 748
752 if (state_ == kStopped) { 749 if (state_ == kStopped) {
753 // Already stopped so just run callback. 750 // Already stopped so just run callback.
754 stop_callback->Run(status_); 751 stop_callback.Run(status_);
755 delete stop_callback;
756 return; 752 return;
757 } 753 }
758 754
759 if (IsPipelineTearingDown() && error_caused_teardown_) { 755 if (IsPipelineTearingDown() && error_caused_teardown_) {
760 // If we are stopping due to SetError(), stop normally instead of 756 // If we are stopping due to SetError(), stop normally instead of
761 // going to error state and calling |error_callback_|. This converts 757 // going to error state and calling |error_callback_|. This converts
762 // the teardown in progress from an error teardown into one that acts 758 // the teardown in progress from an error teardown into one that acts
763 // like the error never occurred. 759 // like the error never occurred.
764 base::AutoLock auto_lock(lock_); 760 base::AutoLock auto_lock(lock_);
765 status_ = PIPELINE_OK; 761 status_ = PIPELINE_OK;
766 error_caused_teardown_ = false; 762 error_caused_teardown_ = false;
767 } 763 }
768 764
769 stop_callback_.reset(stop_callback); 765 stop_callback_ = stop_callback;
770 766
771 stop_pending_ = true; 767 stop_pending_ = true;
772 if (!IsPipelineSeeking() && !IsPipelineTearingDown()) { 768 if (!IsPipelineSeeking() && !IsPipelineTearingDown()) {
773 // We will tear down pipeline immediately when there is no seek operation 769 // We will tear down pipeline immediately when there is no seek operation
774 // pending and no teardown in progress. This should include the case where 770 // pending and no teardown in progress. This should include the case where
775 // we are partially initialized. 771 // we are partially initialized.
776 TearDownPipeline(); 772 TearDownPipeline();
777 } 773 }
778 } 774 }
779 775
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
833 } 829 }
834 } 830 }
835 831
836 void PipelineImpl::PreloadChangedTask(Preload preload) { 832 void PipelineImpl::PreloadChangedTask(Preload preload) {
837 DCHECK_EQ(MessageLoop::current(), message_loop_); 833 DCHECK_EQ(MessageLoop::current(), message_loop_);
838 if (demuxer_) 834 if (demuxer_)
839 demuxer_->SetPreload(preload); 835 demuxer_->SetPreload(preload);
840 } 836 }
841 837
842 void PipelineImpl::SeekTask(base::TimeDelta time, 838 void PipelineImpl::SeekTask(base::TimeDelta time,
843 PipelineStatusCallback* seek_callback) { 839 const PipelineStatusCB& seek_callback) {
844 DCHECK_EQ(MessageLoop::current(), message_loop_); 840 DCHECK_EQ(MessageLoop::current(), message_loop_);
845 DCHECK(!IsPipelineStopPending()); 841 DCHECK(!IsPipelineStopPending());
846 842
847 // Suppress seeking if we're not fully started. 843 // Suppress seeking if we're not fully started.
848 if (state_ != kStarted && state_ != kEnded) { 844 if (state_ != kStarted && state_ != kEnded) {
849 // TODO(scherkus): should we run the callback? I'm tempted to say the API 845 // TODO(scherkus): should we run the callback? I'm tempted to say the API
850 // will only execute the first Seek() request. 846 // will only execute the first Seek() request.
851 VLOG(1) << "Media pipeline has not started, ignoring seek to " 847 VLOG(1) << "Media pipeline has not started, ignoring seek to "
852 << time.InMicroseconds(); 848 << time.InMicroseconds();
853 delete seek_callback;
854 return; 849 return;
855 } 850 }
856 851
857 DCHECK(!seek_pending_); 852 DCHECK(!seek_pending_);
858 seek_pending_ = true; 853 seek_pending_ = true;
859 854
860 // We'll need to pause every filter before seeking. The state transition 855 // We'll need to pause every filter before seeking. The state transition
861 // is as follows: 856 // is as follows:
862 // kStarted/kEnded 857 // kStarted/kEnded
863 // kPausing (for each filter) 858 // kPausing (for each filter)
864 // kSeeking (for each filter) 859 // kSeeking (for each filter)
865 // kStarting (for each filter) 860 // kStarting (for each filter)
866 // kStarted 861 // kStarted
867 set_state(kPausing); 862 set_state(kPausing);
868 seek_timestamp_ = time; 863 seek_timestamp_ = time;
869 seek_callback_.reset(seek_callback); 864 seek_callback_ = seek_callback;
870 865
871 // Kick off seeking! 866 // Kick off seeking!
872 { 867 {
873 base::AutoLock auto_lock(lock_); 868 base::AutoLock auto_lock(lock_);
874 // If we are waiting for a clock update, the clock hasn't been played yet. 869 // If we are waiting for a clock update, the clock hasn't been played yet.
875 if (!waiting_for_clock_update_) 870 if (!waiting_for_clock_update_)
876 clock_->Pause(); 871 clock_->Pause();
877 } 872 }
878 pipeline_filter_->Pause( 873 pipeline_filter_->Pause(
879 NewCallback(this, &PipelineImpl::OnFilterStateTransition)); 874 NewCallback(this, &PipelineImpl::OnFilterStateTransition));
(...skipping 22 matching lines...) Expand all
902 clock_->Play(); 897 clock_->Play();
903 } 898 }
904 } 899 }
905 900
906 if (video_renderer_ && !video_renderer_->HasEnded()) { 901 if (video_renderer_ && !video_renderer_->HasEnded()) {
907 return; 902 return;
908 } 903 }
909 904
910 // Transition to ended, executing the callback if present. 905 // Transition to ended, executing the callback if present.
911 set_state(kEnded); 906 set_state(kEnded);
912 if (ended_callback_.get()) { 907 if (!ended_callback_.is_null()) {
913 ended_callback_->Run(status_); 908 ended_callback_.Run(status_);
914 } 909 }
915 } 910 }
916 911
917 void PipelineImpl::NotifyNetworkEventTask() { 912 void PipelineImpl::NotifyNetworkEventTask() {
918 DCHECK_EQ(MessageLoop::current(), message_loop_); 913 DCHECK_EQ(MessageLoop::current(), message_loop_);
919 if (network_callback_.get()) { 914 if (!network_callback_.is_null()) {
920 network_callback_->Run(status_); 915 network_callback_.Run(status_);
921 } 916 }
922 } 917 }
923 918
924 void PipelineImpl::DisableAudioRendererTask() { 919 void PipelineImpl::DisableAudioRendererTask() {
925 DCHECK_EQ(MessageLoop::current(), message_loop_); 920 DCHECK_EQ(MessageLoop::current(), message_loop_);
926 921
927 base::AutoLock auto_lock(lock_); 922 base::AutoLock auto_lock(lock_);
928 has_audio_ = false; 923 has_audio_ = false;
929 audio_disabled_ = true; 924 audio_disabled_ = true;
930 925
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
1055 DCHECK_EQ(MessageLoop::current(), message_loop_); 1050 DCHECK_EQ(MessageLoop::current(), message_loop_);
1056 DCHECK(IsPipelineStopped()); 1051 DCHECK(IsPipelineStopped());
1057 1052
1058 // Clear filter references. 1053 // Clear filter references.
1059 audio_renderer_ = NULL; 1054 audio_renderer_ = NULL;
1060 video_renderer_ = NULL; 1055 video_renderer_ = NULL;
1061 demuxer_ = NULL; 1056 demuxer_ = NULL;
1062 1057
1063 pipeline_filter_ = NULL; 1058 pipeline_filter_ = NULL;
1064 1059
1065 if (error_caused_teardown_ && !IsPipelineOk() && error_callback_.get()) 1060 if (error_caused_teardown_ && !IsPipelineOk() && !error_callback_.is_null())
1066 error_callback_->Run(status_); 1061 error_callback_.Run(status_);
1067 1062
1068 if (stop_pending_) { 1063 if (stop_pending_) {
1069 stop_pending_ = false; 1064 stop_pending_ = false;
1070 ResetState(); 1065 ResetState();
1071 scoped_ptr<PipelineStatusCallback> stop_callback(stop_callback_.release()); 1066 PipelineStatusCB stop_cb;
1067 std::swap(stop_cb, stop_callback_);
1072 // Notify the client that stopping has finished. 1068 // Notify the client that stopping has finished.
1073 if (stop_callback.get()) { 1069 if (!stop_cb.is_null()) {
1074 stop_callback->Run(status_); 1070 stop_cb.Run(status_);
1075 } 1071 }
1076 } 1072 }
1077 1073
1078 tearing_down_ = false; 1074 tearing_down_ = false;
1079 error_caused_teardown_ = false; 1075 error_caused_teardown_ = false;
1080 } 1076 }
1081 1077
1082 bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) { 1078 bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) {
1083 bool ret = pipeline_init_state_->composite_->AddFilter(filter.get()); 1079 bool ret = pipeline_init_state_->composite_->AddFilter(filter.get());
1084 1080
(...skipping 218 matching lines...) Expand 10 before | Expand all | Expand 10 after
1303 case kStopping: 1299 case kStopping:
1304 case kStopped: 1300 case kStopped:
1305 NOTREACHED() << "Unexpected state for teardown: " << state_; 1301 NOTREACHED() << "Unexpected state for teardown: " << state_;
1306 break; 1302 break;
1307 // default: intentionally left out to force new states to cause compiler 1303 // default: intentionally left out to force new states to cause compiler
1308 // errors. 1304 // errors.
1309 }; 1305 };
1310 } 1306 }
1311 1307
1312 } // namespace media 1308 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698