Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1180)

Unified Diff: third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc

Issue 2258713004: Make tasks cancellable inside the blink scheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Various comment nits addressed Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc
diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc
index 02f5cef63fd6d4e0bf78028c66cfaef0c708a1f0..a68a4af5b4889e90167140c59ff298f5f6e7d8c2 100644
--- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc
+++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_impl.cc
@@ -12,6 +12,35 @@
namespace blink {
namespace scheduler {
+
+TaskQueue::TaskHandle::TaskHandle() : enqueue_order_(0), sequence_number_(0) {}
+
+TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue, uint64_t enqueue_order)
+ : enqueue_order_(enqueue_order),
+#if DCHECK_IS_ON()
+ task_queue_(task_queue),
+#endif
+ sequence_number_(0) {
+ DCHECK_GT(enqueue_order, 0ull);
+}
+
+TaskQueue::TaskHandle::TaskHandle(TaskQueue* task_queue,
+ base::TimeTicks scheduled_run_time,
+ int sequence_number)
+ : enqueue_order_(0),
+ scheduled_run_time_(scheduled_run_time),
+#if DCHECK_IS_ON()
+ task_queue_(task_queue),
+#endif
+ sequence_number_(sequence_number) {
+ DCHECK(!scheduled_run_time.is_null());
+}
+
+TaskQueue::TaskHandle::operator bool() const {
+ return !scheduled_run_time_.is_null() ||
+ internal::EnqueueOrderGenerator::IsValidEnqueueOrder(enqueue_order_);
+}
+
namespace internal {
TaskQueueImpl::TaskQueueImpl(
@@ -91,12 +120,64 @@ TaskQueueImpl::Task::Task(const tracked_objects::Location& posted_from,
sequence_num = sequence_number;
}
+// static
+TaskQueueImpl::Task TaskQueueImpl::Task::CreateFakeTaskFromHandle(
+ const TaskHandle& handle) {
+ if (handle.scheduled_run_time_.is_null()) {
+ // It's an immediate task.
+ return Task(tracked_objects::Location(), base::Closure(),
+ handle.scheduled_run_time_, handle.sequence_number_, false,
+ handle.enqueue_order_);
+ } else {
+ // It's a delayed task.
+ DCHECK_EQ(0ull, handle.enqueue_order_);
+ return Task(tracked_objects::Location(), base::Closure(),
+ handle.scheduled_run_time_, handle.sequence_number_, false);
+ }
+}
+
+bool TaskQueueImpl::Task::DelayedRunTimeComparator::operator()(
+ const Task& a,
+ const Task& b) const {
+ if (a.delayed_run_time < b.delayed_run_time)
+ return true;
+
+ if (a.delayed_run_time > b.delayed_run_time)
+ return false;
+
+ // If the times happen to match, then we use the sequence number to decide.
+ // Compare the difference to support integer roll-over.
+ return (a.sequence_num - b.sequence_num) < 0;
+}
+
+// static
+bool TaskQueueImpl::Task::EnqueueOrderComparatorFn(const Task& a,
+ const Task& b) {
+ return a.enqueue_order() < b.enqueue_order();
+}
+
+// static
+bool TaskQueueImpl::Task::DelayedRunTimeComparatorFn(const Task& a,
+ const Task& b) {
+ if (a.delayed_run_time < b.delayed_run_time)
+ return true;
+
+ if (a.delayed_run_time > b.delayed_run_time)
+ return false;
+
+ // If the times happen to match, then we use the sequence number to decide.
+ // Compare the difference to support integer roll-over.
+ return (a.sequence_num - b.sequence_num) < 0;
+}
+
TaskQueueImpl::AnyThread::AnyThread(TaskQueueManager* task_queue_manager,
PumpPolicy pump_policy,
TimeDomain* time_domain)
: task_queue_manager(task_queue_manager),
pump_policy(pump_policy),
- time_domain(time_domain) {}
+ time_domain(time_domain),
+ immediate_incoming_queue(&TaskQueueImpl::Task::EnqueueOrderComparatorFn) {
+}
TaskQueueImpl::AnyThread::~AnyThread() {}
@@ -108,8 +189,14 @@ TaskQueueImpl::MainThreadOnly::MainThreadOnly(
: task_queue_manager(task_queue_manager),
pump_policy(pump_policy),
time_domain(time_domain),
- delayed_work_queue(new WorkQueue(task_queue, "delayed")),
- immediate_work_queue(new WorkQueue(task_queue, "immediate")),
+ delayed_work_queue(
+ new WorkQueue(task_queue,
+ "delayed",
+ &TaskQueueImpl::Task::DelayedRunTimeComparatorFn)),
+ immediate_work_queue(
+ new WorkQueue(task_queue,
+ "immediate",
+ &TaskQueueImpl::Task::EnqueueOrderComparatorFn)),
set_index(0),
is_enabled(true),
blame_context(nullptr) {}
@@ -128,8 +215,8 @@ void TaskQueueImpl::UnregisterTaskQueue() {
any_thread().task_queue_manager = nullptr;
main_thread_only().task_queue_manager = nullptr;
- main_thread_only().delayed_incoming_queue = std::priority_queue<Task>();
- any_thread().immediate_incoming_queue = std::queue<Task>();
+ main_thread_only().delayed_incoming_queue.clear();
+ any_thread().immediate_incoming_queue.clear();
main_thread_only().immediate_work_queue.reset();
main_thread_only().delayed_work_queue.reset();
}
@@ -139,6 +226,124 @@ bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
return base::PlatformThread::CurrentId() == thread_id_;
}
+TaskQueue::TaskHandle TaskQueueImpl::PostCancellableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ base::TimeDelta delay) {
+ if (!main_thread_only().task_queue_manager)
+ return TaskHandle();
+
+ EnqueueOrder sequence_number =
+ main_thread_only().task_queue_manager->GetNextSequenceNumber();
+
+ if (delay.is_zero()) {
+ base::AutoLock lock(any_thread_lock_);
+ PushOntoImmediateIncomingQueueLocked(
+ Task(from_here, task, base::TimeTicks(), sequence_number, true,
+ sequence_number));
+
+ return TaskHandle(this, sequence_number);
+ } else {
+ DCHECK_GT(delay, base::TimeDelta());
+ base::TimeTicks time_domain_now = main_thread_only().time_domain->Now();
+ base::TimeTicks time_domain_delayed_run_time = time_domain_now + delay;
+ PushOntoDelayedIncomingQueueFromMainThread(
+ Task(from_here, task, time_domain_delayed_run_time, sequence_number,
+ true),
+ time_domain_now);
+
+ return TaskHandle(this, time_domain_delayed_run_time, sequence_number);
+ }
+}
+
+bool TaskQueueImpl::CancelTask(const TaskQueue::TaskHandle& handle) {
+ if (!handle)
+ return false;
+
+ // If the TaskQueueManager has gone away, pretend we have canceled the task
+ // because this simplifies logic in TimerBase::stop.
+ if (!main_thread_only().task_queue_manager)
+ return true;
+
+#if DCHECK_IS_ON()
+ DCHECK_EQ(handle.task_queue_, this);
+#endif
+
+ Task fake_task = Task::CreateFakeTaskFromHandle(handle);
+ if (handle.scheduled_run_time_.is_null()) {
+ // It's an immediate task.
+ if (main_thread_only().immediate_work_queue->CancelTask(fake_task))
+ return true;
+
+ base::AutoLock lock(any_thread_lock_);
+ return any_thread().immediate_incoming_queue.erase(fake_task) > 0;
+ } else {
+ // It's a delayed task.
+ DelayedRunTimeQueue::iterator it =
+ main_thread_only().delayed_incoming_queue.find(fake_task);
+ if (it != main_thread_only().delayed_incoming_queue.end()) {
+ // It's safe to remove the wakeup from the TimeDomain only if this task's
+ // scheduled run time was unique within the queue.
+ bool can_cancel_timedomain_wakeup = true;
+ if (it != main_thread_only().delayed_incoming_queue.begin()) {
+ DelayedRunTimeQueue::iterator before = it;
+ before--;
+ if (before->delayed_run_time == handle.scheduled_run_time_)
+ can_cancel_timedomain_wakeup = false;
+ }
+ if (can_cancel_timedomain_wakeup) {
+ DelayedRunTimeQueue::iterator after = it;
+ after++;
+ if (after != main_thread_only().delayed_incoming_queue.end() &&
+ after->delayed_run_time == handle.scheduled_run_time_) {
+ can_cancel_timedomain_wakeup = false;
+ }
+ }
+ if (can_cancel_timedomain_wakeup) {
+ main_thread_only().time_domain->CancelDelayedWork(
+ this, handle.scheduled_run_time_);
+ }
+
+ main_thread_only().delayed_incoming_queue.erase(it);
+ return true;
+ }
+
+ return main_thread_only().delayed_work_queue->CancelTask(fake_task);
+ }
+}
+
+bool TaskQueueImpl::IsTaskPending(const TaskQueue::TaskHandle& handle) const {
+ if (!handle)
+ return false;
+
+ // If the TaskQueueManager has gone away the task got cancelled.
+ if (!main_thread_only().task_queue_manager)
+ return false;
+
+#if DCHECK_IS_ON()
+ DCHECK_EQ(handle.task_queue_, this);
+#endif
+
+ Task fake_task = Task::CreateFakeTaskFromHandle(handle);
+ if (handle.scheduled_run_time_.is_null()) {
+ // It's an immediate task.
+ if (main_thread_only().immediate_work_queue->IsTaskPending(fake_task))
+ return true;
+
+ base::AutoLock lock(any_thread_lock_);
+ return any_thread().immediate_incoming_queue.find(fake_task) !=
+ any_thread().immediate_incoming_queue.end();
+ } else {
+ // It's a delayed task.
+ DelayedRunTimeQueue::iterator it =
+ main_thread_only().delayed_incoming_queue.find(fake_task);
+ if (it != main_thread_only().delayed_incoming_queue.end())
+ return true;
+
+ return main_thread_only().delayed_work_queue->IsTaskPending(fake_task);
+ }
+}
+
bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
@@ -223,7 +428,7 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
// Schedule a later call to MoveReadyDelayedTasksToDelayedWorkQueue.
base::TimeTicks delayed_run_time = pending_task.delayed_run_time;
- main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task));
main_thread_only().time_domain->ScheduleDelayedWork(this, delayed_run_time,
now);
TraceQueueSize(false);
@@ -249,16 +454,32 @@ void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task pending_task) {
any_thread().task_queue_manager->MaybeScheduleImmediateWork(FROM_HERE);
}
any_thread().task_queue_manager->DidQueueTask(pending_task);
- any_thread().immediate_incoming_queue.push(std::move(pending_task));
+ // We expect |pending_task| to be inserted at the end. Amoritized O(1).
+ any_thread().immediate_incoming_queue.insert(
+ any_thread().immediate_incoming_queue.end(),
+ std::move(pending_task));
+ DCHECK_EQ(pending_task.enqueue_order(),
+ any_thread().immediate_incoming_queue.rbegin()->enqueue_order());
TraceQueueSize(true);
}
void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
DCHECK(main_thread_checker_.CalledOnValidThread());
base::TimeTicks delayed_run_time = pending_task.delayed_run_time;
- main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
- main_thread_only().time_domain->ScheduleDelayedWork(
- this, delayed_run_time, main_thread_only().time_domain->Now());
+ base::TimeTicks time_domain_now = main_thread_only().time_domain->Now();
+ // Make sure |delayed_run_time| isn't in the past.
+ if (delayed_run_time < time_domain_now) {
+ delayed_run_time = time_domain_now;
+ pending_task.delayed_run_time = time_domain_now;
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task));
+ LazyNow lazy_now(time_domain_now);
+ MoveReadyDelayedTasksToDelayedWorkQueue(&lazy_now);
+ } else {
+ main_thread_only().delayed_incoming_queue.insert(std::move(pending_task));
+ main_thread_only().time_domain->ScheduleDelayedWork(
+ this, delayed_run_time, main_thread_only().time_domain->Now());
+ }
+ TraceQueueSize(false);
}
void TaskQueueImpl::SetQueueEnabled(bool enabled) {
@@ -311,7 +532,7 @@ bool TaskQueueImpl::NeedsPumping() const {
if (main_thread_only().delayed_incoming_queue.empty())
return false;
- return main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
+ return main_thread_only().delayed_incoming_queue.begin()->delayed_run_time <=
main_thread_only().time_domain->CreateLazyNow().Now();
}
@@ -326,7 +547,7 @@ bool TaskQueueImpl::TaskIsOlderThanQueuedImmediateTasksLocked(
// Return false if task is newer than the oldest immediate task.
if (!any_thread().immediate_incoming_queue.empty() &&
task->enqueue_order() >
- any_thread().immediate_incoming_queue.front().enqueue_order()) {
+ any_thread().immediate_incoming_queue.begin()->enqueue_order()) {
return false;
}
return true;
@@ -374,17 +595,17 @@ bool TaskQueueImpl::ShouldAutoPumpDelayedQueue(bool should_trigger_wakeup,
void TaskQueueImpl::MoveReadyDelayedTasksToDelayedWorkQueue(LazyNow* lazy_now) {
// Enqueue all delayed tasks that should be running now.
- while (!main_thread_only().delayed_incoming_queue.empty() &&
- main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
- lazy_now->Now()) {
- // Note: the const_cast is needed because there is no direct way to move
- // elements out of a priority queue. The queue must not be modified between
- // the top() and the pop().
- main_thread_only().delayed_work_queue->PushAndSetEnqueueOrder(
- std::move(
- const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())),
+ while (!main_thread_only().delayed_incoming_queue.empty()) {
+ DelayedRunTimeQueue::iterator next_task =
+ main_thread_only().delayed_incoming_queue.begin();
+ if (next_task->delayed_run_time > lazy_now->Now())
+ break;
+ // TODO(alexclarke): Use extract() when C++17 is allowed.
+ Task& task = const_cast<Task&>(*next_task);
+ task.set_enqueue_order(
main_thread_only().task_queue_manager->GetNextSequenceNumber());
- main_thread_only().delayed_incoming_queue.pop();
+ main_thread_only().delayed_work_queue->Push(std::move(task));
+ main_thread_only().delayed_incoming_queue.erase(next_task);
}
}
@@ -467,9 +688,11 @@ void TaskQueueImpl::PumpQueueLocked(LazyNow* lazy_now, bool may_post_dowork) {
MoveReadyDelayedTasksToDelayedWorkQueue(lazy_now);
while (!any_thread().immediate_incoming_queue.empty()) {
+ ComparatorQueue::iterator it =
+ any_thread().immediate_incoming_queue.begin();
main_thread_only().immediate_work_queue->Push(
- std::move(any_thread().immediate_incoming_queue.front()));
- any_thread().immediate_incoming_queue.pop();
+ std::move(const_cast<Task&>(*it)));
+ any_thread().immediate_incoming_queue.erase(it);
}
// |immediate_incoming_queue| is now empty so TimeDomain::UpdateQueues no
@@ -576,7 +799,7 @@ void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const {
main_thread_only().delayed_work_queue->Size());
if (!main_thread_only().delayed_incoming_queue.empty()) {
base::TimeDelta delay_to_next_task =
- (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
+ (main_thread_only().delayed_incoming_queue.begin()->delayed_run_time -
main_thread_only().time_domain->CreateLazyNow().Now());
state->SetDouble("delay_to_next_task_ms",
delay_to_next_task.InMillisecondsF());
@@ -662,34 +885,19 @@ void TaskQueueImpl::SetBlameContext(
}
// static
-void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue,
+void TaskQueueImpl::QueueAsValueInto(const ComparatorQueue& queue,
base::trace_event::TracedValue* state) {
- // Remove const to search |queue| in the destructive manner. Restore the
- // content from |visited| later.
- std::queue<Task>* mutable_queue = const_cast<std::queue<Task>*>(&queue);
- std::queue<Task> visited;
- while (!mutable_queue->empty()) {
- TaskAsValueInto(mutable_queue->front(), state);
- visited.push(std::move(mutable_queue->front()));
- mutable_queue->pop();
+ for (const Task& task : queue) {
+ TaskAsValueInto(task, state);
}
- *mutable_queue = std::move(visited);
}
// static
-void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
+void TaskQueueImpl::QueueAsValueInto(const DelayedRunTimeQueue& queue,
base::trace_event::TracedValue* state) {
- // Remove const to search |queue| in the destructive manner. Restore the
- // content from |visited| later.
- std::priority_queue<Task>* mutable_queue =
- const_cast<std::priority_queue<Task>*>(&queue);
- std::priority_queue<Task> visited;
- while (!mutable_queue->empty()) {
- TaskAsValueInto(mutable_queue->top(), state);
- visited.push(std::move(const_cast<Task&>(mutable_queue->top())));
- mutable_queue->pop();
+ for (const Task& task : queue) {
+ TaskAsValueInto(task, state);
}
- *mutable_queue = std::move(visited);
}
// static

Powered by Google App Engine
This is Rietveld 408576698