Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1115)

Side by Side Diff: media/base/composite_filter.cc

Issue 5744002: Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Applied code review suggestions. Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/base/composite_filter.h"
6
7 #include "base/stl_util-inl.h"
8 #include "media/base/callback.h"
9
10 namespace media {
11
12 class CompositeFilter::FilterHostImpl : public FilterHost {
13 public:
14 FilterHostImpl(CompositeFilter* parent, FilterHost* host);
15
16 FilterHost* host();
17
18 // media::FilterHost methods.
19 virtual void SetError(PipelineError error);
20 virtual base::TimeDelta GetTime() const;
21 virtual base::TimeDelta GetDuration() const;
22 virtual void SetTime(base::TimeDelta time);
23 virtual void SetDuration(base::TimeDelta duration);
24 virtual void SetBufferedTime(base::TimeDelta buffered_time);
25 virtual void SetTotalBytes(int64 total_bytes);
26 virtual void SetBufferedBytes(int64 buffered_bytes);
27 virtual void SetVideoSize(size_t width, size_t height);
28 virtual void SetStreaming(bool streaming);
29 virtual void NotifyEnded();
30 virtual void SetLoaded(bool loaded);
31 virtual void SetNetworkActivity(bool network_activity);
32 virtual void DisableAudioRenderer();
33 virtual void SetCurrentReadPosition(int64 offset);
34 virtual int64 GetCurrentReadPosition();
35
36 private:
37 CompositeFilter* parent_;
38 FilterHost* host_;
39
40 DISALLOW_COPY_AND_ASSIGN(FilterHostImpl);
41 };
42
43 CompositeFilter::CompositeFilter(MessageLoop* message_loop) {
44 Init(message_loop, NULL);
45 }
46
47 CompositeFilter::CompositeFilter(MessageLoop* message_loop,
48 ThreadFactoryFunction thread_factory) {
49 DCHECK(thread_factory);
50 Init(message_loop, thread_factory);
51 }
52
53 void CompositeFilter::Init(MessageLoop* message_loop,
54 ThreadFactoryFunction thread_factory) {
55 DCHECK(message_loop);
56 message_loop_ = message_loop;
57 thread_factory_ = thread_factory;
58
59 if (!thread_factory_) {
60 thread_factory_ = &CompositeFilter::DefaultThreadFactory;
61 }
62
63 state_ = kCreated;
64 sequence_index_ = 0;
65 error_ = PIPELINE_OK;
66 }
67
68 CompositeFilter::~CompositeFilter() {
69 DCHECK_EQ(message_loop_, MessageLoop::current());
70 DCHECK(state_ == kCreated || state_ == kStopped);
71
72 // Stop every running filter thread.
73 for (FilterThreadVector::iterator iter = filter_threads_.begin();
74 iter != filter_threads_.end();
75 ++iter) {
76 (*iter)->Stop();
77 }
78
79 // Reset the pipeline, which will decrement a reference to this object.
80 // We will get destroyed as soon as the remaining tasks finish executing.
81 // To be safe, we'll set our pipeline reference to NULL.
82 filters_.clear();
83 STLDeleteElements(&filter_threads_);
84 }
85
86 bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) {
87 DCHECK_EQ(message_loop_, MessageLoop::current());
88 if (!filter.get() || state_ != kCreated || !host())
89 return false;
90
91 // Create a dedicated thread for this filter if applicable.
92 if (filter->requires_message_loop()) {
93 scoped_ptr<base::Thread> thread(
94 thread_factory_(filter->message_loop_name()));
95
96 if (!thread.get() || !thread->Start()) {
97 return false;
98 }
99
100 filter->set_message_loop(thread->message_loop());
101 filter_threads_.push_back(thread.release());
102 }
103
104 // Register ourselves as the filter's host.
105 filter->set_host(host_impl_.get());
106 filters_.push_back(make_scoped_refptr(filter.get()));
107 return true;
108 }
109
110 const char* CompositeFilter::major_mime_type() const {
111 return "";
112 }
113
114 void CompositeFilter::set_host(FilterHost* host) {
115 DCHECK_EQ(message_loop_, MessageLoop::current());
116 DCHECK(host);
117 DCHECK(!host_impl_.get());
118 host_impl_.reset(new FilterHostImpl(this, host));
119 }
120
121 FilterHost* CompositeFilter::host() {
122 return host_impl_.get() ? host_impl_->host() : NULL;
123 }
124
125 bool CompositeFilter::requires_message_loop() const {
126 return false;
127 }
128
129 const char* CompositeFilter::message_loop_name() const {
130 return "CompositeFilter";
131 }
132
133 void CompositeFilter::set_message_loop(MessageLoop* message_loop) {
134 NOTREACHED() << "Message loop should not be set.";
135 }
136
137 MessageLoop* CompositeFilter::message_loop() {
138 return NULL;
139 }
140
141 void CompositeFilter::Play(FilterCallback* play_callback) {
142 DCHECK_EQ(message_loop_, MessageLoop::current());
143 scoped_ptr<FilterCallback> callback(play_callback);
144 if (callback_.get()) {
145 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
146 callback->Run();
147 return;
148 } else if (state_ == kPlaying) {
149 callback->Run();
150 return;
151 } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
152 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
153 callback->Run();
154 return;
155 }
156
157 ChangeState(kPlayPending);
158 callback_.reset(callback.release());
159 StartSerialCallSequence();
160 }
161
162 void CompositeFilter::Pause(FilterCallback* pause_callback) {
163 DCHECK_EQ(message_loop_, MessageLoop::current());
164 scoped_ptr<FilterCallback> callback(pause_callback);
165 if (callback_.get()) {
166 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
167 callback->Run();
168 return;
169 } else if (state_ == kPaused) {
170 callback->Run();
171 return;
172 } else if (!host() || state_ != kPlaying) {
173 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
174 callback->Run();
175 return;
176 }
177
178 ChangeState(kPausePending);
179 callback_.reset(callback.release());
180 StartSerialCallSequence();
181 }
182
183 void CompositeFilter::Flush(FilterCallback* flush_callback) {
184 DCHECK_EQ(message_loop_, MessageLoop::current());
185 scoped_ptr<FilterCallback> callback(flush_callback);
186 if (callback_.get()) {
187 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
188 callback->Run();
189 return;
190 } else if (!host() || (state_ != kCreated && state_ != kPaused)) {
191 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
192 callback->Run();
193 return;
194 }
195
196 ChangeState(kFlushPending);
197 callback_.reset(callback.release());
198 StartParallelCallSequence();
199 }
200
201 void CompositeFilter::Stop(FilterCallback* stop_callback) {
202 DCHECK_EQ(message_loop_, MessageLoop::current());
203 scoped_ptr<FilterCallback> callback(stop_callback);
204 if (!host()) {
205 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
206 callback->Run();
207 return;
208 } else if (state_ == kStopped) {
209 callback->Run();
210 return;
211 }
212
213 switch(state_) {
214 case kError:
215 case kCreated:
216 case kPaused:
217 case kPlaying:
218 ChangeState(kStopPending);
219 break;
220 case kPlayPending:
221 ChangeState(kStopWhilePlayPending);
222 break;
223 case kPausePending:
224 ChangeState(kStopWhilePausePending);
225 break;
226 case kFlushPending:
227 ChangeState(kStopWhileFlushPending);
228 break;
229 case kSeekPending:
230 ChangeState(kStopWhileSeekPending);
231 break;
232 default:
233 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
234 callback->Run();
235 return;
236 }
237
238 callback_.reset(callback.release());
239 if (state_ == kStopPending) {
240 StartSerialCallSequence();
241 }
242 }
243
244 void CompositeFilter::SetPlaybackRate(float playback_rate) {
245 DCHECK_EQ(message_loop_, MessageLoop::current());
246 for (FilterVector::iterator iter = filters_.begin();
247 iter != filters_.end();
248 ++iter) {
249 (*iter)->SetPlaybackRate(playback_rate);
250 }
251 }
252
253 void CompositeFilter::Seek(base::TimeDelta time,
254 FilterCallback* seek_callback) {
255 DCHECK_EQ(message_loop_, MessageLoop::current());
256 scoped_ptr<FilterCallback> callback(seek_callback);
257 if (callback_.get()) {
258 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
259 callback->Run();
260 return;
261 } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
262 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
263 callback->Run();
264 return;
265 }
266
267 ChangeState(kSeekPending);
268 callback_.reset(callback.release());
269 pending_seek_time_ = time;
270 StartSerialCallSequence();
271 }
272
273 void CompositeFilter::OnAudioRendererDisabled() {
274 DCHECK_EQ(message_loop_, MessageLoop::current());
275 for (FilterVector::iterator iter = filters_.begin();
276 iter != filters_.end();
277 ++iter) {
278 (*iter)->OnAudioRendererDisabled();
279 }
280 }
281
282 base::Thread* CompositeFilter::DefaultThreadFactory(
283 const char* thread_name) {
284 return new base::Thread(thread_name);
285 }
286
287 void CompositeFilter::ChangeState(State new_state) {
288 state_ = new_state;
289 }
290
291 void CompositeFilter::StartSerialCallSequence() {
292 DCHECK_EQ(message_loop_, MessageLoop::current());
293 error_ = PIPELINE_OK;
294
295 if (filters_.size() > 0) {
296 sequence_index_ = 0;
297 CallFilter(filters_[sequence_index_],
298 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
299 } else {
300 sequence_index_ = 0;
301 SerialCallback();
302 }
303 }
304
305 void CompositeFilter::StartParallelCallSequence() {
306 DCHECK_EQ(message_loop_, MessageLoop::current());
307 error_ = PIPELINE_OK;
308
309 if (filters_.size() > 0) {
310 sequence_index_ = 0;
311 for (size_t i = 0; i < filters_.size(); i++) {
312 CallFilter(filters_[i],
313 NewThreadSafeCallback(&CompositeFilter::ParallelCallback));
314 }
315 } else {
316 sequence_index_ = 0;
317 ParallelCallback();
318 }
319 }
320
321 void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter,
322 FilterCallback* callback) {
323 switch(state_) {
324 case kPlayPending:
325 filter->Play(callback);
326 break;
327 case kPausePending:
328 filter->Pause(callback);
329 break;
330 case kFlushPending:
331 filter->Flush(callback);
332 break;
333 case kStopPending:
334 filter->Stop(callback);
335 break;
336 case kSeekPending:
337 filter->Seek(pending_seek_time_, callback);
338 break;
339 default:
340 delete callback;
341 ChangeState(kError);
342 HandleError(PIPELINE_ERROR_INVALID_STATE);
343 }
344 }
345
346 void CompositeFilter::DispatchPendingCallback() {
347 if (callback_.get()) {
348 scoped_ptr<FilterCallback> callback(callback_.release());
349 callback->Run();
350 }
351 }
352
353 CompositeFilter::State CompositeFilter::GetNextState(State state) const {
354 State ret = kInvalid;
355 switch (state) {
356 case kPlayPending:
357 ret = kPlaying;
358 break;
359 case kPausePending:
360 ret = kPaused;
361 case kFlushPending:
362 ret = kPaused;
363 break;
364 case kStopPending:
365 ret = kStopped;
366 break;
367 case kSeekPending:
368 ret = kPaused;
369 break;
370 case kStopWhilePlayPending:
371 case kStopWhilePausePending:
372 case kStopWhileFlushPending:
373 case kStopWhileSeekPending:
374 ret = kStopPending;
375 break;
376
377 case kInvalid:
378 case kCreated:
379 case kPlaying:
380 case kPaused:
381 case kStopped:
382 case kError:
383 ret = kInvalid;
384 break;
385
386 // default: intentionally left out to catch missing states.
387 }
388
389 return ret;
390 }
391
392 void CompositeFilter::SerialCallback() {
393 DCHECK_EQ(message_loop_, MessageLoop::current());
394 if (error_ != PIPELINE_OK) {
395 // We encountered an error. Terminate the sequence now.
396 ChangeState(kError);
397 HandleError(error_);
398 return;
399 }
400
401 if (filters_.size() > 0)
402 sequence_index_++;
403
404 if (sequence_index_ == filters_.size()) {
405 // All filters have been successfully called without error.
406 OnCallSequenceDone();
407 } else if (GetNextState(state_) == kStopPending) {
408 // Abort sequence early and start issuing Stop() calls.
409 ChangeState(kStopPending);
410 StartSerialCallSequence();
411 } else {
412 // We aren't done with the sequence. Call the next filter.
413 CallFilter(filters_[sequence_index_],
414 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
415 }
416 }
417
418 void CompositeFilter::ParallelCallback() {
419 DCHECK_EQ(message_loop_, MessageLoop::current());
420
421 if (filters_.size() > 0)
422 sequence_index_++;
423
424 if (sequence_index_ == filters_.size()) {
425 if (error_ != PIPELINE_OK) {
426 // We encountered an error.
427 ChangeState(kError);
428 HandleError(error_);
429 return;
430 }
431
432 OnCallSequenceDone();
433 }
434 }
435
436 void CompositeFilter::OnCallSequenceDone() {
437 State next_state = GetNextState(state_);
438
439 if (next_state == kInvalid) {
440 // We somehow got into an unexpected state.
441 ChangeState(kError);
442 HandleError(PIPELINE_ERROR_INVALID_STATE);
443 }
444
445 ChangeState(next_state);
446
447 if (state_ == kStopPending) {
448 // Handle a deferred Stop().
449 StartSerialCallSequence();
450 } else {
451 // Call the callback to indicate that the operation has completed.
452 DispatchPendingCallback();
453 }
454 }
455
456 void CompositeFilter::SendErrorToHost(PipelineError error) {
457 if (host_impl_.get())
458 host_impl_.get()->host()->SetError(error);
459 }
460
461 void CompositeFilter::HandleError(PipelineError error) {
462 if (error != PIPELINE_OK) {
463 SendErrorToHost(error);
464 }
465
466 DispatchPendingCallback();
467 }
468
469 FilterCallback* CompositeFilter::NewThreadSafeCallback(
470 void (CompositeFilter::*method)()) {
471 return TaskToCallbackAdapter::NewCallback(
472 NewRunnableMethod(this,
473 &CompositeFilter::OnCallback,
474 message_loop_,
475 method));
476 }
477
478 void CompositeFilter::OnCallback(MessageLoop* message_loop,
479 void (CompositeFilter::*method)()) {
480 if (MessageLoop::current() != message_loop) {
481 // Posting callback to the proper thread.
482 message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method));
483 return;
484 }
485
486 (this->*method)();
487 }
488
489 bool CompositeFilter::CanForwardError() {
490 return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused);
491 }
492
493
494
495 void CompositeFilter::SetError(PipelineError error) {
496 if (message_loop_ != MessageLoop::current()) {
497 message_loop_->PostTask(FROM_HERE,
498 NewRunnableMethod(this, &CompositeFilter::SetError, error));
499 return;
500 }
501
502 DCHECK_EQ(message_loop_, MessageLoop::current());
503
504 // Drop errors recieved while stopping or stopped.
505 // This shields the owner of this object from having
506 // to deal with errors it can't do anything about.
507 if (state_ == kStopPending || state_ == kStopped)
508 return;
509
510 error_ = error;
511 if (CanForwardError())
512 SendErrorToHost(error);
513 }
514
515 CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent,
516 FilterHost* host) :
517 parent_(parent),
518 host_(host) {
519 }
520
521 FilterHost* CompositeFilter::FilterHostImpl::host() {
522 return host_;
523 }
524
525 // media::FilterHost methods.
526 void CompositeFilter::FilterHostImpl::SetError(PipelineError error) {
527 parent_->SetError(error);
528 }
529
530 base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const {
531 return host_->GetTime();
532 }
533
534 base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const {
535 return host_->GetDuration();
536 }
537
538 void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) {
539 host_->SetTime(time);
540 }
541
542 void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) {
543 host_->SetDuration(duration);
544 }
545
546 void CompositeFilter::FilterHostImpl::SetBufferedTime(
547 base::TimeDelta buffered_time) {
548 host_->SetBufferedTime(buffered_time);
549 }
550
551 void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) {
552 host_->SetTotalBytes(total_bytes);
553 }
554
555 void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) {
556 host_->SetBufferedBytes(buffered_bytes);
557 }
558
559 void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width,
560 size_t height) {
561 host_->SetVideoSize(width, height);
562 }
563
564 void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) {
565 host_->SetStreaming(streaming);
566 }
567
568 void CompositeFilter::FilterHostImpl::NotifyEnded() {
569 host_->NotifyEnded();
570 }
571
572 void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) {
573 host_->SetLoaded(loaded);
574 }
575
576 void CompositeFilter::FilterHostImpl::SetNetworkActivity(
577 bool network_activity) {
578 host_->SetNetworkActivity(network_activity);
579 }
580
581 void CompositeFilter::FilterHostImpl::DisableAudioRenderer() {
582 host_->DisableAudioRenderer();
583 }
584
585 void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) {
586 host_->SetCurrentReadPosition(offset);
587 }
588
589 int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() {
590 return host_->GetCurrentReadPosition();
591 }
592
593 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698