OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_thread_pool_impl.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
(...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
264 } | 264 } |
265 } | 265 } |
266 | 266 |
267 NOTREACHED(); | 267 NOTREACHED(); |
268 return nullptr; | 268 return nullptr; |
269 } | 269 } |
270 | 270 |
271 void SchedulerThreadPoolImpl::ReEnqueueSequence( | 271 void SchedulerThreadPoolImpl::ReEnqueueSequence( |
272 scoped_refptr<Sequence> sequence, | 272 scoped_refptr<Sequence> sequence, |
273 const SequenceSortKey& sequence_sort_key) { | 273 const SequenceSortKey& sequence_sort_key) { |
274 shared_priority_queue_.BeginTransaction()->Push( | 274 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
275 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), | 275 sequence_sort_key); |
276 sequence_sort_key))); | |
277 | 276 |
278 // The thread calling this method just ran a Task from |sequence| and will | 277 // The thread calling this method just ran a Task from |sequence| and will |
279 // soon try to get another Sequence from which to run a Task. If the thread | 278 // soon try to get another Sequence from which to run a Task. If the thread |
280 // belongs to this pool, it will get that Sequence from | 279 // belongs to this pool, it will get that Sequence from |
281 // |shared_priority_queue_|. When that's the case, there is no need to wake up | 280 // |shared_priority_queue_|. When that's the case, there is no need to wake up |
282 // another thread after |sequence| is inserted in |shared_priority_queue_|. If | 281 // another thread after |sequence| is inserted in |shared_priority_queue_|. If |
283 // we did wake up another thread, we would waste resources by having more | 282 // we did wake up another thread, we would waste resources by having more |
284 // threads trying to get a Sequence from |shared_priority_queue_| than the | 283 // threads trying to get a Sequence from |shared_priority_queue_| than the |
285 // number of Sequences in it. | 284 // number of Sequences in it. |
286 if (tls_current_thread_pool.Get().Get() != this) | 285 if (tls_current_thread_pool.Get().Get() != this) |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
331 | 330 |
332 const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 331 const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
333 if (sequence_was_empty) { | 332 if (sequence_was_empty) { |
334 // Insert |sequence| in |priority_queue| if it was empty before |task| was | 333 // Insert |sequence| in |priority_queue| if it was empty before |task| was |
335 // inserted into it. Otherwise, one of these must be true: | 334 // inserted into it. Otherwise, one of these must be true: |
336 // - |sequence| is already in a PriorityQueue (not necessarily | 335 // - |sequence| is already in a PriorityQueue (not necessarily |
337 // |shared_priority_queue_|), or, | 336 // |shared_priority_queue_|), or, |
338 // - A worker thread is running a Task from |sequence|. It will insert | 337 // - A worker thread is running a Task from |sequence|. It will insert |
339 // |sequence| in a PriorityQueue once it's done running the Task. | 338 // |sequence| in a PriorityQueue once it's done running the Task. |
340 const auto sequence_sort_key = sequence->GetSortKey(); | 339 const auto sequence_sort_key = sequence->GetSortKey(); |
341 priority_queue->BeginTransaction()->Push( | 340 priority_queue->BeginTransaction()->Push(std::move(sequence), |
342 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), | 341 sequence_sort_key); |
343 sequence_sort_key))); | |
344 | 342 |
345 // Wake up a worker thread to process |sequence|. | 343 // Wake up a worker thread to process |sequence|. |
346 if (worker_thread) | 344 if (worker_thread) |
347 worker_thread->WakeUp(); | 345 worker_thread->WakeUp(); |
348 else | 346 else |
349 WakeUpOneThread(); | 347 WakeUpOneThread(); |
350 } | 348 } |
351 } | 349 } |
352 | 350 |
353 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 351 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
(...skipping 22 matching lines...) Expand all Loading... |
376 tls_current_worker_thread.Get().Set(worker_thread); | 374 tls_current_worker_thread.Get().Set(worker_thread); |
377 tls_current_thread_pool.Get().Set(outer_); | 375 tls_current_thread_pool.Get().Set(outer_); |
378 } | 376 } |
379 | 377 |
380 scoped_refptr<Sequence> | 378 scoped_refptr<Sequence> |
381 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( | 379 SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork( |
382 SchedulerWorkerThread* worker_thread) { | 380 SchedulerWorkerThread* worker_thread) { |
383 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); | 381 DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread)); |
384 | 382 |
385 scoped_refptr<Sequence> sequence; | 383 scoped_refptr<Sequence> sequence; |
386 | |
387 { | 384 { |
388 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( | 385 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
389 outer_->shared_priority_queue_.BeginTransaction()); | 386 outer_->shared_priority_queue_.BeginTransaction()); |
390 const auto& shared_sequence_and_sort_key = shared_transaction->Peek(); | |
391 | 387 |
392 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( | 388 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
393 SINGLE_THREADED_PRIORITY_QUEUE_FROM_WORKER_THREAD(worker_thread) | 389 SINGLE_THREADED_PRIORITY_QUEUE_FROM_WORKER_THREAD(worker_thread) |
394 ->BeginTransaction()); | 390 ->BeginTransaction()); |
395 const auto& single_threaded_sequence_and_sort_key = | |
396 single_threaded_transaction->Peek(); | |
397 | 391 |
398 if (shared_sequence_and_sort_key.is_null() && | 392 if (shared_transaction->IsEmpty() && |
399 single_threaded_sequence_and_sort_key.is_null()) { | 393 single_threaded_transaction->IsEmpty()) { |
400 single_threaded_transaction.reset(); | 394 single_threaded_transaction.reset(); |
401 | 395 |
402 // |shared_transaction| is kept alive while |worker_thread| is added to | 396 // |shared_transaction| is kept alive while |worker_thread| is added to |
403 // |idle_worker_threads_stack_| to avoid this race: | 397 // |idle_worker_threads_stack_| to avoid this race: |
404 // 1. This thread creates a Transaction, finds |shared_priority_queue_| | 398 // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
405 // empty and ends the Transaction. | 399 // empty and ends the Transaction. |
406 // 2. Other thread creates a Transaction, inserts a Sequence into | 400 // 2. Other thread creates a Transaction, inserts a Sequence into |
407 // |shared_priority_queue_| and ends the Transaction. This can't happen | 401 // |shared_priority_queue_| and ends the Transaction. This can't happen |
408 // if the Transaction of step 1 is still active because because there | 402 // if the Transaction of step 1 is still active because because there |
409 // can only be one active Transaction per PriorityQueue at a time. | 403 // can only be one active Transaction per PriorityQueue at a time. |
410 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because | 404 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because |
411 // |idle_worker_threads_stack_| is empty. | 405 // |idle_worker_threads_stack_| is empty. |
412 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to | 406 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to |
413 // sleep. No thread runs the Sequence inserted in step 2. | 407 // sleep. No thread runs the Sequence inserted in step 2. |
414 outer_->AddToIdleWorkerThreadsStack(worker_thread); | 408 outer_->AddToIdleWorkerThreadsStack(worker_thread); |
415 return nullptr; | 409 return nullptr; |
416 } | 410 } |
417 | 411 |
418 if (single_threaded_sequence_and_sort_key.is_null() || | 412 if (single_threaded_transaction->IsEmpty() || |
419 (!shared_sequence_and_sort_key.is_null() && | 413 (!shared_transaction->IsEmpty() && |
420 single_threaded_sequence_and_sort_key.sort_key < | 414 single_threaded_transaction->PeekSortKey() < |
421 shared_sequence_and_sort_key.sort_key)) { | 415 shared_transaction->PeekSortKey())) { |
422 sequence = shared_sequence_and_sort_key.sequence; | 416 sequence = shared_transaction->PopSequence(); |
423 shared_transaction->Pop(); | |
424 last_sequence_is_single_threaded_ = false; | 417 last_sequence_is_single_threaded_ = false; |
425 } else { | 418 } else { |
426 DCHECK(!single_threaded_sequence_and_sort_key.is_null()); | 419 DCHECK(!single_threaded_transaction->IsEmpty()); |
427 sequence = single_threaded_sequence_and_sort_key.sequence; | 420 sequence = single_threaded_transaction->PopSequence(); |
428 single_threaded_transaction->Pop(); | |
429 last_sequence_is_single_threaded_ = true; | 421 last_sequence_is_single_threaded_ = true; |
430 } | 422 } |
431 } | 423 } |
| 424 DCHECK(sequence); |
432 | 425 |
433 outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); | 426 outer_->RemoveFromIdleWorkerThreadsStack(worker_thread); |
434 return sequence; | 427 return sequence; |
435 } | 428 } |
436 | 429 |
437 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: | 430 void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl:: |
438 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 431 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
439 if (last_sequence_is_single_threaded_) { | 432 if (last_sequence_is_single_threaded_) { |
440 // A single-threaded Sequence is always re-enqueued in the single-threaded | 433 // A single-threaded Sequence is always re-enqueued in the single-threaded |
441 // PriorityQueue from which it was extracted. | 434 // PriorityQueue from which it was extracted. |
442 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | 435 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
443 single_threaded_priority_queue_.BeginTransaction()->Push( | 436 single_threaded_priority_queue_.BeginTransaction()->Push( |
444 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), | 437 std::move(sequence), sequence_sort_key); |
445 sequence_sort_key))); | |
446 } else { | 438 } else { |
447 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 439 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
448 // |sequence| must be enqueued. | 440 // |sequence| must be enqueued. |
449 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 441 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
450 } | 442 } |
451 } | 443 } |
452 | 444 |
453 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( | 445 SchedulerThreadPoolImpl::SchedulerThreadPoolImpl( |
454 TaskTracker* task_tracker, | 446 TaskTracker* task_tracker, |
455 DelayedTaskManager* delayed_task_manager) | 447 DelayedTaskManager* delayed_task_manager) |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
511 } | 503 } |
512 | 504 |
513 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( | 505 void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack( |
514 SchedulerWorkerThread* worker_thread) { | 506 SchedulerWorkerThread* worker_thread) { |
515 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 507 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
516 idle_worker_threads_stack_.Remove(worker_thread); | 508 idle_worker_threads_stack_.Remove(worker_thread); |
517 } | 509 } |
518 | 510 |
519 } // namespace internal | 511 } // namespace internal |
520 } // namespace base | 512 } // namespace base |
OLD | NEW |