Chromium Code Reviews| Index: media/filters/decoder_stream.cc |
| diff --git a/media/filters/decoder_stream.cc b/media/filters/decoder_stream.cc |
| index c4adc632ba89a897cbc222bae14e3737ff3f93df..b150171591627ecbda7fd5aff908fa6e9057c72b 100644 |
| --- a/media/filters/decoder_stream.cc |
| +++ b/media/filters/decoder_stream.cc |
| @@ -45,12 +45,14 @@ DecoderStream<StreamType>::DecoderStream( |
| const SetDecryptorReadyCB& set_decryptor_ready_cb) |
| : task_runner_(task_runner), |
| state_(STATE_UNINITIALIZED), |
| + error_(OK), |
| stream_(NULL), |
| decoder_selector_( |
| new DecoderSelector<StreamType>(task_runner, |
| decoders.Pass(), |
| set_decryptor_ready_cb)), |
| active_splice_(false), |
| + pending_decode_requests_(0), |
| weak_factory_(this) {} |
| template <DemuxerStream::Type StreamType> |
| @@ -85,36 +87,44 @@ void DecoderStream<StreamType>::Read(const ReadCB& read_cb) { |
| FUNCTION_DVLOG(2); |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
| - state_ == STATE_ERROR) << state_; |
| + state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER || |
| + state_ == STATE_PENDING_DEMUXER_READ) |
| + << state_; |
| // No two reads in the flight at any time. |
| DCHECK(read_cb_.is_null()); |
| // No read during resetting or stopping process. |
| DCHECK(reset_cb_.is_null()); |
| DCHECK(stop_cb_.is_null()); |
| + read_cb_ = read_cb; |
| + |
| if (state_ == STATE_ERROR) { |
| + DCHECK_NE(error_, OK); |
| task_runner_->PostTask(FROM_HERE, base::Bind( |
| - read_cb, DECODE_ERROR, scoped_refptr<Output>())); |
| + base::ResetAndReturn(&read_cb_), error_, scoped_refptr<Output>())); |
| return; |
| } |
| - read_cb_ = read_cb; |
| + if (!ready_outputs_.empty()) { |
| + task_runner_->PostTask(FROM_HERE, base::Bind( |
| + base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front())); |
| + ready_outputs_.pop_front(); |
| + } |
| - if (state_ == STATE_FLUSHING_DECODER) { |
| - FlushDecoder(); |
| + // Decoder may be in reinitializing state as result of the previous Read(). |
| + if (state_ == STATE_REINITIALIZING_DECODER) |
| return; |
| - } |
| - scoped_refptr<Output> output = decoder_->GetDecodeOutput(); |
| + if (!CanDecodeAnotherBuffer()) |
| + return; |
| - // If the decoder has queued output ready to go we don't need a demuxer read. |
| - if (output) { |
| - task_runner_->PostTask( |
| - FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output)); |
| + if (state_ == STATE_FLUSHING_DECODER) { |
| + FlushDecoder(); |
| return; |
| } |
| - ReadFromDemuxerStream(); |
| + if (state_ != STATE_PENDING_DEMUXER_READ) |
| + ReadFromDemuxerStream(); |
| } |
| template <DemuxerStream::Type StreamType> |
| @@ -127,6 +137,13 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
| reset_cb_ = closure; |
| + if (!read_cb_.is_null()) { |
| + task_runner_->PostTask(FROM_HERE, base::Bind( |
| + base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
| + } |
| + |
| + ready_outputs_.clear(); |
| + |
| // During decoder reinitialization, the Decoder does not need to be and |
| // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder |
| // reinitialization. |
| @@ -139,11 +156,6 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
| if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_) |
| return; |
| - // The Decoder API guarantees that if Decoder::Reset() is called during |
| - // a pending decode, the decode callback must be fired before the reset |
| - // callback is fired. Therefore, we can call Decoder::Reset() regardless |
| - // of if we have a pending decode and always satisfy the reset callback when |
| - // the decoder reset is finished. |
| if (decrypting_demuxer_stream_) { |
| decrypting_demuxer_stream_->Reset(base::Bind( |
| &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr())); |
| @@ -173,9 +185,10 @@ void DecoderStream<StreamType>::Stop(const base::Closure& closure) { |
| weak_factory_.InvalidateWeakPtrs(); |
| // Post callbacks to prevent reentrance into this object. |
| - if (!read_cb_.is_null()) |
| + if (!read_cb_.is_null()) { |
| task_runner_->PostTask(FROM_HERE, base::Bind( |
| base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
| + } |
| if (!reset_cb_.is_null()) |
| task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_)); |
| @@ -211,6 +224,25 @@ bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const { |
| } |
| template <DemuxerStream::Type StreamType> |
| +bool DecoderStream<StreamType>::CanDecodeAnotherBuffer() const { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + |
| + // Limit total number of outputs stored in |ready_outputs_| and being decoded. |
| + // It only makes sense to saturate decoder completely when output queue is |
| + // empty. |
| + // TODO(sergeyu): Is that the best way to throttle decoding requests? |
|
xhwang
2014/04/25 00:36:03
I don't see a better way to do that. The current a
Sergey Ulanov
2014/04/26 00:59:29
removed TODO
|
| + int num_decodes = |
| + static_cast<int>(ready_outputs_.size()) + pending_decode_requests_; |
| + return num_decodes < decoder_->GetMaxDecodeRequests(); |
| +} |
| + |
| +template <> |
| +bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeAnotherBuffer() const { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + return !pending_decode_requests_ && ready_outputs_.empty(); |
| +} |
| + |
| +template <DemuxerStream::Type StreamType> |
| void DecoderStream<StreamType>::OnDecoderSelected( |
| scoped_ptr<Decoder> selected_decoder, |
| scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) { |
| @@ -253,22 +285,11 @@ void DecoderStream<StreamType>::SatisfyRead( |
| } |
| template <DemuxerStream::Type StreamType> |
| -void DecoderStream<StreamType>::AbortRead() { |
| - // Abort read during pending reset. It is safe to fire the |read_cb_| directly |
| - // instead of posting it because the renderer won't call into this class |
| - // again when it's in kFlushing state. |
| - // TODO(xhwang): Improve the resetting process to avoid this dependency on the |
| - // caller. |
| - DCHECK(!reset_cb_.is_null()); |
| - SatisfyRead(ABORTED, NULL); |
| -} |
| - |
| -template <DemuxerStream::Type StreamType> |
| void DecoderStream<StreamType>::Decode( |
| const scoped_refptr<DecoderBuffer>& buffer) { |
| FUNCTION_DVLOG(2); |
| DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
| - DCHECK(!read_cb_.is_null()); |
| + DCHECK(CanDecodeAnotherBuffer()); |
| DCHECK(reset_cb_.is_null()); |
| DCHECK(stop_cb_.is_null()); |
| DCHECK(buffer); |
| @@ -276,6 +297,7 @@ void DecoderStream<StreamType>::Decode( |
| int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size(); |
| TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this); |
| + ++pending_decode_requests_; |
| decoder_->Decode(buffer, |
| base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, |
| weak_factory_.GetWeakPtr(), |
| @@ -284,7 +306,8 @@ void DecoderStream<StreamType>::Decode( |
| template <DemuxerStream::Type StreamType> |
| void DecoderStream<StreamType>::FlushDecoder() { |
| - Decode(DecoderBuffer::CreateEOSBuffer()); |
| + if (pending_decode_requests_ == 0) |
|
xhwang
2014/04/25 00:36:03
This also depends on the fact that GVD doesn't hav
Sergey Ulanov
2014/04/26 00:59:29
Added condition in VideoDecoder comment requiring
xhwang
2014/04/29 20:01:26
By "decoding delay", I mean the case where VideoDe
Sergey Ulanov
2014/04/30 18:56:51
Ah, I see your point. Yes, decoders must never wai
|
| + Decode(DecoderBuffer::CreateEOSBuffer()); |
| } |
| template <DemuxerStream::Type StreamType> |
| @@ -292,28 +315,40 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
| int buffer_size, |
| typename Decoder::Status status, |
| const scoped_refptr<Output>& output) { |
| - FUNCTION_DVLOG(2); |
| - DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
| - DCHECK(!read_cb_.is_null()); |
| + FUNCTION_DVLOG(2) << status << " " << output; |
| + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
| + state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) |
| + << state_; |
| DCHECK(stop_cb_.is_null()); |
| DCHECK_EQ(status == Decoder::kOk, output != NULL); |
| + DCHECK_GT(pending_decode_requests_, 0); |
| + |
| + --pending_decode_requests_; |
| TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this); |
| + if (state_ == STATE_ERROR) |
|
xhwang
2014/04/25 00:36:03
DCHECK(!read_cb_.is_null())?
Sergey Ulanov
2014/04/26 00:59:29
I think you meant "DCHECK(read_cb_.is_null())"
Do
|
| + return; |
| + |
| if (status == Decoder::kDecodeError) { |
| state_ = STATE_ERROR; |
| - SatisfyRead(DECODE_ERROR, NULL); |
| + error_ = DECODE_ERROR; |
|
xhwang
2014/04/25 00:36:03
Shall we clear the ready_outputs_ in case of error
Sergey Ulanov
2014/04/26 00:59:29
Done.
|
| + if (!read_cb_.is_null()) |
| + SatisfyRead(DECODE_ERROR, NULL); |
| return; |
| } |
| if (status == Decoder::kDecryptError) { |
| state_ = STATE_ERROR; |
| - SatisfyRead(DECRYPT_ERROR, NULL); |
| + error_ = DECRYPT_ERROR; |
|
xhwang
2014/04/25 00:36:03
ditto
Sergey Ulanov
2014/04/26 00:59:29
Done.
|
| + if (!read_cb_.is_null()) |
| + SatisfyRead(DECRYPT_ERROR, NULL); |
| return; |
| } |
| if (status == Decoder::kAborted) { |
| - SatisfyRead(ABORTED, NULL); |
| + if (!read_cb_.is_null()) |
|
xhwang
2014/04/25 00:36:03
kAborted must be a result of Reset(). In Reset() c
Sergey Ulanov
2014/04/26 00:59:29
There are some tests that return kAborted from dec
xhwang
2014/04/29 20:01:26
You are right. kAborted can also be a result of De
|
| + SatisfyRead(ABORTED, NULL); |
| return; |
| } |
| @@ -324,10 +359,8 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
| // Drop decoding result if Reset() was called during decoding. |
| // The resetting process will be handled when the decoder is reset. |
| - if (!reset_cb_.is_null()) { |
| - AbortRead(); |
| + if (!reset_cb_.is_null()) |
| return; |
| - } |
| // Decoder flushed. Reinitialize the decoder. |
| if (state_ == STATE_FLUSHING_DECODER && |
| @@ -345,14 +378,27 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
| } |
| DCHECK(output); |
| - SatisfyRead(OK, output); |
| + |
| + // Store decoded output. |
| + ready_outputs_.push_back(output); |
| + scoped_refptr<Output> extra_output; |
| + while ((extra_output = decoder_->GetDecodeOutput()) != NULL) { |
| + ready_outputs_.push_back(extra_output); |
| + } |
| + |
| + // Satisfy outstanding read request, if any. |
| + if (!read_cb_.is_null()) { |
| + scoped_refptr<Output> read_result = ready_outputs_.front(); |
| + ready_outputs_.pop_front(); |
| + SatisfyRead(OK, output); |
| + } |
| } |
| template <DemuxerStream::Type StreamType> |
| void DecoderStream<StreamType>::ReadFromDemuxerStream() { |
| FUNCTION_DVLOG(2); |
| DCHECK_EQ(state_, STATE_NORMAL) << state_; |
| - DCHECK(!read_cb_.is_null()); |
| + DCHECK(CanDecodeAnotherBuffer()); |
| DCHECK(reset_cb_.is_null()); |
| DCHECK(stop_cb_.is_null()); |
| @@ -367,11 +413,16 @@ void DecoderStream<StreamType>::OnBufferReady( |
| const scoped_refptr<DecoderBuffer>& buffer) { |
| FUNCTION_DVLOG(2) << ": " << status; |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| - DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_; |
| + DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR || |
| + state_ == STATE_STOPPED) |
| + << state_; |
| DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status; |
| - DCHECK(!read_cb_.is_null()); |
| DCHECK(stop_cb_.is_null()); |
| + // Decoding has been stopped (e.g due to an error). |
| + if (state_ != STATE_PENDING_DEMUXER_READ) |
|
xhwang
2014/04/25 00:36:03
Can we
DCHECK_EQ(state_, STATE_ERROR);
DCHECK(rea
Sergey Ulanov
2014/04/26 00:59:29
Done. state_ can be either STATE_ERROR or STATE_ST
|
| + return; |
| + |
| state_ = STATE_NORMAL; |
| if (status == DemuxerStream::kConfigChanged) { |
| @@ -383,7 +434,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
| state_ = STATE_FLUSHING_DECODER; |
| if (!reset_cb_.is_null()) { |
| - AbortRead(); |
| // If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
| // which will continue the resetting process in it's callback. |
| if (!decrypting_demuxer_stream_) |
| @@ -396,7 +446,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
| } |
| if (!reset_cb_.is_null()) { |
| - AbortRead(); |
| // If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
| // which will continue the resetting process in it's callback. |
| if (!decrypting_demuxer_stream_) |
| @@ -419,6 +468,10 @@ void DecoderStream<StreamType>::OnBufferReady( |
| DCHECK(status == DemuxerStream::kOk) << status; |
| Decode(buffer); |
| + |
| + // Read more data if the decoder supports multiple parallel decoding requests. |
| + if (CanDecodeAnotherBuffer() && !buffer->end_of_stream()) |
| + ReadFromDemuxerStream(); |
| } |
| template <DemuxerStream::Type StreamType> |
| @@ -426,6 +479,7 @@ void DecoderStream<StreamType>::ReinitializeDecoder() { |
| FUNCTION_DVLOG(2); |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_; |
| + DCHECK_EQ(pending_decode_requests_, 0); |
| DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig()); |
| state_ = STATE_REINITIALIZING_DECODER; |
| @@ -451,9 +505,8 @@ void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) { |
| state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR; |
| if (!reset_cb_.is_null()) { |
| - if (!read_cb_.is_null()) |
| - AbortRead(); |
| base::ResetAndReturn(&reset_cb_).Run(); |
| + return; |
| } |
| if (read_cb_.is_null()) |