Index: media/filters/decoder_stream.cc |
diff --git a/media/filters/decoder_stream.cc b/media/filters/decoder_stream.cc |
index 9d9d87cfc5281993b2d1d836b63ecf7f5eb5ff61..4f712be2f2227883e37710fd9ae5a1b6b9bd9751 100644 |
--- a/media/filters/decoder_stream.cc |
+++ b/media/filters/decoder_stream.cc |
@@ -51,6 +51,7 @@ DecoderStream<StreamType>::DecoderStream( |
decoders.Pass(), |
set_decryptor_ready_cb)), |
active_splice_(false), |
+ pending_decode_requests_(0), |
weak_factory_(this) {} |
template <DemuxerStream::Type StreamType> |
@@ -87,36 +88,45 @@ void DecoderStream<StreamType>::Read(const ReadCB& read_cb) { |
FUNCTION_DVLOG(2); |
DCHECK(task_runner_->BelongsToCurrentThread()); |
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
- state_ == STATE_ERROR) << state_; |
+ state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER || |
+ state_ == STATE_PENDING_DEMUXER_READ) |
+ << state_; |
// No two reads in the flight at any time. |
DCHECK(read_cb_.is_null()); |
// No read during resetting or stopping process. |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
+ read_cb_ = read_cb; |
+ |
if (state_ == STATE_ERROR) { |
- task_runner_->PostTask(FROM_HERE, base::Bind( |
- read_cb, DECODE_ERROR, scoped_refptr<Output>())); |
+ task_runner_->PostTask(FROM_HERE, |
+ base::Bind(base::ResetAndReturn(&read_cb_), |
+ DECODE_ERROR, |
+ scoped_refptr<Output>())); |
return; |
} |
- read_cb_ = read_cb; |
+ if (!ready_outputs_.empty()) { |
+ task_runner_->PostTask(FROM_HERE, base::Bind( |
+ base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front())); |
+ ready_outputs_.pop_front(); |
+ } |
- if (state_ == STATE_FLUSHING_DECODER) { |
- FlushDecoder(); |
+ // Decoder may be in reinitializing state as result of the previous Read(). |
+ if (state_ == STATE_REINITIALIZING_DECODER) |
return; |
- } |
- scoped_refptr<Output> output = decoder_->GetDecodeOutput(); |
+ if (!CanDecodeMore()) |
+ return; |
- // If the decoder has queued output ready to go we don't need a demuxer read. |
- if (output) { |
- task_runner_->PostTask( |
- FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output)); |
+ if (state_ == STATE_FLUSHING_DECODER) { |
+ FlushDecoder(); |
return; |
} |
- ReadFromDemuxerStream(); |
+ if (state_ != STATE_PENDING_DEMUXER_READ) |
+ ReadFromDemuxerStream(); |
} |
template <DemuxerStream::Type StreamType> |
@@ -129,6 +139,13 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
reset_cb_ = closure; |
+ if (!read_cb_.is_null()) { |
+ task_runner_->PostTask(FROM_HERE, base::Bind( |
+ base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
+ } |
+ |
+ ready_outputs_.clear(); |
+ |
// During decoder reinitialization, the Decoder does not need to be and |
// cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder |
// reinitialization. |
@@ -141,11 +158,6 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_) |
return; |
- // The Decoder API guarantees that if Decoder::Reset() is called during |
- // a pending decode, the decode callback must be fired before the reset |
- // callback is fired. Therefore, we can call Decoder::Reset() regardless |
- // of if we have a pending decode and always satisfy the reset callback when |
- // the decoder reset is finished. |
if (decrypting_demuxer_stream_) { |
decrypting_demuxer_stream_->Reset(base::Bind( |
&DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr())); |
@@ -175,9 +187,10 @@ void DecoderStream<StreamType>::Stop(const base::Closure& closure) { |
weak_factory_.InvalidateWeakPtrs(); |
// Post callbacks to prevent reentrance into this object. |
- if (!read_cb_.is_null()) |
+ if (!read_cb_.is_null()) { |
task_runner_->PostTask(FROM_HERE, base::Bind( |
base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
+ } |
if (!reset_cb_.is_null()) |
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_)); |
@@ -213,6 +226,24 @@ bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const { |
} |
template <DemuxerStream::Type StreamType> |
+bool DecoderStream<StreamType>::CanDecodeMore() const { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ |
+ // Limit total number of outputs stored in |ready_outputs_| and being decoded. |
+ // It only makes sense to saturate decoder completely when output queue is |
+ // empty. |
+ int num_decodes = |
+ static_cast<int>(ready_outputs_.size()) + pending_decode_requests_; |
+ return num_decodes < decoder_->GetMaxDecodeRequests(); |
+} |
+ |
+template <> |
+bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeMore() const { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ return !pending_decode_requests_ && ready_outputs_.empty(); |
+} |
+ |
+template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::OnDecoderSelected( |
scoped_ptr<Decoder> selected_decoder, |
scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) { |
@@ -255,22 +286,11 @@ void DecoderStream<StreamType>::SatisfyRead( |
} |
template <DemuxerStream::Type StreamType> |
-void DecoderStream<StreamType>::AbortRead() { |
- // Abort read during pending reset. It is safe to fire the |read_cb_| directly |
- // instead of posting it because the renderer won't call into this class |
- // again when it's in kFlushing state. |
- // TODO(xhwang): Improve the resetting process to avoid this dependency on the |
- // caller. |
- DCHECK(!reset_cb_.is_null()); |
- SatisfyRead(ABORTED, NULL); |
-} |
- |
-template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::Decode( |
const scoped_refptr<DecoderBuffer>& buffer) { |
FUNCTION_DVLOG(2); |
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ DCHECK(CanDecodeMore()); |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
DCHECK(buffer); |
@@ -278,6 +298,7 @@ void DecoderStream<StreamType>::Decode( |
int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size(); |
TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this); |
+ ++pending_decode_requests_; |
decoder_->Decode(buffer, |
base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, |
weak_factory_.GetWeakPtr(), |
@@ -286,7 +307,8 @@ void DecoderStream<StreamType>::Decode( |
template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::FlushDecoder() { |
- Decode(DecoderBuffer::CreateEOSBuffer()); |
+ if (pending_decode_requests_ == 0) |
+ Decode(DecoderBuffer::CreateEOSBuffer()); |
} |
template <DemuxerStream::Type StreamType> |
@@ -294,28 +316,42 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
int buffer_size, |
typename Decoder::Status status, |
const scoped_refptr<Output>& output) { |
- FUNCTION_DVLOG(2); |
- DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ FUNCTION_DVLOG(2) << status << " " << output; |
+ DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
+ state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) |
+ << state_; |
DCHECK(stop_cb_.is_null()); |
DCHECK_EQ(status == Decoder::kOk, output != NULL); |
+ DCHECK_GT(pending_decode_requests_, 0); |
+ |
+ --pending_decode_requests_; |
TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this); |
+ if (state_ == STATE_ERROR) { |
+ DCHECK(read_cb_.is_null()); |
+ return; |
+ } |
+ |
if (status == Decoder::kDecodeError) { |
state_ = STATE_ERROR; |
- SatisfyRead(DECODE_ERROR, NULL); |
+ ready_outputs_.clear(); |
+ if (!read_cb_.is_null()) |
+ SatisfyRead(DECODE_ERROR, NULL); |
return; |
} |
if (status == Decoder::kDecryptError) { |
state_ = STATE_ERROR; |
- SatisfyRead(DECRYPT_ERROR, NULL); |
+ ready_outputs_.clear(); |
+ if (!read_cb_.is_null()) |
+ SatisfyRead(DECRYPT_ERROR, NULL); |
return; |
} |
if (status == Decoder::kAborted) { |
- SatisfyRead(ABORTED, NULL); |
+ if (!read_cb_.is_null()) |
+ SatisfyRead(ABORTED, NULL); |
return; |
} |
@@ -326,10 +362,8 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
// Drop decoding result if Reset() was called during decoding. |
// The resetting process will be handled when the decoder is reset. |
- if (!reset_cb_.is_null()) { |
- AbortRead(); |
+ if (!reset_cb_.is_null()) |
return; |
- } |
// Decoder flushed. Reinitialize the decoder. |
if (state_ == STATE_FLUSHING_DECODER && |
@@ -347,14 +381,27 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
} |
DCHECK(output); |
- SatisfyRead(OK, output); |
+ |
+ // Store decoded output. |
+ ready_outputs_.push_back(output); |
+ scoped_refptr<Output> extra_output; |
+ while ((extra_output = decoder_->GetDecodeOutput()) != NULL) { |
+ ready_outputs_.push_back(extra_output); |
+ } |
+ |
+ // Satisfy outstanding read request, if any. |
+ if (!read_cb_.is_null()) { |
+ scoped_refptr<Output> read_result = ready_outputs_.front(); |
+ ready_outputs_.pop_front(); |
+ SatisfyRead(OK, output); |
+ } |
} |
template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::ReadFromDemuxerStream() { |
FUNCTION_DVLOG(2); |
DCHECK_EQ(state_, STATE_NORMAL) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ DCHECK(CanDecodeMore()); |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
@@ -369,11 +416,19 @@ void DecoderStream<StreamType>::OnBufferReady( |
const scoped_refptr<DecoderBuffer>& buffer) { |
FUNCTION_DVLOG(2) << ": " << status; |
DCHECK(task_runner_->BelongsToCurrentThread()); |
- DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_; |
+ DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR || |
+ state_ == STATE_STOPPED) |
+ << state_; |
DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status; |
- DCHECK(!read_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
+ // Decoding has been stopped (e.g due to an error). |
+ if (state_ != STATE_PENDING_DEMUXER_READ) { |
+ DCHECK(state_ == STATE_ERROR || state_ == STATE_STOPPED); |
+ DCHECK(read_cb_.is_null()); |
+ return; |
+ } |
+ |
state_ = STATE_NORMAL; |
if (status == DemuxerStream::kConfigChanged) { |
@@ -385,7 +440,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
state_ = STATE_FLUSHING_DECODER; |
if (!reset_cb_.is_null()) { |
- AbortRead(); |
// If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
// which will continue the resetting process in it's callback. |
if (!decrypting_demuxer_stream_) |
@@ -398,7 +452,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
} |
if (!reset_cb_.is_null()) { |
- AbortRead(); |
// If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
// which will continue the resetting process in it's callback. |
if (!decrypting_demuxer_stream_) |
@@ -421,6 +474,10 @@ void DecoderStream<StreamType>::OnBufferReady( |
DCHECK(status == DemuxerStream::kOk) << status; |
Decode(buffer); |
+ |
+ // Read more data if the decoder supports multiple parallel decoding requests. |
+ if (CanDecodeMore() && !buffer->end_of_stream()) |
+ ReadFromDemuxerStream(); |
} |
template <DemuxerStream::Type StreamType> |
@@ -428,6 +485,7 @@ void DecoderStream<StreamType>::ReinitializeDecoder() { |
FUNCTION_DVLOG(2); |
DCHECK(task_runner_->BelongsToCurrentThread()); |
DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_; |
+ DCHECK_EQ(pending_decode_requests_, 0); |
DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig()); |
state_ = STATE_REINITIALIZING_DECODER; |
@@ -455,9 +513,8 @@ void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) { |
state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR; |
if (!reset_cb_.is_null()) { |
- if (!read_cb_.is_null()) |
- AbortRead(); |
base::ResetAndReturn(&reset_cb_).Run(); |
+ return; |
} |
if (read_cb_.is_null()) |