Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 // | 4 // |
| 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, | 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, |
| 6 // potential deadlocks, nested message loops, etc... | 6 // potential deadlocks, etc... |
| 7 | 7 |
| 8 #include "base/compiler_specific.h" | 8 #include "base/compiler_specific.h" |
| 9 #include "base/condition_variable.h" | 9 #include "base/condition_variable.h" |
| 10 #include "base/stl_util-inl.h" | 10 #include "base/stl_util-inl.h" |
| 11 #include "media/base/filter_host_impl.h" | 11 #include "media/base/filter_host_impl.h" |
| 12 #include "media/base/media_format.h" | 12 #include "media/base/media_format.h" |
| 13 #include "media/base/pipeline_impl.h" | 13 #include "media/base/pipeline_impl.h" |
| 14 | 14 |
| 15 namespace media { | 15 namespace media { |
| 16 | 16 |
| (...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 282 rendered_mime_types_.insert(major_mime_type); | 282 rendered_mime_types_.insert(major_mime_type); |
| 283 } | 283 } |
| 284 | 284 |
| 285 | 285 |
| 286 //----------------------------------------------------------------------------- | 286 //----------------------------------------------------------------------------- |
| 287 | 287 |
| 288 PipelineThread::PipelineThread(PipelineImpl* pipeline) | 288 PipelineThread::PipelineThread(PipelineImpl* pipeline) |
| 289 : pipeline_(pipeline), | 289 : pipeline_(pipeline), |
| 290 thread_("PipelineThread"), | 290 thread_("PipelineThread"), |
| 291 time_update_callback_scheduled_(false), | 291 time_update_callback_scheduled_(false), |
| 292 host_initializing_(NULL) { | 292 state_(kCreated) { |
| 293 } | 293 } |
| 294 | 294 |
| 295 PipelineThread::~PipelineThread() { | 295 PipelineThread::~PipelineThread() { |
| 296 Stop(); | 296 Stop(); |
| 297 DCHECK(state_ == kStopped || state_ == kError); | |
| 297 } | 298 } |
| 298 | 299 |
| 299 // This method is called on the client's thread. It starts the pipeline's | 300 // This method is called on the client's thread. It starts the pipeline's |
| 300 // dedicated thread and posts a task to call the StartTask method on that | 301 // dedicated thread and posts a task to call the StartTask() method on that |
| 301 // thread. | 302 // thread. |
| 302 bool PipelineThread::Start(FilterFactory* filter_factory, | 303 bool PipelineThread::Start(FilterFactory* filter_factory, |
| 303 const std::string& url, | 304 const std::string& url, |
| 304 PipelineCallback* init_complete_callback) { | 305 PipelineCallback* init_complete_callback) { |
| 306 DCHECK_EQ(kCreated, state_); | |
| 305 if (thread_.Start()) { | 307 if (thread_.Start()) { |
| 306 filter_factory->AddRef(); | 308 filter_factory_ = filter_factory; |
| 307 PostTask(NewRunnableMethod(this, | 309 url_ = url; |
| 308 &PipelineThread::StartTask, | 310 init_callback_.reset(init_complete_callback); |
| 309 filter_factory, | 311 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
| 310 url, | |
| 311 // TODO(ralphl): what happens to this callback? | |
| 312 // is it copied by NewRunnableTask? Just pointer | |
| 313 // or is the callback itself copied? | |
| 314 init_complete_callback)); | |
| 315 return true; | 312 return true; |
| 316 } | 313 } |
| 317 return false; | 314 return false; |
| 318 } | 315 } |
| 319 | 316 |
| 320 // Called on the client's thread. If the thread has been started, then posts | 317 // Called on the client's thread. If the thread has been started, then posts |
| 321 // a task to call the StopTask method, then waits until the thread has stopped. | 318 // a task to call the StopTask() method, then waits until the thread has |
| 322 // There is a critical section that wraps the entire duration of the StartTask | 319 // stopped. |
| 323 // method. This method waits for that Lock to be released so that we know | |
| 324 // that the thread is not executing a nested message loop. This way we know | |
| 325 // that that Thread::Stop call will quit the appropriate message loop. | |
| 326 // | |
| 327 // TODO(scherkus): this can potentially deadlock, hack away our lock usage!! | |
| 328 void PipelineThread::Stop() { | 320 void PipelineThread::Stop() { |
| 329 if (thread_.IsRunning()) { | 321 if (thread_.IsRunning()) { |
| 330 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); | 322 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
| 331 AutoLock lock_crit(initialization_lock_); | |
| 332 thread_.Stop(); | 323 thread_.Stop(); |
| 333 } | 324 } |
| 334 DCHECK(filter_hosts_.empty()); | 325 DCHECK(filter_hosts_.empty()); |
| 335 DCHECK(filter_threads_.empty()); | 326 DCHECK(filter_threads_.empty()); |
| 336 } | 327 } |
| 337 | 328 |
| 338 // Called on client's thread. | 329 // Called on client's thread. |
| 339 void PipelineThread::SetPlaybackRate(float rate) { | 330 void PipelineThread::SetPlaybackRate(float rate) { |
| 340 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); | 331 PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); |
| 341 } | 332 } |
| 342 | 333 |
| 343 // Called on client's thread. | 334 // Called on client's thread. |
| 344 void PipelineThread::Seek(base::TimeDelta time, | 335 void PipelineThread::Seek(base::TimeDelta time, |
| 345 PipelineCallback* seek_callback) { | 336 PipelineCallback* seek_callback) { |
| 346 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time, | 337 PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time, |
| 347 seek_callback)); | 338 seek_callback)); |
| 348 } | 339 } |
| 349 | 340 |
| 350 // Called on client's thread. | 341 // Called on client's thread. |
| 351 void PipelineThread::SetVolume(float volume) { | 342 void PipelineThread::SetVolume(float volume) { |
| 352 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); | 343 PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); |
| 353 } | 344 } |
| 354 | 345 |
| 355 // May be called on any thread, and therefore we always assume the worst | |
| 356 // possible race condition. This could, for example, be called from a filter's | |
| 357 // thread just as the pipeline thread is exiting the call to the filter's | |
| 358 // Initialize() method. Therefore, we make NO assumptions, and post work | |
| 359 // in every case, even the trivial one of a thread calling this method from | |
| 360 // within it's Initialize method. This means that we will always run a nested | |
| 361 // message loop, and the InitializationCompleteTask will Quit that loop | |
| 362 // immediately in the trivial case. | |
| 363 void PipelineThread::InitializationComplete(FilterHostImpl* host) { | 346 void PipelineThread::InitializationComplete(FilterHostImpl* host) { |
| 364 DCHECK(host == host_initializing_); | 347 if (IsPipelineOk()) { |
| 365 PostTask(NewRunnableMethod(this, | 348 // Continue the start task by proceeding to the next stage. |
| 366 &PipelineThread::InitializationCompleteTask, | 349 PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); |
| 367 host)); | 350 } |
| 368 } | 351 } |
| 369 | 352 |
| 370 // Called from any thread. Updates the pipeline time and schedules a task to | 353 // Called from any thread. Updates the pipeline time and schedules a task to |
| 371 // call back to filters that have registered a callback for time updates. | 354 // call back to filters that have registered a callback for time updates. |
| 372 void PipelineThread::SetTime(base::TimeDelta time) { | 355 void PipelineThread::SetTime(base::TimeDelta time) { |
| 373 pipeline()->SetTime(time); | 356 pipeline()->SetTime(time); |
| 374 if (!time_update_callback_scheduled_) { | 357 if (!time_update_callback_scheduled_) { |
| 375 time_update_callback_scheduled_ = true; | 358 time_update_callback_scheduled_ = true; |
| 376 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); | 359 PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); |
| 377 } | 360 } |
| 378 } | 361 } |
| 379 | 362 |
| 380 // Called from any thread. Sets the pipeline error_ member and schedules a | 363 // Called from any thread. Sets the pipeline |error_| member and schedules a |
| 381 // task to stop all the filters in the pipeline. Note that the thread will | 364 // task to stop all the filters in the pipeline. Note that the thread will |
| 382 // continue to run until the client calls Pipeline::Stop, but nothing will | 365 // continue to run until the client calls Pipeline::Stop(), but nothing will |
| 383 // be processed since filters will not be able to post tasks. | 366 // be processed since filters will not be able to post tasks. |
| 384 void PipelineThread::Error(PipelineError error) { | 367 void PipelineThread::Error(PipelineError error) { |
| 385 // If this method returns false, then an error has already happened, so no | 368 // If this method returns false, then an error has already happened, so no |
| 386 // reason to run the StopTask again. It's going to happen. | 369 // reason to run the StopTask again. It's going to happen. |
| 387 if (pipeline()->InternalSetError(error)) { | 370 if (pipeline()->InternalSetError(error)) { |
| 388 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); | 371 PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); |
| 389 } | 372 } |
| 390 } | 373 } |
| 391 | 374 |
| 392 // Called from any thread. Used by FilterHostImpl::PostTask method and used | 375 // This is a helper method to post task on message_loop(). This method is only |
| 393 // internally. | 376 // called from this class or from Pipeline. |
| 394 void PipelineThread::PostTask(Task* task) { | 377 void PipelineThread::PostTask(Task* task) { |
| 395 message_loop()->PostTask(FROM_HERE, task); | 378 message_loop()->PostTask(FROM_HERE, task); |
| 396 } | 379 } |
| 397 | 380 |
| 398 | 381 |
| 399 // Main initialization method called on the pipeline thread. This code attempts | 382 // Main initialization method called on the pipeline thread. This code attempts |
| 400 // to use the specified filter factory to build a pipeline. It starts by | 383 // to use the specified filter factory to build a pipeline. |
| 401 // creating a DataSource, connects it to a Demuxer, and then connects the | 384 // Initialization step performed in this method depends on current state of this |
| 402 // Demuxer's audio stream to an AudioDecoder which is then connected to an | 385 // object, indicated by |state_|. After each step of initialization, this |
| 403 // AudioRenderer. If the media has video, then it connects a VideoDecoder to | 386 // object transits to the next stage. It starts by creating a DataSource, |
| 404 // the Demuxer's video stream, and then connects the VideoDecoder to a | 387 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an |
| 405 // VideoRenderer. When all required filters have been created and have called | 388 // AudioDecoder which is then connected to an AudioRenderer. If the media has |
| 406 // their FilterHost's InitializationComplete method, the pipeline's | 389 // video, then it connects a VideoDecoder to the Demuxer's video stream, and |
| 407 // initialized_ member is set to true, and, if the client provided an | 390 // then connects the VideoDecoder to a VideoRenderer. |
| 408 // init_complete_callback, it is called with "true". | 391 // |
| 409 // If initializatoin fails, the client's callback will still be called, but | 392 // When all required filters have been created and have called their |
| 393 // FilterHost's InitializationComplete method, the pipeline's |initialized_| | |
| 394 // member is set to true, and, if the client provided an | |
| 395 // |init_complete_callback_|, it is called with "true". | |
| 396 // | |
| 397 // If initialization fails, the client's callback will still be called, but | |
| 410 // the bool parameter passed to it will be false. | 398 // the bool parameter passed to it will be false. |
| 411 // | 399 // |
| 412 // Note that at each step in this process, the initialization of any filter | 400 // TODO(hclam): StartTask is now starting the pipeline asynchronous. It |
|
awong
2009/07/01 23:39:46
asynchronous -> asynchronously
| |
| 413 // may require running the pipeline thread's message loop recursively. This is | 401 // works like a big state change table. If we no longer need to start filters |
| 414 // handled by the CreateFilter method. | 402 // in order, we need to get rid of all the state change. |
| 415 void PipelineThread::StartTask(FilterFactory* filter_factory, | 403 void PipelineThread::StartTask() { |
| 416 const std::string& url, | 404 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| 417 PipelineCallback* init_complete_callback) { | |
| 418 // During the entire StartTask we hold the initialization_lock_ so that | |
| 419 // if the client calls the Pipeline::Stop method while we are running a | |
| 420 // nested message loop, we can correctly unwind out of it before calling | |
| 421 // the Thread::Stop method. | |
| 422 AutoLock auto_lock(initialization_lock_); | |
| 423 | 405 |
| 424 // Add ourselves as a destruction observer of the thread's message loop so | 406 // If we have received the stop signal, return immediately. |
| 425 // we can delete filters at an appropriate time (when all tasks have been | 407 if (state_ == kStopped) |
| 426 // processed and the thread is about to be destroyed). | 408 return; |
| 427 message_loop()->AddDestructionObserver(this); | |
| 428 | 409 |
| 429 scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); | 410 DCHECK(state_ == kCreated || IsPipelineInitializing()); |
| 430 if (PipelineOk()) { | 411 |
| 431 scoped_refptr<Demuxer> demuxer = | 412 // Just created, create data source. |
| 432 CreateFilter<Demuxer, DataSource>(filter_factory, data_source); | 413 if (state_ == kCreated) { |
| 433 if (PipelineOk()) { | 414 message_loop()->AddDestructionObserver(this); |
| 434 Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); | 415 state_ = kInitDataSource; |
| 435 } | 416 CreateDataSource(); |
| 436 if (PipelineOk()) { | 417 return; |
| 437 Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); | 418 } |
| 419 | |
| 420 // Data source created, create demuxer. | |
| 421 if (state_ == kInitDataSource) { | |
| 422 state_ = kInitDemuxer; | |
| 423 CreateDemuxer(); | |
| 424 return; | |
| 425 } | |
| 426 | |
| 427 // Demuxer created, create audio decoder. | |
| 428 if (state_ == kInitDemuxer) { | |
| 429 state_ = kInitAudioDecoder; | |
| 430 // If this method returns false, then there's no audio stream. | |
| 431 if (CreateDecoder<AudioDecoder>()) | |
| 432 return; | |
| 433 } | |
| 434 | |
| 435 // Assuming audio decoder was created, create audio renderer. | |
| 436 if (state_ == kInitAudioDecoder) { | |
| 437 state_ = kInitAudioRenderer; | |
| 438 // Returns false if there's no audio stream. | |
| 439 if (CreateRenderer<AudioDecoder, AudioRenderer>()) { | |
| 440 pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type()); | |
| 441 return; | |
| 438 } | 442 } |
| 439 } | 443 } |
| 440 | 444 |
| 441 if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { | 445 // Assuming audio renderer was created, create video decoder. |
| 442 Error(PIPELINE_ERROR_COULD_NOT_RENDER); | 446 if (state_ == kInitAudioRenderer) { |
| 447 // Then perform the stage of initialization, i.e. initialize video decoder. | |
| 448 state_ = kInitVideoDecoder; | |
| 449 if (CreateDecoder<VideoDecoder>()) | |
| 450 return; | |
| 443 } | 451 } |
| 444 | 452 |
| 445 pipeline_->initialized_ = PipelineOk(); | 453 // Assuming video decoder was created, create video renderer. |
| 454 if (state_ == kInitVideoDecoder) { | |
| 455 state_ = kInitVideoRenderer; | |
| 456 if (CreateRenderer<VideoDecoder, VideoRenderer>()) { | |
| 457 pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type()); | |
| 458 return; | |
| 459 } | |
| 460 } | |
| 446 | 461 |
| 447 // No matter what, we're done with the filter factory, and | 462 if (state_ == kInitVideoRenderer) { |
| 448 // client callback so get rid of them. | 463 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { |
| 449 filter_factory->Release(); | 464 Error(PIPELINE_ERROR_COULD_NOT_RENDER); |
| 450 if (init_complete_callback) { | 465 return; |
| 451 init_complete_callback->Run(pipeline_->initialized_); | 466 } |
| 452 delete init_complete_callback; | 467 |
| 468 state_ = kStarted; | |
| 469 pipeline_->initialized_ = true; | |
| 470 filter_factory_ = NULL; | |
| 471 if (init_callback_.get()) { | |
| 472 init_callback_->Run(true); | |
| 473 init_callback_.reset(); | |
| 474 } | |
| 453 } | 475 } |
| 454 } | 476 } |
| 455 | 477 |
| 456 // This method is called as a result of the client calling Pipeline::Stop() or | 478 // This method is called as a result of the client calling Pipeline::Stop() or |
| 457 // as the result of an error condition. If there is no error, then set the | 479 // as the result of an error condition. If there is no error, then set the |
| 458 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the | 480 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the |
| 459 // reverse order. | 481 // reverse order. |
| 460 // | 482 // |
| 461 // TODO(scherkus): beware! this can get posted multiple times since we post | 483 // TODO(scherkus): beware! this can get posted multiple times since we post |
| 462 // Stop() tasks even if we've already stopped. Perhaps this should no-op for | 484 // Stop() tasks even if we've already stopped. Perhaps this should no-op for |
| 463 // additional calls, however most of this logic will be changing. | 485 // additional calls, however most of this logic will be changing. |
| 464 void PipelineThread::StopTask() { | 486 void PipelineThread::StopTask() { |
| 465 if (PipelineOk()) { | 487 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| 488 | |
| 489 if (IsPipelineInitializing()) { | |
| 490 // If IsPipelineOk() is true, the pipeline was simply stopped during | |
| 491 // initialization. Otherwise it is a failure. | |
| 492 state_ = IsPipelineOk() ? kStopped : kError; | |
| 493 filter_factory_ = NULL; | |
| 494 if (init_callback_.get()) { | |
| 495 init_callback_->Run(false); | |
| 496 init_callback_.reset(); | |
| 497 } | |
| 498 } else { | |
| 499 state_ = kStopped; | |
| 500 } | |
| 501 | |
| 502 if (IsPipelineOk()) { | |
| 466 pipeline_->error_ = PIPELINE_STOPPING; | 503 pipeline_->error_ = PIPELINE_STOPPING; |
| 467 } | 504 } |
| 468 | 505 |
| 469 // Stop every filter. | 506 // Stop every filter. |
| 470 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 507 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| 471 iter != filter_hosts_.end(); | 508 iter != filter_hosts_.end(); |
| 472 ++iter) { | 509 ++iter) { |
| 473 (*iter)->Stop(); | 510 (*iter)->Stop(); |
| 474 } | 511 } |
| 475 | 512 |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 511 } | 548 } |
| 512 | 549 |
| 513 // Stop every running filter thread. | 550 // Stop every running filter thread. |
| 514 // | 551 // |
| 515 // TODO(scherkus): can we watchdog this section to detect wedged threads? | 552 // TODO(scherkus): can we watchdog this section to detect wedged threads? |
| 516 for (FilterThreadVector::iterator iter = running_threads.begin(); | 553 for (FilterThreadVector::iterator iter = running_threads.begin(); |
| 517 iter != running_threads.end(); | 554 iter != running_threads.end(); |
| 518 ++iter) { | 555 ++iter) { |
| 519 (*iter)->Stop(); | 556 (*iter)->Stop(); |
| 520 } | 557 } |
| 521 | |
| 522 if (host_initializing_) { | |
| 523 host_initializing_ = NULL; | |
| 524 message_loop()->Quit(); | |
| 525 } | |
| 526 } | |
| 527 | |
| 528 template <class Decoder, class Renderer> | |
| 529 void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) { | |
| 530 DCHECK(PipelineOk()); | |
| 531 const std::string major_mime_type = Decoder::major_mime_type(); | |
| 532 const int num_outputs = demuxer->GetNumberOfStreams(); | |
| 533 for (int i = 0; i < num_outputs; ++i) { | |
| 534 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); | |
| 535 std::string value; | |
| 536 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && | |
| 537 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { | |
| 538 scoped_refptr<Decoder> decoder = | |
| 539 CreateFilter<Decoder, DemuxerStream>(filter_factory, stream); | |
| 540 if (PipelineOk()) { | |
| 541 DCHECK(decoder); | |
| 542 CreateFilter<Renderer, Decoder>(filter_factory, decoder); | |
| 543 } | |
| 544 if (PipelineOk()) { | |
| 545 pipeline_->InsertRenderedMimeType(major_mime_type); | |
| 546 } | |
| 547 break; | |
| 548 } | |
| 549 } | |
| 550 } | |
| 551 | |
| 552 | |
| 553 // Task runs as a result of a filter calling InitializationComplete. If for | |
| 554 // some reason StopTask has been executed prior to this, the host_initializing_ | |
| 555 // member will be NULL, and the message loop will have been quit already, so | |
| 556 // we don't want to do it again. | |
| 557 void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { | |
| 558 if (host == host_initializing_) { | |
| 559 host_initializing_ = NULL; | |
| 560 message_loop()->Quit(); | |
| 561 } else { | |
| 562 DCHECK(!host_initializing_); | |
| 563 } | |
| 564 } | 558 } |
| 565 | 559 |
| 566 void PipelineThread::SetPlaybackRateTask(float rate) { | 560 void PipelineThread::SetPlaybackRateTask(float rate) { |
| 561 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 562 | |
| 567 pipeline_->InternalSetPlaybackRate(rate); | 563 pipeline_->InternalSetPlaybackRate(rate); |
| 568 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 564 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| 569 iter != filter_hosts_.end(); | 565 iter != filter_hosts_.end(); |
| 570 ++iter) { | 566 ++iter) { |
| 571 (*iter)->media_filter()->SetPlaybackRate(rate); | 567 (*iter)->media_filter()->SetPlaybackRate(rate); |
| 572 } | 568 } |
| 573 } | 569 } |
| 574 | 570 |
| 575 void PipelineThread::SeekTask(base::TimeDelta time, | 571 void PipelineThread::SeekTask(base::TimeDelta time, |
| 576 PipelineCallback* seek_callback) { | 572 PipelineCallback* seek_callback) { |
| 573 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 574 | |
| 577 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 575 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| 578 iter != filter_hosts_.end(); | 576 iter != filter_hosts_.end(); |
| 579 ++iter) { | 577 ++iter) { |
| 580 (*iter)->media_filter()->Seek(time); | 578 (*iter)->media_filter()->Seek(time); |
| 581 } | 579 } |
| 582 | 580 |
| 583 // TODO(hclam): we should set the time when the above seek operations were all | 581 // TODO(hclam): we should set the time when the above seek operations were all |
| 584 // successful and first frame/packet at the desired time is decoded. I'm | 582 // successful and first frame/packet at the desired time is decoded. I'm |
| 585 // setting the time here because once we do the callback the user can ask for | 583 // setting the time here because once we do the callback the user can ask for |
| 586 // current time immediately, which is the old time. In order to get rid this | 584 // current time immediately, which is the old time. In order to get rid this |
| 587 // little glitch, we either assume the seek was successful and time is updated | 585 // little glitch, we either assume the seek was successful and time is updated |
| 588 // immediately here or we set time and do callback when we have new | 586 // immediately here or we set time and do callback when we have new |
| 589 // frames/packets. | 587 // frames/packets. |
| 590 SetTime(time); | 588 SetTime(time); |
| 591 if (seek_callback) { | 589 if (seek_callback) { |
| 592 seek_callback->Run(true); | 590 seek_callback->Run(true); |
| 593 delete seek_callback; | 591 delete seek_callback; |
| 594 } | 592 } |
| 595 } | 593 } |
| 596 | 594 |
| 597 void PipelineThread::SetVolumeTask(float volume) { | 595 void PipelineThread::SetVolumeTask(float volume) { |
| 596 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 597 | |
| 598 pipeline_->volume_ = volume; | 598 pipeline_->volume_ = volume; |
| 599 scoped_refptr<AudioRenderer> audio_renderer; | 599 scoped_refptr<AudioRenderer> audio_renderer; |
| 600 GetFilter(&audio_renderer); | 600 GetFilter(&audio_renderer); |
| 601 if (audio_renderer) { | 601 if (audio_renderer) { |
| 602 audio_renderer->SetVolume(volume); | 602 audio_renderer->SetVolume(volume); |
| 603 } | 603 } |
| 604 } | 604 } |
| 605 | 605 |
| 606 void PipelineThread::SetTimeTask() { | 606 void PipelineThread::SetTimeTask() { |
| 607 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 608 | |
| 607 time_update_callback_scheduled_ = false; | 609 time_update_callback_scheduled_ = false; |
| 608 for (FilterHostVector::iterator iter = filter_hosts_.begin(); | 610 for (FilterHostVector::iterator iter = filter_hosts_.begin(); |
| 609 iter != filter_hosts_.end(); | 611 iter != filter_hosts_.end(); |
| 610 ++iter) { | 612 ++iter) { |
| 611 (*iter)->RunTimeUpdateCallback(pipeline_->time_); | 613 (*iter)->RunTimeUpdateCallback(pipeline_->time_); |
| 612 } | 614 } |
| 613 } | 615 } |
| 614 | 616 |
| 615 template <class Filter> | 617 template <class Filter> |
| 616 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { | 618 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { |
| 619 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 620 | |
| 617 *filter_out = NULL; | 621 *filter_out = NULL; |
| 618 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); | 622 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); |
| 619 iter != filter_hosts_.end() && NULL == *filter_out; | 623 iter != filter_hosts_.end() && NULL == *filter_out; |
| 620 iter++) { | 624 iter++) { |
| 621 (*iter)->GetFilter(filter_out); | 625 (*iter)->GetFilter(filter_out); |
| 622 } | 626 } |
| 623 } | 627 } |
| 624 | 628 |
| 625 template <class Filter, class Source> | 629 template <class Filter, class Source> |
| 626 scoped_refptr<Filter> PipelineThread::CreateFilter( | 630 void PipelineThread::CreateFilter(FilterFactory* filter_factory, |
| 627 FilterFactory* filter_factory, | 631 Source source, |
| 628 Source source, | 632 const MediaFormat& media_format) { |
| 629 const MediaFormat& media_format) { | 633 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| 630 DCHECK(PipelineOk()); | 634 DCHECK(IsPipelineOk()); |
| 635 | |
| 631 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); | 636 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); |
| 632 if (!filter) { | 637 if (!filter) { |
| 633 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); | 638 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); |
| 634 } else { | 639 } else { |
| 635 DCHECK(!host_initializing_); | 640 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); |
| 636 host_initializing_ = new FilterHostImpl(this, filter.get()); | 641 // Create a dedicated thread for this filter. |
| 637 if (NULL == host_initializing_) { | 642 if (SupportsSetMessageLoop<Filter>()) { |
| 638 Error(PIPELINE_ERROR_OUT_OF_MEMORY); | 643 // TODO(scherkus): figure out a way to name these threads so it matches |
| 639 } else { | 644 // the filter type. |
| 640 // Create a dedicated thread for this filter. | 645 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); |
| 641 if (SupportsSetMessageLoop<Filter>()) { | 646 if (!thread.get() || !thread->Start()) { |
| 642 // TODO(scherkus): figure out a way to name these threads so it matches | 647 NOTREACHED() << "Could not start filter thread"; |
| 643 // the filter type. | 648 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| 644 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); | 649 } else { |
| 645 if (!thread.get() || !thread->Start()) { | 650 filter->SetMessageLoop(thread->message_loop()); |
| 646 NOTREACHED() << "Could not start filter thread"; | 651 filter_threads_.push_back(thread.release()); |
| 647 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); | |
| 648 } else { | |
| 649 filter->SetMessageLoop(thread->message_loop()); | |
| 650 filter_threads_.push_back(thread.release()); | |
| 651 } | |
| 652 } | 652 } |
| 653 } | |
| 653 | 654 |
| 654 // Creating a thread could have failed, verify we're still OK. | 655 // Creating a thread could have failed, verify we're still OK. |
| 655 if (PipelineOk()) { | 656 if (IsPipelineOk()) { |
| 656 filter_hosts_.push_back(host_initializing_); | 657 filter_hosts_.push_back(host.get()); |
| 657 filter->SetFilterHost(host_initializing_); | 658 filter->SetFilterHost(host.release()); |
| 658 if (!filter->Initialize(source)) { | 659 if (!filter->Initialize(source)) { |
| 659 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); | 660 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); |
| 660 } | |
| 661 } | 661 } |
| 662 } | 662 } |
| 663 } | 663 } |
| 664 if (PipelineOk()) { | |
| 665 // Now we run the thread's message loop recursively. We want all | |
| 666 // pending tasks to be processed, so we set nestable tasks to be allowed | |
| 667 // and then run the loop. The only way we exit the loop is as the result | |
| 668 // of a call to FilterHost::InitializationComplete, FilterHost::Error, or | |
| 669 // Pipeline::Stop. In each of these cases, the corresponding task method | |
| 670 // sets host_initializing_ to NULL to signal that the message loop's Quit | |
| 671 // method has already been called, and then calls message_loop()->Quit(). | |
| 672 // The setting of |host_initializing_| to NULL in the task prevents a | |
| 673 // subsequent task from accidentally quitting the wrong (non-nested) loop. | |
| 674 message_loop()->SetNestableTasksAllowed(true); | |
| 675 message_loop()->Run(); | |
| 676 message_loop()->SetNestableTasksAllowed(false); | |
| 677 DCHECK(!host_initializing_); | |
| 678 } else { | |
| 679 // This could still be set if we never ran the message loop (for example, | |
| 680 // if the fiter returned false from it's Initialize() method), so make sure | |
| 681 // to reset it. | |
| 682 host_initializing_ = NULL; | |
| 683 } | |
| 684 if (!PipelineOk()) { | |
| 685 filter = NULL; | |
| 686 } | |
| 687 return filter; | |
| 688 } | 664 } |
| 689 | 665 |
| 690 scoped_refptr<DataSource> PipelineThread::CreateDataSource( | 666 void PipelineThread::CreateDataSource() { |
| 691 FilterFactory* filter_factory, const std::string& url) { | 667 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); |
| 668 DCHECK(IsPipelineOk()); | |
| 669 | |
| 692 MediaFormat url_format; | 670 MediaFormat url_format; |
| 693 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); | 671 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); |
| 694 url_format.SetAsString(MediaFormat::kURL, url); | 672 url_format.SetAsString(MediaFormat::kURL, url_); |
| 695 return CreateFilter<DataSource>(filter_factory, url, url_format); | 673 CreateFilter<DataSource>(filter_factory_, url_, url_format); |
| 674 } | |
| 675 | |
| 676 void PipelineThread::CreateDemuxer() { | |
| 677 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 678 DCHECK(IsPipelineOk()); | |
| 679 | |
| 680 scoped_refptr<DataSource> data_source; | |
| 681 GetFilter(&data_source); | |
| 682 DCHECK(data_source); | |
| 683 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); | |
| 684 } | |
| 685 | |
| 686 template <class Decoder> | |
| 687 bool PipelineThread::CreateDecoder() { | |
| 688 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 689 DCHECK(IsPipelineOk()); | |
| 690 | |
| 691 scoped_refptr<Demuxer> demuxer; | |
| 692 GetFilter(&demuxer); | |
| 693 DCHECK(demuxer); | |
| 694 | |
| 695 const std::string major_mime_type = Decoder::major_mime_type(); | |
| 696 const int num_outputs = demuxer->GetNumberOfStreams(); | |
| 697 for (int i = 0; i < num_outputs; ++i) { | |
| 698 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); | |
| 699 std::string value; | |
| 700 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && | |
| 701 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { | |
| 702 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); | |
| 703 return true; | |
| 704 } | |
| 705 } | |
| 706 return false; | |
| 707 } | |
| 708 | |
| 709 template <class Decoder, class Renderer> | |
| 710 bool PipelineThread::CreateRenderer() { | |
| 711 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); | |
| 712 DCHECK(IsPipelineOk()); | |
| 713 | |
| 714 scoped_refptr<Decoder> decoder; | |
| 715 GetFilter(&decoder); | |
| 716 | |
| 717 if (decoder) { | |
| 718 // If the decoder was created. | |
| 719 const std::string major_mime_type = Decoder::major_mime_type(); | |
| 720 CreateFilter<Renderer, Decoder>(filter_factory_, decoder); | |
| 721 return true; | |
| 722 } | |
| 723 return false; | |
| 696 } | 724 } |
| 697 | 725 |
| 698 // Called as a result of destruction of the thread. | 726 // Called as a result of destruction of the thread. |
| 699 // | 727 // |
| 700 // TODO(scherkus): this can block the client due to synchronous Stop() API call. | 728 // TODO(scherkus): this can block the client due to synchronous Stop() API call. |
| 701 void PipelineThread::WillDestroyCurrentMessageLoop() { | 729 void PipelineThread::WillDestroyCurrentMessageLoop() { |
| 702 STLDeleteElements(&filter_hosts_); | 730 STLDeleteElements(&filter_hosts_); |
| 703 STLDeleteElements(&filter_threads_); | 731 STLDeleteElements(&filter_threads_); |
| 704 } | 732 } |
| 705 | 733 |
| 706 } // namespace media | 734 } // namespace media |
| OLD | NEW |