OLD | NEW |
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // A base class that provides the plumbing for a decoder filters. | 5 // A base class that provides the plumbing for a decoder filters. |
6 | 6 |
7 #ifndef MEDIA_FILTERS_DECODER_BASE_H_ | 7 #ifndef MEDIA_FILTERS_DECODER_BASE_H_ |
8 #define MEDIA_FILTERS_DECODER_BASE_H_ | 8 #define MEDIA_FILTERS_DECODER_BASE_H_ |
9 | 9 |
10 #include <deque> | 10 #include <deque> |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
45 virtual void Initialize(DemuxerStream* demuxer_stream, | 45 virtual void Initialize(DemuxerStream* demuxer_stream, |
46 FilterCallback* callback) { | 46 FilterCallback* callback) { |
47 this->message_loop()->PostTask(FROM_HERE, | 47 this->message_loop()->PostTask(FROM_HERE, |
48 NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream, | 48 NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream, |
49 callback)); | 49 callback)); |
50 } | 50 } |
51 | 51 |
52 virtual const MediaFormat& media_format() { return media_format_; } | 52 virtual const MediaFormat& media_format() { return media_format_; } |
53 | 53 |
54 // Audio or video decoder. | 54 // Audio or video decoder. |
55 virtual void Read(ReadCallback* read_callback) { | 55 virtual void FillThisBuffer(scoped_refptr<Output> output) { |
56 this->message_loop()->PostTask(FROM_HERE, | 56 this->message_loop()->PostTask(FROM_HERE, |
57 NewRunnableMethod(this, &DecoderBase::ReadTask, read_callback)); | 57 NewRunnableMethod(this, &DecoderBase::ReadTask, output)); |
58 } | 58 } |
59 | 59 |
60 protected: | 60 protected: |
61 DecoderBase() | 61 DecoderBase() |
62 : pending_reads_(0), | 62 : pending_reads_(0), |
| 63 pending_requests_(0), |
63 expecting_discontinuous_(false), | 64 expecting_discontinuous_(false), |
64 state_(kUninitialized) { | 65 state_(kUninitialized) { |
65 } | 66 } |
66 | 67 |
67 virtual ~DecoderBase() { | 68 virtual ~DecoderBase() { |
68 DCHECK(state_ == kUninitialized || state_ == kStopped); | 69 DCHECK(state_ == kUninitialized || state_ == kStopped); |
69 DCHECK(result_queue_.empty()); | 70 DCHECK(result_queue_.empty()); |
70 DCHECK(read_queue_.empty()); | |
71 } | 71 } |
72 | 72 |
73 // This method is called by the derived class from within the OnDecode method. | 73 // This method is called by the derived class from within the OnDecode method. |
74 // It places an output buffer in the result queue. It must be called from | 74 // It places an output buffer in the result queue. It must be called from |
75 // within the OnDecode method. | 75 // within the OnDecode method. |
76 void EnqueueResult(Output* output) { | 76 void EnqueueResult(Output* output) { |
77 DCHECK_EQ(MessageLoop::current(), this->message_loop()); | 77 DCHECK_EQ(MessageLoop::current(), this->message_loop()); |
78 if (!IsStopped()) { | 78 if (!IsStopped()) { |
79 result_queue_.push_back(output); | 79 result_queue_.push_back(output); |
80 } | 80 } |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
117 void OnDecodeComplete() { | 117 void OnDecodeComplete() { |
118 // Attempt to fulfill a pending read callback and schedule additional reads | 118 // Attempt to fulfill a pending read callback and schedule additional reads |
119 // if necessary. | 119 // if necessary. |
120 bool fulfilled = FulfillPendingRead(); | 120 bool fulfilled = FulfillPendingRead(); |
121 | 121 |
122 // Issue reads as necessary. | 122 // Issue reads as necessary. |
123 // | 123 // |
124 // Note that it's possible for us to decode but not produce a frame, in | 124 // Note that it's possible for us to decode but not produce a frame, in |
125 // which case |pending_reads_| will remain less than |read_queue_| so we | 125 // which case |pending_reads_| will remain less than |read_queue_| so we |
126 // need to schedule an additional read. | 126 // need to schedule an additional read. |
127 DCHECK_LE(pending_reads_, read_queue_.size()); | 127 DCHECK_LE(pending_reads_, pending_requests_); |
128 if (!fulfilled) { | 128 if (!fulfilled) { |
129 DCHECK_LT(pending_reads_, read_queue_.size()); | 129 DCHECK_LT(pending_reads_, pending_requests_); |
130 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete)); | 130 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete)); |
131 ++pending_reads_; | 131 ++pending_reads_; |
132 } | 132 } |
133 } | 133 } |
134 | 134 |
135 private: | 135 private: |
136 bool IsStopped() { return state_ == kStopped; } | 136 bool IsStopped() { return state_ == kStopped; } |
137 | 137 |
138 void OnReadComplete(Buffer* buffer) { | 138 void OnReadComplete(Buffer* buffer) { |
139 // Little bit of magic here to get NewRunnableMethod() to generate a Task | 139 // Little bit of magic here to get NewRunnableMethod() to generate a Task |
140 // that holds onto a reference via scoped_refptr<>. | 140 // that holds onto a reference via scoped_refptr<>. |
141 // | 141 // |
142 // TODO(scherkus): change the callback format to pass a scoped_refptr<> or | 142 // TODO(scherkus): change the callback format to pass a scoped_refptr<> or |
143 // better yet see if we can get away with not using reference counting. | 143 // better yet see if we can get away with not using reference counting. |
144 scoped_refptr<Buffer> buffer_ref = buffer; | 144 scoped_refptr<Buffer> buffer_ref = buffer; |
145 this->message_loop()->PostTask(FROM_HERE, | 145 this->message_loop()->PostTask(FROM_HERE, |
146 NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref)); | 146 NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref)); |
147 } | 147 } |
148 | 148 |
149 void StopTask() { | 149 void StopTask() { |
150 DCHECK_EQ(MessageLoop::current(), this->message_loop()); | 150 DCHECK_EQ(MessageLoop::current(), this->message_loop()); |
151 | 151 |
152 // Delegate to the subclass first. | 152 // Delegate to the subclass first. |
153 DoStop(); | 153 DoStop(); |
154 | 154 |
155 // Throw away all buffers in all queues. | 155 // Throw away all buffers in all queues. |
156 result_queue_.clear(); | 156 result_queue_.clear(); |
157 STLDeleteElements(&read_queue_); | |
158 state_ = kStopped; | 157 state_ = kStopped; |
159 } | 158 } |
160 | 159 |
161 void SeekTask(base::TimeDelta time, FilterCallback* callback) { | 160 void SeekTask(base::TimeDelta time, FilterCallback* callback) { |
162 DCHECK_EQ(MessageLoop::current(), this->message_loop()); | 161 DCHECK_EQ(MessageLoop::current(), this->message_loop()); |
163 DCHECK_EQ(0u, pending_reads_) << "Pending reads should have completed"; | 162 DCHECK_EQ(0u, pending_reads_) << "Pending reads should have completed"; |
164 DCHECK(read_queue_.empty()) << "Read requests should be empty"; | 163 DCHECK_EQ(0u, pending_requests_) << "Pending requests should be empty"; |
165 | 164 |
166 // Delegate to the subclass first. | 165 // Delegate to the subclass first. |
167 // | 166 // |
168 // TODO(scherkus): if we have the strong assertion that there are no pending | 167 // TODO(scherkus): if we have the strong assertion that there are no pending |
169 // reads in the entire pipeline when we receive Seek(), subclasses could | 168 // reads in the entire pipeline when we receive Seek(), subclasses could |
170 // either flush their buffers here or wait for IsDiscontinuous(). I'm | 169 // either flush their buffers here or wait for IsDiscontinuous(). I'm |
171 // inclined to say that they should still wait for IsDiscontinuous() so they | 170 // inclined to say that they should still wait for IsDiscontinuous() so they |
172 // don't have duplicated logic for Seek() and actual discontinuous frames. | 171 // don't have duplicated logic for Seek() and actual discontinuous frames. |
173 DoSeek(time, | 172 DoSeek(time, |
174 NewRunnableMethod(this, &DecoderBase::OnSeekComplete, callback)); | 173 NewRunnableMethod(this, &DecoderBase::OnSeekComplete, callback)); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
208 // Delegate to subclass first. | 207 // Delegate to subclass first. |
209 if (!*success) { | 208 if (!*success) { |
210 this->host()->SetError(PIPELINE_ERROR_DECODE); | 209 this->host()->SetError(PIPELINE_ERROR_DECODE); |
211 } else { | 210 } else { |
212 // TODO(scherkus): subclass shouldn't mutate superclass media format. | 211 // TODO(scherkus): subclass shouldn't mutate superclass media format. |
213 DCHECK(!media_format_.empty()) << "Subclass did not set media_format_"; | 212 DCHECK(!media_format_.empty()) << "Subclass did not set media_format_"; |
214 state_ = kInitialized; | 213 state_ = kInitialized; |
215 } | 214 } |
216 } | 215 } |
217 | 216 |
218 void ReadTask(ReadCallback* read_callback) { | 217 void ReadTask(scoped_refptr<Output> output) { |
219 DCHECK_EQ(MessageLoop::current(), this->message_loop()); | 218 DCHECK_EQ(MessageLoop::current(), this->message_loop()); |
220 | 219 |
221 // TODO(scherkus): should reply with a null operation (empty buffer). | 220 // TODO(scherkus): should reply with a null operation (empty buffer). |
222 if (IsStopped()) { | 221 if (IsStopped()) |
223 delete read_callback; | |
224 return; | 222 return; |
225 } | |
226 | 223 |
227 // Enqueue the callback and attempt to fulfill it immediately. | 224 ++pending_requests_; |
228 read_queue_.push_back(read_callback); | 225 |
| 226 // Try to fulfill it immediately. |
229 if (FulfillPendingRead()) | 227 if (FulfillPendingRead()) |
230 return; | 228 return; |
231 | 229 |
232 // Since we can't fulfill a read request now then submit a read | 230 // Since we can't fulfill a read request now then submit a read |
233 // request to the demuxer stream. | 231 // request to the demuxer stream. |
234 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete)); | 232 demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete)); |
235 ++pending_reads_; | 233 ++pending_reads_; |
236 } | 234 } |
237 | 235 |
238 void ReadCompleteTask(scoped_refptr<Buffer> buffer) { | 236 void ReadCompleteTask(scoped_refptr<Buffer> buffer) { |
(...skipping 14 matching lines...) Expand all Loading... |
253 // Decode the frame right away. | 251 // Decode the frame right away. |
254 DoDecode(buffer); | 252 DoDecode(buffer); |
255 } | 253 } |
256 | 254 |
257 // Attempts to fulfill a single pending read by dequeuing a buffer and read | 255 // Attempts to fulfill a single pending read by dequeuing a buffer and read |
258 // callback pair and executing the callback. | 256 // callback pair and executing the callback. |
259 // | 257 // |
260 // Return true if one read request is fulfilled. | 258 // Return true if one read request is fulfilled. |
261 bool FulfillPendingRead() { | 259 bool FulfillPendingRead() { |
262 DCHECK_EQ(MessageLoop::current(), this->message_loop()); | 260 DCHECK_EQ(MessageLoop::current(), this->message_loop()); |
263 if (read_queue_.empty() || result_queue_.empty()) { | 261 if (!pending_requests_ || result_queue_.empty()) { |
264 return false; | 262 return false; |
265 } | 263 } |
266 | 264 |
267 // Dequeue a frame and read callback pair. | 265 // Dequeue a frame and read callback pair. |
268 scoped_refptr<Output> output = result_queue_.front(); | 266 scoped_refptr<Output> output = result_queue_.front(); |
269 scoped_ptr<ReadCallback> read_callback(read_queue_.front()); | |
270 result_queue_.pop_front(); | 267 result_queue_.pop_front(); |
271 read_queue_.pop_front(); | |
272 | 268 |
273 // Execute the callback! | 269 // Execute the callback! |
274 read_callback->Run(output); | 270 --pending_requests_; |
| 271 Decoder::fill_buffer_done_callback()->Run(output); |
275 return true; | 272 return true; |
276 } | 273 } |
277 | 274 |
278 // Tracks the number of asynchronous reads issued to |demuxer_stream_|. | 275 // Tracks the number of asynchronous reads issued to |demuxer_stream_|. |
279 // Using size_t since it is always compared against deque::size(). | 276 // Using size_t since it is always compared against deque::size(). |
280 size_t pending_reads_; | 277 size_t pending_reads_; |
| 278 // Tracks the number of asynchronous reads issued from renderer. |
| 279 size_t pending_requests_; |
281 | 280 |
282 // A flag used for debugging that we expect our next read to be discontinuous. | 281 // A flag used for debugging that we expect our next read to be discontinuous. |
283 bool expecting_discontinuous_; | 282 bool expecting_discontinuous_; |
284 | 283 |
285 // Pointer to the demuxer stream that will feed us compressed buffers. | 284 // Pointer to the demuxer stream that will feed us compressed buffers. |
286 scoped_refptr<DemuxerStream> demuxer_stream_; | 285 scoped_refptr<DemuxerStream> demuxer_stream_; |
287 | 286 |
288 // Queue of decoded samples produced in the OnDecode() method of the decoder. | 287 // Queue of decoded samples produced in the OnDecode() method of the decoder. |
289 // Any samples placed in this queue will be assigned to the OutputQueue | 288 // Any samples placed in this queue will be assigned to the OutputQueue |
290 // buffers once the OnDecode() method returns. | 289 // buffers once the OnDecode() method returns. |
291 // | 290 // |
292 // TODO(ralphl): Eventually we want to have decoders get their destination | 291 // TODO(ralphl): Eventually we want to have decoders get their destination |
293 // buffer from the OutputQueue and write to it directly. Until we change | 292 // buffer from the OutputQueue and write to it directly. Until we change |
294 // from the Assignable buffer to callbacks and renderer-allocated buffers, | 293 // from the Assignable buffer to callbacks and renderer-allocated buffers, |
295 // we need this extra queue. | 294 // we need this extra queue. |
296 typedef std::deque<scoped_refptr<Output> > ResultQueue; | 295 typedef std::deque<scoped_refptr<Output> > ResultQueue; |
297 ResultQueue result_queue_; | 296 ResultQueue result_queue_; |
298 | 297 |
299 // Queue of callbacks supplied by the renderer through the Read() method. | |
300 typedef std::deque<ReadCallback*> ReadQueue; | |
301 ReadQueue read_queue_; | |
302 | |
303 // Pause callback. | 298 // Pause callback. |
304 scoped_ptr<FilterCallback> pause_callback_; | 299 scoped_ptr<FilterCallback> pause_callback_; |
305 | 300 |
306 // Simple state tracking variable. | 301 // Simple state tracking variable. |
307 enum State { | 302 enum State { |
308 kUninitialized, | 303 kUninitialized, |
309 kInitialized, | 304 kInitialized, |
310 kStopped, | 305 kStopped, |
311 }; | 306 }; |
312 State state_; | 307 State state_; |
313 | 308 |
314 DISALLOW_COPY_AND_ASSIGN(DecoderBase); | 309 DISALLOW_COPY_AND_ASSIGN(DecoderBase); |
315 }; | 310 }; |
316 | 311 |
317 } // namespace media | 312 } // namespace media |
318 | 313 |
319 #endif // MEDIA_FILTERS_DECODER_BASE_H_ | 314 #endif // MEDIA_FILTERS_DECODER_BASE_H_ |
OLD | NEW |