| Index: components/scheduler/child/task_queue_impl.cc
|
| diff --git a/components/scheduler/child/task_queue_impl.cc b/components/scheduler/child/task_queue_impl.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6bd747bc3ad609124edd5a7559943e54bec63413
|
| --- /dev/null
|
| +++ b/components/scheduler/child/task_queue_impl.cc
|
| @@ -0,0 +1,504 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "components/scheduler/child/task_queue_impl.h"
|
| +
|
| +#include "components/scheduler/child/task_queue_manager.h"
|
| +
|
| +namespace scheduler {
|
| +namespace internal {
|
| +
|
| +TaskQueueImpl::TaskQueueImpl(
|
| + TaskQueueManager* task_queue_manager,
|
| + const Spec& spec,
|
| + const char* disabled_by_default_tracing_category,
|
| + const char* disabled_by_default_verbose_tracing_category)
|
| + : thread_id_(base::PlatformThread::CurrentId()),
|
| + task_queue_manager_(task_queue_manager),
|
| + pump_policy_(spec.pump_policy),
|
| + name_(spec.name),
|
| + disabled_by_default_tracing_category_(
|
| + disabled_by_default_tracing_category),
|
| + disabled_by_default_verbose_tracing_category_(
|
| + disabled_by_default_verbose_tracing_category),
|
| + wakeup_policy_(spec.wakeup_policy),
|
| + set_index_(0),
|
| + should_monitor_quiescence_(spec.should_monitor_quiescence),
|
| + should_notify_observers_(spec.should_notify_observers) {}
|
| +
|
| +TaskQueueImpl::~TaskQueueImpl() {}
|
| +
|
| +TaskQueueImpl::Task::Task()
|
| + : PendingTask(tracked_objects::Location(),
|
| + base::Closure(),
|
| + base::TimeTicks(),
|
| + true),
|
| +#ifndef NDEBUG
|
| + enqueue_order_set_(false),
|
| +#endif
|
| + enqueue_order_(0) {
|
| + sequence_num = 0;
|
| +}
|
| +
|
| +TaskQueueImpl::Task::Task(const tracked_objects::Location& posted_from,
|
| + const base::Closure& task,
|
| + int sequence_number,
|
| + bool nestable)
|
| + : PendingTask(posted_from, task, base::TimeTicks(), nestable),
|
| +#ifndef NDEBUG
|
| + enqueue_order_set_(false),
|
| +#endif
|
| + enqueue_order_(0) {
|
| + sequence_num = sequence_number;
|
| +}
|
| +
|
| +void TaskQueueImpl::UnregisterTaskQueue() {
|
| + if (!task_queue_manager_)
|
| + return;
|
| + task_queue_manager_->UnregisterTaskQueue(this);
|
| +
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + task_queue_manager_ = nullptr;
|
| + delayed_task_queue_ = std::priority_queue<Task>();
|
| + incoming_queue_ = std::queue<Task>();
|
| + work_queue_ = std::queue<Task>();
|
| + }
|
| +}
|
| +
|
| +bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
|
| + base::AutoLock lock(lock_);
|
| + return base::PlatformThread::CurrentId() == thread_id_;
|
| +}
|
| +
|
| +bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const base::Closure& task,
|
| + base::TimeDelta delay) {
|
| + return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
|
| +}
|
| +
|
| +bool TaskQueueImpl::PostNonNestableDelayedTask(
|
| + const tracked_objects::Location& from_here,
|
| + const base::Closure& task,
|
| + base::TimeDelta delay) {
|
| + return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
|
| +}
|
| +
|
| +bool TaskQueueImpl::PostDelayedTaskAt(
|
| + const tracked_objects::Location& from_here,
|
| + const base::Closure& task,
|
| + base::TimeTicks desired_run_time) {
|
| + base::AutoLock lock(lock_);
|
| + if (!task_queue_manager_)
|
| + return false;
|
| + LazyNow lazy_now(task_queue_manager_);
|
| + return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time,
|
| + TaskType::NORMAL);
|
| +}
|
| +
|
| +bool TaskQueueImpl::PostDelayedTaskImpl(
|
| + const tracked_objects::Location& from_here,
|
| + const base::Closure& task,
|
| + base::TimeDelta delay,
|
| + TaskType task_type) {
|
| + base::AutoLock lock(lock_);
|
| + if (!task_queue_manager_)
|
| + return false;
|
| + LazyNow lazy_now(task_queue_manager_);
|
| + base::TimeTicks desired_run_time;
|
| + if (delay > base::TimeDelta())
|
| + desired_run_time = lazy_now.Now() + delay;
|
| + return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time,
|
| + task_type);
|
| +}
|
| +
|
| +bool TaskQueueImpl::PostDelayedTaskLocked(
|
| + LazyNow* lazy_now,
|
| + const tracked_objects::Location& from_here,
|
| + const base::Closure& task,
|
| + base::TimeTicks desired_run_time,
|
| + TaskType task_type) {
|
| + lock_.AssertAcquired();
|
| + DCHECK(task_queue_manager_);
|
| + Task pending_task(from_here, task,
|
| + task_queue_manager_->GetNextSequenceNumber(),
|
| + task_type != TaskType::NON_NESTABLE);
|
| + task_queue_manager_->DidQueueTask(pending_task);
|
| +
|
| + if (!desired_run_time.is_null()) {
|
| + pending_task.delayed_run_time = std::max(lazy_now->Now(), desired_run_time);
|
| + // TODO(alexclarke): consider emplace() when C++11 library features allowed.
|
| + delayed_task_queue_.push(pending_task);
|
| + TraceQueueSize(true);
|
| + // Schedule a later call to MoveReadyDelayedTasksToIncomingQueue.
|
| + task_queue_manager_->ScheduleDelayedWork(this, desired_run_time, lazy_now);
|
| + return true;
|
| + }
|
| + pending_task.set_enqueue_order(pending_task.sequence_num);
|
| + EnqueueTaskLocked(pending_task);
|
| + return true;
|
| +}
|
| +
|
| +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue(LazyNow* lazy_now) {
|
| + base::AutoLock lock(lock_);
|
| + if (!task_queue_manager_)
|
| + return;
|
| +
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
|
| +}
|
| +
|
| +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
|
| + LazyNow* lazy_now) {
|
| + lock_.AssertAcquired();
|
| + // Enqueue all delayed tasks that should be running now.
|
| + while (!delayed_task_queue_.empty() &&
|
| + delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
|
| + // TODO(alexclarke): consider std::move() when allowed.
|
| + EnqueueDelayedTaskLocked(delayed_task_queue_.top());
|
| + delayed_task_queue_.pop();
|
| + }
|
| + TraceQueueSize(true);
|
| +}
|
| +
|
| +bool TaskQueueImpl::IsQueueEnabled() const {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + if (!task_queue_manager_)
|
| + return false;
|
| +
|
| + return task_queue_manager_->selector_.IsQueueEnabled(this);
|
| +}
|
| +
|
| +TaskQueue::QueueState TaskQueueImpl::GetQueueState() const {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + if (!work_queue_.empty())
|
| + return QueueState::HAS_WORK;
|
| +
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + if (incoming_queue_.empty()) {
|
| + return QueueState::EMPTY;
|
| + } else {
|
| + return QueueState::NEEDS_PUMPING;
|
| + }
|
| + }
|
| +}
|
| +
|
| +bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const Task* task) {
|
| + lock_.AssertAcquired();
|
| + // A null task is passed when UpdateQueue is called before any task is run.
|
| + // In this case we don't want to pump an after_wakeup queue, so return true
|
| + // here.
|
| + if (!task)
|
| + return true;
|
| +
|
| + // Return false if there are no task in the incoming queue.
|
| + if (incoming_queue_.empty())
|
| + return false;
|
| +
|
| + const TaskQueueImpl::Task& oldest_queued_task = incoming_queue_.front();
|
| + return task->enqueue_order() < oldest_queued_task.enqueue_order();
|
| +}
|
| +
|
| +bool TaskQueueImpl::ShouldAutoPumpQueueLocked(bool should_trigger_wakeup,
|
| + const Task* previous_task) {
|
| + lock_.AssertAcquired();
|
| + if (pump_policy_ == PumpPolicy::MANUAL)
|
| + return false;
|
| + if (pump_policy_ == PumpPolicy::AFTER_WAKEUP &&
|
| + (!should_trigger_wakeup || TaskIsOlderThanQueuedTasks(previous_task)))
|
| + return false;
|
| + if (incoming_queue_.empty())
|
| + return false;
|
| + return true;
|
| +}
|
| +
|
| +bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
|
| + base::TimeTicks* next_pending_delayed_task) {
|
| + base::AutoLock lock(lock_);
|
| + if (delayed_task_queue_.empty())
|
| + return false;
|
| + *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
|
| + return true;
|
| +}
|
| +
|
| +void TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now,
|
| + bool should_trigger_wakeup,
|
| + const Task* previous_task) {
|
| + DCHECK(work_queue_.empty());
|
| + base::AutoLock lock(lock_);
|
| + if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup, previous_task))
|
| + return;
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
|
| + std::swap(work_queue_, incoming_queue_);
|
| + // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no
|
| + // longer needs to consider this queue for reloading.
|
| + task_queue_manager_->UnregisterAsUpdatableTaskQueue(this);
|
| + if (!work_queue_.empty()) {
|
| + DCHECK(task_queue_manager_);
|
| + task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this);
|
| + TraceQueueSize(true);
|
| + }
|
| +}
|
| +
|
| +TaskQueueImpl::Task TaskQueueImpl::TakeTaskFromWorkQueue() {
|
| + // TODO(alexclarke): consider std::move() when allowed.
|
| + Task pending_task = work_queue_.front();
|
| + work_queue_.pop();
|
| + DCHECK(task_queue_manager_);
|
| + task_queue_manager_->selector_.GetTaskQueueSets()->OnPopQueue(this);
|
| + TraceQueueSize(false);
|
| + return pending_task;
|
| +}
|
| +
|
| +void TaskQueueImpl::TraceQueueSize(bool is_locked) const {
|
| + bool is_tracing;
|
| + TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
|
| + &is_tracing);
|
| + if (!is_tracing)
|
| + return;
|
| + if (!is_locked)
|
| + lock_.Acquire();
|
| + else
|
| + lock_.AssertAcquired();
|
| + TRACE_COUNTER1(
|
| + disabled_by_default_tracing_category_, GetName(),
|
| + incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
|
| + if (!is_locked)
|
| + lock_.Release();
|
| +}
|
| +
|
| +void TaskQueueImpl::EnqueueTaskLocked(const Task& pending_task) {
|
| + lock_.AssertAcquired();
|
| + if (!task_queue_manager_)
|
| + return;
|
| + if (incoming_queue_.empty())
|
| + task_queue_manager_->RegisterAsUpdatableTaskQueue(this);
|
| + if (pump_policy_ == PumpPolicy::AUTO && incoming_queue_.empty()) {
|
| + task_queue_manager_->MaybePostDoWorkOnMainRunner();
|
| + }
|
| + // TODO(alexclarke): consider std::move() when allowed.
|
| + incoming_queue_.push(pending_task);
|
| + TraceQueueSize(true);
|
| +}
|
| +
|
| +void TaskQueueImpl::EnqueueDelayedTaskLocked(const Task& pending_task) {
|
| + lock_.AssertAcquired();
|
| + if (!task_queue_manager_)
|
| + return;
|
| + if (incoming_queue_.empty())
|
| + task_queue_manager_->RegisterAsUpdatableTaskQueue(this);
|
| + // TODO(alexclarke): consider std::move() when allowed.
|
| + incoming_queue_.push(pending_task);
|
| + incoming_queue_.back().set_enqueue_order(
|
| + task_queue_manager_->GetNextSequenceNumber());
|
| + TraceQueueSize(true);
|
| +}
|
| +
|
| +void TaskQueueImpl::SetPumpPolicy(PumpPolicy pump_policy) {
|
| + base::AutoLock lock(lock_);
|
| + if (pump_policy == PumpPolicy::AUTO && pump_policy_ != PumpPolicy::AUTO) {
|
| + PumpQueueLocked();
|
| + }
|
| + pump_policy_ = pump_policy;
|
| +}
|
| +
|
| +void TaskQueueImpl::PumpQueueLocked() {
|
| + lock_.AssertAcquired();
|
| + if (!task_queue_manager_)
|
| + return;
|
| +
|
| + LazyNow lazy_now(task_queue_manager_);
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
|
| +
|
| + bool was_empty = work_queue_.empty();
|
| + while (!incoming_queue_.empty()) {
|
| + // TODO(alexclarke): consider std::move() when allowed.
|
| + work_queue_.push(incoming_queue_.front());
|
| + incoming_queue_.pop();
|
| + }
|
| + // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no longer
|
| + // needs to consider this queue for reloading.
|
| + task_queue_manager_->UnregisterAsUpdatableTaskQueue(this);
|
| + if (!work_queue_.empty()) {
|
| + if (was_empty)
|
| + task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this);
|
| + task_queue_manager_->MaybePostDoWorkOnMainRunner();
|
| + }
|
| +}
|
| +
|
| +void TaskQueueImpl::PumpQueue() {
|
| + base::AutoLock lock(lock_);
|
| + PumpQueueLocked();
|
| +}
|
| +
|
| +const char* TaskQueueImpl::GetName() const {
|
| + return name_;
|
| +}
|
| +
|
| +bool TaskQueueImpl::GetWorkQueueFrontTaskEnqueueOrder(
|
| + int* enqueue_order) const {
|
| + if (work_queue_.empty())
|
| + return false;
|
| + *enqueue_order = work_queue_.front().enqueue_order();
|
| + return true;
|
| +}
|
| +
|
| +void TaskQueueImpl::PushTaskOntoWorkQueueForTest(const Task& task) {
|
| + work_queue_.push(task);
|
| +}
|
| +
|
| +void TaskQueueImpl::PopTaskFromWorkQueueForTest() {
|
| + work_queue_.pop();
|
| +}
|
| +
|
| +void TaskQueueImpl::SetQueuePriority(QueuePriority priority) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + if (!task_queue_manager_)
|
| + return;
|
| +
|
| + task_queue_manager_->selector_.SetQueuePriority(this, priority);
|
| +}
|
| +
|
| +// static
|
| +const char* TaskQueueImpl::PumpPolicyToString(
|
| + TaskQueue::PumpPolicy pump_policy) {
|
| + switch (pump_policy) {
|
| + case TaskQueue::PumpPolicy::AUTO:
|
| + return "auto";
|
| + case TaskQueue::PumpPolicy::AFTER_WAKEUP:
|
| + return "after_wakeup";
|
| + case TaskQueue::PumpPolicy::MANUAL:
|
| + return "manual";
|
| + default:
|
| + NOTREACHED();
|
| + return nullptr;
|
| + }
|
| +}
|
| +
|
| +// static
|
| +const char* TaskQueueImpl::WakeupPolicyToString(
|
| + TaskQueue::WakeupPolicy wakeup_policy) {
|
| + switch (wakeup_policy) {
|
| + case TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES:
|
| + return "can_wake_other_queues";
|
| + case TaskQueue::WakeupPolicy::DONT_WAKE_OTHER_QUEUES:
|
| + return "dont_wake_other_queues";
|
| + default:
|
| + NOTREACHED();
|
| + return nullptr;
|
| + }
|
| +}
|
| +
|
| +// static
|
| +const char* TaskQueueImpl::PriorityToString(QueuePriority priority) {
|
| + switch (priority) {
|
| + case CONTROL_PRIORITY:
|
| + return "control";
|
| + case HIGH_PRIORITY:
|
| + return "high";
|
| + case NORMAL_PRIORITY:
|
| + return "normal";
|
| + case BEST_EFFORT_PRIORITY:
|
| + return "best_effort";
|
| + case DISABLED_PRIORITY:
|
| + return "disabled";
|
| + default:
|
| + NOTREACHED();
|
| + return nullptr;
|
| + }
|
| +}
|
| +
|
| +void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const {
|
| + base::AutoLock lock(lock_);
|
| + state->BeginDictionary();
|
| + state->SetString("name", GetName());
|
| + state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
|
| + state->SetString("wakeup_policy", WakeupPolicyToString(wakeup_policy_));
|
| + bool verbose_tracing_enabled = false;
|
| + TRACE_EVENT_CATEGORY_GROUP_ENABLED(
|
| + disabled_by_default_verbose_tracing_category_, &verbose_tracing_enabled);
|
| + state->SetInteger("incoming_queue_size", incoming_queue_.size());
|
| + state->SetInteger("work_queue_size", work_queue_.size());
|
| + state->SetInteger("delayed_task_queue_size", delayed_task_queue_.size());
|
| + if (verbose_tracing_enabled) {
|
| + state->BeginArray("incoming_queue");
|
| + QueueAsValueInto(incoming_queue_, state);
|
| + state->EndArray();
|
| + state->BeginArray("work_queue");
|
| + QueueAsValueInto(work_queue_, state);
|
| + state->EndArray();
|
| + state->BeginArray("delayed_task_queue");
|
| + QueueAsValueInto(delayed_task_queue_, state);
|
| + state->EndArray();
|
| + }
|
| + state->SetString("priority",
|
| + PriorityToString(static_cast<QueuePriority>(set_index_)));
|
| + state->EndDictionary();
|
| +}
|
| +
|
| +void TaskQueueImpl::AddTaskObserver(
|
| + base::MessageLoop::TaskObserver* task_observer) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + task_observers_.AddObserver(task_observer);
|
| +}
|
| +
|
| +void TaskQueueImpl::RemoveTaskObserver(
|
| + base::MessageLoop::TaskObserver* task_observer) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + task_observers_.RemoveObserver(task_observer);
|
| +}
|
| +
|
| +void TaskQueueImpl::NotifyWillProcessTask(
|
| + const base::PendingTask& pending_task) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + DCHECK(should_notify_observers_);
|
| + FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
|
| + WillProcessTask(pending_task));
|
| +}
|
| +
|
| +void TaskQueueImpl::NotifyDidProcessTask(
|
| + const base::PendingTask& pending_task) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + DCHECK(should_notify_observers_);
|
| + FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
|
| + DidProcessTask(pending_task));
|
| +}
|
| +
|
| +// static
|
| +void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue,
|
| + base::trace_event::TracedValue* state) {
|
| + std::queue<Task> queue_copy(queue);
|
| + while (!queue_copy.empty()) {
|
| + TaskAsValueInto(queue_copy.front(), state);
|
| + queue_copy.pop();
|
| + }
|
| +}
|
| +
|
| +// static
|
| +void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
|
| + base::trace_event::TracedValue* state) {
|
| + std::priority_queue<Task> queue_copy(queue);
|
| + while (!queue_copy.empty()) {
|
| + TaskAsValueInto(queue_copy.top(), state);
|
| + queue_copy.pop();
|
| + }
|
| +}
|
| +
|
| +// static
|
| +void TaskQueueImpl::TaskAsValueInto(const Task& task,
|
| + base::trace_event::TracedValue* state) {
|
| + state->BeginDictionary();
|
| + state->SetString("posted_from", task.posted_from.ToString());
|
| + state->SetInteger("enqueue_order", task.enqueue_order());
|
| + state->SetInteger("sequence_num", task.sequence_num);
|
| + state->SetBoolean("nestable", task.nestable);
|
| + state->SetBoolean("is_high_res", task.is_high_res);
|
| + state->SetDouble(
|
| + "delayed_run_time",
|
| + (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
|
| + state->EndDictionary();
|
| +}
|
| +
|
| +} // namespace internal
|
| +} // namespace scheduler
|
|
|