Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(294)

Side by Side Diff: media/base/pipeline_impl.cc

Issue 149123: Asynchronous initialization of media::PipelineThread... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 // 4 //
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names,
6 // potential deadlocks, nested message loops, etc... 6 // potential deadlocks, etc...
7 7
8 #include "base/compiler_specific.h" 8 #include "base/compiler_specific.h"
9 #include "base/condition_variable.h" 9 #include "base/condition_variable.h"
10 #include "base/stl_util-inl.h" 10 #include "base/stl_util-inl.h"
11 #include "media/base/filter_host_impl.h" 11 #include "media/base/filter_host_impl.h"
12 #include "media/base/media_format.h" 12 #include "media/base/media_format.h"
13 #include "media/base/pipeline_impl.h" 13 #include "media/base/pipeline_impl.h"
14 14
15 namespace media { 15 namespace media {
16 16
(...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after
282 rendered_mime_types_.insert(major_mime_type); 282 rendered_mime_types_.insert(major_mime_type);
283 } 283 }
284 284
285 285
286 //----------------------------------------------------------------------------- 286 //-----------------------------------------------------------------------------
287 287
288 PipelineThread::PipelineThread(PipelineImpl* pipeline) 288 PipelineThread::PipelineThread(PipelineImpl* pipeline)
289 : pipeline_(pipeline), 289 : pipeline_(pipeline),
290 thread_("PipelineThread"), 290 thread_("PipelineThread"),
291 time_update_callback_scheduled_(false), 291 time_update_callback_scheduled_(false),
292 host_initializing_(NULL) { 292 state_(kCreated) {
293 } 293 }
294 294
295 PipelineThread::~PipelineThread() { 295 PipelineThread::~PipelineThread() {
296 Stop(); 296 Stop();
297 DCHECK(state_ == kStopped || state_ == kError);
297 } 298 }
298 299
299 // This method is called on the client's thread. It starts the pipeline's 300 // This method is called on the client's thread. It starts the pipeline's
300 // dedicated thread and posts a task to call the StartTask method on that 301 // dedicated thread and posts a task to call the StartTask() method on that
301 // thread. 302 // thread.
302 bool PipelineThread::Start(FilterFactory* filter_factory, 303 bool PipelineThread::Start(FilterFactory* filter_factory,
303 const std::string& url, 304 const std::string& url,
304 PipelineCallback* init_complete_callback) { 305 PipelineCallback* init_complete_callback) {
306 DCHECK_EQ(kCreated, state_);
305 if (thread_.Start()) { 307 if (thread_.Start()) {
306 filter_factory->AddRef(); 308 filter_factory_ = filter_factory;
307 PostTask(NewRunnableMethod(this, 309 url_ = url;
308 &PipelineThread::StartTask, 310 init_callback_.reset(init_complete_callback);
309 filter_factory, 311 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
310 url,
311 // TODO(ralphl): what happens to this callback?
312 // is it copied by NewRunnableTask? Just pointer
313 // or is the callback itself copied?
314 init_complete_callback));
315 return true; 312 return true;
316 } 313 }
317 return false; 314 return false;
318 } 315 }
319 316
320 // Called on the client's thread. If the thread has been started, then posts 317 // Called on the client's thread. If the thread has been started, then posts
321 // a task to call the StopTask method, then waits until the thread has stopped. 318 // a task to call the StopTask() method, then waits until the thread has
322 // There is a critical section that wraps the entire duration of the StartTask 319 // stopped.
323 // method. This method waits for that Lock to be released so that we know
324 // that the thread is not executing a nested message loop. This way we know
325 // that that Thread::Stop call will quit the appropriate message loop.
326 //
327 // TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
328 void PipelineThread::Stop() { 320 void PipelineThread::Stop() {
329 if (thread_.IsRunning()) { 321 if (thread_.IsRunning()) {
330 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); 322 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
331 AutoLock lock_crit(initialization_lock_);
332 thread_.Stop(); 323 thread_.Stop();
333 } 324 }
334 DCHECK(filter_hosts_.empty()); 325 DCHECK(filter_hosts_.empty());
335 DCHECK(filter_threads_.empty()); 326 DCHECK(filter_threads_.empty());
336 } 327 }
337 328
338 // Called on client's thread. 329 // Called on client's thread.
339 void PipelineThread::SetPlaybackRate(float rate) { 330 void PipelineThread::SetPlaybackRate(float rate) {
340 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); 331 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
341 } 332 }
342 333
343 // Called on client's thread. 334 // Called on client's thread.
344 void PipelineThread::Seek(base::TimeDelta time, 335 void PipelineThread::Seek(base::TimeDelta time,
345 PipelineCallback* seek_callback) { 336 PipelineCallback* seek_callback) {
346 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time, 337 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time,
347 seek_callback)); 338 seek_callback));
348 } 339 }
349 340
350 // Called on client's thread. 341 // Called on client's thread.
351 void PipelineThread::SetVolume(float volume) { 342 void PipelineThread::SetVolume(float volume) {
352 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); 343 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
353 } 344 }
354 345
355 // May be called on any thread, and therefore we always assume the worst
356 // possible race condition. This could, for example, be called from a filter's
357 // thread just as the pipeline thread is exiting the call to the filter's
358 // Initialize() method. Therefore, we make NO assumptions, and post work
359 // in every case, even the trivial one of a thread calling this method from
360 // within it's Initialize method. This means that we will always run a nested
361 // message loop, and the InitializationCompleteTask will Quit that loop
362 // immediately in the trivial case.
363 void PipelineThread::InitializationComplete(FilterHostImpl* host) { 346 void PipelineThread::InitializationComplete(FilterHostImpl* host) {
364 DCHECK(host == host_initializing_); 347 if (IsPipelineOk()) {
365 PostTask(NewRunnableMethod(this, 348 // Continue the start task by proceeding to the next stage.
366 &PipelineThread::InitializationCompleteTask, 349 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
367 host)); 350 }
368 } 351 }
369 352
370 // Called from any thread. Updates the pipeline time and schedules a task to 353 // Called from any thread. Updates the pipeline time and schedules a task to
371 // call back to filters that have registered a callback for time updates. 354 // call back to filters that have registered a callback for time updates.
372 void PipelineThread::SetTime(base::TimeDelta time) { 355 void PipelineThread::SetTime(base::TimeDelta time) {
373 pipeline()->SetTime(time); 356 pipeline()->SetTime(time);
374 if (!time_update_callback_scheduled_) { 357 if (!time_update_callback_scheduled_) {
375 time_update_callback_scheduled_ = true; 358 time_update_callback_scheduled_ = true;
376 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); 359 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask));
377 } 360 }
378 } 361 }
379 362
380 // Called from any thread. Sets the pipeline error_ member and schedules a 363 // Called from any thread. Sets the pipeline |error_| member and schedules a
381 // task to stop all the filters in the pipeline. Note that the thread will 364 // task to stop all the filters in the pipeline. Note that the thread will
382 // continue to run until the client calls Pipeline::Stop, but nothing will 365 // continue to run until the client calls Pipeline::Stop(), but nothing will
383 // be processed since filters will not be able to post tasks. 366 // be processed since filters will not be able to post tasks.
384 void PipelineThread::Error(PipelineError error) { 367 void PipelineThread::Error(PipelineError error) {
385 // If this method returns false, then an error has already happened, so no 368 // If this method returns false, then an error has already happened, so no
386 // reason to run the StopTask again. It's going to happen. 369 // reason to run the StopTask again. It's going to happen.
387 if (pipeline()->InternalSetError(error)) { 370 if (pipeline()->InternalSetError(error)) {
388 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); 371 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
389 } 372 }
390 } 373 }
391 374
392 // Called from any thread. Used by FilterHostImpl::PostTask method and used 375 // This is a helper method to post task on message_loop(). This method is only
393 // internally. 376 // called from this class or from Pipeline.
394 void PipelineThread::PostTask(Task* task) { 377 void PipelineThread::PostTask(Task* task) {
395 message_loop()->PostTask(FROM_HERE, task); 378 message_loop()->PostTask(FROM_HERE, task);
396 } 379 }
397 380
398 381
399 // Main initialization method called on the pipeline thread. This code attempts 382 // Main initialization method called on the pipeline thread. This code attempts
400 // to use the specified filter factory to build a pipeline. It starts by 383 // to use the specified filter factory to build a pipeline.
401 // creating a DataSource, connects it to a Demuxer, and then connects the 384 // Initialization step performed in this method depends on current state of this
402 // Demuxer's audio stream to an AudioDecoder which is then connected to an 385 // object, indicated by |state_|. After each step of initialization, this
403 // AudioRenderer. If the media has video, then it connects a VideoDecoder to 386 // object transits to the next stage. It starts by creating a DataSource,
404 // the Demuxer's video stream, and then connects the VideoDecoder to a 387 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an
405 // VideoRenderer. When all required filters have been created and have called 388 // AudioDecoder which is then connected to an AudioRenderer. If the media has
406 // their FilterHost's InitializationComplete method, the pipeline's 389 // video, then it connects a VideoDecoder to the Demuxer's video stream, and
407 // initialized_ member is set to true, and, if the client provided an 390 // then connects the VideoDecoder to a VideoRenderer.
408 // init_complete_callback, it is called with "true". 391 //
409 // If initializatoin fails, the client's callback will still be called, but 392 // When all required filters have been created and have called their
393 // FilterHost's InitializationComplete method, the pipeline's |initialized_|
394 // member is set to true, and, if the client provided an
395 // |init_complete_callback_|, it is called with "true".
396 //
397 // If initialization fails, the client's callback will still be called, but
410 // the bool parameter passed to it will be false. 398 // the bool parameter passed to it will be false.
411 // 399 //
412 // Note that at each step in this process, the initialization of any filter 400 // TODO(hclam): StartTask is now starting the pipeline asynchronous. It
awong 2009/07/01 23:39:46 asynchronous -> asynchronously
413 // may require running the pipeline thread's message loop recursively. This is 401 // works like a big state change table. If we no longer need to start filters
414 // handled by the CreateFilter method. 402 // in order, we need to get rid of all the state change.
415 void PipelineThread::StartTask(FilterFactory* filter_factory, 403 void PipelineThread::StartTask() {
416 const std::string& url, 404 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
417 PipelineCallback* init_complete_callback) {
418 // During the entire StartTask we hold the initialization_lock_ so that
419 // if the client calls the Pipeline::Stop method while we are running a
420 // nested message loop, we can correctly unwind out of it before calling
421 // the Thread::Stop method.
422 AutoLock auto_lock(initialization_lock_);
423 405
424 // Add ourselves as a destruction observer of the thread's message loop so 406 // If we have received the stop signal, return immediately.
425 // we can delete filters at an appropriate time (when all tasks have been 407 if (state_ == kStopped)
426 // processed and the thread is about to be destroyed). 408 return;
427 message_loop()->AddDestructionObserver(this);
428 409
429 scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); 410 DCHECK(state_ == kCreated || IsPipelineInitializing());
430 if (PipelineOk()) { 411
431 scoped_refptr<Demuxer> demuxer = 412 // Just created, create data source.
432 CreateFilter<Demuxer, DataSource>(filter_factory, data_source); 413 if (state_ == kCreated) {
433 if (PipelineOk()) { 414 message_loop()->AddDestructionObserver(this);
434 Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); 415 state_ = kInitDataSource;
435 } 416 CreateDataSource();
436 if (PipelineOk()) { 417 return;
437 Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); 418 }
419
420 // Data source created, create demuxer.
421 if (state_ == kInitDataSource) {
422 state_ = kInitDemuxer;
423 CreateDemuxer();
424 return;
425 }
426
427 // Demuxer created, create audio decoder.
428 if (state_ == kInitDemuxer) {
429 state_ = kInitAudioDecoder;
430 // If this method returns false, then there's no audio stream.
431 if (CreateDecoder<AudioDecoder>())
432 return;
433 }
434
435 // Assuming audio decoder was created, create audio renderer.
436 if (state_ == kInitAudioDecoder) {
437 state_ = kInitAudioRenderer;
438 // Returns false if there's no audio stream.
439 if (CreateRenderer<AudioDecoder, AudioRenderer>()) {
440 pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type());
441 return;
438 } 442 }
439 } 443 }
440 444
441 if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { 445 // Assuming audio renderer was created, create video decoder.
442 Error(PIPELINE_ERROR_COULD_NOT_RENDER); 446 if (state_ == kInitAudioRenderer) {
447 // Then perform the stage of initialization, i.e. initialize video decoder.
448 state_ = kInitVideoDecoder;
449 if (CreateDecoder<VideoDecoder>())
450 return;
443 } 451 }
444 452
445 pipeline_->initialized_ = PipelineOk(); 453 // Assuming video decoder was created, create video renderer.
454 if (state_ == kInitVideoDecoder) {
455 state_ = kInitVideoRenderer;
456 if (CreateRenderer<VideoDecoder, VideoRenderer>()) {
457 pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type());
458 return;
459 }
460 }
446 461
447 // No matter what, we're done with the filter factory, and 462 if (state_ == kInitVideoRenderer) {
448 // client callback so get rid of them. 463 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) {
449 filter_factory->Release(); 464 Error(PIPELINE_ERROR_COULD_NOT_RENDER);
450 if (init_complete_callback) { 465 return;
451 init_complete_callback->Run(pipeline_->initialized_); 466 }
452 delete init_complete_callback; 467
468 state_ = kStarted;
469 pipeline_->initialized_ = true;
470 filter_factory_ = NULL;
471 if (init_callback_.get()) {
472 init_callback_->Run(true);
473 init_callback_.reset();
474 }
453 } 475 }
454 } 476 }
455 477
456 // This method is called as a result of the client calling Pipeline::Stop() or 478 // This method is called as a result of the client calling Pipeline::Stop() or
457 // as the result of an error condition. If there is no error, then set the 479 // as the result of an error condition. If there is no error, then set the
458 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the 480 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
459 // reverse order. 481 // reverse order.
460 // 482 //
461 // TODO(scherkus): beware! this can get posted multiple times since we post 483 // TODO(scherkus): beware! this can get posted multiple times since we post
462 // Stop() tasks even if we've already stopped. Perhaps this should no-op for 484 // Stop() tasks even if we've already stopped. Perhaps this should no-op for
463 // additional calls, however most of this logic will be changing. 485 // additional calls, however most of this logic will be changing.
464 void PipelineThread::StopTask() { 486 void PipelineThread::StopTask() {
465 if (PipelineOk()) { 487 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
488
489 if (IsPipelineInitializing()) {
490 // If IsPipelineOk() is true, the pipeline was simply stopped during
491 // initialization. Otherwise it is a failure.
492 state_ = IsPipelineOk() ? kStopped : kError;
493 filter_factory_ = NULL;
494 if (init_callback_.get()) {
495 init_callback_->Run(false);
496 init_callback_.reset();
497 }
498 } else {
499 state_ = kStopped;
500 }
501
502 if (IsPipelineOk()) {
466 pipeline_->error_ = PIPELINE_STOPPING; 503 pipeline_->error_ = PIPELINE_STOPPING;
467 } 504 }
468 505
469 // Stop every filter. 506 // Stop every filter.
470 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 507 for (FilterHostVector::iterator iter = filter_hosts_.begin();
471 iter != filter_hosts_.end(); 508 iter != filter_hosts_.end();
472 ++iter) { 509 ++iter) {
473 (*iter)->Stop(); 510 (*iter)->Stop();
474 } 511 }
475 512
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
511 } 548 }
512 549
513 // Stop every running filter thread. 550 // Stop every running filter thread.
514 // 551 //
515 // TODO(scherkus): can we watchdog this section to detect wedged threads? 552 // TODO(scherkus): can we watchdog this section to detect wedged threads?
516 for (FilterThreadVector::iterator iter = running_threads.begin(); 553 for (FilterThreadVector::iterator iter = running_threads.begin();
517 iter != running_threads.end(); 554 iter != running_threads.end();
518 ++iter) { 555 ++iter) {
519 (*iter)->Stop(); 556 (*iter)->Stop();
520 } 557 }
521
522 if (host_initializing_) {
523 host_initializing_ = NULL;
524 message_loop()->Quit();
525 }
526 }
527
528 template <class Decoder, class Renderer>
529 void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) {
530 DCHECK(PipelineOk());
531 const std::string major_mime_type = Decoder::major_mime_type();
532 const int num_outputs = demuxer->GetNumberOfStreams();
533 for (int i = 0; i < num_outputs; ++i) {
534 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
535 std::string value;
536 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
537 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
538 scoped_refptr<Decoder> decoder =
539 CreateFilter<Decoder, DemuxerStream>(filter_factory, stream);
540 if (PipelineOk()) {
541 DCHECK(decoder);
542 CreateFilter<Renderer, Decoder>(filter_factory, decoder);
543 }
544 if (PipelineOk()) {
545 pipeline_->InsertRenderedMimeType(major_mime_type);
546 }
547 break;
548 }
549 }
550 }
551
552
553 // Task runs as a result of a filter calling InitializationComplete. If for
554 // some reason StopTask has been executed prior to this, the host_initializing_
555 // member will be NULL, and the message loop will have been quit already, so
556 // we don't want to do it again.
557 void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) {
558 if (host == host_initializing_) {
559 host_initializing_ = NULL;
560 message_loop()->Quit();
561 } else {
562 DCHECK(!host_initializing_);
563 }
564 } 558 }
565 559
566 void PipelineThread::SetPlaybackRateTask(float rate) { 560 void PipelineThread::SetPlaybackRateTask(float rate) {
561 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
562
567 pipeline_->InternalSetPlaybackRate(rate); 563 pipeline_->InternalSetPlaybackRate(rate);
568 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 564 for (FilterHostVector::iterator iter = filter_hosts_.begin();
569 iter != filter_hosts_.end(); 565 iter != filter_hosts_.end();
570 ++iter) { 566 ++iter) {
571 (*iter)->media_filter()->SetPlaybackRate(rate); 567 (*iter)->media_filter()->SetPlaybackRate(rate);
572 } 568 }
573 } 569 }
574 570
575 void PipelineThread::SeekTask(base::TimeDelta time, 571 void PipelineThread::SeekTask(base::TimeDelta time,
576 PipelineCallback* seek_callback) { 572 PipelineCallback* seek_callback) {
573 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
574
577 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 575 for (FilterHostVector::iterator iter = filter_hosts_.begin();
578 iter != filter_hosts_.end(); 576 iter != filter_hosts_.end();
579 ++iter) { 577 ++iter) {
580 (*iter)->media_filter()->Seek(time); 578 (*iter)->media_filter()->Seek(time);
581 } 579 }
582 580
583 // TODO(hclam): we should set the time when the above seek operations were all 581 // TODO(hclam): we should set the time when the above seek operations were all
584 // successful and first frame/packet at the desired time is decoded. I'm 582 // successful and first frame/packet at the desired time is decoded. I'm
585 // setting the time here because once we do the callback the user can ask for 583 // setting the time here because once we do the callback the user can ask for
586 // current time immediately, which is the old time. In order to get rid this 584 // current time immediately, which is the old time. In order to get rid this
587 // little glitch, we either assume the seek was successful and time is updated 585 // little glitch, we either assume the seek was successful and time is updated
588 // immediately here or we set time and do callback when we have new 586 // immediately here or we set time and do callback when we have new
589 // frames/packets. 587 // frames/packets.
590 SetTime(time); 588 SetTime(time);
591 if (seek_callback) { 589 if (seek_callback) {
592 seek_callback->Run(true); 590 seek_callback->Run(true);
593 delete seek_callback; 591 delete seek_callback;
594 } 592 }
595 } 593 }
596 594
597 void PipelineThread::SetVolumeTask(float volume) { 595 void PipelineThread::SetVolumeTask(float volume) {
596 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
597
598 pipeline_->volume_ = volume; 598 pipeline_->volume_ = volume;
599 scoped_refptr<AudioRenderer> audio_renderer; 599 scoped_refptr<AudioRenderer> audio_renderer;
600 GetFilter(&audio_renderer); 600 GetFilter(&audio_renderer);
601 if (audio_renderer) { 601 if (audio_renderer) {
602 audio_renderer->SetVolume(volume); 602 audio_renderer->SetVolume(volume);
603 } 603 }
604 } 604 }
605 605
606 void PipelineThread::SetTimeTask() { 606 void PipelineThread::SetTimeTask() {
607 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
608
607 time_update_callback_scheduled_ = false; 609 time_update_callback_scheduled_ = false;
608 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 610 for (FilterHostVector::iterator iter = filter_hosts_.begin();
609 iter != filter_hosts_.end(); 611 iter != filter_hosts_.end();
610 ++iter) { 612 ++iter) {
611 (*iter)->RunTimeUpdateCallback(pipeline_->time_); 613 (*iter)->RunTimeUpdateCallback(pipeline_->time_);
612 } 614 }
613 } 615 }
614 616
615 template <class Filter> 617 template <class Filter>
616 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { 618 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
619 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
620
617 *filter_out = NULL; 621 *filter_out = NULL;
618 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); 622 for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
619 iter != filter_hosts_.end() && NULL == *filter_out; 623 iter != filter_hosts_.end() && NULL == *filter_out;
620 iter++) { 624 iter++) {
621 (*iter)->GetFilter(filter_out); 625 (*iter)->GetFilter(filter_out);
622 } 626 }
623 } 627 }
624 628
625 template <class Filter, class Source> 629 template <class Filter, class Source>
626 scoped_refptr<Filter> PipelineThread::CreateFilter( 630 void PipelineThread::CreateFilter(FilterFactory* filter_factory,
627 FilterFactory* filter_factory, 631 Source source,
628 Source source, 632 const MediaFormat& media_format) {
629 const MediaFormat& media_format) { 633 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
630 DCHECK(PipelineOk()); 634 DCHECK(IsPipelineOk());
635
631 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); 636 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
632 if (!filter) { 637 if (!filter) {
633 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); 638 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
634 } else { 639 } else {
635 DCHECK(!host_initializing_); 640 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
636 host_initializing_ = new FilterHostImpl(this, filter.get()); 641 // Create a dedicated thread for this filter.
637 if (NULL == host_initializing_) { 642 if (SupportsSetMessageLoop<Filter>()) {
638 Error(PIPELINE_ERROR_OUT_OF_MEMORY); 643 // TODO(scherkus): figure out a way to name these threads so it matches
639 } else { 644 // the filter type.
640 // Create a dedicated thread for this filter. 645 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
641 if (SupportsSetMessageLoop<Filter>()) { 646 if (!thread.get() || !thread->Start()) {
642 // TODO(scherkus): figure out a way to name these threads so it matches 647 NOTREACHED() << "Could not start filter thread";
643 // the filter type. 648 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
644 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); 649 } else {
645 if (!thread.get() || !thread->Start()) { 650 filter->SetMessageLoop(thread->message_loop());
646 NOTREACHED() << "Could not start filter thread"; 651 filter_threads_.push_back(thread.release());
647 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
648 } else {
649 filter->SetMessageLoop(thread->message_loop());
650 filter_threads_.push_back(thread.release());
651 }
652 } 652 }
653 }
653 654
654 // Creating a thread could have failed, verify we're still OK. 655 // Creating a thread could have failed, verify we're still OK.
655 if (PipelineOk()) { 656 if (IsPipelineOk()) {
656 filter_hosts_.push_back(host_initializing_); 657 filter_hosts_.push_back(host.get());
657 filter->SetFilterHost(host_initializing_); 658 filter->SetFilterHost(host.release());
658 if (!filter->Initialize(source)) { 659 if (!filter->Initialize(source)) {
659 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); 660 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
660 }
661 } 661 }
662 } 662 }
663 } 663 }
664 if (PipelineOk()) {
665 // Now we run the thread's message loop recursively. We want all
666 // pending tasks to be processed, so we set nestable tasks to be allowed
667 // and then run the loop. The only way we exit the loop is as the result
668 // of a call to FilterHost::InitializationComplete, FilterHost::Error, or
669 // Pipeline::Stop. In each of these cases, the corresponding task method
670 // sets host_initializing_ to NULL to signal that the message loop's Quit
671 // method has already been called, and then calls message_loop()->Quit().
672 // The setting of |host_initializing_| to NULL in the task prevents a
673 // subsequent task from accidentally quitting the wrong (non-nested) loop.
674 message_loop()->SetNestableTasksAllowed(true);
675 message_loop()->Run();
676 message_loop()->SetNestableTasksAllowed(false);
677 DCHECK(!host_initializing_);
678 } else {
679 // This could still be set if we never ran the message loop (for example,
680 // if the fiter returned false from it's Initialize() method), so make sure
681 // to reset it.
682 host_initializing_ = NULL;
683 }
684 if (!PipelineOk()) {
685 filter = NULL;
686 }
687 return filter;
688 } 664 }
689 665
690 scoped_refptr<DataSource> PipelineThread::CreateDataSource( 666 void PipelineThread::CreateDataSource() {
691 FilterFactory* filter_factory, const std::string& url) { 667 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
668 DCHECK(IsPipelineOk());
669
692 MediaFormat url_format; 670 MediaFormat url_format;
693 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); 671 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
694 url_format.SetAsString(MediaFormat::kURL, url); 672 url_format.SetAsString(MediaFormat::kURL, url_);
695 return CreateFilter<DataSource>(filter_factory, url, url_format); 673 CreateFilter<DataSource>(filter_factory_, url_, url_format);
674 }
675
676 void PipelineThread::CreateDemuxer() {
677 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
678 DCHECK(IsPipelineOk());
679
680 scoped_refptr<DataSource> data_source;
681 GetFilter(&data_source);
682 DCHECK(data_source);
683 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source);
684 }
685
686 template <class Decoder>
687 bool PipelineThread::CreateDecoder() {
688 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
689 DCHECK(IsPipelineOk());
690
691 scoped_refptr<Demuxer> demuxer;
692 GetFilter(&demuxer);
693 DCHECK(demuxer);
694
695 const std::string major_mime_type = Decoder::major_mime_type();
696 const int num_outputs = demuxer->GetNumberOfStreams();
697 for (int i = 0; i < num_outputs; ++i) {
698 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
699 std::string value;
700 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
701 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
702 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream);
703 return true;
704 }
705 }
706 return false;
707 }
708
709 template <class Decoder, class Renderer>
710 bool PipelineThread::CreateRenderer() {
711 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
712 DCHECK(IsPipelineOk());
713
714 scoped_refptr<Decoder> decoder;
715 GetFilter(&decoder);
716
717 if (decoder) {
718 // If the decoder was created.
719 const std::string major_mime_type = Decoder::major_mime_type();
720 CreateFilter<Renderer, Decoder>(filter_factory_, decoder);
721 return true;
722 }
723 return false;
696 } 724 }
697 725
698 // Called as a result of destruction of the thread. 726 // Called as a result of destruction of the thread.
699 // 727 //
700 // TODO(scherkus): this can block the client due to synchronous Stop() API call. 728 // TODO(scherkus): this can block the client due to synchronous Stop() API call.
701 void PipelineThread::WillDestroyCurrentMessageLoop() { 729 void PipelineThread::WillDestroyCurrentMessageLoop() {
702 STLDeleteElements(&filter_hosts_); 730 STLDeleteElements(&filter_hosts_);
703 STLDeleteElements(&filter_threads_); 731 STLDeleteElements(&filter_threads_);
704 } 732 }
705 733
706 } // namespace media 734 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698