| Index: content/renderer/scheduler/task_queue_manager.cc
|
| diff --git a/content/renderer/scheduler/task_queue_manager.cc b/content/renderer/scheduler/task_queue_manager.cc
|
| index 565aece83c5d1fb909da95aa011abe296315561d..7a9c124c36f7b3068fa9b43c28a1a97896585ef5 100644
|
| --- a/content/renderer/scheduler/task_queue_manager.cc
|
| +++ b/content/renderer/scheduler/task_queue_manager.cc
|
| @@ -5,6 +5,7 @@
|
| #include "content/renderer/scheduler/task_queue_manager.h"
|
|
|
| #include <queue>
|
| +#include <set>
|
|
|
| #include "base/bind.h"
|
| #include "base/trace_event/trace_event.h"
|
| @@ -20,6 +21,29 @@ const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
|
| namespace content {
|
| namespace internal {
|
|
|
| +// Now() is somewhat expensive so it makes sense not to call Now() unless we
|
| +// really need to.
|
| +class LazyNow {
|
| + public:
|
| + explicit LazyNow(base::TimeTicks now)
|
| + : task_queue_manager_(nullptr), now_(now) {
|
| + DCHECK(!now.is_null());
|
| + }
|
| +
|
| + explicit LazyNow(TaskQueueManager* task_queue_manager)
|
| + : task_queue_manager_(task_queue_manager) {}
|
| +
|
| + base::TimeTicks Now() {
|
| + if (now_.is_null())
|
| + now_ = task_queue_manager_->Now();
|
| + return now_;
|
| + }
|
| +
|
| + private:
|
| + TaskQueueManager* task_queue_manager_; // NOT OWNED
|
| + base::TimeTicks now_;
|
| +};
|
| +
|
| class TaskQueue : public base::SingleThreadTaskRunner {
|
| public:
|
| TaskQueue(TaskQueueManager* task_queue_manager);
|
| @@ -46,7 +70,7 @@ class TaskQueue : public base::SingleThreadTaskRunner {
|
| bool NextPendingDelayedTaskRunTime(
|
| base::TimeTicks* next_pending_delayed_task);
|
|
|
| - bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task,
|
| + bool UpdateWorkQueue(LazyNow* lazy_now,
|
| const base::PendingTask* previous_task);
|
| base::PendingTask TakeTaskFromWorkQueue();
|
|
|
| @@ -71,37 +95,47 @@ class TaskQueue : public base::SingleThreadTaskRunner {
|
| base::TimeDelta delay,
|
| TaskType task_type);
|
|
|
| - // Adds a task at the end of the incoming task queue and schedules a call to
|
| - // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
|
| - // pumping is enabled. Can be called on an arbitrary thread.
|
| - void EnqueueTask(const base::PendingTask& pending_task);
|
| + // Delayed task posted to the underlying run loop, which locks |lock_| and
|
| + // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks
|
| + // that need to be run now.
|
| + void MoveReadyDelayedTasksToIncomingQueue();
|
| +
|
| + // Enqueues any delayed tasks which should be run now on the incoming_queue_
|
| + // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
|
| + // Must be called with |lock_| locked.
|
| + void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now);
|
| +
|
| + // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task
|
| + // posted on the underlying runloop for the next task's scheduled run time.
|
| + void ScheduleDelayedWorkLocked(LazyNow* lazy_now);
|
|
|
| void PumpQueueLocked();
|
| bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
|
| bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
|
| void EnqueueTaskLocked(const base::PendingTask& pending_task);
|
| - bool NextPendingDelayedTaskRunTimeLocked(
|
| - base::TimeTicks* next_pending_delayed_task);
|
|
|
| void TraceQueueSize(bool is_locked) const;
|
| static const char* PumpPolicyToString(
|
| TaskQueueManager::PumpPolicy pump_policy);
|
| static void QueueAsValueInto(const base::TaskQueue& queue,
|
| base::trace_event::TracedValue* state);
|
| + static void QueueAsValueInto(const base::DelayedTaskQueue& queue,
|
| + base::trace_event::TracedValue* state);
|
| static void TaskAsValueInto(const base::PendingTask& task,
|
| base::trace_event::TracedValue* state);
|
|
|
| - // This lock protects all members except the work queue.
|
| + // This lock protects all members except the work queue and the
|
| + // main_thread_checker_.
|
| mutable base::Lock lock_;
|
| base::PlatformThreadId thread_id_;
|
| TaskQueueManager* task_queue_manager_;
|
| base::TaskQueue incoming_queue_;
|
| TaskQueueManager::PumpPolicy pump_policy_;
|
| const char* name_;
|
| - std::priority_queue<base::TimeTicks,
|
| - std::vector<base::TimeTicks>,
|
| - std::greater<base::TimeTicks>> delayed_task_run_times_;
|
| + base::DelayedTaskQueue delayed_task_queue_;
|
| + std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;
|
|
|
| + base::ThreadChecker main_thread_checker_;
|
| base::TaskQueue work_queue_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(TaskQueue);
|
| @@ -120,6 +154,8 @@ TaskQueue::~TaskQueue() {
|
| void TaskQueue::WillDeleteTaskQueueManager() {
|
| base::AutoLock lock(lock_);
|
| task_queue_manager_ = nullptr;
|
| + // TODO(scheduler-dev): Should we also clear the other queues here too?
|
| + delayed_task_queue_ = base::DelayedTaskQueue();
|
| }
|
|
|
| bool TaskQueue::RunsTasksOnCurrentThread() const {
|
| @@ -140,15 +176,66 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
|
| task_queue_manager_->DidQueueTask(&pending_task);
|
|
|
| if (delay > base::TimeDelta()) {
|
| - pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
|
| - delayed_task_run_times_.push(pending_task.delayed_run_time);
|
| - return task_queue_manager_->PostDelayedTask(
|
| - FROM_HERE, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
|
| + base::TimeTicks now = task_queue_manager_->Now();
|
| + pending_task.delayed_run_time = now + delay;
|
| + delayed_task_queue_.push(pending_task);
|
| + TraceQueueSize(true);
|
| + // If we changed the topmost task, then it is time to reschedule.
|
| + if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
|
| + LazyNow lazy_now(now);
|
| + ScheduleDelayedWorkLocked(&lazy_now);
|
| + }
|
| + return true;
|
| }
|
| EnqueueTaskLocked(pending_task);
|
| return true;
|
| }
|
|
|
| +void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + base::AutoLock lock(lock_);
|
| + if (!task_queue_manager_)
|
| + return;
|
| +
|
| + LazyNow lazy_now(task_queue_manager_);
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
|
| +}
|
| +
|
| +void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
|
| + lock_.AssertAcquired();
|
| + // Enqueue all delayed tasks that should be running now.
|
| + while (!delayed_task_queue_.empty() &&
|
| + delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
|
| + in_flight_kick_delayed_tasks_.erase(
|
| + delayed_task_queue_.top().delayed_run_time);
|
| + EnqueueTaskLocked(delayed_task_queue_.top());
|
| + delayed_task_queue_.pop();
|
| + }
|
| + TraceQueueSize(true);
|
| + ScheduleDelayedWorkLocked(lazy_now);
|
| +}
|
| +
|
| +void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
|
| + lock_.AssertAcquired();
|
| + // Any remaining tasks are in the future, so queue a task to kick them.
|
| + if (!delayed_task_queue_.empty()) {
|
| + base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
|
| + DCHECK_GT(next_run_time, lazy_now->Now());
|
| + // Make sure we don't have more than one
|
| + // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
|
| + // run time (note it's fine to have multiple ones in flight for distinct
|
| + // run times).
|
| + if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
|
| + in_flight_kick_delayed_tasks_.end()) {
|
| + in_flight_kick_delayed_tasks_.insert(next_run_time);
|
| + base::TimeDelta delay = next_run_time - lazy_now->Now();
|
| + task_queue_manager_->PostDelayedTask(
|
| + FROM_HERE,
|
| + Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay);
|
| + }
|
| + }
|
| +}
|
| +
|
| bool TaskQueue::IsQueueEmpty() const {
|
| if (!work_queue_.empty())
|
| return false;
|
| @@ -197,31 +284,23 @@ bool TaskQueue::ShouldAutoPumpQueueLocked(
|
| bool TaskQueue::NextPendingDelayedTaskRunTime(
|
| base::TimeTicks* next_pending_delayed_task) {
|
| base::AutoLock lock(lock_);
|
| - return NextPendingDelayedTaskRunTimeLocked(next_pending_delayed_task);
|
| -}
|
| -
|
| -bool TaskQueue::NextPendingDelayedTaskRunTimeLocked(
|
| - base::TimeTicks* next_pending_delayed_task) {
|
| - lock_.AssertAcquired();
|
| - if (!delayed_task_run_times_.empty()) {
|
| - *next_pending_delayed_task =
|
| - std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
|
| - return true;
|
| - }
|
| - return false;
|
| + if (delayed_task_queue_.empty())
|
| + return false;
|
| + *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
|
| + return true;
|
| }
|
|
|
| -bool TaskQueue::UpdateWorkQueue(
|
| - base::TimeTicks* next_pending_delayed_task,
|
| - const base::PendingTask* previous_task) {
|
| +bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
|
| + const base::PendingTask* previous_task) {
|
| if (!work_queue_.empty())
|
| return true;
|
|
|
| {
|
| base::AutoLock lock(lock_);
|
| - NextPendingDelayedTaskRunTimeLocked(next_pending_delayed_task);
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
|
| if (!ShouldAutoPumpQueueLocked(previous_task))
|
| return false;
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
|
| work_queue_.Swap(&incoming_queue_);
|
| TraceQueueSize(true);
|
| return true;
|
| @@ -245,17 +324,13 @@ void TaskQueue::TraceQueueSize(bool is_locked) const {
|
| lock_.Acquire();
|
| else
|
| lock_.AssertAcquired();
|
| - TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
|
| - incoming_queue_.size() + work_queue_.size());
|
| + TRACE_COUNTER1(
|
| + TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
|
| + incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
|
| if (!is_locked)
|
| lock_.Release();
|
| }
|
|
|
| -void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) {
|
| - base::AutoLock lock(lock_);
|
| - EnqueueTaskLocked(pending_task);
|
| -}
|
| -
|
| void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
|
| lock_.AssertAcquired();
|
| if (!task_queue_manager_)
|
| @@ -266,11 +341,6 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
|
| incoming_queue_.push(pending_task);
|
|
|
| if (!pending_task.delayed_run_time.is_null()) {
|
| - // Update the time of the next pending delayed task.
|
| - while (!delayed_task_run_times_.empty() &&
|
| - delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
|
| - delayed_task_run_times_.pop();
|
| - }
|
| // Clear the delayed run time because we've already applied the delay
|
| // before getting here.
|
| incoming_queue_.back().delayed_run_time = base::TimeTicks();
|
| @@ -289,6 +359,10 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
|
|
|
| void TaskQueue::PumpQueueLocked() {
|
| lock_.AssertAcquired();
|
| + if (task_queue_manager_) {
|
| + LazyNow lazy_now(task_queue_manager_);
|
| + MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
|
| + }
|
| while (!incoming_queue_.empty()) {
|
| work_queue_.push(incoming_queue_.front());
|
| incoming_queue_.pop();
|
| @@ -313,6 +387,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
|
| state->EndArray();
|
| state->BeginArray("work_queue");
|
| QueueAsValueInto(work_queue_, state);
|
| + state->BeginArray("delayed_task_queue");
|
| + QueueAsValueInto(delayed_task_queue_, state);
|
| state->EndArray();
|
| state->EndDictionary();
|
| }
|
| @@ -344,6 +420,16 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
|
| }
|
|
|
| // static
|
| +void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
|
| + base::trace_event::TracedValue* state) {
|
| + base::DelayedTaskQueue queue_copy(queue);
|
| + while (!queue_copy.empty()) {
|
| + TaskAsValueInto(queue_copy.top(), state);
|
| + queue_copy.pop();
|
| + }
|
| +}
|
| +
|
| +// static
|
| void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
|
| base::trace_event::TracedValue* state) {
|
| state->BeginDictionary();
|
| @@ -416,8 +502,13 @@ base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
|
| base::TimeTicks next_pending_delayed_task(
|
| base::TimeTicks::FromInternalValue(kMaxTimeTicks));
|
| for (auto& queue : queues_) {
|
| - found_pending_task |=
|
| - queue->NextPendingDelayedTaskRunTime(&next_pending_delayed_task);
|
| + base::TimeTicks queues_next_pending_delayed_task;
|
| + if (queue->NextPendingDelayedTaskRunTime(
|
| + &queues_next_pending_delayed_task)) {
|
| + found_pending_task = true;
|
| + next_pending_delayed_task =
|
| + std::min(next_pending_delayed_task, queues_next_pending_delayed_task);
|
| + }
|
| }
|
|
|
| if (!found_pending_task)
|
| @@ -442,16 +533,15 @@ void TaskQueueManager::PumpQueue(size_t queue_index) {
|
| }
|
|
|
| bool TaskQueueManager::UpdateWorkQueues(
|
| - base::TimeTicks* next_pending_delayed_task,
|
| const base::PendingTask* previous_task) {
|
| // TODO(skyostil): This is not efficient when the number of queues grows very
|
| // large due to the number of locks taken. Consider optimizing when we get
|
| // there.
|
| DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + internal::LazyNow lazy_now(this);
|
| bool has_work = false;
|
| for (auto& queue : queues_) {
|
| - has_work |= queue->UpdateWorkQueue(next_pending_delayed_task,
|
| - previous_task);
|
| + has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task);
|
| if (!queue->work_queue().empty()) {
|
| // Currently we should not be getting tasks with delayed run times in any
|
| // of the work queues.
|
| @@ -484,22 +574,14 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) {
|
| }
|
| DCHECK(main_thread_checker_.CalledOnValidThread());
|
|
|
| - base::TimeTicks next_pending_delayed_task(
|
| - base::TimeTicks::FromInternalValue(kMaxTimeTicks));
|
| -
|
| - // Pass nullptr to UpdateWorkQueues here to prevent waking up an
|
| + // Pass nullptr to UpdateWorkQueues here to prevent waking up a
|
| // pump-after-wakeup queue.
|
| - if (!UpdateWorkQueues(&next_pending_delayed_task, nullptr))
|
| + if (!UpdateWorkQueues(nullptr))
|
| return;
|
|
|
| base::PendingTask previous_task((tracked_objects::Location()),
|
| (base::Closure()));
|
| for (int i = 0; i < work_batch_size_; i++) {
|
| - // Interrupt the work batch if we should run the next delayed task.
|
| - if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
|
| - Now() >= next_pending_delayed_task)
|
| - return;
|
| -
|
| size_t queue_index;
|
| if (!SelectWorkQueueToService(&queue_index))
|
| return;
|
| @@ -508,7 +590,7 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) {
|
| MaybePostDoWorkOnMainRunner();
|
| ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);
|
|
|
| - if (!UpdateWorkQueues(&next_pending_delayed_task, &previous_task))
|
| + if (!UpdateWorkQueues(&previous_task))
|
| return;
|
| }
|
| }
|
|
|