Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1067)

Unified Diff: media/filters/ffmpeg_demuxer.cc

Issue 11420070: Remove locking from FFmpegDemuxerStream and associated TODOs from media code. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/filters/ffmpeg_demuxer.cc
diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc
index 0a690e83464a9e72815cf541492799696e2ceb34..efcd41905ed29470578dbcd9dc360783cc098925 100644
--- a/media/filters/ffmpeg_demuxer.cc
+++ b/media/filters/ffmpeg_demuxer.cc
@@ -35,6 +35,7 @@ FFmpegDemuxerStream::FFmpegDemuxerStream(
FFmpegDemuxer* demuxer,
AVStream* stream)
: demuxer_(demuxer),
+ message_loop_(base::MessageLoopProxy::current()),
stream_(stream),
type_(UNKNOWN),
stopped_(false),
@@ -67,17 +68,15 @@ FFmpegDemuxerStream::FFmpegDemuxerStream(
}
bool FFmpegDemuxerStream::HasPendingReads() {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
- base::AutoLock auto_lock(lock_);
+ DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(!stopped_ || read_queue_.empty())
<< "Read queue should have been emptied if demuxing stream is stopped";
return !read_queue_.empty();
}
void FFmpegDemuxerStream::EnqueuePacket(ScopedAVPacket packet) {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
+ DCHECK(message_loop_->BelongsToCurrentThread());
- base::AutoLock auto_lock(lock_);
if (stopped_) {
NOTREACHED() << "Attempted to enqueue packet on a stopped stream";
return;
@@ -105,28 +104,24 @@ void FFmpegDemuxerStream::EnqueuePacket(ScopedAVPacket packet) {
last_packet_timestamp_ != kNoTimestamp() &&
last_packet_timestamp_ < buffer->GetTimestamp()) {
buffered_ranges_.Add(last_packet_timestamp_, buffer->GetTimestamp());
- demuxer_->message_loop()->PostTask(FROM_HERE, base::Bind(
- &FFmpegDemuxer::NotifyBufferingChanged, demuxer_));
+ demuxer_->NotifyBufferingChanged();
}
last_packet_timestamp_ = buffer->GetTimestamp();
}
buffer_queue_.push_back(buffer);
- FulfillPendingRead();
- return;
+ SatisfyPendingReads();
}
void FFmpegDemuxerStream::FlushBuffers() {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
- base::AutoLock auto_lock(lock_);
+ DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(read_queue_.empty()) << "Read requests should be empty";
buffer_queue_.clear();
last_packet_timestamp_ = kNoTimestamp();
}
void FFmpegDemuxerStream::Stop() {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
- base::AutoLock auto_lock(lock_);
+ DCHECK(message_loop_->BelongsToCurrentThread());
buffer_queue_.clear();
for (ReadQueue::iterator it = read_queue_.begin();
it != read_queue_.end(); ++it) {
@@ -146,76 +141,32 @@ DemuxerStream::Type FFmpegDemuxerStream::type() {
}
void FFmpegDemuxerStream::Read(const ReadCB& read_cb) {
- DCHECK(!read_cb.is_null());
-
- base::AutoLock auto_lock(lock_);
- // Don't accept any additional reads if we've been told to stop.
- // The demuxer_ may have been destroyed in the pipleine thread.
- //
- // TODO(scherkus): it would be cleaner if we replied with an error message.
- if (stopped_) {
- read_cb.Run(DemuxerStream::kOk,
- scoped_refptr<DecoderBuffer>(DecoderBuffer::CreateEOSBuffer()));
- return;
- }
-
- // Buffers are only queued when there are no pending reads.
- DCHECK(buffer_queue_.empty() || read_queue_.empty());
-
- if (buffer_queue_.empty()) {
- demuxer_->message_loop()->PostTask(FROM_HERE, base::Bind(
- &FFmpegDemuxerStream::ReadTask, this, read_cb));
+ if (!message_loop_->BelongsToCurrentThread()) {
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ &FFmpegDemuxerStream::Read, this, read_cb));
return;
}
- // Send the oldest buffer back.
- scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front();
- buffer_queue_.pop_front();
- read_cb.Run(DemuxerStream::kOk, buffer);
-}
-
-void FFmpegDemuxerStream::ReadTask(const ReadCB& read_cb) {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
-
- base::AutoLock auto_lock(lock_);
// Don't accept any additional reads if we've been told to stop.
+ // The |demuxer_| may have been destroyed in the pipeline thread.
//
- // TODO(scherkus): it would be cleaner if we replied with an error message.
+ // TODO(scherkus): it would be cleaner to reply with an error message.
if (stopped_) {
read_cb.Run(DemuxerStream::kOk,
scoped_refptr<DecoderBuffer>(DecoderBuffer::CreateEOSBuffer()));
xhwang 2012/11/20 17:54:34 ditto
scherkus (not reviewing) 2012/11/20 19:16:07 Done.
return;
}
- // Enqueue the callback and attempt to satisfy it immediately.
read_queue_.push_back(read_cb);
- FulfillPendingRead();
-
- // Check if there are still pending reads, demux some more.
- if (!read_queue_.empty()) {
- demuxer_->PostDemuxTask();
- }
+ SatisfyPendingReads();
}
-void FFmpegDemuxerStream::FulfillPendingRead() {
- DCHECK(demuxer_->message_loop()->BelongsToCurrentThread());
- lock_.AssertAcquired();
- if (buffer_queue_.empty() || read_queue_.empty()) {
+void FFmpegDemuxerStream::EnableBitstreamConverter() {
+ if (!message_loop_->BelongsToCurrentThread()) {
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ &FFmpegDemuxerStream::EnableBitstreamConverter, this));
return;
}
-
- // Dequeue a buffer and pending read pair.
- scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front();
- ReadCB read_cb(read_queue_.front());
- buffer_queue_.pop_front();
- read_queue_.pop_front();
-
- // Execute the callback.
- read_cb.Run(DemuxerStream::kOk, buffer);
-}
-
-void FFmpegDemuxerStream::EnableBitstreamConverter() {
- base::AutoLock auto_lock(lock_);
CHECK(bitstream_converter_.get());
bitstream_converter_enabled_ = true;
}
@@ -231,7 +182,6 @@ const VideoDecoderConfig& FFmpegDemuxerStream::video_decoder_config() {
}
FFmpegDemuxerStream::~FFmpegDemuxerStream() {
- base::AutoLock auto_lock(lock_);
DCHECK(stopped_);
DCHECK(read_queue_.empty());
DCHECK(buffer_queue_.empty());
@@ -242,10 +192,27 @@ base::TimeDelta FFmpegDemuxerStream::GetElapsedTime() const {
}
Ranges<base::TimeDelta> FFmpegDemuxerStream::GetBufferedRanges() const {
- base::AutoLock auto_lock(lock_);
return buffered_ranges_;
}
+void FFmpegDemuxerStream::SatisfyPendingReads() {
xhwang 2012/11/20 17:54:34 Add DCHECK(message_loop_->BelongsToCurrentThread()
scherkus (not reviewing) 2012/11/20 19:16:07 Done.
+ while (!read_queue_.empty() && !buffer_queue_.empty()) {
+ ReadCB read_cb = read_queue_.front();
+ read_queue_.pop_front();
+
+ // Send earliest buffer back on a new execution stack to avoid recursing.
+ scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front();
+ buffer_queue_.pop_front();
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ read_cb, DemuxerStream::kOk, buffer));
+ }
+
+ // No buffers but pending reads? Ask for more!
+ if (!read_queue_.empty() && buffer_queue_.empty()) {
+ demuxer_->NotifyHasPendingRead();
+ }
+}
+
// static
base::TimeDelta FFmpegDemuxerStream::ConvertStreamTimestamp(
const AVRational& time_base, int64 timestamp) {
@@ -277,11 +244,6 @@ FFmpegDemuxer::FFmpegDemuxer(
FFmpegDemuxer::~FFmpegDemuxer() {}
-void FFmpegDemuxer::PostDemuxTask() {
- message_loop_->PostTask(FROM_HERE,
- base::Bind(&FFmpegDemuxer::DemuxTask, this));
-}
-
void FFmpegDemuxer::Stop(const base::Closure& callback) {
// Post a task to notify the streams to stop as well.
message_loop_->PostTask(FROM_HERE,
@@ -329,10 +291,6 @@ base::TimeDelta FFmpegDemuxer::GetStartTime() const {
return start_time_;
}
-scoped_refptr<base::MessageLoopProxy> FFmpegDemuxer::message_loop() {
- return message_loop_;
-}
-
// Helper for calculating the bitrate of the media based on information stored
// in |format_context| or failing that the size and duration of the media.
//
@@ -600,9 +558,6 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) {
}
// Queue the packet with the appropriate stream.
- // TODO(scherkus): should we post this back to the pipeline thread? I'm
- // worried about downstream filters (i.e., decoders) executing on this
- // thread.
DCHECK_GE(packet->stream_index, 0);
DCHECK_LT(packet->stream_index, static_cast<int>(streams_.size()));
@@ -619,18 +574,13 @@ void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) {
// Create a loop by posting another task. This allows seek and message loop
// quit tasks to get processed.
if (StreamsHavePendingReads()) {
- PostDemuxTask();
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ &FFmpegDemuxer::DemuxTask, this));
}
}
void FFmpegDemuxer::StopTask(const base::Closure& callback) {
DCHECK(message_loop_->BelongsToCurrentThread());
- StreamVector::iterator iter;
- for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
- if (*iter)
- (*iter)->Stop();
- }
-
url_protocol_.Abort();
data_source_->Stop(BindToLoop(message_loop_, base::Bind(
&FFmpegDemuxer::OnDataSourceStopped, this, callback)));
@@ -643,6 +593,12 @@ void FFmpegDemuxer::OnDataSourceStopped(const base::Closure& callback) {
// thread and drop their results on the floor.
DCHECK(message_loop_->BelongsToCurrentThread());
blocking_thread_.Stop();
+
+ StreamVector::iterator iter;
+ for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
+ if (*iter)
+ (*iter)->Stop();
+ }
callback.Run();
}
@@ -680,6 +636,11 @@ void FFmpegDemuxer::StreamHasEnded() {
}
}
+void FFmpegDemuxer::NotifyHasPendingRead() {
xhwang 2012/11/20 17:54:34 This function is only called once in SatisfyPendin
scherkus (not reviewing) 2012/11/20 19:16:07 I think it's fine keep around -- it's clearer than
+ DCHECK(message_loop_->BelongsToCurrentThread());
+ DemuxTask();
+}
+
void FFmpegDemuxer::NotifyBufferingChanged() {
DCHECK(message_loop_->BelongsToCurrentThread());
Ranges<base::TimeDelta> buffered;

Powered by Google App Engine
This is Rietveld 408576698