Chromium Code Reviews| Index: media/base/composite_filter.cc |
| diff --git a/media/base/composite_filter.cc b/media/base/composite_filter.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..c29d74a5710eaa0a7f9fd4fd8a829ce6894f953c |
| --- /dev/null |
| +++ b/media/base/composite_filter.cc |
| @@ -0,0 +1,567 @@ |
| +// Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "base/stl_util-inl.h" |
| +#include "media/base/callback.h" |
| +#include "media/base/composite_filter.h" |
|
scherkus (not reviewing)
2010/12/15 16:44:02
include corresponding .h first followed by blank l
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Done.
|
| + |
| +namespace media { |
| + |
| +CompositeFilter::CompositeFilter(MessageLoop* message_loop) |
| +{ |
|
scherkus (not reviewing)
2010/12/15 16:44:02
style: { on previous line
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Done.
|
| + Init(message_loop, NULL); |
| +} |
| + |
| +CompositeFilter::CompositeFilter(MessageLoop* message_loop, |
| + ThreadFactoryFunction thread_factory) |
| +{ |
|
scherkus (not reviewing)
2010/12/15 16:44:02
style: { on previous line
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Done.
|
| + DCHECK(thread_factory); |
| + Init(message_loop, thread_factory); |
| +} |
| + |
| +void CompositeFilter::Init(MessageLoop* message_loop, |
| + ThreadFactoryFunction thread_factory) { |
| + DCHECK(message_loop); |
| + message_loop_ = message_loop; |
| + thread_factory_ = thread_factory; |
| + |
| + if (!thread_factory_) { |
| + thread_factory_ = &CompositeFilter::DefaultThreadFactory; |
| + } |
| + |
| + state_ = kCreated; |
| + sequence_index_ = -1; |
| + host_ = NULL; |
| + error_ = PIPELINE_OK; |
| +} |
| + |
| +CompositeFilter::~CompositeFilter() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + DCHECK(state_ == kCreated || state_ == kStopped); |
| + |
| + // Stop every running filter thread. |
| + for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
| + iter != filter_threads_.end(); |
| + ++iter) { |
| + (*iter)->Stop(); |
| + } |
| + |
| + // Reset the pipeline, which will decrement a reference to this object. |
| + // We will get destroyed as soon as the remaining tasks finish executing. |
| + // To be safe, we'll set our pipeline reference to NULL. |
| + filters_.clear(); |
| + STLDeleteElements(&filter_threads_); |
| +} |
| + |
| +bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + if (!filter.get() || state_ != kCreated || !host_) |
| + return false; |
| + |
| + // Create a dedicated thread for this filter if applicable. |
| + if (filter->requires_message_loop()) { |
| + scoped_ptr<base::Thread> thread( |
| + thread_factory_(filter->message_loop_name())); |
| + |
| + if (!thread.get() || !thread->Start()) { |
| + return false; |
| + } |
| + |
| + filter->set_message_loop(thread->message_loop()); |
| + filter_threads_.push_back(thread.release()); |
| + } |
| + |
| + // Register ourselves as the filter's host. |
| + filter->set_host(this); |
| + filters_.push_back(make_scoped_refptr(filter.get())); |
| + return true; |
| +} |
| + |
| +const char* CompositeFilter::major_mime_type() const { |
| + return ""; |
| +} |
| + |
| +void CompositeFilter::set_host(FilterHost* host) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + DCHECK(host); |
| + DCHECK(!host_); |
| + host_ = host; |
| +} |
| + |
| +FilterHost* CompositeFilter::host() { |
| + return host_; |
| +} |
| + |
| +bool CompositeFilter::requires_message_loop() const { |
| + return false; |
| +} |
| + |
| +const char* CompositeFilter::message_loop_name() const { |
| + return "CompositeFilter"; |
| +} |
| + |
| +void CompositeFilter::set_message_loop(MessageLoop* message_loop) { |
| + NOTREACHED() << "Message loop should not be set."; |
| +} |
| + |
| +MessageLoop* CompositeFilter::message_loop() { |
| + return NULL; |
| +} |
| + |
| +void CompositeFilter::Play(FilterCallback* play_callback) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + scoped_ptr<FilterCallback> callback(play_callback); |
| + if (callback_.get()) { |
| + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| + callback->Run(); |
| + return; |
| + } else if (state_ == kPlaying) { |
| + callback->Run(); |
| + return; |
| + } else if (!host_ || (state_ != kPaused && state_ != kCreated)) { |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + ChangeState(kPlayPending); |
| + callback_.reset(callback.release()); |
| + StartSerialCallSequence(); |
| +} |
| + |
| +void CompositeFilter::Pause(FilterCallback* pause_callback) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + scoped_ptr<FilterCallback> callback(pause_callback); |
| + if (callback_.get()) { |
| + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| + callback->Run(); |
| + return; |
| + } else if (state_ == kPaused) { |
| + callback->Run(); |
| + return; |
| + } else if (!host_ || state_ != kPlaying) { |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + ChangeState(kPausePending); |
| + callback_.reset(callback.release()); |
| + StartSerialCallSequence(); |
| +} |
| + |
| +void CompositeFilter::Flush(FilterCallback* flush_callback) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + scoped_ptr<FilterCallback> callback(flush_callback); |
| + if (callback_.get()) { |
| + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| + callback->Run(); |
| + return; |
| + } else if (!host_ || (state_ != kCreated && state_ != kPaused)) { |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + ChangeState(kFlushPending); |
| + callback_.reset(callback.release()); |
| + StartParallelCallSequence(); |
| +} |
| + |
| +void CompositeFilter::Stop(FilterCallback* stop_callback) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + scoped_ptr<FilterCallback> callback(stop_callback); |
| + if (!host_) { |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } else if (state_ == kStopped) { |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + switch(state_) { |
| + case kError: |
| + case kCreated: |
| + case kPaused: |
| + case kPlaying: |
| + ChangeState(kStopPending); |
| + break; |
| + case kPlayPending: |
| + ChangeState(kStopWhilePlayPending); |
| + break; |
| + case kPausePending: |
| + ChangeState(kStopWhilePausePending); |
| + break; |
| + case kFlushPending: |
| + ChangeState(kStopWhileFlushPending); |
| + break; |
| + case kSeekPending: |
| + ChangeState(kStopWhileSeekPending); |
| + break; |
| + default: |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + callback_.reset(callback.release()); |
| + if (state_ == kStopPending) { |
| + StartSerialCallSequence(); |
| + } |
| +} |
| + |
| +void CompositeFilter::SetPlaybackRate(float playback_rate) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + for (FilterVector::iterator iter = filters_.begin(); |
| + iter != filters_.end(); |
| + ++iter) { |
| + (*iter)->SetPlaybackRate(playback_rate); |
| + } |
| +} |
| + |
| +void CompositeFilter::Seek(base::TimeDelta time, |
| + FilterCallback* seek_callback) { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + scoped_ptr<FilterCallback> callback(seek_callback); |
| + if (callback_.get()) { |
| + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| + callback->Run(); |
| + return; |
| + } else if (!host_ || (state_ != kPaused && state_ != kCreated)) { |
| + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| + callback->Run(); |
| + return; |
| + } |
| + |
| + ChangeState(kSeekPending); |
| + callback_.reset(callback.release()); |
| + pending_seek_time_ = time; |
| + StartSerialCallSequence(); |
| +} |
| + |
| +void CompositeFilter::OnAudioRendererDisabled() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + for (FilterVector::iterator iter = filters_.begin(); |
| + iter != filters_.end(); |
| + ++iter) { |
| + (*iter)->OnAudioRendererDisabled(); |
| + } |
| +} |
| + |
| +base::Thread* CompositeFilter::DefaultThreadFactory( |
| + const char* thread_name) { |
| + return new base::Thread(thread_name); |
| +} |
| + |
| +void CompositeFilter::ChangeState(State new_state) { |
| + state_ = new_state; |
| +} |
| + |
| +void CompositeFilter::StartSerialCallSequence() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + error_ = PIPELINE_OK; |
| + |
| + if (filters_.size() > 0) { |
| + sequence_index_ = 0; |
| + CallFilter(filters_[sequence_index_], |
| + NewThreadSafeCallback(&CompositeFilter::SerialCallback)); |
| + } else { |
| + sequence_index_ = -1; |
| + SerialCallback(); |
| + } |
| +} |
| + |
| +void CompositeFilter::StartParallelCallSequence() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + error_ = PIPELINE_OK; |
| + |
| + if (filters_.size() > 0) { |
| + sequence_index_ = 0; |
| + for (size_t i = 0; i < filters_.size(); i++) |
| + CallFilter(filters_[i], |
|
scherkus (not reviewing)
2010/12/15 16:44:02
I get freaked out when I see multi-line for/while
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Done.
|
| + NewThreadSafeCallback(&CompositeFilter::ParallelCallback)); |
| + } else { |
| + sequence_index_ = -1; |
| + ParallelCallback(); |
| + } |
| +} |
| + |
| +void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter, |
| + FilterCallback* callback) { |
| + switch(state_) { |
| + case kPlayPending: |
| + filter->Play(callback); |
| + break; |
| + case kPausePending: |
| + filter->Pause(callback); |
| + break; |
| + case kFlushPending: |
| + filter->Flush(callback); |
| + break; |
| + case kStopPending: |
| + filter->Stop(callback); |
| + break; |
| + case kSeekPending: |
| + filter->Seek(pending_seek_time_, callback); |
| + break; |
| + default: |
| + delete callback; |
| + ChangeState(kError); |
| + HandleError(PIPELINE_ERROR_INVALID_STATE); |
| + } |
| +} |
| + |
| +void CompositeFilter::DispatchPendingCallback() { |
| + if (callback_.get()) { |
| + scoped_ptr<FilterCallback> callback(callback_.release()); |
| + callback->Run(); |
| + } |
| +} |
| + |
| +CompositeFilter::State CompositeFilter::GetNextState(State state) const { |
| + State ret = kInvalid; |
| + switch (state) { |
| + case kPlayPending: |
| + ret = kPlaying; |
| + break; |
| + case kPausePending: |
| + ret = kPaused; |
| + case kFlushPending: |
| + ret = kPaused; |
| + break; |
| + case kStopPending: |
| + ret = kStopped; |
| + break; |
| + case kSeekPending: |
| + ret = kPaused; |
| + break; |
| + case kStopWhilePlayPending: |
| + case kStopWhilePausePending: |
| + case kStopWhileFlushPending: |
| + case kStopWhileSeekPending: |
| + ret = kStopPending; |
| + break; |
| + |
| + case kInvalid: |
| + case kCreated: |
| + case kPlaying: |
| + case kPaused: |
| + case kStopped: |
| + case kError: |
| + ret = kInvalid; |
| + break; |
| + |
| + // default: intentionally left out to catch missing states. |
| + } |
| + |
| + return ret; |
| +} |
| + |
| +void CompositeFilter::SerialCallback() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + if (error_ != PIPELINE_OK) { |
| + // We encountered an error. Terminate the sequence now. |
| + ChangeState(kError); |
| + HandleError(error_); |
| + return; |
| + } |
| + |
| + sequence_index_++; |
| + if (sequence_index_ == filters_.size()) { |
| + // All filters have been successfully called without error. |
| + OnCallSequenceDone(); |
| + } else if (GetNextState(state_) == kStopPending) { |
| + // Abort sequence early and start issuing Stop() calls. |
| + ChangeState(kStopPending); |
| + StartSerialCallSequence(); |
| + } else { |
| + // We aren't done with the sequence. Call the next filter. |
| + CallFilter(filters_[sequence_index_], |
| + NewThreadSafeCallback(&CompositeFilter::SerialCallback)); |
| + } |
| +} |
| + |
| +void CompositeFilter::ParallelCallback() { |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + sequence_index_++; |
| + if (sequence_index_ == filters_.size()) { |
| + if (error_ != PIPELINE_OK) { |
| + // We encountered an error. |
| + ChangeState(kError); |
| + HandleError(error_); |
| + return; |
| + } |
| + |
| + OnCallSequenceDone(); |
| + } |
| +} |
| + |
| +void CompositeFilter::OnCallSequenceDone() { |
| + State next_state = GetNextState(state_); |
| + |
| + if (next_state == kInvalid) { |
| + // We somehow got into an unexpected state. |
| + ChangeState(kError); |
| + HandleError(PIPELINE_ERROR_INVALID_STATE); |
| + } |
| + |
| + ChangeState(next_state); |
| + |
| + if (state_ == kStopPending) { |
| + // Handle a deferred Stop(). |
| + StartSerialCallSequence(); |
| + } else { |
| + // Call the callback to indicate that the operation has completed. |
| + DispatchPendingCallback(); |
| + } |
| +} |
| + |
| +void CompositeFilter::SendErrorToHost(PipelineError error) { |
| + if (host_) |
| + host_->SetError(error); |
| +} |
| + |
| +void CompositeFilter::HandleError(PipelineError error) { |
| + if (error != PIPELINE_OK) { |
| + SendErrorToHost(error); |
| + } |
| + |
| + DispatchPendingCallback(); |
| +} |
| + |
| +FilterCallback* CompositeFilter::NewThreadSafeCallback( |
| + void (CompositeFilter::*method)()) { |
|
scherkus (not reviewing)
2010/12/15 16:44:02
de-indent by 2
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Done.
|
| + return TaskToCallbackAdapter::NewCallback( |
| + NewRunnableMethod(this, |
| + &CompositeFilter::OnCallback, |
| + message_loop_, |
| + method)); |
| +} |
| + |
| +void CompositeFilter::OnCallback(MessageLoop* message_loop, |
| + void (CompositeFilter::*method)()) { |
| + if (MessageLoop::current() != message_loop) { |
| + // Posting callback to the proper thread. |
| + message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method)); |
| + return; |
| + } |
| + |
| + (this->*method)(); |
| +} |
| + |
| +bool CompositeFilter::CanForwardError() { |
| + return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused); |
| +} |
| + |
| +// media::FilterHost methods. |
| +void CompositeFilter::SetError(PipelineError error) { |
| + if (message_loop_ != MessageLoop::current()) { |
| + message_loop_->PostTask(FROM_HERE, |
| + NewRunnableMethod(this, &CompositeFilter::SetError, error)); |
| + return; |
| + } |
| + |
| + DCHECK_EQ(message_loop_, MessageLoop::current()); |
| + DCHECK(host_); |
| + |
| + // Drop errors recieved while stopping or stopped. |
| + // This shields the owner of this object from having |
| + // to deal with errors it can't do anything about. |
| + if (state_ == kStopPending || state_ == kStopped) |
| + return; |
| + |
| + error_ = error; |
| + if (host_ && CanForwardError()) |
| + host_->SetError(error); |
| +} |
| + |
| +base::TimeDelta CompositeFilter::GetTime() const { |
| + DCHECK(host_); |
| + return host_ ? host_->GetTime() : base::TimeDelta(); |
| +} |
| + |
| +base::TimeDelta CompositeFilter::GetDuration() const { |
| + DCHECK(host_); |
| + return host_ ? host_->GetDuration() : base::TimeDelta(); |
| +} |
| + |
| +void CompositeFilter::SetTime(base::TimeDelta time) { |
| + DCHECK(host_); |
|
scherkus (not reviewing)
2010/12/15 16:44:02
I'd enforce a guarantee that host_ must be present
acolwell GONE FROM CHROMIUM
2010/12/15 18:20:11
Once I move the FilterHost impl to an inner class
scherkus (not reviewing)
2010/12/15 18:57:44
Sounds good!
|
| + if (host_) |
| + host_->SetTime(time); |
| +} |
| + |
| +void CompositeFilter::SetDuration(base::TimeDelta duration) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetDuration(duration); |
| +} |
| + |
| +void CompositeFilter::SetBufferedTime(base::TimeDelta buffered_time) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetBufferedTime(buffered_time); |
| +} |
| + |
| +void CompositeFilter::SetTotalBytes(int64 total_bytes) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetTotalBytes(total_bytes); |
| +} |
| + |
| +void CompositeFilter::SetBufferedBytes(int64 buffered_bytes) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetBufferedBytes(buffered_bytes); |
| +} |
| + |
| +void CompositeFilter::SetVideoSize(size_t width, size_t height) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetVideoSize(width, height); |
| +} |
| + |
| +void CompositeFilter::SetStreaming(bool streaming) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetStreaming(streaming); |
| +} |
| + |
| +void CompositeFilter::NotifyEnded() { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->NotifyEnded(); |
| +} |
| + |
| +void CompositeFilter::SetLoaded(bool loaded) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetLoaded(loaded); |
| +} |
| + |
| +void CompositeFilter::SetNetworkActivity(bool network_activity) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetNetworkActivity(network_activity); |
| +} |
| + |
| +void CompositeFilter::DisableAudioRenderer() { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->DisableAudioRenderer(); |
| +} |
| + |
| +void CompositeFilter::SetCurrentReadPosition(int64 offset) { |
| + DCHECK(host_); |
| + if (host_) |
| + host_->SetCurrentReadPosition(offset); |
| +} |
| + |
| +int64 CompositeFilter::GetCurrentReadPosition() { |
| + DCHECK(host_); |
| + return host_ ? host_->GetCurrentReadPosition() : 0; |
| +} |
| + |
| +} // namespace media |