Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(400)

Side by Side Diff: media/filters/decoder_base.h

Issue 146068: Switching decoders to use the injected message loop. (Closed)
Patch Set: More fixes Created 11 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « media/base/pipeline_impl.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. Use of this 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. Use of this
2 // source code is governed by a BSD-style license that can be found in the 2 // source code is governed by a BSD-style license that can be found in the
3 // LICENSE file. 3 // LICENSE file.
4 4
5 // A base class that provides the plumbing for a decoder filters. 5 // A base class that provides the plumbing for a decoder filters.
6 6
7 #ifndef MEDIA_FILTERS_DECODER_BASE_H_ 7 #ifndef MEDIA_FILTERS_DECODER_BASE_H_
8 #define MEDIA_FILTERS_DECODER_BASE_H_ 8 #define MEDIA_FILTERS_DECODER_BASE_H_
9 9
10 #include <deque> 10 #include <deque>
11 11
12 #include "base/lock.h" 12 #include "base/lock.h"
13 #include "base/stl_util-inl.h"
13 #include "base/task.h" 14 #include "base/task.h"
14 #include "base/thread.h" 15 #include "base/thread.h"
15 #include "media/base/buffers.h" 16 #include "media/base/buffers.h"
16 #include "media/base/filters.h" 17 #include "media/base/filters.h"
17 #include "media/base/filter_host.h" 18 #include "media/base/filter_host.h"
18 19
19 namespace media { 20 namespace media {
20 21
21 template <class Decoder, class Output> 22 template <class Decoder, class Output>
22 class DecoderBase : public Decoder { 23 class DecoderBase : public Decoder {
23 public: 24 public:
24 typedef CallbackRunner< Tuple1<Output*> > ReadCallback; 25 typedef CallbackRunner< Tuple1<Output*> > ReadCallback;
25 26
26 // MediaFilter implementation. 27 // MediaFilter implementation.
27 virtual void Stop() { 28 virtual void Stop() {
28 OnStop(); 29 message_loop()->PostTask(FROM_HERE,
29 { 30 NewRunnableMethod(this, &DecoderBase::StopTask));
30 AutoLock auto_lock(lock_);
31 running_ = false;
32 if (process_task_) {
33 process_task_->Cancel();
34 process_task_ = NULL;
35 }
36 DiscardQueues_Locked();
37 }
38
39 // Stop our decoding thread.
40 thread_.Stop();
41 } 31 }
42 32
43 virtual void Seek(base::TimeDelta time) { 33 virtual void Seek(base::TimeDelta time) {
44 // Delegate to the subclass first. 34 message_loop()->PostTask(FROM_HERE,
45 OnSeek(time); 35 NewRunnableMethod(this, &DecoderBase::SeekTask, time));
46
47 // Flush the result queue.
48 AutoLock auto_lock(lock_);
49 result_queue_.clear();
50
51 // Flush the input queue. This will trigger more reads from the demuxer.
52 input_queue_.clear();
53
54 // Turn on the seeking flag so that we can discard buffers until a
55 // discontinuous buffer is received.
56 seeking_ = true;
57
58 // Trigger more reads and keep the process loop rolling.
59 ScheduleProcessTask_Locked();
60 } 36 }
61 37
62 // Decoder implementation. 38 // Decoder implementation.
63 virtual bool Initialize(DemuxerStream* demuxer_stream) { 39 virtual bool Initialize(DemuxerStream* demuxer_stream) {
64 demuxer_stream_ = demuxer_stream; 40 message_loop()->PostTask(FROM_HERE,
65 41 NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream));
66 // Start our internal decoding thread.
67 if (!thread_.Start()) {
68 host()->Error(PIPELINE_ERROR_DECODE);
69 return false;
70 }
71
72 if (!OnInitialize(demuxer_stream)) {
73 // Release our resources and stop our thread.
74 // TODO(scherkus): shouldn't stop a thread inside Initialize(), but until I
75 // figure out proper error signaling semantics we're going to do it anyway !!
76 host()->Error(PIPELINE_ERROR_DECODE);
77 demuxer_stream_ = NULL;
78 thread_.Stop();
79 return false;
80 }
81
82 DCHECK(!media_format_.empty());
83 host()->InitializationComplete();
84 return true; 42 return true;
85 } 43 }
86 44
87 virtual const MediaFormat& media_format() { return media_format_; } 45 virtual const MediaFormat& media_format() { return media_format_; }
88 46
89 // Audio or Video decoder. 47 // Audio or video decoder.
90 virtual void Read(ReadCallback* read_callback) { 48 virtual void Read(ReadCallback* read_callback) {
91 AutoLock auto_lock(lock_); 49 message_loop()->PostTask(FROM_HERE,
92 if (IsRunning()) { 50 NewRunnableMethod(this, &DecoderBase::ReadTask, read_callback));
93 read_queue_.push_back(read_callback);
94 ScheduleProcessTask_Locked();
95 } else {
96 delete read_callback;
97 }
98 } 51 }
99 52
100 void OnReadComplete(Buffer* buffer) { 53 void OnReadComplete(Buffer* buffer) {
101 AutoLock auto_lock(lock_); 54 // Little bit of magic here to get NewRunnableMethod() to generate a Task
102 if (IsRunning()) { 55 // that holds onto a reference via scoped_refptr<>.
103 // Once the |seeking_| flag is set we ignore every buffers here 56 //
104 // until we receive a discontinuous buffer and we will turn off the 57 // TODO(scherkus): change the callback format to pass a scoped_refptr<> or
105 // |seeking_| flag. 58 // better yet see if we can get away with not using reference counting.
106 if (buffer->IsDiscontinuous()) { 59 scoped_refptr<Buffer> buffer_ref = buffer;
107 // TODO(hclam): put a DCHECK here to assert |seeking_| being true. 60 message_loop()->PostTask(FROM_HERE,
108 // I cannot do this now because seek operation is not fully 61 NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref));
109 // asynchronous. There may be pending seek requests even before the
110 // previous was finished.
111 seeking_ = false;
112 }
113 if (!seeking_)
114 input_queue_.push_back(buffer);
115 --pending_reads_;
116 ScheduleProcessTask_Locked();
117 }
118 } 62 }
119 63
120 protected: 64 protected:
121 // |thread_name| is mandatory and is used to identify the thread in debuggers. 65 DecoderBase()
122 explicit DecoderBase(const char* thread_name) 66 : pending_reads_(0),
123 : running_(true), 67 seeking_(false),
124 demuxer_stream_(NULL), 68 state_(UNINITIALIZED),
125 thread_(thread_name), 69 thread_id_(NULL) {
126 pending_reads_(0),
127 process_task_(NULL),
128 seeking_(false) {
129 } 70 }
130 71
131 virtual ~DecoderBase() { 72 virtual ~DecoderBase() {
132 DCHECK(!thread_.IsRunning()); 73 DCHECK(state_ == UNINITIALIZED || state_ == STOPPED);
133 DCHECK(!process_task_); 74 DCHECK(result_queue_.empty());
75 DCHECK(read_queue_.empty());
134 } 76 }
135 77
136 // This method is called by the derived class from within the OnDecode method. 78 // This method is called by the derived class from within the OnDecode method.
137 // It places an output buffer in the result queue. It must be called from 79 // It places an output buffer in the result queue. It must be called from
138 // within the OnDecode method. 80 // within the OnDecode method.
139 void EnqueueResult(Output* output) { 81 void EnqueueResult(Output* output) {
140 AutoLock auto_lock(lock_); 82 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
141 if (IsRunning()) { 83 if (!stopped_) {
142 result_queue_.push_back(output); 84 result_queue_.push_back(output);
143 } 85 }
144 } 86 }
145 87
146 // Method that must be implemented by the derived class. Called from within 88 // Method that must be implemented by the derived class. Called from within
147 // the DecoderBase::Initialize() method before any reads are submitted to 89 // the DecoderBase::Initialize() method before any reads are submitted to
148 // the demuxer stream. Returns true if successful, otherwise false indicates 90 // the demuxer stream. Returns true if successful, otherwise false indicates
149 // a fatal error. The derived class should NOT call the filter host's 91 // a fatal error. The derived class should NOT call the filter host's
150 // InitializationComplete() method. If this method returns true, then the 92 // InitializationComplete() method. If this method returns true, then the
151 // base class will call the host to complete initialization. During this 93 // base class will call the host to complete initialization. During this
152 // call, the derived class must fill in the media_format_ member. 94 // call, the derived class must fill in the media_format_ member.
153 virtual bool OnInitialize(DemuxerStream* demuxer_stream) = 0; 95 virtual bool OnInitialize(DemuxerStream* demuxer_stream) = 0;
154 96
155 // Method that may be implemented by the derived class if desired. It will 97 // Method that may be implemented by the derived class if desired. It will
156 // be called from within the MediaFilter::Stop() method prior to stopping the 98 // be called from within the MediaFilter::Stop() method prior to stopping the
157 // base class. 99 // base class.
158 virtual void OnStop() {} 100 virtual void OnStop() {}
159 101
160 // Derived class can implement this method and perform seeking logic prior 102 // Derived class can implement this method and perform seeking logic prior
161 // to the base class. 103 // to the base class.
162 virtual void OnSeek(base::TimeDelta time) {} 104 virtual void OnSeek(base::TimeDelta time) {}
163 105
164 // Method that must be implemented by the derived class. If the decode 106 // Method that must be implemented by the derived class. If the decode
165 // operation produces one or more outputs, the derived class should call 107 // operation produces one or more outputs, the derived class should call
166 // the EnequeueResult() method from within this method. 108 // the EnequeueResult() method from within this method.
167 virtual void OnDecode(Buffer* input) = 0; 109 virtual void OnDecode(Buffer* input) = 0;
168 110
169 bool IsRunning() const { return running_; } 111 // Used for subclasses who friend unit tests and need to set the thread id.
112 virtual void set_thread_id(PlatformThreadId thread_id) {
113 thread_id_ = thread_id;
114 }
170 115
171 MediaFormat media_format_; 116 MediaFormat media_format_;
172 117
173 private: 118 private:
174 // The GCL compiler does not like .cc files that directly access members of 119 // GCC doesn't let us access superclass member variables directly, so use
175 // a base class. This inline method helps. 120 // a helper to get around the situation.
121 //
122 // TODO(scherkus): another reason to add protected accessors to MediaFilter.
176 FilterHost* host() const { return Decoder::host_; } 123 FilterHost* host() const { return Decoder::host_; }
124 MessageLoop* message_loop() const { return Decoder::message_loop_; }
177 125
178 // Schedules a task that will execute the ProcessTask method. 126 void StopTask() {
179 void ScheduleProcessTask_Locked() { 127 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
180 lock_.AssertAcquired(); 128 // Delegate to the subclass first.
181 DCHECK(IsRunning()); 129 OnStop();
182 if (!process_task_) { 130
183 process_task_ = NewRunnableMethod(this, &DecoderBase::ProcessTask); 131 // Throw away all buffers in all queues.
184 thread_.message_loop()->PostTask(FROM_HERE, process_task_); 132 result_queue_.clear();
133 STLDeleteElements(&read_queue_);
134 state_ = STOPPED;
135 }
136
137 void SeekTask(base::TimeDelta time) {
138 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
139 // Delegate to the subclass first.
140 OnSeek(time);
141
142 // Flush the result queue.
143 result_queue_.clear();
144
145 // Turn on the seeking flag so that we can discard buffers until a
146 // discontinuous buffer is received.
147 seeking_ = true;
148 }
149
150 void InitializeTask(DemuxerStream* demuxer_stream) {
151 DCHECK(state_ == UNINITIALIZED);
152 DCHECK(!demuxer_stream_);
153 DCHECK(!thread_id_ || thread_id_ == PlatformThread::CurrentId());
154 demuxer_stream_ = demuxer_stream;
155
156 // Grab the thread id for debugging.
157 thread_id_ = PlatformThread::CurrentId();
158
159 // Delegate to subclass first.
160 if (!OnInitialize(demuxer_stream_)) {
161 host()->Error(PIPELINE_ERROR_DECODE);
162 return;
163 }
164
165 // TODO(scherkus): subclass shouldn't mutate superclass media format.
166 DCHECK(!media_format_.empty()) << "Subclass did not set media_format_";
167 state_ = INITIALIZED;
168 host()->InitializationComplete();
169 }
170
171 void ReadTask(ReadCallback* read_callback) {
172 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
173 // TODO(scherkus): should reply with a null operation (empty buffer).
174 if (stopped_) {
175 delete read_callback;
176 return;
177 }
178
179 // Enqueue the callback and attempt to fulfill it immediately.
180 read_queue_.push_back(read_callback);
181 FulfillPendingRead();
182
183 // Issue reads as necessary.
184 while (pending_reads_ < read_queue_.size()) {
185 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
186 ++pending_reads_;
185 } 187 }
186 } 188 }
187 189
188 // The core work loop of the decoder base. This method will run the methods 190 void ReadCompleteTask(scoped_refptr<Buffer> buffer) {
189 // SubmitReads_Locked(), ProcessInput_Locked(), and ProcessOutput_Locked() in 191 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
190 // a loop until they either produce no further work, or the filter is stopped. 192 DCHECK_GT(pending_reads_, 0u);
191 // Once there is no further work to do, the method returns. A later call to 193 --pending_reads_;
192 // the ScheduleProcessTask_Locked() method will start this task again. 194 if (stopped_) {
193 void ProcessTask() { 195 return;
194 AutoLock auto_lock(lock_); 196 }
195 bool did_some_work;
196 do {
197 did_some_work = SubmitReads_Locked();
198 did_some_work |= ProcessInput_Locked();
199 did_some_work |= ProcessOutput_Locked();
200 } while (IsRunning() && did_some_work);
201 DCHECK(process_task_ || !IsRunning());
202 process_task_ = NULL;
203 }
204 197
205 // If necessary, calls the |demuxer_stream_| to read buffers. Returns true 198 // Once the |seeking_| flag is set we ignore every buffers here
206 // if reads have happened, else false. This method must be called with 199 // until we receive a discontinuous buffer and we will turn off the
207 // |lock_| acquired. If the method submits any reads, then it will Release() 200 // |seeking_| flag.
208 // the |lock_| when calling the demuxer and then re-Acquire() the |lock_|. 201 if (buffer->IsDiscontinuous()) {
209 bool SubmitReads_Locked() { 202 // TODO(hclam): put a DCHECK here to assert |seeking_| being true.
210 lock_.AssertAcquired(); 203 // I cannot do this now because seek operation is not fully
211 bool did_read = false; 204 // asynchronous. There may be pending seek requests even before the
212 if (IsRunning() && 205 // previous was finished.
213 pending_reads_ + input_queue_.size() < read_queue_.size()) { 206 seeking_ = false;
214 did_read = true;
215 size_t read = read_queue_.size() - pending_reads_ - input_queue_.size();
216 pending_reads_ += read;
217 {
218 AutoUnlock unlock(lock_);
219 while (read) {
220 demuxer_stream_->
221 Read(NewCallback(this, &DecoderBase::OnReadComplete));
222 --read;
223 }
224 }
225 } 207 }
226 return did_read; 208 if (seeking_) {
227 } 209 return;
210 }
228 211
229 // If the |input_queue_| has any buffers, this method will call the derived 212 // Decode the frame right away.
230 // class's OnDecode() method. 213 OnDecode(buffer);
231 bool ProcessInput_Locked() {
232 lock_.AssertAcquired();
233 bool did_decode = false;
234 while (IsRunning() && !input_queue_.empty()) {
235 did_decode = true;
236 scoped_refptr<Buffer> input = input_queue_.front();
237 input_queue_.pop_front();
238 // Release |lock_| before calling the derived class to do the decode.
239 {
240 AutoUnlock unlock(lock_);
241 OnDecode(input);
242 }
243 }
244 return did_decode;
245 }
246 214
247 // Removes any buffers from the |result_queue_| and calls the next callback 215 // Attempt to fulfill a pending read callback and schedule additional reads
248 // in the |read_queue_|. 216 // if necessary.
249 bool ProcessOutput_Locked() { 217 FulfillPendingRead();
250 lock_.AssertAcquired();
251 bool called_renderer = false;
252 while (IsRunning() && !read_queue_.empty() && !result_queue_.empty()) {
253 called_renderer = true;
254 scoped_refptr<Output> output = result_queue_.front();
255 result_queue_.pop_front();
256 scoped_ptr<ReadCallback> read_callback(read_queue_.front());
257 read_queue_.pop_front();
258 // Release |lock_| before calling the renderer.
259 {
260 AutoUnlock unlock(lock_);
261 read_callback->Run(output);
262 }
263 }
264 return called_renderer;
265 }
266 218
267 // Throw away all buffers in all queues. 219 // Issue reads as necessary.
268 void DiscardQueues_Locked() { 220 //
269 lock_.AssertAcquired(); 221 // Note that it's possible for us to decode but not produce a frame, in
270 input_queue_.clear(); 222 // which case |pending_reads_| will remain less than |read_queue_| so we
271 result_queue_.clear(); 223 // need to schedule an additional read.
272 while (!read_queue_.empty()) { 224 DCHECK_LE(pending_reads_, read_queue_.size());
273 delete read_queue_.front(); 225 while (pending_reads_ < read_queue_.size()) {
274 read_queue_.pop_front(); 226 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
227 ++pending_reads_;
275 } 228 }
276 } 229 }
277 230
278 // The critical section for the decoder. 231 // Attempts to fulfill a single pending read by dequeuing a buffer and read
279 Lock lock_; 232 // callback pair and executing the callback.
233 void FulfillPendingRead() {
234 DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
235 if (read_queue_.empty() || result_queue_.empty()) {
236 return;
237 }
280 238
281 // If false, then the Stop() method has been called, and no further processing 239 // Dequeue a frame and read callback pair.
282 // of buffers should occur. 240 scoped_refptr<Output> output = result_queue_.front();
283 bool running_; 241 scoped_ptr<ReadCallback> read_callback(read_queue_.front());
242 result_queue_.pop_front();
243 read_queue_.pop_front();
244
245 // Execute the callback!
246 read_callback->Run(output);
247 }
248
249 // Tracks the number of asynchronous reads issued to |demuxer_stream_|.
250 // Using size_t since it is always compared against deque::size().
251 size_t pending_reads_;
252
253 // If true, then Stop() has been called and no further processing of buffers
254 // should occur.
255 bool stopped_;
256
257 // An internal state of the decoder that indicates that are waiting for seek
258 // to complete. We expect to receive a discontinuous frame/packet from the
259 // demuxer to signal that seeking is completed.
260 bool seeking_;
284 261
285 // Pointer to the demuxer stream that will feed us compressed buffers. 262 // Pointer to the demuxer stream that will feed us compressed buffers.
286 scoped_refptr<DemuxerStream> demuxer_stream_; 263 scoped_refptr<DemuxerStream> demuxer_stream_;
287 264
288 // The dedicated decoding thread for this filter.
289 base::Thread thread_;
290
291 // Number of times we have called Read() on the demuxer that have not yet
292 // been satisfied.
293 size_t pending_reads_;
294
295 CancelableTask* process_task_;
296
297 // Queue of buffers read from the |demuxer_stream_|.
298 typedef std::deque< scoped_refptr<Buffer> > InputQueue;
299 InputQueue input_queue_;
300
301 // Queue of decoded samples produced in the OnDecode() method of the decoder. 265 // Queue of decoded samples produced in the OnDecode() method of the decoder.
302 // Any samples placed in this queue will be assigned to the OutputQueue 266 // Any samples placed in this queue will be assigned to the OutputQueue
303 // buffers once the OnDecode() method returns. 267 // buffers once the OnDecode() method returns.
268 //
304 // TODO(ralphl): Eventually we want to have decoders get their destination 269 // TODO(ralphl): Eventually we want to have decoders get their destination
305 // buffer from the OutputQueue and write to it directly. Until we change 270 // buffer from the OutputQueue and write to it directly. Until we change
306 // from the Assignable buffer to callbacks and renderer-allocated buffers, 271 // from the Assignable buffer to callbacks and renderer-allocated buffers,
307 // we need this extra queue. 272 // we need this extra queue.
308 typedef std::deque< scoped_refptr<Output> > ResultQueue; 273 typedef std::deque<scoped_refptr<Output> > ResultQueue;
309 ResultQueue result_queue_; 274 ResultQueue result_queue_;
310 275
311 // Queue of callbacks supplied by the renderer through the Read() method. 276 // Queue of callbacks supplied by the renderer through the Read() method.
312 typedef std::deque<ReadCallback*> ReadQueue; 277 typedef std::deque<ReadCallback*> ReadQueue;
313 ReadQueue read_queue_; 278 ReadQueue read_queue_;
314 279
315 // An internal state of the decoder that indicates that are waiting for seek 280 // Simple state tracking variable.
316 // to complete. We expect to receive a discontinuous frame/packet from the 281 enum State {
317 // demuxer to signal that seeking is completed. 282 UNINITIALIZED,
318 bool seeking_; 283 INITIALIZED,
284 STOPPED,
285 };
286 State state_;
287
288 // Used for debugging.
289 PlatformThreadId thread_id_;
319 290
320 DISALLOW_COPY_AND_ASSIGN(DecoderBase); 291 DISALLOW_COPY_AND_ASSIGN(DecoderBase);
321 }; 292 };
322 293
323 } // namespace media 294 } // namespace media
324 295
325 #endif // MEDIA_FILTERS_DECODER_BASE_H_ 296 #endif // MEDIA_FILTERS_DECODER_BASE_H_
OLDNEW
« no previous file with comments | « media/base/pipeline_impl.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698