Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(442)

Unified Diff: media/filters/decoder_base.h

Issue 146068: Switching decoders to use the injected message loop. (Closed)
Patch Set: More fixes Created 11 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « media/base/pipeline_impl.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: media/filters/decoder_base.h
diff --git a/media/filters/decoder_base.h b/media/filters/decoder_base.h
index 4f70f7a009cd328c0ec1f9f148acd43f9238366b..150843927cb1646bdff89c76dfc568565f409c66 100644
--- a/media/filters/decoder_base.h
+++ b/media/filters/decoder_base.h
@@ -10,6 +10,7 @@
#include <deque>
#include "base/lock.h"
+#include "base/stl_util-inl.h"
#include "base/task.h"
#include "base/thread.h"
#include "media/base/buffers.h"
@@ -25,120 +26,61 @@ class DecoderBase : public Decoder {
// MediaFilter implementation.
virtual void Stop() {
- OnStop();
- {
- AutoLock auto_lock(lock_);
- running_ = false;
- if (process_task_) {
- process_task_->Cancel();
- process_task_ = NULL;
- }
- DiscardQueues_Locked();
- }
-
- // Stop our decoding thread.
- thread_.Stop();
+ message_loop()->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &DecoderBase::StopTask));
}
virtual void Seek(base::TimeDelta time) {
- // Delegate to the subclass first.
- OnSeek(time);
-
- // Flush the result queue.
- AutoLock auto_lock(lock_);
- result_queue_.clear();
-
- // Flush the input queue. This will trigger more reads from the demuxer.
- input_queue_.clear();
-
- // Turn on the seeking flag so that we can discard buffers until a
- // discontinuous buffer is received.
- seeking_ = true;
-
- // Trigger more reads and keep the process loop rolling.
- ScheduleProcessTask_Locked();
+ message_loop()->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &DecoderBase::SeekTask, time));
}
// Decoder implementation.
virtual bool Initialize(DemuxerStream* demuxer_stream) {
- demuxer_stream_ = demuxer_stream;
-
- // Start our internal decoding thread.
- if (!thread_.Start()) {
- host()->Error(PIPELINE_ERROR_DECODE);
- return false;
- }
-
- if (!OnInitialize(demuxer_stream)) {
- // Release our resources and stop our thread.
- // TODO(scherkus): shouldn't stop a thread inside Initialize(), but until I
- // figure out proper error signaling semantics we're going to do it anyway!!
- host()->Error(PIPELINE_ERROR_DECODE);
- demuxer_stream_ = NULL;
- thread_.Stop();
- return false;
- }
-
- DCHECK(!media_format_.empty());
- host()->InitializationComplete();
+ message_loop()->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream));
return true;
}
virtual const MediaFormat& media_format() { return media_format_; }
- // Audio or Video decoder.
+ // Audio or video decoder.
virtual void Read(ReadCallback* read_callback) {
- AutoLock auto_lock(lock_);
- if (IsRunning()) {
- read_queue_.push_back(read_callback);
- ScheduleProcessTask_Locked();
- } else {
- delete read_callback;
- }
+ message_loop()->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &DecoderBase::ReadTask, read_callback));
}
void OnReadComplete(Buffer* buffer) {
- AutoLock auto_lock(lock_);
- if (IsRunning()) {
- // Once the |seeking_| flag is set we ignore every buffers here
- // until we receive a discontinuous buffer and we will turn off the
- // |seeking_| flag.
- if (buffer->IsDiscontinuous()) {
- // TODO(hclam): put a DCHECK here to assert |seeking_| being true.
- // I cannot do this now because seek operation is not fully
- // asynchronous. There may be pending seek requests even before the
- // previous was finished.
- seeking_ = false;
- }
- if (!seeking_)
- input_queue_.push_back(buffer);
- --pending_reads_;
- ScheduleProcessTask_Locked();
- }
+ // Little bit of magic here to get NewRunnableMethod() to generate a Task
+ // that holds onto a reference via scoped_refptr<>.
+ //
+ // TODO(scherkus): change the callback format to pass a scoped_refptr<> or
+ // better yet see if we can get away with not using reference counting.
+ scoped_refptr<Buffer> buffer_ref = buffer;
+ message_loop()->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref));
}
protected:
- // |thread_name| is mandatory and is used to identify the thread in debuggers.
- explicit DecoderBase(const char* thread_name)
- : running_(true),
- demuxer_stream_(NULL),
- thread_(thread_name),
- pending_reads_(0),
- process_task_(NULL),
- seeking_(false) {
+ DecoderBase()
+ : pending_reads_(0),
+ seeking_(false),
+ state_(UNINITIALIZED),
+ thread_id_(NULL) {
}
virtual ~DecoderBase() {
- DCHECK(!thread_.IsRunning());
- DCHECK(!process_task_);
+ DCHECK(state_ == UNINITIALIZED || state_ == STOPPED);
+ DCHECK(result_queue_.empty());
+ DCHECK(read_queue_.empty());
}
// This method is called by the derived class from within the OnDecode method.
// It places an output buffer in the result queue. It must be called from
// within the OnDecode method.
void EnqueueResult(Output* output) {
- AutoLock auto_lock(lock_);
- if (IsRunning()) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ if (!stopped_) {
result_queue_.push_back(output);
}
}
@@ -166,156 +108,185 @@ class DecoderBase : public Decoder {
// the EnequeueResult() method from within this method.
virtual void OnDecode(Buffer* input) = 0;
- bool IsRunning() const { return running_; }
+ // Used for subclasses who friend unit tests and need to set the thread id.
+ virtual void set_thread_id(PlatformThreadId thread_id) {
+ thread_id_ = thread_id;
+ }
MediaFormat media_format_;
private:
- // The GCL compiler does not like .cc files that directly access members of
- // a base class. This inline method helps.
+ // GCC doesn't let us access superclass member variables directly, so use
+ // a helper to get around the situation.
+ //
+ // TODO(scherkus): another reason to add protected accessors to MediaFilter.
FilterHost* host() const { return Decoder::host_; }
+ MessageLoop* message_loop() const { return Decoder::message_loop_; }
- // Schedules a task that will execute the ProcessTask method.
- void ScheduleProcessTask_Locked() {
- lock_.AssertAcquired();
- DCHECK(IsRunning());
- if (!process_task_) {
- process_task_ = NewRunnableMethod(this, &DecoderBase::ProcessTask);
- thread_.message_loop()->PostTask(FROM_HERE, process_task_);
- }
+ void StopTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ // Delegate to the subclass first.
+ OnStop();
+
+ // Throw away all buffers in all queues.
+ result_queue_.clear();
+ STLDeleteElements(&read_queue_);
+ state_ = STOPPED;
}
- // The core work loop of the decoder base. This method will run the methods
- // SubmitReads_Locked(), ProcessInput_Locked(), and ProcessOutput_Locked() in
- // a loop until they either produce no further work, or the filter is stopped.
- // Once there is no further work to do, the method returns. A later call to
- // the ScheduleProcessTask_Locked() method will start this task again.
- void ProcessTask() {
- AutoLock auto_lock(lock_);
- bool did_some_work;
- do {
- did_some_work = SubmitReads_Locked();
- did_some_work |= ProcessInput_Locked();
- did_some_work |= ProcessOutput_Locked();
- } while (IsRunning() && did_some_work);
- DCHECK(process_task_ || !IsRunning());
- process_task_ = NULL;
+ void SeekTask(base::TimeDelta time) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ // Delegate to the subclass first.
+ OnSeek(time);
+
+ // Flush the result queue.
+ result_queue_.clear();
+
+ // Turn on the seeking flag so that we can discard buffers until a
+ // discontinuous buffer is received.
+ seeking_ = true;
}
- // If necessary, calls the |demuxer_stream_| to read buffers. Returns true
- // if reads have happened, else false. This method must be called with
- // |lock_| acquired. If the method submits any reads, then it will Release()
- // the |lock_| when calling the demuxer and then re-Acquire() the |lock_|.
- bool SubmitReads_Locked() {
- lock_.AssertAcquired();
- bool did_read = false;
- if (IsRunning() &&
- pending_reads_ + input_queue_.size() < read_queue_.size()) {
- did_read = true;
- size_t read = read_queue_.size() - pending_reads_ - input_queue_.size();
- pending_reads_ += read;
- {
- AutoUnlock unlock(lock_);
- while (read) {
- demuxer_stream_->
- Read(NewCallback(this, &DecoderBase::OnReadComplete));
- --read;
- }
- }
+ void InitializeTask(DemuxerStream* demuxer_stream) {
+ DCHECK(state_ == UNINITIALIZED);
+ DCHECK(!demuxer_stream_);
+ DCHECK(!thread_id_ || thread_id_ == PlatformThread::CurrentId());
+ demuxer_stream_ = demuxer_stream;
+
+ // Grab the thread id for debugging.
+ thread_id_ = PlatformThread::CurrentId();
+
+ // Delegate to subclass first.
+ if (!OnInitialize(demuxer_stream_)) {
+ host()->Error(PIPELINE_ERROR_DECODE);
+ return;
}
- return did_read;
+
+ // TODO(scherkus): subclass shouldn't mutate superclass media format.
+ DCHECK(!media_format_.empty()) << "Subclass did not set media_format_";
+ state_ = INITIALIZED;
+ host()->InitializationComplete();
}
- // If the |input_queue_| has any buffers, this method will call the derived
- // class's OnDecode() method.
- bool ProcessInput_Locked() {
- lock_.AssertAcquired();
- bool did_decode = false;
- while (IsRunning() && !input_queue_.empty()) {
- did_decode = true;
- scoped_refptr<Buffer> input = input_queue_.front();
- input_queue_.pop_front();
- // Release |lock_| before calling the derived class to do the decode.
- {
- AutoUnlock unlock(lock_);
- OnDecode(input);
- }
+ void ReadTask(ReadCallback* read_callback) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ // TODO(scherkus): should reply with a null operation (empty buffer).
+ if (stopped_) {
+ delete read_callback;
+ return;
}
- return did_decode;
- }
- // Removes any buffers from the |result_queue_| and calls the next callback
- // in the |read_queue_|.
- bool ProcessOutput_Locked() {
- lock_.AssertAcquired();
- bool called_renderer = false;
- while (IsRunning() && !read_queue_.empty() && !result_queue_.empty()) {
- called_renderer = true;
- scoped_refptr<Output> output = result_queue_.front();
- result_queue_.pop_front();
- scoped_ptr<ReadCallback> read_callback(read_queue_.front());
- read_queue_.pop_front();
- // Release |lock_| before calling the renderer.
- {
- AutoUnlock unlock(lock_);
- read_callback->Run(output);
- }
+ // Enqueue the callback and attempt to fulfill it immediately.
+ read_queue_.push_back(read_callback);
+ FulfillPendingRead();
+
+ // Issue reads as necessary.
+ while (pending_reads_ < read_queue_.size()) {
+ demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
+ ++pending_reads_;
}
- return called_renderer;
}
- // Throw away all buffers in all queues.
- void DiscardQueues_Locked() {
- lock_.AssertAcquired();
- input_queue_.clear();
- result_queue_.clear();
- while (!read_queue_.empty()) {
- delete read_queue_.front();
- read_queue_.pop_front();
+ void ReadCompleteTask(scoped_refptr<Buffer> buffer) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ DCHECK_GT(pending_reads_, 0u);
+ --pending_reads_;
+ if (stopped_) {
+ return;
+ }
+
+ // Once the |seeking_| flag is set we ignore every buffers here
+ // until we receive a discontinuous buffer and we will turn off the
+ // |seeking_| flag.
+ if (buffer->IsDiscontinuous()) {
+ // TODO(hclam): put a DCHECK here to assert |seeking_| being true.
+ // I cannot do this now because seek operation is not fully
+ // asynchronous. There may be pending seek requests even before the
+ // previous was finished.
+ seeking_ = false;
+ }
+ if (seeking_) {
+ return;
}
- }
- // The critical section for the decoder.
- Lock lock_;
+ // Decode the frame right away.
+ OnDecode(buffer);
+
+ // Attempt to fulfill a pending read callback and schedule additional reads
+ // if necessary.
+ FulfillPendingRead();
+
+ // Issue reads as necessary.
+ //
+ // Note that it's possible for us to decode but not produce a frame, in
+ // which case |pending_reads_| will remain less than |read_queue_| so we
+ // need to schedule an additional read.
+ DCHECK_LE(pending_reads_, read_queue_.size());
+ while (pending_reads_ < read_queue_.size()) {
+ demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
+ ++pending_reads_;
+ }
+ }
- // If false, then the Stop() method has been called, and no further processing
- // of buffers should occur.
- bool running_;
+ // Attempts to fulfill a single pending read by dequeuing a buffer and read
+ // callback pair and executing the callback.
+ void FulfillPendingRead() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+ if (read_queue_.empty() || result_queue_.empty()) {
+ return;
+ }
- // Pointer to the demuxer stream that will feed us compressed buffers.
- scoped_refptr<DemuxerStream> demuxer_stream_;
+ // Dequeue a frame and read callback pair.
+ scoped_refptr<Output> output = result_queue_.front();
+ scoped_ptr<ReadCallback> read_callback(read_queue_.front());
+ result_queue_.pop_front();
+ read_queue_.pop_front();
- // The dedicated decoding thread for this filter.
- base::Thread thread_;
+ // Execute the callback!
+ read_callback->Run(output);
+ }
- // Number of times we have called Read() on the demuxer that have not yet
- // been satisfied.
+ // Tracks the number of asynchronous reads issued to |demuxer_stream_|.
+ // Using size_t since it is always compared against deque::size().
size_t pending_reads_;
- CancelableTask* process_task_;
+ // If true, then Stop() has been called and no further processing of buffers
+ // should occur.
+ bool stopped_;
+
+ // An internal state of the decoder that indicates that are waiting for seek
+ // to complete. We expect to receive a discontinuous frame/packet from the
+ // demuxer to signal that seeking is completed.
+ bool seeking_;
- // Queue of buffers read from the |demuxer_stream_|.
- typedef std::deque< scoped_refptr<Buffer> > InputQueue;
- InputQueue input_queue_;
+ // Pointer to the demuxer stream that will feed us compressed buffers.
+ scoped_refptr<DemuxerStream> demuxer_stream_;
// Queue of decoded samples produced in the OnDecode() method of the decoder.
// Any samples placed in this queue will be assigned to the OutputQueue
// buffers once the OnDecode() method returns.
+ //
// TODO(ralphl): Eventually we want to have decoders get their destination
// buffer from the OutputQueue and write to it directly. Until we change
// from the Assignable buffer to callbacks and renderer-allocated buffers,
// we need this extra queue.
- typedef std::deque< scoped_refptr<Output> > ResultQueue;
+ typedef std::deque<scoped_refptr<Output> > ResultQueue;
ResultQueue result_queue_;
// Queue of callbacks supplied by the renderer through the Read() method.
typedef std::deque<ReadCallback*> ReadQueue;
ReadQueue read_queue_;
- // An internal state of the decoder that indicates that are waiting for seek
- // to complete. We expect to receive a discontinuous frame/packet from the
- // demuxer to signal that seeking is completed.
- bool seeking_;
+ // Simple state tracking variable.
+ enum State {
+ UNINITIALIZED,
+ INITIALIZED,
+ STOPPED,
+ };
+ State state_;
+
+ // Used for debugging.
+ PlatformThreadId thread_id_;
DISALLOW_COPY_AND_ASSIGN(DecoderBase);
};
« no previous file with comments | « media/base/pipeline_impl.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698