Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3869)

Unified Diff: media/filters/decoder_stream.cc

Issue 239893002: Allow multiple concurrent Decode() requests in VideoDecoder interface. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/filters/decoder_stream.cc
diff --git a/media/filters/decoder_stream.cc b/media/filters/decoder_stream.cc
index c4adc632ba89a897cbc222bae14e3737ff3f93df..b150171591627ecbda7fd5aff908fa6e9057c72b 100644
--- a/media/filters/decoder_stream.cc
+++ b/media/filters/decoder_stream.cc
@@ -45,12 +45,14 @@ DecoderStream<StreamType>::DecoderStream(
const SetDecryptorReadyCB& set_decryptor_ready_cb)
: task_runner_(task_runner),
state_(STATE_UNINITIALIZED),
+ error_(OK),
stream_(NULL),
decoder_selector_(
new DecoderSelector<StreamType>(task_runner,
decoders.Pass(),
set_decryptor_ready_cb)),
active_splice_(false),
+ pending_decode_requests_(0),
weak_factory_(this) {}
template <DemuxerStream::Type StreamType>
@@ -85,36 +87,44 @@ void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
- state_ == STATE_ERROR) << state_;
+ state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER ||
+ state_ == STATE_PENDING_DEMUXER_READ)
+ << state_;
// No two reads in the flight at any time.
DCHECK(read_cb_.is_null());
// No read during resetting or stopping process.
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
+ read_cb_ = read_cb;
+
if (state_ == STATE_ERROR) {
+ DCHECK_NE(error_, OK);
task_runner_->PostTask(FROM_HERE, base::Bind(
- read_cb, DECODE_ERROR, scoped_refptr<Output>()));
+ base::ResetAndReturn(&read_cb_), error_, scoped_refptr<Output>()));
return;
}
- read_cb_ = read_cb;
+ if (!ready_outputs_.empty()) {
+ task_runner_->PostTask(FROM_HERE, base::Bind(
+ base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front()));
+ ready_outputs_.pop_front();
+ }
- if (state_ == STATE_FLUSHING_DECODER) {
- FlushDecoder();
+ // Decoder may be in reinitializing state as result of the previous Read().
+ if (state_ == STATE_REINITIALIZING_DECODER)
return;
- }
- scoped_refptr<Output> output = decoder_->GetDecodeOutput();
+ if (!CanDecodeAnotherBuffer())
+ return;
- // If the decoder has queued output ready to go we don't need a demuxer read.
- if (output) {
- task_runner_->PostTask(
- FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output));
+ if (state_ == STATE_FLUSHING_DECODER) {
+ FlushDecoder();
return;
}
- ReadFromDemuxerStream();
+ if (state_ != STATE_PENDING_DEMUXER_READ)
+ ReadFromDemuxerStream();
}
template <DemuxerStream::Type StreamType>
@@ -127,6 +137,13 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
reset_cb_ = closure;
+ if (!read_cb_.is_null()) {
+ task_runner_->PostTask(FROM_HERE, base::Bind(
+ base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
+ }
+
+ ready_outputs_.clear();
+
// During decoder reinitialization, the Decoder does not need to be and
// cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
// reinitialization.
@@ -139,11 +156,6 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
return;
- // The Decoder API guarantees that if Decoder::Reset() is called during
- // a pending decode, the decode callback must be fired before the reset
- // callback is fired. Therefore, we can call Decoder::Reset() regardless
- // of if we have a pending decode and always satisfy the reset callback when
- // the decoder reset is finished.
if (decrypting_demuxer_stream_) {
decrypting_demuxer_stream_->Reset(base::Bind(
&DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
@@ -173,9 +185,10 @@ void DecoderStream<StreamType>::Stop(const base::Closure& closure) {
weak_factory_.InvalidateWeakPtrs();
// Post callbacks to prevent reentrance into this object.
- if (!read_cb_.is_null())
+ if (!read_cb_.is_null()) {
task_runner_->PostTask(FROM_HERE, base::Bind(
base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
+ }
if (!reset_cb_.is_null())
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
@@ -211,6 +224,25 @@ bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
}
template <DemuxerStream::Type StreamType>
+bool DecoderStream<StreamType>::CanDecodeAnotherBuffer() const {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+
+ // Limit total number of outputs stored in |ready_outputs_| and being decoded.
+ // It only makes sense to saturate decoder completely when output queue is
+ // empty.
+ // TODO(sergeyu): Is that the best way to throttle decoding requests?
xhwang 2014/04/25 00:36:03 I don't see a better way to do that. The current a
Sergey Ulanov 2014/04/26 00:59:29 removed TODO
+ int num_decodes =
+ static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
+ return num_decodes < decoder_->GetMaxDecodeRequests();
+}
+
+template <>
+bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeAnotherBuffer() const {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ return !pending_decode_requests_ && ready_outputs_.empty();
+}
+
+template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnDecoderSelected(
scoped_ptr<Decoder> selected_decoder,
scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
@@ -253,22 +285,11 @@ void DecoderStream<StreamType>::SatisfyRead(
}
template <DemuxerStream::Type StreamType>
-void DecoderStream<StreamType>::AbortRead() {
- // Abort read during pending reset. It is safe to fire the |read_cb_| directly
- // instead of posting it because the renderer won't call into this class
- // again when it's in kFlushing state.
- // TODO(xhwang): Improve the resetting process to avoid this dependency on the
- // caller.
- DCHECK(!reset_cb_.is_null());
- SatisfyRead(ABORTED, NULL);
-}
-
-template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Decode(
const scoped_refptr<DecoderBuffer>& buffer) {
FUNCTION_DVLOG(2);
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
- DCHECK(!read_cb_.is_null());
+ DCHECK(CanDecodeAnotherBuffer());
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
DCHECK(buffer);
@@ -276,6 +297,7 @@ void DecoderStream<StreamType>::Decode(
int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
+ ++pending_decode_requests_;
decoder_->Decode(buffer,
base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
weak_factory_.GetWeakPtr(),
@@ -284,7 +306,8 @@ void DecoderStream<StreamType>::Decode(
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::FlushDecoder() {
- Decode(DecoderBuffer::CreateEOSBuffer());
+ if (pending_decode_requests_ == 0)
xhwang 2014/04/25 00:36:03 This also depends on the fact that GVD doesn't hav
Sergey Ulanov 2014/04/26 00:59:29 Added condition in VideoDecoder comment requiring
xhwang 2014/04/29 20:01:26 By "decoding delay", I mean the case where VideoDe
Sergey Ulanov 2014/04/30 18:56:51 Ah, I see your point. Yes, decoders must never wai
+ Decode(DecoderBuffer::CreateEOSBuffer());
}
template <DemuxerStream::Type StreamType>
@@ -292,28 +315,40 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
int buffer_size,
typename Decoder::Status status,
const scoped_refptr<Output>& output) {
- FUNCTION_DVLOG(2);
- DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
- DCHECK(!read_cb_.is_null());
+ FUNCTION_DVLOG(2) << status << " " << output;
+ DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
+ state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
+ << state_;
DCHECK(stop_cb_.is_null());
DCHECK_EQ(status == Decoder::kOk, output != NULL);
+ DCHECK_GT(pending_decode_requests_, 0);
+
+ --pending_decode_requests_;
TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
+ if (state_ == STATE_ERROR)
xhwang 2014/04/25 00:36:03 DCHECK(!read_cb_.is_null())?
Sergey Ulanov 2014/04/26 00:59:29 I think you meant "DCHECK(read_cb_.is_null())" Do
+ return;
+
if (status == Decoder::kDecodeError) {
state_ = STATE_ERROR;
- SatisfyRead(DECODE_ERROR, NULL);
+ error_ = DECODE_ERROR;
xhwang 2014/04/25 00:36:03 Shall we clear the ready_outputs_ in case of error
Sergey Ulanov 2014/04/26 00:59:29 Done.
+ if (!read_cb_.is_null())
+ SatisfyRead(DECODE_ERROR, NULL);
return;
}
if (status == Decoder::kDecryptError) {
state_ = STATE_ERROR;
- SatisfyRead(DECRYPT_ERROR, NULL);
+ error_ = DECRYPT_ERROR;
xhwang 2014/04/25 00:36:03 ditto
Sergey Ulanov 2014/04/26 00:59:29 Done.
+ if (!read_cb_.is_null())
+ SatisfyRead(DECRYPT_ERROR, NULL);
return;
}
if (status == Decoder::kAborted) {
- SatisfyRead(ABORTED, NULL);
+ if (!read_cb_.is_null())
xhwang 2014/04/25 00:36:03 kAborted must be a result of Reset(). In Reset() c
Sergey Ulanov 2014/04/26 00:59:29 There are some tests that return kAborted from dec
xhwang 2014/04/29 20:01:26 You are right. kAborted can also be a result of De
+ SatisfyRead(ABORTED, NULL);
return;
}
@@ -324,10 +359,8 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
// Drop decoding result if Reset() was called during decoding.
// The resetting process will be handled when the decoder is reset.
- if (!reset_cb_.is_null()) {
- AbortRead();
+ if (!reset_cb_.is_null())
return;
- }
// Decoder flushed. Reinitialize the decoder.
if (state_ == STATE_FLUSHING_DECODER &&
@@ -345,14 +378,27 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
}
DCHECK(output);
- SatisfyRead(OK, output);
+
+ // Store decoded output.
+ ready_outputs_.push_back(output);
+ scoped_refptr<Output> extra_output;
+ while ((extra_output = decoder_->GetDecodeOutput()) != NULL) {
+ ready_outputs_.push_back(extra_output);
+ }
+
+ // Satisfy outstanding read request, if any.
+ if (!read_cb_.is_null()) {
+ scoped_refptr<Output> read_result = ready_outputs_.front();
+ ready_outputs_.pop_front();
+ SatisfyRead(OK, output);
+ }
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::ReadFromDemuxerStream() {
FUNCTION_DVLOG(2);
DCHECK_EQ(state_, STATE_NORMAL) << state_;
- DCHECK(!read_cb_.is_null());
+ DCHECK(CanDecodeAnotherBuffer());
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
@@ -367,11 +413,16 @@ void DecoderStream<StreamType>::OnBufferReady(
const scoped_refptr<DecoderBuffer>& buffer) {
FUNCTION_DVLOG(2) << ": " << status;
DCHECK(task_runner_->BelongsToCurrentThread());
- DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_;
+ DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR ||
+ state_ == STATE_STOPPED)
+ << state_;
DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
- DCHECK(!read_cb_.is_null());
DCHECK(stop_cb_.is_null());
+ // Decoding has been stopped (e.g due to an error).
+ if (state_ != STATE_PENDING_DEMUXER_READ)
xhwang 2014/04/25 00:36:03 Can we DCHECK_EQ(state_, STATE_ERROR); DCHECK(rea
Sergey Ulanov 2014/04/26 00:59:29 Done. state_ can be either STATE_ERROR or STATE_ST
+ return;
+
state_ = STATE_NORMAL;
if (status == DemuxerStream::kConfigChanged) {
@@ -383,7 +434,6 @@ void DecoderStream<StreamType>::OnBufferReady(
state_ = STATE_FLUSHING_DECODER;
if (!reset_cb_.is_null()) {
- AbortRead();
// If we are using DecryptingDemuxerStream, we already called DDS::Reset()
// which will continue the resetting process in it's callback.
if (!decrypting_demuxer_stream_)
@@ -396,7 +446,6 @@ void DecoderStream<StreamType>::OnBufferReady(
}
if (!reset_cb_.is_null()) {
- AbortRead();
// If we are using DecryptingDemuxerStream, we already called DDS::Reset()
// which will continue the resetting process in it's callback.
if (!decrypting_demuxer_stream_)
@@ -419,6 +468,10 @@ void DecoderStream<StreamType>::OnBufferReady(
DCHECK(status == DemuxerStream::kOk) << status;
Decode(buffer);
+
+ // Read more data if the decoder supports multiple parallel decoding requests.
+ if (CanDecodeAnotherBuffer() && !buffer->end_of_stream())
+ ReadFromDemuxerStream();
}
template <DemuxerStream::Type StreamType>
@@ -426,6 +479,7 @@ void DecoderStream<StreamType>::ReinitializeDecoder() {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
+ DCHECK_EQ(pending_decode_requests_, 0);
DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
state_ = STATE_REINITIALIZING_DECODER;
@@ -451,9 +505,8 @@ void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
if (!reset_cb_.is_null()) {
- if (!read_cb_.is_null())
- AbortRead();
base::ResetAndReturn(&reset_cb_).Run();
+ return;
}
if (read_cb_.is_null())

Powered by Google App Engine
This is Rietveld 408576698