| Index: media/filters/decoder_base.h
|
| diff --git a/media/filters/decoder_base.h b/media/filters/decoder_base.h
|
| index 4f70f7a009cd328c0ec1f9f148acd43f9238366b..150843927cb1646bdff89c76dfc568565f409c66 100644
|
| --- a/media/filters/decoder_base.h
|
| +++ b/media/filters/decoder_base.h
|
| @@ -10,6 +10,7 @@
|
| #include <deque>
|
|
|
| #include "base/lock.h"
|
| +#include "base/stl_util-inl.h"
|
| #include "base/task.h"
|
| #include "base/thread.h"
|
| #include "media/base/buffers.h"
|
| @@ -25,120 +26,61 @@ class DecoderBase : public Decoder {
|
|
|
| // MediaFilter implementation.
|
| virtual void Stop() {
|
| - OnStop();
|
| - {
|
| - AutoLock auto_lock(lock_);
|
| - running_ = false;
|
| - if (process_task_) {
|
| - process_task_->Cancel();
|
| - process_task_ = NULL;
|
| - }
|
| - DiscardQueues_Locked();
|
| - }
|
| -
|
| - // Stop our decoding thread.
|
| - thread_.Stop();
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &DecoderBase::StopTask));
|
| }
|
|
|
| virtual void Seek(base::TimeDelta time) {
|
| - // Delegate to the subclass first.
|
| - OnSeek(time);
|
| -
|
| - // Flush the result queue.
|
| - AutoLock auto_lock(lock_);
|
| - result_queue_.clear();
|
| -
|
| - // Flush the input queue. This will trigger more reads from the demuxer.
|
| - input_queue_.clear();
|
| -
|
| - // Turn on the seeking flag so that we can discard buffers until a
|
| - // discontinuous buffer is received.
|
| - seeking_ = true;
|
| -
|
| - // Trigger more reads and keep the process loop rolling.
|
| - ScheduleProcessTask_Locked();
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &DecoderBase::SeekTask, time));
|
| }
|
|
|
| // Decoder implementation.
|
| virtual bool Initialize(DemuxerStream* demuxer_stream) {
|
| - demuxer_stream_ = demuxer_stream;
|
| -
|
| - // Start our internal decoding thread.
|
| - if (!thread_.Start()) {
|
| - host()->Error(PIPELINE_ERROR_DECODE);
|
| - return false;
|
| - }
|
| -
|
| - if (!OnInitialize(demuxer_stream)) {
|
| - // Release our resources and stop our thread.
|
| - // TODO(scherkus): shouldn't stop a thread inside Initialize(), but until I
|
| - // figure out proper error signaling semantics we're going to do it anyway!!
|
| - host()->Error(PIPELINE_ERROR_DECODE);
|
| - demuxer_stream_ = NULL;
|
| - thread_.Stop();
|
| - return false;
|
| - }
|
| -
|
| - DCHECK(!media_format_.empty());
|
| - host()->InitializationComplete();
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream));
|
| return true;
|
| }
|
|
|
| virtual const MediaFormat& media_format() { return media_format_; }
|
|
|
| - // Audio or Video decoder.
|
| + // Audio or video decoder.
|
| virtual void Read(ReadCallback* read_callback) {
|
| - AutoLock auto_lock(lock_);
|
| - if (IsRunning()) {
|
| - read_queue_.push_back(read_callback);
|
| - ScheduleProcessTask_Locked();
|
| - } else {
|
| - delete read_callback;
|
| - }
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &DecoderBase::ReadTask, read_callback));
|
| }
|
|
|
| void OnReadComplete(Buffer* buffer) {
|
| - AutoLock auto_lock(lock_);
|
| - if (IsRunning()) {
|
| - // Once the |seeking_| flag is set we ignore every buffers here
|
| - // until we receive a discontinuous buffer and we will turn off the
|
| - // |seeking_| flag.
|
| - if (buffer->IsDiscontinuous()) {
|
| - // TODO(hclam): put a DCHECK here to assert |seeking_| being true.
|
| - // I cannot do this now because seek operation is not fully
|
| - // asynchronous. There may be pending seek requests even before the
|
| - // previous was finished.
|
| - seeking_ = false;
|
| - }
|
| - if (!seeking_)
|
| - input_queue_.push_back(buffer);
|
| - --pending_reads_;
|
| - ScheduleProcessTask_Locked();
|
| - }
|
| + // Little bit of magic here to get NewRunnableMethod() to generate a Task
|
| + // that holds onto a reference via scoped_refptr<>.
|
| + //
|
| + // TODO(scherkus): change the callback format to pass a scoped_refptr<> or
|
| + // better yet see if we can get away with not using reference counting.
|
| + scoped_refptr<Buffer> buffer_ref = buffer;
|
| + message_loop()->PostTask(FROM_HERE,
|
| + NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref));
|
| }
|
|
|
| protected:
|
| - // |thread_name| is mandatory and is used to identify the thread in debuggers.
|
| - explicit DecoderBase(const char* thread_name)
|
| - : running_(true),
|
| - demuxer_stream_(NULL),
|
| - thread_(thread_name),
|
| - pending_reads_(0),
|
| - process_task_(NULL),
|
| - seeking_(false) {
|
| + DecoderBase()
|
| + : pending_reads_(0),
|
| + seeking_(false),
|
| + state_(UNINITIALIZED),
|
| + thread_id_(NULL) {
|
| }
|
|
|
| virtual ~DecoderBase() {
|
| - DCHECK(!thread_.IsRunning());
|
| - DCHECK(!process_task_);
|
| + DCHECK(state_ == UNINITIALIZED || state_ == STOPPED);
|
| + DCHECK(result_queue_.empty());
|
| + DCHECK(read_queue_.empty());
|
| }
|
|
|
| // This method is called by the derived class from within the OnDecode method.
|
| // It places an output buffer in the result queue. It must be called from
|
| // within the OnDecode method.
|
| void EnqueueResult(Output* output) {
|
| - AutoLock auto_lock(lock_);
|
| - if (IsRunning()) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + if (!stopped_) {
|
| result_queue_.push_back(output);
|
| }
|
| }
|
| @@ -166,156 +108,185 @@ class DecoderBase : public Decoder {
|
| // the EnequeueResult() method from within this method.
|
| virtual void OnDecode(Buffer* input) = 0;
|
|
|
| - bool IsRunning() const { return running_; }
|
| + // Used for subclasses who friend unit tests and need to set the thread id.
|
| + virtual void set_thread_id(PlatformThreadId thread_id) {
|
| + thread_id_ = thread_id;
|
| + }
|
|
|
| MediaFormat media_format_;
|
|
|
| private:
|
| - // The GCL compiler does not like .cc files that directly access members of
|
| - // a base class. This inline method helps.
|
| + // GCC doesn't let us access superclass member variables directly, so use
|
| + // a helper to get around the situation.
|
| + //
|
| + // TODO(scherkus): another reason to add protected accessors to MediaFilter.
|
| FilterHost* host() const { return Decoder::host_; }
|
| + MessageLoop* message_loop() const { return Decoder::message_loop_; }
|
|
|
| - // Schedules a task that will execute the ProcessTask method.
|
| - void ScheduleProcessTask_Locked() {
|
| - lock_.AssertAcquired();
|
| - DCHECK(IsRunning());
|
| - if (!process_task_) {
|
| - process_task_ = NewRunnableMethod(this, &DecoderBase::ProcessTask);
|
| - thread_.message_loop()->PostTask(FROM_HERE, process_task_);
|
| - }
|
| + void StopTask() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + // Delegate to the subclass first.
|
| + OnStop();
|
| +
|
| + // Throw away all buffers in all queues.
|
| + result_queue_.clear();
|
| + STLDeleteElements(&read_queue_);
|
| + state_ = STOPPED;
|
| }
|
|
|
| - // The core work loop of the decoder base. This method will run the methods
|
| - // SubmitReads_Locked(), ProcessInput_Locked(), and ProcessOutput_Locked() in
|
| - // a loop until they either produce no further work, or the filter is stopped.
|
| - // Once there is no further work to do, the method returns. A later call to
|
| - // the ScheduleProcessTask_Locked() method will start this task again.
|
| - void ProcessTask() {
|
| - AutoLock auto_lock(lock_);
|
| - bool did_some_work;
|
| - do {
|
| - did_some_work = SubmitReads_Locked();
|
| - did_some_work |= ProcessInput_Locked();
|
| - did_some_work |= ProcessOutput_Locked();
|
| - } while (IsRunning() && did_some_work);
|
| - DCHECK(process_task_ || !IsRunning());
|
| - process_task_ = NULL;
|
| + void SeekTask(base::TimeDelta time) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + // Delegate to the subclass first.
|
| + OnSeek(time);
|
| +
|
| + // Flush the result queue.
|
| + result_queue_.clear();
|
| +
|
| + // Turn on the seeking flag so that we can discard buffers until a
|
| + // discontinuous buffer is received.
|
| + seeking_ = true;
|
| }
|
|
|
| - // If necessary, calls the |demuxer_stream_| to read buffers. Returns true
|
| - // if reads have happened, else false. This method must be called with
|
| - // |lock_| acquired. If the method submits any reads, then it will Release()
|
| - // the |lock_| when calling the demuxer and then re-Acquire() the |lock_|.
|
| - bool SubmitReads_Locked() {
|
| - lock_.AssertAcquired();
|
| - bool did_read = false;
|
| - if (IsRunning() &&
|
| - pending_reads_ + input_queue_.size() < read_queue_.size()) {
|
| - did_read = true;
|
| - size_t read = read_queue_.size() - pending_reads_ - input_queue_.size();
|
| - pending_reads_ += read;
|
| - {
|
| - AutoUnlock unlock(lock_);
|
| - while (read) {
|
| - demuxer_stream_->
|
| - Read(NewCallback(this, &DecoderBase::OnReadComplete));
|
| - --read;
|
| - }
|
| - }
|
| + void InitializeTask(DemuxerStream* demuxer_stream) {
|
| + DCHECK(state_ == UNINITIALIZED);
|
| + DCHECK(!demuxer_stream_);
|
| + DCHECK(!thread_id_ || thread_id_ == PlatformThread::CurrentId());
|
| + demuxer_stream_ = demuxer_stream;
|
| +
|
| + // Grab the thread id for debugging.
|
| + thread_id_ = PlatformThread::CurrentId();
|
| +
|
| + // Delegate to subclass first.
|
| + if (!OnInitialize(demuxer_stream_)) {
|
| + host()->Error(PIPELINE_ERROR_DECODE);
|
| + return;
|
| }
|
| - return did_read;
|
| +
|
| + // TODO(scherkus): subclass shouldn't mutate superclass media format.
|
| + DCHECK(!media_format_.empty()) << "Subclass did not set media_format_";
|
| + state_ = INITIALIZED;
|
| + host()->InitializationComplete();
|
| }
|
|
|
| - // If the |input_queue_| has any buffers, this method will call the derived
|
| - // class's OnDecode() method.
|
| - bool ProcessInput_Locked() {
|
| - lock_.AssertAcquired();
|
| - bool did_decode = false;
|
| - while (IsRunning() && !input_queue_.empty()) {
|
| - did_decode = true;
|
| - scoped_refptr<Buffer> input = input_queue_.front();
|
| - input_queue_.pop_front();
|
| - // Release |lock_| before calling the derived class to do the decode.
|
| - {
|
| - AutoUnlock unlock(lock_);
|
| - OnDecode(input);
|
| - }
|
| + void ReadTask(ReadCallback* read_callback) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + // TODO(scherkus): should reply with a null operation (empty buffer).
|
| + if (stopped_) {
|
| + delete read_callback;
|
| + return;
|
| }
|
| - return did_decode;
|
| - }
|
|
|
| - // Removes any buffers from the |result_queue_| and calls the next callback
|
| - // in the |read_queue_|.
|
| - bool ProcessOutput_Locked() {
|
| - lock_.AssertAcquired();
|
| - bool called_renderer = false;
|
| - while (IsRunning() && !read_queue_.empty() && !result_queue_.empty()) {
|
| - called_renderer = true;
|
| - scoped_refptr<Output> output = result_queue_.front();
|
| - result_queue_.pop_front();
|
| - scoped_ptr<ReadCallback> read_callback(read_queue_.front());
|
| - read_queue_.pop_front();
|
| - // Release |lock_| before calling the renderer.
|
| - {
|
| - AutoUnlock unlock(lock_);
|
| - read_callback->Run(output);
|
| - }
|
| + // Enqueue the callback and attempt to fulfill it immediately.
|
| + read_queue_.push_back(read_callback);
|
| + FulfillPendingRead();
|
| +
|
| + // Issue reads as necessary.
|
| + while (pending_reads_ < read_queue_.size()) {
|
| + demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
|
| + ++pending_reads_;
|
| }
|
| - return called_renderer;
|
| }
|
|
|
| - // Throw away all buffers in all queues.
|
| - void DiscardQueues_Locked() {
|
| - lock_.AssertAcquired();
|
| - input_queue_.clear();
|
| - result_queue_.clear();
|
| - while (!read_queue_.empty()) {
|
| - delete read_queue_.front();
|
| - read_queue_.pop_front();
|
| + void ReadCompleteTask(scoped_refptr<Buffer> buffer) {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + DCHECK_GT(pending_reads_, 0u);
|
| + --pending_reads_;
|
| + if (stopped_) {
|
| + return;
|
| + }
|
| +
|
| + // Once the |seeking_| flag is set we ignore every buffers here
|
| + // until we receive a discontinuous buffer and we will turn off the
|
| + // |seeking_| flag.
|
| + if (buffer->IsDiscontinuous()) {
|
| + // TODO(hclam): put a DCHECK here to assert |seeking_| being true.
|
| + // I cannot do this now because seek operation is not fully
|
| + // asynchronous. There may be pending seek requests even before the
|
| + // previous was finished.
|
| + seeking_ = false;
|
| + }
|
| + if (seeking_) {
|
| + return;
|
| }
|
| - }
|
|
|
| - // The critical section for the decoder.
|
| - Lock lock_;
|
| + // Decode the frame right away.
|
| + OnDecode(buffer);
|
| +
|
| + // Attempt to fulfill a pending read callback and schedule additional reads
|
| + // if necessary.
|
| + FulfillPendingRead();
|
| +
|
| + // Issue reads as necessary.
|
| + //
|
| + // Note that it's possible for us to decode but not produce a frame, in
|
| + // which case |pending_reads_| will remain less than |read_queue_| so we
|
| + // need to schedule an additional read.
|
| + DCHECK_LE(pending_reads_, read_queue_.size());
|
| + while (pending_reads_ < read_queue_.size()) {
|
| + demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
|
| + ++pending_reads_;
|
| + }
|
| + }
|
|
|
| - // If false, then the Stop() method has been called, and no further processing
|
| - // of buffers should occur.
|
| - bool running_;
|
| + // Attempts to fulfill a single pending read by dequeuing a buffer and read
|
| + // callback pair and executing the callback.
|
| + void FulfillPendingRead() {
|
| + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
|
| + if (read_queue_.empty() || result_queue_.empty()) {
|
| + return;
|
| + }
|
|
|
| - // Pointer to the demuxer stream that will feed us compressed buffers.
|
| - scoped_refptr<DemuxerStream> demuxer_stream_;
|
| + // Dequeue a frame and read callback pair.
|
| + scoped_refptr<Output> output = result_queue_.front();
|
| + scoped_ptr<ReadCallback> read_callback(read_queue_.front());
|
| + result_queue_.pop_front();
|
| + read_queue_.pop_front();
|
|
|
| - // The dedicated decoding thread for this filter.
|
| - base::Thread thread_;
|
| + // Execute the callback!
|
| + read_callback->Run(output);
|
| + }
|
|
|
| - // Number of times we have called Read() on the demuxer that have not yet
|
| - // been satisfied.
|
| + // Tracks the number of asynchronous reads issued to |demuxer_stream_|.
|
| + // Using size_t since it is always compared against deque::size().
|
| size_t pending_reads_;
|
|
|
| - CancelableTask* process_task_;
|
| + // If true, then Stop() has been called and no further processing of buffers
|
| + // should occur.
|
| + bool stopped_;
|
| +
|
| + // An internal state of the decoder that indicates that are waiting for seek
|
| + // to complete. We expect to receive a discontinuous frame/packet from the
|
| + // demuxer to signal that seeking is completed.
|
| + bool seeking_;
|
|
|
| - // Queue of buffers read from the |demuxer_stream_|.
|
| - typedef std::deque< scoped_refptr<Buffer> > InputQueue;
|
| - InputQueue input_queue_;
|
| + // Pointer to the demuxer stream that will feed us compressed buffers.
|
| + scoped_refptr<DemuxerStream> demuxer_stream_;
|
|
|
| // Queue of decoded samples produced in the OnDecode() method of the decoder.
|
| // Any samples placed in this queue will be assigned to the OutputQueue
|
| // buffers once the OnDecode() method returns.
|
| + //
|
| // TODO(ralphl): Eventually we want to have decoders get their destination
|
| // buffer from the OutputQueue and write to it directly. Until we change
|
| // from the Assignable buffer to callbacks and renderer-allocated buffers,
|
| // we need this extra queue.
|
| - typedef std::deque< scoped_refptr<Output> > ResultQueue;
|
| + typedef std::deque<scoped_refptr<Output> > ResultQueue;
|
| ResultQueue result_queue_;
|
|
|
| // Queue of callbacks supplied by the renderer through the Read() method.
|
| typedef std::deque<ReadCallback*> ReadQueue;
|
| ReadQueue read_queue_;
|
|
|
| - // An internal state of the decoder that indicates that are waiting for seek
|
| - // to complete. We expect to receive a discontinuous frame/packet from the
|
| - // demuxer to signal that seeking is completed.
|
| - bool seeking_;
|
| + // Simple state tracking variable.
|
| + enum State {
|
| + UNINITIALIZED,
|
| + INITIALIZED,
|
| + STOPPED,
|
| + };
|
| + State state_;
|
| +
|
| + // Used for debugging.
|
| + PlatformThreadId thread_id_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(DecoderBase);
|
| };
|
|
|