Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(861)

Unified Diff: media/filters/ffmpeg_demuxer.cc

Issue 2710133003: Replace FFmpegDemuxer thread per element with base::TaskScheduler. (Closed)
Patch Set: Give WeakPtr to URLProtocol. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « media/filters/ffmpeg_demuxer.h ('k') | media/filters/ffmpeg_demuxer_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: media/filters/ffmpeg_demuxer.cc
diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc
index 4d75b7c3861454ef7d03e4a0a2e7c4f07fca8991..090358f2423f701ecd57a2bd14675c975adbcdd3 100644
--- a/media/filters/ffmpeg_demuxer.cc
+++ b/media/filters/ffmpeg_demuxer.cc
@@ -22,6 +22,8 @@
#include "base/strings/stringprintf.h"
#include "base/sys_byteorder.h"
#include "base/task_runner_util.h"
+#include "base/task_scheduler/post_task.h"
+#include "base/threading/sequenced_worker_pool.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "media/audio/sample_rates.h"
@@ -845,7 +847,13 @@ FFmpegDemuxer::FFmpegDemuxer(
const scoped_refptr<MediaLog>& media_log)
: host_(NULL),
task_runner_(task_runner),
- blocking_thread_("FFmpegDemuxer"),
+ // FFmpeg has no asynchronous API, so we use base::WaitableEvents inside
+ // the BlockingUrlProtocol to handle hops to the render thread for network
+ // reads and seeks.
+ blocking_task_runner_(base::CreateSequencedTaskRunnerWithTraits(
+ base::TaskTraits().MayBlock().WithBaseSyncPrimitives().WithPriority(
+ base::TaskPriority::USER_BLOCKING))),
+ stopped_(false),
pending_read_(false),
data_source_(data_source),
media_log_(media_log),
@@ -866,6 +874,12 @@ FFmpegDemuxer::~FFmpegDemuxer() {
// NOTE: This class is not destroyed on |task_runner|, so we must ensure that
// there are no outstanding WeakPtrs by the time we reach here.
DCHECK(!weak_factory_.HasWeakPtrs());
+
+ // There may be outstanding tasks in the blocking pool which are trying to use
+ // these members, so release them in sequence with any outstanding calls. The
+ // earlier call to Abort() on |data_source_| prevents further access to it.
+ blocking_task_runner_->DeleteSoon(FROM_HERE, url_protocol_.release());
+ blocking_task_runner_->DeleteSoon(FROM_HERE, glue_.release());
}
std::string FFmpegDemuxer::GetDisplayName() const {
@@ -880,10 +894,11 @@ void FFmpegDemuxer::Initialize(DemuxerHost* host,
text_enabled_ = enable_text_tracks;
weak_this_ = cancel_pending_seek_factory_.GetWeakPtr();
+ // Give a WeakPtr to BlockingUrlProtocol since we'll need to release it on the
+ // blocking thread pool.
url_protocol_.reset(new BlockingUrlProtocol(
- data_source_,
- BindToCurrentLoop(base::Bind(&FFmpegDemuxer::OnDataSourceError,
- base::Unretained(this)))));
+ data_source_, BindToCurrentLoop(base::Bind(
+ &FFmpegDemuxer::OnDataSourceError, weak_this_))));
xhwang 2017/03/02 22:05:01 We have mixed use of weak_this_ and weak_factory_.
DaleCurtis 2017/03/02 22:48:53 weak_this_ is bound to a factory which should only
xhwang 2017/03/02 23:07:13 Ah, that's tricky and I didn't even notice it. A f
glue_.reset(new FFmpegGlue(url_protocol_.get()));
AVFormatContext* format_context = glue_->format_context();
@@ -899,9 +914,8 @@ void FFmpegDemuxer::Initialize(DemuxerHost* host,
format_context->max_analyze_duration = 60 * AV_TIME_BASE;
// Open the AVFormatContext using our glue layer.
- CHECK(blocking_thread_.Start());
base::PostTaskAndReplyWithResult(
- blocking_thread_.task_runner().get(), FROM_HERE,
+ blocking_task_runner_.get(), FROM_HERE,
base::Bind(&FFmpegGlue::OpenContext, base::Unretained(glue_.get())),
base::Bind(&FFmpegDemuxer::OnOpenContextDone, weak_factory_.GetWeakPtr(),
status_cb));
@@ -911,7 +925,7 @@ void FFmpegDemuxer::AbortPendingReads() {
DCHECK(task_runner_->BelongsToCurrentThread());
// If Stop() has been called, then drop this call.
- if (!blocking_thread_.IsRunning())
+ if (stopped_)
return;
// This should only be called after the demuxer has been initialized.
@@ -929,7 +943,7 @@ void FFmpegDemuxer::AbortPendingReads() {
data_source_->Abort();
// Aborting the read may cause EOF to be marked, undo this.
- blocking_thread_.task_runner()->PostTask(
+ blocking_task_runner_->PostTask(
FROM_HERE, base::Bind(&UnmarkEndOfStream, glue_->format_context()));
pending_read_ = false;
@@ -949,12 +963,6 @@ void FFmpegDemuxer::Stop() {
data_source_->Stop();
url_protocol_->Abort();
- // This will block until all tasks complete. Note that after this returns it's
- // possible for reply tasks (e.g., OnReadFrameDone()) to be queued on this
- // thread. Each of the reply task methods must check whether we've stopped the
- // thread and drop their results on the floor.
- blocking_thread_.Stop();
-
for (const auto& stream : streams_) {
if (stream)
stream->Stop();
@@ -963,7 +971,9 @@ void FFmpegDemuxer::Stop() {
data_source_ = NULL;
// Invalidate WeakPtrs on |task_runner_|, destruction may happen on another
- // thread.
+ // thread. We don't need to wait for any outstanding tasks since they will all
+ // fail to return after invalidating WeakPtrs.
+ stopped_ = true;
weak_factory_.InvalidateWeakPtrs();
cancel_pending_seek_factory_.InvalidateWeakPtrs();
}
@@ -1022,7 +1032,7 @@ void FFmpegDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) {
pending_seek_cb_ = cb;
base::PostTaskAndReplyWithResult(
- blocking_thread_.task_runner().get(), FROM_HERE,
+ blocking_task_runner_.get(), FROM_HERE,
base::Bind(&av_seek_frame, glue_->format_context(), seeking_stream->index,
ConvertToTimeBase(seeking_stream->time_base, seek_time),
// Always seek to a timestamp <= to the desired timestamp.
@@ -1167,7 +1177,7 @@ static int CalculateBitrate(AVFormatContext* format_context,
void FFmpegDemuxer::OnOpenContextDone(const PipelineStatusCB& status_cb,
bool result) {
DCHECK(task_runner_->BelongsToCurrentThread());
- if (!blocking_thread_.IsRunning()) {
+ if (stopped_) {
MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state";
status_cb.Run(PIPELINE_ERROR_ABORT);
return;
@@ -1181,20 +1191,17 @@ void FFmpegDemuxer::OnOpenContextDone(const PipelineStatusCB& status_cb,
// Fully initialize AVFormatContext by parsing the stream a little.
base::PostTaskAndReplyWithResult(
- blocking_thread_.task_runner().get(),
- FROM_HERE,
- base::Bind(&avformat_find_stream_info,
- glue_->format_context(),
+ blocking_task_runner_.get(), FROM_HERE,
+ base::Bind(&avformat_find_stream_info, glue_->format_context(),
static_cast<AVDictionary**>(NULL)),
base::Bind(&FFmpegDemuxer::OnFindStreamInfoDone,
- weak_factory_.GetWeakPtr(),
- status_cb));
+ weak_factory_.GetWeakPtr(), status_cb));
}
void FFmpegDemuxer::OnFindStreamInfoDone(const PipelineStatusCB& status_cb,
int result) {
DCHECK(task_runner_->BelongsToCurrentThread());
- if (!blocking_thread_.IsRunning() || !data_source_) {
+ if (stopped_ || !data_source_) {
MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state";
status_cb.Run(PIPELINE_ERROR_ABORT);
return;
@@ -1614,7 +1621,7 @@ void FFmpegDemuxer::OnSeekFrameDone(int result) {
DCHECK(task_runner_->BelongsToCurrentThread());
CHECK(!pending_seek_cb_.is_null());
- if (!blocking_thread_.IsRunning()) {
+ if (stopped_) {
MEDIA_LOG(ERROR, media_log_) << GetDisplayName() << ": bad state";
base::ResetAndReturn(&pending_seek_cb_).Run(PIPELINE_ERROR_ABORT);
return;
@@ -1700,8 +1707,8 @@ void FFmpegDemuxer::ReadFrameIfNeeded() {
DCHECK(task_runner_->BelongsToCurrentThread());
// Make sure we have work to do before reading.
- if (!blocking_thread_.IsRunning() || !StreamsHaveAvailableCapacity() ||
- pending_read_ || !pending_seek_cb_.is_null()) {
+ if (stopped_ || !StreamsHaveAvailableCapacity() || pending_read_ ||
+ !pending_seek_cb_.is_null()) {
return;
}
@@ -1713,11 +1720,9 @@ void FFmpegDemuxer::ReadFrameIfNeeded() {
pending_read_ = true;
base::PostTaskAndReplyWithResult(
- blocking_thread_.task_runner().get(),
- FROM_HERE,
+ blocking_task_runner_.get(), FROM_HERE,
base::Bind(&av_read_frame, glue_->format_context(), packet_ptr),
- base::Bind(&FFmpegDemuxer::OnReadFrameDone,
- weak_factory_.GetWeakPtr(),
+ base::Bind(&FFmpegDemuxer::OnReadFrameDone, weak_factory_.GetWeakPtr(),
base::Passed(&packet)));
}
@@ -1726,7 +1731,7 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) {
DCHECK(pending_read_);
pending_read_ = false;
- if (!blocking_thread_.IsRunning() || !pending_seek_cb_.is_null())
+ if (stopped_ || !pending_seek_cb_.is_null())
return;
// Consider the stream as ended if:
« no previous file with comments | « media/filters/ffmpeg_demuxer.h ('k') | media/filters/ffmpeg_demuxer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698