OLD | NEW |
| (Empty) |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 // A base class that provides the plumbing for a decoder filters. | |
6 | |
7 #ifndef MEDIA_FILTERS_DECODER_BASE_H_ | |
8 #define MEDIA_FILTERS_DECODER_BASE_H_ | |
9 | |
10 #include <deque> | |
11 | |
12 #include "base/bind.h" | |
13 #include "base/callback.h" | |
14 #include "base/stl_util.h" | |
15 #include "base/task.h" | |
16 #include "base/threading/thread.h" | |
17 #include "media/base/buffers.h" | |
18 #include "media/base/callback.h" | |
19 #include "media/base/demuxer_stream.h" | |
20 #include "media/base/filters.h" | |
21 #include "media/base/filter_host.h" | |
22 | |
23 namespace media { | |
24 | |
25 // In this class, we over-specify method lookup via this-> to avoid unexpected | |
26 // name resolution issues due to the two-phase lookup needed for dependent | |
27 // name resolution in templates. | |
28 template <class Decoder, class Output> | |
29 class DecoderBase : public Decoder { | |
30 public: | |
31 // Filter implementation. | |
32 virtual void Stop(FilterCallback* callback) { | |
33 message_loop_->PostTask( | |
34 FROM_HERE, | |
35 NewRunnableMethod(this, &DecoderBase::StopTask, callback)); | |
36 } | |
37 | |
38 virtual void Seek(base::TimeDelta time, const FilterStatusCB& cb) { | |
39 message_loop_->PostTask( | |
40 FROM_HERE, | |
41 NewRunnableMethod(this, &DecoderBase::SeekTask, time, cb)); | |
42 } | |
43 | |
44 // Decoder implementation. | |
45 virtual void Initialize(DemuxerStream* demuxer_stream, | |
46 FilterCallback* callback, | |
47 StatisticsCallback* stats_callback) { | |
48 statistics_callback_.reset(stats_callback); | |
49 message_loop_->PostTask( | |
50 FROM_HERE, | |
51 NewRunnableMethod(this, | |
52 &DecoderBase::InitializeTask, | |
53 make_scoped_refptr(demuxer_stream), | |
54 callback)); | |
55 } | |
56 | |
57 // TODO(scherkus): Since FFmpegAudioDecoder is the only subclass this is a | |
58 // temporary hack until I finish removing DecoderBase. | |
59 void PostReadTaskHack(scoped_refptr<Output> output) { | |
60 message_loop_->PostTask(FROM_HERE, | |
61 NewRunnableMethod(this, &DecoderBase::ReadTask, output)); | |
62 } | |
63 | |
64 protected: | |
65 explicit DecoderBase(MessageLoop* message_loop) | |
66 : message_loop_(message_loop), | |
67 pending_reads_(0), | |
68 pending_requests_(0), | |
69 state_(kUninitialized) { | |
70 } | |
71 | |
72 virtual ~DecoderBase() { | |
73 DCHECK(state_ == kUninitialized || state_ == kStopped); | |
74 DCHECK(result_queue_.empty()); | |
75 } | |
76 | |
77 // This method is called by the derived class from within the OnDecode method. | |
78 // It places an output buffer in the result queue. It must be called from | |
79 // within the OnDecode method. | |
80 void EnqueueResult(Output* output) { | |
81 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
82 if (!IsStopped()) { | |
83 result_queue_.push_back(output); | |
84 } | |
85 } | |
86 | |
87 // TODO(ajwong): All these "Task*" used as completion callbacks should be | |
88 // FilterCallbacks. However, since NewCallback() cannot prebind parameters, | |
89 // we use NewRunnableMethod() instead which causes an unfortunate refcount. | |
90 // We should move stoyan's Mutant code into base/task.h and turn these | |
91 // back into FilterCallbacks. | |
92 | |
93 // Method that must be implemented by the derived class. Called from within | |
94 // the DecoderBase::Initialize() method before any reads are submitted to | |
95 // the demuxer stream. Returns true if successful, otherwise false indicates | |
96 // a fatal error. The derived class should NOT call the filter host's | |
97 // InitializationComplete() method. If this method returns true, then the | |
98 // base class will call the host to complete initialization. | |
99 virtual void DoInitialize(DemuxerStream* demuxer_stream, bool* success, | |
100 Task* done_cb) = 0; | |
101 | |
102 // Method that may be implemented by the derived class if desired. It will | |
103 // be called from within the Filter::Stop() method prior to stopping the | |
104 // base class. | |
105 virtual void DoStop(Task* done_cb) { | |
106 base::ScopedTaskRunner run_done_cb(done_cb); | |
107 } | |
108 | |
109 // Derived class can implement this method and perform seeking logic prior | |
110 // to the base class. | |
111 virtual void DoSeek(base::TimeDelta time, Task* done_cb) = 0; | |
112 | |
113 // Method that must be implemented by the derived class. If the decode | |
114 // operation produces one or more outputs, the derived class should call | |
115 // the EnequeueResult() method from within this method. | |
116 virtual void DoDecode(Buffer* input) = 0; | |
117 | |
118 void OnDecodeComplete(const PipelineStatistics& statistics) { | |
119 statistics_callback_->Run(statistics); | |
120 | |
121 // Attempt to fulfill a pending read callback and schedule additional reads | |
122 // if necessary. | |
123 bool fulfilled = FulfillPendingRead(); | |
124 | |
125 // Issue reads as necessary. | |
126 // | |
127 // Note that it's possible for us to decode but not produce a frame, in | |
128 // which case |pending_reads_| will remain less than |read_queue_| so we | |
129 // need to schedule an additional read. | |
130 DCHECK_LE(pending_reads_, pending_requests_); | |
131 if (!fulfilled) { | |
132 DCHECK_LT(pending_reads_, pending_requests_); | |
133 ++pending_reads_; | |
134 demuxer_stream_->Read(base::Bind(&DecoderBase::OnReadComplete, this)); | |
135 } | |
136 } | |
137 | |
138 // Provide access to subclasses. | |
139 MessageLoop* message_loop() { return message_loop_; } | |
140 | |
141 private: | |
142 bool IsStopped() { return state_ == kStopped; } | |
143 | |
144 void OnReadComplete(Buffer* buffer) { | |
145 // Little bit of magic here to get NewRunnableMethod() to generate a Task | |
146 // that holds onto a reference via scoped_refptr<>. | |
147 // | |
148 // TODO(scherkus): change the callback format to pass a scoped_refptr<> or | |
149 // better yet see if we can get away with not using reference counting. | |
150 scoped_refptr<Buffer> buffer_ref = buffer; | |
151 message_loop_->PostTask(FROM_HERE, | |
152 NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref)); | |
153 } | |
154 | |
155 void StopTask(FilterCallback* callback) { | |
156 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
157 | |
158 // Delegate to the subclass first. | |
159 DoStop(NewRunnableMethod(this, &DecoderBase::OnStopComplete, callback)); | |
160 } | |
161 | |
162 void OnStopComplete(FilterCallback* callback) { | |
163 // Throw away all buffers in all queues. | |
164 result_queue_.clear(); | |
165 state_ = kStopped; | |
166 | |
167 if (callback) { | |
168 callback->Run(); | |
169 delete callback; | |
170 } | |
171 } | |
172 | |
173 void SeekTask(base::TimeDelta time, const FilterStatusCB& cb) { | |
174 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
175 DCHECK_EQ(0u, pending_reads_) << "Pending reads should have completed"; | |
176 DCHECK_EQ(0u, pending_requests_) << "Pending requests should be empty"; | |
177 | |
178 // Delegate to the subclass first. | |
179 DoSeek(time, NewRunnableMethod(this, &DecoderBase::OnSeekComplete, cb)); | |
180 } | |
181 | |
182 void OnSeekComplete(const FilterStatusCB& cb) { | |
183 // Flush our decoded results. | |
184 result_queue_.clear(); | |
185 | |
186 // Signal that we're done seeking. | |
187 if (!cb.is_null()) | |
188 cb.Run(PIPELINE_OK); | |
189 } | |
190 | |
191 void InitializeTask(DemuxerStream* demuxer_stream, | |
192 FilterCallback* callback) { | |
193 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
194 CHECK(kUninitialized == state_); | |
195 CHECK(!demuxer_stream_); | |
196 demuxer_stream_ = demuxer_stream; | |
197 | |
198 bool* success = new bool; | |
199 DoInitialize(demuxer_stream, | |
200 success, | |
201 NewRunnableMethod(this, &DecoderBase::OnInitializeComplete, | |
202 success, callback)); | |
203 } | |
204 | |
205 void OnInitializeComplete(bool* success, FilterCallback* done_cb) { | |
206 // Note: The done_runner must be declared *last* to ensure proper | |
207 // destruction order. | |
208 scoped_ptr<bool> success_deleter(success); | |
209 AutoCallbackRunner done_runner(done_cb); | |
210 | |
211 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
212 // Delegate to subclass first. | |
213 if (!*success) { | |
214 this->host()->SetError(PIPELINE_ERROR_DECODE); | |
215 } else { | |
216 state_ = kInitialized; | |
217 } | |
218 } | |
219 | |
220 void ReadTask(scoped_refptr<Output> output) { | |
221 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
222 | |
223 // TODO(scherkus): should reply with a null operation (empty buffer). | |
224 if (IsStopped()) | |
225 return; | |
226 | |
227 ++pending_requests_; | |
228 | |
229 // Try to fulfill it immediately. | |
230 if (FulfillPendingRead()) | |
231 return; | |
232 | |
233 // Since we can't fulfill a read request now then submit a read | |
234 // request to the demuxer stream. | |
235 ++pending_reads_; | |
236 demuxer_stream_->Read(base::Bind(&DecoderBase::OnReadComplete, this)); | |
237 } | |
238 | |
239 void ReadCompleteTask(scoped_refptr<Buffer> buffer) { | |
240 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
241 DCHECK_GT(pending_reads_, 0u); | |
242 --pending_reads_; | |
243 if (IsStopped()) { | |
244 return; | |
245 } | |
246 | |
247 // Decode the frame right away. | |
248 DoDecode(buffer); | |
249 } | |
250 | |
251 // Attempts to fulfill a single pending read by dequeuing a buffer and read | |
252 // callback pair and executing the callback. | |
253 // | |
254 // Return true if one read request is fulfilled. | |
255 bool FulfillPendingRead() { | |
256 DCHECK_EQ(MessageLoop::current(), message_loop_); | |
257 if (!pending_requests_ || result_queue_.empty()) { | |
258 return false; | |
259 } | |
260 | |
261 // Dequeue a frame and read callback pair. | |
262 scoped_refptr<Output> output = result_queue_.front(); | |
263 result_queue_.pop_front(); | |
264 | |
265 // Execute the callback! | |
266 --pending_requests_; | |
267 | |
268 // TODO(scherkus): Since FFmpegAudioDecoder is the only subclass this is a | |
269 // temporary hack until I finish removing DecoderBase. | |
270 Decoder::ConsumeAudioSamples(output); | |
271 return true; | |
272 } | |
273 | |
274 MessageLoop* message_loop_; | |
275 | |
276 // Tracks the number of asynchronous reads issued to |demuxer_stream_|. | |
277 // Using size_t since it is always compared against deque::size(). | |
278 size_t pending_reads_; | |
279 // Tracks the number of asynchronous reads issued from renderer. | |
280 size_t pending_requests_; | |
281 | |
282 // Pointer to the demuxer stream that will feed us compressed buffers. | |
283 scoped_refptr<DemuxerStream> demuxer_stream_; | |
284 | |
285 // Queue of decoded samples produced in the OnDecode() method of the decoder. | |
286 // Any samples placed in this queue will be assigned to the OutputQueue | |
287 // buffers once the OnDecode() method returns. | |
288 // | |
289 // TODO(ralphl): Eventually we want to have decoders get their destination | |
290 // buffer from the OutputQueue and write to it directly. Until we change | |
291 // from the Assignable buffer to callbacks and renderer-allocated buffers, | |
292 // we need this extra queue. | |
293 typedef std::deque<scoped_refptr<Output> > ResultQueue; | |
294 ResultQueue result_queue_; | |
295 | |
296 // Simple state tracking variable. | |
297 enum State { | |
298 kUninitialized, | |
299 kInitialized, | |
300 kStopped, | |
301 }; | |
302 State state_; | |
303 | |
304 // Callback to update pipeline statistics. | |
305 scoped_ptr<StatisticsCallback> statistics_callback_; | |
306 | |
307 DISALLOW_COPY_AND_ASSIGN(DecoderBase); | |
308 }; | |
309 | |
310 } // namespace media | |
311 | |
312 #endif // MEDIA_FILTERS_DECODER_BASE_H_ | |
OLD | NEW |