Chromium Code Reviews| Index: media/filters/ffmpeg_demuxer.cc |
| diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc |
| index 0a690e83464a9e72815cf541492799696e2ceb34..efcd41905ed29470578dbcd9dc360783cc098925 100644 |
| --- a/media/filters/ffmpeg_demuxer.cc |
| +++ b/media/filters/ffmpeg_demuxer.cc |
| @@ -35,6 +35,7 @@ FFmpegDemuxerStream::FFmpegDemuxerStream( |
| FFmpegDemuxer* demuxer, |
| AVStream* stream) |
| : demuxer_(demuxer), |
| + message_loop_(base::MessageLoopProxy::current()), |
| stream_(stream), |
| type_(UNKNOWN), |
| stopped_(false), |
| @@ -67,17 +68,15 @@ FFmpegDemuxerStream::FFmpegDemuxerStream( |
| } |
| bool FFmpegDemuxerStream::HasPendingReads() { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| - base::AutoLock auto_lock(lock_); |
| + DCHECK(message_loop_->BelongsToCurrentThread()); |
| DCHECK(!stopped_ || read_queue_.empty()) |
| << "Read queue should have been emptied if demuxing stream is stopped"; |
| return !read_queue_.empty(); |
| } |
| void FFmpegDemuxerStream::EnqueuePacket(ScopedAVPacket packet) { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| + DCHECK(message_loop_->BelongsToCurrentThread()); |
| - base::AutoLock auto_lock(lock_); |
| if (stopped_) { |
| NOTREACHED() << "Attempted to enqueue packet on a stopped stream"; |
| return; |
| @@ -105,28 +104,24 @@ void FFmpegDemuxerStream::EnqueuePacket(ScopedAVPacket packet) { |
| last_packet_timestamp_ != kNoTimestamp() && |
| last_packet_timestamp_ < buffer->GetTimestamp()) { |
| buffered_ranges_.Add(last_packet_timestamp_, buffer->GetTimestamp()); |
| - demuxer_->message_loop()->PostTask(FROM_HERE, base::Bind( |
| - &FFmpegDemuxer::NotifyBufferingChanged, demuxer_)); |
| + demuxer_->NotifyBufferingChanged(); |
| } |
| last_packet_timestamp_ = buffer->GetTimestamp(); |
| } |
| buffer_queue_.push_back(buffer); |
| - FulfillPendingRead(); |
| - return; |
| + SatisfyPendingReads(); |
| } |
| void FFmpegDemuxerStream::FlushBuffers() { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| - base::AutoLock auto_lock(lock_); |
| + DCHECK(message_loop_->BelongsToCurrentThread()); |
| DCHECK(read_queue_.empty()) << "Read requests should be empty"; |
| buffer_queue_.clear(); |
| last_packet_timestamp_ = kNoTimestamp(); |
| } |
| void FFmpegDemuxerStream::Stop() { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| - base::AutoLock auto_lock(lock_); |
| + DCHECK(message_loop_->BelongsToCurrentThread()); |
| buffer_queue_.clear(); |
| for (ReadQueue::iterator it = read_queue_.begin(); |
| it != read_queue_.end(); ++it) { |
| @@ -146,76 +141,32 @@ DemuxerStream::Type FFmpegDemuxerStream::type() { |
| } |
| void FFmpegDemuxerStream::Read(const ReadCB& read_cb) { |
| - DCHECK(!read_cb.is_null()); |
| - |
| - base::AutoLock auto_lock(lock_); |
| - // Don't accept any additional reads if we've been told to stop. |
| - // The demuxer_ may have been destroyed in the pipleine thread. |
| - // |
| - // TODO(scherkus): it would be cleaner if we replied with an error message. |
| - if (stopped_) { |
| - read_cb.Run(DemuxerStream::kOk, |
| - scoped_refptr<DecoderBuffer>(DecoderBuffer::CreateEOSBuffer())); |
| - return; |
| - } |
| - |
| - // Buffers are only queued when there are no pending reads. |
| - DCHECK(buffer_queue_.empty() || read_queue_.empty()); |
| - |
| - if (buffer_queue_.empty()) { |
| - demuxer_->message_loop()->PostTask(FROM_HERE, base::Bind( |
| - &FFmpegDemuxerStream::ReadTask, this, read_cb)); |
| + if (!message_loop_->BelongsToCurrentThread()) { |
| + message_loop_->PostTask(FROM_HERE, base::Bind( |
| + &FFmpegDemuxerStream::Read, this, read_cb)); |
| return; |
| } |
| - // Send the oldest buffer back. |
| - scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front(); |
| - buffer_queue_.pop_front(); |
| - read_cb.Run(DemuxerStream::kOk, buffer); |
| -} |
| - |
| -void FFmpegDemuxerStream::ReadTask(const ReadCB& read_cb) { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| - |
| - base::AutoLock auto_lock(lock_); |
| // Don't accept any additional reads if we've been told to stop. |
| + // The |demuxer_| may have been destroyed in the pipeline thread. |
| // |
| - // TODO(scherkus): it would be cleaner if we replied with an error message. |
| + // TODO(scherkus): it would be cleaner to reply with an error message. |
| if (stopped_) { |
| read_cb.Run(DemuxerStream::kOk, |
| scoped_refptr<DecoderBuffer>(DecoderBuffer::CreateEOSBuffer())); |
|
xhwang
2012/11/20 17:54:34
ditto
scherkus (not reviewing)
2012/11/20 19:16:07
Done.
|
| return; |
| } |
| - // Enqueue the callback and attempt to satisfy it immediately. |
| read_queue_.push_back(read_cb); |
| - FulfillPendingRead(); |
| - |
| - // Check if there are still pending reads, demux some more. |
| - if (!read_queue_.empty()) { |
| - demuxer_->PostDemuxTask(); |
| - } |
| + SatisfyPendingReads(); |
| } |
| -void FFmpegDemuxerStream::FulfillPendingRead() { |
| - DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); |
| - lock_.AssertAcquired(); |
| - if (buffer_queue_.empty() || read_queue_.empty()) { |
| +void FFmpegDemuxerStream::EnableBitstreamConverter() { |
| + if (!message_loop_->BelongsToCurrentThread()) { |
| + message_loop_->PostTask(FROM_HERE, base::Bind( |
| + &FFmpegDemuxerStream::EnableBitstreamConverter, this)); |
| return; |
| } |
| - |
| - // Dequeue a buffer and pending read pair. |
| - scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front(); |
| - ReadCB read_cb(read_queue_.front()); |
| - buffer_queue_.pop_front(); |
| - read_queue_.pop_front(); |
| - |
| - // Execute the callback. |
| - read_cb.Run(DemuxerStream::kOk, buffer); |
| -} |
| - |
| -void FFmpegDemuxerStream::EnableBitstreamConverter() { |
| - base::AutoLock auto_lock(lock_); |
| CHECK(bitstream_converter_.get()); |
| bitstream_converter_enabled_ = true; |
| } |
| @@ -231,7 +182,6 @@ const VideoDecoderConfig& FFmpegDemuxerStream::video_decoder_config() { |
| } |
| FFmpegDemuxerStream::~FFmpegDemuxerStream() { |
| - base::AutoLock auto_lock(lock_); |
| DCHECK(stopped_); |
| DCHECK(read_queue_.empty()); |
| DCHECK(buffer_queue_.empty()); |
| @@ -242,10 +192,27 @@ base::TimeDelta FFmpegDemuxerStream::GetElapsedTime() const { |
| } |
| Ranges<base::TimeDelta> FFmpegDemuxerStream::GetBufferedRanges() const { |
| - base::AutoLock auto_lock(lock_); |
| return buffered_ranges_; |
| } |
| +void FFmpegDemuxerStream::SatisfyPendingReads() { |
|
xhwang
2012/11/20 17:54:34
Add DCHECK(message_loop_->BelongsToCurrentThread()
scherkus (not reviewing)
2012/11/20 19:16:07
Done.
|
| + while (!read_queue_.empty() && !buffer_queue_.empty()) { |
| + ReadCB read_cb = read_queue_.front(); |
| + read_queue_.pop_front(); |
| + |
| + // Send earliest buffer back on a new execution stack to avoid recursing. |
| + scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front(); |
| + buffer_queue_.pop_front(); |
| + message_loop_->PostTask(FROM_HERE, base::Bind( |
| + read_cb, DemuxerStream::kOk, buffer)); |
| + } |
| + |
| + // No buffers but pending reads? Ask for more! |
| + if (!read_queue_.empty() && buffer_queue_.empty()) { |
| + demuxer_->NotifyHasPendingRead(); |
| + } |
| +} |
| + |
| // static |
| base::TimeDelta FFmpegDemuxerStream::ConvertStreamTimestamp( |
| const AVRational& time_base, int64 timestamp) { |
| @@ -277,11 +244,6 @@ FFmpegDemuxer::FFmpegDemuxer( |
| FFmpegDemuxer::~FFmpegDemuxer() {} |
| -void FFmpegDemuxer::PostDemuxTask() { |
| - message_loop_->PostTask(FROM_HERE, |
| - base::Bind(&FFmpegDemuxer::DemuxTask, this)); |
| -} |
| - |
| void FFmpegDemuxer::Stop(const base::Closure& callback) { |
| // Post a task to notify the streams to stop as well. |
| message_loop_->PostTask(FROM_HERE, |
| @@ -329,10 +291,6 @@ base::TimeDelta FFmpegDemuxer::GetStartTime() const { |
| return start_time_; |
| } |
| -scoped_refptr<base::MessageLoopProxy> FFmpegDemuxer::message_loop() { |
| - return message_loop_; |
| -} |
| - |
| // Helper for calculating the bitrate of the media based on information stored |
| // in |format_context| or failing that the size and duration of the media. |
| // |
| @@ -600,9 +558,6 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) { |
| } |
| // Queue the packet with the appropriate stream. |
| - // TODO(scherkus): should we post this back to the pipeline thread? I'm |
| - // worried about downstream filters (i.e., decoders) executing on this |
| - // thread. |
| DCHECK_GE(packet->stream_index, 0); |
| DCHECK_LT(packet->stream_index, static_cast<int>(streams_.size())); |
| @@ -619,18 +574,13 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) { |
| // Create a loop by posting another task. This allows seek and message loop |
| // quit tasks to get processed. |
| if (StreamsHavePendingReads()) { |
| - PostDemuxTask(); |
| + message_loop_->PostTask(FROM_HERE, base::Bind( |
| + &FFmpegDemuxer::DemuxTask, this)); |
| } |
| } |
| void FFmpegDemuxer::StopTask(const base::Closure& callback) { |
| DCHECK(message_loop_->BelongsToCurrentThread()); |
| - StreamVector::iterator iter; |
| - for (iter = streams_.begin(); iter != streams_.end(); ++iter) { |
| - if (*iter) |
| - (*iter)->Stop(); |
| - } |
| - |
| url_protocol_.Abort(); |
| data_source_->Stop(BindToLoop(message_loop_, base::Bind( |
| &FFmpegDemuxer::OnDataSourceStopped, this, callback))); |
| @@ -643,6 +593,12 @@ void FFmpegDemuxer::OnDataSourceStopped(const base::Closure& callback) { |
| // thread and drop their results on the floor. |
| DCHECK(message_loop_->BelongsToCurrentThread()); |
| blocking_thread_.Stop(); |
| + |
| + StreamVector::iterator iter; |
| + for (iter = streams_.begin(); iter != streams_.end(); ++iter) { |
| + if (*iter) |
| + (*iter)->Stop(); |
| + } |
| callback.Run(); |
| } |
| @@ -680,6 +636,11 @@ void FFmpegDemuxer::StreamHasEnded() { |
| } |
| } |
| +void FFmpegDemuxer::NotifyHasPendingRead() { |
|
xhwang
2012/11/20 17:54:34
This function is only called once in SatisfyPendin
scherkus (not reviewing)
2012/11/20 19:16:07
I think it's fine keep around -- it's clearer than
|
| + DCHECK(message_loop_->BelongsToCurrentThread()); |
| + DemuxTask(); |
| +} |
| + |
| void FFmpegDemuxer::NotifyBufferingChanged() { |
| DCHECK(message_loop_->BelongsToCurrentThread()); |
| Ranges<base::TimeDelta> buffered; |