Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
index 7a8da55f6de9817fac85eedcd6f3e781f0885529..1b0980a4f4e00ccf236f4bc2250683d0b744c3b7 100644 |
--- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
+++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc |
@@ -12,6 +12,34 @@ |
namespace blink { |
namespace scheduler { |
+ |
+TaskQueue::TaskHandle::TaskHandle() : enqueue_order_(0), sequence_number_(0) {} |
+ |
+TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue, uint64_t enqueue_order) |
+ : enqueue_order_(enqueue_order), |
+#if DCHECK_IS_ON() |
+ task_queue_(task_queue), |
+#endif |
+ sequence_number_(0) { |
+ DCHECK_GT(enqueue_order, 0ull); |
+} |
+ |
+TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue, |
+ base::TimeTicks scheduled_run_time, |
+ int sequence_number) |
+ : enqueue_order_(0), |
+ scheduled_run_time_(scheduled_run_time), |
+#if DCHECK_IS_ON() |
+ task_queue_(task_queue), |
+#endif |
+ sequence_number_(sequence_number) { |
+ DCHECK(!scheduled_run_time.is_null()); |
+} |
+ |
+TaskQueue::TaskHandle::operator bool() const { |
+ return !scheduled_run_time_.is_null() || |
+ internal::EnqueueOrderGenerator::IsValidEnqueueOrder(enqueue_order_); |
+} |
namespace internal { |
@@ -88,9 +116,62 @@ |
sequence_num = sequence_number; |
} |
+// static |
+TaskQueueImpl::Task TaskQueueImpl::Task::CreateFakeTaskFromHandle( |
+ const TaskHandle& handle) { |
+ if (handle.scheduled_run_time_.is_null()) { |
+ // It's an immediate task. |
+ return Task(tracked_objects::Location(), base::Closure(), |
+ handle.scheduled_run_time_, handle.sequence_number_, false, |
+ handle.enqueue_order_); |
+ } else { |
+ // It's a delayed task. |
+ DCHECK_EQ(0ull, handle.enqueue_order_); |
+ return Task(tracked_objects::Location(), base::Closure(), |
+ handle.scheduled_run_time_, handle.sequence_number_, false); |
+ } |
+} |
+ |
+bool TaskQueueImpl::Task::DelayedRunTimeComparator::operator()( |
+ const Task& a, |
+ const Task& b) const { |
+ if (a.delayed_run_time < b.delayed_run_time) |
+ return true; |
+ |
+ if (a.delayed_run_time > b.delayed_run_time) |
+ return false; |
+ |
+ // If the times happen to match, then we use the sequence number to decide. |
+ // Compare the difference to support integer roll-over. |
+ return (a.sequence_num - b.sequence_num) < 0; |
+} |
+ |
+// static |
+bool TaskQueueImpl::Task::EnqueueOrderComparatorFn(const Task& a, |
+ const Task& b) { |
+ return a.enqueue_order() < b.enqueue_order(); |
+} |
+ |
+// static |
+bool TaskQueueImpl::Task::DelayedRunTimeComparatorFn(const Task& a, |
+ const Task& b) { |
+ if (a.delayed_run_time < b.delayed_run_time) |
+ return true; |
+ |
+ if (a.delayed_run_time > b.delayed_run_time) |
+ return false; |
+ |
+ // If the times happen to match, then we use the sequence number to decide. |
+ // Compare the difference to support integer roll-over. |
+ return (a.sequence_num - b.sequence_num) < 0; |
+} |
+ |
TaskQueueImpl::AnyThread::AnyThread(TaskQueueManager* task_queue_manager, |
TimeDomain* time_domain) |
- : task_queue_manager(task_queue_manager), time_domain(time_domain) {} |
+ : task_queue_manager(task_queue_manager), |
+ time_domain(time_domain), |
+ immediate_incoming_queue(&TaskQueueImpl::Task::EnqueueOrderComparatorFn) { |
+} |
TaskQueueImpl::AnyThread::~AnyThread() {} |
@@ -100,8 +181,14 @@ |
TimeDomain* time_domain) |
: task_queue_manager(task_queue_manager), |
time_domain(time_domain), |
- delayed_work_queue(new WorkQueue(task_queue, "delayed")), |
- immediate_work_queue(new WorkQueue(task_queue, "immediate")), |
+ delayed_work_queue( |
+ new WorkQueue(task_queue, |
+ "delayed", |
+ &TaskQueueImpl::Task::DelayedRunTimeComparatorFn)), |
+ immediate_work_queue( |
+ new WorkQueue(task_queue, |
+ "immediate", |
+ &TaskQueueImpl::Task::EnqueueOrderComparatorFn)), |
set_index(0), |
is_enabled(true), |
blame_context(nullptr), |
@@ -121,8 +208,8 @@ |
any_thread().task_queue_manager = nullptr; |
main_thread_only().task_queue_manager = nullptr; |
- main_thread_only().delayed_incoming_queue = std::priority_queue<Task>(); |
- any_thread().immediate_incoming_queue = std::queue<Task>(); |
+ main_thread_only().delayed_incoming_queue.clear(); |
+ any_thread().immediate_incoming_queue.clear(); |
main_thread_only().immediate_work_queue.reset(); |
main_thread_only().delayed_work_queue.reset(); |
} |
@@ -132,6 +219,123 @@ |
return base::PlatformThread::CurrentId() == thread_id_; |
} |
+TaskQueue::TaskHandle TaskQueueImpl::PostCancellableDelayedTask( |
+ const tracked_objects::Location& from_here, |
+ const base::Closure& task, |
+ base::TimeDelta delay) { |
+ if (!main_thread_only().task_queue_manager) |
+ return TaskHandle(); |
+ |
+ EnqueueOrder sequence_number = |
+ main_thread_only().task_queue_manager->GetNextSequenceNumber(); |
+ |
+ if (delay.is_zero()) { |
+ base::AutoLock lock(any_thread_lock_); |
+ PushOntoImmediateIncomingQueueLocked( |
+ Task(from_here, task, base::TimeTicks(), sequence_number, true, |
+ sequence_number)); |
+ |
+ return TaskHandle(this, sequence_number); |
+ } else { |
+ DCHECK_GT(delay, base::TimeDelta()); |
+ base::TimeTicks time_domain_now = main_thread_only().time_domain->Now(); |
+ base::TimeTicks time_domain_delayed_run_time = time_domain_now + delay; |
+ PushOntoDelayedIncomingQueueFromMainThread( |
+ Task(from_here, task, time_domain_delayed_run_time, sequence_number, |
+ true), |
+ time_domain_now); |
+ |
+ return TaskHandle(this, time_domain_delayed_run_time, sequence_number); |
+ } |
+} |
+ |
+bool TaskQueueImpl::CancelTask(const TaskQueue::TaskHandle& handle) { |
+ if (!handle) |
+ return false; |
+ |
+ // If the TaskQueueManager has gone away, pretend we have canceled the task |
+ // because this simplifies logic in TimerBase::stop. |
+ if (!main_thread_only().task_queue_manager) |
+ return true; |
+ |
+#if DCHECK_IS_ON() |
+ DCHECK_EQ(handle.task_queue_, this); |
+#endif |
+ |
+ Task fake_task = Task::CreateFakeTaskFromHandle(handle); |
+ if (handle.scheduled_run_time_.is_null()) { |
+ // It's an immediate task. |
+ if (main_thread_only().immediate_work_queue->CancelTask(fake_task)) |
+ return true; |
+ |
+ base::AutoLock lock(any_thread_lock_); |
+ return any_thread().immediate_incoming_queue.erase(fake_task) > 0; |
+ } else { |
+ // It's a delayed task. |
+ DelayedRunTimeQueue::iterator it = |
+ main_thread_only().delayed_incoming_queue.find(fake_task); |
+ if (it != main_thread_only().delayed_incoming_queue.end()) { |
+ // It's safe to remove the wakeup from the TimeDomain only if this task's |
+ // scheduled run time was unique within the queue. |
+ bool can_cancel_timedomain_wakeup = true; |
+ if (it != main_thread_only().delayed_incoming_queue.begin()) { |
+ DelayedRunTimeQueue::iterator before = it; |
+ before--; |
+ if (before->delayed_run_time == handle.scheduled_run_time_) |
+ can_cancel_timedomain_wakeup = false; |
+ } |
+ if (can_cancel_timedomain_wakeup) { |
+ DelayedRunTimeQueue::iterator after = it; |
+ after++; |
+ if (after != main_thread_only().delayed_incoming_queue.end() && |
+ after->delayed_run_time == handle.scheduled_run_time_) { |
+ can_cancel_timedomain_wakeup = false; |
+ } |
+ } |
+ if (can_cancel_timedomain_wakeup) { |
+ main_thread_only().time_domain->CancelDelayedWork( |
+ this, handle.scheduled_run_time_); |
+ } |
+ |
+ main_thread_only().delayed_incoming_queue.erase(it); |
+ return true; |
+ } |
+ |
+ return main_thread_only().delayed_work_queue->CancelTask(fake_task); |
+ } |
+} |
+ |
+bool TaskQueueImpl::IsTaskPending(const TaskQueue::TaskHandle& handle) const { |
+ if (!handle) |
+ return false; |
+ |
+ // If the TaskQueueManager has gone away the task got cancelled. |
+ if (!main_thread_only().task_queue_manager) |
+ return false; |
+ |
+#if DCHECK_IS_ON() |
+ DCHECK_EQ(handle.task_queue_, this); |
+#endif |
+ |
+ Task fake_task = Task::CreateFakeTaskFromHandle(handle); |
+ if (handle.scheduled_run_time_.is_null()) { |
+ // It's an immediate task. |
+ if (main_thread_only().immediate_work_queue->IsTaskPending(fake_task)) |
+ return true; |
+ |
+ base::AutoLock lock(any_thread_lock_); |
+ return any_thread().immediate_incoming_queue.find(fake_task) != |
+ any_thread().immediate_incoming_queue.end(); |
+ } else { |
+ // It's a delayed task. |
+ DelayedRunTimeQueue::iterator it = |
+ main_thread_only().delayed_incoming_queue.find(fake_task); |
+ if (it != main_thread_only().delayed_incoming_queue.end()) |
+ return true; |
+ |
+ return main_thread_only().delayed_work_queue->IsTaskPending(fake_task); |
+ } |
+} |
bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here, |
const base::Closure& task, |
@@ -164,11 +368,8 @@ |
any_thread().task_queue_manager->GetNextSequenceNumber(); |
PushOntoImmediateIncomingQueueLocked( |
- from_here, |
- task, |
- base::TimeTicks(), |
- sequence_number, |
- task_type != TaskType::NON_NESTABLE); |
+ Task(from_here, task, base::TimeTicks(), sequence_number, |
+ task_type != TaskType::NON_NESTABLE, sequence_number)); |
return true; |
} |
@@ -214,12 +415,13 @@ |
} |
void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread( |
- Task pending_task, base::TimeTicks now) { |
+ Task pending_task, |
+ base::TimeTicks now) { |
main_thread_only().task_queue_manager->DidQueueTask(pending_task); |
// Schedule a later call to MoveReadyDelayedTasksToDelayedWorkQueue. |
base::TimeTicks delayed_run_time = pending_task.delayed_run_time; |
- main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); |
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task)); |
main_thread_only().time_domain->ScheduleDelayedWork(this, delayed_run_time, |
now); |
TraceQueueSize(false); |
@@ -231,20 +433,13 @@ |
int thread_hop_task_sequence_number = |
any_thread().task_queue_manager->GetNextSequenceNumber(); |
PushOntoImmediateIncomingQueueLocked( |
- FROM_HERE, |
- base::Bind(&TaskQueueImpl::ScheduleDelayedWorkTask, this, |
- base::Passed(&pending_task)), |
- base::TimeTicks(), |
- thread_hop_task_sequence_number, |
- false); |
-} |
- |
-void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked( |
- const tracked_objects::Location& posted_from, |
- const base::Closure& task, |
- base::TimeTicks desired_run_time, |
- EnqueueOrder sequence_number, |
- bool nestable) { |
+ Task(FROM_HERE, base::Bind(&TaskQueueImpl::ScheduleDelayedWorkTask, this, |
+ base::Passed(&pending_task)), |
+ base::TimeTicks(), thread_hop_task_sequence_number, false, |
+ thread_hop_task_sequence_number)); |
+} |
+ |
+void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task pending_task) { |
if (any_thread().immediate_incoming_queue.empty()) |
any_thread().time_domain->RegisterAsUpdatableTaskQueue(this); |
// If the |immediate_incoming_queue| is empty we need a DoWork posted to make |
@@ -259,9 +454,13 @@ |
any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE); |
} |
} |
- any_thread().immediate_incoming_queue.emplace( |
- posted_from, task, desired_run_time, sequence_number, nestable, sequence_number); |
- any_thread().task_queue_manager->DidQueueTask( any_thread().immediate_incoming_queue.back()); |
+ any_thread().task_queue_manager->DidQueueTask(pending_task); |
+ // We expect |pending_task| to be inserted at the end. Amoritized O(1). |
+ any_thread().immediate_incoming_queue.insert( |
+ any_thread().immediate_incoming_queue.end(), |
+ std::move(pending_task)); |
+ DCHECK_EQ(pending_task.enqueue_order(), |
+ any_thread().immediate_incoming_queue.rbegin()->enqueue_order()); |
TraceQueueSize(true); |
} |
@@ -273,11 +472,11 @@ |
if (delayed_run_time < time_domain_now) { |
delayed_run_time = time_domain_now; |
pending_task.delayed_run_time = time_domain_now; |
- main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); |
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task)); |
LazyNow lazy_now(time_domain_now); |
MoveReadyDelayedTasksToDelayedWorkQueue(&lazy_now); |
} else { |
- main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); |
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task)); |
main_thread_only().time_domain->ScheduleDelayedWork( |
this, delayed_run_time, main_thread_only().time_domain->Now()); |
} |
@@ -305,13 +504,13 @@ |
bool TaskQueueImpl::IsEmpty() const { |
if (!main_thread_only().delayed_work_queue->Empty() || |
- !main_thread_only().delayed_incoming_queue.empty() || |
!main_thread_only().immediate_work_queue->Empty()) { |
return false; |
} |
base::AutoLock lock(any_thread_lock_); |
- return any_thread().immediate_incoming_queue.empty(); |
+ return any_thread().immediate_incoming_queue.empty() && |
+ main_thread_only().delayed_incoming_queue.empty(); |
} |
bool TaskQueueImpl::HasPendingImmediateWork() const { |
@@ -324,7 +523,7 @@ |
// Tasks on |delayed_incoming_queue| that could run now, count as |
// immediate work. |
if (!main_thread_only().delayed_incoming_queue.empty() && |
- main_thread_only().delayed_incoming_queue.top().delayed_run_time <= |
+ main_thread_only().delayed_incoming_queue.begin()->delayed_run_time <= |
main_thread_only().time_domain->CreateLazyNow().Now()) { |
return true; |
} |
@@ -335,37 +534,41 @@ |
} |
void TaskQueueImpl::MoveReadyDelayedTasksToDelayedWorkQueue(LazyNow* lazy_now) { |
- // Enqueue all delayed tasks that should be running now, skipping any that |
- // have been canceled. |
+ // Enqueue all delayed tasks that should be running now. |
while (!main_thread_only().delayed_incoming_queue.empty()) { |
+ DelayedRunTimeQueue::iterator next_task = |
+ main_thread_only().delayed_incoming_queue.begin(); |
+ if (next_task->delayed_run_time > lazy_now->Now()) |
+ break; |
// TODO(alexclarke): Use extract() when C++17 is allowed. |
- Task& task = |
- const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()); |
- if (task.task.IsCancelled()) { |
- main_thread_only().delayed_incoming_queue.pop(); |
- continue; |
- } |
- if (task.delayed_run_time > lazy_now->Now()) |
- break; |
+ Task& task = const_cast<Task&>(*next_task); |
task.set_enqueue_order( |
main_thread_only().task_queue_manager->GetNextSequenceNumber()); |
main_thread_only().delayed_work_queue->Push(std::move(task)); |
- main_thread_only().delayed_incoming_queue.pop(); |
- } |
-} |
- |
-bool TaskQueueImpl::MaybeUpdateImmediateWorkQueues() { |
+ main_thread_only().delayed_incoming_queue.erase(next_task); |
+ } |
+} |
+ |
+void TaskQueueImpl::UpdateDelayedWorkQueue(LazyNow* lazy_now) { |
if (!main_thread_only().task_queue_manager) |
- return false; |
- |
- if (!main_thread_only().immediate_work_queue->Empty()) |
- return true; |
- |
- base::AutoLock lock(any_thread_lock_); |
+ return; |
+ MoveReadyDelayedTasksToDelayedWorkQueue(lazy_now); |
+ TraceQueueSize(false); |
+} |
+ |
+void TaskQueueImpl::UpdateImmediateWorkQueue() { |
+ DCHECK(main_thread_only().immediate_work_queue->Empty()); |
+ base::AutoLock lock(any_thread_lock_); |
+ if (!main_thread_only().task_queue_manager) |
+ return; |
+ |
main_thread_only().immediate_work_queue->SwapLocked( |
any_thread().immediate_incoming_queue); |
- // |immediate_work_queue| is now empty so updates are no longer required. |
- return false; |
+ |
+ // |any_thread().immediate_incoming_queue| is now empty so |
+ // TimeDomain::UpdateQueues no longer needs to consider this queue for |
+ // reloading. |
+ main_thread_only().time_domain->UnregisterAsUpdatableTaskQueue(this); |
} |
void TaskQueueImpl::TraceQueueSize(bool is_locked) const { |
@@ -447,7 +650,7 @@ |
main_thread_only().delayed_work_queue->Size()); |
if (!main_thread_only().delayed_incoming_queue.empty()) { |
base::TimeDelta delay_to_next_task = |
- (main_thread_only().delayed_incoming_queue.top().delayed_run_time - |
+ (main_thread_only().delayed_incoming_queue.begin()->delayed_run_time - |
main_thread_only().time_domain->CreateLazyNow().Now()); |
state->SetDouble("delay_to_next_task_ms", |
delay_to_next_task.InMillisecondsF()); |
@@ -550,9 +753,9 @@ |
if (!task_unblocked && previous_fence) { |
base::AutoLock lock(any_thread_lock_); |
if (!any_thread().immediate_incoming_queue.empty() && |
- any_thread().immediate_incoming_queue.front().enqueue_order() > |
+ any_thread().immediate_incoming_queue.begin()->enqueue_order() > |
previous_fence && |
- any_thread().immediate_incoming_queue.front().enqueue_order() < |
+ any_thread().immediate_incoming_queue.begin()->enqueue_order() < |
main_thread_only().current_fence) { |
task_unblocked = true; |
} |
@@ -577,7 +780,7 @@ |
if (!task_unblocked && previous_fence) { |
base::AutoLock lock(any_thread_lock_); |
if (!any_thread().immediate_incoming_queue.empty() && |
- any_thread().immediate_incoming_queue.front().enqueue_order() > |
+ any_thread().immediate_incoming_queue.begin()->enqueue_order() > |
previous_fence) { |
task_unblocked = true; |
} |
@@ -602,7 +805,7 @@ |
if (any_thread().immediate_incoming_queue.empty()) |
return true; |
- return any_thread().immediate_incoming_queue.front().enqueue_order() > |
+ return any_thread().immediate_incoming_queue.begin()->enqueue_order() > |
main_thread_only().current_fence; |
} |
@@ -618,39 +821,24 @@ |
if (any_thread().immediate_incoming_queue.empty()) |
return true; |
- return any_thread().immediate_incoming_queue.front().enqueue_order() > |
+ return any_thread().immediate_incoming_queue.begin()->enqueue_order() > |
main_thread_only().current_fence; |
} |
// static |
-void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue, |
+void TaskQueueImpl::QueueAsValueInto(const ComparatorQueue& queue, |
base::trace_event::TracedValue* state) { |
- // Remove const to search |queue| in the destructive manner. Restore the |
- // content from |visited| later. |
- std::queue<Task>* mutable_queue = const_cast<std::queue<Task>*>(&queue); |
- std::queue<Task> visited; |
- while (!mutable_queue->empty()) { |
- TaskAsValueInto(mutable_queue->front(), state); |
- visited.push(std::move(mutable_queue->front())); |
- mutable_queue->pop(); |
- } |
- *mutable_queue = std::move(visited); |
+ for (const Task& task : queue) { |
+ TaskAsValueInto(task, state); |
+ } |
} |
// static |
-void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue, |
+void TaskQueueImpl::QueueAsValueInto(const DelayedRunTimeQueue& queue, |
base::trace_event::TracedValue* state) { |
- // Remove const to search |queue| in the destructive manner. Restore the |
- // content from |visited| later. |
- std::priority_queue<Task>* mutable_queue = |
- const_cast<std::priority_queue<Task>*>(&queue); |
- std::priority_queue<Task> visited; |
- while (!mutable_queue->empty()) { |
- TaskAsValueInto(mutable_queue->top(), state); |
- visited.push(std::move(const_cast<Task&>(mutable_queue->top()))); |
- mutable_queue->pop(); |
- } |
- *mutable_queue = std::move(visited); |
+ for (const Task& task : queue) { |
+ TaskAsValueInto(task, state); |
+ } |
} |
// static |
@@ -667,7 +855,6 @@ |
state->SetInteger("sequence_num", task.sequence_num); |
state->SetBoolean("nestable", task.nestable); |
state->SetBoolean("is_high_res", task.is_high_res); |
- state->SetBoolean("is_cancelled", task.task.IsCancelled()); |
state->SetDouble( |
"delayed_run_time", |
(task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L); |