Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: media/base/composite_filter.cc

Issue 5744002: Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix CR nits & remove dead code. Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « media/base/composite_filter.h ('k') | media/base/composite_filter_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/base/composite_filter.h"
6
7 #include "base/stl_util-inl.h"
8 #include "media/base/callback.h"
9
10 namespace media {
11
12 class CompositeFilter::FilterHostImpl : public FilterHost {
13 public:
14 FilterHostImpl(CompositeFilter* parent, FilterHost* host);
15
16 FilterHost* host();
17
18 // media::FilterHost methods.
19 virtual void SetError(PipelineError error);
20 virtual base::TimeDelta GetTime() const;
21 virtual base::TimeDelta GetDuration() const;
22 virtual void SetTime(base::TimeDelta time);
23 virtual void SetDuration(base::TimeDelta duration);
24 virtual void SetBufferedTime(base::TimeDelta buffered_time);
25 virtual void SetTotalBytes(int64 total_bytes);
26 virtual void SetBufferedBytes(int64 buffered_bytes);
27 virtual void SetVideoSize(size_t width, size_t height);
28 virtual void SetStreaming(bool streaming);
29 virtual void NotifyEnded();
30 virtual void SetLoaded(bool loaded);
31 virtual void SetNetworkActivity(bool network_activity);
32 virtual void DisableAudioRenderer();
33 virtual void SetCurrentReadPosition(int64 offset);
34 virtual int64 GetCurrentReadPosition();
35
36 private:
37 CompositeFilter* parent_;
38 FilterHost* host_;
39
40 DISALLOW_COPY_AND_ASSIGN(FilterHostImpl);
41 };
42
43 CompositeFilter::CompositeFilter(MessageLoop* message_loop) {
44 Init(message_loop, NULL);
45 }
46
47 CompositeFilter::CompositeFilter(MessageLoop* message_loop,
48 ThreadFactoryFunction thread_factory) {
49 DCHECK(thread_factory);
50 Init(message_loop, thread_factory);
51 }
52
53 void CompositeFilter::Init(MessageLoop* message_loop,
54 ThreadFactoryFunction thread_factory) {
55 DCHECK(message_loop);
56 message_loop_ = message_loop;
57 thread_factory_ = thread_factory;
58
59 if (!thread_factory_) {
60 thread_factory_ = &CompositeFilter::DefaultThreadFactory;
61 }
62
63 state_ = kCreated;
64 sequence_index_ = 0;
65 error_ = PIPELINE_OK;
66 }
67
68 CompositeFilter::~CompositeFilter() {
69 DCHECK_EQ(message_loop_, MessageLoop::current());
70 DCHECK(state_ == kCreated || state_ == kStopped);
71
72 // Stop every running filter thread.
73 for (FilterThreadVector::iterator iter = filter_threads_.begin();
74 iter != filter_threads_.end();
75 ++iter) {
76 (*iter)->Stop();
77 }
78
79 // Reset the pipeline, which will decrement a reference to this object.
80 // We will get destroyed as soon as the remaining tasks finish executing.
81 // To be safe, we'll set our pipeline reference to NULL.
82 filters_.clear();
83 STLDeleteElements(&filter_threads_);
84 }
85
86 bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) {
87 DCHECK_EQ(message_loop_, MessageLoop::current());
88 if (!filter.get() || state_ != kCreated || !host())
89 return false;
90
91 // Create a dedicated thread for this filter if applicable.
92 if (filter->requires_message_loop()) {
93 scoped_ptr<base::Thread> thread(
94 thread_factory_(filter->message_loop_name()));
95
96 if (!thread.get() || !thread->Start()) {
97 return false;
98 }
99
100 filter->set_message_loop(thread->message_loop());
101 filter_threads_.push_back(thread.release());
102 }
103
104 // Register ourselves as the filter's host.
105 filter->set_host(host_impl_.get());
106 filters_.push_back(make_scoped_refptr(filter.get()));
107 return true;
108 }
109
110 const char* CompositeFilter::major_mime_type() const {
111 return "";
112 }
113
114 void CompositeFilter::set_host(FilterHost* host) {
115 DCHECK_EQ(message_loop_, MessageLoop::current());
116 DCHECK(host);
117 DCHECK(!host_impl_.get());
118 host_impl_.reset(new FilterHostImpl(this, host));
119 }
120
121 FilterHost* CompositeFilter::host() {
122 return host_impl_.get() ? host_impl_->host() : NULL;
123 }
124
125 bool CompositeFilter::requires_message_loop() const {
126 return false;
127 }
128
129 const char* CompositeFilter::message_loop_name() const {
130 return "CompositeFilter";
131 }
132
133 void CompositeFilter::set_message_loop(MessageLoop* message_loop) {
134 NOTREACHED() << "Message loop should not be set.";
135 }
136
137 MessageLoop* CompositeFilter::message_loop() {
138 return NULL;
139 }
140
141 void CompositeFilter::Play(FilterCallback* play_callback) {
142 DCHECK_EQ(message_loop_, MessageLoop::current());
143 scoped_ptr<FilterCallback> callback(play_callback);
144 if (callback_.get()) {
145 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
146 callback->Run();
147 return;
148 } else if (state_ == kPlaying) {
149 callback->Run();
150 return;
151 } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
152 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
153 callback->Run();
154 return;
155 }
156
157 ChangeState(kPlayPending);
158 callback_.reset(callback.release());
159 StartSerialCallSequence();
160 }
161
162 void CompositeFilter::Pause(FilterCallback* pause_callback) {
163 DCHECK_EQ(message_loop_, MessageLoop::current());
164 scoped_ptr<FilterCallback> callback(pause_callback);
165 if (callback_.get()) {
166 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
167 callback->Run();
168 return;
169 } else if (state_ == kPaused) {
170 callback->Run();
171 return;
172 } else if (!host() || state_ != kPlaying) {
173 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
174 callback->Run();
175 return;
176 }
177
178 ChangeState(kPausePending);
179 callback_.reset(callback.release());
180 StartSerialCallSequence();
181 }
182
183 void CompositeFilter::Flush(FilterCallback* flush_callback) {
184 DCHECK_EQ(message_loop_, MessageLoop::current());
185 scoped_ptr<FilterCallback> callback(flush_callback);
186 if (callback_.get()) {
187 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
188 callback->Run();
189 return;
190 } else if (!host() || (state_ != kCreated && state_ != kPaused)) {
191 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
192 callback->Run();
193 return;
194 }
195
196 ChangeState(kFlushPending);
197 callback_.reset(callback.release());
198 StartParallelCallSequence();
199 }
200
201 void CompositeFilter::Stop(FilterCallback* stop_callback) {
202 DCHECK_EQ(message_loop_, MessageLoop::current());
203 scoped_ptr<FilterCallback> callback(stop_callback);
204 if (!host()) {
205 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
206 callback->Run();
207 return;
208 } else if (state_ == kStopped) {
209 callback->Run();
210 return;
211 }
212
213 switch(state_) {
214 case kError:
215 case kCreated:
216 case kPaused:
217 case kPlaying:
218 ChangeState(kStopPending);
219 break;
220 case kPlayPending:
221 ChangeState(kStopWhilePlayPending);
222 break;
223 case kPausePending:
224 ChangeState(kStopWhilePausePending);
225 break;
226 case kFlushPending:
227 ChangeState(kStopWhileFlushPending);
228 break;
229 case kSeekPending:
230 ChangeState(kStopWhileSeekPending);
231 break;
232 default:
233 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
234 callback->Run();
235 return;
236 }
237
238 callback_.reset(callback.release());
239 if (state_ == kStopPending) {
240 StartSerialCallSequence();
241 }
242 }
243
244 void CompositeFilter::SetPlaybackRate(float playback_rate) {
245 DCHECK_EQ(message_loop_, MessageLoop::current());
246 for (FilterVector::iterator iter = filters_.begin();
247 iter != filters_.end();
248 ++iter) {
249 (*iter)->SetPlaybackRate(playback_rate);
250 }
251 }
252
253 void CompositeFilter::Seek(base::TimeDelta time,
254 FilterCallback* seek_callback) {
255 DCHECK_EQ(message_loop_, MessageLoop::current());
256 scoped_ptr<FilterCallback> callback(seek_callback);
257 if (callback_.get()) {
258 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
259 callback->Run();
260 return;
261 } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
262 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
263 callback->Run();
264 return;
265 }
266
267 ChangeState(kSeekPending);
268 callback_.reset(callback.release());
269 pending_seek_time_ = time;
270 StartSerialCallSequence();
271 }
272
273 void CompositeFilter::OnAudioRendererDisabled() {
274 DCHECK_EQ(message_loop_, MessageLoop::current());
275 for (FilterVector::iterator iter = filters_.begin();
276 iter != filters_.end();
277 ++iter) {
278 (*iter)->OnAudioRendererDisabled();
279 }
280 }
281
282 base::Thread* CompositeFilter::DefaultThreadFactory(
283 const char* thread_name) {
284 return new base::Thread(thread_name);
285 }
286
287 void CompositeFilter::ChangeState(State new_state) {
288 DCHECK_EQ(message_loop_, MessageLoop::current());
289 state_ = new_state;
290 }
291
292 void CompositeFilter::StartSerialCallSequence() {
293 DCHECK_EQ(message_loop_, MessageLoop::current());
294 error_ = PIPELINE_OK;
295
296 if (filters_.size() > 0) {
297 sequence_index_ = 0;
298 CallFilter(filters_[sequence_index_],
299 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
300 } else {
301 sequence_index_ = 0;
302 SerialCallback();
303 }
304 }
305
306 void CompositeFilter::StartParallelCallSequence() {
307 DCHECK_EQ(message_loop_, MessageLoop::current());
308 error_ = PIPELINE_OK;
309
310 if (filters_.size() > 0) {
311 sequence_index_ = 0;
312 for (size_t i = 0; i < filters_.size(); i++) {
313 CallFilter(filters_[i],
314 NewThreadSafeCallback(&CompositeFilter::ParallelCallback));
315 }
316 } else {
317 sequence_index_ = 0;
318 ParallelCallback();
319 }
320 }
321
322 void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter,
323 FilterCallback* callback) {
324 switch(state_) {
325 case kPlayPending:
326 filter->Play(callback);
327 break;
328 case kPausePending:
329 filter->Pause(callback);
330 break;
331 case kFlushPending:
332 filter->Flush(callback);
333 break;
334 case kStopPending:
335 filter->Stop(callback);
336 break;
337 case kSeekPending:
338 filter->Seek(pending_seek_time_, callback);
339 break;
340 default:
341 delete callback;
342 ChangeState(kError);
343 HandleError(PIPELINE_ERROR_INVALID_STATE);
344 }
345 }
346
347 void CompositeFilter::DispatchPendingCallback() {
348 if (callback_.get()) {
349 scoped_ptr<FilterCallback> callback(callback_.release());
350 callback->Run();
351 }
352 }
353
354 CompositeFilter::State CompositeFilter::GetNextState(State state) const {
355 State ret = kInvalid;
356 switch (state) {
357 case kPlayPending:
358 ret = kPlaying;
359 break;
360 case kPausePending:
361 ret = kPaused;
362 case kFlushPending:
363 ret = kPaused;
364 break;
365 case kStopPending:
366 ret = kStopped;
367 break;
368 case kSeekPending:
369 ret = kPaused;
370 break;
371 case kStopWhilePlayPending:
372 case kStopWhilePausePending:
373 case kStopWhileFlushPending:
374 case kStopWhileSeekPending:
375 ret = kStopPending;
376 break;
377
378 case kInvalid:
379 case kCreated:
380 case kPlaying:
381 case kPaused:
382 case kStopped:
383 case kError:
384 ret = kInvalid;
385 break;
386
387 // default: intentionally left out to catch missing states.
388 }
389
390 return ret;
391 }
392
393 void CompositeFilter::SerialCallback() {
394 DCHECK_EQ(message_loop_, MessageLoop::current());
395 if (error_ != PIPELINE_OK) {
396 // We encountered an error. Terminate the sequence now.
397 ChangeState(kError);
398 HandleError(error_);
399 return;
400 }
401
402 if (filters_.size() > 0)
403 sequence_index_++;
404
405 if (sequence_index_ == filters_.size()) {
406 // All filters have been successfully called without error.
407 OnCallSequenceDone();
408 } else if (GetNextState(state_) == kStopPending) {
409 // Abort sequence early and start issuing Stop() calls.
410 ChangeState(kStopPending);
411 StartSerialCallSequence();
412 } else {
413 // We aren't done with the sequence. Call the next filter.
414 CallFilter(filters_[sequence_index_],
415 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
416 }
417 }
418
419 void CompositeFilter::ParallelCallback() {
420 DCHECK_EQ(message_loop_, MessageLoop::current());
421
422 if (filters_.size() > 0)
423 sequence_index_++;
424
425 if (sequence_index_ == filters_.size()) {
426 if (error_ != PIPELINE_OK) {
427 // We encountered an error.
428 ChangeState(kError);
429 HandleError(error_);
430 return;
431 }
432
433 OnCallSequenceDone();
434 }
435 }
436
437 void CompositeFilter::OnCallSequenceDone() {
438 State next_state = GetNextState(state_);
439
440 if (next_state == kInvalid) {
441 // We somehow got into an unexpected state.
442 ChangeState(kError);
443 HandleError(PIPELINE_ERROR_INVALID_STATE);
444 }
445
446 ChangeState(next_state);
447
448 if (state_ == kStopPending) {
449 // Handle a deferred Stop().
450 StartSerialCallSequence();
451 } else {
452 // Call the callback to indicate that the operation has completed.
453 DispatchPendingCallback();
454 }
455 }
456
457 void CompositeFilter::SendErrorToHost(PipelineError error) {
458 if (host_impl_.get())
459 host_impl_.get()->host()->SetError(error);
460 }
461
462 void CompositeFilter::HandleError(PipelineError error) {
463 if (error != PIPELINE_OK) {
464 SendErrorToHost(error);
465 }
466
467 DispatchPendingCallback();
468 }
469
470 FilterCallback* CompositeFilter::NewThreadSafeCallback(
471 void (CompositeFilter::*method)()) {
472 return TaskToCallbackAdapter::NewCallback(
473 NewRunnableMethod(this,
474 &CompositeFilter::OnCallback,
475 message_loop_,
476 method));
477 }
478
479 void CompositeFilter::OnCallback(MessageLoop* message_loop,
480 void (CompositeFilter::*method)()) {
481 if (MessageLoop::current() != message_loop) {
482 // Posting callback to the proper thread.
483 message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method));
484 return;
485 }
486
487 (this->*method)();
488 }
489
490 bool CompositeFilter::CanForwardError() {
491 return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused);
492 }
493
494 void CompositeFilter::SetError(PipelineError error) {
495 // TODO(acolwell): Temporary hack to handle errors that occur
496 // during filter initialization. In this case we just forward
497 // the error to the host even if it is on the wrong thread. We
498 // have to do this because if we defer the call, we can't be
499 // sure the host will get the error before the "init done" callback
500 // is executed. This will be cleaned up when filter init is refactored.
501 if (state_ == kCreated) {
502 SendErrorToHost(error);
503 return;
504 }
505
506 if (message_loop_ != MessageLoop::current()) {
507 message_loop_->PostTask(FROM_HERE,
508 NewRunnableMethod(this, &CompositeFilter::SetError, error));
509 return;
510 }
511
512 DCHECK_EQ(message_loop_, MessageLoop::current());
513
514 // Drop errors recieved while stopping or stopped.
515 // This shields the owner of this object from having
516 // to deal with errors it can't do anything about.
517 if (state_ == kStopPending || state_ == kStopped)
518 return;
519
520 error_ = error;
521 if (CanForwardError())
522 SendErrorToHost(error);
523 }
524
525 CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent,
526 FilterHost* host) :
527 parent_(parent),
528 host_(host) {
529 }
530
531 FilterHost* CompositeFilter::FilterHostImpl::host() {
532 return host_;
533 }
534
535 // media::FilterHost methods.
536 void CompositeFilter::FilterHostImpl::SetError(PipelineError error) {
537 parent_->SetError(error);
538 }
539
540 base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const {
541 return host_->GetTime();
542 }
543
544 base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const {
545 return host_->GetDuration();
546 }
547
548 void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) {
549 host_->SetTime(time);
550 }
551
552 void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) {
553 host_->SetDuration(duration);
554 }
555
556 void CompositeFilter::FilterHostImpl::SetBufferedTime(
557 base::TimeDelta buffered_time) {
558 host_->SetBufferedTime(buffered_time);
559 }
560
561 void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) {
562 host_->SetTotalBytes(total_bytes);
563 }
564
565 void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) {
566 host_->SetBufferedBytes(buffered_bytes);
567 }
568
569 void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width,
570 size_t height) {
571 host_->SetVideoSize(width, height);
572 }
573
574 void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) {
575 host_->SetStreaming(streaming);
576 }
577
578 void CompositeFilter::FilterHostImpl::NotifyEnded() {
579 host_->NotifyEnded();
580 }
581
582 void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) {
583 host_->SetLoaded(loaded);
584 }
585
586 void CompositeFilter::FilterHostImpl::SetNetworkActivity(
587 bool network_activity) {
588 host_->SetNetworkActivity(network_activity);
589 }
590
591 void CompositeFilter::FilterHostImpl::DisableAudioRenderer() {
592 host_->DisableAudioRenderer();
593 }
594
595 void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) {
596 host_->SetCurrentReadPosition(offset);
597 }
598
599 int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() {
600 return host_->GetCurrentReadPosition();
601 }
602
603 } // namespace media
OLDNEW
« no previous file with comments | « media/base/composite_filter.h ('k') | media/base/composite_filter_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698