Index: third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc |
diff --git a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc |
index 16dc9089c0f9fe4b5d58cad805e6aa62729d6d50..e42fee10c09e0aa1542725e07936804dca855cc6 100644 |
--- a/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc |
+++ b/third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc |
@@ -75,12 +75,10 @@ TaskQueueManager::TaskQueueManager( |
"TaskQueueManager", this); |
selector_.SetTaskQueueSelectorObserver(this); |
- from_main_thread_immediate_do_work_closure_ = |
- base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), |
- base::TimeTicks(), true); |
- from_other_thread_immediate_do_work_closure_ = |
- base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), |
- base::TimeTicks(), false); |
+ delayed_do_work_closure_ = |
+ base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true); |
+ immediate_do_work_closure_ = |
+ base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false); |
// TODO(alexclarke): Change this to be a parameter that's passed in. |
RegisterTimeDomain(real_time_domain_.get()); |
@@ -100,7 +98,10 @@ TaskQueueManager::~TaskQueueManager() { |
delegate_->RemoveNestingObserver(this); |
} |
-TaskQueueManager::AnyThread::AnyThread() : other_thread_pending_wakeup(false) {} |
+TaskQueueManager::AnyThread::AnyThread() |
+ : do_work_running_count(0), |
+ immediate_do_work_posted_count(0), |
+ is_nested(false) {} |
void TaskQueueManager::RegisterTimeDomain(TimeDomain* time_domain) { |
time_domains_.insert(time_domain); |
@@ -155,14 +156,13 @@ void TaskQueueManager::UnregisterTaskQueue( |
} |
void TaskQueueManager::ReloadEmptyWorkQueues( |
- const std::unordered_set<internal::TaskQueueImpl*>& queues_to_reload) |
- const { |
+ const IncomingImmediateWorkMap& queues_to_reload) const { |
// There are two cases where a queue needs reloading. First, it might be |
// completely empty and we've just posted a task (this method handles that |
// case). Secondly if the work queue becomes empty in when calling |
// WorkQueue::TakeTaskFromWorkQueue (handled there). |
- for (internal::TaskQueueImpl* queue : queues_to_reload) { |
- queue->ReloadImmediateWorkQueueIfEmpty(); |
+ for (const auto& pair : queues_to_reload) { |
+ pair.first->ReloadImmediateWorkQueueIfEmpty(); |
} |
} |
@@ -183,59 +183,49 @@ void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) { |
void TaskQueueManager::OnBeginNestedMessageLoop() { |
// We just entered a nested message loop, make sure there's a DoWork posted or |
// the system will grind to a halt. |
- delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); |
+ { |
+ base::AutoLock lock(any_thread_lock_); |
+ any_thread().immediate_do_work_posted_count++; |
+ any_thread().is_nested = true; |
+ } |
+ delegate_->PostTask(FROM_HERE, immediate_do_work_closure_); |
} |
void TaskQueueManager::OnQueueHasIncomingImmediateWork( |
internal::TaskQueueImpl* queue, |
+ internal::EnqueueOrder enqueue_order, |
bool queue_is_blocked) { |
- bool on_main_thread = delegate_->BelongsToCurrentThread(); |
- |
- { |
- base::AutoLock lock(any_thread_lock_); |
- any_thread().has_incoming_immediate_work.insert(queue); |
- |
- if (queue_is_blocked) |
- return; |
- |
- // De-duplicate DoWork posts. |
- if (on_main_thread) { |
- if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) |
- return; |
- } else { |
- if (any_thread().other_thread_pending_wakeup) |
- return; |
- any_thread().other_thread_pending_wakeup = true; |
- } |
- } |
- |
- if (on_main_thread) { |
- delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); |
- } else { |
- delegate_->PostTask(FROM_HERE, |
- from_other_thread_immediate_do_work_closure_); |
- } |
+ MoveableAutoLock lock(any_thread_lock_); |
+ any_thread().has_incoming_immediate_work.insert( |
+ std::make_pair(queue, enqueue_order)); |
+ if (!queue_is_blocked) |
+ MaybeScheduleImmediateWorkLocked(FROM_HERE, std::move(lock)); |
} |
void TaskQueueManager::MaybeScheduleImmediateWork( |
const tracked_objects::Location& from_here) { |
- bool on_main_thread = delegate_->BelongsToCurrentThread(); |
- // De-duplicate DoWork posts. |
- if (on_main_thread) { |
- if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { |
+ MoveableAutoLock lock(any_thread_lock_); |
+ MaybeScheduleImmediateWorkLocked(from_here, std::move(lock)); |
+} |
+ |
+void TaskQueueManager::MaybeScheduleImmediateWorkLocked( |
+ const tracked_objects::Location& from_here, |
+ MoveableAutoLock&& lock) { |
+ { |
+ MoveableAutoLock auto_lock(std::move(lock)); |
+ // Unless we're nested, try to avoid posting redundant DoWorks. |
+ if (!any_thread().is_nested && |
+ (any_thread().do_work_running_count == 1 || |
+ any_thread().immediate_do_work_posted_count > 0)) { |
return; |
} |
- delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); |
- } else { |
- { |
- base::AutoLock lock(any_thread_lock_); |
- if (any_thread().other_thread_pending_wakeup) |
- return; |
- any_thread().other_thread_pending_wakeup = true; |
- } |
- delegate_->PostTask(from_here, |
- from_other_thread_immediate_do_work_closure_); |
+ |
+ any_thread().immediate_do_work_posted_count++; |
} |
+ |
+ TRACE_EVENT0(disabled_by_default_tracing_category_, |
+ "TaskQueueManager::MaybeScheduleImmediateWorkLocked::PostTask"); |
+ delegate_->PostTask(from_here, immediate_do_work_closure_); |
} |
void TaskQueueManager::MaybeScheduleDelayedWork( |
@@ -244,54 +234,70 @@ void TaskQueueManager::MaybeScheduleDelayedWork( |
base::TimeDelta delay) { |
DCHECK(main_thread_checker_.CalledOnValidThread()); |
DCHECK_GE(delay, base::TimeDelta()); |
+ { |
+ base::AutoLock lock(any_thread_lock_); |
- // If there's a pending immediate DoWork then we rely on |
- // TryAdvanceTimeDomains getting the TimeDomain to call |
- // MaybeScheduleDelayedWork again when the immediate DoWork is complete. |
- if (main_thread_pending_wakeups_.find(base::TimeTicks()) != |
- main_thread_pending_wakeups_.end()) { |
- return; |
+ // Unless we're nested, don't post a delayed DoWork if there's an immediate |
+ // DoWork in flight or we're inside a DoWork. We can rely on DoWork posting |
+ // a delayed continuation as needed. |
+ if (!any_thread().is_nested && |
+ (any_thread().immediate_do_work_posted_count > 0 || |
+ any_thread().do_work_running_count == 1)) { |
+ return; |
+ } |
} |
+ |
// De-duplicate DoWork posts. |
base::TimeTicks run_time = now + delay; |
- if (!main_thread_pending_wakeups_.empty() && |
- *main_thread_pending_wakeups_.begin() <= run_time) { |
+ if (next_scheduled_delayed_do_work_time_ <= run_time && |
+ !next_scheduled_delayed_do_work_time_.is_null()) |
return; |
- } |
- main_thread_pending_wakeups_.insert(run_time); |
+ |
+ TRACE_EVENT1(disabled_by_default_tracing_category_, |
+ "TaskQueueManager::MaybeScheduleDelayedWork::PostDelayedTask", |
+ "delay_ms", delay.InMillisecondsF()); |
+ |
+ cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_); |
+ next_scheduled_delayed_do_work_time_ = run_time; |
delegate_->PostDelayedTask( |
- from_here, base::Bind(&TaskQueueManager::DoWork, |
- weak_factory_.GetWeakPtr(), run_time, true), |
- delay); |
+ from_here, cancelable_delayed_do_work_closure_.callback(), delay); |
} |
-void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) { |
+void TaskQueueManager::DoWork(bool delayed) { |
DCHECK(main_thread_checker_.CalledOnValidThread()); |
- TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork", |
- "from_main_thread", from_main_thread); |
- |
- if (from_main_thread) { |
- main_thread_pending_wakeups_.erase(run_time); |
- } else { |
- base::AutoLock lock(any_thread_lock_); |
- any_thread().other_thread_pending_wakeup = false; |
- } |
- |
- // Posting a DoWork while a DoWork is running leads to spurious DoWorks. |
- main_thread_pending_wakeups_.insert(base::TimeTicks()); |
+ TRACE_EVENT1(tracing_category_, "TaskQueueManager::DoWork", "delayed", |
+ delayed); |
+ LazyNow lazy_now(real_time_domain()->CreateLazyNow()); |
bool is_nested = delegate_->IsNested(); |
if (!is_nested) |
queues_to_delete_.clear(); |
- LazyNow lazy_now(real_time_domain()->CreateLazyNow()); |
- WakeupReadyDelayedQueues(&lazy_now); |
+ // This must be done before running any tasks because they could invoke a |
+ // nested message loop and we risk having a stale |
+ // |next_scheduled_delayed_do_work_time_|. |
+ if (delayed) |
+ next_scheduled_delayed_do_work_time_ = base::TimeTicks(); |
for (int i = 0; i < work_batch_size_; i++) { |
- std::unordered_set<internal::TaskQueueImpl*> queues_to_reload; |
+ IncomingImmediateWorkMap queues_to_reload; |
{ |
base::AutoLock lock(any_thread_lock_); |
+ if (i == 0) { |
+ any_thread().do_work_running_count++; |
+ |
+ if (!delayed) { |
+ any_thread().immediate_do_work_posted_count--; |
+ DCHECK_GE(any_thread().immediate_do_work_posted_count, 0); |
+ } |
+ } else { |
+ // Ideally we'd have an OnNestedMessageloopExit observer, but in it's |
+ // absence we may need to clear this flag after running a task (which |
+ // ran a nested messageloop). |
+ any_thread().is_nested = is_nested; |
+ } |
+ DCHECK_EQ(any_thread().is_nested, delegate_->IsNested()); |
std::swap(queues_to_reload, any_thread().has_incoming_immediate_work); |
} |
@@ -299,10 +305,13 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) { |
// avoid a lock order inversion. |
ReloadEmptyWorkQueues(queues_to_reload); |
- internal::WorkQueue* work_queue; |
+ WakeupReadyDelayedQueues(&lazy_now); |
+ |
+ internal::WorkQueue* work_queue = nullptr; |
if (!SelectWorkQueueToService(&work_queue)) |
break; |
+ // NB this may unregister |work_queue|. |
base::TimeTicks time_after_task; |
switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, |
&time_after_task)) { |
@@ -315,11 +324,8 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) { |
return; // The TaskQueueManager got deleted, we must bail out. |
} |
- work_queue = nullptr; // The queue may have been unregistered. |
- |
lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() |
: LazyNow(time_after_task); |
- WakeupReadyDelayedQueues(&lazy_now); |
// Only run a single task per batch in nested run loops so that we can |
// properly exit the nested loop when someone calls RunLoop::Quit(). |
@@ -327,45 +333,104 @@ void TaskQueueManager::DoWork(base::TimeTicks run_time, bool from_main_thread) { |
break; |
} |
- main_thread_pending_wakeups_.erase(base::TimeTicks()); |
- |
// TODO(alexclarke): Consider refactoring the above loop to terminate only |
// when there's no more work left to be done, rather than posting a |
// continuation task. |
- base::Optional<base::TimeDelta> next_delay = |
- ComputeDelayTillNextTask(&lazy_now); |
- if (!next_delay) |
- return; |
+ { |
+ MoveableAutoLock lock(any_thread_lock_); |
+ base::Optional<base::TimeDelta> next_delay = |
+ ComputeDelayTillNextTaskLocked(&lazy_now); |
- base::TimeDelta delay = next_delay.value(); |
- if (delay.is_zero()) { |
- MaybeScheduleImmediateWork(FROM_HERE); |
- } else { |
- MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); |
+ any_thread().do_work_running_count--; |
+ DCHECK_GE(any_thread().do_work_running_count, 0); |
+ |
+ any_thread().is_nested = is_nested; |
+ DCHECK_EQ(any_thread().is_nested, delegate_->IsNested()); |
+ |
+ PostDoWorkContinuationLocked(next_delay, &lazy_now, std::move(lock)); |
} |
} |
-base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( |
- LazyNow* lazy_now) { |
+void TaskQueueManager::PostDoWorkContinuationLocked( |
+ base::Optional<base::TimeDelta> next_delay, |
+ LazyNow* lazy_now, |
+ MoveableAutoLock&& lock) { |
DCHECK(main_thread_checker_.CalledOnValidThread()); |
+ base::TimeDelta delay; |
- std::unordered_set<internal::TaskQueueImpl*> queues_to_reload; |
{ |
- base::AutoLock lock(any_thread_lock_); |
- std::swap(queues_to_reload, any_thread().has_incoming_immediate_work); |
+ MoveableAutoLock auto_lock(std::move(lock)); |
+ |
+ // If there are no tasks left then we don't need to post a continuation. |
+ if (!next_delay) { |
+ // If there's a pending delayed DoWork, cancel it because it's not needed. |
+ if (!next_scheduled_delayed_do_work_time_.is_null()) { |
+ next_scheduled_delayed_do_work_time_ = base::TimeTicks(); |
+ cancelable_delayed_do_work_closure_.Cancel(); |
+ } |
+ return; |
+ } |
+ |
+ // If an immediate DoWork is posted, we don't need to post a continuation. |
+ if (any_thread().immediate_do_work_posted_count > 0) |
+ return; |
+ |
+ delay = next_delay.value(); |
+ |
+ // This isn't supposed to happen, but in case it does convert to |
+ // non-delayed. |
+ if (delay < base::TimeDelta()) |
+ delay = base::TimeDelta(); |
+ |
+ if (delay.is_zero()) { |
+ // If a delayed DoWork is pending then we don't need to post a |
+ // continuation because it should run immediately. |
+ if (!next_scheduled_delayed_do_work_time_.is_null() && |
+ next_scheduled_delayed_do_work_time_ <= lazy_now->Now()) { |
+ return; |
+ } |
+ |
+ any_thread().immediate_do_work_posted_count++; |
+ } else { |
+ base::TimeTicks run_time = lazy_now->Now() + delay; |
+ if (next_scheduled_delayed_do_work_time_ == run_time) |
+ return; |
+ |
+ next_scheduled_delayed_do_work_time_ = run_time; |
+ } |
+ } |
+ |
+ // We avoid holding |any_thread_lock_| while posting the task. |
+ if (delay.is_zero()) { |
+ delegate_->PostTask(FROM_HERE, immediate_do_work_closure_); |
+ } else { |
+ cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_); |
+ delegate_->PostDelayedTask( |
+ FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay); |
} |
+} |
- // It's important we call ReloadEmptyWorkQueues out side of the lock to |
- // avoid a lock order inversion. |
- ReloadEmptyWorkQueues(queues_to_reload); |
+base::Optional<base::TimeDelta> |
+TaskQueueManager::ComputeDelayTillNextTaskLocked(LazyNow* lazy_now) { |
+ DCHECK(main_thread_checker_.CalledOnValidThread()); |
+ |
+ // Unfortunately because |any_thread_lock_| is held it's not safe to call |
+ // ReloadEmptyWorkQueues here (possible lock order inversion), however this |
+ // check is equavalent to calling ReloadEmptyWorkQueues first. |
+ for (const auto& pair : any_thread().has_incoming_immediate_work) { |
+ if (pair.first->CouldTaskRun(pair.second)) |
+ return base::TimeDelta(); |
+ } |
// If the selector has non-empty queues we trivially know there is immediate |
// work to be done. |
if (!selector_.EnabledWorkQueuesEmpty()) |
return base::TimeDelta(); |
- // Otherwise we need to find the shortest delay, if any. |
+ // Otherwise we need to find the shortest delay, if any. NB we don't need to |
+ // call WakeupReadyDelayedQueues because it's assumed DelayTillNextTask will |
+ // return base::TimeDelta>() if the delayed task is due to run now. |
base::Optional<base::TimeDelta> next_continuation; |
for (TimeDomain* time_domain : time_domains_) { |
base::Optional<base::TimeDelta> continuation = |
@@ -578,10 +643,15 @@ TaskQueueManager::AsValueWithSelectorResult( |
state->EndArray(); |
{ |
base::AutoLock lock(any_thread_lock_); |
+ state->SetBoolean("is_nested", any_thread().is_nested); |
+ state->SetInteger("do_work_running_count", |
+ any_thread().do_work_running_count); |
+ state->SetInteger("immediate_do_work_posted_count", |
+ any_thread().immediate_do_work_posted_count); |
+ |
state->BeginArray("has_incoming_immediate_work"); |
- for (internal::TaskQueueImpl* task_queue : |
- any_thread().has_incoming_immediate_work) { |
- state->AppendString(task_queue->GetName()); |
+ for (const auto& pair : any_thread().has_incoming_immediate_work) { |
+ state->AppendString(pair.first->GetName()); |
} |
state->EndArray(); |
} |