Chromium Code Reviews| Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
| diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
| index 02f5cef63fd6d4e0bf78028c66cfaef0c708a1f0..7b3d65c2379626f767a0e4c485bdcd0c4db393e9 100644 |
| --- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
| +++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
| @@ -12,6 +12,35 @@ |
| namespace blink { |
| namespace scheduler { |
| + |
| +TaskQueue::TaskHandle::TaskHandle() : enqueue_order_(0), sequence_number_(0) {} |
| + |
| +TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue, uint64_t enqueue_order) |
| + : enqueue_order_(enqueue_order), |
| +#if DCHECK_IS_ON() |
| + task_queue_(task_queue), |
| +#endif |
| + sequence_number_(0) { |
| + DCHECK_GT(enqueue_order, 0ull); |
| +} |
| + |
| +TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue, |
| + base::TimeTicks scheduled_run_time, |
| + int sequence_number) |
| + : enqueue_order_(0), |
| + scheduled_run_time_(scheduled_run_time), |
| +#if DCHECK_IS_ON() |
| + task_queue_(task_queue), |
| +#endif |
| + sequence_number_(sequence_number) { |
| + DCHECK(!scheduled_run_time.is_null()); |
| +} |
| + |
| +TaskQueue::TaskHandle::operator bool() const { |
| + return !scheduled_run_time_.is_null() || |
| + internal::EnqueueOrderGenerator::isValidEnqueueOrder(enqueue_order_); |
| +} |
| + |
| namespace internal { |
| TaskQueueImpl::TaskQueueImpl( |
| @@ -91,12 +120,48 @@ TaskQueueImpl::Task::Task(const tracked_objects::Location& posted_from, |
| sequence_num = sequence_number; |
| } |
| +bool TaskQueueImpl::Task::DelayedRunTimeComparator::operator()( |
| + const Task& a, |
| + const Task& b) const { |
| + if (a.delayed_run_time < b.delayed_run_time) |
| + return true; |
| + |
| + if (a.delayed_run_time > b.delayed_run_time) |
| + return false; |
| + |
| + // If the times happen to match, then we use the sequence number to decide. |
| + // Compare the difference to support integer roll-over. |
| + return (a.sequence_num - b.sequence_num) < 0; |
| +} |
| + |
| +// static |
| +bool TaskQueueImpl::Task::EnqueueOrderComparatorFn(const Task& a, |
| + const Task& b) { |
| + return a.enqueue_order() < b.enqueue_order(); |
| +} |
| + |
| +// static |
| +bool TaskQueueImpl::Task::DelayedRunTimeComparatorFn(const Task& a, |
| + const Task& b) { |
| + if (a.delayed_run_time < b.delayed_run_time) |
| + return true; |
| + |
| + if (a.delayed_run_time > b.delayed_run_time) |
| + return false; |
| + |
| + // If the times happen to match, then we use the sequence number to decide. |
| + // Compare the difference to support integer roll-over. |
| + return (a.sequence_num - b.sequence_num) < 0; |
| +} |
| + |
| TaskQueueImpl::AnyThread::AnyThread(TaskQueueManager* task_queue_manager, |
| PumpPolicy pump_policy, |
| TimeDomain* time_domain) |
| : task_queue_manager(task_queue_manager), |
| pump_policy(pump_policy), |
| - time_domain(time_domain) {} |
| + time_domain(time_domain), |
| + immediate_incoming_queue(&TaskQueueImpl::Task::EnqueueOrderComparatorFn) { |
| +} |
| TaskQueueImpl::AnyThread::~AnyThread() {} |
| @@ -108,8 +173,14 @@ TaskQueueImpl::MainThreadOnly::MainThreadOnly( |
| : task_queue_manager(task_queue_manager), |
| pump_policy(pump_policy), |
| time_domain(time_domain), |
| - delayed_work_queue(new WorkQueue(task_queue, "delayed")), |
| - immediate_work_queue(new WorkQueue(task_queue, "immediate")), |
| + delayed_work_queue( |
| + new WorkQueue(task_queue, |
| + "delayed", |
| + &TaskQueueImpl::Task::DelayedRunTimeComparatorFn)), |
| + immediate_work_queue( |
| + new WorkQueue(task_queue, |
| + "immediate", |
| + &TaskQueueImpl::Task::EnqueueOrderComparatorFn)), |
| set_index(0), |
| is_enabled(true), |
| blame_context(nullptr) {} |
| @@ -128,8 +199,8 @@ void TaskQueueImpl::UnregisterTaskQueue() { |
| any_thread().task_queue_manager = nullptr; |
| main_thread_only().task_queue_manager = nullptr; |
| - main_thread_only().delayed_incoming_queue = std::priority_queue<Task>(); |
| - any_thread().immediate_incoming_queue = std::queue<Task>(); |
| + main_thread_only().delayed_incoming_queue.clear(); |
| + any_thread().immediate_incoming_queue.clear(); |
| main_thread_only().immediate_work_queue.reset(); |
| main_thread_only().delayed_work_queue.reset(); |
| } |
| @@ -139,6 +210,132 @@ bool TaskQueueImpl::RunsTasksOnCurrentThread() const { |
| return base::PlatformThread::CurrentId() == thread_id_; |
| } |
| +TaskQueue::TaskHandle TaskQueueImpl::PostCancellableDelayedTask( |
| + const tracked_objects::Location& from_here, |
| + const base::Closure& task, |
| + base::TimeDelta delay) { |
| + if (!main_thread_only().task_queue_manager) |
| + return TaskHandle(); |
| + |
| + EnqueueOrder sequence_number = |
| + main_thread_only().task_queue_manager->GetNextSequenceNumber(); |
| + |
| + if (delay.is_zero()) { |
| + base::AutoLock lock(any_thread_lock_); |
| + PushOntoImmediateIncomingQueueLocked( |
| + Task(from_here, task, base::TimeTicks(), sequence_number, true, |
| + sequence_number)); |
| + |
| + return TaskHandle(this, sequence_number); |
| + } else { |
| + DCHECK_GT(delay, base::TimeDelta()); |
| + base::TimeTicks time_domain_now = main_thread_only().time_domain->Now(); |
| + base::TimeTicks time_domain_delayed_run_time = time_domain_now + delay; |
| + PushOntoDelayedIncomingQueueFromMainThread( |
| + Task(from_here, task, time_domain_delayed_run_time, sequence_number, |
| + true), |
| + time_domain_now); |
| + |
| + return TaskHandle(this, time_domain_delayed_run_time, sequence_number); |
| + } |
| +} |
| + |
| +bool TaskQueueImpl::CancelTask(const TaskQueue::TaskHandle& handle) { |
| + if (!handle) |
| + return false; |
| + |
| + // If the TaskQueueManager has gone away, pretend we have canceled the task |
| + // because this simplifies logic in TimerBase::stop. |
| + if (!main_thread_only().task_queue_manager) |
|
Sami
2016/08/18 15:53:43
We don't seem to need TQM below anymore, so maybe
alex clarke (OOO till 29th)
2016/08/18 16:51:41
Right but we do need the immediate_work_queue & de
|
| + return true; |
| + |
| +#if DCHECK_IS_ON() |
| + DCHECK_EQ(handle.task_queue_, this); |
| +#endif |
| + |
| + if (handle.scheduled_run_time_.is_null()) { |
| + // It's an immediate task. |
| + Task fake_task(FROM_HERE, base::Closure(), handle.scheduled_run_time_, |
|
Sami
2016/08/18 15:53:43
nit: not sure if the compiler is smart enough to e
alex clarke (OOO till 29th)
2016/08/18 16:51:41
Done.
|
| + handle.sequence_number_, false, handle.enqueue_order_); |
| + if (main_thread_only().immediate_work_queue->CancelTask(fake_task)) |
| + return true; |
| + |
| + base::AutoLock lock(any_thread_lock_); |
| + return any_thread().immediate_incoming_queue.erase(fake_task) > 0; |
| + } else { |
| + // It's a delayed task. |
| + DCHECK_EQ(0ull, handle.enqueue_order_); |
| + Task fake_task(FROM_HERE, base::Closure(), handle.scheduled_run_time_, |
|
Sami
2016/08/18 15:53:43
Readability suggestion -- feel free to ignore: Sho
alex clarke (OOO till 29th)
2016/08/18 16:51:41
Done.
|
| + handle.sequence_number_, false); |
| + DelayedRunTimeQueue::iterator it = |
| + main_thread_only().delayed_incoming_queue.find(fake_task); |
| + if (it != main_thread_only().delayed_incoming_queue.end()) { |
| + // It's safe to remove the wakeup from the TimeDomain only if this task's |
| + // scheduled run time was unique within the queue. |
| + bool can_cancel_timedomain_wakeup = true; |
| + if (it != main_thread_only().delayed_incoming_queue.begin()) { |
| + DelayedRunTimeQueue::iterator before = it; |
| + before--; |
| + if (before->delayed_run_time == handle.scheduled_run_time_) |
| + can_cancel_timedomain_wakeup = false; |
| + } |
| + if (can_cancel_timedomain_wakeup) { |
| + DelayedRunTimeQueue::iterator after = it; |
| + after++; |
| + if (after != main_thread_only().delayed_incoming_queue.end() && |
| + after->delayed_run_time == handle.scheduled_run_time_) { |
| + can_cancel_timedomain_wakeup = false; |
| + } |
| + } |
| + if (can_cancel_timedomain_wakeup) { |
| + main_thread_only().time_domain->CancelDelayedWork( |
| + this, handle.scheduled_run_time_); |
| + } |
| + |
| + main_thread_only().delayed_incoming_queue.erase(it); |
| + return true; |
| + } |
| + |
| + return main_thread_only().delayed_work_queue->CancelTask(fake_task); |
| + } |
| +} |
| + |
| +bool TaskQueueImpl::IsTaskPending(const TaskQueue::TaskHandle& handle) const { |
| + if (!handle) |
| + return false; |
| + |
| + // If the TaskQueueManager has gone away the task got cancelled. |
| + if (!main_thread_only().task_queue_manager) |
| + return false; |
| + |
| +#if DCHECK_IS_ON() |
| + DCHECK_EQ(handle.task_queue_, this); |
| +#endif |
| + |
| + if (handle.scheduled_run_time_.is_null()) { |
| + // It's an immediate task. |
| + Task fake_task(FROM_HERE, base::Closure(), handle.scheduled_run_time_, |
| + handle.sequence_number_, false, handle.enqueue_order_); |
| + if (main_thread_only().immediate_work_queue->IsTaskPending(fake_task)) |
| + return true; |
| + |
| + base::AutoLock lock(any_thread_lock_); |
| + return any_thread().immediate_incoming_queue.find(fake_task) != |
| + any_thread().immediate_incoming_queue.end(); |
| + } else { |
| + // It's a delayed task. |
| + DCHECK_EQ(0ull, handle.enqueue_order_); |
| + Task fake_task(FROM_HERE, base::Closure(), handle.scheduled_run_time_, |
| + handle.sequence_number_, false); |
| + DelayedRunTimeQueue::iterator it = |
| + main_thread_only().delayed_incoming_queue.find(fake_task); |
| + if (it != main_thread_only().delayed_incoming_queue.end()) |
| + return true; |
| + |
| + return main_thread_only().delayed_work_queue->IsTaskPending(fake_task); |
| + } |
| +} |
| + |
| bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) { |
| @@ -223,7 +420,7 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread( |
| // Schedule a later call to MoveReadyDelayedTasksToDelayedWorkQueue. |
| base::TimeTicks delayed_run_time = pending_task.delayed_run_time; |
| - main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); |
| + main_thread_only().delayed_incoming_queue.insert(std::move(pending_task)); |
| main_thread_only().time_domain->ScheduleDelayedWork(this, delayed_run_time, |
| now); |
| TraceQueueSize(false); |
| @@ -249,14 +446,14 @@ void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task pending_task) { |
| any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); |
| } |
| any_thread().task_queue_manager->DidQueueTask(pending_task); |
| - any_thread().immediate_incoming_queue.push(std::move(pending_task)); |
| + any_thread().immediate_incoming_queue.insert(std::move(pending_task)); |
| TraceQueueSize(true); |
| } |
| void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| base::TimeTicks delayed_run_time = pending_task.delayed_run_time; |
| - main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); |
| + main_thread_only().delayed_incoming_queue.insert(std::move(pending_task)); |
| main_thread_only().time_domain->ScheduleDelayedWork( |
| this, delayed_run_time, main_thread_only().time_domain->Now()); |
| } |
| @@ -311,7 +508,7 @@ bool TaskQueueImpl::NeedsPumping() const { |
| if (main_thread_only().delayed_incoming_queue.empty()) |
| return false; |
| - return main_thread_only().delayed_incoming_queue.top().delayed_run_time <= |
| + return main_thread_only().delayed_incoming_queue.begin()->delayed_run_time <= |
| main_thread_only().time_domain->CreateLazyNow().Now(); |
| } |
| @@ -326,7 +523,7 @@ bool TaskQueueImpl::TaskIsOlderThanQueuedImmediateTasksLocked( |
| // Return false if task is newer than the oldest immediate task. |
| if (!any_thread().immediate_incoming_queue.empty() && |
| task->enqueue_order() > |
| - any_thread().immediate_incoming_queue.front().enqueue_order()) { |
| + any_thread().immediate_incoming_queue.begin()->enqueue_order()) { |
| return false; |
| } |
| return true; |
| @@ -374,17 +571,17 @@ bool TaskQueueImpl::ShouldAutoPumpDelayedQueue(bool should_trigger_wakeup, |
| void TaskQueueImpl::MoveReadyDelayedTasksToDelayedWorkQueue(LazyNow* lazy_now) { |
| // Enqueue all delayed tasks that should be running now. |
| - while (!main_thread_only().delayed_incoming_queue.empty() && |
| - main_thread_only().delayed_incoming_queue.top().delayed_run_time <= |
| - lazy_now->Now()) { |
| - // Note: the const_cast is needed because there is no direct way to move |
| - // elements out of a priority queue. The queue must not be modified between |
| - // the top() and the pop(). |
| - main_thread_only().delayed_work_queue->PushAndSetEnqueueOrder( |
| - std::move( |
| - const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())), |
| + while (!main_thread_only().delayed_incoming_queue.empty()) { |
| + DelayedRunTimeQueue::iterator next_task = |
| + main_thread_only().delayed_incoming_queue.begin(); |
| + if (next_task->delayed_run_time > lazy_now->Now()) |
| + break; |
| + // TODO(alexclarke): Use extract() when C++17 is allowed. |
| + Task& task = const_cast<Task&>(*next_task); |
| + task.set_enqueue_order( |
| main_thread_only().task_queue_manager->GetNextSequenceNumber()); |
| - main_thread_only().delayed_incoming_queue.pop(); |
| + main_thread_only().delayed_work_queue->Push(std::move(task)); |
| + main_thread_only().delayed_incoming_queue.erase(next_task); |
| } |
| } |
| @@ -467,9 +664,11 @@ void TaskQueueImpl::PumpQueueLocked(LazyNow* lazy_now, bool may_post_dowork) { |
| MoveReadyDelayedTasksToDelayedWorkQueue(lazy_now); |
| while (!any_thread().immediate_incoming_queue.empty()) { |
| + ComparatorQueue::iterator it = |
| + any_thread().immediate_incoming_queue.begin(); |
| main_thread_only().immediate_work_queue->Push( |
| - std::move(any_thread().immediate_incoming_queue.front())); |
| - any_thread().immediate_incoming_queue.pop(); |
| + std::move(const_cast<Task&>(*it))); |
| + any_thread().immediate_incoming_queue.erase(it); |
| } |
| // |immediate_incoming_queue| is now empty so TimeDomain::UpdateQueues no |
| @@ -576,7 +775,7 @@ void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const { |
| main_thread_only().delayed_work_queue->Size()); |
| if (!main_thread_only().delayed_incoming_queue.empty()) { |
| base::TimeDelta delay_to_next_task = |
| - (main_thread_only().delayed_incoming_queue.top().delayed_run_time - |
| + (main_thread_only().delayed_incoming_queue.begin()->delayed_run_time - |
| main_thread_only().time_domain->CreateLazyNow().Now()); |
| state->SetDouble("delay_to_next_task_ms", |
| delay_to_next_task.InMillisecondsF()); |
| @@ -662,34 +861,19 @@ void TaskQueueImpl::SetBlameContext( |
| } |
| // static |
| -void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue, |
| +void TaskQueueImpl::QueueAsValueInto(const ComparatorQueue& queue, |
| base::trace_event::TracedValue* state) { |
| - // Remove const to search |queue| in the destructive manner. Restore the |
| - // content from |visited| later. |
| - std::queue<Task>* mutable_queue = const_cast<std::queue<Task>*>(&queue); |
| - std::queue<Task> visited; |
| - while (!mutable_queue->empty()) { |
| - TaskAsValueInto(mutable_queue->front(), state); |
| - visited.push(std::move(mutable_queue->front())); |
| - mutable_queue->pop(); |
| + for (const Task& task : queue) { |
| + TaskAsValueInto(task, state); |
| } |
| - *mutable_queue = std::move(visited); |
| } |
| // static |
| -void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue, |
| +void TaskQueueImpl::QueueAsValueInto(const DelayedRunTimeQueue& queue, |
| base::trace_event::TracedValue* state) { |
| - // Remove const to search |queue| in the destructive manner. Restore the |
| - // content from |visited| later. |
| - std::priority_queue<Task>* mutable_queue = |
| - const_cast<std::priority_queue<Task>*>(&queue); |
| - std::priority_queue<Task> visited; |
| - while (!mutable_queue->empty()) { |
| - TaskAsValueInto(mutable_queue->top(), state); |
| - visited.push(std::move(const_cast<Task&>(mutable_queue->top()))); |
| - mutable_queue->pop(); |
| + for (const Task& task : queue) { |
| + TaskAsValueInto(task, state); |
| } |
| - *mutable_queue = std::move(visited); |
| } |
| // static |