Index: content/renderer/scheduler/task_queue_manager.cc |
diff --git a/content/renderer/scheduler/task_queue_manager.cc b/content/renderer/scheduler/task_queue_manager.cc |
index 49495f8788bd6508382d008f5feb4db4b423bbbf..227a7e87e20b74143446b7561706ab8305665612 100644 |
--- a/content/renderer/scheduler/task_queue_manager.cc |
+++ b/content/renderer/scheduler/task_queue_manager.cc |
@@ -11,10 +11,9 @@ |
namespace content { |
namespace internal { |
-class TaskRunner : public base::SingleThreadTaskRunner { |
+class TaskQueue : public base::SingleThreadTaskRunner { |
public: |
- TaskRunner(base::WeakPtr<TaskQueueManager> task_queue_manager, |
- size_t queue_index); |
+ TaskQueue(TaskQueueManager* task_queue_manager); |
// base::SingleThreadTaskRunner implementation. |
virtual bool RunsTasksOnCurrentThread() const override; |
@@ -26,62 +25,155 @@ class TaskRunner : public base::SingleThreadTaskRunner { |
const base::Closure& task, |
base::TimeDelta delay) override; |
+ // Adds a task at the end of the incoming task queue and schedules a call to |
+ // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
+ // pumping is enabled. Can be called on an arbitrary thread. |
+ void EnqueueTask(const base::PendingTask& pending_task); |
+ |
+ bool IsQueueEmpty() const; |
+ |
+ void SetAutoPump(bool auto_pump); |
+ void PumpQueue(); |
+ |
+ bool UpdateWorkQueue(); |
+ base::PendingTask TakeTaskFromWorkQueue(); |
+ |
+ void WillDeleteTaskQueueManager(); |
+ |
+ base::TaskQueue& work_queue() { return work_queue_; } |
+ |
private: |
- virtual ~TaskRunner(); |
+ virtual ~TaskQueue(); |
+ |
+ void PumpQueueLocked(); |
+ void EnqueueTaskLocked(const base::PendingTask& pending_task); |
+ |
+ // This lock protects all members except the work queue. |
+ mutable base::Lock lock_; |
+ TaskQueueManager* task_queue_manager_; |
+ base::TaskQueue incoming_queue_; |
+ bool auto_pump_; |
- base::WeakPtr<TaskQueueManager> task_queue_manager_; |
- const size_t queue_index_; |
+ base::TaskQueue work_queue_; |
- DISALLOW_COPY_AND_ASSIGN(TaskRunner); |
+ DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
}; |
-TaskRunner::TaskRunner(base::WeakPtr<TaskQueueManager> task_queue_manager, |
- size_t queue_index) |
- : task_queue_manager_(task_queue_manager), queue_index_(queue_index) { |
+TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager) |
+ : task_queue_manager_(task_queue_manager), auto_pump_(true) { |
+} |
+ |
+TaskQueue::~TaskQueue() { |
} |
-TaskRunner::~TaskRunner() { |
+void TaskQueue::WillDeleteTaskQueueManager() { |
+ base::AutoLock lock(lock_); |
+ task_queue_manager_ = nullptr; |
} |
-bool TaskRunner::RunsTasksOnCurrentThread() const { |
+bool TaskQueue::RunsTasksOnCurrentThread() const { |
+ base::AutoLock lock(lock_); |
if (!task_queue_manager_) |
return false; |
return task_queue_manager_->RunsTasksOnCurrentThread(); |
} |
-bool TaskRunner::PostDelayedTask(const tracked_objects::Location& from_here, |
- const base::Closure& task, |
- base::TimeDelta delay) { |
+bool TaskQueue::PostDelayedTask(const tracked_objects::Location& from_here, |
+ const base::Closure& task, |
+ base::TimeDelta delay) { |
+ base::AutoLock lock(lock_); |
if (!task_queue_manager_) |
return false; |
- return task_queue_manager_->PostDelayedTask( |
- queue_index_, from_here, task, delay); |
+ |
+ base::PendingTask pending_task(from_here, task); |
+ task_queue_manager_->DidQueueTask(&pending_task); |
+ |
+ if (delay > base::TimeDelta()) { |
+ return task_queue_manager_->PostDelayedTask( |
+ from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
+ } |
+ EnqueueTaskLocked(pending_task); |
+ return true; |
} |
-bool TaskRunner::PostNonNestableDelayedTask( |
+bool TaskQueue::PostNonNestableDelayedTask( |
const tracked_objects::Location& from_here, |
const base::Closure& task, |
base::TimeDelta delay) { |
+ base::AutoLock lock(lock_); |
if (!task_queue_manager_) |
return false; |
return task_queue_manager_->PostNonNestableDelayedTask( |
- queue_index_, from_here, task, delay); |
+ from_here, task, delay); |
} |
-struct TaskQueue { |
- TaskQueue() : auto_pump(true) {} |
- ~TaskQueue() {} |
+bool TaskQueue::IsQueueEmpty() const { |
+ if (!work_queue_.empty()) |
+ return false; |
- scoped_refptr<TaskRunner> task_runner; |
+ { |
+ base::AutoLock lock(lock_); |
+ return incoming_queue_.empty(); |
+ } |
+} |
- base::Lock incoming_queue_lock; |
- base::TaskQueue incoming_queue; |
+bool TaskQueue::UpdateWorkQueue() { |
+ if (!work_queue_.empty()) |
+ return true; |
- bool auto_pump; |
- base::TaskQueue work_queue; |
+ { |
+ base::AutoLock lock(lock_); |
+ if (!auto_pump_ || incoming_queue_.empty()) |
+ return false; |
+ work_queue_.Swap(&incoming_queue_); |
+ return true; |
+ } |
+} |
- DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
-}; |
+base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
+ base::PendingTask pending_task = work_queue_.front(); |
+ work_queue_.pop(); |
+ return pending_task; |
+} |
+ |
+void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) { |
+ base::AutoLock lock(lock_); |
+ EnqueueTaskLocked(pending_task); |
+} |
+ |
+void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
+ lock_.AssertAcquired(); |
+ if (!task_queue_manager_) |
+ return; |
+ if (auto_pump_ && incoming_queue_.empty()) |
+ task_queue_manager_->PostDoWorkOnMainRunner(); |
+ incoming_queue_.push(pending_task); |
+} |
+ |
+void TaskQueue::SetAutoPump(bool auto_pump) { |
+ base::AutoLock lock(lock_); |
+ if (auto_pump) { |
+ auto_pump_ = true; |
+ PumpQueueLocked(); |
+ } else { |
+ auto_pump_ = false; |
+ } |
+} |
+ |
+void TaskQueue::PumpQueueLocked() { |
+ lock_.AssertAcquired(); |
+ while (!incoming_queue_.empty()) { |
+ work_queue_.push(incoming_queue_.front()); |
+ incoming_queue_.pop(); |
+ } |
+ if (!work_queue_.empty()) |
+ task_queue_manager_->PostDoWorkOnMainRunner(); |
+} |
+ |
+void TaskQueue::PumpQueue() { |
+ base::AutoLock lock(lock_); |
+ PumpQueueLocked(); |
+} |
} // namespace |
@@ -94,76 +186,49 @@ TaskQueueManager::TaskQueueManager( |
weak_factory_(this) { |
DCHECK(main_task_runner->RunsTasksOnCurrentThread()); |
+ task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr(); |
for (size_t i = 0; i < task_queue_count; i++) { |
- scoped_ptr<internal::TaskQueue> queue(new internal::TaskQueue()); |
- queue->task_runner = make_scoped_refptr( |
- new internal::TaskRunner(weak_factory_.GetWeakPtr(), i)); |
- queues_.push_back(queue.release()); |
+ scoped_refptr<internal::TaskQueue> queue( |
+ make_scoped_refptr(new internal::TaskQueue(this))); |
+ queues_.push_back(queue); |
} |
std::vector<const base::TaskQueue*> work_queues; |
for (const auto& queue: queues_) |
- work_queues.push_back(&queue->work_queue); |
+ work_queues.push_back(&queue->work_queue()); |
selector_->RegisterWorkQueues(work_queues); |
} |
TaskQueueManager::~TaskQueueManager() { |
+ for (auto& queue : queues_) |
+ queue->WillDeleteTaskQueueManager(); |
} |
internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const { |
DCHECK_LT(queue_index, queues_.size()); |
- return queues_[queue_index]; |
+ return queues_[queue_index].get(); |
} |
scoped_refptr<base::SingleThreadTaskRunner> |
TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const { |
- return Queue(queue_index)->task_runner; |
-} |
- |
-bool TaskQueueManager::IsQueueEmpty(size_t queue_index) { |
- internal::TaskQueue* queue = Queue(queue_index); |
- if (!queue->work_queue.empty()) |
- return false; |
- base::AutoLock lock(queue->incoming_queue_lock); |
- return queue->incoming_queue.empty(); |
+ return Queue(queue_index); |
} |
-void TaskQueueManager::EnqueueTask(size_t queue_index, |
- const base::PendingTask& pending_task) { |
+bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const { |
internal::TaskQueue* queue = Queue(queue_index); |
- base::AutoLock lock(queue->incoming_queue_lock); |
- if (queue->auto_pump && queue->incoming_queue.empty()) |
- PostDoWorkOnMainRunner(); |
- queue->incoming_queue.push(pending_task); |
+ return queue->IsQueueEmpty(); |
} |
void TaskQueueManager::SetAutoPump(size_t queue_index, bool auto_pump) { |
- internal::TaskQueue* queue = Queue(queue_index); |
- base::AutoLock lock(queue->incoming_queue_lock); |
- if (auto_pump) { |
- queue->auto_pump = true; |
- PumpQueueLocked(queue); |
- } else { |
- queue->auto_pump = false; |
- } |
-} |
- |
-void TaskQueueManager::PumpQueueLocked(internal::TaskQueue* queue) { |
main_thread_checker_.CalledOnValidThread(); |
- queue->incoming_queue_lock.AssertAcquired(); |
- while (!queue->incoming_queue.empty()) { |
- queue->work_queue.push(queue->incoming_queue.front()); |
- queue->incoming_queue.pop(); |
- } |
- if (!queue->work_queue.empty()) |
- PostDoWorkOnMainRunner(); |
+ internal::TaskQueue* queue = Queue(queue_index); |
+ queue->SetAutoPump(auto_pump); |
} |
void TaskQueueManager::PumpQueue(size_t queue_index) { |
main_thread_checker_.CalledOnValidThread(); |
internal::TaskQueue* queue = Queue(queue_index); |
- base::AutoLock lock(queue->incoming_queue_lock); |
- PumpQueueLocked(queue); |
+ queue->PumpQueue(); |
} |
bool TaskQueueManager::UpdateWorkQueues() { |
@@ -172,23 +237,14 @@ bool TaskQueueManager::UpdateWorkQueues() { |
// there. |
main_thread_checker_.CalledOnValidThread(); |
bool has_work = false; |
- for (auto& queue: queues_) { |
- if (!queue->work_queue.empty()) { |
- has_work = true; |
- continue; |
- } |
- base::AutoLock lock(queue->incoming_queue_lock); |
- if (!queue->auto_pump || queue->incoming_queue.empty()) |
- continue; |
- queue->work_queue.Swap(&queue->incoming_queue); |
- has_work = true; |
- } |
+ for (auto& queue : queues_) |
+ has_work |= queue->UpdateWorkQueue(); |
return has_work; |
} |
void TaskQueueManager::PostDoWorkOnMainRunner() { |
main_task_runner_->PostTask( |
- FROM_HERE, Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr())); |
+ FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_)); |
} |
void TaskQueueManager::DoWork() { |
@@ -203,12 +259,15 @@ void TaskQueueManager::DoWork() { |
RunTaskFromWorkQueue(queue_index); |
} |
+void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) { |
+ pending_task->sequence_num = task_sequence_num_.GetNext(); |
+ task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task); |
+} |
+ |
void TaskQueueManager::RunTaskFromWorkQueue(size_t queue_index) { |
main_thread_checker_.CalledOnValidThread(); |
internal::TaskQueue* queue = Queue(queue_index); |
- DCHECK(!queue->work_queue.empty()); |
- base::PendingTask pending_task = queue->work_queue.front(); |
- queue->work_queue.pop(); |
+ base::PendingTask pending_task = queue->TakeTaskFromWorkQueue(); |
task_annotator_.RunTask( |
"TaskQueueManager::PostTask", "TaskQueueManager::RunTask", pending_task); |
} |
@@ -218,31 +277,14 @@ bool TaskQueueManager::RunsTasksOnCurrentThread() const { |
} |
bool TaskQueueManager::PostDelayedTask( |
- size_t queue_index, |
const tracked_objects::Location& from_here, |
const base::Closure& task, |
base::TimeDelta delay) { |
- int sequence_num = task_sequence_num_.GetNext(); |
- |
- base::PendingTask pending_task(from_here, task); |
- pending_task.sequence_num = sequence_num; |
- |
- task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task); |
- if (delay > base::TimeDelta()) { |
- return main_task_runner_->PostDelayedTask( |
- from_here, |
- Bind(&TaskQueueManager::EnqueueTask, |
- weak_factory_.GetWeakPtr(), |
- queue_index, |
- pending_task), |
- delay); |
- } |
- EnqueueTask(queue_index, pending_task); |
- return true; |
+ DCHECK(delay > base::TimeDelta()); |
+ return main_task_runner_->PostDelayedTask(from_here, task, delay); |
} |
bool TaskQueueManager::PostNonNestableDelayedTask( |
- size_t queue_index, |
const tracked_objects::Location& from_here, |
const base::Closure& task, |
base::TimeDelta delay) { |