Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(994)

Unified Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 1008693004: Handle delayed tasks more natively in the scheduler (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Responding to feedback again Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/renderer/scheduler/task_queue_manager.cc
diff --git a/content/renderer/scheduler/task_queue_manager.cc b/content/renderer/scheduler/task_queue_manager.cc
index 565aece83c5d1fb909da95aa011abe296315561d..97fbec44183636231178f421f8f60f5d8b81d3ec 100644
--- a/content/renderer/scheduler/task_queue_manager.cc
+++ b/content/renderer/scheduler/task_queue_manager.cc
@@ -5,6 +5,7 @@
#include "content/renderer/scheduler/task_queue_manager.h"
#include <queue>
+#include <set>
#include "base/bind.h"
#include "base/trace_event/trace_event.h"
@@ -20,6 +21,29 @@ const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
namespace content {
namespace internal {
+// Now() is somewhat expensive so it makes sense not to call Now() unless we
+// really need to.
+class LazyNow {
+ public:
+ explicit LazyNow(base::TimeTicks now)
+ : task_queue_manager_(nullptr), now_(now) {
+ DCHECK(!now.is_null());
+ }
+
+ explicit LazyNow(TaskQueueManager* task_queue_manager)
+ : task_queue_manager_(task_queue_manager) {}
+
+ base::TimeTicks Now() {
+ if (now_.is_null())
+ now_ = task_queue_manager_->Now();
+ return now_;
+ }
+
+ private:
+ TaskQueueManager* task_queue_manager_; // NOT OWNED
+ base::TimeTicks now_;
+};
+
class TaskQueue : public base::SingleThreadTaskRunner {
public:
TaskQueue(TaskQueueManager* task_queue_manager);
@@ -46,7 +70,7 @@ class TaskQueue : public base::SingleThreadTaskRunner {
bool NextPendingDelayedTaskRunTime(
base::TimeTicks* next_pending_delayed_task);
- bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task,
+ bool UpdateWorkQueue(LazyNow* lazy_now,
const base::PendingTask* previous_task);
base::PendingTask TakeTaskFromWorkQueue();
@@ -71,37 +95,47 @@ class TaskQueue : public base::SingleThreadTaskRunner {
base::TimeDelta delay,
TaskType task_type);
- // Adds a task at the end of the incoming task queue and schedules a call to
- // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
- // pumping is enabled. Can be called on an arbitrary thread.
- void EnqueueTask(const base::PendingTask& pending_task);
+ // Delayed task posted to the underlying run loop, which locks |lock_| and
+ // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks
+ // that need to be run now.
+ void MoveReadyDelayedTasksToIncomingQueue();
+
+ // Enqueues any delayed tasks which should be run now on the incoming_queue_
+ // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
+ // Must be called with |lock_| locked.
+ void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now);
+
+ // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task
+ // posted on the underlying runloop for the next task's scheduled run time.
+ void ScheduleDelayedWorkLocked(LazyNow* lazy_now);
void PumpQueueLocked();
bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
void EnqueueTaskLocked(const base::PendingTask& pending_task);
- bool NextPendingDelayedTaskRunTimeLocked(
- base::TimeTicks* next_pending_delayed_task);
void TraceQueueSize(bool is_locked) const;
static const char* PumpPolicyToString(
TaskQueueManager::PumpPolicy pump_policy);
static void QueueAsValueInto(const base::TaskQueue& queue,
base::trace_event::TracedValue* state);
+ static void QueueAsValueInto(const base::DelayedTaskQueue& queue,
+ base::trace_event::TracedValue* state);
static void TaskAsValueInto(const base::PendingTask& task,
base::trace_event::TracedValue* state);
- // This lock protects all members except the work queue.
+ // This lock protects all members except the work queue and the
+ // main_thread_checker_.
mutable base::Lock lock_;
base::PlatformThreadId thread_id_;
TaskQueueManager* task_queue_manager_;
base::TaskQueue incoming_queue_;
TaskQueueManager::PumpPolicy pump_policy_;
const char* name_;
- std::priority_queue<base::TimeTicks,
- std::vector<base::TimeTicks>,
- std::greater<base::TimeTicks>> delayed_task_run_times_;
+ base::DelayedTaskQueue delayed_task_queue_;
+ std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;
+ base::ThreadChecker main_thread_checker_;
base::TaskQueue work_queue_;
DISALLOW_COPY_AND_ASSIGN(TaskQueue);
@@ -120,6 +154,8 @@ TaskQueue::~TaskQueue() {
void TaskQueue::WillDeleteTaskQueueManager() {
base::AutoLock lock(lock_);
task_queue_manager_ = nullptr;
+ // TODO(scheduler-dev): Should we also clear the other queues here too?
+ delayed_task_queue_ = base::DelayedTaskQueue();
}
bool TaskQueue::RunsTasksOnCurrentThread() const {
@@ -140,15 +176,66 @@ bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
task_queue_manager_->DidQueueTask(&pending_task);
if (delay > base::TimeDelta()) {
- pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
- delayed_task_run_times_.push(pending_task.delayed_run_time);
- return task_queue_manager_->PostDelayedTask(
- FROM_HERE, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
+ base::TimeTicks now = task_queue_manager_->Now();
+ pending_task.delayed_run_time = now + delay;
+ delayed_task_queue_.push(pending_task);
+ TraceQueueSize(true);
+ // If we changed the topmost task, then it is time to reschedule.
+ if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
+ LazyNow lazy_now(now);
+ ScheduleDelayedWorkLocked(&lazy_now);
+ }
+ return true;
}
EnqueueTaskLocked(pending_task);
return true;
}
+void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
+ DCHECK(main_thread_checker_.CalledOnValidThread());
+ base::AutoLock lock(lock_);
+ if (!task_queue_manager_)
+ return;
+
+ LazyNow lazy_now(task_queue_manager_);
+ MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
+}
+
+void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
+ lock_.AssertAcquired();
+ // Enqueue all delayed tasks that should be running now.
+ while (!delayed_task_queue_.empty() &&
+ delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
+ in_flight_kick_delayed_tasks_.erase(
+ delayed_task_queue_.top().delayed_run_time);
+ EnqueueTaskLocked(delayed_task_queue_.top());
+ delayed_task_queue_.pop();
+ }
+ TraceQueueSize(true);
+ ScheduleDelayedWorkLocked(lazy_now);
+}
+
+void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
+ lock_.AssertAcquired();
+ // Any remaining tasks are in the future, so queue a task to kick them.
+ if (!delayed_task_queue_.empty()) {
+ base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
+ DCHECK_GT(next_run_time, lazy_now->Now());
+ // Make sure we don't have more than one
+ // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
+ // run time (note it's fine to have multiple ones in flight for distinct
+ // run times).
+ if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
+ in_flight_kick_delayed_tasks_.end()) {
+ in_flight_kick_delayed_tasks_.insert(next_run_time);
+ base::TimeDelta delay = next_run_time - lazy_now->Now();
+ task_queue_manager_->PostDelayedTask(
+ FROM_HERE,
+ Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay);
+ }
+ }
+}
+
bool TaskQueue::IsQueueEmpty() const {
if (!work_queue_.empty())
return false;
@@ -197,31 +284,23 @@ bool TaskQueue::ShouldAutoPumpQueueLocked(
bool TaskQueue::NextPendingDelayedTaskRunTime(
base::TimeTicks* next_pending_delayed_task) {
base::AutoLock lock(lock_);
- return NextPendingDelayedTaskRunTimeLocked(next_pending_delayed_task);
-}
-
-bool TaskQueue::NextPendingDelayedTaskRunTimeLocked(
- base::TimeTicks* next_pending_delayed_task) {
- lock_.AssertAcquired();
- if (!delayed_task_run_times_.empty()) {
- *next_pending_delayed_task =
- std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
- return true;
- }
- return false;
+ if (delayed_task_queue_.empty())
+ return false;
+ *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
Sami 2015/03/18 12:26:43 It's not obvious from the function's name, but I t
alex clarke (OOO till 29th) 2015/03/18 12:38:32 Did it in the caller instead since it's kind of su
+ return true;
}
-bool TaskQueue::UpdateWorkQueue(
- base::TimeTicks* next_pending_delayed_task,
- const base::PendingTask* previous_task) {
+bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
+ const base::PendingTask* previous_task) {
if (!work_queue_.empty())
return true;
{
base::AutoLock lock(lock_);
- NextPendingDelayedTaskRunTimeLocked(next_pending_delayed_task);
+ MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
if (!ShouldAutoPumpQueueLocked(previous_task))
return false;
+ MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
work_queue_.Swap(&incoming_queue_);
TraceQueueSize(true);
return true;
@@ -245,17 +324,13 @@ void TaskQueue::TraceQueueSize(bool is_locked) const {
lock_.Acquire();
else
lock_.AssertAcquired();
- TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
- incoming_queue_.size() + work_queue_.size());
+ TRACE_COUNTER1(
+ TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
+ incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
if (!is_locked)
lock_.Release();
}
-void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) {
- base::AutoLock lock(lock_);
- EnqueueTaskLocked(pending_task);
-}
-
void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
lock_.AssertAcquired();
if (!task_queue_manager_)
@@ -266,11 +341,6 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
incoming_queue_.push(pending_task);
if (!pending_task.delayed_run_time.is_null()) {
- // Update the time of the next pending delayed task.
- while (!delayed_task_run_times_.empty() &&
- delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
- delayed_task_run_times_.pop();
- }
// Clear the delayed run time because we've already applied the delay
// before getting here.
incoming_queue_.back().delayed_run_time = base::TimeTicks();
@@ -289,6 +359,10 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
void TaskQueue::PumpQueueLocked() {
lock_.AssertAcquired();
+ if (task_queue_manager_) {
+ LazyNow lazy_now(task_queue_manager_);
+ MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
+ }
while (!incoming_queue_.empty()) {
work_queue_.push(incoming_queue_.front());
incoming_queue_.pop();
@@ -313,6 +387,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
state->EndArray();
state->BeginArray("work_queue");
QueueAsValueInto(work_queue_, state);
+ state->BeginArray("delayed_task_queue");
+ QueueAsValueInto(delayed_task_queue_, state);
state->EndArray();
state->EndDictionary();
}
@@ -344,6 +420,16 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
}
// static
+void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
+ base::trace_event::TracedValue* state) {
+ base::DelayedTaskQueue queue_copy(queue);
+ while (!queue_copy.empty()) {
+ TaskAsValueInto(queue_copy.top(), state);
+ queue_copy.pop();
+ }
+}
+
+// static
void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
base::trace_event::TracedValue* state) {
state->BeginDictionary();
@@ -442,16 +528,15 @@ void TaskQueueManager::PumpQueue(size_t queue_index) {
}
bool TaskQueueManager::UpdateWorkQueues(
- base::TimeTicks* next_pending_delayed_task,
const base::PendingTask* previous_task) {
// TODO(skyostil): This is not efficient when the number of queues grows very
// large due to the number of locks taken. Consider optimizing when we get
// there.
DCHECK(main_thread_checker_.CalledOnValidThread());
+ internal::LazyNow lazy_now(this);
bool has_work = false;
for (auto& queue : queues_) {
- has_work |= queue->UpdateWorkQueue(next_pending_delayed_task,
- previous_task);
+ has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task);
if (!queue->work_queue().empty()) {
// Currently we should not be getting tasks with delayed run times in any
// of the work queues.
@@ -484,22 +569,14 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) {
}
DCHECK(main_thread_checker_.CalledOnValidThread());
- base::TimeTicks next_pending_delayed_task(
- base::TimeTicks::FromInternalValue(kMaxTimeTicks));
-
- // Pass nullptr to UpdateWorkQueues here to prevent waking up an
+ // Pass nullptr to UpdateWorkQueues here to prevent waking up a
// pump-after-wakeup queue.
- if (!UpdateWorkQueues(&next_pending_delayed_task, nullptr))
+ if (!UpdateWorkQueues(nullptr))
return;
base::PendingTask previous_task((tracked_objects::Location()),
(base::Closure()));
for (int i = 0; i < work_batch_size_; i++) {
- // Interrupt the work batch if we should run the next delayed task.
- if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
- Now() >= next_pending_delayed_task)
- return;
-
size_t queue_index;
if (!SelectWorkQueueToService(&queue_index))
return;
@@ -508,7 +585,7 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) {
MaybePostDoWorkOnMainRunner();
ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);
- if (!UpdateWorkQueues(&next_pending_delayed_task, &previous_task))
+ if (!UpdateWorkQueues(&previous_task))
return;
}
}

Powered by Google App Engine
This is Rietveld 408576698