Chromium Code Reviews| Index: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc |
| diff --git a/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..cf03e60db710b1cfbc307b161c3202055d7f9e31 |
| --- /dev/null |
| +++ b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc |
| @@ -0,0 +1,212 @@ |
| +// Copyright 2017 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h" |
| + |
| +#include <string> |
| + |
| +#include "base/bind.h" |
| +#include "base/bind_helpers.h" |
| +#include "base/memory/ptr_util.h" |
| +#include "base/strings/stringprintf.h" |
| +#include "base/task_scheduler/scheduler_worker_pool_params.h" |
| +#include "base/task_scheduler/task_traits.h" |
| +#include "base/threading/thread.h" |
| + |
| +namespace base { |
| +namespace internal { |
| + |
| +class SchedulerSingleThreadWorkerPoolManager:: |
| + SingleThreadSchedulerWorkerPoolImplPool { |
| + public: |
| + SingleThreadSchedulerWorkerPoolImplPool( |
| + const SchedulerWorkerPoolParams& params, |
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager) |
| + : params_(params), |
| + task_tracker_(task_tracker), |
| + delayed_task_manager_(delayed_task_manager), |
| + unregister_single_thread_worker_pool_callback_( |
| + Bind(&SingleThreadSchedulerWorkerPoolImplPool:: |
| + UnregisterSingleThreadWorkerPool, |
| + Unretained(this))) {} |
| + ~SingleThreadSchedulerWorkerPoolImplPool() = default; |
| + |
| + scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits( |
| + const TaskTraits& traits) { |
| + return GetAvailableOrCreateSchedulerWorkerPool() |
| + ->CreateSingleThreadTaskRunnerWithTraits(traits); |
| + } |
| + |
| + void JoinForTesting() { |
|
fdoray
2017/01/27 16:47:35
Set a |join_for_testing_called_| flag and DCHECK t
robliao
2017/01/27 21:25:41
I'm going to make worker pool cleanup best-effort
|
| + // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we |
| + // move |worker_pools_| locally, request the detachment and join, and then |
|
fdoray
2017/01/27 16:47:35
s/request the detachment/disallow detachment/
robliao
2017/01/27 21:25:40
Done.
|
| + // move it back once we're done. |
| + decltype(worker_pools_) local; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + local = std::move(worker_pools_); |
| + } |
| + for (const auto& worker_pool : local) |
| + worker_pool->DisallowWorkerDetachmentForTesting(); |
| + for (const auto& worker_pool : local) |
| + worker_pool->JoinForTesting(); |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + DCHECK(worker_pools_.empty()); |
| + worker_pools_ = std::move(local); |
| + } |
| + } |
| + |
| + void WaitForAllWorkersIdleForTesting() { |
| + // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a |
| + // SchedulerLock, so we move |worker_pools_| locally, request the wait, and |
| + // then move it back once we're done. |
| + decltype(worker_pools_) local; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + local = std::move(worker_pools_); |
| + } |
| + for (const auto& worker_pool : local) |
| + worker_pool->WaitForAllWorkersIdleForTesting(); |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + DCHECK(worker_pools_.empty()); |
| + worker_pools_ = std::move(local); |
| + } |
| + } |
| + |
| + private: |
| + SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() { |
| + int num_worker_pools = 0; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + if (free_worker_pool_indexes_.empty()) { |
| + num_worker_pools = worker_pools_.size(); |
|
fdoray
2017/01/27 16:47:34
A concurrent call to GetAvailableOrCreateScheduler
robliao
2017/01/27 21:25:40
Nice catch! Indeed! Fixed.
|
| + } else { |
| + size_t index = free_worker_pool_indexes_.back(); |
|
fdoray
2017/01/27 16:47:35
Optional:
We create SingleThreadTaskRunners for b
robliao
2017/01/27 21:25:41
While skipping the cleanup code would keep things
|
| + free_worker_pool_indexes_.pop_back(); |
| + return worker_pools_[index].get(); |
| + } |
| + } |
| + |
| + SchedulerWorkerPoolParams worker_pool_params( |
| + base::StringPrintf("Single%s%d", params_.name().c_str(), |
| + num_worker_pools), |
| + params_.priority_hint(), |
| + SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1, |
| + params_.suggested_reclaim_time(), params_.backward_compatibility()); |
| + // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to |
| + // construct the new SchedulerWorkerPoolImpl outside of |
| + // |single_thread_worker_pools_lock_|. |
|
fdoray
2017/01/27 16:47:34
s/single_thread_worker_pools_lock_/lock_/
robliao
2017/01/27 21:25:40
Done.
|
| + std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool = |
| + SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool( |
| + worker_pool_params, unregister_single_thread_worker_pool_callback_, |
| + task_tracker_, delayed_task_manager_); |
| + DCHECK(worker_pool); |
| + AutoSchedulerLock auto_lock(lock_); |
| + worker_pools_.emplace_back(std::move(worker_pool)); |
| + return worker_pools_.back().get(); |
| + } |
| + |
| + void UnregisterSingleThreadWorkerPool( |
| + const SchedulerWorkerPoolImpl* scheduler_worker_pool) { |
| + AutoSchedulerLock auto_lock(lock_); |
| + size_t worker_pool_count = worker_pools_.size(); |
| + DCHECK_NE(0U, worker_pool_count); |
| + for (size_t i = 0; i < worker_pool_count; ++i) { |
| + if (scheduler_worker_pool == worker_pools_[i].get()) { |
| + free_worker_pool_indexes_.push_back(i); |
| + return; |
| + } |
| + } |
| + NOTREACHED(); |
| + } |
| + |
| + const SchedulerWorkerPoolParams params_; |
| + TaskTracker* const task_tracker_; |
| + DelayedTaskManager* const delayed_task_manager_; |
| + const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback |
| + unregister_single_thread_worker_pool_callback_; |
| + |
| + // |lock_| synchronizes the items below. |
| + SchedulerLock lock_; |
| + std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_; |
| + std::vector<size_t> free_worker_pool_indexes_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool); |
| +}; |
| + |
| +SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager( |
| + const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| + const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| + worker_pool_index_for_traits_callback, |
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager) |
| + : worker_pool_params_vector_(worker_pool_params_vector), |
| + worker_pool_index_for_traits_callback_( |
| + worker_pool_index_for_traits_callback) { |
| + size_t worker_pool_count = worker_pool_params_vector_.size(); |
| + worker_pool_impl_pools_.resize(worker_pool_count); |
| + for (size_t i = 0; i < worker_pool_count; i++) { |
| + worker_pool_impl_pools_[i] = |
| + MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>( |
| + worker_pool_params_vector[i], task_tracker, delayed_task_manager); |
| + } |
| +} |
| + |
| +SchedulerSingleThreadWorkerPoolManager:: |
| + ~SchedulerSingleThreadWorkerPoolManager() = default; |
| + |
| +scoped_refptr<SingleThreadTaskRunner> |
| +SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits( |
| + const TaskTraits& traits) { |
| + size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| + SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
|
fdoray
2017/01/27 16:47:34
No lock required. |worker_pool_impl_pools_| is onl
robliao
2017/01/27 21:25:41
While it is an error to call JoinForTesting() or W
|
| + worker_pool_impl_pool = worker_pool_impl_pools_[index].get(); |
| + } |
| + return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits); |
| +} |
| + |
| +void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() { |
| + WaitForAllWorkersIdleForTesting(); |
| + // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a |
| + // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the |
| + // join, and move it back once we're done. |
| + decltype(worker_pool_impl_pools_) local; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
|
fdoray
2017/01/27 16:47:34
No lock / moving to a local variable required. |wo
robliao
2017/01/27 21:25:40
The following line changes worker_pool_impl_pools_
|
| + local = std::move(worker_pool_impl_pools_); |
| + } |
| + for (const auto& worker_pool_impl_pool : local) |
| + worker_pool_impl_pool->JoinForTesting(); |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + DCHECK(worker_pool_impl_pools_.empty()); |
| + worker_pool_impl_pools_ = std::move(local); |
| + } |
| +} |
| + |
| +void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() { |
| + // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting() |
| + // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally, |
| + // request the wait, and move it back once we're done. |
| + decltype(worker_pool_impl_pools_) local; |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
|
fdoray
2017/01/27 16:47:35
No lock / moving to a local variable required. |wo
robliao
2017/01/27 21:25:40
See above.
|
| + local = std::move(worker_pool_impl_pools_); |
| + } |
| + for (const auto& worker_pool_impl_pool : local) |
| + worker_pool_impl_pool->WaitForAllWorkersIdleForTesting(); |
| + { |
| + AutoSchedulerLock auto_lock(lock_); |
| + DCHECK(worker_pool_impl_pools_.empty()); |
| + worker_pool_impl_pools_ = std::move(local); |
| + } |
| +} |
| + |
| +} // namespace internal |
| +} // namespace base |