Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: media/filters/chunk_demuxer.cc

Issue 7203002: Adding ChunkDemuxer implementation. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Unit tests and bug fixes Created 9 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implements a Demuxer that can switch among different data sources mid-stream.
6 // Uses FFmpegDemuxer under the covers, so see the caveats at the top of
7 // ffmpeg_demuxer.h.
8
9 #include "media/filters/chunk_demuxer.h"
10
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop.h"
14 #include "media/base/filter_host.h"
15 #include "media/base/data_buffer.h"
16 #include "media/ffmpeg/ffmpeg_common.h"
17 #include "media/filters/ffmpeg_glue.h"
18 #include "media/filters/in_memory_url_protocol.h"
19 #include "media/webm/webm_cluster_parser.h"
20 #include "media/webm/webm_constants.h"
21 #include "media/webm/webm_info_parser.h"
22 #include "media/webm/webm_tracks_parser.h"
23
24 namespace media {
25
26 // WebM File Header. This is prepended to the INFO & TRACKS
27 // data passed to Init() before handing it to FFmpeg. Essentially
28 // we are making the INFO & TRACKS data look like a small WebM
29 // file so we can use FFmpeg to initialize the AVFormatContext.
30 //
31 // TODO(acolwell): Remove this once GetAVStream() has been removed from
32 // the DemuxerStream interface.
33 static const uint8 kWebMHeader[] = {
34 0x1A, 0x45, 0xDF, 0xA3, 0x9F, // EBML (size = 0x1f)
35 0x42, 0x86, 0x81, 0x01, // EBMLVersion = 1
36 0x42, 0xF7, 0x81, 0x01, // EBMLReadVersion = 1
37 0x42, 0xF2, 0x81, 0x04, // EBMLMaxIDLength = 4
38 0x42, 0xF3, 0x81, 0x08, // EBMLMaxSizeLength = 8
39 0x42, 0x82, 0x84, 0x77, 0x65, 0x62, 0x6D, // DocType = "webm"
40 0x42, 0x87, 0x81, 0x02, // DocTypeVersion = 2
41 0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion = 2
42 // EBML end
43 0x18, 0x53, 0x80, 0x67, // Segment
44 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // segment(size = 0)
45 // INFO goes here.
46 };
47
48 // Offset of the segment size field in kWebMHeader. Used to update
49 // the segment size field before handing the buffer to FFmpeg.
50 static const int kSegmentSizeOffset = sizeof(kWebMHeader) - 8;
51
52 static const uint8 kEmptyCluster[] = {
53 0x1F, 0x43, 0xB6, 0x75, 0x80 // CLUSTER (size = 0)
54 };
55
56 static Buffer* CreateBuffer(const uint8* data, size_t size) {
57 scoped_array<uint8> buf(new uint8[size]);
58 memcpy(buf.get(), data, size);
59 return new DataBuffer(buf.release(), size);
60 }
61
62 class ChunkDemuxerStream : public DemuxerStream {
63 public:
64 typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
65 typedef std::deque<ReadCallback> ReadCBQueue;
66
67 ChunkDemuxerStream(Type type, AVStream* stream);
68 virtual ~ChunkDemuxerStream();
69
70 void Flush();
71
72 // Checks if it is ok to add the |buffers| to the stream.
73 bool CanAddBuffers(const BufferQueue& buffers) const;
74
75 void AddBuffers(const BufferQueue& buffers);
76 void Shutdown();
77
78 bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const;
79
80 // DemuxerStream methods.
81 virtual void Read(const ReadCallback& read_callback);
82 virtual Type type();
83 virtual const MediaFormat& media_format();
84 virtual void EnableBitstreamConverter();
85 virtual AVStream* GetAVStream();
86
87 private:
88 static void RunCallback(ReadCallback cb, scoped_refptr<Buffer> buffer);
89
90 Type type_;
91 MediaFormat media_format_;
92 AVStream* av_stream_;
93
94 mutable base::Lock lock_;
95 ReadCBQueue read_cbs_;
96 BufferQueue buffers_;
97 bool shutdown_called_;
98
99 // Keeps track of the timestamp of the last buffer we have
100 // added to |buffers_|. This is used to enforce buffers with strictly
101 // monotonically increasing timestamps.
102 scoped_ptr<base::TimeDelta> last_buffer_timestamp_;
scherkus (not reviewing) 2011/06/28 20:03:29 why not use kNoTimestamp instead of a pointer?
acolwell GONE FROM CHROMIUM 2011/06/29 16:38:43 oh yeah.. duh.. Done.
103
104 DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream);
105 };
106
107 ChunkDemuxerStream::ChunkDemuxerStream(Type type, AVStream* stream)
108 : type_(type),
109 av_stream_(stream),
110 shutdown_called_(false) {
111 }
112
113 ChunkDemuxerStream::~ChunkDemuxerStream() {}
114
115 void ChunkDemuxerStream::Flush() {
116 VLOG(1) << "Flush()";
117 base::AutoLock auto_lock(lock_);
118 buffers_.clear();
119 last_buffer_timestamp_.reset();
120 }
121
122 bool ChunkDemuxerStream::CanAddBuffers(const BufferQueue& buffers) const {
123 base::AutoLock auto_lock(lock_);
124
125 // If we haven't seen any buffers yet than anything can be added.
126 if (!last_buffer_timestamp_.get())
127 return true;
128
129 if (buffers.empty())
130 return true;
131
132 return (buffers.front()->GetTimestamp() > *last_buffer_timestamp_);
133 }
134
135 void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) {
136 if (buffers.empty())
137 return;
138
139 std::deque<base::Closure> callbacks;
140 {
141 base::AutoLock auto_lock(lock_);
142
143 for (BufferQueue::const_iterator itr = buffers.begin();
144 itr != buffers.end(); itr++) {
145
146 base::TimeDelta current_ts = (*itr)->GetTimestamp();
147 if (!last_buffer_timestamp_.get()) {
148 last_buffer_timestamp_.reset(new base::TimeDelta(current_ts));
149 } else {
150 DCHECK_GT(current_ts.ToInternalValue(),
151 last_buffer_timestamp_->ToInternalValue());
152 }
153
154 *last_buffer_timestamp_ = current_ts;
155
156 buffers_.push_back(*itr);
157 }
158
159 while (!buffers_.empty() && !read_cbs_.empty()) {
160 callbacks.push_back(base::Bind(&ChunkDemuxerStream::RunCallback,
161 read_cbs_.front(),
162 buffers_.front()));
163 buffers_.pop_front();
164 read_cbs_.pop_front();
165 }
166 }
167
168 while (!callbacks.empty()) {
169 callbacks.front().Run();
170 callbacks.pop_front();
171 }
172 }
173
174 void ChunkDemuxerStream::Shutdown() {
175 std::deque<ReadCallback> callbacks;
176 {
177 base::AutoLock auto_lock(lock_);
178 shutdown_called_ = true;
179
180 // Collect all the pending Read() callbacks.
181 while (!read_cbs_.empty()) {
182 callbacks.push_back(read_cbs_.front());
183 read_cbs_.pop_front();
184 }
185 }
186
187 // Pass NULL to all callbacks to signify read failure.
188 while (!callbacks.empty()) {
189 callbacks.front().Run(NULL);
190 callbacks.pop_front();
191 }
192 }
193
194 bool ChunkDemuxerStream::GetLastBufferTimestamp(
195 base::TimeDelta* timestamp) const {
196 base::AutoLock auto_lock(lock_);
197
198 if (buffers_.empty())
199 return false;
200
201 *timestamp = buffers_.back()->GetTimestamp();
202 return true;
203 }
204
205 // Helper function used to make Closures for ReadCallbacks.
206 //static
207 void ChunkDemuxerStream::RunCallback(ReadCallback cb,
208 scoped_refptr<Buffer> buffer) {
209 cb.Run(buffer);
210 }
211
212 // Helper function that makes sure |read_callback| runs on |message_loop|.
213 static void RunOnMessageLoop(const DemuxerStream::ReadCallback& read_callback,
214 MessageLoop* message_loop,
215 Buffer* buffer) {
216 if (MessageLoop::current() != message_loop) {
217 message_loop->PostTask(FROM_HERE,
218 NewRunnableFunction(&RunOnMessageLoop,
219 read_callback,
220 message_loop,
221 scoped_refptr<Buffer>(buffer)));
222 return;
223 }
224
225 read_callback.Run(buffer);
226 }
227
228 // DemuxerStream methods.
229 void ChunkDemuxerStream::Read(const ReadCallback& read_callback) {
230 scoped_refptr<Buffer> buffer;
231
232 {
233 base::AutoLock auto_lock(lock_);
234
235 if (!shutdown_called_) {
236 if (buffers_.empty()) {
237 // Wrap & store |read_callback| so that it will
238 // get called on the current MessageLoop.
239 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
240 read_callback,
241 MessageLoop::current()));
242 return;
243 }
244
245 if (!read_cbs_.empty()) {
246 // Wrap & store |read_callback| so that it will
247 // get called on the current MessageLoop.
248 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
249 read_callback,
250 MessageLoop::current()));
251 return;
252 }
253
254 buffer = buffers_.front();
255 buffers_.pop_front();
256 }
257 }
258
259 read_callback.Run(buffer);
260 }
261
262 DemuxerStream::Type ChunkDemuxerStream::type() { return type_; }
263
264 const MediaFormat& ChunkDemuxerStream::media_format() { return media_format_; }
265
266 void ChunkDemuxerStream::EnableBitstreamConverter() {}
267
268 AVStream* ChunkDemuxerStream::GetAVStream() { return av_stream_; }
269
270 ChunkDemuxer::ChunkDemuxer()
271 : state_(WAITING_FOR_INIT),
272 format_context_(NULL),
273 buffered_bytes_(0),
274 seek_waits_for_data_(true) {
275 }
276
277 ChunkDemuxer::~ChunkDemuxer() {
278 DCHECK_NE(state_, INITIALIZED);
279
280 if (!format_context_)
281 return;
282
283 DestroyAVFormatContext(format_context_);
284 format_context_ = NULL;
285 }
286
287 bool ChunkDemuxer::Init(const uint8* data, int size) {
288 DCHECK(data);
289 DCHECK_GT(size, 0);
290
291 base::AutoLock auto_lock(lock_);
292 DCHECK_EQ(state_, WAITING_FOR_INIT);
293
294 const uint8* cur = data;
295 int cur_size = size;
296 WebMInfoParser info_parser;
297 int res = info_parser.Parse(cur, cur_size);
298
299 if (res <= 0) {
300 ChangeState(INIT_ERROR);
301 return false;
302 }
303
304 cur += res;
305 cur_size -= res;
306
307 WebMTracksParser tracks_parser(info_parser.timecode_scale());
308 res = tracks_parser.Parse(cur, cur_size);
309
310 if (res <= 0) {
311 ChangeState(INIT_ERROR);
312 return false;
313 }
314
315 double mult = info_parser.timecode_scale() / 1000.0;
316 duration_ = base::TimeDelta::FromMicroseconds(info_parser.duration() * mult);
317
318 cluster_parser_.reset(new WebMClusterParser(
319 info_parser.timecode_scale(),
320 tracks_parser.audio_track_num(),
321 tracks_parser.audio_default_duration(),
322 tracks_parser.video_track_num(),
323 tracks_parser.video_default_duration()));
324
325 format_context_ = CreateFormatContext(data, size);
326
327 if (!format_context_ || !SetupStreams() || !ParsePendingBuffers()) {
328 ChangeState(INIT_ERROR);
329 return false;
330 }
331
332 ChangeState(INITIALIZED);
333 return true;
334 }
335
336 // Filter implementation.
337 void ChunkDemuxer::set_host(FilterHost* filter_host) {
338 Demuxer::set_host(filter_host);
339 filter_host->SetDuration(duration_);
340 filter_host->SetCurrentReadPosition(0);
341 }
342
343 void ChunkDemuxer::Stop(FilterCallback* callback) {
344 VLOG(1) << "Stop()";
345
346 callback->Run();
347 delete callback;
348 }
349
350 void ChunkDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) {
351 VLOG(1) << "Seek(" << time.InSecondsF() << ")";
352
353 {
354 base::AutoLock auto_lock(lock_);
355
356 if (seek_waits_for_data_) {
357 seek_cb_ = cb;
358 return;
359 }
360 }
361
362 cb.Run(PIPELINE_OK);
363 }
364
365 void ChunkDemuxer::OnAudioRendererDisabled() {
366 base::AutoLock auto_lock(lock_);
367 audio_ = NULL;
368 }
369
370 void ChunkDemuxer::SetPreload(Preload preload) {}
371
372 // Demuxer implementation.
373 scoped_refptr<DemuxerStream> ChunkDemuxer::GetStream(
374 DemuxerStream::Type type) {
375 if (type == DemuxerStream::VIDEO)
376 return video_;
377
378 if (type == DemuxerStream::AUDIO)
379 return audio_;
380
381 return NULL;
382 }
383
384 base::TimeDelta ChunkDemuxer::GetStartTime() const {
385 VLOG(1) << "GetStartTime()";
386 // TODO(acolwell) : Fix this so it uses the time on the first packet.
387 return base::TimeDelta();
388 }
389
390 void ChunkDemuxer::FlushData() {
391 base::AutoLock auto_lock(lock_);
392 if (audio_.get())
393 audio_->Flush();
394
395 if (video_.get())
396 video_->Flush();
397
398 pending_buffers_.clear();
399 seek_waits_for_data_ = true;
400 }
401
402 bool ChunkDemuxer::AddData(const uint8* data, unsigned length) {
403 VLOG(1) << "AddData(" << length << ")";
404 DCHECK(data);
405 DCHECK_GT(length, 0);
406
407 int64 buffered_bytes = 0;
408 base::TimeDelta buffered_ts = base::TimeDelta::FromSeconds(-1);
409
410 FilterStatusCB cb;
411 {
412 base::AutoLock auto_lock(lock_);
413
414 switch(state_) {
415 case WAITING_FOR_INIT:
416 pending_buffers_.push_back(CreateBuffer(data, length));
417 return true;
418 break;
419
420 case INITIALIZED:
421 if (!ParseAndAddData_Locked(data, length)) {
422 VLOG(1) << "AddData(): parsing data failed";
423 return false;
424 }
425 break;
426
427 case INIT_ERROR:
428 case SHUTDOWN:
429 VLOG(1) << "AddData(): called in unexpected state " << state_;
430 return false;
431 break;
432 }
433
434 seek_waits_for_data_ = false;
435
436 base::TimeDelta tmp;
437 if (audio_.get() && audio_->GetLastBufferTimestamp(&tmp) &&
438 tmp > buffered_ts) {
439 buffered_ts = tmp;
440 }
441
442 if (video_.get() && video_->GetLastBufferTimestamp(&tmp) &&
443 tmp > buffered_ts) {
444 buffered_ts = tmp;
445 }
446
447 buffered_bytes = buffered_bytes_;
448
449 if (!seek_cb_.is_null())
450 std::swap(cb, seek_cb_);
451 }
452
453 // Notify the host of 'network activity' because we got data.
454 if (host()) {
455 host()->SetBufferedBytes(buffered_bytes);
456
457 if (buffered_ts.InSeconds() >= 0) {
458 host()->SetBufferedTime(buffered_ts);
459 }
460
461 host()->SetNetworkActivity(true);
462 }
463
464 if (!cb.is_null())
465 cb.Run(PIPELINE_OK);
466
467 return true;
468 }
469
470 void ChunkDemuxer::Shutdown() {
471 FilterStatusCB cb;
472 {
473 base::AutoLock auto_lock(lock_);
474
475 std::swap(cb, seek_cb_);
476
477 if (audio_.get())
478 audio_->Shutdown();
479
480 if (video_.get())
481 video_->Shutdown();
482
483 ChangeState(SHUTDOWN);
484 }
485
486 if (!cb.is_null())
487 cb.Run(PIPELINE_ERROR_ABORT);
488 }
489
490 void ChunkDemuxer::ChangeState(State new_state) {
491 lock_.AssertAcquired();
492 state_ = new_state;
493 }
494
495 AVFormatContext* ChunkDemuxer::CreateFormatContext(const uint8* data,
496 int size) const {
497 int segment_size = size + sizeof(kEmptyCluster);
498 int buf_size = sizeof(kWebMHeader) + segment_size;
499 scoped_array<uint8> buf(new uint8[buf_size]);
500 memcpy(buf.get(), kWebMHeader, sizeof(kWebMHeader));
501 memcpy(buf.get() + sizeof(kWebMHeader), data, size);
502 memcpy(buf.get() + sizeof(kWebMHeader) + size, kEmptyCluster,
503 sizeof(kEmptyCluster));
504
505 // Update the segment size in the buffer.
506 int64 tmp = (segment_size & GG_LONGLONG(0x00FFFFFFFFFFFFFF)) |
507 GG_LONGLONG(0x0100000000000000);
508 for (int i = 0; i < 8; i++) {
509 buf[kSegmentSizeOffset + i] = (tmp >> (8 * (7 - i))) & 0xff;
510 }
511
512 InMemoryUrlProtocol imup(buf.get(), buf_size, true);
513 std::string key = FFmpegGlue::GetInstance()->AddProtocol(&imup);
514
515 // Open FFmpeg AVFormatContext.
516 AVFormatContext* context = NULL;
517 int result = av_open_input_file(&context, key.c_str(), NULL, 0, NULL);
518
519 // Remove ourself from protocol list.
520 FFmpegGlue::GetInstance()->RemoveProtocol(&imup);
521
522 if (result < 0)
523 return NULL;
524
525 return context;
526 }
527
528 bool ChunkDemuxer::SetupStreams() {
529 int result = av_find_stream_info(format_context_);
530
531 if (result < 0)
532 return false;
533
534 bool no_supported_streams = true;
535 for (size_t i = 0; i < format_context_->nb_streams; ++i) {
536 AVStream* stream = format_context_->streams[i];
537 AVCodecContext* codec_context = stream->codec;
538 CodecType codec_type = codec_context->codec_type;
539
540 if (codec_type == CODEC_TYPE_AUDIO &&
541 stream->codec->codec_id == CODEC_ID_VORBIS &&
542 !audio_.get()) {
543 audio_ = new ChunkDemuxerStream(DemuxerStream::AUDIO, stream);
544 no_supported_streams = false;
545 continue;
546 }
547
548 if (codec_type == CODEC_TYPE_VIDEO &&
549 stream->codec->codec_id == CODEC_ID_VP8 &&
550 !video_.get()) {
551 video_ = new ChunkDemuxerStream(DemuxerStream::VIDEO, stream);
552 no_supported_streams = false;
553 continue;
554 }
555 }
556
557 return !no_supported_streams;
558 }
559
560 bool ChunkDemuxer::ParsePendingBuffers() {
561 bool had_pending_buffers = !pending_buffers_.empty();
562 // Handle any buffers that came in between the time the pipeline was
563 // started and Init() was called.
564 while(!pending_buffers_.empty()) {
565 scoped_refptr<media::Buffer> buf = pending_buffers_.front();
566 pending_buffers_.pop_front();
567
568 if (!ParseAndAddData_Locked(buf->GetData(), buf->GetDataSize())) {
569 pending_buffers_.clear();
570 ChangeState(INIT_ERROR);
571 return false;
572 }
573 }
574
575 seek_waits_for_data_ = !had_pending_buffers;
576 return true;
577 }
578
579 bool ChunkDemuxer::ParseAndAddData_Locked(const uint8* data, int length) {
580 if (!cluster_parser_.get())
581 return false;
582
583 const uint8* cur = data;
584 int cur_size = length;
585
586 while (cur_size > 0) {
587 int res = cluster_parser_->Parse(cur, cur_size);
588
589 if (res <= 0) {
590 VLOG(1) << "ParseAndAddData_Locked() : cluster parsing failed.";
591 return false;
592 }
593
594 // Make sure we can add the buffers to both streams before we acutally
595 // add them. This allows us to accept all of the data or none of it.
596 if ((audio_.get() &&
597 !audio_->CanAddBuffers(cluster_parser_->audio_buffers())) ||
598 (video_.get() &&
599 !video_->CanAddBuffers(cluster_parser_->video_buffers()))) {
600 return false;
601 }
602
603 if (audio_.get())
604 audio_->AddBuffers(cluster_parser_->audio_buffers());
605
606 if (video_.get())
607 video_->AddBuffers(cluster_parser_->video_buffers());
608
609 cur += res;
610 cur_size -= res;
611 }
612
613 // TODO(acolwell) : make this more representative of what is actually
614 // buffered.
615 buffered_bytes_ += length;
616
617 return true;
618 }
619
620 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698