Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Unified Diff: third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc

Issue 2546423002: [Try # 3] Scheduler refactoring to virtually eliminate redundant DoWorks (Closed)
Patch Set: Rebased Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
index 16dc9089c0f9fe4b5d58cad805e6aa62729d6d50..e42fee10c09e0aa1542725e07936804dca855cc6 100644
--- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
+++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
@@ -75,12 +75,10 @@ TaskQueueManager::TaskQueueManager(
"TaskQueueManager", this);
selector_.SetTaskQueueSelectorObserver(this);
- from_main_thread_immediate_do_work_closure_ =
- base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
- base::TimeTicks(), true);
- from_other_thread_immediate_do_work_closure_ =
- base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
- base::TimeTicks(), false);
+ delayed_do_work_closure_ =
+ base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true);
+ immediate_do_work_closure_ =
+ base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false);
// TODO(alexclarke): Change this to be a parameter that's passed in.
RegisterTimeDomain(real_time_domain_.get());
@@ -100,7 +98,10 @@ TaskQueueManager::~TaskQueueManager() {
delegate_->RemoveNestingObserver(this);
}
-TaskQueueManager::AnyThread::AnyThread() : other_thread_pending_wakeup(false) {}
+TaskQueueManager::AnyThread::AnyThread()
+ : do_work_running_count(0),
+ immediate_do_work_posted_count(0),
+ is_nested(false) {}
void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) {
time_domains_.insert(time_domain);
@@ -155,14 +156,13 @@ void TaskQueueManager::UnregisterTaskQueue(
}
void TaskQueueManager::ReloadEmptyWorkQueues(
- const std::unordered_set<internal::TaskQueueImpl*>& queues_to_reload)
- const {
+ const IncomingImmediateWorkMap& queues_to_reload) const {
// There are two cases where a queue needs reloading. First, it might be
// completely empty and we've just posted a task (this method handles that
// case). Secondly if the work queue becomes empty in when calling
// WorkQueue::TakeTaskFromWorkQueue (handled there).
- for (internal::TaskQueueImpl* queue : queues_to_reload) {
- queue->ReloadImmediateWorkQueueIfEmpty();
+ for (const auto& pair : queues_to_reload) {
+ pair.first->ReloadImmediateWorkQueueIfEmpty();
}
}
@@ -183,59 +183,49 @@ void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) {
void TaskQueueManager::OnBeginNestedMessageLoop() {
// We just entered a nested message loop, make sure there's a DoWork posted or
// the system will grind to a halt.
- delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
+ {
+ base::AutoLock lock(any_thread_lock_);
+ any_thread().immediate_do_work_posted_count++;
+ any_thread().is_nested = true;
+ }
+ delegate_->PostTask(FROM_HERE, immediate_do_work_closure_);
}
void TaskQueueManager::OnQueueHasIncomingImmediateWork(
internal::TaskQueueImpl* queue,
+ internal::EnqueueOrder enqueue_order,
bool queue_is_blocked) {
- bool on_main_thread = delegate_->BelongsToCurrentThread();
-
- {
- base::AutoLock lock(any_thread_lock_);
- any_thread().has_incoming_immediate_work.insert(queue);
-
- if (queue_is_blocked)
- return;
-
- // De-duplicate DoWork posts.
- if (on_main_thread) {
- if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second)
- return;
- } else {
- if (any_thread().other_thread_pending_wakeup)
- return;
- any_thread().other_thread_pending_wakeup = true;
- }
- }
-
- if (on_main_thread) {
- delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
- } else {
- delegate_->PostTask(FROM_HERE,
- from_other_thread_immediate_do_work_closure_);
- }
+ MoveableAutoLock lock(any_thread_lock_);
+ any_thread().has_incoming_immediate_work.insert(
+ std::make_pair(queue, enqueue_order));
+ if (!queue_is_blocked)
+ MaybeScheduleImmediateWorkLocked(FROM_HERE, std::move(lock));
}
void TaskQueueManager::MaybeScheduleImmediateWork(
const tracked_objects::Location& from_here) {
- bool on_main_thread = delegate_->BelongsToCurrentThread();
- // De-duplicate DoWork posts.
- if (on_main_thread) {
- if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) {
+ MoveableAutoLock lock(any_thread_lock_);
+ MaybeScheduleImmediateWorkLocked(from_here, std::move(lock));
+}
+
+void TaskQueueManager::MaybeScheduleImmediateWorkLocked(
+ const tracked_objects::Location& from_here,
+ MoveableAutoLock&& lock) {
+ {
+ MoveableAutoLock auto_lock(std::move(lock));
+ // Unless we're nested, try to avoid posting redundant DoWorks.
+ if (!any_thread().is_nested &&
+ (any_thread().do_work_running_count == 1 ||
+ any_thread().immediate_do_work_posted_count > 0)) {
return;
}
- delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_);
- } else {
- {
- base::AutoLock lock(any_thread_lock_);
- if (any_thread().other_thread_pending_wakeup)
- return;
- any_thread().other_thread_pending_wakeup = true;
- }
- delegate_->PostTask(from_here,
- from_other_thread_immediate_do_work_closure_);
+
+ any_thread().immediate_do_work_posted_count++;
}
+
+ TRACE_EVENT0(disabled_by_default_tracing_category_,
+ "TaskQueueManager::MaybeScheduleImmediateWorkLocked::PostTask");
+ delegate_->PostTask(from_here, immediate_do_work_closure_);
}
void TaskQueueManager::MaybeScheduleDelayedWork(
@@ -244,54 +234,70 @@ void TaskQueueManager::MaybeScheduleDelayedWork(
base::TimeDelta delay) {
DCHECK(main_thread_checker_.CalledOnValidThread());
DCHECK_GE(delay, base::TimeDelta());
+ {
+ base::AutoLock lock(any_thread_lock_);
- // If there's a pending immediate DoWork then we rely on
- // TryAdvanceTimeDomains getting the TimeDomain to call
- // MaybeScheduleDelayedWork again when the immediate DoWork is complete.
- if (main_thread_pending_wakeups_.find(base::TimeTicks()) !=
- main_thread_pending_wakeups_.end()) {
- return;
+ // Unless we're nested, don't post a delayed DoWork if there's an immediate
+ // DoWork in flight or we're inside a DoWork. We can rely on DoWork posting
+ // a delayed continuation as needed.
+ if (!any_thread().is_nested &&
+ (any_thread().immediate_do_work_posted_count > 0 ||
+ any_thread().do_work_running_count == 1)) {
+ return;
+ }
}
+
// De-duplicate DoWork posts.
base::TimeTicks run_time = now + delay;
- if (!main_thread_pending_wakeups_.empty() &&
- *main_thread_pending_wakeups_.begin() <= run_time) {
+ if (next_scheduled_delayed_do_work_time_ <= run_time &&
+ !next_scheduled_delayed_do_work_time_.is_null())
return;
- }
- main_thread_pending_wakeups_.insert(run_time);
+
+ TRACE_EVENT1(disabled_by_default_tracing_category_,
+ "TaskQueueManager::MaybeScheduleDelayedWork::PostDelayedTask",
+ "delay_ms", delay.InMillisecondsF());
+
+ cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
+ next_scheduled_delayed_do_work_time_ = run_time;
delegate_->PostDelayedTask(
- from_here, base::Bind(&TaskQueueManager::DoWork,
- weak_factory_.GetWeakPtr(), run_time, true),
- delay);
+ from_here, cancelable_delayed_do_work_closure_.callback(), delay);
}
-void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
+void TaskQueueManager::DoWork(bool delayed) {
DCHECK(main_thread_checker_.CalledOnValidThread());
- TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork",
- "from_main_thread", from_main_thread);
-
- if (from_main_thread) {
- main_thread_pending_wakeups_.erase(run_time);
- } else {
- base::AutoLock lock(any_thread_lock_);
- any_thread().other_thread_pending_wakeup = false;
- }
-
- // Posting a DoWork while a DoWork is running leads to spurious DoWorks.
- main_thread_pending_wakeups_.insert(base::TimeTicks());
+ TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork", "delayed",
+ delayed);
+ LazyNow lazy_now(real_time_domain()->CreateLazyNow());
bool is_nested = delegate_->IsNested();
if (!is_nested)
queues_to_delete_.clear();
- LazyNow lazy_now(real_time_domain()->CreateLazyNow());
- WakeupReadyDelayedQueues(&lazy_now);
+ // This must be done before running any tasks because they could invoke a
+ // nested message loop and we risk having a stale
+ // |next_scheduled_delayed_do_work_time_|.
+ if (delayed)
+ next_scheduled_delayed_do_work_time_ = base::TimeTicks();
for (int i = 0; i < work_batch_size_; i++) {
- std::unordered_set<internal::TaskQueueImpl*> queues_to_reload;
+ IncomingImmediateWorkMap queues_to_reload;
{
base::AutoLock lock(any_thread_lock_);
+ if (i == 0) {
+ any_thread().do_work_running_count++;
+
+ if (!delayed) {
+ any_thread().immediate_do_work_posted_count--;
+ DCHECK_GE(any_thread().immediate_do_work_posted_count, 0);
+ }
+ } else {
+ // Ideally we'd have an OnNestedMessageloopExit observer, but in it's
+ // absence we may need to clear this flag after running a task (which
+ // ran a nested messageloop).
+ any_thread().is_nested = is_nested;
+ }
+ DCHECK_EQ(any_thread().is_nested, delegate_->IsNested());
std::swap(queues_to_reload, any_thread().has_incoming_immediate_work);
}
@@ -299,10 +305,13 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
// avoid a lock order inversion.
ReloadEmptyWorkQueues(queues_to_reload);
- internal::WorkQueue* work_queue;
+ WakeupReadyDelayedQueues(&lazy_now);
+
+ internal::WorkQueue* work_queue = nullptr;
if (!SelectWorkQueueToService(&work_queue))
break;
+ // NB this may unregister |work_queue|.
base::TimeTicks time_after_task;
switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now,
&time_after_task)) {
@@ -315,11 +324,8 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
return; // The TaskQueueManager got deleted, we must bail out.
}
- work_queue = nullptr; // The queue may have been unregistered.
-
lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow()
: LazyNow(time_after_task);
- WakeupReadyDelayedQueues(&lazy_now);
// Only run a single task per batch in nested run loops so that we can
// properly exit the nested loop when someone calls RunLoop::Quit().
@@ -327,45 +333,104 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
break;
}
- main_thread_pending_wakeups_.erase(base::TimeTicks());
-
// TODO(alexclarke): Consider refactoring the above loop to terminate only
// when there's no more work left to be done, rather than posting a
// continuation task.
- base::Optional<base::TimeDelta> next_delay =
- ComputeDelayTillNextTask(&lazy_now);
- if (!next_delay)
- return;
+ {
+ MoveableAutoLock lock(any_thread_lock_);
+ base::Optional<base::TimeDelta> next_delay =
+ ComputeDelayTillNextTaskLocked(&lazy_now);
- base::TimeDelta delay = next_delay.value();
- if (delay.is_zero()) {
- MaybeScheduleImmediateWork(FROM_HERE);
- } else {
- MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay);
+ any_thread().do_work_running_count--;
+ DCHECK_GE(any_thread().do_work_running_count, 0);
+
+ any_thread().is_nested = is_nested;
+ DCHECK_EQ(any_thread().is_nested, delegate_->IsNested());
+
+ PostDoWorkContinuationLocked(next_delay, &lazy_now, std::move(lock));
}
}
-base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask(
- LazyNow* lazy_now) {
+void TaskQueueManager::PostDoWorkContinuationLocked(
+ base::Optional<base::TimeDelta> next_delay,
+ LazyNow* lazy_now,
+ MoveableAutoLock&& lock) {
DCHECK(main_thread_checker_.CalledOnValidThread());
+ base::TimeDelta delay;
- std::unordered_set<internal::TaskQueueImpl*> queues_to_reload;
{
- base::AutoLock lock(any_thread_lock_);
- std::swap(queues_to_reload, any_thread().has_incoming_immediate_work);
+ MoveableAutoLock auto_lock(std::move(lock));
+
+ // If there are no tasks left then we don't need to post a continuation.
+ if (!next_delay) {
+ // If there's a pending delayed DoWork, cancel it because it's not needed.
+ if (!next_scheduled_delayed_do_work_time_.is_null()) {
+ next_scheduled_delayed_do_work_time_ = base::TimeTicks();
+ cancelable_delayed_do_work_closure_.Cancel();
+ }
+ return;
+ }
+
+ // If an immediate DoWork is posted, we don't need to post a continuation.
+ if (any_thread().immediate_do_work_posted_count > 0)
+ return;
+
+ delay = next_delay.value();
+
+ // This isn't supposed to happen, but in case it does convert to
+ // non-delayed.
+ if (delay < base::TimeDelta())
+ delay = base::TimeDelta();
+
+ if (delay.is_zero()) {
+ // If a delayed DoWork is pending then we don't need to post a
+ // continuation because it should run immediately.
+ if (!next_scheduled_delayed_do_work_time_.is_null() &&
+ next_scheduled_delayed_do_work_time_ <= lazy_now->Now()) {
+ return;
+ }
+
+ any_thread().immediate_do_work_posted_count++;
+ } else {
+ base::TimeTicks run_time = lazy_now->Now() + delay;
+ if (next_scheduled_delayed_do_work_time_ == run_time)
+ return;
+
+ next_scheduled_delayed_do_work_time_ = run_time;
+ }
+ }
+
+ // We avoid holding |any_thread_lock_| while posting the task.
+ if (delay.is_zero()) {
+ delegate_->PostTask(FROM_HERE, immediate_do_work_closure_);
+ } else {
+ cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
+ delegate_->PostDelayedTask(
+ FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
}
+}
- // It's important we call ReloadEmptyWorkQueues out side of the lock to
- // avoid a lock order inversion.
- ReloadEmptyWorkQueues(queues_to_reload);
+base::Optional<base::TimeDelta>
+TaskQueueManager::ComputeDelayTillNextTaskLocked(LazyNow* lazy_now) {
+ DCHECK(main_thread_checker_.CalledOnValidThread());
+
+ // Unfortunately because |any_thread_lock_| is held it's not safe to call
+ // ReloadEmptyWorkQueues here (possible lock order inversion), however this
+ // check is equavalent to calling ReloadEmptyWorkQueues first.
+ for (const auto& pair : any_thread().has_incoming_immediate_work) {
+ if (pair.first->CouldTaskRun(pair.second))
+ return base::TimeDelta();
+ }
// If the selector has non-empty queues we trivially know there is immediate
// work to be done.
if (!selector_.EnabledWorkQueuesEmpty())
return base::TimeDelta();
- // Otherwise we need to find the shortest delay, if any.
+ // Otherwise we need to find the shortest delay, if any. NB we don't need to
+ // call WakeupReadyDelayedQueues because it's assumed DelayTillNextTask will
+ // return base::TimeDelta>() if the delayed task is due to run now.
base::Optional<base::TimeDelta> next_continuation;
for (TimeDomain* time_domain : time_domains_) {
base::Optional<base::TimeDelta> continuation =
@@ -578,10 +643,15 @@ TaskQueueManager::AsValueWithSelectorResult(
state->EndArray();
{
base::AutoLock lock(any_thread_lock_);
+ state->SetBoolean("is_nested", any_thread().is_nested);
+ state->SetInteger("do_work_running_count",
+ any_thread().do_work_running_count);
+ state->SetInteger("immediate_do_work_posted_count",
+ any_thread().immediate_do_work_posted_count);
+
state->BeginArray("has_incoming_immediate_work");
- for (internal::TaskQueueImpl* task_queue :
- any_thread().has_incoming_immediate_work) {
- state->AppendString(task_queue->GetName());
+ for (const auto& pair : any_thread().has_incoming_immediate_work) {
+ state->AppendString(pair.first->GetName());
}
state->EndArray();
}

Powered by Google App Engine
This is Rietveld 408576698