Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(309)

Side by Side Diff: media/base/composite_filter.cc

Issue 5744002: Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add explicit MessageLoop usage to make sure code is running on the expected threads. Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/stl_util-inl.h"
6 #include "media/base/callback.h"
7 #include "media/base/composite_filter.h"
scherkus (not reviewing) 2010/12/15 16:44:02 include corresponding .h first followed by blank l
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Done.
8
9 namespace media {
10
11 CompositeFilter::CompositeFilter(MessageLoop* message_loop)
12 {
scherkus (not reviewing) 2010/12/15 16:44:02 style: { on previous line
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Done.
13 Init(message_loop, NULL);
14 }
15
16 CompositeFilter::CompositeFilter(MessageLoop* message_loop,
17 ThreadFactoryFunction thread_factory)
18 {
scherkus (not reviewing) 2010/12/15 16:44:02 style: { on previous line
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Done.
19 DCHECK(thread_factory);
20 Init(message_loop, thread_factory);
21 }
22
23 void CompositeFilter::Init(MessageLoop* message_loop,
24 ThreadFactoryFunction thread_factory) {
25 DCHECK(message_loop);
26 message_loop_ = message_loop;
27 thread_factory_ = thread_factory;
28
29 if (!thread_factory_) {
30 thread_factory_ = &CompositeFilter::DefaultThreadFactory;
31 }
32
33 state_ = kCreated;
34 sequence_index_ = -1;
35 host_ = NULL;
36 error_ = PIPELINE_OK;
37 }
38
39 CompositeFilter::~CompositeFilter() {
40 DCHECK_EQ(message_loop_, MessageLoop::current());
41 DCHECK(state_ == kCreated || state_ == kStopped);
42
43 // Stop every running filter thread.
44 for (FilterThreadVector::iterator iter = filter_threads_.begin();
45 iter != filter_threads_.end();
46 ++iter) {
47 (*iter)->Stop();
48 }
49
50 // Reset the pipeline, which will decrement a reference to this object.
51 // We will get destroyed as soon as the remaining tasks finish executing.
52 // To be safe, we'll set our pipeline reference to NULL.
53 filters_.clear();
54 STLDeleteElements(&filter_threads_);
55 }
56
57 bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) {
58 DCHECK_EQ(message_loop_, MessageLoop::current());
59 if (!filter.get() || state_ != kCreated || !host_)
60 return false;
61
62 // Create a dedicated thread for this filter if applicable.
63 if (filter->requires_message_loop()) {
64 scoped_ptr<base::Thread> thread(
65 thread_factory_(filter->message_loop_name()));
66
67 if (!thread.get() || !thread->Start()) {
68 return false;
69 }
70
71 filter->set_message_loop(thread->message_loop());
72 filter_threads_.push_back(thread.release());
73 }
74
75 // Register ourselves as the filter's host.
76 filter->set_host(this);
77 filters_.push_back(make_scoped_refptr(filter.get()));
78 return true;
79 }
80
81 const char* CompositeFilter::major_mime_type() const {
82 return "";
83 }
84
85 void CompositeFilter::set_host(FilterHost* host) {
86 DCHECK_EQ(message_loop_, MessageLoop::current());
87 DCHECK(host);
88 DCHECK(!host_);
89 host_ = host;
90 }
91
92 FilterHost* CompositeFilter::host() {
93 return host_;
94 }
95
96 bool CompositeFilter::requires_message_loop() const {
97 return false;
98 }
99
100 const char* CompositeFilter::message_loop_name() const {
101 return "CompositeFilter";
102 }
103
104 void CompositeFilter::set_message_loop(MessageLoop* message_loop) {
105 NOTREACHED() << "Message loop should not be set.";
106 }
107
108 MessageLoop* CompositeFilter::message_loop() {
109 return NULL;
110 }
111
112 void CompositeFilter::Play(FilterCallback* play_callback) {
113 DCHECK_EQ(message_loop_, MessageLoop::current());
114 scoped_ptr<FilterCallback> callback(play_callback);
115 if (callback_.get()) {
116 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
117 callback->Run();
118 return;
119 } else if (state_ == kPlaying) {
120 callback->Run();
121 return;
122 } else if (!host_ || (state_ != kPaused && state_ != kCreated)) {
123 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
124 callback->Run();
125 return;
126 }
127
128 ChangeState(kPlayPending);
129 callback_.reset(callback.release());
130 StartSerialCallSequence();
131 }
132
133 void CompositeFilter::Pause(FilterCallback* pause_callback) {
134 DCHECK_EQ(message_loop_, MessageLoop::current());
135 scoped_ptr<FilterCallback> callback(pause_callback);
136 if (callback_.get()) {
137 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
138 callback->Run();
139 return;
140 } else if (state_ == kPaused) {
141 callback->Run();
142 return;
143 } else if (!host_ || state_ != kPlaying) {
144 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
145 callback->Run();
146 return;
147 }
148
149 ChangeState(kPausePending);
150 callback_.reset(callback.release());
151 StartSerialCallSequence();
152 }
153
154 void CompositeFilter::Flush(FilterCallback* flush_callback) {
155 DCHECK_EQ(message_loop_, MessageLoop::current());
156 scoped_ptr<FilterCallback> callback(flush_callback);
157 if (callback_.get()) {
158 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
159 callback->Run();
160 return;
161 } else if (!host_ || (state_ != kCreated && state_ != kPaused)) {
162 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
163 callback->Run();
164 return;
165 }
166
167 ChangeState(kFlushPending);
168 callback_.reset(callback.release());
169 StartParallelCallSequence();
170 }
171
172 void CompositeFilter::Stop(FilterCallback* stop_callback) {
173 DCHECK_EQ(message_loop_, MessageLoop::current());
174 scoped_ptr<FilterCallback> callback(stop_callback);
175 if (!host_) {
176 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
177 callback->Run();
178 return;
179 } else if (state_ == kStopped) {
180 callback->Run();
181 return;
182 }
183
184 switch(state_) {
185 case kError:
186 case kCreated:
187 case kPaused:
188 case kPlaying:
189 ChangeState(kStopPending);
190 break;
191 case kPlayPending:
192 ChangeState(kStopWhilePlayPending);
193 break;
194 case kPausePending:
195 ChangeState(kStopWhilePausePending);
196 break;
197 case kFlushPending:
198 ChangeState(kStopWhileFlushPending);
199 break;
200 case kSeekPending:
201 ChangeState(kStopWhileSeekPending);
202 break;
203 default:
204 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
205 callback->Run();
206 return;
207 }
208
209 callback_.reset(callback.release());
210 if (state_ == kStopPending) {
211 StartSerialCallSequence();
212 }
213 }
214
215 void CompositeFilter::SetPlaybackRate(float playback_rate) {
216 DCHECK_EQ(message_loop_, MessageLoop::current());
217 for (FilterVector::iterator iter = filters_.begin();
218 iter != filters_.end();
219 ++iter) {
220 (*iter)->SetPlaybackRate(playback_rate);
221 }
222 }
223
224 void CompositeFilter::Seek(base::TimeDelta time,
225 FilterCallback* seek_callback) {
226 DCHECK_EQ(message_loop_, MessageLoop::current());
227 scoped_ptr<FilterCallback> callback(seek_callback);
228 if (callback_.get()) {
229 SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
230 callback->Run();
231 return;
232 } else if (!host_ || (state_ != kPaused && state_ != kCreated)) {
233 SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
234 callback->Run();
235 return;
236 }
237
238 ChangeState(kSeekPending);
239 callback_.reset(callback.release());
240 pending_seek_time_ = time;
241 StartSerialCallSequence();
242 }
243
244 void CompositeFilter::OnAudioRendererDisabled() {
245 DCHECK_EQ(message_loop_, MessageLoop::current());
246 for (FilterVector::iterator iter = filters_.begin();
247 iter != filters_.end();
248 ++iter) {
249 (*iter)->OnAudioRendererDisabled();
250 }
251 }
252
253 base::Thread* CompositeFilter::DefaultThreadFactory(
254 const char* thread_name) {
255 return new base::Thread(thread_name);
256 }
257
258 void CompositeFilter::ChangeState(State new_state) {
259 state_ = new_state;
260 }
261
262 void CompositeFilter::StartSerialCallSequence() {
263 DCHECK_EQ(message_loop_, MessageLoop::current());
264 error_ = PIPELINE_OK;
265
266 if (filters_.size() > 0) {
267 sequence_index_ = 0;
268 CallFilter(filters_[sequence_index_],
269 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
270 } else {
271 sequence_index_ = -1;
272 SerialCallback();
273 }
274 }
275
276 void CompositeFilter::StartParallelCallSequence() {
277 DCHECK_EQ(message_loop_, MessageLoop::current());
278 error_ = PIPELINE_OK;
279
280 if (filters_.size() > 0) {
281 sequence_index_ = 0;
282 for (size_t i = 0; i < filters_.size(); i++)
283 CallFilter(filters_[i],
scherkus (not reviewing) 2010/12/15 16:44:02 I get freaked out when I see multi-line for/while
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Done.
284 NewThreadSafeCallback(&CompositeFilter::ParallelCallback));
285 } else {
286 sequence_index_ = -1;
287 ParallelCallback();
288 }
289 }
290
291 void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter,
292 FilterCallback* callback) {
293 switch(state_) {
294 case kPlayPending:
295 filter->Play(callback);
296 break;
297 case kPausePending:
298 filter->Pause(callback);
299 break;
300 case kFlushPending:
301 filter->Flush(callback);
302 break;
303 case kStopPending:
304 filter->Stop(callback);
305 break;
306 case kSeekPending:
307 filter->Seek(pending_seek_time_, callback);
308 break;
309 default:
310 delete callback;
311 ChangeState(kError);
312 HandleError(PIPELINE_ERROR_INVALID_STATE);
313 }
314 }
315
316 void CompositeFilter::DispatchPendingCallback() {
317 if (callback_.get()) {
318 scoped_ptr<FilterCallback> callback(callback_.release());
319 callback->Run();
320 }
321 }
322
323 CompositeFilter::State CompositeFilter::GetNextState(State state) const {
324 State ret = kInvalid;
325 switch (state) {
326 case kPlayPending:
327 ret = kPlaying;
328 break;
329 case kPausePending:
330 ret = kPaused;
331 case kFlushPending:
332 ret = kPaused;
333 break;
334 case kStopPending:
335 ret = kStopped;
336 break;
337 case kSeekPending:
338 ret = kPaused;
339 break;
340 case kStopWhilePlayPending:
341 case kStopWhilePausePending:
342 case kStopWhileFlushPending:
343 case kStopWhileSeekPending:
344 ret = kStopPending;
345 break;
346
347 case kInvalid:
348 case kCreated:
349 case kPlaying:
350 case kPaused:
351 case kStopped:
352 case kError:
353 ret = kInvalid;
354 break;
355
356 // default: intentionally left out to catch missing states.
357 }
358
359 return ret;
360 }
361
362 void CompositeFilter::SerialCallback() {
363 DCHECK_EQ(message_loop_, MessageLoop::current());
364 if (error_ != PIPELINE_OK) {
365 // We encountered an error. Terminate the sequence now.
366 ChangeState(kError);
367 HandleError(error_);
368 return;
369 }
370
371 sequence_index_++;
372 if (sequence_index_ == filters_.size()) {
373 // All filters have been successfully called without error.
374 OnCallSequenceDone();
375 } else if (GetNextState(state_) == kStopPending) {
376 // Abort sequence early and start issuing Stop() calls.
377 ChangeState(kStopPending);
378 StartSerialCallSequence();
379 } else {
380 // We aren't done with the sequence. Call the next filter.
381 CallFilter(filters_[sequence_index_],
382 NewThreadSafeCallback(&CompositeFilter::SerialCallback));
383 }
384 }
385
386 void CompositeFilter::ParallelCallback() {
387 DCHECK_EQ(message_loop_, MessageLoop::current());
388 sequence_index_++;
389 if (sequence_index_ == filters_.size()) {
390 if (error_ != PIPELINE_OK) {
391 // We encountered an error.
392 ChangeState(kError);
393 HandleError(error_);
394 return;
395 }
396
397 OnCallSequenceDone();
398 }
399 }
400
401 void CompositeFilter::OnCallSequenceDone() {
402 State next_state = GetNextState(state_);
403
404 if (next_state == kInvalid) {
405 // We somehow got into an unexpected state.
406 ChangeState(kError);
407 HandleError(PIPELINE_ERROR_INVALID_STATE);
408 }
409
410 ChangeState(next_state);
411
412 if (state_ == kStopPending) {
413 // Handle a deferred Stop().
414 StartSerialCallSequence();
415 } else {
416 // Call the callback to indicate that the operation has completed.
417 DispatchPendingCallback();
418 }
419 }
420
421 void CompositeFilter::SendErrorToHost(PipelineError error) {
422 if (host_)
423 host_->SetError(error);
424 }
425
426 void CompositeFilter::HandleError(PipelineError error) {
427 if (error != PIPELINE_OK) {
428 SendErrorToHost(error);
429 }
430
431 DispatchPendingCallback();
432 }
433
434 FilterCallback* CompositeFilter::NewThreadSafeCallback(
435 void (CompositeFilter::*method)()) {
scherkus (not reviewing) 2010/12/15 16:44:02 de-indent by 2
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Done.
436 return TaskToCallbackAdapter::NewCallback(
437 NewRunnableMethod(this,
438 &CompositeFilter::OnCallback,
439 message_loop_,
440 method));
441 }
442
443 void CompositeFilter::OnCallback(MessageLoop* message_loop,
444 void (CompositeFilter::*method)()) {
445 if (MessageLoop::current() != message_loop) {
446 // Posting callback to the proper thread.
447 message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method));
448 return;
449 }
450
451 (this->*method)();
452 }
453
454 bool CompositeFilter::CanForwardError() {
455 return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused);
456 }
457
458 // media::FilterHost methods.
459 void CompositeFilter::SetError(PipelineError error) {
460 if (message_loop_ != MessageLoop::current()) {
461 message_loop_->PostTask(FROM_HERE,
462 NewRunnableMethod(this, &CompositeFilter::SetError, error));
463 return;
464 }
465
466 DCHECK_EQ(message_loop_, MessageLoop::current());
467 DCHECK(host_);
468
469 // Drop errors recieved while stopping or stopped.
470 // This shields the owner of this object from having
471 // to deal with errors it can't do anything about.
472 if (state_ == kStopPending || state_ == kStopped)
473 return;
474
475 error_ = error;
476 if (host_ && CanForwardError())
477 host_->SetError(error);
478 }
479
480 base::TimeDelta CompositeFilter::GetTime() const {
481 DCHECK(host_);
482 return host_ ? host_->GetTime() : base::TimeDelta();
483 }
484
485 base::TimeDelta CompositeFilter::GetDuration() const {
486 DCHECK(host_);
487 return host_ ? host_->GetDuration() : base::TimeDelta();
488 }
489
490 void CompositeFilter::SetTime(base::TimeDelta time) {
491 DCHECK(host_);
scherkus (not reviewing) 2010/12/15 16:44:02 I'd enforce a guarantee that host_ must be present
acolwell GONE FROM CHROMIUM 2010/12/15 18:20:11 Once I move the FilterHost impl to an inner class
scherkus (not reviewing) 2010/12/15 18:57:44 Sounds good!
492 if (host_)
493 host_->SetTime(time);
494 }
495
496 void CompositeFilter::SetDuration(base::TimeDelta duration) {
497 DCHECK(host_);
498 if (host_)
499 host_->SetDuration(duration);
500 }
501
502 void CompositeFilter::SetBufferedTime(base::TimeDelta buffered_time) {
503 DCHECK(host_);
504 if (host_)
505 host_->SetBufferedTime(buffered_time);
506 }
507
508 void CompositeFilter::SetTotalBytes(int64 total_bytes) {
509 DCHECK(host_);
510 if (host_)
511 host_->SetTotalBytes(total_bytes);
512 }
513
514 void CompositeFilter::SetBufferedBytes(int64 buffered_bytes) {
515 DCHECK(host_);
516 if (host_)
517 host_->SetBufferedBytes(buffered_bytes);
518 }
519
520 void CompositeFilter::SetVideoSize(size_t width, size_t height) {
521 DCHECK(host_);
522 if (host_)
523 host_->SetVideoSize(width, height);
524 }
525
526 void CompositeFilter::SetStreaming(bool streaming) {
527 DCHECK(host_);
528 if (host_)
529 host_->SetStreaming(streaming);
530 }
531
532 void CompositeFilter::NotifyEnded() {
533 DCHECK(host_);
534 if (host_)
535 host_->NotifyEnded();
536 }
537
538 void CompositeFilter::SetLoaded(bool loaded) {
539 DCHECK(host_);
540 if (host_)
541 host_->SetLoaded(loaded);
542 }
543
544 void CompositeFilter::SetNetworkActivity(bool network_activity) {
545 DCHECK(host_);
546 if (host_)
547 host_->SetNetworkActivity(network_activity);
548 }
549
550 void CompositeFilter::DisableAudioRenderer() {
551 DCHECK(host_);
552 if (host_)
553 host_->DisableAudioRenderer();
554 }
555
556 void CompositeFilter::SetCurrentReadPosition(int64 offset) {
557 DCHECK(host_);
558 if (host_)
559 host_->SetCurrentReadPosition(offset);
560 }
561
562 int64 CompositeFilter::GetCurrentReadPosition() {
563 DCHECK(host_);
564 return host_ ? host_->GetCurrentReadPosition() : 0;
565 }
566
567 } // namespace media
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698