Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: media/filters/ffmpeg_demuxer.cc

Issue 131092: FFmpegDemuxerStream::Read() now functions properly while stopped. (Closed)
Patch Set: Albert's comments Created 11 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/filters/ffmpeg_demuxer.h ('k') | media/filters/ffmpeg_demuxer_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/scoped_ptr.h" 5 #include "base/scoped_ptr.h"
6 #include "base/stl_util-inl.h"
6 #include "base/string_util.h" 7 #include "base/string_util.h"
7 #include "base/time.h" 8 #include "base/time.h"
8 #include "media/base/filter_host.h" 9 #include "media/base/filter_host.h"
9 #include "media/filters/ffmpeg_common.h" 10 #include "media/filters/ffmpeg_common.h"
10 #include "media/filters/ffmpeg_demuxer.h" 11 #include "media/filters/ffmpeg_demuxer.h"
11 #include "media/filters/ffmpeg_glue.h" 12 #include "media/filters/ffmpeg_glue.h"
12 13
14 namespace {
15
16 // Helper function to deep copy an AVPacket's data, size and timestamps.
17 // Returns NULL if a packet could not be cloned (i.e., out of memory).
18 AVPacket* ClonePacket(AVPacket* packet) {
19 scoped_ptr<AVPacket> clone(new AVPacket());
20 if (!clone.get() || av_new_packet(clone.get(), packet->size) < 0) {
21 return NULL;
22 }
23 DCHECK_EQ(clone->size, packet->size);
24 clone->dts = packet->dts;
25 clone->pts = packet->pts;
26 clone->duration = packet->duration;
27 memcpy(clone->data, packet->data, clone->size);
28 return clone.release();
29 }
30
31 } // namespace
32
13 namespace media { 33 namespace media {
14 34
15 // 35 //
16 // AVPacketBuffer 36 // AVPacketBuffer
17 // 37 //
18 class AVPacketBuffer : public Buffer { 38 class AVPacketBuffer : public Buffer {
19 public: 39 public:
20 AVPacketBuffer(AVPacket* packet, const base::TimeDelta& timestamp, 40 AVPacketBuffer(AVPacket* packet, const base::TimeDelta& timestamp,
21 const base::TimeDelta& duration) 41 const base::TimeDelta& duration)
22 : packet_(packet) { 42 : packet_(packet) {
(...skipping 21 matching lines...) Expand all
44 }; 64 };
45 65
46 66
47 // 67 //
48 // FFmpegDemuxerStream 68 // FFmpegDemuxerStream
49 // 69 //
50 FFmpegDemuxerStream::FFmpegDemuxerStream(FFmpegDemuxer* demuxer, 70 FFmpegDemuxerStream::FFmpegDemuxerStream(FFmpegDemuxer* demuxer,
51 AVStream* stream) 71 AVStream* stream)
52 : demuxer_(demuxer), 72 : demuxer_(demuxer),
53 stream_(stream), 73 stream_(stream),
54 discontinuous_(false) { 74 discontinuous_(false),
75 stopped_(false) {
55 DCHECK(demuxer_); 76 DCHECK(demuxer_);
56 77
57 // Determine our media format. 78 // Determine our media format.
58 switch (stream->codec->codec_type) { 79 switch (stream->codec->codec_type) {
59 case CODEC_TYPE_AUDIO: 80 case CODEC_TYPE_AUDIO:
60 media_format_.SetAsString(MediaFormat::kMimeType, 81 media_format_.SetAsString(MediaFormat::kMimeType,
61 mime_type::kFFmpegAudio); 82 mime_type::kFFmpegAudio);
62 break; 83 break;
63 case CODEC_TYPE_VIDEO: 84 case CODEC_TYPE_VIDEO:
64 media_format_.SetAsString(MediaFormat::kMimeType, 85 media_format_.SetAsString(MediaFormat::kMimeType,
65 mime_type::kFFmpegVideo); 86 mime_type::kFFmpegVideo);
66 break; 87 break;
67 default: 88 default:
68 NOTREACHED(); 89 NOTREACHED();
69 break; 90 break;
70 } 91 }
71 92
72 // Calculate the duration. 93 // Calculate the duration.
73 duration_ = ConvertTimestamp(stream->duration); 94 duration_ = ConvertTimestamp(stream->duration);
74 } 95 }
75 96
76 FFmpegDemuxerStream::~FFmpegDemuxerStream() { 97 FFmpegDemuxerStream::~FFmpegDemuxerStream() {
77 // Since |buffer_queue_| uses scoped_refptr everything will get released. 98 AutoLock auto_lock(lock_);
78 while (!read_queue_.empty()) { 99 DCHECK(stopped_);
79 delete read_queue_.front(); 100 DCHECK(read_queue_.empty());
80 read_queue_.pop_front(); 101 DCHECK(buffer_queue_.empty());
81 }
82 } 102 }
83 103
84 void* FFmpegDemuxerStream::QueryInterface(const char* id) { 104 void* FFmpegDemuxerStream::QueryInterface(const char* id) {
85 DCHECK(id); 105 DCHECK(id);
86 AVStreamProvider* interface_ptr = NULL; 106 AVStreamProvider* interface_ptr = NULL;
87 if (0 == strcmp(id, AVStreamProvider::interface_id())) { 107 if (0 == strcmp(id, AVStreamProvider::interface_id())) {
88 interface_ptr = this; 108 interface_ptr = this;
89 } 109 }
90 return interface_ptr; 110 return interface_ptr;
91 } 111 }
92 112
93 bool FFmpegDemuxerStream::HasPendingReads() { 113 bool FFmpegDemuxerStream::HasPendingReads() {
94 AutoLock auto_lock(lock_); 114 AutoLock auto_lock(lock_);
95 return !read_queue_.empty(); 115 return !read_queue_.empty();
96 } 116 }
97 117
98 base::TimeDelta FFmpegDemuxerStream::EnqueuePacket(AVPacket* packet) { 118 base::TimeDelta FFmpegDemuxerStream::EnqueuePacket(AVPacket* packet) {
99 base::TimeDelta timestamp = ConvertTimestamp(packet->pts); 119 base::TimeDelta timestamp = ConvertTimestamp(packet->pts);
100 base::TimeDelta duration = ConvertTimestamp(packet->duration); 120 base::TimeDelta duration = ConvertTimestamp(packet->duration);
101 Buffer* buffer = new AVPacketBuffer(packet, timestamp, duration); 121 scoped_refptr<Buffer> buffer =
122 new AVPacketBuffer(packet, timestamp, duration);
102 DCHECK(buffer); 123 DCHECK(buffer);
103 { 124 {
104 AutoLock auto_lock(lock_); 125 AutoLock auto_lock(lock_);
126 if (stopped_) {
127 NOTREACHED() << "Attempted to enqueue packet on a stopped stream.";
128 return timestamp;
129 }
105 buffer_queue_.push_back(buffer); 130 buffer_queue_.push_back(buffer);
106 } 131 }
107 FulfillPendingReads(); 132 FulfillPendingReads();
108 return timestamp; 133 return timestamp;
109 } 134 }
110 135
111 void FFmpegDemuxerStream::FlushBuffers() { 136 void FFmpegDemuxerStream::FlushBuffers() {
112 AutoLock auto_lock(lock_); 137 AutoLock auto_lock(lock_);
113 buffer_queue_.clear(); 138 buffer_queue_.clear();
114 discontinuous_ = true; 139 discontinuous_ = true;
115 } 140 }
116 141
142 void FFmpegDemuxerStream::Stop() {
143 // Since |buffer_queue_| can be very large, we swap queues and delete outside
144 // of the lock.
145 BufferQueue tmp_buffer_queue;
146 ReadQueue tmp_read_queue;
147 {
148 AutoLock auto_lock(lock_);
149 buffer_queue_.swap(tmp_buffer_queue);
150 read_queue_.swap(tmp_read_queue);
151 stopped_ = true;
152 }
153 tmp_buffer_queue.clear();
154 STLDeleteElements(&tmp_read_queue);
155 }
156
117 const MediaFormat& FFmpegDemuxerStream::media_format() { 157 const MediaFormat& FFmpegDemuxerStream::media_format() {
118 return media_format_; 158 return media_format_;
119 } 159 }
120 160
121 void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) { 161 void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) {
122 DCHECK(read_callback); 162 DCHECK(read_callback);
123 { 163 {
124 AutoLock auto_lock(lock_); 164 AutoLock auto_lock(lock_);
165 // Don't accept any additional reads if we've been told to stop.
166 //
167 // TODO(scherkus): it would be cleaner if we replied with an error message.
168 if (stopped_) {
169 delete read_callback;
170 return;
171 }
172
173 // Otherwise enqueue the request.
125 read_queue_.push_back(read_callback); 174 read_queue_.push_back(read_callback);
126 } 175 }
127 if (FulfillPendingReads()) { 176
177 // See if we can immediately fulfill this read and whether we need to demux.
178 bool post_demux_task = FulfillPendingReads();
179
180 // Since Read() may be executed on any thread, it's possible that StopTask()
181 // finishes executing on the demuxer thread and the message loop goes away.
182 //
183 // To prevent that we'll grab the lock and post a task at the same time, which
184 // will keep the message loop alive.
185 AutoLock auto_lock(lock_);
186 if (post_demux_task && !stopped_) {
128 demuxer_->PostDemuxTask(); 187 demuxer_->PostDemuxTask();
129 } 188 }
130 } 189 }
131 190
132 bool FFmpegDemuxerStream::FulfillPendingReads() { 191 bool FFmpegDemuxerStream::FulfillPendingReads() {
133 bool pending_reads = false; 192 bool pending_reads = false;
134 while (true) { 193 while (true) {
135 scoped_refptr<Buffer> buffer; 194 scoped_refptr<Buffer> buffer;
136 scoped_ptr<Callback1<Buffer*>::Type> read_callback; 195 scoped_ptr<Callback1<Buffer*>::Type> read_callback;
137 { 196 {
(...skipping 17 matching lines...) Expand all
155 } 214 }
156 return pending_reads; 215 return pending_reads;
157 } 216 }
158 217
159 base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) { 218 base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) {
160 AVRational time_base = { 1, base::Time::kMicrosecondsPerSecond }; 219 AVRational time_base = { 1, base::Time::kMicrosecondsPerSecond };
161 int64 microseconds = av_rescale_q(timestamp, stream_->time_base, time_base); 220 int64 microseconds = av_rescale_q(timestamp, stream_->time_base, time_base);
162 return base::TimeDelta::FromMicroseconds(microseconds); 221 return base::TimeDelta::FromMicroseconds(microseconds);
163 } 222 }
164 223
165
166 // 224 //
167 // FFmpegDemuxer 225 // FFmpegDemuxer
168 // 226 //
169 FFmpegDemuxer::FFmpegDemuxer() 227 FFmpegDemuxer::FFmpegDemuxer()
170 : thread_("DemuxerThread") { 228 : thread_("DemuxerThread") {
171 } 229 }
172 230
173 FFmpegDemuxer::~FFmpegDemuxer() { 231 FFmpegDemuxer::~FFmpegDemuxer() {
174 DCHECK(!thread_.IsRunning()); 232 DCHECK(!thread_.IsRunning());
175 DCHECK(!format_context_.get()); 233 DCHECK(!format_context_.get());
176 // TODO(scherkus): I believe we need to use av_close_input_file() here 234 // TODO(scherkus): I believe we need to use av_close_input_file() here
177 // instead of scoped_ptr_malloc calling av_free(). 235 // instead of scoped_ptr_malloc calling av_free().
178 // 236 //
179 // Note that av_close_input_file() doesn't close the codecs so we need to 237 // Note that av_close_input_file() doesn't close the codecs so we need to
180 // figure out who's responsible for closing the them. 238 // figure out who's responsible for closing the them.
181 } 239 }
182 240
183 void FFmpegDemuxer::PostDemuxTask() { 241 void FFmpegDemuxer::PostDemuxTask() {
184 thread_.message_loop()->PostTask(FROM_HERE, 242 thread_.message_loop()->PostTask(FROM_HERE,
185 NewRunnableMethod(this, &FFmpegDemuxer::DemuxTask)); 243 NewRunnableMethod(this, &FFmpegDemuxer::DemuxTask));
186 } 244 }
187 245
188 void FFmpegDemuxer::Stop() { 246 void FFmpegDemuxer::Stop() {
189 thread_.Stop(); 247 // Stop our thread and post a task notifying the streams to stop as well.
248 if (thread_.IsRunning()) {
249 thread_.message_loop()->PostTask(FROM_HERE,
250 NewRunnableMethod(this, &FFmpegDemuxer::StopTask));
251 thread_.Stop();
252 }
190 format_context_.reset(); 253 format_context_.reset();
191 } 254 }
192 255
193 void FFmpegDemuxer::Seek(base::TimeDelta time) { 256 void FFmpegDemuxer::Seek(base::TimeDelta time) {
194 // TODO(hclam): by returning from this method, it is assumed that the seek 257 // TODO(hclam): by returning from this method, it is assumed that the seek
195 // operation is completed and filters behind the demuxer is good to issue 258 // operation is completed and filters behind the demuxer is good to issue
196 // more reads, but we are posting a task here, which makes the seek operation 259 // more reads, but we are posting a task here, which makes the seek operation
197 // asynchronous, should change how seek works to make it fully asynchronous. 260 // asynchronous, should change how seek works to make it fully asynchronous.
198 thread_.message_loop()->PostTask(FROM_HERE, 261 thread_.message_loop()->PostTask(FROM_HERE,
199 NewRunnableMethod(this, &FFmpegDemuxer::SeekTask, time)); 262 NewRunnableMethod(this, &FFmpegDemuxer::SeekTask, time));
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after
357 av_free_packet(packet.get()); 420 av_free_packet(packet.get());
358 } 421 }
359 422
360 // Create a loop by posting another task. This allows seek and message loop 423 // Create a loop by posting another task. This allows seek and message loop
361 // quit tasks to get processed. 424 // quit tasks to get processed.
362 if (StreamsHavePendingReads()) { 425 if (StreamsHavePendingReads()) {
363 PostDemuxTask(); 426 PostDemuxTask();
364 } 427 }
365 } 428 }
366 429
430 void FFmpegDemuxer::StopTask() {
431 StreamVector::iterator iter;
432 for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
433 (*iter)->Stop();
434 }
435 }
436
367 bool FFmpegDemuxer::StreamsHavePendingReads() { 437 bool FFmpegDemuxer::StreamsHavePendingReads() {
438 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
368 StreamVector::iterator iter; 439 StreamVector::iterator iter;
369 for (iter = streams_.begin(); iter != streams_.end(); ++iter) { 440 for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
370 if ((*iter)->HasPendingReads()) { 441 if ((*iter)->HasPendingReads()) {
371 return true; 442 return true;
372 } 443 }
373 } 444 }
374 return false; 445 return false;
375 } 446 }
376 447
377 void FFmpegDemuxer::StreamHasEnded() { 448 void FFmpegDemuxer::StreamHasEnded() {
449 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
378 StreamVector::iterator iter; 450 StreamVector::iterator iter;
379 for (iter = streams_.begin(); iter != streams_.end(); ++iter) { 451 for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
380 AVPacket* packet = new AVPacket(); 452 AVPacket* packet = new AVPacket();
381 memset(packet, 0, sizeof(*packet)); 453 memset(packet, 0, sizeof(*packet));
382 (*iter)->EnqueuePacket(packet); 454 (*iter)->EnqueuePacket(packet);
383 } 455 }
384 } 456 }
385 457
386 AVPacket* FFmpegDemuxer::ClonePacket(AVPacket* packet) {
387 scoped_ptr<AVPacket> clone(new AVPacket());
388 if (!clone.get() || av_new_packet(clone.get(), packet->size) < 0) {
389 return NULL;
390 }
391 DCHECK_EQ(clone->size, packet->size);
392 clone->dts = packet->dts;
393 clone->pts = packet->pts;
394 clone->duration = packet->duration;
395 memcpy(clone->data, packet->data, clone->size);
396 return clone.release();
397 }
398
399 } // namespace media 458 } // namespace media
OLDNEW
« no previous file with comments | « media/filters/ffmpeg_demuxer.h ('k') | media/filters/ffmpeg_demuxer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698