Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(820)

Side by Side Diff: media/base/pipeline_impl.cc

Issue 155396: Revert "Implemented injected message loops for PipelineImpl." (Closed)
Patch Set: Created 11 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 // 4 //
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names,
6 // potential deadlocks, etc... 6 // potential deadlocks, etc...
7 7
8 #include "base/compiler_specific.h" 8 #include "base/compiler_specific.h"
9 #include "base/condition_variable.h" 9 #include "base/condition_variable.h"
10 #include "base/stl_util-inl.h" 10 #include "base/stl_util-inl.h"
(...skipping 13 matching lines...) Expand all
24 switch (Filter::filter_type()) { 24 switch (Filter::filter_type()) {
25 case FILTER_DEMUXER: 25 case FILTER_DEMUXER:
26 case FILTER_AUDIO_DECODER: 26 case FILTER_AUDIO_DECODER:
27 case FILTER_VIDEO_DECODER: 27 case FILTER_VIDEO_DECODER:
28 return true; 28 return true;
29 default: 29 default:
30 return false; 30 return false;
31 } 31 }
32 } 32 }
33 33
34 // Small helper function to help us name filter threads for debugging.
35 //
36 // TODO(scherkus): figure out a cleaner way to derive the filter thread name.
37 template <class Filter>
38 const char* GetThreadName() {
39 DCHECK(SupportsSetMessageLoop<Filter>());
40 switch (Filter::filter_type()) {
41 case FILTER_DEMUXER:
42 return "DemuxerThread";
43 case FILTER_AUDIO_DECODER:
44 return "AudioDecoderThread";
45 case FILTER_VIDEO_DECODER:
46 return "VideoDecoderThread";
47 default:
48 return "FilterThread";
49 }
50 }
51
52 // Helper function used with NewRunnableMethod to implement a (very) crude 34 // Helper function used with NewRunnableMethod to implement a (very) crude
53 // blocking counter. 35 // blocking counter.
54 // 36 //
55 // TODO(scherkus): remove this as soon as Stop() is made asynchronous. 37 // TODO(scherkus): remove this as soon as Stop() is made asynchronous.
56 void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { 38 void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
57 AutoLock auto_lock(*lock); 39 AutoLock auto_lock(*lock);
58 --(*count); 40 --(*count);
59 CHECK(*count >= 0); 41 CHECK(*count >= 0);
60 if (*count == 0) { 42 if (*count == 0) {
61 cond_var->Signal(); 43 cond_var->Signal();
62 } 44 }
63 } 45 }
64 46
65 } // namespace 47 } // namespace
66 48
67 PipelineImpl::PipelineImpl(MessageLoop* message_loop) 49 PipelineImpl::PipelineImpl() {
68 : message_loop_(message_loop) {
69 ResetState(); 50 ResetState();
70 } 51 }
71 52
72 PipelineImpl::~PipelineImpl() { 53 PipelineImpl::~PipelineImpl() {
73 DCHECK(!pipeline_internal_) 54 Stop();
74 << "Stop() must complete before destroying object";
75 } 55 }
76 56
77 // Creates the PipelineInternal and calls it's start method. 57 // Creates the PipelineThread and calls it's start method.
78 bool PipelineImpl::Start(FilterFactory* factory, 58 bool PipelineImpl::Start(FilterFactory* factory,
79 const std::string& url, 59 const std::string& url,
80 PipelineCallback* start_callback) { 60 PipelineCallback* init_complete_callback) {
81 DCHECK(!pipeline_internal_); 61 DCHECK(!pipeline_thread_);
82 DCHECK(factory); 62 DCHECK(factory);
83 if (pipeline_internal_ || !factory) { 63 DCHECK(!initialized_);
84 return false; 64 DCHECK(!IsPipelineThread());
65 if (!pipeline_thread_ && factory) {
66 pipeline_thread_ = new PipelineThread(this);
67 if (pipeline_thread_) {
68 // TODO(ralphl): Does the callback get copied by these fancy templates?
69 // if so, then do I want to always delete it here???
70 if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
71 return true;
72 }
73 pipeline_thread_ = NULL; // Releases reference to destroy thread
74 }
85 } 75 }
86 76 delete init_complete_callback;
87 // Create and start the PipelineInternal. 77 return false;
88 pipeline_internal_ = new PipelineInternal(this, message_loop_);
89 if (!pipeline_internal_) {
90 NOTREACHED() << "Could not create PipelineInternal";
91 return false;
92 }
93 pipeline_internal_->Start(factory, url, start_callback);
94 return true;
95 } 78 }
96 79
97 // Stop the PipelineInternal who will NULL our reference to it and reset our 80 // Stop the PipelineThread and return to a state identical to that of a newly
98 // state to a newly created PipelineImpl object. 81 // created PipelineImpl object.
99 void PipelineImpl::Stop(PipelineCallback* stop_callback) { 82 void PipelineImpl::Stop() {
100 if (pipeline_internal_) { 83 DCHECK(!IsPipelineThread());
101 pipeline_internal_->Stop(stop_callback); 84
85 if (pipeline_thread_) {
86 pipeline_thread_->Stop();
102 } 87 }
88 ResetState();
103 } 89 }
104 90
105 void PipelineImpl::Seek(base::TimeDelta time, 91 void PipelineImpl::Seek(base::TimeDelta time,
106 PipelineCallback* seek_callback) { 92 PipelineCallback* seek_callback) {
93 DCHECK(!IsPipelineThread());
94
107 if (IsPipelineOk()) { 95 if (IsPipelineOk()) {
108 pipeline_internal_->Seek(time, seek_callback); 96 pipeline_thread_->Seek(time, seek_callback);
109 } else { 97 } else {
110 NOTREACHED(); 98 NOTREACHED();
111 } 99 }
112 } 100 }
113 101
114 bool PipelineImpl::IsRunning() const {
115 AutoLock auto_lock(const_cast<Lock&>(lock_));
116 return pipeline_internal_ != NULL;
117 }
118
119 bool PipelineImpl::IsInitialized() const { 102 bool PipelineImpl::IsInitialized() const {
120 AutoLock auto_lock(lock_); 103 AutoLock auto_lock(lock_);
121 return pipeline_internal_ && pipeline_internal_->IsInitialized(); 104 return initialized_;
122 } 105 }
123 106
124 bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { 107 bool PipelineImpl::IsRendered(const std::string& major_mime_type) const {
125 AutoLock auto_lock(lock_); 108 AutoLock auto_lock(lock_);
126 bool is_rendered = (rendered_mime_types_.find(major_mime_type) != 109 bool is_rendered = (rendered_mime_types_.find(major_mime_type) !=
127 rendered_mime_types_.end()); 110 rendered_mime_types_.end());
128 return is_rendered; 111 return is_rendered;
129 } 112 }
130 113
131 float PipelineImpl::GetPlaybackRate() const { 114 float PipelineImpl::GetPlaybackRate() const {
132 AutoLock auto_lock(lock_); 115 AutoLock auto_lock(lock_);
133 return playback_rate_; 116 return playback_rate_;
134 } 117 }
135 118
136 void PipelineImpl::SetPlaybackRate(float rate) { 119 void PipelineImpl::SetPlaybackRate(float rate) {
120 DCHECK(!IsPipelineThread());
121
137 if (IsPipelineOk() && rate >= 0.0f) { 122 if (IsPipelineOk() && rate >= 0.0f) {
138 pipeline_internal_->SetPlaybackRate(rate); 123 pipeline_thread_->SetPlaybackRate(rate);
139 } else { 124 } else {
140 // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped. 125 // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped.
141 DCHECK(rate == 0.0f && playback_rate_ == 0.0f); 126 DCHECK(rate == 0.0f && playback_rate_ == 0.0f);
142 } 127 }
143 } 128 }
144 129
145 float PipelineImpl::GetVolume() const { 130 float PipelineImpl::GetVolume() const {
146 AutoLock auto_lock(lock_); 131 AutoLock auto_lock(lock_);
147 return volume_; 132 return volume_;
148 } 133 }
149 134
150 void PipelineImpl::SetVolume(float volume) { 135 void PipelineImpl::SetVolume(float volume) {
136 DCHECK(!IsPipelineThread());
137
151 if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) { 138 if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) {
152 pipeline_internal_->SetVolume(volume); 139 pipeline_thread_->SetVolume(volume);
153 } else { 140 } else {
154 NOTREACHED(); 141 NOTREACHED();
155 } 142 }
156 } 143 }
157 144
158 base::TimeDelta PipelineImpl::GetTime() const { 145 base::TimeDelta PipelineImpl::GetTime() const {
159 AutoLock auto_lock(lock_); 146 AutoLock auto_lock(lock_);
160 return time_; 147 return time_;
161 } 148 }
162 149
(...skipping 25 matching lines...) Expand all
188 *height_out = video_height_; 175 *height_out = video_height_;
189 } 176 }
190 177
191 PipelineError PipelineImpl::GetError() const { 178 PipelineError PipelineImpl::GetError() const {
192 AutoLock auto_lock(lock_); 179 AutoLock auto_lock(lock_);
193 return error_; 180 return error_;
194 } 181 }
195 182
196 void PipelineImpl::ResetState() { 183 void PipelineImpl::ResetState() {
197 AutoLock auto_lock(lock_); 184 AutoLock auto_lock(lock_);
198 pipeline_internal_ = NULL; 185 pipeline_thread_ = NULL;
186 initialized_ = false;
199 duration_ = base::TimeDelta(); 187 duration_ = base::TimeDelta();
200 buffered_time_ = base::TimeDelta(); 188 buffered_time_ = base::TimeDelta();
201 buffered_bytes_ = 0; 189 buffered_bytes_ = 0;
202 total_bytes_ = 0; 190 total_bytes_ = 0;
203 video_width_ = 0; 191 video_width_ = 0;
204 video_height_ = 0; 192 video_height_ = 0;
205 volume_ = 0.0f; 193 volume_ = 0.0f;
206 playback_rate_ = 0.0f; 194 playback_rate_ = 0.0f;
207 error_ = PIPELINE_OK; 195 error_ = PIPELINE_OK;
208 time_ = base::TimeDelta(); 196 time_ = base::TimeDelta();
209 rendered_mime_types_.clear(); 197 rendered_mime_types_.clear();
210 } 198 }
211 199
212 bool PipelineImpl::IsPipelineOk() const { 200 bool PipelineImpl::IsPipelineOk() const {
213 return pipeline_internal_ && PIPELINE_OK == error_; 201 return pipeline_thread_ && initialized_ && PIPELINE_OK == error_;
202 }
203
204 bool PipelineImpl::IsPipelineThread() const {
205 return pipeline_thread_ &&
206 PlatformThread::CurrentId() == pipeline_thread_->thread_id();
214 } 207 }
215 208
216 void PipelineImpl::SetDuration(base::TimeDelta duration) { 209 void PipelineImpl::SetDuration(base::TimeDelta duration) {
217 AutoLock auto_lock(lock_); 210 AutoLock auto_lock(lock_);
218 duration_ = duration; 211 duration_ = duration;
219 } 212 }
220 213
221 void PipelineImpl::SetBufferedTime(base::TimeDelta buffered_time) { 214 void PipelineImpl::SetBufferedTime(base::TimeDelta buffered_time) {
222 AutoLock auto_lock(lock_); 215 AutoLock auto_lock(lock_);
223 buffered_time_ = buffered_time; 216 buffered_time_ = buffered_time;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
263 } 256 }
264 257
265 void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) { 258 void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) {
266 AutoLock auto_lock(lock_); 259 AutoLock auto_lock(lock_);
267 rendered_mime_types_.insert(major_mime_type); 260 rendered_mime_types_.insert(major_mime_type);
268 } 261 }
269 262
270 263
271 //----------------------------------------------------------------------------- 264 //-----------------------------------------------------------------------------
272 265
273 PipelineInternal::PipelineInternal(PipelineImpl* pipeline, 266 PipelineThread::PipelineThread(PipelineImpl* pipeline)
274 MessageLoop* message_loop)
275 : pipeline_(pipeline), 267 : pipeline_(pipeline),
276 message_loop_(message_loop), 268 thread_("PipelineThread"),
277 state_(kCreated) { 269 state_(kCreated) {
278 } 270 }
279 271
280 PipelineInternal::~PipelineInternal() { 272 PipelineThread::~PipelineThread() {
281 DCHECK(state_ == kCreated || state_ == kStopped); 273 Stop();
274 DCHECK(state_ == kStopped || state_ == kError);
275 }
276
277 // This method is called on the client's thread. It starts the pipeline's
278 // dedicated thread and posts a task to call the StartTask() method on that
279 // thread.
280 bool PipelineThread::Start(FilterFactory* filter_factory,
281 const std::string& url,
282 PipelineCallback* init_complete_callback) {
283 DCHECK_EQ(kCreated, state_);
284 if (thread_.Start()) {
285 filter_factory_ = filter_factory;
286 url_ = url;
287 init_callback_.reset(init_complete_callback);
288 message_loop()->PostTask(FROM_HERE,
289 NewRunnableMethod(this, &PipelineThread::StartTask));
290 return true;
291 }
292 return false;
293 }
294
295 // Called on the client's thread. If the thread has been started, then posts
296 // a task to call the StopTask() method, then waits until the thread has
297 // stopped.
298 void PipelineThread::Stop() {
299 if (thread_.IsRunning()) {
300 message_loop()->PostTask(FROM_HERE,
301 NewRunnableMethod(this, &PipelineThread::StopTask));
302 thread_.Stop();
303 }
304 DCHECK(filter_hosts_.empty());
305 DCHECK(filter_threads_.empty());
282 } 306 }
283 307
284 // Called on client's thread. 308 // Called on client's thread.
285 void PipelineInternal::Start(FilterFactory* filter_factory, 309 void PipelineThread::Seek(base::TimeDelta time,
286 const std::string& url, 310 PipelineCallback* seek_callback) {
287 PipelineCallback* start_callback) { 311 message_loop()->PostTask(FROM_HERE,
288 DCHECK(filter_factory); 312 NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback));
289 message_loop_->PostTask(FROM_HERE,
290 NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url,
291 start_callback));
292 } 313 }
293 314
294 // Called on client's thread. 315 // Called on client's thread.
295 void PipelineInternal::Stop(PipelineCallback* stop_callback) { 316 void PipelineThread::SetPlaybackRate(float rate) {
296 message_loop_->PostTask(FROM_HERE, 317 message_loop()->PostTask(FROM_HERE,
297 NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback)); 318 NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
298 } 319 }
299 320
300 // Called on client's thread. 321 // Called on client's thread.
301 void PipelineInternal::Seek(base::TimeDelta time, 322 void PipelineThread::SetVolume(float volume) {
302 PipelineCallback* seek_callback) { 323 message_loop()->PostTask(FROM_HERE,
303 message_loop_->PostTask(FROM_HERE, 324 NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
304 NewRunnableMethod(this, &PipelineInternal::SeekTask, time,
305 seek_callback));
306 }
307
308 // Called on client's thread.
309 void PipelineInternal::SetPlaybackRate(float rate) {
310 message_loop_->PostTask(FROM_HERE,
311 NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate));
312 }
313
314 // Called on client's thread.
315 void PipelineInternal::SetVolume(float volume) {
316 message_loop_->PostTask(FROM_HERE,
317 NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume));
318 } 325 }
319 326
320 // Called from any thread. 327 // Called from any thread.
321 void PipelineInternal::InitializationComplete(FilterHostImpl* host) { 328 void PipelineThread::InitializationComplete(FilterHostImpl* host) {
322 if (IsPipelineOk()) { 329 if (IsPipelineOk()) {
323 // Continue the initialize task by proceeding to the next stage. 330 // Continue the start task by proceeding to the next stage.
324 message_loop_->PostTask(FROM_HERE, 331 message_loop()->PostTask(FROM_HERE,
325 NewRunnableMethod(this, &PipelineInternal::InitializeTask)); 332 NewRunnableMethod(this, &PipelineThread::StartTask));
326 } 333 }
327 } 334 }
328 335
329 // Called from any thread. Updates the pipeline time. 336 // Called from any thread. Updates the pipeline time.
330 void PipelineInternal::SetTime(base::TimeDelta time) { 337 void PipelineThread::SetTime(base::TimeDelta time) {
331 // TODO(scherkus): why not post a task? 338 pipeline()->SetTime(time);
332 pipeline_->SetTime(time);
333 } 339 }
334 340
335 // Called from any thread. Sets the pipeline |error_| member and destroys all 341 // Called from any thread. Sets the pipeline |error_| member and schedules a
336 // filters. 342 // task to stop all the filters in the pipeline. Note that the thread will
337 void PipelineInternal::Error(PipelineError error) { 343 // continue to run until the client calls Pipeline::Stop(), but nothing will
338 message_loop_->PostTask(FROM_HERE, 344 // be processed since filters will not be able to post tasks.
339 NewRunnableMethod(this, &PipelineInternal::ErrorTask, error)); 345 void PipelineThread::Error(PipelineError error) {
346 // If this method returns false, then an error has already happened, so no
347 // reason to run the StopTask again. It's going to happen.
348 if (pipeline()->InternalSetError(error)) {
349 message_loop()->PostTask(FROM_HERE,
350 NewRunnableMethod(this, &PipelineThread::StopTask));
351 }
340 } 352 }
341 353
342 void PipelineInternal::StartTask(FilterFactory* filter_factory, 354 // Called as a result of destruction of the thread.
343 const std::string& url, 355 //
344 PipelineCallback* start_callback) { 356 // TODO(scherkus): this can block the client due to synchronous Stop() API call.
345 DCHECK_EQ(MessageLoop::current(), message_loop_); 357 void PipelineThread::WillDestroyCurrentMessageLoop() {
346 DCHECK_EQ(kCreated, state_); 358 STLDeleteElements(&filter_hosts_);
347 filter_factory_ = filter_factory; 359 STLDeleteElements(&filter_threads_);
348 url_ = url;
349 start_callback_.reset(start_callback);
350
351 // Kick off initialization.
352 InitializeTask();
353 } 360 }
354 361
355 // Main initialization method called on the pipeline thread. This code attempts 362 // Main initialization method called on the pipeline thread. This code attempts
356 // to use the specified filter factory to build a pipeline. 363 // to use the specified filter factory to build a pipeline.
357 // Initialization step performed in this method depends on current state of this 364 // Initialization step performed in this method depends on current state of this
358 // object, indicated by |state_|. After each step of initialization, this 365 // object, indicated by |state_|. After each step of initialization, this
359 // object transits to the next stage. It starts by creating a DataSource, 366 // object transits to the next stage. It starts by creating a DataSource,
360 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an 367 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an
361 // AudioDecoder which is then connected to an AudioRenderer. If the media has 368 // AudioDecoder which is then connected to an AudioRenderer. If the media has
362 // video, then it connects a VideoDecoder to the Demuxer's video stream, and 369 // video, then it connects a VideoDecoder to the Demuxer's video stream, and
363 // then connects the VideoDecoder to a VideoRenderer. 370 // then connects the VideoDecoder to a VideoRenderer.
364 // 371 //
365 // When all required filters have been created and have called their 372 // When all required filters have been created and have called their
366 // FilterHost's InitializationComplete() method, the pipeline will update its 373 // FilterHost's InitializationComplete method, the pipeline's |initialized_|
367 // state to kStarted and |init_callback_|, will be executed. 374 // member is set to true, and, if the client provided an
375 // |init_complete_callback_|, it is called with "true".
368 // 376 //
369 // If initialization fails, the client's callback will still be called, but 377 // If initialization fails, the client's callback will still be called, but
370 // the bool parameter passed to it will be false. 378 // the bool parameter passed to it will be false.
371 // 379 //
372 // TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It 380 // TODO(hclam): StartTask is now starting the pipeline asynchronously. It
373 // works like a big state change table. If we no longer need to start filters 381 // works like a big state change table. If we no longer need to start filters
374 // in order, we need to get rid of all the state change. 382 // in order, we need to get rid of all the state change.
375 void PipelineInternal::InitializeTask() { 383 void PipelineThread::StartTask() {
376 DCHECK_EQ(MessageLoop::current(), message_loop_); 384 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
377 385
378 // If we have received the stop signal, return immediately. 386 // If we have received the stop signal, return immediately.
379 if (state_ == kStopped) 387 if (state_ == kStopped)
380 return; 388 return;
381 389
382 DCHECK(state_ == kCreated || IsPipelineInitializing()); 390 DCHECK(state_ == kCreated || IsPipelineInitializing());
383 391
384 // Just created, create data source. 392 // Just created, create data source.
385 if (state_ == kCreated) { 393 if (state_ == kCreated) {
394 message_loop()->AddDestructionObserver(this);
386 state_ = kInitDataSource; 395 state_ = kInitDataSource;
387 CreateDataSource(); 396 CreateDataSource();
388 return; 397 return;
389 } 398 }
390 399
391 // Data source created, create demuxer. 400 // Data source created, create demuxer.
392 if (state_ == kInitDataSource) { 401 if (state_ == kInitDataSource) {
393 state_ = kInitDemuxer; 402 state_ = kInitDemuxer;
394 CreateDemuxer(); 403 CreateDemuxer();
395 return; 404 return;
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
430 } 439 }
431 } 440 }
432 441
433 if (state_ == kInitVideoRenderer) { 442 if (state_ == kInitVideoRenderer) {
434 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { 443 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) {
435 Error(PIPELINE_ERROR_COULD_NOT_RENDER); 444 Error(PIPELINE_ERROR_COULD_NOT_RENDER);
436 return; 445 return;
437 } 446 }
438 447
439 state_ = kStarted; 448 state_ = kStarted;
449 pipeline_->initialized_ = true;
440 filter_factory_ = NULL; 450 filter_factory_ = NULL;
441 if (start_callback_.get()) { 451 if (init_callback_.get()) {
442 start_callback_->Run(true); 452 init_callback_->Run(true);
443 start_callback_.reset(); 453 init_callback_.reset();
444 } 454 }
445 } 455 }
446 } 456 }
447 457
448 // This method is called as a result of the client calling Pipeline::Stop() or 458 // This method is called as a result of the client calling Pipeline::Stop() or
449 // as the result of an error condition. If there is no error, then set the 459 // as the result of an error condition. If there is no error, then set the
450 // pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the 460 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
451 // reverse order. 461 // reverse order.
452 // 462 //
453 // TODO(scherkus): beware! this can get posted multiple times since we post 463 // TODO(scherkus): beware! this can get posted multiple times since we post
454 // Stop() tasks even if we've already stopped. Perhaps this should no-op for 464 // Stop() tasks even if we've already stopped. Perhaps this should no-op for
455 // additional calls, however most of this logic will be changing. 465 // additional calls, however most of this logic will be changing.
456 void PipelineInternal::StopTask(PipelineCallback* stop_callback) { 466 void PipelineThread::StopTask() {
457 DCHECK_EQ(MessageLoop::current(), message_loop_); 467 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
458 stop_callback_.reset(stop_callback);
459 468
460 // If we've already stopped, return immediately. 469 if (IsPipelineInitializing()) {
461 if (state_ == kStopped) { 470 // If IsPipelineOk() is true, the pipeline was simply stopped during
462 return; 471 // initialization. Otherwise it is a failure.
472 state_ = IsPipelineOk() ? kStopped : kError;
473 filter_factory_ = NULL;
474 if (init_callback_.get()) {
475 init_callback_->Run(false);
476 init_callback_.reset();
477 }
478 } else {
479 state_ = kStopped;
463 } 480 }
464 481
465 // Carry out setting the error, notifying the client and destroying filters. 482 if (IsPipelineOk()) {
466 ErrorTask(PIPELINE_STOPPING); 483 pipeline_->error_ = PIPELINE_STOPPING;
484 }
467 485
468 // We no longer need to examine our previous state, set it to stopped. 486 // Stop every filter.
469 state_ = kStopped; 487 for (FilterHostVector::iterator iter = filter_hosts_.begin();
488 iter != filter_hosts_.end();
489 ++iter) {
490 (*iter)->Stop();
491 }
470 492
471 // Reset the pipeline and set our reference to NULL so we don't accidentally 493 // Figure out how many threads we have to stop.
472 // modify the pipeline. Once remaining tasks execute we will be destroyed. 494 //
473 pipeline_->ResetState(); 495 // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
474 pipeline_ = NULL; 496 FilterThreadVector running_threads;
497 for (FilterThreadVector::iterator iter = filter_threads_.begin();
498 iter != filter_threads_.end();
499 ++iter) {
500 if ((*iter)->IsRunning()) {
501 running_threads.push_back(*iter);
502 }
503 }
475 504
476 // Notify the client that stopping has finished. 505 // Crude blocking counter implementation.
477 if (stop_callback_.get()) { 506 Lock lock;
478 stop_callback_->Run(true); 507 ConditionVariable wait_for_zero(&lock);
479 stop_callback_.reset(); 508 int count = running_threads.size();
509
510 // Post a task to every filter's thread to ensure that they've completed their
511 // stopping logic before stopping the threads themselves.
512 //
513 // TODO(scherkus): again, Stop() should either be synchronous or we should
514 // receive a signal from filters that they have indeed stopped.
515 for (FilterThreadVector::iterator iter = running_threads.begin();
516 iter != running_threads.end();
517 ++iter) {
518 (*iter)->message_loop()->PostTask(FROM_HERE,
519 NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
520 }
521
522 // Wait on our "blocking counter".
523 {
524 AutoLock auto_lock(lock);
525 while (count > 0) {
526 wait_for_zero.Wait();
527 }
528 }
529
530 // Stop every running filter thread.
531 //
532 // TODO(scherkus): can we watchdog this section to detect wedged threads?
533 for (FilterThreadVector::iterator iter = running_threads.begin();
534 iter != running_threads.end();
535 ++iter) {
536 (*iter)->Stop();
480 } 537 }
481 } 538 }
482 539
483 void PipelineInternal::ErrorTask(PipelineError error) { 540 void PipelineThread::SetPlaybackRateTask(float rate) {
484 DCHECK_EQ(MessageLoop::current(), message_loop_); 541 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
485 DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
486
487 // Suppress executing additional error logic.
488 if (state_ == kError) {
489 return;
490 }
491
492 // Update our error code first in case we execute the start callback.
493 pipeline_->error_ = error;
494
495 // Notify the client that starting did not complete, if necessary.
496 if (IsPipelineInitializing() && start_callback_.get()) {
497 start_callback_->Run(false);
498 }
499 start_callback_.reset();
500 filter_factory_ = NULL;
501
502 // We no longer need to examine our previous state, set it to stopped.
503 state_ = kError;
504
505 // Destroy every filter and reset the pipeline as well.
506 DestroyFilters();
507 }
508
509 void PipelineInternal::SetPlaybackRateTask(float rate) {
510 DCHECK_EQ(MessageLoop::current(), message_loop_);
511 542
512 pipeline_->InternalSetPlaybackRate(rate); 543 pipeline_->InternalSetPlaybackRate(rate);
513 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 544 for (FilterHostVector::iterator iter = filter_hosts_.begin();
514 iter != filter_hosts_.end(); 545 iter != filter_hosts_.end();
515 ++iter) { 546 ++iter) {
516 (*iter)->media_filter()->SetPlaybackRate(rate); 547 (*iter)->media_filter()->SetPlaybackRate(rate);
517 } 548 }
518 } 549 }
519 550
520 void PipelineInternal::SeekTask(base::TimeDelta time, 551 void PipelineThread::SeekTask(base::TimeDelta time,
521 PipelineCallback* seek_callback) { 552 PipelineCallback* seek_callback) {
522 DCHECK_EQ(MessageLoop::current(), message_loop_); 553 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
523 seek_callback_.reset(seek_callback);
524 554
525 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 555 for (FilterHostVector::iterator iter = filter_hosts_.begin();
526 iter != filter_hosts_.end(); 556 iter != filter_hosts_.end();
527 ++iter) { 557 ++iter) {
528 (*iter)->media_filter()->Seek(time); 558 (*iter)->media_filter()->Seek(time);
529 } 559 }
530 560
531 // TODO(hclam): we should set the time when the above seek operations were all 561 // TODO(hclam): we should set the time when the above seek operations were all
532 // successful and first frame/packet at the desired time is decoded. I'm 562 // successful and first frame/packet at the desired time is decoded. I'm
533 // setting the time here because once we do the callback the user can ask for 563 // setting the time here because once we do the callback the user can ask for
534 // current time immediately, which is the old time. In order to get rid this 564 // current time immediately, which is the old time. In order to get rid this
535 // little glitch, we either assume the seek was successful and time is updated 565 // little glitch, we either assume the seek was successful and time is updated
536 // immediately here or we set time and do callback when we have new 566 // immediately here or we set time and do callback when we have new
537 // frames/packets. 567 // frames/packets.
538 SetTime(time); 568 SetTime(time);
539 if (seek_callback_.get()) { 569 if (seek_callback) {
540 seek_callback_->Run(true); 570 seek_callback->Run(true);
541 seek_callback_.reset(); 571 delete seek_callback;
542 } 572 }
543 } 573 }
544 574
545 void PipelineInternal::SetVolumeTask(float volume) { 575 void PipelineThread::SetVolumeTask(float volume) {
546 DCHECK_EQ(MessageLoop::current(), message_loop_); 576 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
547 577
548 pipeline_->volume_ = volume; 578 pipeline_->volume_ = volume;
549 scoped_refptr<AudioRenderer> audio_renderer; 579 scoped_refptr<AudioRenderer> audio_renderer;
550 GetFilter(&audio_renderer); 580 GetFilter(&audio_renderer);
551 if (audio_renderer) { 581 if (audio_renderer) {
552 audio_renderer->SetVolume(volume); 582 audio_renderer->SetVolume(volume);
553 } 583 }
554 } 584 }
555 585
556 template <class Filter, class Source> 586 template <class Filter, class Source>
557 void PipelineInternal::CreateFilter(FilterFactory* filter_factory, 587 void PipelineThread::CreateFilter(FilterFactory* filter_factory,
558 Source source, 588 Source source,
559 const MediaFormat& media_format) { 589 const MediaFormat& media_format) {
560 DCHECK_EQ(MessageLoop::current(), message_loop_); 590 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
561 DCHECK(IsPipelineOk()); 591 DCHECK(IsPipelineOk());
562 592
563 // Create the filter.
564 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); 593 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
565 if (!filter) { 594 if (!filter) {
566 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); 595 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
567 return; 596 } else {
568 } 597 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
569 598 // Create a dedicated thread for this filter.
570 // Create a dedicated thread for this filter if applicable. 599 if (SupportsSetMessageLoop<Filter>()) {
571 if (SupportsSetMessageLoop<Filter>()) { 600 // TODO(scherkus): figure out a way to name these threads so it matches
572 scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>())); 601 // the filter type.
573 if (!thread.get() || !thread->Start()) { 602 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
574 NOTREACHED() << "Could not start filter thread"; 603 if (!thread.get() || !thread->Start()) {
575 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); 604 NOTREACHED() << "Could not start filter thread";
576 return; 605 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
606 } else {
607 filter->set_message_loop(thread->message_loop());
608 filter_threads_.push_back(thread.release());
609 }
577 } 610 }
578 611
579 filter->set_message_loop(thread->message_loop()); 612 // Creating a thread could have failed, verify we're still OK.
580 filter_threads_.push_back(thread.release()); 613 if (IsPipelineOk()) {
581 } 614 filter_hosts_.push_back(host.get());
582 615 filter->set_host(host.release());
583 // Create the filter's host. 616 if (!filter->Initialize(source)) {
584 DCHECK(IsPipelineOk()); 617 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
585 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); 618 }
586 filter->set_host(host.get()); 619 }
587 filter_hosts_.push_back(host.release());
588
589 // Now initialize the filter.
590 if (!filter->Initialize(source)) {
591 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
592 } 620 }
593 } 621 }
594 622
595 void PipelineInternal::CreateDataSource() { 623 void PipelineThread::CreateDataSource() {
596 DCHECK_EQ(MessageLoop::current(), message_loop_); 624 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
597 DCHECK(IsPipelineOk()); 625 DCHECK(IsPipelineOk());
598 626
599 MediaFormat url_format; 627 MediaFormat url_format;
600 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); 628 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
601 url_format.SetAsString(MediaFormat::kURL, url_); 629 url_format.SetAsString(MediaFormat::kURL, url_);
602 CreateFilter<DataSource>(filter_factory_, url_, url_format); 630 CreateFilter<DataSource>(filter_factory_, url_, url_format);
603 } 631 }
604 632
605 void PipelineInternal::CreateDemuxer() { 633 void PipelineThread::CreateDemuxer() {
606 DCHECK_EQ(MessageLoop::current(), message_loop_); 634 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
607 DCHECK(IsPipelineOk()); 635 DCHECK(IsPipelineOk());
608 636
609 scoped_refptr<DataSource> data_source; 637 scoped_refptr<DataSource> data_source;
610 GetFilter(&data_source); 638 GetFilter(&data_source);
611 DCHECK(data_source); 639 DCHECK(data_source);
612 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); 640 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source);
613 } 641 }
614 642
615 template <class Decoder> 643 template <class Decoder>
616 bool PipelineInternal::CreateDecoder() { 644 bool PipelineThread::CreateDecoder() {
617 DCHECK_EQ(MessageLoop::current(), message_loop_); 645 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
618 DCHECK(IsPipelineOk()); 646 DCHECK(IsPipelineOk());
619 647
620 scoped_refptr<Demuxer> demuxer; 648 scoped_refptr<Demuxer> demuxer;
621 GetFilter(&demuxer); 649 GetFilter(&demuxer);
622 DCHECK(demuxer); 650 DCHECK(demuxer);
623 651
624 const std::string major_mime_type = Decoder::major_mime_type(); 652 const std::string major_mime_type = Decoder::major_mime_type();
625 const int num_outputs = demuxer->GetNumberOfStreams(); 653 const int num_outputs = demuxer->GetNumberOfStreams();
626 for (int i = 0; i < num_outputs; ++i) { 654 for (int i = 0; i < num_outputs; ++i) {
627 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); 655 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
628 std::string value; 656 std::string value;
629 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && 657 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
630 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { 658 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
631 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); 659 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream);
632 return true; 660 return true;
633 } 661 }
634 } 662 }
635 return false; 663 return false;
636 } 664 }
637 665
638 template <class Decoder, class Renderer> 666 template <class Decoder, class Renderer>
639 bool PipelineInternal::CreateRenderer() { 667 bool PipelineThread::CreateRenderer() {
640 DCHECK_EQ(MessageLoop::current(), message_loop_); 668 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
641 DCHECK(IsPipelineOk()); 669 DCHECK(IsPipelineOk());
642 670
643 scoped_refptr<Decoder> decoder; 671 scoped_refptr<Decoder> decoder;
644 GetFilter(&decoder); 672 GetFilter(&decoder);
645 673
646 if (decoder) { 674 if (decoder) {
647 // If the decoder was created. 675 // If the decoder was created.
648 const std::string major_mime_type = Decoder::major_mime_type(); 676 const std::string major_mime_type = Decoder::major_mime_type();
649 CreateFilter<Renderer, Decoder>(filter_factory_, decoder); 677 CreateFilter<Renderer, Decoder>(filter_factory_, decoder);
650 return true; 678 return true;
651 } 679 }
652 return false; 680 return false;
653 } 681 }
654 682
655 template <class Filter> 683 template <class Filter>
656 void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const { 684 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
657 DCHECK_EQ(MessageLoop::current(), message_loop_); 685 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
658 686
659 *filter_out = NULL; 687 *filter_out = NULL;
660 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); 688 for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
661 iter != filter_hosts_.end() && NULL == *filter_out; 689 iter != filter_hosts_.end() && NULL == *filter_out;
662 iter++) { 690 iter++) {
663 (*iter)->GetFilter(filter_out); 691 (*iter)->GetFilter(filter_out);
664 } 692 }
665 } 693 }
666 694
667 void PipelineInternal::DestroyFilters() {
668 // Stop every filter.
669 for (FilterHostVector::iterator iter = filter_hosts_.begin();
670 iter != filter_hosts_.end();
671 ++iter) {
672 (*iter)->Stop();
673 }
674
675 // Crude blocking counter implementation.
676 Lock lock;
677 ConditionVariable wait_for_zero(&lock);
678 int count = filter_threads_.size();
679
680 // Post a task to every filter's thread to ensure that they've completed their
681 // stopping logic before stopping the threads themselves.
682 //
683 // TODO(scherkus): again, Stop() should either be synchronous or we should
684 // receive a signal from filters that they have indeed stopped.
685 for (FilterThreadVector::iterator iter = filter_threads_.begin();
686 iter != filter_threads_.end();
687 ++iter) {
688 (*iter)->message_loop()->PostTask(FROM_HERE,
689 NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
690 }
691
692 // Wait on our "blocking counter".
693 {
694 AutoLock auto_lock(lock);
695 while (count > 0) {
696 wait_for_zero.Wait();
697 }
698 }
699
700 // Stop every running filter thread.
701 //
702 // TODO(scherkus): can we watchdog this section to detect wedged threads?
703 for (FilterThreadVector::iterator iter = filter_threads_.begin();
704 iter != filter_threads_.end();
705 ++iter) {
706 (*iter)->Stop();
707 }
708
709 // Reset the pipeline, which will decrement a reference to this object.
710 // We will get destroyed as soon as the remaining tasks finish executing.
711 // To be safe, we'll set our pipeline reference to NULL.
712 STLDeleteElements(&filter_hosts_);
713 STLDeleteElements(&filter_threads_);
714 }
715
716 } // namespace media 695 } // namespace media
OLDNEW
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698