Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "platform/scheduler/base/task_queue_manager.h" | 5 #include "platform/scheduler/base/task_queue_manager.h" |
| 6 | 6 |
| 7 #include <queue> | 7 #include <queue> |
| 8 #include <set> | 8 #include <set> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 140 DCHECK(main_thread_checker_.CalledOnValidThread()); | 140 DCHECK(main_thread_checker_.CalledOnValidThread()); |
| 141 if (observer_) | 141 if (observer_) |
| 142 observer_->OnUnregisterTaskQueue(task_queue); | 142 observer_->OnUnregisterTaskQueue(task_queue); |
| 143 | 143 |
| 144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being | 144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being |
| 145 // freed while any of our structures hold hold a raw pointer to it. | 145 // freed while any of our structures hold hold a raw pointer to it. |
| 146 queues_to_delete_.insert(task_queue); | 146 queues_to_delete_.insert(task_queue); |
| 147 queues_.erase(task_queue); | 147 queues_.erase(task_queue); |
| 148 | 148 |
| 149 selector_.RemoveQueue(task_queue.get()); | 149 selector_.RemoveQueue(task_queue.get()); |
| 150 | |
| 151 { | |
| 152 base::AutoLock lock(any_thread_lock_); | |
| 153 any_thread().has_incoming_immediate_work.erase(task_queue.get()); | |
| 154 } | |
| 150 } | 155 } |
| 151 | 156 |
| 152 void TaskQueueManager::UpdateWorkQueues(LazyNow* lazy_now) { | 157 void TaskQueueManager::ReloadEmptyWorkQueues( |
| 158 const std::set<internal::TaskQueueImpl*>& queues_to_reload) const { | |
| 159 // There are two cases where a queue needs reloading. First, it might be | |
| 160 // completely empty and we've just posted a task (this method handles that | |
| 161 // case). Secondly | |
|
Sami
2017/01/24 14:52:40
Secondly...? :)
alex clarke (OOO till 29th)
2017/01/24 15:28:14
Done.
| |
| 162 for (internal::TaskQueueImpl* queue : queues_to_reload) { | |
| 163 queue->ReloadImmediateWorkQueueIfEmpty(); | |
| 164 } | |
| 165 } | |
| 166 | |
| 167 void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) { | |
| 153 TRACE_EVENT0(disabled_by_default_tracing_category_, | 168 TRACE_EVENT0(disabled_by_default_tracing_category_, |
| 154 "TaskQueueManager::UpdateWorkQueues"); | 169 "TaskQueueManager::WakeupReadyDelayedQueues"); |
| 155 | 170 |
| 156 for (TimeDomain* time_domain : time_domains_) { | 171 for (TimeDomain* time_domain : time_domains_) { |
| 157 if (time_domain == real_time_domain_.get()) { | 172 if (time_domain == real_time_domain_.get()) { |
| 158 time_domain->UpdateWorkQueues(lazy_now); | 173 time_domain->WakeupReadyDelayedQueues(lazy_now); |
| 159 } else { | 174 } else { |
| 160 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow(); | 175 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow(); |
| 161 time_domain->UpdateWorkQueues(&time_domain_lazy_now); | 176 time_domain->WakeupReadyDelayedQueues(&time_domain_lazy_now); |
| 162 } | 177 } |
| 163 } | 178 } |
| 164 } | 179 } |
| 165 | 180 |
| 166 void TaskQueueManager::OnBeginNestedMessageLoop() { | 181 void TaskQueueManager::OnBeginNestedMessageLoop() { |
| 167 // We just entered a nested message loop, make sure there's a DoWork posted or | 182 // We just entered a nested message loop, make sure there's a DoWork posted or |
| 168 // the system will grind to a halt. | 183 // the system will grind to a halt. |
| 169 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); | 184 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); |
| 170 } | 185 } |
| 171 | 186 |
| 187 void TaskQueueManager::OnQueueHasIncomingImmediateWork( | |
| 188 internal::TaskQueueImpl* queue, | |
| 189 bool ensure_do_work_posted) { | |
| 190 bool on_main_thread = delegate_->BelongsToCurrentThread(); | |
| 191 | |
| 192 { | |
| 193 base::AutoLock lock(any_thread_lock_); | |
| 194 any_thread().has_incoming_immediate_work.insert(queue); | |
| 195 | |
| 196 if (!ensure_do_work_posted) | |
| 197 return; | |
| 198 | |
| 199 // De-duplicate DoWork posts. | |
| 200 if (on_main_thread) { | |
| 201 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) | |
| 202 return; | |
| 203 } else { | |
| 204 if (any_thread().other_thread_pending_wakeup) | |
| 205 return; | |
| 206 any_thread().other_thread_pending_wakeup = true; | |
| 207 } | |
| 208 } | |
| 209 | |
| 210 if (on_main_thread) { | |
| 211 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); | |
| 212 } else { | |
| 213 delegate_->PostTask(FROM_HERE, | |
| 214 from_other_thread_immediate_do_work_closure_); | |
| 215 } | |
| 216 } | |
| 217 | |
| 172 void TaskQueueManager::MaybeScheduleImmediateWork( | 218 void TaskQueueManager::MaybeScheduleImmediateWork( |
| 173 const tracked_objects::Location& from_here) { | 219 const tracked_objects::Location& from_here) { |
| 174 bool on_main_thread = delegate_->BelongsToCurrentThread(); | 220 bool on_main_thread = delegate_->BelongsToCurrentThread(); |
| 175 // De-duplicate DoWork posts. | 221 // De-duplicate DoWork posts. |
| 176 if (on_main_thread) { | 222 if (on_main_thread) { |
| 177 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { | 223 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { |
| 178 return; | 224 return; |
| 179 } | 225 } |
| 180 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); | 226 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); |
| 181 } else { | 227 } else { |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 230 } | 276 } |
| 231 | 277 |
| 232 // Posting a DoWork while a DoWork is running leads to spurious DoWorks. | 278 // Posting a DoWork while a DoWork is running leads to spurious DoWorks. |
| 233 main_thread_pending_wakeups_.insert(base::TimeTicks()); | 279 main_thread_pending_wakeups_.insert(base::TimeTicks()); |
| 234 | 280 |
| 235 bool is_nested = delegate_->IsNested(); | 281 bool is_nested = delegate_->IsNested(); |
| 236 if (!is_nested) | 282 if (!is_nested) |
| 237 queues_to_delete_.clear(); | 283 queues_to_delete_.clear(); |
| 238 | 284 |
| 239 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); | 285 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); |
| 240 UpdateWorkQueues(&lazy_now); | 286 WakeupReadyDelayedQueues(&lazy_now); |
| 241 | 287 |
| 242 for (int i = 0; i < work_batch_size_; i++) { | 288 for (int i = 0; i < work_batch_size_; i++) { |
| 289 std::set<internal::TaskQueueImpl*> queues_to_reload; | |
|
Sami
2017/01/24 14:52:40
Probably doesn't make a huge difference, but unord
alex clarke (OOO till 29th)
2017/01/24 15:28:14
I guess.
| |
| 290 | |
| 291 { | |
| 292 base::AutoLock lock(any_thread_lock_); | |
| 293 std::swap(queues_to_reload, any_thread().has_incoming_immediate_work); | |
| 294 } | |
| 295 | |
| 296 // It's important we call ReloadEmptyWorkQueues out side of the lock to | |
| 297 // avoid a lock order inversion. | |
| 298 ReloadEmptyWorkQueues(queues_to_reload); | |
| 299 | |
| 243 internal::WorkQueue* work_queue; | 300 internal::WorkQueue* work_queue; |
| 244 if (!SelectWorkQueueToService(&work_queue)) | 301 if (!SelectWorkQueueToService(&work_queue)) |
| 245 break; | 302 break; |
| 246 | 303 |
| 247 base::TimeTicks time_after_task; | 304 base::TimeTicks time_after_task; |
| 248 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, | 305 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, |
| 249 &time_after_task)) { | 306 &time_after_task)) { |
| 250 case ProcessTaskResult::DEFERRED: | 307 case ProcessTaskResult::DEFERRED: |
| 251 // If a task was deferred, try again with another task. | 308 // If a task was deferred, try again with another task. |
| 252 continue; | 309 continue; |
| 253 case ProcessTaskResult::EXECUTED: | 310 case ProcessTaskResult::EXECUTED: |
| 254 break; | 311 break; |
| 255 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: | 312 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: |
| 256 return; // The TaskQueueManager got deleted, we must bail out. | 313 return; // The TaskQueueManager got deleted, we must bail out. |
| 257 } | 314 } |
| 258 | 315 |
| 259 work_queue = nullptr; // The queue may have been unregistered. | 316 work_queue = nullptr; // The queue may have been unregistered. |
| 260 | 317 |
| 261 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() | 318 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() |
| 262 : LazyNow(time_after_task); | 319 : LazyNow(time_after_task); |
| 263 UpdateWorkQueues(&lazy_now); | 320 WakeupReadyDelayedQueues(&lazy_now); |
| 264 | 321 |
| 265 // Only run a single task per batch in nested run loops so that we can | 322 // Only run a single task per batch in nested run loops so that we can |
| 266 // properly exit the nested loop when someone calls RunLoop::Quit(). | 323 // properly exit the nested loop when someone calls RunLoop::Quit(). |
| 267 if (is_nested) | 324 if (is_nested) |
| 268 break; | 325 break; |
| 269 } | 326 } |
| 270 | 327 |
| 271 main_thread_pending_wakeups_.erase(base::TimeTicks()); | 328 main_thread_pending_wakeups_.erase(base::TimeTicks()); |
| 272 | 329 |
| 273 // TODO(alexclarke): Consider refactoring the above loop to terminate only | 330 // TODO(alexclarke): Consider refactoring the above loop to terminate only |
| 274 // when there's no more work left to be done, rather than posting a | 331 // when there's no more work left to be done, rather than posting a |
| 275 // continuation task. | 332 // continuation task. |
| 276 base::Optional<base::TimeDelta> next_delay = | 333 base::Optional<base::TimeDelta> next_delay = |
| 277 ComputeDelayTillNextTask(&lazy_now); | 334 ComputeDelayTillNextTask(&lazy_now); |
| 278 | 335 |
| 279 if (!next_delay) | 336 if (!next_delay) |
| 280 return; | 337 return; |
| 281 | 338 |
| 282 base::TimeDelta delay = next_delay.value(); | 339 base::TimeDelta delay = next_delay.value(); |
| 283 if (delay.is_zero()) { | 340 if (delay.is_zero()) { |
| 284 MaybeScheduleImmediateWork(FROM_HERE); | 341 MaybeScheduleImmediateWork(FROM_HERE); |
| 285 } else { | 342 } else { |
| 286 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); | 343 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); |
| 287 } | 344 } |
| 288 } | 345 } |
| 289 | 346 |
| 290 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( | 347 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( |
| 291 LazyNow* lazy_now) { | 348 LazyNow* lazy_now) { |
| 349 DCHECK(main_thread_checker_.CalledOnValidThread()); | |
| 350 | |
| 292 // If the selector has non-empty queues we trivially know there is immediate | 351 // If the selector has non-empty queues we trivially know there is immediate |
| 293 // work to be done. | 352 // work to be done. |
| 294 if (!selector_.EnabledWorkQueuesEmpty()) | 353 if (!selector_.EnabledWorkQueuesEmpty()) |
| 295 return base::TimeDelta(); | 354 return base::TimeDelta(); |
| 296 | 355 |
| 356 { | |
| 357 base::AutoLock lock(any_thread_lock_); | |
|
Sami
2017/01/24 14:52:40
Any lock inversion issues here? Basically I think
alex clarke (OOO till 29th)
2017/01/24 15:28:14
Yeah actually that could be one. Can probably get
alex clarke (OOO till 29th)
2017/01/24 15:29:11
Actually on second thoughts I think I'm trying to
| |
| 358 // If we have incoming immediate work for any queue able to run, we know | |
| 359 // there is immediate work to be done. | |
| 360 for (internal::TaskQueueImpl* queue : | |
| 361 any_thread().has_incoming_immediate_work) { | |
| 362 // Note when posted ImmediateTaskCouldRun would have returned true but | |
| 363 // conditions may have changed since then. | |
| 364 if (queue->ImmediateTaskCouldRun()) | |
| 365 return base::TimeDelta(); | |
| 366 } | |
| 367 } | |
| 368 | |
| 297 // Otherwise we need to find the shortest delay, if any. | 369 // Otherwise we need to find the shortest delay, if any. |
| 298 base::Optional<base::TimeDelta> next_continuation; | 370 base::Optional<base::TimeDelta> next_continuation; |
| 299 for (TimeDomain* time_domain : time_domains_) { | 371 for (TimeDomain* time_domain : time_domains_) { |
| 300 base::Optional<base::TimeDelta> continuation = | 372 base::Optional<base::TimeDelta> continuation = |
| 301 time_domain->DelayTillNextTask(lazy_now); | 373 time_domain->DelayTillNextTask(lazy_now); |
| 302 if (!continuation) | 374 if (!continuation) |
| 303 continue; | 375 continue; |
| 304 if (!next_continuation || next_continuation.value() > continuation.value()) | 376 if (!next_continuation || next_continuation.value() > continuation.value()) |
| 305 next_continuation = continuation; | 377 next_continuation = continuation; |
| 306 } | 378 } |
| (...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 498 if (should_run) { | 570 if (should_run) { |
| 499 state->SetString("selected_queue", | 571 state->SetString("selected_queue", |
| 500 selected_work_queue->task_queue()->GetName()); | 572 selected_work_queue->task_queue()->GetName()); |
| 501 state->SetString("work_queue_name", selected_work_queue->name()); | 573 state->SetString("work_queue_name", selected_work_queue->name()); |
| 502 } | 574 } |
| 503 | 575 |
| 504 state->BeginArray("time_domains"); | 576 state->BeginArray("time_domains"); |
| 505 for (auto* time_domain : time_domains_) | 577 for (auto* time_domain : time_domains_) |
| 506 time_domain->AsValueInto(state.get()); | 578 time_domain->AsValueInto(state.get()); |
| 507 state->EndArray(); | 579 state->EndArray(); |
| 580 { | |
| 581 base::AutoLock lock(any_thread_lock_); | |
| 582 state->BeginArray("has_incoming_immediate_work"); | |
| 583 for (internal::TaskQueueImpl* task_queue : | |
| 584 any_thread().has_incoming_immediate_work) { | |
| 585 state->AppendString(task_queue->GetName()); | |
| 586 } | |
| 587 state->EndArray(); | |
| 588 } | |
| 508 return std::move(state); | 589 return std::move(state); |
| 509 } | 590 } |
| 510 | 591 |
| 511 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { | 592 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { |
| 512 DCHECK(main_thread_checker_.CalledOnValidThread()); | 593 DCHECK(main_thread_checker_.CalledOnValidThread()); |
| 513 DCHECK(queue->IsQueueEnabled()); | 594 DCHECK(queue->IsQueueEnabled()); |
| 514 // Only schedule DoWork if there's something to do. | 595 // Only schedule DoWork if there's something to do. |
| 515 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence()) | 596 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence()) |
| 516 MaybeScheduleImmediateWork(FROM_HERE); | 597 MaybeScheduleImmediateWork(FROM_HERE); |
| 517 } | 598 } |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 541 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) { | 622 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) { |
| 542 TimeDomain* time_domain = queue->GetTimeDomain(); | 623 TimeDomain* time_domain = queue->GetTimeDomain(); |
| 543 if (time_domain_now.find(time_domain) == time_domain_now.end()) | 624 if (time_domain_now.find(time_domain) == time_domain_now.end()) |
| 544 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now())); | 625 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now())); |
| 545 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]); | 626 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]); |
| 546 } | 627 } |
| 547 } | 628 } |
| 548 | 629 |
| 549 } // namespace scheduler | 630 } // namespace scheduler |
| 550 } // namespace blink | 631 } // namespace blink |
| OLD | NEW |