Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(143)

Side by Side Diff: media/filters/chunk_demuxer.cc

Issue 7203002: Adding ChunkDemuxer implementation. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fixed unit tests and nits Created 9 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « media/filters/chunk_demuxer.h ('k') | media/filters/chunk_demuxer_factory.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implements a Demuxer that can switch among different data sources mid-stream.
6 // Uses FFmpegDemuxer under the covers, so see the caveats at the top of
7 // ffmpeg_demuxer.h.
8
9 #include "media/filters/chunk_demuxer.h"
10
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop.h"
14 #include "media/base/filter_host.h"
15 #include "media/base/data_buffer.h"
16 #include "media/ffmpeg/ffmpeg_common.h"
17 #include "media/filters/ffmpeg_glue.h"
18 #include "media/filters/in_memory_url_protocol.h"
19 #include "media/webm/webm_cluster_parser.h"
20 #include "media/webm/webm_constants.h"
21 #include "media/webm/webm_info_parser.h"
22 #include "media/webm/webm_tracks_parser.h"
23
24 namespace media {
25
26 // WebM File Header. This is prepended to the INFO & TRACKS
27 // data passed to Init() before handing it to FFmpeg. Essentially
28 // we are making the INFO & TRACKS data look like a small WebM
29 // file so we can use FFmpeg to initialize the AVFormatContext.
30 //
31 // TODO(acolwell): Remove this once GetAVStream() has been removed from
32 // the DemuxerStream interface.
33 static const uint8 kWebMHeader[] = {
34 0x1A, 0x45, 0xDF, 0xA3, 0x9F, // EBML (size = 0x1f)
35 0x42, 0x86, 0x81, 0x01, // EBMLVersion = 1
36 0x42, 0xF7, 0x81, 0x01, // EBMLReadVersion = 1
37 0x42, 0xF2, 0x81, 0x04, // EBMLMaxIDLength = 4
38 0x42, 0xF3, 0x81, 0x08, // EBMLMaxSizeLength = 8
39 0x42, 0x82, 0x84, 0x77, 0x65, 0x62, 0x6D, // DocType = "webm"
40 0x42, 0x87, 0x81, 0x02, // DocTypeVersion = 2
41 0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion = 2
42 // EBML end
43 0x18, 0x53, 0x80, 0x67, // Segment
44 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // segment(size = 0)
45 // INFO goes here.
46 };
47
48 // Offset of the segment size field in kWebMHeader. Used to update
49 // the segment size field before handing the buffer to FFmpeg.
50 static const int kSegmentSizeOffset = sizeof(kWebMHeader) - 8;
51
52 static const uint8 kEmptyCluster[] = {
53 0x1F, 0x43, 0xB6, 0x75, 0x80 // CLUSTER (size = 0)
54 };
55
56 static Buffer* CreateBuffer(const uint8* data, size_t size) {
57 scoped_array<uint8> buf(new uint8[size]);
58 memcpy(buf.get(), data, size);
59 return new DataBuffer(buf.release(), size);
60 }
61
62 class ChunkDemuxerStream : public DemuxerStream {
63 public:
64 typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
65 typedef std::deque<ReadCallback> ReadCBQueue;
66
67 ChunkDemuxerStream(Type type, AVStream* stream);
68 virtual ~ChunkDemuxerStream();
69
70 void Flush();
71
72 // Checks if it is ok to add the |buffers| to the stream.
73 bool CanAddBuffers(const BufferQueue& buffers) const;
74
75 void AddBuffers(const BufferQueue& buffers);
76 void Shutdown();
77
78 bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const;
79
80 // DemuxerStream methods.
81 virtual void Read(const ReadCallback& read_callback);
82 virtual Type type();
83 virtual const MediaFormat& media_format();
84 virtual void EnableBitstreamConverter();
85 virtual AVStream* GetAVStream();
86
87 private:
88 static void RunCallback(ReadCallback cb, scoped_refptr<Buffer> buffer);
89
90 Type type_;
91 MediaFormat media_format_;
92 AVStream* av_stream_;
93
94 mutable base::Lock lock_;
95 ReadCBQueue read_cbs_;
96 BufferQueue buffers_;
97 bool shutdown_called_;
98
99 // Keeps track of the timestamp of the last buffer we have
100 // added to |buffers_|. This is used to enforce buffers with strictly
101 // monotonically increasing timestamps.
102 base::TimeDelta last_buffer_timestamp_;
103
104 DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream);
105 };
106
107 ChunkDemuxerStream::ChunkDemuxerStream(Type type, AVStream* stream)
108 : type_(type),
109 av_stream_(stream),
110 shutdown_called_(false),
111 last_buffer_timestamp_(kNoTimestamp) {
112 }
113
114 ChunkDemuxerStream::~ChunkDemuxerStream() {}
115
116 void ChunkDemuxerStream::Flush() {
117 VLOG(1) << "Flush()";
118 base::AutoLock auto_lock(lock_);
119 buffers_.clear();
120 last_buffer_timestamp_ = kNoTimestamp;
121 }
122
123 bool ChunkDemuxerStream::CanAddBuffers(const BufferQueue& buffers) const {
124 base::AutoLock auto_lock(lock_);
125
126 // If we haven't seen any buffers yet than anything can be added.
127 if (last_buffer_timestamp_ == kNoTimestamp)
128 return true;
129
130 if (buffers.empty())
131 return true;
132
133 return (buffers.front()->GetTimestamp() > last_buffer_timestamp_);
134 }
135
136 void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) {
137 if (buffers.empty())
138 return;
139
140 std::deque<base::Closure> callbacks;
141 {
142 base::AutoLock auto_lock(lock_);
143
144 for (BufferQueue::const_iterator itr = buffers.begin();
145 itr != buffers.end(); itr++) {
146
147 base::TimeDelta current_ts = (*itr)->GetTimestamp();
148 if (last_buffer_timestamp_ != kNoTimestamp) {
149 DCHECK_GT(current_ts.ToInternalValue(),
150 last_buffer_timestamp_.ToInternalValue());
151 }
152
153 last_buffer_timestamp_ = current_ts;
154
155 buffers_.push_back(*itr);
156 }
157
158 while (!buffers_.empty() && !read_cbs_.empty()) {
159 callbacks.push_back(base::Bind(&ChunkDemuxerStream::RunCallback,
160 read_cbs_.front(),
161 buffers_.front()));
162 buffers_.pop_front();
163 read_cbs_.pop_front();
164 }
165 }
166
167 while (!callbacks.empty()) {
168 callbacks.front().Run();
169 callbacks.pop_front();
170 }
171 }
172
173 void ChunkDemuxerStream::Shutdown() {
174 std::deque<ReadCallback> callbacks;
175 {
176 base::AutoLock auto_lock(lock_);
177 shutdown_called_ = true;
178
179 // Collect all the pending Read() callbacks.
180 while (!read_cbs_.empty()) {
181 callbacks.push_back(read_cbs_.front());
182 read_cbs_.pop_front();
183 }
184 }
185
186 // Pass NULL to all callbacks to signify read failure.
187 while (!callbacks.empty()) {
188 callbacks.front().Run(NULL);
189 callbacks.pop_front();
190 }
191 }
192
193 bool ChunkDemuxerStream::GetLastBufferTimestamp(
194 base::TimeDelta* timestamp) const {
195 base::AutoLock auto_lock(lock_);
196
197 if (buffers_.empty())
198 return false;
199
200 *timestamp = buffers_.back()->GetTimestamp();
201 return true;
202 }
203
204 // Helper function used to make Closures for ReadCallbacks.
205 //static
206 void ChunkDemuxerStream::RunCallback(ReadCallback cb,
207 scoped_refptr<Buffer> buffer) {
208 cb.Run(buffer);
209 }
210
211 // Helper function that makes sure |read_callback| runs on |message_loop|.
212 static void RunOnMessageLoop(const DemuxerStream::ReadCallback& read_callback,
213 MessageLoop* message_loop,
214 Buffer* buffer) {
215 if (MessageLoop::current() != message_loop) {
216 message_loop->PostTask(FROM_HERE,
217 NewRunnableFunction(&RunOnMessageLoop,
218 read_callback,
219 message_loop,
220 scoped_refptr<Buffer>(buffer)));
221 return;
222 }
223
224 read_callback.Run(buffer);
225 }
226
227 // DemuxerStream methods.
228 void ChunkDemuxerStream::Read(const ReadCallback& read_callback) {
229 scoped_refptr<Buffer> buffer;
230
231 {
232 base::AutoLock auto_lock(lock_);
233
234 if (!shutdown_called_) {
235 if (buffers_.empty()) {
236 // Wrap & store |read_callback| so that it will
237 // get called on the current MessageLoop.
238 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
239 read_callback,
240 MessageLoop::current()));
241 return;
242 }
243
244 if (!read_cbs_.empty()) {
245 // Wrap & store |read_callback| so that it will
246 // get called on the current MessageLoop.
247 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
248 read_callback,
249 MessageLoop::current()));
250 return;
251 }
252
253 buffer = buffers_.front();
254 buffers_.pop_front();
255 }
256 }
257
258 read_callback.Run(buffer);
259 }
260
261 DemuxerStream::Type ChunkDemuxerStream::type() { return type_; }
262
263 const MediaFormat& ChunkDemuxerStream::media_format() { return media_format_; }
264
265 void ChunkDemuxerStream::EnableBitstreamConverter() {}
266
267 AVStream* ChunkDemuxerStream::GetAVStream() { return av_stream_; }
268
269 ChunkDemuxer::ChunkDemuxer()
270 : state_(WAITING_FOR_INIT),
271 format_context_(NULL),
272 buffered_bytes_(0),
273 seek_waits_for_data_(true) {
274 }
275
276 ChunkDemuxer::~ChunkDemuxer() {
277 DCHECK_NE(state_, INITIALIZED);
278
279 if (!format_context_)
280 return;
281
282 DestroyAVFormatContext(format_context_);
283 format_context_ = NULL;
284 }
285
286 bool ChunkDemuxer::Init(const uint8* data, int size) {
287 DCHECK(data);
288 DCHECK_GT(size, 0);
289
290 base::AutoLock auto_lock(lock_);
291 DCHECK_EQ(state_, WAITING_FOR_INIT);
292
293 const uint8* cur = data;
294 int cur_size = size;
295 WebMInfoParser info_parser;
296 int res = info_parser.Parse(cur, cur_size);
297
298 if (res <= 0) {
299 ChangeState(INIT_ERROR);
300 return false;
301 }
302
303 cur += res;
304 cur_size -= res;
305
306 WebMTracksParser tracks_parser(info_parser.timecode_scale());
307 res = tracks_parser.Parse(cur, cur_size);
308
309 if (res <= 0) {
310 ChangeState(INIT_ERROR);
311 return false;
312 }
313
314 double mult = info_parser.timecode_scale() / 1000.0;
315 duration_ = base::TimeDelta::FromMicroseconds(info_parser.duration() * mult);
316
317 cluster_parser_.reset(new WebMClusterParser(
318 info_parser.timecode_scale(),
319 tracks_parser.audio_track_num(),
320 tracks_parser.audio_default_duration(),
321 tracks_parser.video_track_num(),
322 tracks_parser.video_default_duration()));
323
324 format_context_ = CreateFormatContext(data, size);
325
326 if (!format_context_ || !SetupStreams() || !ParsePendingBuffers()) {
327 ChangeState(INIT_ERROR);
328 return false;
329 }
330
331 ChangeState(INITIALIZED);
332 return true;
333 }
334
335 // Filter implementation.
336 void ChunkDemuxer::set_host(FilterHost* filter_host) {
337 Demuxer::set_host(filter_host);
338 filter_host->SetDuration(duration_);
339 filter_host->SetCurrentReadPosition(0);
340 }
341
342 void ChunkDemuxer::Stop(FilterCallback* callback) {
343 VLOG(1) << "Stop()";
344
345 callback->Run();
346 delete callback;
347 }
348
349 void ChunkDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) {
350 VLOG(1) << "Seek(" << time.InSecondsF() << ")";
351
352 {
353 base::AutoLock auto_lock(lock_);
354
355 if (seek_waits_for_data_) {
356 seek_cb_ = cb;
357 return;
358 }
359 }
360
361 cb.Run(PIPELINE_OK);
362 }
363
364 void ChunkDemuxer::OnAudioRendererDisabled() {
365 base::AutoLock auto_lock(lock_);
366 audio_ = NULL;
367 }
368
369 void ChunkDemuxer::SetPreload(Preload preload) {}
370
371 // Demuxer implementation.
372 scoped_refptr<DemuxerStream> ChunkDemuxer::GetStream(
373 DemuxerStream::Type type) {
374 if (type == DemuxerStream::VIDEO)
375 return video_;
376
377 if (type == DemuxerStream::AUDIO)
378 return audio_;
379
380 return NULL;
381 }
382
383 base::TimeDelta ChunkDemuxer::GetStartTime() const {
384 VLOG(1) << "GetStartTime()";
385 // TODO(acolwell) : Fix this so it uses the time on the first packet.
386 return base::TimeDelta();
387 }
388
389 void ChunkDemuxer::FlushData() {
390 base::AutoLock auto_lock(lock_);
391 if (audio_.get())
392 audio_->Flush();
393
394 if (video_.get())
395 video_->Flush();
396
397 pending_buffers_.clear();
398 seek_waits_for_data_ = true;
399 }
400
401 bool ChunkDemuxer::AddData(const uint8* data, unsigned length) {
402 VLOG(1) << "AddData(" << length << ")";
403 DCHECK(data);
404 DCHECK_GT(length, 0u);
405
406 int64 buffered_bytes = 0;
407 base::TimeDelta buffered_ts = base::TimeDelta::FromSeconds(-1);
408
409 FilterStatusCB cb;
410 {
411 base::AutoLock auto_lock(lock_);
412
413 switch(state_) {
414 case WAITING_FOR_INIT:
415 pending_buffers_.push_back(CreateBuffer(data, length));
416 return true;
417 break;
418
419 case INITIALIZED:
420 if (!ParseAndAddData_Locked(data, length)) {
421 VLOG(1) << "AddData(): parsing data failed";
422 return false;
423 }
424 break;
425
426 case INIT_ERROR:
427 case SHUTDOWN:
428 VLOG(1) << "AddData(): called in unexpected state " << state_;
429 return false;
430 break;
431 }
432
433 seek_waits_for_data_ = false;
434
435 base::TimeDelta tmp;
436 if (audio_.get() && audio_->GetLastBufferTimestamp(&tmp) &&
437 tmp > buffered_ts) {
438 buffered_ts = tmp;
439 }
440
441 if (video_.get() && video_->GetLastBufferTimestamp(&tmp) &&
442 tmp > buffered_ts) {
443 buffered_ts = tmp;
444 }
445
446 buffered_bytes = buffered_bytes_;
447
448 if (!seek_cb_.is_null())
449 std::swap(cb, seek_cb_);
450 }
451
452 // Notify the host of 'network activity' because we got data.
453 if (host()) {
454 host()->SetBufferedBytes(buffered_bytes);
455
456 if (buffered_ts.InSeconds() >= 0) {
457 host()->SetBufferedTime(buffered_ts);
458 }
459
460 host()->SetNetworkActivity(true);
461 }
462
463 if (!cb.is_null())
464 cb.Run(PIPELINE_OK);
465
466 return true;
467 }
468
469 void ChunkDemuxer::Shutdown() {
470 FilterStatusCB cb;
471 {
472 base::AutoLock auto_lock(lock_);
473
474 std::swap(cb, seek_cb_);
475
476 if (audio_.get())
477 audio_->Shutdown();
478
479 if (video_.get())
480 video_->Shutdown();
481
482 ChangeState(SHUTDOWN);
483 }
484
485 if (!cb.is_null())
486 cb.Run(PIPELINE_ERROR_ABORT);
487 }
488
489 void ChunkDemuxer::ChangeState(State new_state) {
490 lock_.AssertAcquired();
491 state_ = new_state;
492 }
493
494 AVFormatContext* ChunkDemuxer::CreateFormatContext(const uint8* data,
495 int size) const {
496 int segment_size = size + sizeof(kEmptyCluster);
497 int buf_size = sizeof(kWebMHeader) + segment_size;
498 scoped_array<uint8> buf(new uint8[buf_size]);
499 memcpy(buf.get(), kWebMHeader, sizeof(kWebMHeader));
500 memcpy(buf.get() + sizeof(kWebMHeader), data, size);
501 memcpy(buf.get() + sizeof(kWebMHeader) + size, kEmptyCluster,
502 sizeof(kEmptyCluster));
503
504 // Update the segment size in the buffer.
505 int64 tmp = (segment_size & GG_LONGLONG(0x00FFFFFFFFFFFFFF)) |
506 GG_LONGLONG(0x0100000000000000);
507 for (int i = 0; i < 8; i++) {
508 buf[kSegmentSizeOffset + i] = (tmp >> (8 * (7 - i))) & 0xff;
509 }
510
511 InMemoryUrlProtocol imup(buf.get(), buf_size, true);
512 std::string key = FFmpegGlue::GetInstance()->AddProtocol(&imup);
513
514 // Open FFmpeg AVFormatContext.
515 AVFormatContext* context = NULL;
516 int result = av_open_input_file(&context, key.c_str(), NULL, 0, NULL);
517
518 // Remove ourself from protocol list.
519 FFmpegGlue::GetInstance()->RemoveProtocol(&imup);
520
521 if (result < 0)
522 return NULL;
523
524 return context;
525 }
526
527 bool ChunkDemuxer::SetupStreams() {
528 int result = av_find_stream_info(format_context_);
529
530 if (result < 0)
531 return false;
532
533 bool no_supported_streams = true;
534 for (size_t i = 0; i < format_context_->nb_streams; ++i) {
535 AVStream* stream = format_context_->streams[i];
536 AVCodecContext* codec_context = stream->codec;
537 CodecType codec_type = codec_context->codec_type;
538
539 if (codec_type == CODEC_TYPE_AUDIO &&
540 stream->codec->codec_id == CODEC_ID_VORBIS &&
541 !audio_.get()) {
542 audio_ = new ChunkDemuxerStream(DemuxerStream::AUDIO, stream);
543 no_supported_streams = false;
544 continue;
545 }
546
547 if (codec_type == CODEC_TYPE_VIDEO &&
548 stream->codec->codec_id == CODEC_ID_VP8 &&
549 !video_.get()) {
550 video_ = new ChunkDemuxerStream(DemuxerStream::VIDEO, stream);
551 no_supported_streams = false;
552 continue;
553 }
554 }
555
556 return !no_supported_streams;
557 }
558
559 bool ChunkDemuxer::ParsePendingBuffers() {
560 bool had_pending_buffers = !pending_buffers_.empty();
561 // Handle any buffers that came in between the time the pipeline was
562 // started and Init() was called.
563 while(!pending_buffers_.empty()) {
564 scoped_refptr<media::Buffer> buf = pending_buffers_.front();
565 pending_buffers_.pop_front();
566
567 if (!ParseAndAddData_Locked(buf->GetData(), buf->GetDataSize())) {
568 pending_buffers_.clear();
569 ChangeState(INIT_ERROR);
570 return false;
571 }
572 }
573
574 seek_waits_for_data_ = !had_pending_buffers;
575 return true;
576 }
577
578 bool ChunkDemuxer::ParseAndAddData_Locked(const uint8* data, int length) {
579 if (!cluster_parser_.get())
580 return false;
581
582 const uint8* cur = data;
583 int cur_size = length;
584
585 while (cur_size > 0) {
586 int res = cluster_parser_->Parse(cur, cur_size);
587
588 if (res <= 0) {
589 VLOG(1) << "ParseAndAddData_Locked() : cluster parsing failed.";
590 return false;
591 }
592
593 // Make sure we can add the buffers to both streams before we acutally
594 // add them. This allows us to accept all of the data or none of it.
595 if ((audio_.get() &&
596 !audio_->CanAddBuffers(cluster_parser_->audio_buffers())) ||
597 (video_.get() &&
598 !video_->CanAddBuffers(cluster_parser_->video_buffers()))) {
599 return false;
600 }
601
602 if (audio_.get())
603 audio_->AddBuffers(cluster_parser_->audio_buffers());
604
605 if (video_.get())
606 video_->AddBuffers(cluster_parser_->video_buffers());
607
608 cur += res;
609 cur_size -= res;
610 }
611
612 // TODO(acolwell) : make this more representative of what is actually
613 // buffered.
614 buffered_bytes_ += length;
615
616 return true;
617 }
618
619 } // namespace media
OLDNEW
« no previous file with comments | « media/filters/chunk_demuxer.h ('k') | media/filters/chunk_demuxer_factory.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698