Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(728)

Side by Side Diff: third_party/WebKit/Source/platform/scheduler/base/task_queue_manager.cc

Issue 2653643002: Move has_incoming_immediate_work to the TaskQueueManager (Closed)
Patch Set: Add some tracing Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "platform/scheduler/base/task_queue_manager.h" 5 #include "platform/scheduler/base/task_queue_manager.h"
6 6
7 #include <queue> 7 #include <queue>
8 #include <set> 8 #include <set>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 DCHECK(main_thread_checker_.CalledOnValidThread()); 140 DCHECK(main_thread_checker_.CalledOnValidThread());
141 if (observer_) 141 if (observer_)
142 observer_->OnUnregisterTaskQueue(task_queue); 142 observer_->OnUnregisterTaskQueue(task_queue);
143 143
144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being 144 // Add |task_queue| to |queues_to_delete_| so we can prevent it from being
145 // freed while any of our structures hold hold a raw pointer to it. 145 // freed while any of our structures hold hold a raw pointer to it.
146 queues_to_delete_.insert(task_queue); 146 queues_to_delete_.insert(task_queue);
147 queues_.erase(task_queue); 147 queues_.erase(task_queue);
148 148
149 selector_.RemoveQueue(task_queue.get()); 149 selector_.RemoveQueue(task_queue.get());
150
151 {
152 base::AutoLock lock(any_thread_lock_);
153 any_thread().has_incoming_immediate_work.erase(task_queue.get());
154 }
150 } 155 }
151 156
152 void TaskQueueManager::UpdateWorkQueues(LazyNow* lazy_now) { 157 void TaskQueueManager::ReloadEmptyWorkQueues(
158 const std::set<internal::TaskQueueImpl*>& queues_to_reload) const {
159 // There are two cases where a queue needs reloading. First, it might be
160 // completely empty and we've just posted a task (this method handles that
161 // case). Secondly
Sami 2017/01/24 14:52:40 Secondly...? :)
alex clarke (OOO till 29th) 2017/01/24 15:28:14 Done.
162 for (internal::TaskQueueImpl* queue : queues_to_reload) {
163 queue->ReloadImmediateWorkQueueIfEmpty();
164 }
165 }
166
167 void TaskQueueManager::WakeupReadyDelayedQueues(LazyNow* lazy_now) {
153 TRACE_EVENT0(disabled_by_default_tracing_category_, 168 TRACE_EVENT0(disabled_by_default_tracing_category_,
154 "TaskQueueManager::UpdateWorkQueues"); 169 "TaskQueueManager::WakeupReadyDelayedQueues");
155 170
156 for (TimeDomain* time_domain : time_domains_) { 171 for (TimeDomain* time_domain : time_domains_) {
157 if (time_domain == real_time_domain_.get()) { 172 if (time_domain == real_time_domain_.get()) {
158 time_domain->UpdateWorkQueues(lazy_now); 173 time_domain->WakeupReadyDelayedQueues(lazy_now);
159 } else { 174 } else {
160 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow(); 175 LazyNow time_domain_lazy_now = time_domain->CreateLazyNow();
161 time_domain->UpdateWorkQueues(&time_domain_lazy_now); 176 time_domain->WakeupReadyDelayedQueues(&time_domain_lazy_now);
162 } 177 }
163 } 178 }
164 } 179 }
165 180
166 void TaskQueueManager::OnBeginNestedMessageLoop() { 181 void TaskQueueManager::OnBeginNestedMessageLoop() {
167 // We just entered a nested message loop, make sure there's a DoWork posted or 182 // We just entered a nested message loop, make sure there's a DoWork posted or
168 // the system will grind to a halt. 183 // the system will grind to a halt.
169 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_); 184 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
170 } 185 }
171 186
187 void TaskQueueManager::OnQueueHasIncomingImmediateWork(
188 internal::TaskQueueImpl* queue,
189 bool ensure_do_work_posted) {
190 bool on_main_thread = delegate_->BelongsToCurrentThread();
191
192 {
193 base::AutoLock lock(any_thread_lock_);
194 any_thread().has_incoming_immediate_work.insert(queue);
195
196 if (!ensure_do_work_posted)
197 return;
198
199 // De-duplicate DoWork posts.
200 if (on_main_thread) {
201 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second)
202 return;
203 } else {
204 if (any_thread().other_thread_pending_wakeup)
205 return;
206 any_thread().other_thread_pending_wakeup = true;
207 }
208 }
209
210 if (on_main_thread) {
211 delegate_->PostTask(FROM_HERE, from_main_thread_immediate_do_work_closure_);
212 } else {
213 delegate_->PostTask(FROM_HERE,
214 from_other_thread_immediate_do_work_closure_);
215 }
216 }
217
172 void TaskQueueManager::MaybeScheduleImmediateWork( 218 void TaskQueueManager::MaybeScheduleImmediateWork(
173 const tracked_objects::Location& from_here) { 219 const tracked_objects::Location& from_here) {
174 bool on_main_thread = delegate_->BelongsToCurrentThread(); 220 bool on_main_thread = delegate_->BelongsToCurrentThread();
175 // De-duplicate DoWork posts. 221 // De-duplicate DoWork posts.
176 if (on_main_thread) { 222 if (on_main_thread) {
177 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) { 223 if (!main_thread_pending_wakeups_.insert(base::TimeTicks()).second) {
178 return; 224 return;
179 } 225 }
180 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_); 226 delegate_->PostTask(from_here, from_main_thread_immediate_do_work_closure_);
181 } else { 227 } else {
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 } 276 }
231 277
232 // Posting a DoWork while a DoWork is running leads to spurious DoWorks. 278 // Posting a DoWork while a DoWork is running leads to spurious DoWorks.
233 main_thread_pending_wakeups_.insert(base::TimeTicks()); 279 main_thread_pending_wakeups_.insert(base::TimeTicks());
234 280
235 bool is_nested = delegate_->IsNested(); 281 bool is_nested = delegate_->IsNested();
236 if (!is_nested) 282 if (!is_nested)
237 queues_to_delete_.clear(); 283 queues_to_delete_.clear();
238 284
239 LazyNow lazy_now(real_time_domain()->CreateLazyNow()); 285 LazyNow lazy_now(real_time_domain()->CreateLazyNow());
240 UpdateWorkQueues(&lazy_now); 286 WakeupReadyDelayedQueues(&lazy_now);
241 287
242 for (int i = 0; i < work_batch_size_; i++) { 288 for (int i = 0; i < work_batch_size_; i++) {
289 std::set<internal::TaskQueueImpl*> queues_to_reload;
Sami 2017/01/24 14:52:40 Probably doesn't make a huge difference, but unord
alex clarke (OOO till 29th) 2017/01/24 15:28:14 I guess.
290
291 {
292 base::AutoLock lock(any_thread_lock_);
293 std::swap(queues_to_reload, any_thread().has_incoming_immediate_work);
294 }
295
296 // It's important we call ReloadEmptyWorkQueues out side of the lock to
297 // avoid a lock order inversion.
298 ReloadEmptyWorkQueues(queues_to_reload);
299
243 internal::WorkQueue* work_queue; 300 internal::WorkQueue* work_queue;
244 if (!SelectWorkQueueToService(&work_queue)) 301 if (!SelectWorkQueueToService(&work_queue))
245 break; 302 break;
246 303
247 base::TimeTicks time_after_task; 304 base::TimeTicks time_after_task;
248 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now, 305 switch (ProcessTaskFromWorkQueue(work_queue, is_nested, lazy_now,
249 &time_after_task)) { 306 &time_after_task)) {
250 case ProcessTaskResult::DEFERRED: 307 case ProcessTaskResult::DEFERRED:
251 // If a task was deferred, try again with another task. 308 // If a task was deferred, try again with another task.
252 continue; 309 continue;
253 case ProcessTaskResult::EXECUTED: 310 case ProcessTaskResult::EXECUTED:
254 break; 311 break;
255 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED: 312 case ProcessTaskResult::TASK_QUEUE_MANAGER_DELETED:
256 return; // The TaskQueueManager got deleted, we must bail out. 313 return; // The TaskQueueManager got deleted, we must bail out.
257 } 314 }
258 315
259 work_queue = nullptr; // The queue may have been unregistered. 316 work_queue = nullptr; // The queue may have been unregistered.
260 317
261 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow() 318 lazy_now = time_after_task.is_null() ? real_time_domain()->CreateLazyNow()
262 : LazyNow(time_after_task); 319 : LazyNow(time_after_task);
263 UpdateWorkQueues(&lazy_now); 320 WakeupReadyDelayedQueues(&lazy_now);
264 321
265 // Only run a single task per batch in nested run loops so that we can 322 // Only run a single task per batch in nested run loops so that we can
266 // properly exit the nested loop when someone calls RunLoop::Quit(). 323 // properly exit the nested loop when someone calls RunLoop::Quit().
267 if (is_nested) 324 if (is_nested)
268 break; 325 break;
269 } 326 }
270 327
271 main_thread_pending_wakeups_.erase(base::TimeTicks()); 328 main_thread_pending_wakeups_.erase(base::TimeTicks());
272 329
273 // TODO(alexclarke): Consider refactoring the above loop to terminate only 330 // TODO(alexclarke): Consider refactoring the above loop to terminate only
274 // when there's no more work left to be done, rather than posting a 331 // when there's no more work left to be done, rather than posting a
275 // continuation task. 332 // continuation task.
276 base::Optional<base::TimeDelta> next_delay = 333 base::Optional<base::TimeDelta> next_delay =
277 ComputeDelayTillNextTask(&lazy_now); 334 ComputeDelayTillNextTask(&lazy_now);
278 335
279 if (!next_delay) 336 if (!next_delay)
280 return; 337 return;
281 338
282 base::TimeDelta delay = next_delay.value(); 339 base::TimeDelta delay = next_delay.value();
283 if (delay.is_zero()) { 340 if (delay.is_zero()) {
284 MaybeScheduleImmediateWork(FROM_HERE); 341 MaybeScheduleImmediateWork(FROM_HERE);
285 } else { 342 } else {
286 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay); 343 MaybeScheduleDelayedWork(FROM_HERE, lazy_now.Now(), delay);
287 } 344 }
288 } 345 }
289 346
290 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask( 347 base::Optional<base::TimeDelta> TaskQueueManager::ComputeDelayTillNextTask(
291 LazyNow* lazy_now) { 348 LazyNow* lazy_now) {
349 DCHECK(main_thread_checker_.CalledOnValidThread());
350
292 // If the selector has non-empty queues we trivially know there is immediate 351 // If the selector has non-empty queues we trivially know there is immediate
293 // work to be done. 352 // work to be done.
294 if (!selector_.EnabledWorkQueuesEmpty()) 353 if (!selector_.EnabledWorkQueuesEmpty())
295 return base::TimeDelta(); 354 return base::TimeDelta();
296 355
356 {
357 base::AutoLock lock(any_thread_lock_);
Sami 2017/01/24 14:52:40 Any lock inversion issues here? Basically I think
alex clarke (OOO till 29th) 2017/01/24 15:28:14 Yeah actually that could be one. Can probably get
alex clarke (OOO till 29th) 2017/01/24 15:29:11 Actually on second thoughts I think I'm trying to
358 // If we have incoming immediate work for any queue able to run, we know
359 // there is immediate work to be done.
360 for (internal::TaskQueueImpl* queue :
361 any_thread().has_incoming_immediate_work) {
362 // Note when posted ImmediateTaskCouldRun would have returned true but
363 // conditions may have changed since then.
364 if (queue->ImmediateTaskCouldRun())
365 return base::TimeDelta();
366 }
367 }
368
297 // Otherwise we need to find the shortest delay, if any. 369 // Otherwise we need to find the shortest delay, if any.
298 base::Optional<base::TimeDelta> next_continuation; 370 base::Optional<base::TimeDelta> next_continuation;
299 for (TimeDomain* time_domain : time_domains_) { 371 for (TimeDomain* time_domain : time_domains_) {
300 base::Optional<base::TimeDelta> continuation = 372 base::Optional<base::TimeDelta> continuation =
301 time_domain->DelayTillNextTask(lazy_now); 373 time_domain->DelayTillNextTask(lazy_now);
302 if (!continuation) 374 if (!continuation)
303 continue; 375 continue;
304 if (!next_continuation || next_continuation.value() > continuation.value()) 376 if (!next_continuation || next_continuation.value() > continuation.value())
305 next_continuation = continuation; 377 next_continuation = continuation;
306 } 378 }
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after
498 if (should_run) { 570 if (should_run) {
499 state->SetString("selected_queue", 571 state->SetString("selected_queue",
500 selected_work_queue->task_queue()->GetName()); 572 selected_work_queue->task_queue()->GetName());
501 state->SetString("work_queue_name", selected_work_queue->name()); 573 state->SetString("work_queue_name", selected_work_queue->name());
502 } 574 }
503 575
504 state->BeginArray("time_domains"); 576 state->BeginArray("time_domains");
505 for (auto* time_domain : time_domains_) 577 for (auto* time_domain : time_domains_)
506 time_domain->AsValueInto(state.get()); 578 time_domain->AsValueInto(state.get());
507 state->EndArray(); 579 state->EndArray();
580 {
581 base::AutoLock lock(any_thread_lock_);
582 state->BeginArray("has_incoming_immediate_work");
583 for (internal::TaskQueueImpl* task_queue :
584 any_thread().has_incoming_immediate_work) {
585 state->AppendString(task_queue->GetName());
586 }
587 state->EndArray();
588 }
508 return std::move(state); 589 return std::move(state);
509 } 590 }
510 591
511 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) { 592 void TaskQueueManager::OnTaskQueueEnabled(internal::TaskQueueImpl* queue) {
512 DCHECK(main_thread_checker_.CalledOnValidThread()); 593 DCHECK(main_thread_checker_.CalledOnValidThread());
513 DCHECK(queue->IsQueueEnabled()); 594 DCHECK(queue->IsQueueEnabled());
514 // Only schedule DoWork if there's something to do. 595 // Only schedule DoWork if there's something to do.
515 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence()) 596 if (queue->HasPendingImmediateWork() && !queue->BlockedByFence())
516 MaybeScheduleImmediateWork(FROM_HERE); 597 MaybeScheduleImmediateWork(FROM_HERE);
517 } 598 }
(...skipping 23 matching lines...) Expand all
541 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) { 622 for (const scoped_refptr<internal::TaskQueueImpl>& queue : queues_) {
542 TimeDomain* time_domain = queue->GetTimeDomain(); 623 TimeDomain* time_domain = queue->GetTimeDomain();
543 if (time_domain_now.find(time_domain) == time_domain_now.end()) 624 if (time_domain_now.find(time_domain) == time_domain_now.end())
544 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now())); 625 time_domain_now.insert(std::make_pair(time_domain, time_domain->Now()));
545 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]); 626 queue->SweepCanceledDelayedTasks(time_domain_now[time_domain]);
546 } 627 }
547 } 628 }
548 629
549 } // namespace scheduler 630 } // namespace scheduler
550 } // namespace blink 631 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698