Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(757)

Unified Diff: components/scheduler/child/task_queue_manager.cc

Issue 1223163006: Implement PostDelayedTaskAt for guaranteed timer ordering (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebased. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/scheduler/child/task_queue_manager.cc
diff --git a/components/scheduler/child/task_queue_manager.cc b/components/scheduler/child/task_queue_manager.cc
index 620ede55ed35563d6e4c42a9f11fabf2d8e483ae..4e55a89bceb828c89f642f6f6e033bb1fda8ef23 100644
--- a/components/scheduler/child/task_queue_manager.cc
+++ b/components/scheduler/child/task_queue_manager.cc
@@ -12,6 +12,7 @@
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "components/scheduler/child/nestable_single_thread_task_runner.h"
+#include "components/scheduler/child/task_queue.h"
#include "components/scheduler/child/task_queue_selector.h"
namespace {
@@ -44,24 +45,32 @@ class LazyNow {
base::TimeTicks now_;
};
-class TaskQueue : public base::SingleThreadTaskRunner {
+class TaskQueueImpl : public TaskQueue {
public:
- TaskQueue(TaskQueueManager* task_queue_manager,
- const char* disabled_by_default_tracing_category,
- const char* disabled_by_default_verbose_tracing_category);
+ TaskQueueImpl(TaskQueueManager* task_queue_manager,
+ const char* disabled_by_default_tracing_category,
+ const char* disabled_by_default_verbose_tracing_category);
- // base::SingleThreadTaskRunner implementation.
+ // TaskQueue :implementation.
bool RunsTasksOnCurrentThread() const override;
bool PostDelayedTask(const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) override {
- return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
+ return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(),
+ TaskType::NORMAL);
}
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) override {
- return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
+ return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(),
+ TaskType::NON_NESTABLE);
+ }
+ bool PostDelayedTaskAt(const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ base::TimeTicks desired_run_time) override {
+ return PostDelayedTaskImpl(from_here, task, base::TimeDelta(),
+ desired_run_time, TaskType::NORMAL);
}
TaskQueueManager::QueueState GetQueueState() const;
@@ -101,11 +110,12 @@ class TaskQueue : public base::SingleThreadTaskRunner {
NON_NESTABLE,
};
- ~TaskQueue() override;
+ ~TaskQueueImpl() override;
bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay,
alex clarke (OOO till 29th) 2015/07/16 09:29:00 It feels a bit weird having both a delay and a des
Sami 2015/07/16 12:19:32 Yep I struggled with this interface a bit. The loc
+ base::TimeTicks desired_run_time,
TaskType task_type);
// Delayed task posted to the underlying run loop, which locks |lock_| and
@@ -149,6 +159,11 @@ class TaskQueue : public base::SingleThreadTaskRunner {
const char* name_;
const char* disabled_by_default_tracing_category_;
const char* disabled_by_default_verbose_tracing_category_;
+
+ // Queue-local task sequence number for maintaining the order of delayed
+ // tasks which are posted for the exact same time. Note that this will be
+ // replaced by the global sequence number when the delay has elapsed.
+ int delayed_task_sequence_number_;
base::DelayedTaskQueue delayed_task_queue_;
std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;
@@ -156,12 +171,13 @@ class TaskQueue : public base::SingleThreadTaskRunner {
base::TaskQueue work_queue_;
TaskQueueManager::WakeupPolicy wakeup_policy_;
- DISALLOW_COPY_AND_ASSIGN(TaskQueue);
+ DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
};
-TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager,
- const char* disabled_by_default_tracing_category,
- const char* disabled_by_default_verbose_tracing_category)
+TaskQueueImpl::TaskQueueImpl(
+ TaskQueueManager* task_queue_manager,
+ const char* disabled_by_default_tracing_category,
+ const char* disabled_by_default_verbose_tracing_category)
: thread_id_(base::PlatformThread::CurrentId()),
task_queue_manager_(task_queue_manager),
pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
@@ -170,13 +186,14 @@ TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager,
disabled_by_default_tracing_category),
disabled_by_default_verbose_tracing_category_(
disabled_by_default_verbose_tracing_category),
+ delayed_task_sequence_number_(0),
wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) {
}
-TaskQueue::~TaskQueue() {
+TaskQueueImpl::~TaskQueueImpl() {
}
-void TaskQueue::WillDeleteTaskQueueManager() {
+void TaskQueueImpl::WillDeleteTaskQueueManager() {
base::AutoLock lock(lock_);
task_queue_manager_ = nullptr;
delayed_task_queue_ = base::DelayedTaskQueue();
@@ -184,15 +201,17 @@ void TaskQueue::WillDeleteTaskQueueManager() {
work_queue_ = base::TaskQueue();
}
-bool TaskQueue::RunsTasksOnCurrentThread() const {
+bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
base::AutoLock lock(lock_);
return base::PlatformThread::CurrentId() == thread_id_;
}
-bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
- const base::Closure& task,
- base::TimeDelta delay,
- TaskType task_type) {
+bool TaskQueueImpl::PostDelayedTaskImpl(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ base::TimeDelta delay,
+ base::TimeTicks desired_run_time,
+ TaskType task_type) {
base::AutoLock lock(lock_);
if (!task_queue_manager_)
return false;
@@ -201,9 +220,14 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
task_type != TaskType::NON_NESTABLE);
task_queue_manager_->DidQueueTask(pending_task);
- if (delay > base::TimeDelta()) {
+ if (delay > base::TimeDelta() || !desired_run_time.is_null()) {
base::TimeTicks now = task_queue_manager_->Now();
- pending_task.delayed_run_time = now + delay;
+ if (!desired_run_time.is_null()) {
+ pending_task.delayed_run_time = std::max(now, desired_run_time);
alex clarke (OOO till 29th) 2015/07/16 09:29:00 Should we just enqueue the task if desired runtime
Sami 2015/07/16 12:19:32 I think that might change the ordering. For exampl
+ } else {
+ pending_task.delayed_run_time = now + delay;
+ }
+ pending_task.sequence_num = delayed_task_sequence_number_++;
delayed_task_queue_.push(pending_task);
TraceQueueSize(true);
// If we changed the topmost task, then it is time to reschedule.
@@ -217,7 +241,7 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
return true;
}
-void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
+void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue() {
DCHECK(main_thread_checker_.CalledOnValidThread());
base::AutoLock lock(lock_);
if (!task_queue_manager_)
@@ -227,7 +251,8 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
}
-void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
+void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
+ LazyNow* lazy_now) {
lock_.AssertAcquired();
// Enqueue all delayed tasks that should be running now.
while (!delayed_task_queue_.empty() &&
@@ -241,12 +266,12 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
ScheduleDelayedWorkLocked(lazy_now);
}
-void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
+void TaskQueueImpl::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
lock_.AssertAcquired();
// Any remaining tasks are in the future, so queue a task to kick them.
if (!delayed_task_queue_.empty()) {
base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
- DCHECK_GT(next_run_time, lazy_now->Now());
+ DCHECK_GE(next_run_time, lazy_now->Now());
// Make sure we don't have more than one
// MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
// run time (note it's fine to have multiple ones in flight for distinct
@@ -257,12 +282,13 @@ void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
base::TimeDelta delay = next_run_time - lazy_now->Now();
task_queue_manager_->PostDelayedTask(
FROM_HERE,
- Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay);
+ Bind(&TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue, this),
+ delay);
}
}
}
-TaskQueueManager::QueueState TaskQueue::GetQueueState() const {
+TaskQueueManager::QueueState TaskQueueImpl::GetQueueState() const {
DCHECK(main_thread_checker_.CalledOnValidThread());
if (!work_queue_.empty())
return TaskQueueManager::QueueState::HAS_WORK;
@@ -277,7 +303,7 @@ TaskQueueManager::QueueState TaskQueue::GetQueueState() const {
}
}
-bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
+bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
lock_.AssertAcquired();
// A null task is passed when UpdateQueue is called before any task is run.
// In this case we don't want to pump an after_wakeup queue, so return true
@@ -299,7 +325,7 @@ bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
return oldest_queued_task < *task;
}
-bool TaskQueue::ShouldAutoPumpQueueLocked(
+bool TaskQueueImpl::ShouldAutoPumpQueueLocked(
bool should_trigger_wakeup,
const base::PendingTask* previous_task) {
lock_.AssertAcquired();
@@ -313,7 +339,7 @@ bool TaskQueue::ShouldAutoPumpQueueLocked(
return true;
}
-bool TaskQueue::NextPendingDelayedTaskRunTime(
+bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
base::TimeTicks* next_pending_delayed_task) {
base::AutoLock lock(lock_);
if (delayed_task_queue_.empty())
@@ -322,9 +348,9 @@ bool TaskQueue::NextPendingDelayedTaskRunTime(
return true;
}
-bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
- bool should_trigger_wakeup,
- const base::PendingTask* previous_task) {
+bool TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now,
+ bool should_trigger_wakeup,
+ const base::PendingTask* previous_task) {
if (!work_queue_.empty())
return true;
@@ -339,14 +365,14 @@ bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
}
}
-base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
+base::PendingTask TaskQueueImpl::TakeTaskFromWorkQueue() {
base::PendingTask pending_task = work_queue_.front();
work_queue_.pop();
TraceQueueSize(false);
return pending_task;
}
-void TaskQueue::TraceQueueSize(bool is_locked) const {
+void TaskQueueImpl::TraceQueueSize(bool is_locked) const {
bool is_tracing;
TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
&is_tracing);
@@ -363,7 +389,7 @@ void TaskQueue::TraceQueueSize(bool is_locked) const {
lock_.Release();
}
-void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
+void TaskQueueImpl::EnqueueTaskLocked(const base::PendingTask& pending_task) {
lock_.AssertAcquired();
if (!task_queue_manager_)
return;
@@ -382,7 +408,7 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
TraceQueueSize(true);
}
-void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
+void TaskQueueImpl::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
base::AutoLock lock(lock_);
if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
@@ -391,7 +417,7 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
pump_policy_ = pump_policy;
}
-void TaskQueue::PumpQueueLocked() {
+void TaskQueueImpl::PumpQueueLocked() {
lock_.AssertAcquired();
if (task_queue_manager_) {
LazyNow lazy_now(task_queue_manager_);
@@ -405,12 +431,12 @@ void TaskQueue::PumpQueueLocked() {
task_queue_manager_->MaybePostDoWorkOnMainRunner();
}
-void TaskQueue::PumpQueue() {
+void TaskQueueImpl::PumpQueue() {
base::AutoLock lock(lock_);
PumpQueueLocked();
}
-void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
+void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const {
base::AutoLock lock(lock_);
state->BeginDictionary();
if (name_)
@@ -440,8 +466,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
}
// static
-void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
- base::trace_event::TracedValue* state) {
+void TaskQueueImpl::QueueAsValueInto(const base::TaskQueue& queue,
+ base::trace_event::TracedValue* state) {
base::TaskQueue queue_copy(queue);
while (!queue_copy.empty()) {
TaskAsValueInto(queue_copy.front(), state);
@@ -450,8 +476,8 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
}
// static
-void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
- base::trace_event::TracedValue* state) {
+void TaskQueueImpl::QueueAsValueInto(const base::DelayedTaskQueue& queue,
+ base::trace_event::TracedValue* state) {
base::DelayedTaskQueue queue_copy(queue);
while (!queue_copy.empty()) {
TaskAsValueInto(queue_copy.top(), state);
@@ -460,8 +486,8 @@ void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
}
// static
-void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
- base::trace_event::TracedValue* state) {
+void TaskQueueImpl::TaskAsValueInto(const base::PendingTask& task,
+ base::trace_event::TracedValue* state) {
state->BeginDictionary();
state->SetString("posted_from", task.posted_from.ToString());
state->SetInteger("sequence_num", task.sequence_num);
@@ -498,10 +524,10 @@ TaskQueueManager::TaskQueueManager(
"TaskQueueManager", this);
for (size_t i = 0; i < task_queue_count; i++) {
- scoped_refptr<internal::TaskQueue> queue(make_scoped_refptr(
- new internal::TaskQueue(this,
- disabled_by_default_tracing_category,
- disabled_by_default_verbose_tracing_category)));
+ scoped_refptr<internal::TaskQueueImpl> queue(
+ make_scoped_refptr(new internal::TaskQueueImpl(
+ this, disabled_by_default_tracing_category,
+ disabled_by_default_verbose_tracing_category)));
queues_.push_back(queue);
}
@@ -525,13 +551,13 @@ TaskQueueManager::~TaskQueueManager() {
selector_->SetTaskQueueSelectorObserver(nullptr);
}
-internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
+internal::TaskQueueImpl* TaskQueueManager::Queue(size_t queue_index) const {
DCHECK_LT(queue_index, queues_.size());
return queues_[queue_index].get();
}
-scoped_refptr<base::SingleThreadTaskRunner>
-TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
+scoped_refptr<TaskQueue> TaskQueueManager::TaskRunnerForQueue(
+ size_t queue_index) const {
return Queue(queue_index);
}
@@ -570,20 +596,20 @@ base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
void TaskQueueManager::SetPumpPolicy(size_t queue_index,
PumpPolicy pump_policy) {
DCHECK(main_thread_checker_.CalledOnValidThread());
- internal::TaskQueue* queue = Queue(queue_index);
+ internal::TaskQueueImpl* queue = Queue(queue_index);
queue->SetPumpPolicy(pump_policy);
}
void TaskQueueManager::SetWakeupPolicy(size_t queue_index,
WakeupPolicy wakeup_policy) {
DCHECK(main_thread_checker_.CalledOnValidThread());
- internal::TaskQueue* queue = Queue(queue_index);
+ internal::TaskQueueImpl* queue = Queue(queue_index);
queue->set_wakeup_policy(wakeup_policy);
}
void TaskQueueManager::PumpQueue(size_t queue_index) {
DCHECK(main_thread_checker_.CalledOnValidThread());
- internal::TaskQueue* queue = Queue(queue_index);
+ internal::TaskQueueImpl* queue = Queue(queue_index);
queue->PumpQueue();
}
@@ -673,7 +699,7 @@ bool TaskQueueManager::ProcessTaskFromWorkQueue(
base::PendingTask* previous_task) {
DCHECK(main_thread_checker_.CalledOnValidThread());
scoped_refptr<DeletionSentinel> protect(deletion_sentinel_);
- internal::TaskQueue* queue = Queue(queue_index);
+ internal::TaskQueueImpl* queue = Queue(queue_index);
base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
task_was_run_bitmap_ |= UINT64_C(1) << queue_index;
if (!pending_task.nestable && main_task_runner_->IsNested()) {
@@ -713,13 +739,13 @@ bool TaskQueueManager::PostDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
- DCHECK(delay > base::TimeDelta());
+ DCHECK_GE(delay, base::TimeDelta());
return main_task_runner_->PostDelayedTask(from_here, task, delay);
}
void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
DCHECK(main_thread_checker_.CalledOnValidThread());
- internal::TaskQueue* queue = Queue(queue_index);
+ internal::TaskQueueImpl* queue = Queue(queue_index);
queue->set_name(name);
}

Powered by Google App Engine
This is Rietveld 408576698