| Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
|
| diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
|
| index 16dc9089c0f9fe4b5d58cad805e6aa62729d6d50..e42fee10c09e0aa1542725e07936804dca855cc6 100644
|
| --- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
|
| +++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc
|
| @@ -75,12 +75,10 @@ TaskQueueManager::TaskQueueManager(
|
| "TaskQueueManager", this);
|
| selector_.SetTaskQueueSelectorObserver(this);
|
|
|
| - from_main_thread_immediate_do_work_closure_ =
|
| - base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
|
| - base::TimeTicks(), true);
|
| - from_other_thread_immediate_do_work_closure_ =
|
| - base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(),
|
| - base::TimeTicks(), false);
|
| + delayed_do_work_closure_ =
|
| + base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true);
|
| + immediate_do_work_closure_ =
|
| + base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false);
|
|
|
| // TODO(alexclarke): Change this to be a parameter that's passed in.
|
| RegisterTimeDomain(real_time_domain_.get());
|
| @@ -100,7 +98,10 @@ TaskQueueManager::~TaskQueueManager() {
|
| delegate_->RemoveNestingObserver(this);
|
| }
|
|
|
| -TaskQueueManager::AnyThread::AnyThread() : other_thread_pending_wakeup(false) {}
|
| +TaskQueueManager::AnyThread::AnyThread()
|
| + : do_work_running_count(0),
|
| + immediate_do_work_posted_count(0),
|
| + is_nested(false) {}
|
|
|
| void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) {
|
| time_domains_.insert(time_domain);
|
| @@ -155,14 +156,13 @@ void TaskQueueManager::UnregisterTaskQueue(
|
| }
|
|
|
| void TaskQueueManager::ReloadEmptyWorkQueues(
|
| - const std::unordered_set<internal::TaskQueueImpl*>& queues_to_reload)
|
| - const {
|
| + const IncomingImmediateWorkMap& queues_to_reload) const {
|
| // There are two cases where a queue needs reloading. First, it might be
|
| // completely empty and we've just posted a task (this method handles that
|
| // case). Secondly if the work queue becomes empty in when calling
|
| // WorkQueue::TakeTaskFromWorkQueue (handled there).
|
| - for (internal::TaskQueueImpl* queue : queues_to_reload) {
|
| - queue->ReloadImmediateWorkQueueIfEmpty();
|
| + for (const auto& pair : queues_to_reload) {
|
| + pair.first->ReloadImmediateWorkQueueIfEmpty();
|
| }
|
| }
|
|
|
| @@ -183,59 +183,49 @@ void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) {
|
| void TaskQueueManager::OnBeginNestedMessageLoop() {
|
| // We just entered a nested message loop, make sure there's a DoWork posted or
|
| // the system will grind to a halt.
|
| - delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
|
| + {
|
| + base::AutoLock lock(any_thread_lock_);
|
| + any_thread().immediate_do_work_posted_count++;
|
| + any_thread().is_nested = true;
|
| + }
|
| + delegate_->PostTask(FROM_HERE, immediate_do_work_closure_);
|
| }
|
|
|
| void TaskQueueManager::OnQueueHasIncomingImmediateWork(
|
| internal::TaskQueueImpl* queue,
|
| + internal::EnqueueOrder enqueue_order,
|
| bool queue_is_blocked) {
|
| - bool on_main_thread = delegate_->BelongsToCurrentThread();
|
| -
|
| - {
|
| - base::AutoLock lock(any_thread_lock_);
|
| - any_thread().has_incoming_immediate_work.insert(queue);
|
| -
|
| - if (queue_is_blocked)
|
| - return;
|
| -
|
| - // De-duplicate DoWork posts.
|
| - if (on_main_thread) {
|
| - if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second)
|
| - return;
|
| - } else {
|
| - if (any_thread().other_thread_pending_wakeup)
|
| - return;
|
| - any_thread().other_thread_pending_wakeup = true;
|
| - }
|
| - }
|
| -
|
| - if (on_main_thread) {
|
| - delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
|
| - } else {
|
| - delegate_->PostTask(FROM_HERE,
|
| - from_other_thread_immediate_do_work_closure_);
|
| - }
|
| + MoveableAutoLock lock(any_thread_lock_);
|
| + any_thread().has_incoming_immediate_work.insert(
|
| + std::make_pair(queue, enqueue_order));
|
| + if (!queue_is_blocked)
|
| + MaybeScheduleImmediateWorkLocked(FROM_HERE, std::move(lock));
|
| }
|
|
|
| void TaskQueueManager::MaybeScheduleImmediateWork(
|
| const tracked_objects::Location& from_here) {
|
| - bool on_main_thread = delegate_->BelongsToCurrentThread();
|
| - // De-duplicate DoWork posts.
|
| - if (on_main_thread) {
|
| - if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) {
|
| + MoveableAutoLock lock(any_thread_lock_);
|
| + MaybeScheduleImmediateWorkLocked(from_here, std::move(lock));
|
| +}
|
| +
|
| +void TaskQueueManager::MaybeScheduleImmediateWorkLocked(
|
| + const tracked_objects::Location& from_here,
|
| + MoveableAutoLock&& lock) {
|
| + {
|
| + MoveableAutoLock auto_lock(std::move(lock));
|
| + // Unless we're nested, try to avoid posting redundant DoWorks.
|
| + if (!any_thread().is_nested &&
|
| + (any_thread().do_work_running_count == 1 ||
|
| + any_thread().immediate_do_work_posted_count > 0)) {
|
| return;
|
| }
|
| - delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_);
|
| - } else {
|
| - {
|
| - base::AutoLock lock(any_thread_lock_);
|
| - if (any_thread().other_thread_pending_wakeup)
|
| - return;
|
| - any_thread().other_thread_pending_wakeup = true;
|
| - }
|
| - delegate_->PostTask(from_here,
|
| - from_other_thread_immediate_do_work_closure_);
|
| +
|
| + any_thread().immediate_do_work_posted_count++;
|
| }
|
| +
|
| + TRACE_EVENT0(disabled_by_default_tracing_category_,
|
| + "TaskQueueManager::MaybeScheduleImmediateWorkLocked::PostTask");
|
| + delegate_->PostTask(from_here, immediate_do_work_closure_);
|
| }
|
|
|
| void TaskQueueManager::MaybeScheduleDelayedWork(
|
| @@ -244,54 +234,70 @@ void TaskQueueManager::MaybeScheduleDelayedWork(
|
| base::TimeDelta delay) {
|
| DCHECK(main_thread_checker_.CalledOnValidThread());
|
| DCHECK_GE(delay, base::TimeDelta());
|
| + {
|
| + base::AutoLock lock(any_thread_lock_);
|
|
|
| - // If there's a pending immediate DoWork then we rely on
|
| - // TryAdvanceTimeDomains getting the TimeDomain to call
|
| - // MaybeScheduleDelayedWork again when the immediate DoWork is complete.
|
| - if (main_thread_pending_wakeups_.find(base::TimeTicks()) !=
|
| - main_thread_pending_wakeups_.end()) {
|
| - return;
|
| + // Unless we're nested, don't post a delayed DoWork if there's an immediate
|
| + // DoWork in flight or we're inside a DoWork. We can rely on DoWork posting
|
| + // a delayed continuation as needed.
|
| + if (!any_thread().is_nested &&
|
| + (any_thread().immediate_do_work_posted_count > 0 ||
|
| + any_thread().do_work_running_count == 1)) {
|
| + return;
|
| + }
|
| }
|
| +
|
| // De-duplicate DoWork posts.
|
| base::TimeTicks run_time = now + delay;
|
| - if (!main_thread_pending_wakeups_.empty() &&
|
| - *main_thread_pending_wakeups_.begin() <= run_time) {
|
| + if (next_scheduled_delayed_do_work_time_ <= run_time &&
|
| + !next_scheduled_delayed_do_work_time_.is_null())
|
| return;
|
| - }
|
| - main_thread_pending_wakeups_.insert(run_time);
|
| +
|
| + TRACE_EVENT1(disabled_by_default_tracing_category_,
|
| + "TaskQueueManager::MaybeScheduleDelayedWork::PostDelayedTask",
|
| + "delay_ms", delay.InMillisecondsF());
|
| +
|
| + cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
|
| + next_scheduled_delayed_do_work_time_ = run_time;
|
| delegate_->PostDelayedTask(
|
| - from_here, base::Bind(&TaskQueueManager::DoWork,
|
| - weak_factory_.GetWeakPtr(), run_time, true),
|
| - delay);
|
| + from_here, cancelable_delayed_do_work_closure_.callback(), delay);
|
| }
|
|
|
| -void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
|
| +void TaskQueueManager::DoWork(bool delayed) {
|
| DCHECK(main_thread_checker_.CalledOnValidThread());
|
| - TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork",
|
| - "from_main_thread", from_main_thread);
|
| -
|
| - if (from_main_thread) {
|
| - main_thread_pending_wakeups_.erase(run_time);
|
| - } else {
|
| - base::AutoLock lock(any_thread_lock_);
|
| - any_thread().other_thread_pending_wakeup = false;
|
| - }
|
| -
|
| - // Posting a DoWork while a DoWork is running leads to spurious DoWorks.
|
| - main_thread_pending_wakeups_.insert(base::TimeTicks());
|
| + TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork", "delayed",
|
| + delayed);
|
|
|
| + LazyNow lazy_now(real_time_domain()->CreateLazyNow());
|
| bool is_nested = delegate_->IsNested();
|
| if (!is_nested)
|
| queues_to_delete_.clear();
|
|
|
| - LazyNow lazy_now(real_time_domain()->CreateLazyNow());
|
| - WakeupReadyDelayedQueues(&lazy_now);
|
| + // This must be done before running any tasks because they could invoke a
|
| + // nested message loop and we risk having a stale
|
| + // |next_scheduled_delayed_do_work_time_|.
|
| + if (delayed)
|
| + next_scheduled_delayed_do_work_time_ = base::TimeTicks();
|
|
|
| for (int i = 0; i < work_batch_size_; i++) {
|
| - std::unordered_set<internal::TaskQueueImpl*> queues_to_reload;
|
| + IncomingImmediateWorkMap queues_to_reload;
|
|
|
| {
|
| base::AutoLock lock(any_thread_lock_);
|
| + if (i == 0) {
|
| + any_thread().do_work_running_count++;
|
| +
|
| + if (!delayed) {
|
| + any_thread().immediate_do_work_posted_count--;
|
| + DCHECK_GE(any_thread().immediate_do_work_posted_count, 0);
|
| + }
|
| + } else {
|
| + // Ideally we'd have an OnNestedMessageloopExit observer, but in it's
|
| + // absence we may need to clear this flag after running a task (which
|
| + // ran a nested messageloop).
|
| + any_thread().is_nested = is_nested;
|
| + }
|
| + DCHECK_EQ(any_thread().is_nested, delegate_->IsNested());
|
| std::swap(queues_to_reload, any_thread().has_incoming_immediate_work);
|
| }
|
|
|
| @@ -299,10 +305,13 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
|
| // avoid a lock order inversion.
|
| ReloadEmptyWorkQueues(queues_to_reload);
|
|
|
| - internal::WorkQueue* work_queue;
|
| + WakeupReadyDelayedQueues(&lazy_now);
|
| +
|
| + internal::WorkQueue* work_queue = nullptr;
|
| if (!SelectWorkQueueToService(&work_queue))
|
| break;
|
|
|
| + // NB this may unregister |work_queue|.
|
| base::TimeTicks time_after_task;
|
| switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now,
|
| &time_after_task)) {
|
| @@ -315,11 +324,8 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
|
| return; // The TaskQueueManager got deleted, we must bail out.
|
| }
|
|
|
| - work_queue = nullptr; // The queue may have been unregistered.
|
| -
|
| lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow()
|
| : LazyNow(time_after_task);
|
| - WakeupReadyDelayedQueues(&lazy_now);
|
|
|
| // Only run a single task per batch in nested run loops so that we can
|
| // properly exit the nested loop when someone calls RunLoop::Quit().
|
| @@ -327,45 +333,104 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) {
|
| break;
|
| }
|
|
|
| - main_thread_pending_wakeups_.erase(base::TimeTicks());
|
| -
|
| // TODO(alexclarke): Consider refactoring the above loop to terminate only
|
| // when there's no more work left to be done, rather than posting a
|
| // continuation task.
|
| - base::Optional<base::TimeDelta> next_delay =
|
| - ComputeDelayTillNextTask(&lazy_now);
|
|
|
| - if (!next_delay)
|
| - return;
|
| + {
|
| + MoveableAutoLock lock(any_thread_lock_);
|
| + base::Optional<base::TimeDelta> next_delay =
|
| + ComputeDelayTillNextTaskLocked(&lazy_now);
|
|
|
| - base::TimeDelta delay = next_delay.value();
|
| - if (delay.is_zero()) {
|
| - MaybeScheduleImmediateWork(FROM_HERE);
|
| - } else {
|
| - MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay);
|
| + any_thread().do_work_running_count--;
|
| + DCHECK_GE(any_thread().do_work_running_count, 0);
|
| +
|
| + any_thread().is_nested = is_nested;
|
| + DCHECK_EQ(any_thread().is_nested, delegate_->IsNested());
|
| +
|
| + PostDoWorkContinuationLocked(next_delay, &lazy_now, std::move(lock));
|
| }
|
| }
|
|
|
| -base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask(
|
| - LazyNow* lazy_now) {
|
| +void TaskQueueManager::PostDoWorkContinuationLocked(
|
| + base::Optional<base::TimeDelta> next_delay,
|
| + LazyNow* lazy_now,
|
| + MoveableAutoLock&& lock) {
|
| DCHECK(main_thread_checker_.CalledOnValidThread());
|
| + base::TimeDelta delay;
|
|
|
| - std::unordered_set<internal::TaskQueueImpl*> queues_to_reload;
|
| {
|
| - base::AutoLock lock(any_thread_lock_);
|
| - std::swap(queues_to_reload, any_thread().has_incoming_immediate_work);
|
| + MoveableAutoLock auto_lock(std::move(lock));
|
| +
|
| + // If there are no tasks left then we don't need to post a continuation.
|
| + if (!next_delay) {
|
| + // If there's a pending delayed DoWork, cancel it because it's not needed.
|
| + if (!next_scheduled_delayed_do_work_time_.is_null()) {
|
| + next_scheduled_delayed_do_work_time_ = base::TimeTicks();
|
| + cancelable_delayed_do_work_closure_.Cancel();
|
| + }
|
| + return;
|
| + }
|
| +
|
| + // If an immediate DoWork is posted, we don't need to post a continuation.
|
| + if (any_thread().immediate_do_work_posted_count > 0)
|
| + return;
|
| +
|
| + delay = next_delay.value();
|
| +
|
| + // This isn't supposed to happen, but in case it does convert to
|
| + // non-delayed.
|
| + if (delay < base::TimeDelta())
|
| + delay = base::TimeDelta();
|
| +
|
| + if (delay.is_zero()) {
|
| + // If a delayed DoWork is pending then we don't need to post a
|
| + // continuation because it should run immediately.
|
| + if (!next_scheduled_delayed_do_work_time_.is_null() &&
|
| + next_scheduled_delayed_do_work_time_ <= lazy_now->Now()) {
|
| + return;
|
| + }
|
| +
|
| + any_thread().immediate_do_work_posted_count++;
|
| + } else {
|
| + base::TimeTicks run_time = lazy_now->Now() + delay;
|
| + if (next_scheduled_delayed_do_work_time_ == run_time)
|
| + return;
|
| +
|
| + next_scheduled_delayed_do_work_time_ = run_time;
|
| + }
|
| + }
|
| +
|
| + // We avoid holding |any_thread_lock_| while posting the task.
|
| + if (delay.is_zero()) {
|
| + delegate_->PostTask(FROM_HERE, immediate_do_work_closure_);
|
| + } else {
|
| + cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
|
| + delegate_->PostDelayedTask(
|
| + FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
|
| }
|
| +}
|
|
|
| - // It's important we call ReloadEmptyWorkQueues out side of the lock to
|
| - // avoid a lock order inversion.
|
| - ReloadEmptyWorkQueues(queues_to_reload);
|
| +base::Optional<base::TimeDelta>
|
| +TaskQueueManager::ComputeDelayTillNextTaskLocked(LazyNow* lazy_now) {
|
| + DCHECK(main_thread_checker_.CalledOnValidThread());
|
| +
|
| + // Unfortunately because |any_thread_lock_| is held it's not safe to call
|
| + // ReloadEmptyWorkQueues here (possible lock order inversion), however this
|
| + // check is equavalent to calling ReloadEmptyWorkQueues first.
|
| + for (const auto& pair : any_thread().has_incoming_immediate_work) {
|
| + if (pair.first->CouldTaskRun(pair.second))
|
| + return base::TimeDelta();
|
| + }
|
|
|
| // If the selector has non-empty queues we trivially know there is immediate
|
| // work to be done.
|
| if (!selector_.EnabledWorkQueuesEmpty())
|
| return base::TimeDelta();
|
|
|
| - // Otherwise we need to find the shortest delay, if any.
|
| + // Otherwise we need to find the shortest delay, if any. NB we don't need to
|
| + // call WakeupReadyDelayedQueues because it's assumed DelayTillNextTask will
|
| + // return base::TimeDelta>() if the delayed task is due to run now.
|
| base::Optional<base::TimeDelta> next_continuation;
|
| for (TimeDomain* time_domain : time_domains_) {
|
| base::Optional<base::TimeDelta> continuation =
|
| @@ -578,10 +643,15 @@ TaskQueueManager::AsValueWithSelectorResult(
|
| state->EndArray();
|
| {
|
| base::AutoLock lock(any_thread_lock_);
|
| + state->SetBoolean("is_nested", any_thread().is_nested);
|
| + state->SetInteger("do_work_running_count",
|
| + any_thread().do_work_running_count);
|
| + state->SetInteger("immediate_do_work_posted_count",
|
| + any_thread().immediate_do_work_posted_count);
|
| +
|
| state->BeginArray("has_incoming_immediate_work");
|
| - for (internal::TaskQueueImpl* task_queue :
|
| - any_thread().has_incoming_immediate_work) {
|
| - state->AppendString(task_queue->GetName());
|
| + for (const auto& pair : any_thread().has_incoming_immediate_work) {
|
| + state->AppendString(pair.first->GetName());
|
| }
|
| state->EndArray();
|
| }
|
|
|