Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(387)

Side by Side Diff: media/base/pipeline_impl.cc

Issue 155338: Implemented injected message loops for PipelineImpl. (Closed)
Patch Set: Merged with git-svn Created 11 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 // 4 //
5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names, 5 // TODO(scherkus): clean up PipelineImpl... too many crazy function names,
6 // potential deadlocks, etc... 6 // potential deadlocks, etc...
7 7
8 #include "base/compiler_specific.h" 8 #include "base/compiler_specific.h"
9 #include "base/condition_variable.h" 9 #include "base/condition_variable.h"
10 #include "base/stl_util-inl.h" 10 #include "base/stl_util-inl.h"
(...skipping 13 matching lines...) Expand all
24 switch (Filter::filter_type()) { 24 switch (Filter::filter_type()) {
25 case FILTER_DEMUXER: 25 case FILTER_DEMUXER:
26 case FILTER_AUDIO_DECODER: 26 case FILTER_AUDIO_DECODER:
27 case FILTER_VIDEO_DECODER: 27 case FILTER_VIDEO_DECODER:
28 return true; 28 return true;
29 default: 29 default:
30 return false; 30 return false;
31 } 31 }
32 } 32 }
33 33
34 // Small helper function to help us name filter threads for debugging.
35 //
36 // TODO(scherkus): figure out a cleaner way to derive the filter thread name.
37 template <class Filter>
38 const char* GetThreadName() {
39 DCHECK(SupportsSetMessageLoop<Filter>());
40 switch (Filter::filter_type()) {
41 case FILTER_DEMUXER:
42 return "DemuxerThread";
43 case FILTER_AUDIO_DECODER:
44 return "AudioDecoderThread";
45 case FILTER_VIDEO_DECODER:
46 return "VideoDecoderThread";
47 default:
48 return "FilterThread";
49 }
50 }
51
34 // Helper function used with NewRunnableMethod to implement a (very) crude 52 // Helper function used with NewRunnableMethod to implement a (very) crude
35 // blocking counter. 53 // blocking counter.
36 // 54 //
37 // TODO(scherkus): remove this as soon as Stop() is made asynchronous. 55 // TODO(scherkus): remove this as soon as Stop() is made asynchronous.
38 void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { 56 void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
39 AutoLock auto_lock(*lock); 57 AutoLock auto_lock(*lock);
40 --(*count); 58 --(*count);
41 CHECK(*count >= 0); 59 CHECK(*count >= 0);
42 if (*count == 0) { 60 if (*count == 0) {
43 cond_var->Signal(); 61 cond_var->Signal();
44 } 62 }
45 } 63 }
46 64
47 } // namespace 65 } // namespace
48 66
49 PipelineImpl::PipelineImpl() { 67 PipelineImpl::PipelineImpl(MessageLoop* message_loop)
68 : message_loop_(message_loop) {
50 ResetState(); 69 ResetState();
51 } 70 }
52 71
53 PipelineImpl::~PipelineImpl() { 72 PipelineImpl::~PipelineImpl() {
54 Stop(); 73 DCHECK(!pipeline_internal_)
74 << "Stop() must complete before destroying object";
55 } 75 }
56 76
57 // Creates the PipelineThread and calls it's start method. 77 // Creates the PipelineInternal and calls it's start method.
58 bool PipelineImpl::Start(FilterFactory* factory, 78 bool PipelineImpl::Start(FilterFactory* factory,
59 const std::string& url, 79 const std::string& url,
60 PipelineCallback* init_complete_callback) { 80 PipelineCallback* start_callback) {
61 DCHECK(!pipeline_thread_); 81 DCHECK(!pipeline_internal_);
62 DCHECK(factory); 82 DCHECK(factory);
63 DCHECK(!initialized_); 83 if (pipeline_internal_ || !factory) {
64 DCHECK(!IsPipelineThread()); 84 return false;
65 if (!pipeline_thread_ && factory) {
66 pipeline_thread_ = new PipelineThread(this);
67 if (pipeline_thread_) {
68 // TODO(ralphl): Does the callback get copied by these fancy templates?
69 // if so, then do I want to always delete it here???
70 if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
71 return true;
72 }
73 pipeline_thread_ = NULL; // Releases reference to destroy thread
74 }
75 } 85 }
76 delete init_complete_callback; 86
77 return false; 87 // Create and start the PipelineInternal.
88 pipeline_internal_ = new PipelineInternal(this, message_loop_);
89 if (!pipeline_internal_) {
90 NOTREACHED() << "Could not create PipelineInternal";
91 return false;
92 }
93 pipeline_internal_->Start(factory, url, start_callback);
94 return true;
78 } 95 }
79 96
80 // Stop the PipelineThread and return to a state identical to that of a newly 97 // Stop the PipelineInternal who will NULL our reference to it and reset our
81 // created PipelineImpl object. 98 // state to a newly created PipelineImpl object.
82 void PipelineImpl::Stop() { 99 void PipelineImpl::Stop(PipelineCallback* stop_callback) {
83 DCHECK(!IsPipelineThread()); 100 if (pipeline_internal_) {
84 101 pipeline_internal_->Stop(stop_callback);
85 if (pipeline_thread_) {
86 pipeline_thread_->Stop();
87 } 102 }
88 ResetState();
89 } 103 }
90 104
91 void PipelineImpl::Seek(base::TimeDelta time, 105 void PipelineImpl::Seek(base::TimeDelta time,
92 PipelineCallback* seek_callback) { 106 PipelineCallback* seek_callback) {
93 DCHECK(!IsPipelineThread());
94
95 if (IsPipelineOk()) { 107 if (IsPipelineOk()) {
96 pipeline_thread_->Seek(time, seek_callback); 108 pipeline_internal_->Seek(time, seek_callback);
97 } else { 109 } else {
98 NOTREACHED(); 110 NOTREACHED();
99 } 111 }
100 } 112 }
101 113
114 bool PipelineImpl::IsRunning() const {
115 AutoLock auto_lock(const_cast<Lock&>(lock_));
116 return pipeline_internal_ != NULL;
117 }
118
102 bool PipelineImpl::IsInitialized() const { 119 bool PipelineImpl::IsInitialized() const {
103 AutoLock auto_lock(lock_); 120 AutoLock auto_lock(lock_);
104 return initialized_; 121 return pipeline_internal_ && pipeline_internal_->IsInitialized();
105 } 122 }
106 123
107 bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { 124 bool PipelineImpl::IsRendered(const std::string& major_mime_type) const {
108 AutoLock auto_lock(lock_); 125 AutoLock auto_lock(lock_);
109 bool is_rendered = (rendered_mime_types_.find(major_mime_type) != 126 bool is_rendered = (rendered_mime_types_.find(major_mime_type) !=
110 rendered_mime_types_.end()); 127 rendered_mime_types_.end());
111 return is_rendered; 128 return is_rendered;
112 } 129 }
113 130
114 float PipelineImpl::GetPlaybackRate() const { 131 float PipelineImpl::GetPlaybackRate() const {
115 AutoLock auto_lock(lock_); 132 AutoLock auto_lock(lock_);
116 return playback_rate_; 133 return playback_rate_;
117 } 134 }
118 135
119 void PipelineImpl::SetPlaybackRate(float rate) { 136 void PipelineImpl::SetPlaybackRate(float rate) {
120 DCHECK(!IsPipelineThread());
121
122 if (IsPipelineOk() && rate >= 0.0f) { 137 if (IsPipelineOk() && rate >= 0.0f) {
123 pipeline_thread_->SetPlaybackRate(rate); 138 pipeline_internal_->SetPlaybackRate(rate);
124 } else { 139 } else {
125 // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped. 140 // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped.
126 DCHECK(rate == 0.0f && playback_rate_ == 0.0f); 141 DCHECK(rate == 0.0f && playback_rate_ == 0.0f);
127 } 142 }
128 } 143 }
129 144
130 float PipelineImpl::GetVolume() const { 145 float PipelineImpl::GetVolume() const {
131 AutoLock auto_lock(lock_); 146 AutoLock auto_lock(lock_);
132 return volume_; 147 return volume_;
133 } 148 }
134 149
135 void PipelineImpl::SetVolume(float volume) { 150 void PipelineImpl::SetVolume(float volume) {
136 DCHECK(!IsPipelineThread());
137
138 if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) { 151 if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) {
139 pipeline_thread_->SetVolume(volume); 152 pipeline_internal_->SetVolume(volume);
140 } else { 153 } else {
141 NOTREACHED(); 154 NOTREACHED();
142 } 155 }
143 } 156 }
144 157
145 base::TimeDelta PipelineImpl::GetTime() const { 158 base::TimeDelta PipelineImpl::GetTime() const {
146 AutoLock auto_lock(lock_); 159 AutoLock auto_lock(lock_);
147 return time_; 160 return time_;
148 } 161 }
149 162
(...skipping 25 matching lines...) Expand all
175 *height_out = video_height_; 188 *height_out = video_height_;
176 } 189 }
177 190
178 PipelineError PipelineImpl::GetError() const { 191 PipelineError PipelineImpl::GetError() const {
179 AutoLock auto_lock(lock_); 192 AutoLock auto_lock(lock_);
180 return error_; 193 return error_;
181 } 194 }
182 195
183 void PipelineImpl::ResetState() { 196 void PipelineImpl::ResetState() {
184 AutoLock auto_lock(lock_); 197 AutoLock auto_lock(lock_);
185 pipeline_thread_ = NULL; 198 pipeline_internal_ = NULL;
186 initialized_ = false;
187 duration_ = base::TimeDelta(); 199 duration_ = base::TimeDelta();
188 buffered_time_ = base::TimeDelta(); 200 buffered_time_ = base::TimeDelta();
189 buffered_bytes_ = 0; 201 buffered_bytes_ = 0;
190 total_bytes_ = 0; 202 total_bytes_ = 0;
191 video_width_ = 0; 203 video_width_ = 0;
192 video_height_ = 0; 204 video_height_ = 0;
193 volume_ = 0.0f; 205 volume_ = 0.0f;
194 playback_rate_ = 0.0f; 206 playback_rate_ = 0.0f;
195 error_ = PIPELINE_OK; 207 error_ = PIPELINE_OK;
196 time_ = base::TimeDelta(); 208 time_ = base::TimeDelta();
197 rendered_mime_types_.clear(); 209 rendered_mime_types_.clear();
198 } 210 }
199 211
200 bool PipelineImpl::IsPipelineOk() const { 212 bool PipelineImpl::IsPipelineOk() const {
201 return pipeline_thread_ && initialized_ && PIPELINE_OK == error_; 213 return pipeline_internal_ && PIPELINE_OK == error_;
202 }
203
204 bool PipelineImpl::IsPipelineThread() const {
205 return pipeline_thread_ &&
206 PlatformThread::CurrentId() == pipeline_thread_->thread_id();
207 } 214 }
208 215
209 void PipelineImpl::SetDuration(base::TimeDelta duration) { 216 void PipelineImpl::SetDuration(base::TimeDelta duration) {
210 AutoLock auto_lock(lock_); 217 AutoLock auto_lock(lock_);
211 duration_ = duration; 218 duration_ = duration;
212 } 219 }
213 220
214 void PipelineImpl::SetBufferedTime(base::TimeDelta buffered_time) { 221 void PipelineImpl::SetBufferedTime(base::TimeDelta buffered_time) {
215 AutoLock auto_lock(lock_); 222 AutoLock auto_lock(lock_);
216 buffered_time_ = buffered_time; 223 buffered_time_ = buffered_time;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
256 } 263 }
257 264
258 void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) { 265 void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) {
259 AutoLock auto_lock(lock_); 266 AutoLock auto_lock(lock_);
260 rendered_mime_types_.insert(major_mime_type); 267 rendered_mime_types_.insert(major_mime_type);
261 } 268 }
262 269
263 270
264 //----------------------------------------------------------------------------- 271 //-----------------------------------------------------------------------------
265 272
266 PipelineThread::PipelineThread(PipelineImpl* pipeline) 273 PipelineInternal::PipelineInternal(PipelineImpl* pipeline,
274 MessageLoop* message_loop)
267 : pipeline_(pipeline), 275 : pipeline_(pipeline),
268 thread_("PipelineThread"), 276 message_loop_(message_loop),
269 state_(kCreated) { 277 state_(kCreated) {
270 } 278 }
271 279
272 PipelineThread::~PipelineThread() { 280 PipelineInternal::~PipelineInternal() {
273 Stop(); 281 DCHECK(state_ == kCreated || state_ == kStopped);
274 DCHECK(state_ == kStopped || state_ == kError);
275 }
276
277 // This method is called on the client's thread. It starts the pipeline's
278 // dedicated thread and posts a task to call the StartTask() method on that
279 // thread.
280 bool PipelineThread::Start(FilterFactory* filter_factory,
281 const std::string& url,
282 PipelineCallback* init_complete_callback) {
283 DCHECK_EQ(kCreated, state_);
284 if (thread_.Start()) {
285 filter_factory_ = filter_factory;
286 url_ = url;
287 init_callback_.reset(init_complete_callback);
288 message_loop()->PostTask(FROM_HERE,
289 NewRunnableMethod(this, &PipelineThread::StartTask));
290 return true;
291 }
292 return false;
293 }
294
295 // Called on the client's thread. If the thread has been started, then posts
296 // a task to call the StopTask() method, then waits until the thread has
297 // stopped.
298 void PipelineThread::Stop() {
299 if (thread_.IsRunning()) {
300 message_loop()->PostTask(FROM_HERE,
301 NewRunnableMethod(this, &PipelineThread::StopTask));
302 thread_.Stop();
303 }
304 DCHECK(filter_hosts_.empty());
305 DCHECK(filter_threads_.empty());
306 } 282 }
307 283
308 // Called on client's thread. 284 // Called on client's thread.
309 void PipelineThread::Seek(base::TimeDelta time, 285 void PipelineInternal::Start(FilterFactory* filter_factory,
310 PipelineCallback* seek_callback) { 286 const std::string& url,
311 message_loop()->PostTask(FROM_HERE, 287 PipelineCallback* start_callback) {
312 NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback)); 288 DCHECK(filter_factory);
289 message_loop_->PostTask(FROM_HERE,
290 NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url,
291 start_callback));
313 } 292 }
314 293
315 // Called on client's thread. 294 // Called on client's thread.
316 void PipelineThread::SetPlaybackRate(float rate) { 295 void PipelineInternal::Stop(PipelineCallback* stop_callback) {
317 message_loop()->PostTask(FROM_HERE, 296 message_loop_->PostTask(FROM_HERE,
318 NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); 297 NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback));
319 } 298 }
320 299
321 // Called on client's thread. 300 // Called on client's thread.
322 void PipelineThread::SetVolume(float volume) { 301 void PipelineInternal::Seek(base::TimeDelta time,
323 message_loop()->PostTask(FROM_HERE, 302 PipelineCallback* seek_callback) {
324 NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); 303 message_loop_->PostTask(FROM_HERE,
304 NewRunnableMethod(this, &PipelineInternal::SeekTask, time,
305 seek_callback));
306 }
307
308 // Called on client's thread.
309 void PipelineInternal::SetPlaybackRate(float rate) {
310 message_loop_->PostTask(FROM_HERE,
311 NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate));
312 }
313
314 // Called on client's thread.
315 void PipelineInternal::SetVolume(float volume) {
316 message_loop_->PostTask(FROM_HERE,
317 NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume));
325 } 318 }
326 319
327 // Called from any thread. 320 // Called from any thread.
328 void PipelineThread::InitializationComplete(FilterHostImpl* host) { 321 void PipelineInternal::InitializationComplete(FilterHostImpl* host) {
329 if (IsPipelineOk()) { 322 if (IsPipelineOk()) {
330 // Continue the start task by proceeding to the next stage. 323 // Continue the initialize task by proceeding to the next stage.
331 message_loop()->PostTask(FROM_HERE, 324 message_loop_->PostTask(FROM_HERE,
332 NewRunnableMethod(this, &PipelineThread::StartTask)); 325 NewRunnableMethod(this, &PipelineInternal::InitializeTask));
333 } 326 }
334 } 327 }
335 328
336 // Called from any thread. Updates the pipeline time. 329 // Called from any thread. Updates the pipeline time.
337 void PipelineThread::SetTime(base::TimeDelta time) { 330 void PipelineInternal::SetTime(base::TimeDelta time) {
338 pipeline()->SetTime(time); 331 // TODO(scherkus): why not post a task?
332 pipeline_->SetTime(time);
339 } 333 }
340 334
341 // Called from any thread. Sets the pipeline |error_| member and schedules a 335 // Called from any thread. Sets the pipeline |error_| member and destroys all
342 // task to stop all the filters in the pipeline. Note that the thread will 336 // filters.
343 // continue to run until the client calls Pipeline::Stop(), but nothing will 337 void PipelineInternal::Error(PipelineError error) {
344 // be processed since filters will not be able to post tasks. 338 message_loop_->PostTask(FROM_HERE,
345 void PipelineThread::Error(PipelineError error) { 339 NewRunnableMethod(this, &PipelineInternal::ErrorTask, error));
346 // If this method returns false, then an error has already happened, so no
347 // reason to run the StopTask again. It's going to happen.
348 if (pipeline()->InternalSetError(error)) {
349 message_loop()->PostTask(FROM_HERE,
350 NewRunnableMethod(this, &PipelineThread::StopTask));
351 }
352 } 340 }
353 341
354 // Called as a result of destruction of the thread. 342 void PipelineInternal::StartTask(FilterFactory* filter_factory,
355 // 343 const std::string& url,
356 // TODO(scherkus): this can block the client due to synchronous Stop() API call. 344 PipelineCallback* start_callback) {
357 void PipelineThread::WillDestroyCurrentMessageLoop() { 345 DCHECK_EQ(MessageLoop::current(), message_loop_);
358 STLDeleteElements(&filter_hosts_); 346 DCHECK_EQ(kCreated, state_);
359 STLDeleteElements(&filter_threads_); 347 filter_factory_ = filter_factory;
348 url_ = url;
349 start_callback_.reset(start_callback);
350
351 // Kick off initialization.
352 InitializeTask();
360 } 353 }
361 354
362 // Main initialization method called on the pipeline thread. This code attempts 355 // Main initialization method called on the pipeline thread. This code attempts
363 // to use the specified filter factory to build a pipeline. 356 // to use the specified filter factory to build a pipeline.
364 // Initialization step performed in this method depends on current state of this 357 // Initialization step performed in this method depends on current state of this
365 // object, indicated by |state_|. After each step of initialization, this 358 // object, indicated by |state_|. After each step of initialization, this
366 // object transits to the next stage. It starts by creating a DataSource, 359 // object transits to the next stage. It starts by creating a DataSource,
367 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an 360 // connects it to a Demuxer, and then connects the Demuxer's audio stream to an
368 // AudioDecoder which is then connected to an AudioRenderer. If the media has 361 // AudioDecoder which is then connected to an AudioRenderer. If the media has
369 // video, then it connects a VideoDecoder to the Demuxer's video stream, and 362 // video, then it connects a VideoDecoder to the Demuxer's video stream, and
370 // then connects the VideoDecoder to a VideoRenderer. 363 // then connects the VideoDecoder to a VideoRenderer.
371 // 364 //
372 // When all required filters have been created and have called their 365 // When all required filters have been created and have called their
373 // FilterHost's InitializationComplete method, the pipeline's |initialized_| 366 // FilterHost's InitializationComplete() method, the pipeline will update its
374 // member is set to true, and, if the client provided an 367 // state to kStarted and |init_callback_|, will be executed.
375 // |init_complete_callback_|, it is called with "true".
376 // 368 //
377 // If initialization fails, the client's callback will still be called, but 369 // If initialization fails, the client's callback will still be called, but
378 // the bool parameter passed to it will be false. 370 // the bool parameter passed to it will be false.
379 // 371 //
380 // TODO(hclam): StartTask is now starting the pipeline asynchronously. It 372 // TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It
381 // works like a big state change table. If we no longer need to start filters 373 // works like a big state change table. If we no longer need to start filters
382 // in order, we need to get rid of all the state change. 374 // in order, we need to get rid of all the state change.
383 void PipelineThread::StartTask() { 375 void PipelineInternal::InitializeTask() {
384 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 376 DCHECK_EQ(MessageLoop::current(), message_loop_);
385 377
386 // If we have received the stop signal, return immediately. 378 // If we have received the stop signal, return immediately.
387 if (state_ == kStopped) 379 if (state_ == kStopped)
388 return; 380 return;
389 381
390 DCHECK(state_ == kCreated || IsPipelineInitializing()); 382 DCHECK(state_ == kCreated || IsPipelineInitializing());
391 383
392 // Just created, create data source. 384 // Just created, create data source.
393 if (state_ == kCreated) { 385 if (state_ == kCreated) {
394 message_loop()->AddDestructionObserver(this);
395 state_ = kInitDataSource; 386 state_ = kInitDataSource;
396 CreateDataSource(); 387 CreateDataSource();
397 return; 388 return;
398 } 389 }
399 390
400 // Data source created, create demuxer. 391 // Data source created, create demuxer.
401 if (state_ == kInitDataSource) { 392 if (state_ == kInitDataSource) {
402 state_ = kInitDemuxer; 393 state_ = kInitDemuxer;
403 CreateDemuxer(); 394 CreateDemuxer();
404 return; 395 return;
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
439 } 430 }
440 } 431 }
441 432
442 if (state_ == kInitVideoRenderer) { 433 if (state_ == kInitVideoRenderer) {
443 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { 434 if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) {
444 Error(PIPELINE_ERROR_COULD_NOT_RENDER); 435 Error(PIPELINE_ERROR_COULD_NOT_RENDER);
445 return; 436 return;
446 } 437 }
447 438
448 state_ = kStarted; 439 state_ = kStarted;
449 pipeline_->initialized_ = true;
450 filter_factory_ = NULL; 440 filter_factory_ = NULL;
451 if (init_callback_.get()) { 441 if (start_callback_.get()) {
452 init_callback_->Run(true); 442 start_callback_->Run(true);
453 init_callback_.reset(); 443 start_callback_.reset();
454 } 444 }
455 } 445 }
456 } 446 }
457 447
458 // This method is called as a result of the client calling Pipeline::Stop() or 448 // This method is called as a result of the client calling Pipeline::Stop() or
459 // as the result of an error condition. If there is no error, then set the 449 // as the result of an error condition. If there is no error, then set the
460 // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the 450 // pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the
461 // reverse order. 451 // reverse order.
462 // 452 //
463 // TODO(scherkus): beware! this can get posted multiple times since we post 453 // TODO(scherkus): beware! this can get posted multiple times since we post
464 // Stop() tasks even if we've already stopped. Perhaps this should no-op for 454 // Stop() tasks even if we've already stopped. Perhaps this should no-op for
465 // additional calls, however most of this logic will be changing. 455 // additional calls, however most of this logic will be changing.
466 void PipelineThread::StopTask() { 456 void PipelineInternal::StopTask(PipelineCallback* stop_callback) {
467 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 457 DCHECK_EQ(MessageLoop::current(), message_loop_);
458 stop_callback_.reset(stop_callback);
468 459
469 if (IsPipelineInitializing()) { 460 // If we've already stopped, return immediately.
470 // If IsPipelineOk() is true, the pipeline was simply stopped during 461 if (state_ == kStopped) {
471 // initialization. Otherwise it is a failure. 462 return;
472 state_ = IsPipelineOk() ? kStopped : kError;
473 filter_factory_ = NULL;
474 if (init_callback_.get()) {
475 init_callback_->Run(false);
476 init_callback_.reset();
477 }
478 } else {
479 state_ = kStopped;
480 } 463 }
481 464
482 if (IsPipelineOk()) { 465 // Carry out setting the error, notifying the client and destroying filters.
483 pipeline_->error_ = PIPELINE_STOPPING; 466 ErrorTask(PIPELINE_STOPPING);
484 }
485 467
486 // Stop every filter. 468 // We no longer need to examine our previous state, set it to stopped.
487 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 469 state_ = kStopped;
488 iter != filter_hosts_.end();
489 ++iter) {
490 (*iter)->Stop();
491 }
492 470
493 // Figure out how many threads we have to stop. 471 // Reset the pipeline and set our reference to NULL so we don't accidentally
494 // 472 // modify the pipeline. Once remaining tasks execute we will be destroyed.
495 // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. 473 pipeline_->ResetState();
496 FilterThreadVector running_threads; 474 pipeline_ = NULL;
497 for (FilterThreadVector::iterator iter = filter_threads_.begin();
498 iter != filter_threads_.end();
499 ++iter) {
500 if ((*iter)->IsRunning()) {
501 running_threads.push_back(*iter);
502 }
503 }
504 475
505 // Crude blocking counter implementation. 476 // Notify the client that stopping has finished.
506 Lock lock; 477 if (stop_callback_.get()) {
507 ConditionVariable wait_for_zero(&lock); 478 stop_callback_->Run(true);
508 int count = running_threads.size(); 479 stop_callback_.reset();
509
510 // Post a task to every filter's thread to ensure that they've completed their
511 // stopping logic before stopping the threads themselves.
512 //
513 // TODO(scherkus): again, Stop() should either be synchronous or we should
514 // receive a signal from filters that they have indeed stopped.
515 for (FilterThreadVector::iterator iter = running_threads.begin();
516 iter != running_threads.end();
517 ++iter) {
518 (*iter)->message_loop()->PostTask(FROM_HERE,
519 NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
520 }
521
522 // Wait on our "blocking counter".
523 {
524 AutoLock auto_lock(lock);
525 while (count > 0) {
526 wait_for_zero.Wait();
527 }
528 }
529
530 // Stop every running filter thread.
531 //
532 // TODO(scherkus): can we watchdog this section to detect wedged threads?
533 for (FilterThreadVector::iterator iter = running_threads.begin();
534 iter != running_threads.end();
535 ++iter) {
536 (*iter)->Stop();
537 } 480 }
538 } 481 }
539 482
540 void PipelineThread::SetPlaybackRateTask(float rate) { 483 void PipelineInternal::ErrorTask(PipelineError error) {
541 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 484 DCHECK_EQ(MessageLoop::current(), message_loop_);
485 DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
486
487 // Suppress executing additional error logic.
488 if (state_ == kError) {
489 return;
490 }
491
492 // Update our error code first in case we execute the start callback.
493 pipeline_->error_ = error;
494
495 // Notify the client that starting did not complete, if necessary.
496 if (IsPipelineInitializing() && start_callback_.get()) {
497 start_callback_->Run(false);
498 }
499 start_callback_.reset();
500 filter_factory_ = NULL;
501
502 // We no longer need to examine our previous state, set it to stopped.
503 state_ = kError;
504
505 // Destroy every filter and reset the pipeline as well.
506 DestroyFilters();
507 }
508
509 void PipelineInternal::SetPlaybackRateTask(float rate) {
510 DCHECK_EQ(MessageLoop::current(), message_loop_);
542 511
543 pipeline_->InternalSetPlaybackRate(rate); 512 pipeline_->InternalSetPlaybackRate(rate);
544 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 513 for (FilterHostVector::iterator iter = filter_hosts_.begin();
545 iter != filter_hosts_.end(); 514 iter != filter_hosts_.end();
546 ++iter) { 515 ++iter) {
547 (*iter)->media_filter()->SetPlaybackRate(rate); 516 (*iter)->media_filter()->SetPlaybackRate(rate);
548 } 517 }
549 } 518 }
550 519
551 void PipelineThread::SeekTask(base::TimeDelta time, 520 void PipelineInternal::SeekTask(base::TimeDelta time,
552 PipelineCallback* seek_callback) { 521 PipelineCallback* seek_callback) {
553 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 522 DCHECK_EQ(MessageLoop::current(), message_loop_);
523 seek_callback_.reset(seek_callback);
554 524
555 for (FilterHostVector::iterator iter = filter_hosts_.begin(); 525 for (FilterHostVector::iterator iter = filter_hosts_.begin();
556 iter != filter_hosts_.end(); 526 iter != filter_hosts_.end();
557 ++iter) { 527 ++iter) {
558 (*iter)->media_filter()->Seek(time); 528 (*iter)->media_filter()->Seek(time);
559 } 529 }
560 530
561 // TODO(hclam): we should set the time when the above seek operations were all 531 // TODO(hclam): we should set the time when the above seek operations were all
562 // successful and first frame/packet at the desired time is decoded. I'm 532 // successful and first frame/packet at the desired time is decoded. I'm
563 // setting the time here because once we do the callback the user can ask for 533 // setting the time here because once we do the callback the user can ask for
564 // current time immediately, which is the old time. In order to get rid this 534 // current time immediately, which is the old time. In order to get rid this
565 // little glitch, we either assume the seek was successful and time is updated 535 // little glitch, we either assume the seek was successful and time is updated
566 // immediately here or we set time and do callback when we have new 536 // immediately here or we set time and do callback when we have new
567 // frames/packets. 537 // frames/packets.
568 SetTime(time); 538 SetTime(time);
569 if (seek_callback) { 539 if (seek_callback_.get()) {
570 seek_callback->Run(true); 540 seek_callback_->Run(true);
571 delete seek_callback; 541 seek_callback_.reset();
572 } 542 }
573 } 543 }
574 544
575 void PipelineThread::SetVolumeTask(float volume) { 545 void PipelineInternal::SetVolumeTask(float volume) {
576 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 546 DCHECK_EQ(MessageLoop::current(), message_loop_);
577 547
578 pipeline_->volume_ = volume; 548 pipeline_->volume_ = volume;
579 scoped_refptr<AudioRenderer> audio_renderer; 549 scoped_refptr<AudioRenderer> audio_renderer;
580 GetFilter(&audio_renderer); 550 GetFilter(&audio_renderer);
581 if (audio_renderer) { 551 if (audio_renderer) {
582 audio_renderer->SetVolume(volume); 552 audio_renderer->SetVolume(volume);
583 } 553 }
584 } 554 }
585 555
586 template <class Filter, class Source> 556 template <class Filter, class Source>
587 void PipelineThread::CreateFilter(FilterFactory* filter_factory, 557 void PipelineInternal::CreateFilter(FilterFactory* filter_factory,
588 Source source, 558 Source source,
589 const MediaFormat& media_format) { 559 const MediaFormat& media_format) {
590 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 560 DCHECK_EQ(MessageLoop::current(), message_loop_);
591 DCHECK(IsPipelineOk()); 561 DCHECK(IsPipelineOk());
592 562
563 // Create the filter.
593 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); 564 scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
594 if (!filter) { 565 if (!filter) {
595 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); 566 Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
596 } else { 567 return;
597 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); 568 }
598 // Create a dedicated thread for this filter. 569
599 if (SupportsSetMessageLoop<Filter>()) { 570 // Create a dedicated thread for this filter if applicable.
600 // TODO(scherkus): figure out a way to name these threads so it matches 571 if (SupportsSetMessageLoop<Filter>()) {
601 // the filter type. 572 scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>()));
602 scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); 573 if (!thread.get() || !thread->Start()) {
603 if (!thread.get() || !thread->Start()) { 574 NOTREACHED() << "Could not start filter thread";
604 NOTREACHED() << "Could not start filter thread"; 575 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
605 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); 576 return;
606 } else {
607 filter->set_message_loop(thread->message_loop());
608 filter_threads_.push_back(thread.release());
609 }
610 } 577 }
611 578
612 // Creating a thread could have failed, verify we're still OK. 579 filter->set_message_loop(thread->message_loop());
613 if (IsPipelineOk()) { 580 filter_threads_.push_back(thread.release());
614 filter_hosts_.push_back(host.get()); 581 }
615 filter->set_host(host.release()); 582
616 if (!filter->Initialize(source)) { 583 // Create the filter's host.
617 Error(PIPELINE_ERROR_INITIALIZATION_FAILED); 584 DCHECK(IsPipelineOk());
618 } 585 scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
619 } 586 filter->set_host(host.get());
587 filter_hosts_.push_back(host.release());
588
589 // Now initialize the filter.
590 if (!filter->Initialize(source)) {
591 Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
620 } 592 }
621 } 593 }
622 594
623 void PipelineThread::CreateDataSource() { 595 void PipelineInternal::CreateDataSource() {
624 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 596 DCHECK_EQ(MessageLoop::current(), message_loop_);
625 DCHECK(IsPipelineOk()); 597 DCHECK(IsPipelineOk());
626 598
627 MediaFormat url_format; 599 MediaFormat url_format;
628 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); 600 url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
629 url_format.SetAsString(MediaFormat::kURL, url_); 601 url_format.SetAsString(MediaFormat::kURL, url_);
630 CreateFilter<DataSource>(filter_factory_, url_, url_format); 602 CreateFilter<DataSource>(filter_factory_, url_, url_format);
631 } 603 }
632 604
633 void PipelineThread::CreateDemuxer() { 605 void PipelineInternal::CreateDemuxer() {
634 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 606 DCHECK_EQ(MessageLoop::current(), message_loop_);
635 DCHECK(IsPipelineOk()); 607 DCHECK(IsPipelineOk());
636 608
637 scoped_refptr<DataSource> data_source; 609 scoped_refptr<DataSource> data_source;
638 GetFilter(&data_source); 610 GetFilter(&data_source);
639 DCHECK(data_source); 611 DCHECK(data_source);
640 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); 612 CreateFilter<Demuxer, DataSource>(filter_factory_, data_source);
641 } 613 }
642 614
643 template <class Decoder> 615 template <class Decoder>
644 bool PipelineThread::CreateDecoder() { 616 bool PipelineInternal::CreateDecoder() {
645 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 617 DCHECK_EQ(MessageLoop::current(), message_loop_);
646 DCHECK(IsPipelineOk()); 618 DCHECK(IsPipelineOk());
647 619
648 scoped_refptr<Demuxer> demuxer; 620 scoped_refptr<Demuxer> demuxer;
649 GetFilter(&demuxer); 621 GetFilter(&demuxer);
650 DCHECK(demuxer); 622 DCHECK(demuxer);
651 623
652 const std::string major_mime_type = Decoder::major_mime_type(); 624 const std::string major_mime_type = Decoder::major_mime_type();
653 const int num_outputs = demuxer->GetNumberOfStreams(); 625 const int num_outputs = demuxer->GetNumberOfStreams();
654 for (int i = 0; i < num_outputs; ++i) { 626 for (int i = 0; i < num_outputs; ++i) {
655 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); 627 scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
656 std::string value; 628 std::string value;
657 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && 629 if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
658 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { 630 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
659 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); 631 CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream);
660 return true; 632 return true;
661 } 633 }
662 } 634 }
663 return false; 635 return false;
664 } 636 }
665 637
666 template <class Decoder, class Renderer> 638 template <class Decoder, class Renderer>
667 bool PipelineThread::CreateRenderer() { 639 bool PipelineInternal::CreateRenderer() {
668 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 640 DCHECK_EQ(MessageLoop::current(), message_loop_);
669 DCHECK(IsPipelineOk()); 641 DCHECK(IsPipelineOk());
670 642
671 scoped_refptr<Decoder> decoder; 643 scoped_refptr<Decoder> decoder;
672 GetFilter(&decoder); 644 GetFilter(&decoder);
673 645
674 if (decoder) { 646 if (decoder) {
675 // If the decoder was created. 647 // If the decoder was created.
676 const std::string major_mime_type = Decoder::major_mime_type(); 648 const std::string major_mime_type = Decoder::major_mime_type();
677 CreateFilter<Renderer, Decoder>(filter_factory_, decoder); 649 CreateFilter<Renderer, Decoder>(filter_factory_, decoder);
678 return true; 650 return true;
679 } 651 }
680 return false; 652 return false;
681 } 653 }
682 654
683 template <class Filter> 655 template <class Filter>
684 void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { 656 void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const {
685 DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); 657 DCHECK_EQ(MessageLoop::current(), message_loop_);
686 658
687 *filter_out = NULL; 659 *filter_out = NULL;
688 for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); 660 for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
689 iter != filter_hosts_.end() && NULL == *filter_out; 661 iter != filter_hosts_.end() && NULL == *filter_out;
690 iter++) { 662 iter++) {
691 (*iter)->GetFilter(filter_out); 663 (*iter)->GetFilter(filter_out);
692 } 664 }
693 } 665 }
694 666
667 void PipelineInternal::DestroyFilters() {
668 // Stop every filter.
669 for (FilterHostVector::iterator iter = filter_hosts_.begin();
670 iter != filter_hosts_.end();
671 ++iter) {
672 (*iter)->Stop();
673 }
674
675 // Crude blocking counter implementation.
676 Lock lock;
677 ConditionVariable wait_for_zero(&lock);
678 int count = filter_threads_.size();
679
680 // Post a task to every filter's thread to ensure that they've completed their
681 // stopping logic before stopping the threads themselves.
682 //
683 // TODO(scherkus): again, Stop() should either be synchronous or we should
684 // receive a signal from filters that they have indeed stopped.
685 for (FilterThreadVector::iterator iter = filter_threads_.begin();
686 iter != filter_threads_.end();
687 ++iter) {
688 (*iter)->message_loop()->PostTask(FROM_HERE,
689 NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
690 }
691
692 // Wait on our "blocking counter".
693 {
694 AutoLock auto_lock(lock);
695 while (count > 0) {
696 wait_for_zero.Wait();
697 }
698 }
699
700 // Stop every running filter thread.
701 //
702 // TODO(scherkus): can we watchdog this section to detect wedged threads?
703 for (FilterThreadVector::iterator iter = filter_threads_.begin();
704 iter != filter_threads_.end();
705 ++iter) {
706 (*iter)->Stop();
707 }
708
709 // Reset the pipeline, which will decrement a reference to this object.
710 // We will get destroyed as soon as the remaining tasks finish executing.
711 // To be safe, we'll set our pipeline reference to NULL.
712 STLDeleteElements(&filter_hosts_);
713 STLDeleteElements(&filter_threads_);
714 }
715
695 } // namespace media 716 } // namespace media
OLDNEW
« no previous file with comments | « media/base/pipeline_impl.h ('k') | media/base/pipeline_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698