OLD | NEW |
---|---|
1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 // | 4 // |
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, | 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, |
6 // potential deadlocks, nested message loops, etc... | 6 // potential deadlocks, etc... |
7 | 7 |
8 #include "base/compiler_specific.h" | 8 #include "base/compiler_specific.h" |
9 #include "base/condition_variable.h" | 9 #include "base/condition_variable.h" |
10 #include "base/stl_util-inl.h" | 10 #include "base/stl_util-inl.h" |
11 #include "media/base/filter_host_impl.h" | 11 #include "media/base/filter_host_impl.h" |
12 #include "media/base/media_format.h" | 12 #include "media/base/media_format.h" |
13 #include "media/base/pipeline_impl.h" | 13 #include "media/base/pipeline_impl.h" |
14 | 14 |
15 namespace media { | 15 namespace media { |
16 | 16 |
(...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
282 rendered_mime_types_.insert(major_mime_type); | 282 rendered_mime_types_.insert(major_mime_type); |
283 } | 283 } |
284 | 284 |
285 | 285 |
286 //----------------------------------------------------------------------------- | 286 //----------------------------------------------------------------------------- |
287 | 287 |
288 PipelineThread::PipelineThread(PipelineImpl* pipeline) | 288 PipelineThread::PipelineThread(PipelineImpl* pipeline) |
289 : pipeline_(pipeline), | 289 : pipeline_(pipeline), |
290 thread_("PipelineThread"), | 290 thread_("PipelineThread"), |
291 time_update_callback_scheduled_(false), | 291 time_update_callback_scheduled_(false), |
292 host_initializing_(NULL) { | 292 state_(kCreated) { |
293 } | 293 } |
294 | 294 |
295 PipelineThread::~PipelineThread() { | 295 PipelineThread::~PipelineThread() { |
296 Stop(); | 296 Stop(); |
297 DCHECK(state_ == kStopped || state_ == kError); | |
297 } | 298 } |
298 | 299 |
299 // This method is called on the client's thread. It starts the pipeline's | 300 // This method is called on the client's thread. It starts the pipeline's |
300 // dedicated thread and posts a task to call the StartTask method on that | 301 // dedicated thread and posts a task to call the StartTask() method on that |
301 // thread. | 302 // thread. |
302 bool PipelineThread::Start(FilterFactory* filter_factory, | 303 bool PipelineThread::Start(FilterFactory* filter_factory, |
303 const std::string& url, | 304 const std::string& url, |
304 PipelineCallback* init_complete_callback) { | 305 PipelineCallback* init_complete_callback) { |
306 DCHECK_EQ(kCreated, state_); | |
305 if (thread_.Start()) { | 307 if (thread_.Start()) { |
306 filter_factory->AddRef(); | 308 filter_factory_ = filter_factory; |
307 PostTask(NewRunnableMethod(this, | 309 url_ = url; |
308 &PipelineThread::StartTask, | 310 init_callback_.reset(init_complete_callback); |
309 filter_factory, | 311 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
310 url, | |
311 // TODO(ralphl): what happens to this callback? | |
312 // is it copied by NewRunnableTask? Just pointer | |
313 // or is the callback itself copied? | |
314 init_complete_callback)); | |
315 return true; | 312 return true; |
316 } | 313 } |
317 return false; | 314 return false; |
318 } | 315 } |
319 | 316 |
320 // Called on the client's thread. If the thread has been started, then posts | 317 // Called on the client's thread. If the thread has been started, then posts |
321 // a task to call the StopTask method, then waits until the thread has stopped. | 318 // a task to call the StopTask() method, then waits until the thread has |
322 // There is a critical section that wraps the entire duration of the StartTask | 319 // stopped. |
323 // method. This method waits for that Lock to be released so that we know | |
324 // that the thread is not executing a nested message loop. This way we know | |
325 // that that Thread::Stop call will quit the appropriate message loop. | |
326 // | |
327 // TODO(scherkus): this can potentially deadlock, hack away our lock usage!! | |
328 void PipelineThread::Stop() { | 320 void PipelineThread::Stop() { |
329 if (thread_.IsRunning()) { | 321 if (thread_.IsRunning()) { |
330 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); | 322 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
331 AutoLock lock_crit(initialization_lock_); | |
332 thread_.Stop(); | 323 thread_.Stop(); |
333 } | 324 } |
334 DCHECK(filter_hosts_.empty()); | 325 DCHECK(filter_hosts_.empty()); |
335 DCHECK(filter_threads_.empty()); | 326 DCHECK(filter_threads_.empty()); |
336 } | 327 } |
337 | 328 |
338 // Called on client's thread. | 329 // Called on client's thread. |
339 void PipelineThread::SetPlaybackRate(float rate) { | 330 void PipelineThread::SetPlaybackRate(float rate) { |
340 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); | 331 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); |
341 } | 332 } |
342 | 333 |
343 // Called on client's thread. | 334 // Called on client's thread. |
344 void PipelineThread::Seek(base::TimeDelta time, | 335 void PipelineThread::Seek(base::TimeDelta time, |
345 PipelineCallback* seek_callback) { | 336 PipelineCallback* seek_callback) { |
346 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time, | 337 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time, |
347 seek_callback)); | 338 seek_callback)); |
348 } | 339 } |
349 | 340 |
350 // Called on client's thread. | 341 // Called on client's thread. |
351 void PipelineThread::SetVolume(float volume) { | 342 void PipelineThread::SetVolume(float volume) { |
352 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); | 343 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); |
353 } | 344 } |
354 | 345 |
355 // May be called on any thread, and therefore we always assume the worst | |
356 // possible race condition. This could, for example, be called from a filter's | |
357 // thread just as the pipeline thread is exiting the call to the filter's | |
358 // Initialize() method. Therefore, we make NO assumptions, and post work | |
359 // in every case, even the trivial one of a thread calling this method from | |
360 // within it's Initialize method. This means that we will always run a nested | |
361 // message loop, and the InitializationCompleteTask will Quit that loop | |
362 // immediately in the trivial case. | |
363 void PipelineThread::InitializationComplete(FilterHostImpl* host) { | 346 void PipelineThread::InitializationComplete(FilterHostImpl* host) { |
364 DCHECK(host == host_initializing_); | 347 if (IsPipelineOk()) { |
365 PostTask(NewRunnableMethod(this, | 348 // Continue the start task by proceeding to the next stage. |
366 &PipelineThread::InitializationCompleteTask, | 349 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
367 host)); | 350 } |
368 } | 351 } |
369 | 352 |
370 // Called from any thread. Updates the pipeline time and schedules a task to | 353 // Called from any thread. Updates the pipeline time and schedules a task to |
371 // call back to filters that have registered a callback for time updates. | 354 // call back to filters that have registered a callback for time updates. |
372 void PipelineThread::SetTime(base::TimeDelta time) { | 355 void PipelineThread::SetTime(base::TimeDelta time) { |
373 pipeline()->SetTime(time); | 356 pipeline()->SetTime(time); |
374 if (!time_update_callback_scheduled_) { | 357 if (!time_update_callback_scheduled_) { |
375 time_update_callback_scheduled_ = true; | 358 time_update_callback_scheduled_ = true; |
376 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); | 359 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); |
377 } | 360 } |
378 } | 361 } |
379 | 362 |
380 // Called from any thread. Sets the pipeline error_ member and schedules a | 363 // Called from any thread. Sets the pipeline |error_| member and schedules a |
381 // task to stop all the filters in the pipeline. Note that the thread will | 364 // task to stop all the filters in the pipeline. Note that the thread will |
382 // continue to run until the client calls Pipeline::Stop, but nothing will | 365 // continue to run until the client calls Pipeline::Stop(), but nothing will |
383 // be processed since filters will not be able to post tasks. | 366 // be processed since filters will not be able to post tasks. |
384 void PipelineThread::Error(PipelineError error) { | 367 void PipelineThread::Error(PipelineError error) { |
385 // If this method returns false, then an error has already happened, so no | 368 // If this method returns false, then an error has already happened, so no |
386 // reason to run the StopTask again. It's going to happen. | 369 // reason to run the StopTask again. It's going to happen. |
387 if (pipeline()->InternalSetError(error)) { | 370 if (pipeline()->InternalSetError(error)) { |
388 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); | 371 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
389 } | 372 } |
390 } | 373 } |
391 | 374 |
392 // Called from any thread. Used by FilterHostImpl::PostTask method and used | 375 // This is a helper method to post task on message_loop(). This method is only |
393 // internally. | 376 // called from this class or from Pipeline. |
394 void PipelineThread::PostTask(Task* task) { | 377 void PipelineThread::PostTask(Task* task) { |
395 message_loop()->PostTask(FROM_HERE, task); | 378 message_loop()->PostTask(FROM_HERE, task); |
396 } | 379 } |
397 | 380 |
398 | 381 |
399 // Main initialization method called on the pipeline thread. This code attempts | 382 // Main initialization method called on the pipeline thread. This code attempts |
400 // to use the specified filter factory to build a pipeline. It starts by | 383 // to use the specified filter factory to build a pipeline. |
401 // creating a DataSource, connects it to a Demuxer, and then connects the | 384 // Initialization step performed in this method depends on current state of this |
402 // Demuxer's audio stream to an AudioDecoder which is then connected to an | 385 // object, indicated by |state_|. After each step of initialization, this |
403 // AudioRenderer. If the media has video, then it connects a VideoDecoder to | 386 // object transits to the next stage. It starts by creating a DataSource, |
404 // the Demuxer's video stream, and then connects the VideoDecoder to a | 387 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an |
405 // VideoRenderer. When all required filters have been created and have called | 388 // AudioDecoder which is then connected to an AudioRenderer. If the media has |
406 // their FilterHost's InitializationComplete method, the pipeline's | 389 // video, then it connects a VideoDecoder to the Demuxer's video stream, and |
407 // initialized_ member is set to true, and, if the client provided an | 390 // then connects the VideoDecoder to a VideoRenderer. |
408 // init_complete_callback, it is called with "true". | 391 // |
409 // If initializatoin fails, the client's callback will still be called, but | 392 // When all required filters have been created and have called their |
393 // FilterHost's InitializationComplete method, the pipeline's |initialized_| | |
394 // member is set to true, and, if the client provided an | |
395 // |init_complete_callback_|, it is called with "true". | |
396 // | |
397 // If initialization fails, the client's callback will still be called, but | |
410 // the bool parameter passed to it will be false. | 398 // the bool parameter passed to it will be false. |
411 // | 399 // |
412 // Note that at each step in this process, the initialization of any filter | 400 // TODO(hclam): StartTask is now starting the pipeline asynchronous. It |
awong
2009/07/01 23:39:46
asynchronous -> asynchronously
| |
413 // may require running the pipeline thread's message loop recursively. This is | 401 // works like a big state change table. If we no longer need to start filters |
414 // handled by the CreateFilter method. | 402 // in order, we need to get rid of all the state change. |
415 void PipelineThread::StartTask(FilterFactory* filter_factory, | 403 void PipelineThread::StartTask() { |
416 const std::string& url, | 404 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
417 PipelineCallback* init_complete_callback) { | |
418 // During the entire StartTask we hold the initialization_lock_ so that | |
419 // if the client calls the Pipeline::Stop method while we are running a | |
420 // nested message loop, we can correctly unwind out of it before calling | |
421 // the Thread::Stop method. | |
422 AutoLock auto_lock(initialization_lock_); | |
423 | 405 |
424 // Add ourselves as a destruction observer of the thread's message loop so | 406 // If we have received the stop signal, return immediately. |
425 // we can delete filters at an appropriate time (when all tasks have been | 407 if (state_ == kStopped) |
426 // processed and the thread is about to be destroyed). | 408 return; |
427 message_loop()->AddDestructionObserver(this); | |
428 | 409 |
429 scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); | 410 DCHECK(state_ == kCreated || IsPipelineInitializing()); |
430 if (PipelineOk()) { | 411 |
431 scoped_refptr<Demuxer> demuxer = | 412 // Just created, create data source. |
432 CreateFilter<Demuxer, DataSource>(filter_factory, data_source); | 413 if (state_ == kCreated) { |
433 if (PipelineOk()) { | 414 message_loop()->AddDestructionObserver(this); |
434 Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); | 415 state_ = kInitDataSource; |
435 } | 416 CreateDataSource(); |
436 if (PipelineOk()) { | 417 return; |
437 Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); | 418 } |
419 | |
420 // Data source created, create demuxer. | |
421 if (state_ == kInitDataSource) { | |
422 state_ = kInitDemuxer; | |
423 CreateDemuxer(); | |
424 return; | |
425 } | |
426 | |
427 // Demuxer created, create audio decoder. | |
428 if (state_ == kInitDemuxer) { | |
429 state_ = kInitAudioDecoder; | |
430 // If this method returns false, then there's no audio stream. | |
431 if (CreateDecoder<AudioDecoder>()) | |
432 return; | |
433 } | |
434 | |
435 // Assuming audio decoder was created, create audio renderer. | |
436 if (state_ == kInitAudioDecoder) { | |
437 state_ = kInitAudioRenderer; | |
438 // Returns false if there's no audio stream. | |
439 if (CreateRenderer<AudioDecoder, AudioRenderer>()) { | |
440 pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type()); | |
441 return; | |
438 } | 442 } |
439 } | 443 } |
440 | 444 |
441 if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { | 445 // Assuming audio renderer was created, create video decoder. |
442 Error(PIPELINE_ERROR_COULD_NOT_RENDER); | 446 if (state_ == kInitAudioRenderer) { |
447 // Then perform the stage of initialization, i.e. initialize video decoder. | |
448 state_ = kInitVideoDecoder; | |
449 if (CreateDecoder<VideoDecoder>()) | |
450 return; | |
443 } | 451 } |
444 | 452 |
445 pipeline_->initialized_ = PipelineOk(); | 453 // Assuming video decoder was created, create video renderer. |
454 if (state_ == kInitVideoDecoder) { | |
455 state_ = kInitVideoRenderer; | |
456 if (CreateRenderer<VideoDecoder, VideoRenderer>()) { | |
457 pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type()); | |
458 return; | |
459 } | |
460 } | |
446 | 461 |
447 // No matter what, we're done with the filter factory, and | 462 if (state_ == kInitVideoRenderer) { |
448 // client callback so get rid of them. | 463 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { |
449 filter_factory->Release(); | 464 Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
450 if (init_complete_callback) { | 465 return; |
451 init_complete_callback->Run(pipeline_->initialized_); | 466 } |
452 delete init_complete_callback; | 467 |
468 state_ = kStarted; | |
469 pipeline_->initialized_ = true; | |
470 filter_factory_ = NULL; | |
471 if (init_callback_.get()) { | |
472 init_callback_->Run(true); | |
473 init_callback_.reset(); | |
474 } | |
453 } | 475 } |
454 } | 476 } |
455 | 477 |
456 // This method is called as a result of the client calling Pipeline::Stop() or | 478 // This method is called as a result of the client calling Pipeline::Stop() or |
457 // as the result of an error condition. If there is no error, then set the | 479 // as the result of an error condition. If there is no error, then set the |
458 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the | 480 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the |
459 // reverse order. | 481 // reverse order. |
460 // | 482 // |
461 // TODO(scherkus): beware! this can get posted multiple times since we post | 483 // TODO(scherkus): beware! this can get posted multiple times since we post |
462 // Stop() tasks even if we've already stopped. Perhaps this should no-op for | 484 // Stop() tasks even if we've already stopped. Perhaps this should no-op for |
463 // additional calls, however most of this logic will be changing. | 485 // additional calls, however most of this logic will be changing. |
464 void PipelineThread::StopTask() { | 486 void PipelineThread::StopTask() { |
465 if (PipelineOk()) { | 487 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
488 | |
489 if (IsPipelineInitializing()) { | |
490 // If IsPipelineOk() is true, the pipeline was simply stopped during | |
491 // initialization. Otherwise it is a failure. | |
492 state_ = IsPipelineOk() ? kStopped : kError; | |
493 filter_factory_ = NULL; | |
494 if (init_callback_.get()) { | |
495 init_callback_->Run(false); | |
496 init_callback_.reset(); | |
497 } | |
498 } else { | |
499 state_ = kStopped; | |
500 } | |
501 | |
502 if (IsPipelineOk()) { | |
466 pipeline_->error_ = PIPELINE_STOPPING; | 503 pipeline_->error_ = PIPELINE_STOPPING; |
467 } | 504 } |
468 | 505 |
469 // Stop every filter. | 506 // Stop every filter. |
470 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 507 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
471 iter != filter_hosts_.end(); | 508 iter != filter_hosts_.end(); |
472 ++iter) { | 509 ++iter) { |
473 (*iter)->Stop(); | 510 (*iter)->Stop(); |
474 } | 511 } |
475 | 512 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
511 } | 548 } |
512 | 549 |
513 // Stop every running filter thread. | 550 // Stop every running filter thread. |
514 // | 551 // |
515 // TODO(scherkus): can we watchdog this section to detect wedged threads? | 552 // TODO(scherkus): can we watchdog this section to detect wedged threads? |
516 for (FilterThreadVector::iterator iter = running_threads.begin(); | 553 for (FilterThreadVector::iterator iter = running_threads.begin(); |
517 iter != running_threads.end(); | 554 iter != running_threads.end(); |
518 ++iter) { | 555 ++iter) { |
519 (*iter)->Stop(); | 556 (*iter)->Stop(); |
520 } | 557 } |
521 | |
522 if (host_initializing_) { | |
523 host_initializing_ = NULL; | |
524 message_loop()->Quit(); | |
525 } | |
526 } | |
527 | |
528 template <class Decoder, class Renderer> | |
529 void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) { | |
530 DCHECK(PipelineOk()); | |
531 const std::string major_mime_type = Decoder::major_mime_type(); | |
532 const int num_outputs = demuxer->GetNumberOfStreams(); | |
533 for (int i = 0; i < num_outputs; ++i) { | |
534 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); | |
535 std::string value; | |
536 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && | |
537 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { | |
538 scoped_refptr<Decoder> decoder = | |
539 CreateFilter<Decoder, DemuxerStream>(filter_factory, stream); | |
540 if (PipelineOk()) { | |
541 DCHECK(decoder); | |
542 CreateFilter<Renderer, Decoder>(filter_factory, decoder); | |
543 } | |
544 if (PipelineOk()) { | |
545 pipeline_->InsertRenderedMimeType(major_mime_type); | |
546 } | |
547 break; | |
548 } | |
549 } | |
550 } | |
551 | |
552 | |
553 // Task runs as a result of a filter calling InitializationComplete. If for | |
554 // some reason StopTask has been executed prior to this, the host_initializing_ | |
555 // member will be NULL, and the message loop will have been quit already, so | |
556 // we don't want to do it again. | |
557 void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { | |
558 if (host == host_initializing_) { | |
559 host_initializing_ = NULL; | |
560 message_loop()->Quit(); | |
561 } else { | |
562 DCHECK(!host_initializing_); | |
563 } | |
564 } | 558 } |
565 | 559 |
566 void PipelineThread::SetPlaybackRateTask(float rate) { | 560 void PipelineThread::SetPlaybackRateTask(float rate) { |
561 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
562 | |
567 pipeline_->InternalSetPlaybackRate(rate); | 563 pipeline_->InternalSetPlaybackRate(rate); |
568 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 564 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
569 iter != filter_hosts_.end(); | 565 iter != filter_hosts_.end(); |
570 ++iter) { | 566 ++iter) { |
571 (*iter)->media_filter()->SetPlaybackRate(rate); | 567 (*iter)->media_filter()->SetPlaybackRate(rate); |
572 } | 568 } |
573 } | 569 } |
574 | 570 |
575 void PipelineThread::SeekTask(base::TimeDelta time, | 571 void PipelineThread::SeekTask(base::TimeDelta time, |
576 PipelineCallback* seek_callback) { | 572 PipelineCallback* seek_callback) { |
573 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
574 | |
577 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 575 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
578 iter != filter_hosts_.end(); | 576 iter != filter_hosts_.end(); |
579 ++iter) { | 577 ++iter) { |
580 (*iter)->media_filter()->Seek(time); | 578 (*iter)->media_filter()->Seek(time); |
581 } | 579 } |
582 | 580 |
583 // TODO(hclam): we should set the time when the above seek operations were all | 581 // TODO(hclam): we should set the time when the above seek operations were all |
584 // successful and first frame/packet at the desired time is decoded. I'm | 582 // successful and first frame/packet at the desired time is decoded. I'm |
585 // setting the time here because once we do the callback the user can ask for | 583 // setting the time here because once we do the callback the user can ask for |
586 // current time immediately, which is the old time. In order to get rid this | 584 // current time immediately, which is the old time. In order to get rid this |
587 // little glitch, we either assume the seek was successful and time is updated | 585 // little glitch, we either assume the seek was successful and time is updated |
588 // immediately here or we set time and do callback when we have new | 586 // immediately here or we set time and do callback when we have new |
589 // frames/packets. | 587 // frames/packets. |
590 SetTime(time); | 588 SetTime(time); |
591 if (seek_callback) { | 589 if (seek_callback) { |
592 seek_callback->Run(true); | 590 seek_callback->Run(true); |
593 delete seek_callback; | 591 delete seek_callback; |
594 } | 592 } |
595 } | 593 } |
596 | 594 |
597 void PipelineThread::SetVolumeTask(float volume) { | 595 void PipelineThread::SetVolumeTask(float volume) { |
596 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
597 | |
598 pipeline_->volume_ = volume; | 598 pipeline_->volume_ = volume; |
599 scoped_refptr<AudioRenderer> audio_renderer; | 599 scoped_refptr<AudioRenderer> audio_renderer; |
600 GetFilter(&audio_renderer); | 600 GetFilter(&audio_renderer); |
601 if (audio_renderer) { | 601 if (audio_renderer) { |
602 audio_renderer->SetVolume(volume); | 602 audio_renderer->SetVolume(volume); |
603 } | 603 } |
604 } | 604 } |
605 | 605 |
606 void PipelineThread::SetTimeTask() { | 606 void PipelineThread::SetTimeTask() { |
607 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
608 | |
607 time_update_callback_scheduled_ = false; | 609 time_update_callback_scheduled_ = false; |
608 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 610 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
609 iter != filter_hosts_.end(); | 611 iter != filter_hosts_.end(); |
610 ++iter) { | 612 ++iter) { |
611 (*iter)->RunTimeUpdateCallback(pipeline_->time_); | 613 (*iter)->RunTimeUpdateCallback(pipeline_->time_); |
612 } | 614 } |
613 } | 615 } |
614 | 616 |
615 template <class Filter> | 617 template <class Filter> |
616 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { | 618 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { |
619 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
620 | |
617 *filter_out = NULL; | 621 *filter_out = NULL; |
618 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); | 622 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); |
619 iter != filter_hosts_.end() && NULL == *filter_out; | 623 iter != filter_hosts_.end() && NULL == *filter_out; |
620 iter++) { | 624 iter++) { |
621 (*iter)->GetFilter(filter_out); | 625 (*iter)->GetFilter(filter_out); |
622 } | 626 } |
623 } | 627 } |
624 | 628 |
625 template <class Filter, class Source> | 629 template <class Filter, class Source> |
626 scoped_refptr<Filter> PipelineThread::CreateFilter( | 630 void PipelineThread::CreateFilter(FilterFactory* filter_factory, |
627 FilterFactory* filter_factory, | 631 Source source, |
628 Source source, | 632 const MediaFormat& media_format) { |
629 const MediaFormat& media_format) { | 633 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
630 DCHECK(PipelineOk()); | 634 DCHECK(IsPipelineOk()); |
635 | |
631 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); | 636 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); |
632 if (!filter) { | 637 if (!filter) { |
633 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); | 638 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); |
634 } else { | 639 } else { |
635 DCHECK(!host_initializing_); | 640 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); |
636 host_initializing_ = new FilterHostImpl(this, filter.get()); | 641 // Create a dedicated thread for this filter. |
637 if (NULL == host_initializing_) { | 642 if (SupportsSetMessageLoop<Filter>()) { |
638 Error(PIPELINE_ERROR_OUT_OF_MEMORY); | 643 // TODO(scherkus): figure out a way to name these threads so it matches |
639 } else { | 644 // the filter type. |
640 // Create a dedicated thread for this filter. | 645 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
641 if (SupportsSetMessageLoop<Filter>()) { | 646 if (!thread.get() || !thread->Start()) { |
642 // TODO(scherkus): figure out a way to name these threads so it matches | 647 NOTREACHED() << "Could not start filter thread"; |
643 // the filter type. | 648 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
644 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); | 649 } else { |
645 if (!thread.get() || !thread->Start()) { | 650 filter->SetMessageLoop(thread->message_loop()); |
646 NOTREACHED() << "Could not start filter thread"; | 651 filter_threads_.push_back(thread.release()); |
647 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); | |
648 } else { | |
649 filter->SetMessageLoop(thread->message_loop()); | |
650 filter_threads_.push_back(thread.release()); | |
651 } | |
652 } | 652 } |
653 } | |
653 | 654 |
654 // Creating a thread could have failed, verify we're still OK. | 655 // Creating a thread could have failed, verify we're still OK. |
655 if (PipelineOk()) { | 656 if (IsPipelineOk()) { |
656 filter_hosts_.push_back(host_initializing_); | 657 filter_hosts_.push_back(host.get()); |
657 filter->SetFilterHost(host_initializing_); | 658 filter->SetFilterHost(host.release()); |
658 if (!filter->Initialize(source)) { | 659 if (!filter->Initialize(source)) { |
659 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); | 660 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
660 } | |
661 } | 661 } |
662 } | 662 } |
663 } | 663 } |
664 if (PipelineOk()) { | |
665 // Now we run the thread's message loop recursively. We want all | |
666 // pending tasks to be processed, so we set nestable tasks to be allowed | |
667 // and then run the loop. The only way we exit the loop is as the result | |
668 // of a call to FilterHost::InitializationComplete, FilterHost::Error, or | |
669 // Pipeline::Stop. In each of these cases, the corresponding task method | |
670 // sets host_initializing_ to NULL to signal that the message loop's Quit | |
671 // method has already been called, and then calls message_loop()->Quit(). | |
672 // The setting of |host_initializing_| to NULL in the task prevents a | |
673 // subsequent task from accidentally quitting the wrong (non-nested) loop. | |
674 message_loop()->SetNestableTasksAllowed(true); | |
675 message_loop()->Run(); | |
676 message_loop()->SetNestableTasksAllowed(false); | |
677 DCHECK(!host_initializing_); | |
678 } else { | |
679 // This could still be set if we never ran the message loop (for example, | |
680 // if the fiter returned false from it's Initialize() method), so make sure | |
681 // to reset it. | |
682 host_initializing_ = NULL; | |
683 } | |
684 if (!PipelineOk()) { | |
685 filter = NULL; | |
686 } | |
687 return filter; | |
688 } | 664 } |
689 | 665 |
690 scoped_refptr<DataSource> PipelineThread::CreateDataSource( | 666 void PipelineThread::CreateDataSource() { |
691 FilterFactory* filter_factory, const std::string& url) { | 667 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
668 DCHECK(IsPipelineOk()); | |
669 | |
692 MediaFormat url_format; | 670 MediaFormat url_format; |
693 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); | 671 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); |
694 url_format.SetAsString(MediaFormat::kURL, url); | 672 url_format.SetAsString(MediaFormat::kURL, url_); |
695 return CreateFilter<DataSource>(filter_factory, url, url_format); | 673 CreateFilter<DataSource>(filter_factory_, url_, url_format); |
674 } | |
675 | |
676 void PipelineThread::CreateDemuxer() { | |
677 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
678 DCHECK(IsPipelineOk()); | |
679 | |
680 scoped_refptr<DataSource> data_source; | |
681 GetFilter(&data_source); | |
682 DCHECK(data_source); | |
683 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); | |
684 } | |
685 | |
686 template <class Decoder> | |
687 bool PipelineThread::CreateDecoder() { | |
688 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
689 DCHECK(IsPipelineOk()); | |
690 | |
691 scoped_refptr<Demuxer> demuxer; | |
692 GetFilter(&demuxer); | |
693 DCHECK(demuxer); | |
694 | |
695 const std::string major_mime_type = Decoder::major_mime_type(); | |
696 const int num_outputs = demuxer->GetNumberOfStreams(); | |
697 for (int i = 0; i < num_outputs; ++i) { | |
698 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); | |
699 std::string value; | |
700 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && | |
701 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { | |
702 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); | |
703 return true; | |
704 } | |
705 } | |
706 return false; | |
707 } | |
708 | |
709 template <class Decoder, class Renderer> | |
710 bool PipelineThread::CreateRenderer() { | |
711 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
712 DCHECK(IsPipelineOk()); | |
713 | |
714 scoped_refptr<Decoder> decoder; | |
715 GetFilter(&decoder); | |
716 | |
717 if (decoder) { | |
718 // If the decoder was created. | |
719 const std::string major_mime_type = Decoder::major_mime_type(); | |
720 CreateFilter<Renderer, Decoder>(filter_factory_, decoder); | |
721 return true; | |
722 } | |
723 return false; | |
696 } | 724 } |
697 | 725 |
698 // Called as a result of destruction of the thread. | 726 // Called as a result of destruction of the thread. |
699 // | 727 // |
700 // TODO(scherkus): this can block the client due to synchronous Stop() API call. | 728 // TODO(scherkus): this can block the client due to synchronous Stop() API call. |
701 void PipelineThread::WillDestroyCurrentMessageLoop() { | 729 void PipelineThread::WillDestroyCurrentMessageLoop() { |
702 STLDeleteElements(&filter_hosts_); | 730 STLDeleteElements(&filter_hosts_); |
703 STLDeleteElements(&filter_threads_); | 731 STLDeleteElements(&filter_threads_); |
704 } | 732 } |
705 | 733 |
706 } // namespace media | 734 } // namespace media |
OLD | NEW |