Chromium Code Reviews| Index: content/renderer/scheduler/task_queue_manager.cc |
| diff --git a/content/renderer/scheduler/task_queue_manager.cc b/content/renderer/scheduler/task_queue_manager.cc |
| index f167ab62fd2fbd24df677bd72aeb039951c4f0df..84408723ecdc016fc42ebc88baa52baea97580ab 100644 |
| --- a/content/renderer/scheduler/task_queue_manager.cc |
| +++ b/content/renderer/scheduler/task_queue_manager.cc |
| @@ -5,6 +5,7 @@ |
| #include "content/renderer/scheduler/task_queue_manager.h" |
| #include <queue> |
| +#include <set> |
| #include "base/bind.h" |
| #include "base/trace_event/trace_event.h" |
| @@ -20,6 +21,29 @@ const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); |
| namespace content { |
| namespace internal { |
| +// Now() is somewhat expensive so it makes sense not to call Now() unless we |
| +// really need to. |
| +class LazyNow { |
| + public: |
| + explicit LazyNow(base::TimeTicks now) |
| + : task_queue_manager_(nullptr), now_(now) { |
| + DCHECK(!now.is_null()); |
| + } |
| + |
| + explicit LazyNow(TaskQueueManager* task_queue_manager) |
| + : task_queue_manager_(task_queue_manager) {} |
| + |
| + base::TimeTicks Now() { |
| + if (now_.is_null()) |
| + now_ = task_queue_manager_->Now(); |
| + return now_; |
| + } |
| + |
| + private: |
| + TaskQueueManager* task_queue_manager_; // NOT OWNED |
| + base::TimeTicks now_; |
| +}; |
| + |
| class TaskQueue : public base::SingleThreadTaskRunner { |
| public: |
| TaskQueue(TaskQueueManager* task_queue_manager); |
| @@ -43,7 +67,7 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy); |
| void PumpQueue(); |
| - bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task, |
| + bool UpdateWorkQueue(LazyNow* lazy_now, |
| const base::PendingTask* previous_task); |
| base::PendingTask TakeTaskFromWorkQueue(); |
| @@ -68,10 +92,19 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| base::TimeDelta delay, |
| TaskType task_type); |
| - // Adds a task at the end of the incoming task queue and schedules a call to |
| - // TaskQueueManager::DoWork() if the incoming queue was empty and automatic |
| - // pumping is enabled. Can be called on an arbitrary thread. |
| - void EnqueueTask(const base::PendingTask& pending_task); |
| + // Delayed task posted to the underlying run loop, which locks |lock_| and |
| + // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks |
| + // that need to be run now. |
| + void MoveReadyDelayedTasksToIncomingQueue(); |
| + |
| + // Enqueues any delayed tasks which should be run now on the incoming_queue_ |
| + // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled. |
| + // Must be called with |lock_| locked. |
| + void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now); |
| + |
| + // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task |
| + // posted on the underlying runloop for the next task's scheduled run time. |
| + void ScheduleDelayedWorkLocked(LazyNow* lazy_now); |
| void PumpQueueLocked(); |
| bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task); |
| @@ -83,6 +116,8 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| TaskQueueManager::PumpPolicy pump_policy); |
| static void QueueAsValueInto(const base::TaskQueue& queue, |
| base::trace_event::TracedValue* state); |
| + static void QueueAsValueInto(const base::DelayedTaskQueue& queue, |
| + base::trace_event::TracedValue* state); |
| static void TaskAsValueInto(const base::PendingTask& task, |
| base::trace_event::TracedValue* state); |
| @@ -93,9 +128,9 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| base::TaskQueue incoming_queue_; |
| TaskQueueManager::PumpPolicy pump_policy_; |
| const char* name_; |
| - std::priority_queue<base::TimeTicks, |
| - std::vector<base::TimeTicks>, |
| - std::greater<base::TimeTicks>> delayed_task_run_times_; |
| + base::DelayedTaskQueue delayed_task_queue_; |
| + std::set<base::TimeTicks> in_flight_kick_delayed_tasks_; |
| + base::ThreadChecker main_thread_checker_; |
|
Sami
2015/03/18 11:13:21
nit: main_thread_checker isn't protected by the lo
alex clarke (OOO till 29th)
2015/03/18 12:15:10
Done.
|
| base::TaskQueue work_queue_; |
| @@ -115,6 +150,8 @@ TaskQueue::~TaskQueue() { |
| void TaskQueue::WillDeleteTaskQueueManager() { |
| base::AutoLock lock(lock_); |
| task_queue_manager_ = nullptr; |
| + // TODO(scheduler-dev): Should we also clear the other queues here too? |
| + delayed_task_queue_ = base::DelayedTaskQueue(); |
| } |
| bool TaskQueue::RunsTasksOnCurrentThread() const { |
| @@ -135,15 +172,64 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, |
| task_queue_manager_->DidQueueTask(&pending_task); |
| if (delay > base::TimeDelta()) { |
| - pending_task.delayed_run_time = task_queue_manager_->Now() + delay; |
| - delayed_task_run_times_.push(pending_task.delayed_run_time); |
| - return task_queue_manager_->PostDelayedTask( |
| - FROM_HERE, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay); |
| + base::TimeTicks now = task_queue_manager_->Now(); |
| + pending_task.delayed_run_time = now + delay; |
| + delayed_task_queue_.push(pending_task); |
|
Sami
2015/03/18 11:13:21
nit: call TraceQueueSize() here.
alex clarke (OOO till 29th)
2015/03/18 12:15:10
Done.
|
| + // If we changed the topmost task, then it is time to reschedule. |
| + if (delayed_task_queue_.top().task.Equals(pending_task.task)) { |
| + LazyNow lazy_now(now); |
| + ScheduleDelayedWorkLocked(&lazy_now); |
| + } |
| + return true; |
| } |
| EnqueueTaskLocked(pending_task); |
| return true; |
| } |
| +void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { |
| + DCHECK(main_thread_checker_.CalledOnValidThread()); |
| + if (!task_queue_manager_) |
|
Sami
2015/03/18 11:13:21
This should be checked while holding the lock.
alex clarke (OOO till 29th)
2015/03/18 12:15:10
Done.
|
| + return; |
| + |
| + base::AutoLock lock(lock_); |
| + LazyNow lazy_now(task_queue_manager_); |
| + MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); |
| +} |
| + |
| +void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { |
| + lock_.AssertAcquired(); |
| + // Enqueue all delayed tasks that should be running now. |
| + while (!delayed_task_queue_.empty() && |
| + delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) { |
| + in_flight_kick_delayed_tasks_.erase( |
| + delayed_task_queue_.top().delayed_run_time); |
| + EnqueueTaskLocked(delayed_task_queue_.top()); |
| + delayed_task_queue_.pop(); |
| + } |
|
Sami
2015/03/18 11:13:21
nit: call TraceQueueSize() here.
alex clarke (OOO till 29th)
2015/03/18 12:15:10
Done.
|
| + ScheduleDelayedWorkLocked(lazy_now); |
| +} |
| + |
| +void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { |
| + lock_.AssertAcquired(); |
| + // Any remaining tasks are in the future, so queue a task to kick them. |
| + if (!delayed_task_queue_.empty()) { |
| + base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time; |
| + DCHECK(next_run_time > lazy_now->Now()); |
|
Sami
2015/03/18 11:13:21
nit: DCHECK_GT
alex clarke (OOO till 29th)
2015/03/18 12:15:10
Done.
|
| + // Make sure we don't have more than one |
| + // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled |
| + // run time (note it's fine to have multiple ones in flight for distinct |
| + // run times). |
| + if (in_flight_kick_delayed_tasks_.find(next_run_time) == |
| + in_flight_kick_delayed_tasks_.end()) { |
| + in_flight_kick_delayed_tasks_.insert(next_run_time); |
| + base::TimeDelta delay = next_run_time - lazy_now->Now(); |
| + task_queue_manager_->PostDelayedTask( |
| + FROM_HERE, |
| + Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay); |
| + } |
| + } |
| +} |
| + |
| bool TaskQueue::IsQueueEmpty() const { |
| if (!work_queue_.empty()) |
| return false; |
| @@ -189,20 +275,16 @@ bool TaskQueue::ShouldAutoPumpQueueLocked( |
| return true; |
| } |
| -bool TaskQueue::UpdateWorkQueue( |
| - base::TimeTicks* next_pending_delayed_task, |
| - const base::PendingTask* previous_task) { |
| +bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, |
| + const base::PendingTask* previous_task) { |
| if (!work_queue_.empty()) |
| return true; |
| { |
| base::AutoLock lock(lock_); |
| - if (!delayed_task_run_times_.empty()) { |
| - *next_pending_delayed_task = |
| - std::min(*next_pending_delayed_task, delayed_task_run_times_.top()); |
| - } |
| if (!ShouldAutoPumpQueueLocked(previous_task)) |
| return false; |
| + MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now); |
| work_queue_.Swap(&incoming_queue_); |
| TraceQueueSize(true); |
| return true; |
| @@ -226,17 +308,13 @@ void TaskQueue::TraceQueueSize(bool is_locked) const { |
| lock_.Acquire(); |
| else |
| lock_.AssertAcquired(); |
| - TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_, |
| - incoming_queue_.size() + work_queue_.size()); |
| + TRACE_COUNTER1( |
| + TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_, |
| + incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size()); |
| if (!is_locked) |
| lock_.Release(); |
| } |
| -void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) { |
| - base::AutoLock lock(lock_); |
| - EnqueueTaskLocked(pending_task); |
| -} |
| - |
| void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| lock_.AssertAcquired(); |
| if (!task_queue_manager_) |
| @@ -247,11 +325,6 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| incoming_queue_.push(pending_task); |
| if (!pending_task.delayed_run_time.is_null()) { |
| - // Update the time of the next pending delayed task. |
| - while (!delayed_task_run_times_.empty() && |
| - delayed_task_run_times_.top() <= pending_task.delayed_run_time) { |
| - delayed_task_run_times_.pop(); |
| - } |
| // Clear the delayed run time because we've already applied the delay |
| // before getting here. |
| incoming_queue_.back().delayed_run_time = base::TimeTicks(); |
| @@ -270,6 +343,10 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { |
| void TaskQueue::PumpQueueLocked() { |
| lock_.AssertAcquired(); |
| + if (task_queue_manager_) { |
| + LazyNow lazy_now(task_queue_manager_); |
| + MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); |
| + } |
| while (!incoming_queue_.empty()) { |
| work_queue_.push(incoming_queue_.front()); |
| incoming_queue_.pop(); |
| @@ -294,6 +371,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { |
| state->EndArray(); |
| state->BeginArray("work_queue"); |
| QueueAsValueInto(work_queue_, state); |
| + state->BeginArray("delayed_task_queue"); |
| + QueueAsValueInto(delayed_task_queue_, state); |
| state->EndArray(); |
| state->EndDictionary(); |
| } |
| @@ -325,6 +404,16 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, |
| } |
| // static |
| +void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, |
| + base::trace_event::TracedValue* state) { |
| + base::DelayedTaskQueue queue_copy(queue); |
| + while (!queue_copy.empty()) { |
| + TaskAsValueInto(queue_copy.top(), state); |
| + queue_copy.pop(); |
| + } |
| +} |
| + |
| +// static |
| void TaskQueue::TaskAsValueInto(const base::PendingTask& task, |
| base::trace_event::TracedValue* state) { |
| state->BeginDictionary(); |
| @@ -405,16 +494,15 @@ void TaskQueueManager::PumpQueue(size_t queue_index) { |
| } |
| bool TaskQueueManager::UpdateWorkQueues( |
| - base::TimeTicks* next_pending_delayed_task, |
| const base::PendingTask* previous_task) { |
| // TODO(skyostil): This is not efficient when the number of queues grows very |
| // large due to the number of locks taken. Consider optimizing when we get |
| // there. |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| + internal::LazyNow lazy_now(this); |
| bool has_work = false; |
| for (auto& queue : queues_) { |
| - has_work |= queue->UpdateWorkQueue(next_pending_delayed_task, |
| - previous_task); |
| + has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task); |
| if (!queue->work_queue().empty()) { |
| // Currently we should not be getting tasks with delayed run times in any |
| // of the work queues. |
| @@ -447,22 +535,14 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
| } |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| - base::TimeTicks next_pending_delayed_task( |
| - base::TimeTicks::FromInternalValue(kMaxTimeTicks)); |
| - |
| - // Pass nullptr to UpdateWorkQueues here to prevent waking up an |
| + // Pass nullptr to UpdateWorkQueues here to prevent waking up a |
| // pump-after-wakeup queue. |
| - if (!UpdateWorkQueues(&next_pending_delayed_task, nullptr)) |
| + if (!UpdateWorkQueues(nullptr)) |
| return; |
| base::PendingTask previous_task((tracked_objects::Location()), |
| (base::Closure())); |
| for (int i = 0; i < work_batch_size_; i++) { |
| - // Interrupt the work batch if we should run the next delayed task. |
| - if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && |
| - Now() >= next_pending_delayed_task) |
| - return; |
| - |
| size_t queue_index; |
| if (!SelectWorkQueueToService(&queue_index)) |
| return; |
| @@ -471,7 +551,7 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) { |
| MaybePostDoWorkOnMainRunner(); |
| ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task); |
| - if (!UpdateWorkQueues(&next_pending_delayed_task, &previous_task)) |
| + if (!UpdateWorkQueues(&previous_task)) |
| return; |
| } |
| } |