Index: media/filters/decoder_stream.cc |
diff --git a/media/filters/decoder_stream.cc b/media/filters/decoder_stream.cc |
index c4adc632ba89a897cbc222bae14e3737ff3f93df..b150171591627ecbda7fd5aff908fa6e9057c72b 100644 |
--- a/media/filters/decoder_stream.cc |
+++ b/media/filters/decoder_stream.cc |
@@ -45,12 +45,14 @@ DecoderStream<StreamType>::DecoderStream( |
const SetDecryptorReadyCB& set_decryptor_ready_cb) |
: task_runner_(task_runner), |
state_(STATE_UNINITIALIZED), |
+ error_(OK), |
stream_(NULL), |
decoder_selector_( |
new DecoderSelector<StreamType>(task_runner, |
decoders.Pass(), |
set_decryptor_ready_cb)), |
active_splice_(false), |
+ pending_decode_requests_(0), |
weak_factory_(this) {} |
template <DemuxerStream::Type StreamType> |
@@ -85,36 +87,44 @@ void DecoderStream<StreamType>::Read(const ReadCB& read_cb) { |
FUNCTION_DVLOG(2); |
DCHECK(task_runner_->BelongsToCurrentThread()); |
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
- state_ == STATE_ERROR) << state_; |
+ state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER || |
+ state_ == STATE_PENDING_DEMUXER_READ) |
+ << state_; |
// No two reads in the flight at any time. |
DCHECK(read_cb_.is_null()); |
// No read during resetting or stopping process. |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
+ read_cb_ = read_cb; |
+ |
if (state_ == STATE_ERROR) { |
+ DCHECK_NE(error_, OK); |
task_runner_->PostTask(FROM_HERE, base::Bind( |
- read_cb, DECODE_ERROR, scoped_refptr<Output>())); |
+ base::ResetAndReturn(&read_cb_), error_, scoped_refptr<Output>())); |
return; |
} |
- read_cb_ = read_cb; |
+ if (!ready_outputs_.empty()) { |
+ task_runner_->PostTask(FROM_HERE, base::Bind( |
+ base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front())); |
+ ready_outputs_.pop_front(); |
+ } |
- if (state_ == STATE_FLUSHING_DECODER) { |
- FlushDecoder(); |
+ // Decoder may be in reinitializing state as result of the previous Read(). |
+ if (state_ == STATE_REINITIALIZING_DECODER) |
return; |
- } |
- scoped_refptr<Output> output = decoder_->GetDecodeOutput(); |
+ if (!CanDecodeAnotherBuffer()) |
+ return; |
- // If the decoder has queued output ready to go we don't need a demuxer read. |
- if (output) { |
- task_runner_->PostTask( |
- FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output)); |
+ if (state_ == STATE_FLUSHING_DECODER) { |
+ FlushDecoder(); |
return; |
} |
- ReadFromDemuxerStream(); |
+ if (state_ != STATE_PENDING_DEMUXER_READ) |
+ ReadFromDemuxerStream(); |
} |
template <DemuxerStream::Type StreamType> |
@@ -127,6 +137,13 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
reset_cb_ = closure; |
+ if (!read_cb_.is_null()) { |
+ task_runner_->PostTask(FROM_HERE, base::Bind( |
+ base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
+ } |
+ |
+ ready_outputs_.clear(); |
+ |
// During decoder reinitialization, the Decoder does not need to be and |
// cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder |
// reinitialization. |
@@ -139,11 +156,6 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) { |
if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_) |
return; |
- // The Decoder API guarantees that if Decoder::Reset() is called during |
- // a pending decode, the decode callback must be fired before the reset |
- // callback is fired. Therefore, we can call Decoder::Reset() regardless |
- // of if we have a pending decode and always satisfy the reset callback when |
- // the decoder reset is finished. |
if (decrypting_demuxer_stream_) { |
decrypting_demuxer_stream_->Reset(base::Bind( |
&DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr())); |
@@ -173,9 +185,10 @@ void DecoderStream<StreamType>::Stop(const base::Closure& closure) { |
weak_factory_.InvalidateWeakPtrs(); |
// Post callbacks to prevent reentrance into this object. |
- if (!read_cb_.is_null()) |
+ if (!read_cb_.is_null()) { |
task_runner_->PostTask(FROM_HERE, base::Bind( |
base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); |
+ } |
if (!reset_cb_.is_null()) |
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_)); |
@@ -211,6 +224,25 @@ bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const { |
} |
template <DemuxerStream::Type StreamType> |
+bool DecoderStream<StreamType>::CanDecodeAnotherBuffer() const { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ |
+ // Limit total number of outputs stored in |ready_outputs_| and being decoded. |
+ // It only makes sense to saturate decoder completely when output queue is |
+ // empty. |
+ // TODO(sergeyu): Is that the best way to throttle decoding requests? |
xhwang
2014/04/25 00:36:03
I don't see a better way to do that. The current a
Sergey Ulanov
2014/04/26 00:59:29
removed TODO
|
+ int num_decodes = |
+ static_cast<int>(ready_outputs_.size()) + pending_decode_requests_; |
+ return num_decodes < decoder_->GetMaxDecodeRequests(); |
+} |
+ |
+template <> |
+bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeAnotherBuffer() const { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ return !pending_decode_requests_ && ready_outputs_.empty(); |
+} |
+ |
+template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::OnDecoderSelected( |
scoped_ptr<Decoder> selected_decoder, |
scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) { |
@@ -253,22 +285,11 @@ void DecoderStream<StreamType>::SatisfyRead( |
} |
template <DemuxerStream::Type StreamType> |
-void DecoderStream<StreamType>::AbortRead() { |
- // Abort read during pending reset. It is safe to fire the |read_cb_| directly |
- // instead of posting it because the renderer won't call into this class |
- // again when it's in kFlushing state. |
- // TODO(xhwang): Improve the resetting process to avoid this dependency on the |
- // caller. |
- DCHECK(!reset_cb_.is_null()); |
- SatisfyRead(ABORTED, NULL); |
-} |
- |
-template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::Decode( |
const scoped_refptr<DecoderBuffer>& buffer) { |
FUNCTION_DVLOG(2); |
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ DCHECK(CanDecodeAnotherBuffer()); |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
DCHECK(buffer); |
@@ -276,6 +297,7 @@ void DecoderStream<StreamType>::Decode( |
int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size(); |
TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this); |
+ ++pending_decode_requests_; |
decoder_->Decode(buffer, |
base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, |
weak_factory_.GetWeakPtr(), |
@@ -284,7 +306,8 @@ void DecoderStream<StreamType>::Decode( |
template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::FlushDecoder() { |
- Decode(DecoderBuffer::CreateEOSBuffer()); |
+ if (pending_decode_requests_ == 0) |
xhwang
2014/04/25 00:36:03
This also depends on the fact that GVD doesn't hav
Sergey Ulanov
2014/04/26 00:59:29
Added condition in VideoDecoder comment requiring
xhwang
2014/04/29 20:01:26
By "decoding delay", I mean the case where VideoDe
Sergey Ulanov
2014/04/30 18:56:51
Ah, I see your point. Yes, decoders must never wai
|
+ Decode(DecoderBuffer::CreateEOSBuffer()); |
} |
template <DemuxerStream::Type StreamType> |
@@ -292,28 +315,40 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
int buffer_size, |
typename Decoder::Status status, |
const scoped_refptr<Output>& output) { |
- FUNCTION_DVLOG(2); |
- DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ FUNCTION_DVLOG(2) << status << " " << output; |
+ DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || |
+ state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) |
+ << state_; |
DCHECK(stop_cb_.is_null()); |
DCHECK_EQ(status == Decoder::kOk, output != NULL); |
+ DCHECK_GT(pending_decode_requests_, 0); |
+ |
+ --pending_decode_requests_; |
TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this); |
+ if (state_ == STATE_ERROR) |
xhwang
2014/04/25 00:36:03
DCHECK(!read_cb_.is_null())?
Sergey Ulanov
2014/04/26 00:59:29
I think you meant "DCHECK(read_cb_.is_null())"
Do
|
+ return; |
+ |
if (status == Decoder::kDecodeError) { |
state_ = STATE_ERROR; |
- SatisfyRead(DECODE_ERROR, NULL); |
+ error_ = DECODE_ERROR; |
xhwang
2014/04/25 00:36:03
Shall we clear the ready_outputs_ in case of error
Sergey Ulanov
2014/04/26 00:59:29
Done.
|
+ if (!read_cb_.is_null()) |
+ SatisfyRead(DECODE_ERROR, NULL); |
return; |
} |
if (status == Decoder::kDecryptError) { |
state_ = STATE_ERROR; |
- SatisfyRead(DECRYPT_ERROR, NULL); |
+ error_ = DECRYPT_ERROR; |
xhwang
2014/04/25 00:36:03
ditto
Sergey Ulanov
2014/04/26 00:59:29
Done.
|
+ if (!read_cb_.is_null()) |
+ SatisfyRead(DECRYPT_ERROR, NULL); |
return; |
} |
if (status == Decoder::kAborted) { |
- SatisfyRead(ABORTED, NULL); |
+ if (!read_cb_.is_null()) |
xhwang
2014/04/25 00:36:03
kAborted must be a result of Reset(). In Reset() c
Sergey Ulanov
2014/04/26 00:59:29
There are some tests that return kAborted from dec
xhwang
2014/04/29 20:01:26
You are right. kAborted can also be a result of De
|
+ SatisfyRead(ABORTED, NULL); |
return; |
} |
@@ -324,10 +359,8 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
// Drop decoding result if Reset() was called during decoding. |
// The resetting process will be handled when the decoder is reset. |
- if (!reset_cb_.is_null()) { |
- AbortRead(); |
+ if (!reset_cb_.is_null()) |
return; |
- } |
// Decoder flushed. Reinitialize the decoder. |
if (state_ == STATE_FLUSHING_DECODER && |
@@ -345,14 +378,27 @@ void DecoderStream<StreamType>::OnDecodeOutputReady( |
} |
DCHECK(output); |
- SatisfyRead(OK, output); |
+ |
+ // Store decoded output. |
+ ready_outputs_.push_back(output); |
+ scoped_refptr<Output> extra_output; |
+ while ((extra_output = decoder_->GetDecodeOutput()) != NULL) { |
+ ready_outputs_.push_back(extra_output); |
+ } |
+ |
+ // Satisfy outstanding read request, if any. |
+ if (!read_cb_.is_null()) { |
+ scoped_refptr<Output> read_result = ready_outputs_.front(); |
+ ready_outputs_.pop_front(); |
+ SatisfyRead(OK, output); |
+ } |
} |
template <DemuxerStream::Type StreamType> |
void DecoderStream<StreamType>::ReadFromDemuxerStream() { |
FUNCTION_DVLOG(2); |
DCHECK_EQ(state_, STATE_NORMAL) << state_; |
- DCHECK(!read_cb_.is_null()); |
+ DCHECK(CanDecodeAnotherBuffer()); |
DCHECK(reset_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
@@ -367,11 +413,16 @@ void DecoderStream<StreamType>::OnBufferReady( |
const scoped_refptr<DecoderBuffer>& buffer) { |
FUNCTION_DVLOG(2) << ": " << status; |
DCHECK(task_runner_->BelongsToCurrentThread()); |
- DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_; |
+ DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR || |
+ state_ == STATE_STOPPED) |
+ << state_; |
DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status; |
- DCHECK(!read_cb_.is_null()); |
DCHECK(stop_cb_.is_null()); |
+ // Decoding has been stopped (e.g due to an error). |
+ if (state_ != STATE_PENDING_DEMUXER_READ) |
xhwang
2014/04/25 00:36:03
Can we
DCHECK_EQ(state_, STATE_ERROR);
DCHECK(rea
Sergey Ulanov
2014/04/26 00:59:29
Done. state_ can be either STATE_ERROR or STATE_ST
|
+ return; |
+ |
state_ = STATE_NORMAL; |
if (status == DemuxerStream::kConfigChanged) { |
@@ -383,7 +434,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
state_ = STATE_FLUSHING_DECODER; |
if (!reset_cb_.is_null()) { |
- AbortRead(); |
// If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
// which will continue the resetting process in it's callback. |
if (!decrypting_demuxer_stream_) |
@@ -396,7 +446,6 @@ void DecoderStream<StreamType>::OnBufferReady( |
} |
if (!reset_cb_.is_null()) { |
- AbortRead(); |
// If we are using DecryptingDemuxerStream, we already called DDS::Reset() |
// which will continue the resetting process in it's callback. |
if (!decrypting_demuxer_stream_) |
@@ -419,6 +468,10 @@ void DecoderStream<StreamType>::OnBufferReady( |
DCHECK(status == DemuxerStream::kOk) << status; |
Decode(buffer); |
+ |
+ // Read more data if the decoder supports multiple parallel decoding requests. |
+ if (CanDecodeAnotherBuffer() && !buffer->end_of_stream()) |
+ ReadFromDemuxerStream(); |
} |
template <DemuxerStream::Type StreamType> |
@@ -426,6 +479,7 @@ void DecoderStream<StreamType>::ReinitializeDecoder() { |
FUNCTION_DVLOG(2); |
DCHECK(task_runner_->BelongsToCurrentThread()); |
DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_; |
+ DCHECK_EQ(pending_decode_requests_, 0); |
DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig()); |
state_ = STATE_REINITIALIZING_DECODER; |
@@ -451,9 +505,8 @@ void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) { |
state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR; |
if (!reset_cb_.is_null()) { |
- if (!read_cb_.is_null()) |
- AbortRead(); |
base::ResetAndReturn(&reset_cb_).Run(); |
+ return; |
} |
if (read_cb_.is_null()) |