Chromium Code Reviews| Index: components/scheduler/child/task_queue_manager.cc |
| diff --git a/components/scheduler/child/task_queue_manager.cc b/components/scheduler/child/task_queue_manager.cc |
| index 620ede55ed35563d6e4c42a9f11fabf2d8e483ae..4e55a89bceb828c89f642f6f6e033bb1fda8ef23 100644 |
| --- a/components/scheduler/child/task_queue_manager.cc |
| +++ b/components/scheduler/child/task_queue_manager.cc |
| @@ -12,6 +12,7 @@ |
| #include "base/trace_event/trace_event.h" |
| #include "base/trace_event/trace_event_argument.h" |
| #include "components/scheduler/child/nestable_single_thread_task_runner.h" |
| +#include "components/scheduler/child/task_queue.h" |
| #include "components/scheduler/child/task_queue_selector.h" |
| namespace { |
| @@ -44,24 +45,32 @@ class LazyNow { |
| base::TimeTicks now_; |
| }; |
| -class TaskQueue : public base::SingleThreadTaskRunner { |
| +class TaskQueueImpl : public TaskQueue { |
| public: |
| - TaskQueue(TaskQueueManager* task_queue_manager, |
| - const char* disabled_by_default_tracing_category, |
| - const char* disabled_by_default_verbose_tracing_category); |
| + TaskQueueImpl(TaskQueueManager* task_queue_manager, |
| + const char* disabled_by_default_tracing_category, |
| + const char* disabled_by_default_verbose_tracing_category); |
| - // base::SingleThreadTaskRunner implementation. |
| + // TaskQueue :implementation. |
| bool RunsTasksOnCurrentThread() const override; |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) override { |
| - return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); |
| + return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(), |
| + TaskType::NORMAL); |
| } |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) override { |
| - return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE); |
| + return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(), |
| + TaskType::NON_NESTABLE); |
| + } |
| + bool PostDelayedTaskAt(const tracked_objects::Location& from_here, |
| + const base::Closure& task, |
| + base::TimeTicks desired_run_time) override { |
| + return PostDelayedTaskImpl(from_here, task, base::TimeDelta(), |
| + desired_run_time, TaskType::NORMAL); |
| } |
| TaskQueueManager::QueueState GetQueueState() const; |
| @@ -101,11 +110,12 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| NON_NESTABLE, |
| }; |
| - ~TaskQueue() override; |
| + ~TaskQueueImpl() override; |
| bool PostDelayedTaskImpl(const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay, |
|
alex clarke (OOO till 29th)
2015/07/16 09:29:00
It feels a bit weird having both a delay and a des
Sami
2015/07/16 12:19:32
Yep I struggled with this interface a bit. The loc
|
| + base::TimeTicks desired_run_time, |
| TaskType task_type); |
| // Delayed task posted to the underlying run loop, which locks |lock_| and |
| @@ -149,6 +159,11 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| const char* name_; |
| const char* disabled_by_default_tracing_category_; |
| const char* disabled_by_default_verbose_tracing_category_; |
| + |
| + // Queue-local task sequence number for maintaining the order of delayed |
| + // tasks which are posted for the exact same time. Note that this will be |
| + // replaced by the global sequence number when the delay has elapsed. |
| + int delayed_task_sequence_number_; |
| base::DelayedTaskQueue delayed_task_queue_; |
| std::set<base::TimeTicks> in_flight_kick_delayed_tasks_; |
| @@ -156,12 +171,13 @@ class TaskQueue : public base::SingleThreadTaskRunner { |
| base::TaskQueue work_queue_; |
| TaskQueueManager::WakeupPolicy wakeup_policy_; |
| - DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
| + DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl); |
| }; |
| -TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager, |
| - const char* disabled_by_default_tracing_category, |
| - const char* disabled_by_default_verbose_tracing_category) |
| +TaskQueueImpl::TaskQueueImpl( |
| + TaskQueueManager* task_queue_manager, |
| + const char* disabled_by_default_tracing_category, |
| + const char* disabled_by_default_verbose_tracing_category) |
| : thread_id_(base::PlatformThread::CurrentId()), |
| task_queue_manager_(task_queue_manager), |
| pump_policy_(TaskQueueManager::PumpPolicy::AUTO), |
| @@ -170,13 +186,14 @@ TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager, |
| disabled_by_default_tracing_category), |
| disabled_by_default_verbose_tracing_category_( |
| disabled_by_default_verbose_tracing_category), |
| + delayed_task_sequence_number_(0), |
| wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) { |
| } |
| -TaskQueue::~TaskQueue() { |
| +TaskQueueImpl::~TaskQueueImpl() { |
| } |
| -void TaskQueue::WillDeleteTaskQueueManager() { |
| +void TaskQueueImpl::WillDeleteTaskQueueManager() { |
| base::AutoLock lock(lock_); |
| task_queue_manager_ = nullptr; |
| delayed_task_queue_ = base::DelayedTaskQueue(); |
| @@ -184,15 +201,17 @@ void TaskQueue::WillDeleteTaskQueueManager() { |
| work_queue_ = base::TaskQueue(); |
| } |
| -bool TaskQueue::RunsTasksOnCurrentThread() const { |
| +bool TaskQueueImpl::RunsTasksOnCurrentThread() const { |
| base::AutoLock lock(lock_); |
| return base::PlatformThread::CurrentId() == thread_id_; |
| } |
| -bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, |
| - const base::Closure& task, |
| - base::TimeDelta delay, |
| - TaskType task_type) { |
| +bool TaskQueueImpl::PostDelayedTaskImpl( |
| + const tracked_objects::Location& from_here, |
| + const base::Closure& task, |
| + base::TimeDelta delay, |
| + base::TimeTicks desired_run_time, |
| + TaskType task_type) { |
| base::AutoLock lock(lock_); |
| if (!task_queue_manager_) |
| return false; |
| @@ -201,9 +220,14 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, |
| task_type != TaskType::NON_NESTABLE); |
| task_queue_manager_->DidQueueTask(pending_task); |
| - if (delay > base::TimeDelta()) { |
| + if (delay > base::TimeDelta() || !desired_run_time.is_null()) { |
| base::TimeTicks now = task_queue_manager_->Now(); |
| - pending_task.delayed_run_time = now + delay; |
| + if (!desired_run_time.is_null()) { |
| + pending_task.delayed_run_time = std::max(now, desired_run_time); |
|
alex clarke (OOO till 29th)
2015/07/16 09:29:00
Should we just enqueue the task if desired runtime
Sami
2015/07/16 12:19:32
I think that might change the ordering. For exampl
|
| + } else { |
| + pending_task.delayed_run_time = now + delay; |
| + } |
| + pending_task.sequence_num = delayed_task_sequence_number_++; |
| delayed_task_queue_.push(pending_task); |
| TraceQueueSize(true); |
| // If we changed the topmost task, then it is time to reschedule. |
| @@ -217,7 +241,7 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, |
| return true; |
| } |
| -void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { |
| +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue() { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| base::AutoLock lock(lock_); |
| if (!task_queue_manager_) |
| @@ -227,7 +251,8 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { |
| MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); |
| } |
| -void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { |
| +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked( |
| + LazyNow* lazy_now) { |
| lock_.AssertAcquired(); |
| // Enqueue all delayed tasks that should be running now. |
| while (!delayed_task_queue_.empty() && |
| @@ -241,12 +266,12 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { |
| ScheduleDelayedWorkLocked(lazy_now); |
| } |
| -void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { |
| +void TaskQueueImpl::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { |
| lock_.AssertAcquired(); |
| // Any remaining tasks are in the future, so queue a task to kick them. |
| if (!delayed_task_queue_.empty()) { |
| base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time; |
| - DCHECK_GT(next_run_time, lazy_now->Now()); |
| + DCHECK_GE(next_run_time, lazy_now->Now()); |
| // Make sure we don't have more than one |
| // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled |
| // run time (note it's fine to have multiple ones in flight for distinct |
| @@ -257,12 +282,13 @@ void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { |
| base::TimeDelta delay = next_run_time - lazy_now->Now(); |
| task_queue_manager_->PostDelayedTask( |
| FROM_HERE, |
| - Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay); |
| + Bind(&TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue, this), |
| + delay); |
| } |
| } |
| } |
| -TaskQueueManager::QueueState TaskQueue::GetQueueState() const { |
| +TaskQueueManager::QueueState TaskQueueImpl::GetQueueState() const { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| if (!work_queue_.empty()) |
| return TaskQueueManager::QueueState::HAS_WORK; |
| @@ -277,7 +303,7 @@ TaskQueueManager::QueueState TaskQueue::GetQueueState() const { |
| } |
| } |
| -bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { |
| +bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { |
| lock_.AssertAcquired(); |
| // A null task is passed when UpdateQueue is called before any task is run. |
| // In this case we don't want to pump an after_wakeup queue, so return true |
| @@ -299,7 +325,7 @@ bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { |
| return oldest_queued_task < *task; |
| } |
| -bool TaskQueue::ShouldAutoPumpQueueLocked( |
| +bool TaskQueueImpl::ShouldAutoPumpQueueLocked( |
| bool should_trigger_wakeup, |
| const base::PendingTask* previous_task) { |
| lock_.AssertAcquired(); |
| @@ -313,7 +339,7 @@ bool TaskQueue::ShouldAutoPumpQueueLocked( |
| return true; |
| } |
| -bool TaskQueue::NextPendingDelayedTaskRunTime( |
| +bool TaskQueueImpl::NextPendingDelayedTaskRunTime( |
| base::TimeTicks* next_pending_delayed_task) { |
| base::AutoLock lock(lock_); |
| if (delayed_task_queue_.empty()) |
| @@ -322,9 +348,9 @@ bool TaskQueue::NextPendingDelayedTaskRunTime( |
| return true; |
| } |
| -bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, |
| - bool should_trigger_wakeup, |
| - const base::PendingTask* previous_task) { |
| +bool TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now, |
| + bool should_trigger_wakeup, |
| + const base::PendingTask* previous_task) { |
| if (!work_queue_.empty()) |
| return true; |
| @@ -339,14 +365,14 @@ bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, |
| } |
| } |
| -base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { |
| +base::PendingTask TaskQueueImpl::TakeTaskFromWorkQueue() { |
| base::PendingTask pending_task = work_queue_.front(); |
| work_queue_.pop(); |
| TraceQueueSize(false); |
| return pending_task; |
| } |
| -void TaskQueue::TraceQueueSize(bool is_locked) const { |
| +void TaskQueueImpl::TraceQueueSize(bool is_locked) const { |
| bool is_tracing; |
| TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, |
| &is_tracing); |
| @@ -363,7 +389,7 @@ void TaskQueue::TraceQueueSize(bool is_locked) const { |
| lock_.Release(); |
| } |
| -void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| +void TaskQueueImpl::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| lock_.AssertAcquired(); |
| if (!task_queue_manager_) |
| return; |
| @@ -382,7 +408,7 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { |
| TraceQueueSize(true); |
| } |
| -void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { |
| +void TaskQueueImpl::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { |
| base::AutoLock lock(lock_); |
| if (pump_policy == TaskQueueManager::PumpPolicy::AUTO && |
| pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) { |
| @@ -391,7 +417,7 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { |
| pump_policy_ = pump_policy; |
| } |
| -void TaskQueue::PumpQueueLocked() { |
| +void TaskQueueImpl::PumpQueueLocked() { |
| lock_.AssertAcquired(); |
| if (task_queue_manager_) { |
| LazyNow lazy_now(task_queue_manager_); |
| @@ -405,12 +431,12 @@ void TaskQueue::PumpQueueLocked() { |
| task_queue_manager_->MaybePostDoWorkOnMainRunner(); |
| } |
| -void TaskQueue::PumpQueue() { |
| +void TaskQueueImpl::PumpQueue() { |
| base::AutoLock lock(lock_); |
| PumpQueueLocked(); |
| } |
| -void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { |
| +void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const { |
| base::AutoLock lock(lock_); |
| state->BeginDictionary(); |
| if (name_) |
| @@ -440,8 +466,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { |
| } |
| // static |
| -void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, |
| - base::trace_event::TracedValue* state) { |
| +void TaskQueueImpl::QueueAsValueInto(const base::TaskQueue& queue, |
| + base::trace_event::TracedValue* state) { |
| base::TaskQueue queue_copy(queue); |
| while (!queue_copy.empty()) { |
| TaskAsValueInto(queue_copy.front(), state); |
| @@ -450,8 +476,8 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, |
| } |
| // static |
| -void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, |
| - base::trace_event::TracedValue* state) { |
| +void TaskQueueImpl::QueueAsValueInto(const base::DelayedTaskQueue& queue, |
| + base::trace_event::TracedValue* state) { |
| base::DelayedTaskQueue queue_copy(queue); |
| while (!queue_copy.empty()) { |
| TaskAsValueInto(queue_copy.top(), state); |
| @@ -460,8 +486,8 @@ void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, |
| } |
| // static |
| -void TaskQueue::TaskAsValueInto(const base::PendingTask& task, |
| - base::trace_event::TracedValue* state) { |
| +void TaskQueueImpl::TaskAsValueInto(const base::PendingTask& task, |
| + base::trace_event::TracedValue* state) { |
| state->BeginDictionary(); |
| state->SetString("posted_from", task.posted_from.ToString()); |
| state->SetInteger("sequence_num", task.sequence_num); |
| @@ -498,10 +524,10 @@ TaskQueueManager::TaskQueueManager( |
| "TaskQueueManager", this); |
| for (size_t i = 0; i < task_queue_count; i++) { |
| - scoped_refptr<internal::TaskQueue> queue(make_scoped_refptr( |
| - new internal::TaskQueue(this, |
| - disabled_by_default_tracing_category, |
| - disabled_by_default_verbose_tracing_category))); |
| + scoped_refptr<internal::TaskQueueImpl> queue( |
| + make_scoped_refptr(new internal::TaskQueueImpl( |
| + this, disabled_by_default_tracing_category, |
| + disabled_by_default_verbose_tracing_category))); |
| queues_.push_back(queue); |
| } |
| @@ -525,13 +551,13 @@ TaskQueueManager::~TaskQueueManager() { |
| selector_->SetTaskQueueSelectorObserver(nullptr); |
| } |
| -internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const { |
| +internal::TaskQueueImpl* TaskQueueManager::Queue(size_t queue_index) const { |
| DCHECK_LT(queue_index, queues_.size()); |
| return queues_[queue_index].get(); |
| } |
| -scoped_refptr<base::SingleThreadTaskRunner> |
| -TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const { |
| +scoped_refptr<TaskQueue> TaskQueueManager::TaskRunnerForQueue( |
| + size_t queue_index) const { |
| return Queue(queue_index); |
| } |
| @@ -570,20 +596,20 @@ base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() { |
| void TaskQueueManager::SetPumpPolicy(size_t queue_index, |
| PumpPolicy pump_policy) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| - internal::TaskQueue* queue = Queue(queue_index); |
| + internal::TaskQueueImpl* queue = Queue(queue_index); |
| queue->SetPumpPolicy(pump_policy); |
| } |
| void TaskQueueManager::SetWakeupPolicy(size_t queue_index, |
| WakeupPolicy wakeup_policy) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| - internal::TaskQueue* queue = Queue(queue_index); |
| + internal::TaskQueueImpl* queue = Queue(queue_index); |
| queue->set_wakeup_policy(wakeup_policy); |
| } |
| void TaskQueueManager::PumpQueue(size_t queue_index) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| - internal::TaskQueue* queue = Queue(queue_index); |
| + internal::TaskQueueImpl* queue = Queue(queue_index); |
| queue->PumpQueue(); |
| } |
| @@ -673,7 +699,7 @@ bool TaskQueueManager::ProcessTaskFromWorkQueue( |
| base::PendingTask* previous_task) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| scoped_refptr<DeletionSentinel> protect(deletion_sentinel_); |
| - internal::TaskQueue* queue = Queue(queue_index); |
| + internal::TaskQueueImpl* queue = Queue(queue_index); |
| base::PendingTask pending_task = queue->TakeTaskFromWorkQueue(); |
| task_was_run_bitmap_ |= UINT64_C(1) << queue_index; |
| if (!pending_task.nestable && main_task_runner_->IsNested()) { |
| @@ -713,13 +739,13 @@ bool TaskQueueManager::PostDelayedTask( |
| const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) { |
| - DCHECK(delay > base::TimeDelta()); |
| + DCHECK_GE(delay, base::TimeDelta()); |
| return main_task_runner_->PostDelayedTask(from_here, task, delay); |
| } |
| void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { |
| DCHECK(main_thread_checker_.CalledOnValidThread()); |
| - internal::TaskQueue* queue = Queue(queue_index); |
| + internal::TaskQueueImpl* queue = Queue(queue_index); |
| queue->set_name(name); |
| } |