Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(101)

Side by Side Diff: media/filters/chunk_demuxer.cc

Issue 7203002: Adding ChunkDemuxer implementation. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Addressed CR comments & split code out into separate files. Created 9 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Implements a Demuxer that can switch among different data sources mid-stream.
6 // Uses FFmpegDemuxer under the covers, so see the caveats at the top of
7 // ffmpeg_demuxer.h.
8
9 #include "media/filters/chunk_demuxer.h"
10
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop.h"
14 #include "media/base/filter_host.h"
15 #include "media/base/data_buffer.h"
16 #include "media/ffmpeg/ffmpeg_common.h"
17 #include "media/filters/in_memory_url_protocol.h"
scherkus (not reviewing) 2011/06/24 18:27:37 f > i
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
18 #include "media/filters/ffmpeg_glue.h"
19 #include "media/webm/webm_cluster_parser.h"
20 #include "media/webm/webm_constants.h"
21 #include "media/webm/webm_info_parser.h"
22 #include "media/webm/webm_tracks_parser.h"
23
24 namespace media {
25
26 // WebM File Header. This is prepended to the INFO & TRACKS
27 // data passed to Init() before handing it to FFmpeg. Essentially
28 // we are making the INFO & TRACKS data look like a small WebM
29 // file so we can use FFmpeg to initialize the AVFormatContext.
30 //
31 // TODO(acolwell): Remove this once GetAVStream() has been removed from
32 // the DemuxerStream interface.
33 static const uint8 kWebMHeader[] = {
34 0x1A, 0x45, 0xDF, 0xA3, 0x9F, // EBML (size = 0x1f)
scherkus (not reviewing) 2011/06/24 18:27:37 nit: two spaces between code + comments
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
35 0x42, 0x86, 0x81, 0x01, // EBMLVersion = 1
36 0x42, 0xF7, 0x81, 0x01, // EBMLReadVersion = 1
37 0x42, 0xF2, 0x81, 0x04, // EBMLMaxIDLength = 4
38 0x42, 0xF3, 0x81, 0x08, // EBMLMaxSizeLength = 8
39 0x42, 0x82, 0x84, 0x77, 0x65, 0x62, 0x6D, // DocType = "webm"
40 0x42, 0x87, 0x81, 0x02, // DocTypeVersion = 2
41 0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion = 2
42 // EBML end
43 0x18, 0x53, 0x80, 0x67, // Segment
44 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // segment(size = 0)
45 // INFO goes here.
46 };
47
48 // Offset of the segment size field in kWebMHeader. Used to update
49 // the segment size field before handing the buffer to FFmpeg.
50 static const int kSegmentSizeOffset = sizeof(kWebMHeader) - 8;
51
52 static const uint8 kEmptyCluster[] = {
53 0x1F, 0x43, 0xB6, 0x75, 0x80 // CLUSTER (size = 0)
scherkus (not reviewing) 2011/06/24 18:27:37 ditto
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 Done.
54 };
55
56 static Buffer* CreateBuffer(const uint8* data, size_t size) {
57 scoped_array<uint8> buf(new uint8[size]);
58 memcpy(buf.get(), data, size);
59 return new DataBuffer(buf.release(), size);
60 }
61
62 class ChunkDemuxerStream : public DemuxerStream {
63 public:
64 typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
65 typedef std::deque<ReadCallback> ReadCBQueue;
66
67 ChunkDemuxerStream(Type type, AVStream* stream);
68 virtual ~ChunkDemuxerStream();
69
70 void Flush();
71 void AddBuffers(const BufferQueue& buffers);
72 void Shutdown();
73
74 bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const;
75
76 // DemuxerStream methods.
77 virtual void Read(const ReadCallback& read_callback);
78 virtual Type type();
79 virtual const MediaFormat& media_format();
80 virtual void EnableBitstreamConverter();
81 virtual AVStream* GetAVStream();
82
83 private:
84 static void RunCallback(ReadCallback cb, scoped_refptr<Buffer> buffer);
85
86 Type type_;
87 MediaFormat media_format_;
88 AVStream* av_stream_;
89
90 mutable base::Lock lock_;
91 ReadCBQueue read_cbs_;
92 BufferQueue buffers_;
93 bool shutdown_called_;
94
95 DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream);
96 };
97
98 ChunkDemuxerStream::ChunkDemuxerStream(Type type, AVStream* stream)
99 : type_(type),
100 av_stream_(stream),
101 shutdown_called_(false) {
102 }
103
104 ChunkDemuxerStream::~ChunkDemuxerStream() {}
105
106 void ChunkDemuxerStream::Flush() {
107 VLOG(1) << "Flush()";
108 base::AutoLock auto_lock(lock_);
109 buffers_.clear();
110 }
111
112 void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) {
113 std::deque<base::Closure> callbacks;
114 {
115 base::AutoLock auto_lock(lock_);
116
117 for (BufferQueue::const_iterator itr = buffers.begin();
118 itr != buffers.end(); itr++) {
119 buffers_.push_back(*itr);
120 }
121
122 while (!buffers_.empty() && !read_cbs_.empty()) {
123 callbacks.push_back(base::Bind(&ChunkDemuxerStream::RunCallback,
124 read_cbs_.front(),
125 buffers_.front()));
126 buffers_.pop_front();
127 read_cbs_.pop_front();
128 }
129 }
130
131 while (!callbacks.empty()) {
132 callbacks.front().Run();
133 callbacks.pop_front();
134 }
135 }
136
137 void ChunkDemuxerStream::Shutdown() {
138 std::deque<ReadCallback> callbacks;
139 {
140 base::AutoLock auto_lock(lock_);
141 shutdown_called_ = true;
142
143 // Collect all the pending Read() callbacks.
144 while (!read_cbs_.empty()) {
145 callbacks.push_back(read_cbs_.front());
146 read_cbs_.pop_front();
147 }
148 }
149
150 // Pass NULL to all callbacks to signify read failure.
151 while (!callbacks.empty()) {
152 callbacks.front().Run(NULL);
153 callbacks.pop_front();
154 }
155 }
156
157 bool ChunkDemuxerStream::GetLastBufferTimestamp(
158 base::TimeDelta* timestamp) const {
159 base::AutoLock auto_lock(lock_);
160
161 if (buffers_.empty())
162 return false;
163
164 *timestamp = buffers_.back()->GetTimestamp();
165 return true;
166 }
167
168 // Helper function used to make Closures for ReadCallbacks.
169 //static
170 void ChunkDemuxerStream::RunCallback(ReadCallback cb,
171 scoped_refptr<Buffer> buffer) {
172 cb.Run(buffer);
173 }
174
175 // Helper function that makes sure |read_callback| runs on |message_loop|.
176 static void RunOnMessageLoop(const DemuxerStream::ReadCallback& read_callback,
177 MessageLoop* message_loop,
178 scoped_refptr<Buffer> buffer) {
179 if (MessageLoop::current() != message_loop) {
180 message_loop->PostTask(FROM_HERE,
181 NewRunnableFunction(&RunOnMessageLoop,
182 read_callback,
183 message_loop,
184 buffer));
185 return;
186 }
187
188 read_callback.Run(buffer);
189 }
190
191 // DemuxerStream methods.
192 void ChunkDemuxerStream::Read(const ReadCallback& read_callback) {
193 scoped_refptr<Buffer> buffer;
194
195 {
196 base::AutoLock auto_lock(lock_);
197
198 if (!shutdown_called_) {
199 if (buffers_.empty()) {
200 // Wrap & store |read_callback| so that it will
201 // get called on the current MessageLoop.
202 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
203 read_callback,
204 MessageLoop::current()));
205 return;
206 }
207
208 if (!read_cbs_.empty()) {
209 // Wrap & store |read_callback| so that it will
210 // get called on the current MessageLoop.
211 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
212 read_callback,
213 MessageLoop::current()));
214 return;
215 }
216
217 buffer = buffers_.front();
218 buffers_.pop_front();
219 }
220 }
221
222 read_callback.Run(buffer);
223 }
224
225 DemuxerStream::Type ChunkDemuxerStream::type() { return type_; }
226
227 const MediaFormat& ChunkDemuxerStream::media_format() { return media_format_; }
228
229 void ChunkDemuxerStream::EnableBitstreamConverter() {}
230
231 AVStream* ChunkDemuxerStream::GetAVStream() { return av_stream_; }
232
233 ChunkDemuxer::ChunkDemuxer()
234 : state_(WAITING_FOR_INIT),
235 format_context_(NULL),
236 buffered_bytes_(0),
237 first_seek_(true) {
238 }
239
240 ChunkDemuxer::~ChunkDemuxer() {
241 DCHECK_NE(state_, INITIALIZED);
242
243 if (!format_context_)
244 return;
245
246 DestroyAVFormatContext(format_context_);
247 format_context_ = NULL;
248 }
249
250 bool ChunkDemuxer::Init(const uint8* data, int size) {
251 DCHECK(data);
252 DCHECK_GT(size, 0);
253 VLOG(1) << "Init(" << size << ")";
254
255 base::AutoLock auto_lock(lock_);
256 DCHECK_EQ(state_, WAITING_FOR_INIT);
257
258 const uint8* cur = data;
259 int cur_size = size;
260 WebMInfoParser info_parser;
261 int res = info_parser.Parse(cur, cur_size);
262
263 if (res <= 0) {
264 ChangeState(INIT_ERROR);
265 return false;
266 }
267
268 cur += res;
269 cur_size -= res;
270
271 WebMTracksParser tracks_parser(info_parser.timecode_scale());
272 res = tracks_parser.Parse(cur, cur_size);
273
274 if (res <= 0) {
275 ChangeState(INIT_ERROR);
276 return false;
277 }
278
279 double mult = info_parser.timecode_scale() / 1000.0;
280 duration_ = base::TimeDelta::FromMicroseconds(info_parser.duration() * mult);
281
282 cluster_parser_.reset(new WebMClusterParser(
283 info_parser.timecode_scale(),
284 tracks_parser.audio_track_num(),
285 tracks_parser.audio_default_duration(),
286 tracks_parser.video_track_num(),
287 tracks_parser.video_default_duration()));
288
289 format_context_ = CreateFormatContext(data, size);
290
291 if (!format_context_ || !SetupStreams() || !ParsePendingBuffers()) {
292 ChangeState(INIT_ERROR);
293 return false;
294 }
295
296 ChangeState(INITIALIZED);
297 return true;
298 }
299
300 // Filter implementation.
301 void ChunkDemuxer::set_host(FilterHost* filter_host) {
302 Demuxer::set_host(filter_host);
303 filter_host->SetDuration(duration_);
304 filter_host->SetCurrentReadPosition(0);
305 }
306
307 void ChunkDemuxer::Stop(FilterCallback* callback) {
308 VLOG(1) << "Stop()";
309
310 callback->Run();
311 delete callback;
312 }
313
314 void ChunkDemuxer::Seek(base::TimeDelta time, const FilterStatusCB& cb) {
315 VLOG(1) << "Seek(" << time.InSecondsF() << ")";
316
317 bool run_callback = false;
318 {
319 base::AutoLock auto_lock(lock_);
320
321 if (first_seek_) {
322 first_seek_ = false;
323 run_callback = true;
324 } else {
325 seek_cb_ = cb;
326 seek_time_ = time;
327 }
328 }
329
330 if (run_callback)
331 cb.Run(PIPELINE_OK);
332 }
333
334 void ChunkDemuxer::OnAudioRendererDisabled() {
335 base::AutoLock auto_lock(lock_);
336 audio_ = NULL;
337 }
338
339 void ChunkDemuxer::SetPreload(Preload preload) {}
scherkus (not reviewing) 2011/06/24 18:27:37 NOTIMPLEMENTED?
acolwell GONE FROM CHROMIUM 2011/06/27 23:48:25 No. This gets called, but we can't really do anyth
340
341 // Demuxer implementation.
342 scoped_refptr<DemuxerStream> ChunkDemuxer::GetStream(
343 DemuxerStream::Type type) {
344 VLOG(1) << "GetStream(" << type << ")";
345
346 if (type == DemuxerStream::VIDEO)
347 return video_;
348
349 if (type == DemuxerStream::AUDIO)
350 return audio_;
351
352 return NULL;
353 }
354
355 base::TimeDelta ChunkDemuxer::GetStartTime() const {
356 VLOG(1) << "GetStartTime()";
357 // TODO(acolwell) : Fix this so it uses the time on the first packet.
358 return base::TimeDelta();
359 }
360
361 void ChunkDemuxer::FlushData() {
362 base::AutoLock auto_lock(lock_);
363 if (audio_.get())
364 audio_->Flush();
365
366 if (video_.get())
367 video_->Flush();
368
369 pending_buffers_.clear();
370 }
371
372 bool ChunkDemuxer::AddData(const uint8* data, unsigned length) {
373 VLOG(1) << "AddData(" << length << ")";
374
375 int64 buffered_bytes = 0;
376 base::TimeDelta buffered_ts = base::TimeDelta::FromSeconds(-1);
377
378 FilterStatusCB cb;
379 {
380 base::AutoLock auto_lock(lock_);
381
382 switch(state_) {
383 case WAITING_FOR_INIT:
384 pending_buffers_.push_back(CreateBuffer(data, length));
385 return false;
386 break;
387
388 case INITIALIZED:
389 if (!ParseAndAddData_Locked(data, length)) {
390 VLOG(1) << "AddData(): parsing data failed";
391 return false;
392 }
393 break;
394
395 case INIT_ERROR:
396 case SHUTDOWN:
397 VLOG(1) << "AddData(): called in unexpected state " << state_;
398 return false;
399 break;
400 }
401
402 base::TimeDelta tmp;
403 if (audio_.get() && audio_->GetLastBufferTimestamp(&tmp) &&
404 tmp > buffered_ts) {
405 buffered_ts = tmp;
406 }
407
408 if (video_.get() && video_->GetLastBufferTimestamp(&tmp) &&
409 tmp > buffered_ts) {
410 buffered_ts = tmp;
411 }
412
413 buffered_bytes = buffered_bytes_;
414
415 if (!seek_cb_.is_null())
416 std::swap(cb, seek_cb_);
417 }
418
419 // Notify the host of 'network activity' because we got data.
420 if (host()) {
421 host()->SetBufferedBytes(buffered_bytes);
422
423 if (buffered_ts.InSeconds() >= 0) {
424 host()->SetBufferedTime(buffered_ts);
425 }
426
427 host()->SetNetworkActivity(true);
428 }
429
430 if (!cb.is_null())
431 cb.Run(PIPELINE_OK);
432
433 return true;
434 }
435
436 void ChunkDemuxer::Shutdown() {
437 FilterStatusCB cb;
438 {
439 base::AutoLock auto_lock(lock_);
440
441 std::swap(cb, seek_cb_);
442
443 if (audio_.get())
444 audio_->Shutdown();
445
446 if (video_.get())
447 video_->Shutdown();
448
449 ChangeState(SHUTDOWN);
450 }
451
452 if (!cb.is_null())
453 cb.Run(PIPELINE_OK);
454 }
455
456 void ChunkDemuxer::ChangeState(State new_state) {
457 lock_.AssertAcquired();
458 state_ = new_state;
459 }
460
461 AVFormatContext* ChunkDemuxer::CreateFormatContext(const uint8* data,
462 int size) const {
463 int segment_size = size + sizeof(kEmptyCluster);
464 int buf_size = sizeof(kWebMHeader) + segment_size;
465 scoped_array<uint8> buf(new uint8[buf_size]);
466 memcpy(buf.get(), kWebMHeader, sizeof(kWebMHeader));
467 memcpy(buf.get() + sizeof(kWebMHeader), data, size);
468 memcpy(buf.get() + sizeof(kWebMHeader) + size, kEmptyCluster,
469 sizeof(kEmptyCluster));
470
471 // Update the segment size in the buffer.
472 int64 tmp = (segment_size & GG_LONGLONG(0x00FFFFFFFFFFFFFF)) |
473 GG_LONGLONG(0x0100000000000000);
474 for (int i = 0; i < 8; i++) {
475 buf[kSegmentSizeOffset + i] = (tmp >> (8 * (7 - i))) & 0xff;
476 }
477
478 InMemoryUrlProtocol imup(buf.get(), buf_size, true);
479 std::string key = FFmpegGlue::GetInstance()->AddProtocol(&imup);
480
481 // Open FFmpeg AVFormatContext.
482 AVFormatContext* context = NULL;
483 int result = av_open_input_file(&context, key.c_str(), NULL, 0, NULL);
484
485 // Remove ourself from protocol list.
486 FFmpegGlue::GetInstance()->RemoveProtocol(&imup);
487
488 if (result < 0)
489 return NULL;
490
491 return context;
492 }
493
494 bool ChunkDemuxer::SetupStreams() {
495 int result = av_find_stream_info(format_context_);
496
497 if (result < 0)
498 return false;
499
500 bool no_supported_streams = true;
501 for (size_t i = 0; i < format_context_->nb_streams; ++i) {
502 AVStream* stream = format_context_->streams[i];
503 AVCodecContext* codec_context = stream->codec;
504 CodecType codec_type = codec_context->codec_type;
505
506 if (codec_type == CODEC_TYPE_AUDIO &&
507 stream->codec->codec_id == CODEC_ID_VORBIS &&
508 !audio_.get()) {
509 audio_ = new ChunkDemuxerStream(DemuxerStream::AUDIO, stream);
510 no_supported_streams = false;
511 continue;
512 }
513
514 if (codec_type == CODEC_TYPE_VIDEO &&
515 stream->codec->codec_id == CODEC_ID_VP8 &&
516 !video_.get()) {
517 video_ = new ChunkDemuxerStream(DemuxerStream::VIDEO, stream);
518 no_supported_streams = false;
519 continue;
520 }
521 }
522
523 return !no_supported_streams;
524 }
525
526 bool ChunkDemuxer::ParsePendingBuffers() {
527 // Handle any buffers that came in between the time the pipeline was
528 // started and Init() was called.
529 while(!pending_buffers_.empty()) {
530 scoped_refptr<media::Buffer> buf = pending_buffers_.front();
531 pending_buffers_.pop_front();
532
533 if (!ParseAndAddData_Locked(buf->GetData(), buf->GetDataSize())) {
534 pending_buffers_.clear();
535 ChangeState(INIT_ERROR);
536 return false;
537 }
538 }
539
540 return true;
541 }
542
543 bool ChunkDemuxer::ParseAndAddData_Locked(const uint8* data, int length) {
544 if (!cluster_parser_.get())
545 return false;
546
547 const uint8* cur = data;
548 int cur_size = length;
549
550 while (cur_size > 0) {
551 cluster_parser_->Reset();
552 int res = cluster_parser_->Parse(cur, cur_size);
553
554 if (res <= 0) {
555 VLOG(1) << "ParseAndAddData_Locked() : cluster parsing failed.";
556 return false;
557 }
558
559 if (audio_.get())
560 audio_->AddBuffers(cluster_parser_->audio_buffers());
561
562 if (video_.get())
563 video_->AddBuffers(cluster_parser_->video_buffers());
564
565 cur += res;
566 cur_size -= res;
567 }
568
569 cluster_parser_->Reset();
570
571 // TODO(acolwell) : make this more representative of what is actually
572 // buffered.
573 buffered_bytes_ += length;
574
575 return true;
576 }
577
578 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698