| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "platform/scheduler/base/task_queue_manager.h" | 5 #include "platform/scheduler/base/task_queue_manager.h" |
| 6 | 6 |
| 7 #include <queue> | 7 #include <queue> |
| 8 #include <set> | 8 #include <set> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 140 DCHECK(main_thread_checker_.CalledOnValidThread()); | 140 DCHECK(main_thread_checker_.CalledOnValidThread()); |
| 141 if (observer_) | 141 if (observer_) |
| 142 observer_->OnUnregisterTaskQueue(task_queue); | 142 observer_->OnUnregisterTaskQueue(task_queue); |
| 143 | 143 |
| 144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being | 144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being |
| 145 // freed while any of our structures hold hold a raw pointer to it. | 145 // freed while any of our structures hold hold a raw pointer to it. |
| 146 queues_to_delete_.insert(task_queue); | 146 queues_to_delete_.insert(task_queue); |
| 147 queues_.erase(task_queue); | 147 queues_.erase(task_queue); |
| 148 | 148 |
| 149 selector_.RemoveQueue(task_queue.get()); | 149 selector_.RemoveQueue(task_queue.get()); |
| 150 |
| 151 { |
| 152 base::AutoLock lock(any_thread_lock_); |
| 153 any_thread().has_incoming_immediate_work.erase(task_queue.get()); |
| 154 } |
| 150 } | 155 } |
| 151 | 156 |
| 152 void TaskQueueManager::UpdateWorkQueues(LazyNow* lazy_now) { | 157 void TaskQueueManager::ReloadEmptyWorkQueues( |
| 158 const std::unordered_set<internal::TaskQueueImpl*>& queues_to_reload) |
| 159 const { |
| 160 // There are two cases where a queue needs reloading. First, it might be |
| 161 // completely empty and we've just posted a task (this method handles that |
| 162 // case). Secondly if the work queue becomes empty in when calling |
| 163 // WorkQueue::TakeTaskFromWorkQueue (handled there). |
| 164 for (internal::TaskQueueImpl* queue : queues_to_reload) { |
| 165 queue->ReloadImmediateWorkQueueIfEmpty(); |
| 166 } |
| 167 } |
| 168 |
| 169 void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) { |
| 153 TRACE_EVENT0(disabled_by_default_tracing_category_, | 170 TRACE_EVENT0(disabled_by_default_tracing_category_, |
| 154 "TaskQueueManager::UpdateWorkQueues"); | 171 "TaskQueueManager::WakeupReadyDelayedQueues"); |
| 155 | 172 |
| 156 for (TimeDomain* time_domain : time_domains_) { | 173 for (TimeDomain* time_domain : time_domains_) { |
| 157 if (time_domain == real_time_domain_.get()) { | 174 if (time_domain == real_time_domain_.get()) { |
| 158 time_domain->UpdateWorkQueues(lazy_now); | 175 time_domain->WakeupReadyDelayedQueues(lazy_now); |
| 159 } else { | 176 } else { |
| 160 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow(); | 177 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow(); |
| 161 time_domain->UpdateWorkQueues(&time_domain_lazy_now); | 178 time_domain->WakeupReadyDelayedQueues(&time_domain_lazy_now); |
| 162 } | 179 } |
| 163 } | 180 } |
| 164 } | 181 } |
| 165 | 182 |
| 166 void TaskQueueManager::OnBeginNestedMessageLoop() { | 183 void TaskQueueManager::OnBeginNestedMessageLoop() { |
| 167 // We just entered a nested message loop, make sure there's a DoWork posted or | 184 // We just entered a nested message loop, make sure there's a DoWork posted or |
| 168 // the system will grind to a halt. | 185 // the system will grind to a halt. |
| 169 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); | 186 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); |
| 170 } | 187 } |
| 171 | 188 |
| 189 void TaskQueueManager::OnQueueHasIncomingImmediateWork( |
| 190 internal::TaskQueueImpl* queue, |
| 191 bool queue_is_blocked) { |
| 192 bool on_main_thread = delegate_->BelongsToCurrentThread(); |
| 193 |
| 194 { |
| 195 base::AutoLock lock(any_thread_lock_); |
| 196 any_thread().has_incoming_immediate_work.insert(queue); |
| 197 |
| 198 if (queue_is_blocked) |
| 199 return; |
| 200 |
| 201 // De-duplicate DoWork posts. |
| 202 if (on_main_thread) { |
| 203 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) |
| 204 return; |
| 205 } else { |
| 206 if (any_thread().other_thread_pending_wakeup) |
| 207 return; |
| 208 any_thread().other_thread_pending_wakeup = true; |
| 209 } |
| 210 } |
| 211 |
| 212 if (on_main_thread) { |
| 213 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); |
| 214 } else { |
| 215 delegate_->PostTask(FROM_HERE, |
| 216 from_other_thread_immediate_do_work_closure_); |
| 217 } |
| 218 } |
| 219 |
| 172 void TaskQueueManager::MaybeScheduleImmediateWork( | 220 void TaskQueueManager::MaybeScheduleImmediateWork( |
| 173 const tracked_objects::Location& from_here) { | 221 const tracked_objects::Location& from_here) { |
| 174 bool on_main_thread = delegate_->BelongsToCurrentThread(); | 222 bool on_main_thread = delegate_->BelongsToCurrentThread(); |
| 175 // De-duplicate DoWork posts. | 223 // De-duplicate DoWork posts. |
| 176 if (on_main_thread) { | 224 if (on_main_thread) { |
| 177 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { | 225 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { |
| 178 return; | 226 return; |
| 179 } | 227 } |
| 180 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); | 228 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); |
| 181 } else { | 229 } else { |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 230 } | 278 } |
| 231 | 279 |
| 232 // Posting a DoWork while a DoWork is running leads to spurious DoWorks. | 280 // Posting a DoWork while a DoWork is running leads to spurious DoWorks. |
| 233 main_thread_pending_wakeups_.insert(base::TimeTicks()); | 281 main_thread_pending_wakeups_.insert(base::TimeTicks()); |
| 234 | 282 |
| 235 bool is_nested = delegate_->IsNested(); | 283 bool is_nested = delegate_->IsNested(); |
| 236 if (!is_nested) | 284 if (!is_nested) |
| 237 queues_to_delete_.clear(); | 285 queues_to_delete_.clear(); |
| 238 | 286 |
| 239 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); | 287 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); |
| 240 UpdateWorkQueues(&lazy_now); | 288 WakeupReadyDelayedQueues(&lazy_now); |
| 241 | 289 |
| 242 for (int i = 0; i < work_batch_size_; i++) { | 290 for (int i = 0; i < work_batch_size_; i++) { |
| 291 std::unordered_set<internal::TaskQueueImpl*> queues_to_reload; |
| 292 |
| 293 { |
| 294 base::AutoLock lock(any_thread_lock_); |
| 295 std::swap(queues_to_reload, any_thread().has_incoming_immediate_work); |
| 296 } |
| 297 |
| 298 // It's important we call ReloadEmptyWorkQueues out side of the lock to |
| 299 // avoid a lock order inversion. |
| 300 ReloadEmptyWorkQueues(queues_to_reload); |
| 301 |
| 243 internal::WorkQueue* work_queue; | 302 internal::WorkQueue* work_queue; |
| 244 if (!SelectWorkQueueToService(&work_queue)) | 303 if (!SelectWorkQueueToService(&work_queue)) |
| 245 break; | 304 break; |
| 246 | 305 |
| 247 base::TimeTicks time_after_task; | 306 base::TimeTicks time_after_task; |
| 248 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, | 307 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, |
| 249 &time_after_task)) { | 308 &time_after_task)) { |
| 250 case ProcessTaskResult::DEFERRED: | 309 case ProcessTaskResult::DEFERRED: |
| 251 // If a task was deferred, try again with another task. | 310 // If a task was deferred, try again with another task. |
| 252 continue; | 311 continue; |
| 253 case ProcessTaskResult::EXECUTED: | 312 case ProcessTaskResult::EXECUTED: |
| 254 break; | 313 break; |
| 255 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: | 314 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: |
| 256 return; // The TaskQueueManager got deleted, we must bail out. | 315 return; // The TaskQueueManager got deleted, we must bail out. |
| 257 } | 316 } |
| 258 | 317 |
| 259 work_queue = nullptr; // The queue may have been unregistered. | 318 work_queue = nullptr; // The queue may have been unregistered. |
| 260 | 319 |
| 261 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() | 320 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() |
| 262 : LazyNow(time_after_task); | 321 : LazyNow(time_after_task); |
| 263 UpdateWorkQueues(&lazy_now); | 322 WakeupReadyDelayedQueues(&lazy_now); |
| 264 | 323 |
| 265 // Only run a single task per batch in nested run loops so that we can | 324 // Only run a single task per batch in nested run loops so that we can |
| 266 // properly exit the nested loop when someone calls RunLoop::Quit(). | 325 // properly exit the nested loop when someone calls RunLoop::Quit(). |
| 267 if (is_nested) | 326 if (is_nested) |
| 268 break; | 327 break; |
| 269 } | 328 } |
| 270 | 329 |
| 271 main_thread_pending_wakeups_.erase(base::TimeTicks()); | 330 main_thread_pending_wakeups_.erase(base::TimeTicks()); |
| 272 | 331 |
| 273 // TODO(alexclarke): Consider refactoring the above loop to terminate only | 332 // TODO(alexclarke): Consider refactoring the above loop to terminate only |
| 274 // when there's no more work left to be done, rather than posting a | 333 // when there's no more work left to be done, rather than posting a |
| 275 // continuation task. | 334 // continuation task. |
| 276 base::Optional<base::TimeDelta> next_delay = | 335 base::Optional<base::TimeDelta> next_delay = |
| 277 ComputeDelayTillNextTask(&lazy_now); | 336 ComputeDelayTillNextTask(&lazy_now); |
| 278 | 337 |
| 279 if (!next_delay) | 338 if (!next_delay) |
| 280 return; | 339 return; |
| 281 | 340 |
| 282 base::TimeDelta delay = next_delay.value(); | 341 base::TimeDelta delay = next_delay.value(); |
| 283 if (delay.is_zero()) { | 342 if (delay.is_zero()) { |
| 284 MaybeScheduleImmediateWork(FROM_HERE); | 343 MaybeScheduleImmediateWork(FROM_HERE); |
| 285 } else { | 344 } else { |
| 286 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); | 345 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); |
| 287 } | 346 } |
| 288 } | 347 } |
| 289 | 348 |
| 290 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( | 349 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( |
| 291 LazyNow* lazy_now) { | 350 LazyNow* lazy_now) { |
| 351 DCHECK(main_thread_checker_.CalledOnValidThread()); |
| 352 |
| 353 std::unordered_set<internal::TaskQueueImpl*> queues_to_reload; |
| 354 { |
| 355 base::AutoLock lock(any_thread_lock_); |
| 356 std::swap(queues_to_reload, any_thread().has_incoming_immediate_work); |
| 357 } |
| 358 |
| 359 // It's important we call ReloadEmptyWorkQueues out side of the lock to |
| 360 // avoid a lock order inversion. |
| 361 ReloadEmptyWorkQueues(queues_to_reload); |
| 362 |
| 292 // If the selector has non-empty queues we trivially know there is immediate | 363 // If the selector has non-empty queues we trivially know there is immediate |
| 293 // work to be done. | 364 // work to be done. |
| 294 if (!selector_.EnabledWorkQueuesEmpty()) | 365 if (!selector_.EnabledWorkQueuesEmpty()) |
| 295 return base::TimeDelta(); | 366 return base::TimeDelta(); |
| 296 | 367 |
| 297 // Otherwise we need to find the shortest delay, if any. | 368 // Otherwise we need to find the shortest delay, if any. |
| 298 base::Optional<base::TimeDelta> next_continuation; | 369 base::Optional<base::TimeDelta> next_continuation; |
| 299 for (TimeDomain* time_domain : time_domains_) { | 370 for (TimeDomain* time_domain : time_domains_) { |
| 300 base::Optional<base::TimeDelta> continuation = | 371 base::Optional<base::TimeDelta> continuation = |
| 301 time_domain->DelayTillNextTask(lazy_now); | 372 time_domain->DelayTillNextTask(lazy_now); |
| (...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 498 if (should_run) { | 569 if (should_run) { |
| 499 state->SetString("selected_queue", | 570 state->SetString("selected_queue", |
| 500 selected_work_queue->task_queue()->GetName()); | 571 selected_work_queue->task_queue()->GetName()); |
| 501 state->SetString("work_queue_name", selected_work_queue->name()); | 572 state->SetString("work_queue_name", selected_work_queue->name()); |
| 502 } | 573 } |
| 503 | 574 |
| 504 state->BeginArray("time_domains"); | 575 state->BeginArray("time_domains"); |
| 505 for (auto* time_domain : time_domains_) | 576 for (auto* time_domain : time_domains_) |
| 506 time_domain->AsValueInto(state.get()); | 577 time_domain->AsValueInto(state.get()); |
| 507 state->EndArray(); | 578 state->EndArray(); |
| 579 { |
| 580 base::AutoLock lock(any_thread_lock_); |
| 581 state->BeginArray("has_incoming_immediate_work"); |
| 582 for (internal::TaskQueueImpl* task_queue : |
| 583 any_thread().has_incoming_immediate_work) { |
| 584 state->AppendString(task_queue->GetName()); |
| 585 } |
| 586 state->EndArray(); |
| 587 } |
| 508 return std::move(state); | 588 return std::move(state); |
| 509 } | 589 } |
| 510 | 590 |
| 511 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { | 591 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { |
| 512 DCHECK(main_thread_checker_.CalledOnValidThread()); | 592 DCHECK(main_thread_checker_.CalledOnValidThread()); |
| 513 DCHECK(queue->IsQueueEnabled()); | 593 DCHECK(queue->IsQueueEnabled()); |
| 514 // Only schedule DoWork if there's something to do. | 594 // Only schedule DoWork if there's something to do. |
| 515 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence()) | 595 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence()) |
| 516 MaybeScheduleImmediateWork(FROM_HERE); | 596 MaybeScheduleImmediateWork(FROM_HERE); |
| 517 } | 597 } |
| (...skipping 23 matching lines...) Expand all Loading... |
| 541 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) { | 621 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) { |
| 542 TimeDomain* time_domain = queue->GetTimeDomain(); | 622 TimeDomain* time_domain = queue->GetTimeDomain(); |
| 543 if (time_domain_now.find(time_domain) == time_domain_now.end()) | 623 if (time_domain_now.find(time_domain) == time_domain_now.end()) |
| 544 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now())); | 624 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now())); |
| 545 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]); | 625 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]); |
| 546 } | 626 } |
| 547 } | 627 } |
| 548 | 628 |
| 549 } // namespace scheduler | 629 } // namespace scheduler |
| 550 } // namespace blink | 630 } // namespace blink |
| OLD | NEW |