Chromium Code Reviews| Index: media/filters/ffmpeg_demuxer.cc |
| diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc |
| index 4d75b7c3861454ef7d03e4a0a2e7c4f07fca8991..090358f2423f701ecd57a2bd14675c975adbcdd3 100644 |
| --- a/media/filters/ffmpeg_demuxer.cc |
| +++ b/media/filters/ffmpeg_demuxer.cc |
| @@ -22,6 +22,8 @@ |
| #include "base/strings/stringprintf.h" |
| #include "base/sys_byteorder.h" |
| #include "base/task_runner_util.h" |
| +#include "base/task_scheduler/post_task.h" |
| +#include "base/threading/sequenced_worker_pool.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "base/time/time.h" |
| #include "media/audio/sample_rates.h" |
| @@ -845,7 +847,13 @@ FFmpegDemuxer::FFmpegDemuxer( |
| const scoped_refptr<MediaLog>& media_log) |
| : host_(NULL), |
| task_runner_(task_runner), |
| - blocking_thread_("FFmpegDemuxer"), |
| + // FFmpeg has no asynchronous API, so we use base::WaitableEvents inside |
| + // the BlockingUrlProtocol to handle hops to the render thread for network |
| + // reads and seeks. |
| + blocking_task_runner_(base::CreateSequencedTaskRunnerWithTraits( |
| + base::TaskTraits().MayBlock().WithBaseSyncPrimitives().WithPriority( |
| + base::TaskPriority::USER_BLOCKING))), |
| + stopped_(false), |
| pending_read_(false), |
| data_source_(data_source), |
| media_log_(media_log), |
| @@ -866,6 +874,12 @@ FFmpegDemuxer::~FFmpegDemuxer() { |
| // NOTE: This class is not destroyed on |task_runner|, so we must ensure that |
| // there are no outstanding WeakPtrs by the time we reach here. |
| DCHECK(!weak_factory_.HasWeakPtrs()); |
| + |
| + // There may be outstanding tasks in the blocking pool which are trying to use |
| + // these members, so release them in sequence with any outstanding calls. The |
| + // earlier call to Abort() on |data_source_| prevents further access to it. |
| + blocking_task_runner_->DeleteSoon(FROM_HERE, url_protocol_.release()); |
| + blocking_task_runner_->DeleteSoon(FROM_HERE, glue_.release()); |
| } |
| std::string FFmpegDemuxer::GetDisplayName() const { |
| @@ -880,10 +894,11 @@ void FFmpegDemuxer::Initialize(DemuxerHost* host, |
| text_enabled_ = enable_text_tracks; |
| weak_this_ = cancel_pending_seek_factory_.GetWeakPtr(); |
| + // Give a WeakPtr to BlockingUrlProtocol since we'll need to release it on the |
| + // blocking thread pool. |
| url_protocol_.reset(new BlockingUrlProtocol( |
| - data_source_, |
| - BindToCurrentLoop(base::Bind(&FFmpegDemuxer::OnDataSourceError, |
| - base::Unretained(this))))); |
| + data_source_, BindToCurrentLoop(base::Bind( |
| + &FFmpegDemuxer::OnDataSourceError, weak_this_)))); |
|
xhwang
2017/03/02 22:05:01
We have mixed use of weak_this_ and weak_factory_.
DaleCurtis
2017/03/02 22:48:53
weak_this_ is bound to a factory which should only
xhwang
2017/03/02 23:07:13
Ah, that's tricky and I didn't even notice it. A f
|
| glue_.reset(new FFmpegGlue(url_protocol_.get())); |
| AVFormatContext* format_context = glue_->format_context(); |
| @@ -899,9 +914,8 @@ void FFmpegDemuxer::Initialize(DemuxerHost* host, |
| format_context->max_analyze_duration = 60 * AV_TIME_BASE; |
| // Open the AVFormatContext using our glue layer. |
| - CHECK(blocking_thread_.Start()); |
| base::PostTaskAndReplyWithResult( |
| - blocking_thread_.task_runner().get(), FROM_HERE, |
| + blocking_task_runner_.get(), FROM_HERE, |
| base::Bind(&FFmpegGlue::OpenContext, base::Unretained(glue_.get())), |
| base::Bind(&FFmpegDemuxer::OnOpenContextDone, weak_factory_.GetWeakPtr(), |
| status_cb)); |
| @@ -911,7 +925,7 @@ void FFmpegDemuxer::AbortPendingReads() { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| // If Stop() has been called, then drop this call. |
| - if (!blocking_thread_.IsRunning()) |
| + if (stopped_) |
| return; |
| // This should only be called after the demuxer has been initialized. |
| @@ -929,7 +943,7 @@ void FFmpegDemuxer::AbortPendingReads() { |
| data_source_->Abort(); |
| // Aborting the read may cause EOF to be marked, undo this. |
| - blocking_thread_.task_runner()->PostTask( |
| + blocking_task_runner_->PostTask( |
| FROM_HERE, base::Bind(&UnmarkEndOfStream, glue_->format_context())); |
| pending_read_ = false; |
| @@ -949,12 +963,6 @@ void FFmpegDemuxer::Stop() { |
| data_source_->Stop(); |
| url_protocol_->Abort(); |
| - // This will block until all tasks complete. Note that after this returns it's |
| - // possible for reply tasks (e.g., OnReadFrameDone()) to be queued on this |
| - // thread. Each of the reply task methods must check whether we've stopped the |
| - // thread and drop their results on the floor. |
| - blocking_thread_.Stop(); |
| - |
| for (const auto& stream : streams_) { |
| if (stream) |
| stream->Stop(); |
| @@ -963,7 +971,9 @@ void FFmpegDemuxer::Stop() { |
| data_source_ = NULL; |
| // Invalidate WeakPtrs on |task_runner_|, destruction may happen on another |
| - // thread. |
| + // thread. We don't need to wait for any outstanding tasks since they will all |
| + // fail to return after invalidating WeakPtrs. |
| + stopped_ = true; |
| weak_factory_.InvalidateWeakPtrs(); |
| cancel_pending_seek_factory_.InvalidateWeakPtrs(); |
| } |
| @@ -1022,7 +1032,7 @@ void FFmpegDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) { |
| pending_seek_cb_ = cb; |
| base::PostTaskAndReplyWithResult( |
| - blocking_thread_.task_runner().get(), FROM_HERE, |
| + blocking_task_runner_.get(), FROM_HERE, |
| base::Bind(&av_seek_frame, glue_->format_context(), seeking_stream->index, |
| ConvertToTimeBase(seeking_stream->time_base, seek_time), |
| // Always seek to a timestamp <= to the desired timestamp. |
| @@ -1167,7 +1177,7 @@ static int CalculateBitrate(AVFormatContext* format_context, |
| void FFmpegDemuxer::OnOpenContextDone(const PipelineStatusCB& status_cb, |
| bool result) { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| - if (!blocking_thread_.IsRunning()) { |
| + if (stopped_) { |
| MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state"; |
| status_cb.Run(PIPELINE_ERROR_ABORT); |
| return; |
| @@ -1181,20 +1191,17 @@ void FFmpegDemuxer::OnOpenContextDone(const PipelineStatusCB& status_cb, |
| // Fully initialize AVFormatContext by parsing the stream a little. |
| base::PostTaskAndReplyWithResult( |
| - blocking_thread_.task_runner().get(), |
| - FROM_HERE, |
| - base::Bind(&avformat_find_stream_info, |
| - glue_->format_context(), |
| + blocking_task_runner_.get(), FROM_HERE, |
| + base::Bind(&avformat_find_stream_info, glue_->format_context(), |
| static_cast<AVDictionary**>(NULL)), |
| base::Bind(&FFmpegDemuxer::OnFindStreamInfoDone, |
| - weak_factory_.GetWeakPtr(), |
| - status_cb)); |
| + weak_factory_.GetWeakPtr(), status_cb)); |
| } |
| void FFmpegDemuxer::OnFindStreamInfoDone(const PipelineStatusCB& status_cb, |
| int result) { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| - if (!blocking_thread_.IsRunning() || !data_source_) { |
| + if (stopped_ || !data_source_) { |
| MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state"; |
| status_cb.Run(PIPELINE_ERROR_ABORT); |
| return; |
| @@ -1614,7 +1621,7 @@ void FFmpegDemuxer::OnSeekFrameDone(int result) { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| CHECK(!pending_seek_cb_.is_null()); |
| - if (!blocking_thread_.IsRunning()) { |
| + if (stopped_) { |
| MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state"; |
| base::ResetAndReturn(&pending_seek_cb_).Run(PIPELINE_ERROR_ABORT); |
| return; |
| @@ -1700,8 +1707,8 @@ void FFmpegDemuxer::ReadFrameIfNeeded() { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| // Make sure we have work to do before reading. |
| - if (!blocking_thread_.IsRunning() || !StreamsHaveAvailableCapacity() || |
| - pending_read_ || !pending_seek_cb_.is_null()) { |
| + if (stopped_ || !StreamsHaveAvailableCapacity() || pending_read_ || |
| + !pending_seek_cb_.is_null()) { |
| return; |
| } |
| @@ -1713,11 +1720,9 @@ void FFmpegDemuxer::ReadFrameIfNeeded() { |
| pending_read_ = true; |
| base::PostTaskAndReplyWithResult( |
| - blocking_thread_.task_runner().get(), |
| - FROM_HERE, |
| + blocking_task_runner_.get(), FROM_HERE, |
| base::Bind(&av_read_frame, glue_->format_context(), packet_ptr), |
| - base::Bind(&FFmpegDemuxer::OnReadFrameDone, |
| - weak_factory_.GetWeakPtr(), |
| + base::Bind(&FFmpegDemuxer::OnReadFrameDone, weak_factory_.GetWeakPtr(), |
| base::Passed(&packet))); |
| } |
| @@ -1726,7 +1731,7 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) { |
| DCHECK(pending_read_); |
| pending_read_ = false; |
| - if (!blocking_thread_.IsRunning() || !pending_seek_cb_.is_null()) |
| + if (stopped_ || !pending_seek_cb_.is_null()) |
| return; |
| // Consider the stream as ended if: |