| Index: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
|
| diff --git a/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ba6e8f0720e0f21c9b56c578564398154010e345
|
| --- /dev/null
|
| +++ b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
|
| @@ -0,0 +1,223 @@
|
| +// Copyright 2017 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h"
|
| +
|
| +#include <string>
|
| +
|
| +#include "base/atomicops.h"
|
| +#include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/strings/stringprintf.h"
|
| +#include "base/task_scheduler/scheduler_worker_pool_params.h"
|
| +#include "base/task_scheduler/task_traits.h"
|
| +#include "base/threading/thread.h"
|
| +
|
| +namespace base {
|
| +namespace internal {
|
| +
|
| +class SchedulerSingleThreadWorkerPoolManager::
|
| + SingleThreadSchedulerWorkerPoolImplPool {
|
| + public:
|
| + SingleThreadSchedulerWorkerPoolImplPool(
|
| + const SchedulerWorkerPoolParams& params,
|
| + TaskTracker* task_tracker,
|
| + DelayedTaskManager* delayed_task_manager)
|
| + : params_(params),
|
| + task_tracker_(task_tracker),
|
| + delayed_task_manager_(delayed_task_manager),
|
| + unregister_single_thread_worker_pool_callback_(
|
| + Bind(&SingleThreadSchedulerWorkerPoolImplPool::
|
| + UnregisterSingleThreadWorkerPool,
|
| + Unretained(this))) {}
|
| + ~SingleThreadSchedulerWorkerPoolImplPool() = default;
|
| +
|
| + scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
|
| + const TaskTraits& traits) {
|
| + return GetAvailableOrCreateSchedulerWorkerPool()
|
| + ->CreateSingleThreadTaskRunnerWithTraits(traits);
|
| + }
|
| +
|
| + void JoinForTesting() {
|
| + // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we
|
| + // move |worker_pools_| locally, disallow detachment and join, and then
|
| + // move it back once we're done.
|
| + decltype(worker_pools_) local;
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + local = std::move(worker_pools_);
|
| + }
|
| + for (const auto& worker_pool : local)
|
| + worker_pool->DisallowWorkerDetachmentForTesting();
|
| + for (const auto& worker_pool : local)
|
| + worker_pool->JoinForTesting();
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + DCHECK(worker_pools_.empty());
|
| + worker_pools_ = std::move(local);
|
| + }
|
| + }
|
| +
|
| + void WaitForAllWorkersIdleForTesting() {
|
| + // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a
|
| + // SchedulerLock, so we move |worker_pools_| locally, request the wait, and
|
| + // then move it back once we're done.
|
| + decltype(worker_pools_) local;
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + local = std::move(worker_pools_);
|
| + }
|
| + for (const auto& worker_pool : local)
|
| + worker_pool->WaitForAllWorkersIdleForTesting();
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + DCHECK(worker_pools_.empty());
|
| + worker_pools_ = std::move(local);
|
| + }
|
| + }
|
| +
|
| + private:
|
| + SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() {
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + if (!free_worker_pool_indexes_.empty()) {
|
| + size_t index = free_worker_pool_indexes_.back();
|
| + free_worker_pool_indexes_.pop_back();
|
| + return worker_pools_[index].get();
|
| + }
|
| + }
|
| +
|
| + int num_worker_pools = static_cast<int>(
|
| + subtle::NoBarrier_AtomicIncrement(&num_worker_pools_, 1));
|
| + // Subtract 1 so we can index our worker pools at 0.
|
| + int worker_pool_index = num_worker_pools - 1;
|
| +
|
| + SchedulerWorkerPoolParams worker_pool_params(
|
| + base::StringPrintf("Single%s%d", params_.name().c_str(),
|
| + worker_pool_index),
|
| + params_.priority_hint(),
|
| + SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1,
|
| + params_.suggested_reclaim_time(), params_.backward_compatibility());
|
| + // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to
|
| + // construct the new SchedulerWorkerPoolImpl outside of
|
| + // |lock_|.
|
| + std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool =
|
| + SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
|
| + worker_pool_params, unregister_single_thread_worker_pool_callback_,
|
| + task_tracker_, delayed_task_manager_);
|
| + DCHECK(worker_pool);
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + worker_pools_.emplace_back(std::move(worker_pool));
|
| + return worker_pools_.back().get();
|
| + }
|
| +
|
| + void UnregisterSingleThreadWorkerPool(
|
| + const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
|
| + // Reuse is best effort. If the last SingleThreadTaskRunner is released
|
| + // during either JoinForTesting() or WaitForAllWorkersIdleForTesting(), it
|
| + // will not be eligible for reuse.
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + size_t worker_pool_count = worker_pools_.size();
|
| + for (size_t i = 0; i < worker_pool_count; ++i) {
|
| + if (scheduler_worker_pool == worker_pools_[i].get()) {
|
| + free_worker_pool_indexes_.push_back(i);
|
| + return;
|
| + }
|
| + }
|
| + DCHECK_EQ(0U, worker_pool_count);
|
| + }
|
| +
|
| + const SchedulerWorkerPoolParams params_;
|
| + TaskTracker* const task_tracker_;
|
| + DelayedTaskManager* const delayed_task_manager_;
|
| + const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback
|
| + unregister_single_thread_worker_pool_callback_;
|
| +
|
| + // Used to provide an 0-based index in the worker pool name. We can't rely on
|
| + // |worker_pools_| for this because of the lock and data dance we do for
|
| + // SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(). We need the count
|
| + // for construction but we can't hold the lock during construction. As a
|
| + // result, the count could change during construction!
|
| + subtle::Atomic32 num_worker_pools_ = 0;
|
| +
|
| + // |lock_| synchronizes the items below.
|
| + SchedulerLock lock_;
|
| + std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_;
|
| + std::vector<size_t> free_worker_pool_indexes_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool);
|
| +};
|
| +
|
| +SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager(
|
| + const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
|
| + const TaskScheduler::WorkerPoolIndexForTraitsCallback&
|
| + worker_pool_index_for_traits_callback,
|
| + TaskTracker* task_tracker,
|
| + DelayedTaskManager* delayed_task_manager)
|
| + : worker_pool_index_for_traits_callback_(
|
| + worker_pool_index_for_traits_callback) {
|
| + size_t worker_pool_count = worker_pool_params_vector.size();
|
| + worker_pool_impl_pools_.resize(worker_pool_count);
|
| + for (size_t i = 0; i < worker_pool_count; i++) {
|
| + worker_pool_impl_pools_[i] =
|
| + MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>(
|
| + worker_pool_params_vector[i], task_tracker, delayed_task_manager);
|
| + }
|
| +}
|
| +
|
| +SchedulerSingleThreadWorkerPoolManager::
|
| + ~SchedulerSingleThreadWorkerPoolManager() = default;
|
| +
|
| +scoped_refptr<SingleThreadTaskRunner>
|
| +SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits(
|
| + const TaskTraits& traits) {
|
| + size_t index = worker_pool_index_for_traits_callback_.Run(traits);
|
| + SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool;
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + worker_pool_impl_pool = worker_pool_impl_pools_[index].get();
|
| + }
|
| + return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits);
|
| +}
|
| +
|
| +void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() {
|
| + WaitForAllWorkersIdleForTesting();
|
| + // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a
|
| + // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the
|
| + // join, and move it back once we're done.
|
| + decltype(worker_pool_impl_pools_) local;
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + local = std::move(worker_pool_impl_pools_);
|
| + }
|
| + for (const auto& worker_pool_impl_pool : local)
|
| + worker_pool_impl_pool->JoinForTesting();
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + DCHECK(worker_pool_impl_pools_.empty());
|
| + worker_pool_impl_pools_ = std::move(local);
|
| + }
|
| +}
|
| +
|
| +void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() {
|
| + // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting()
|
| + // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally,
|
| + // request the wait, and move it back once we're done.
|
| + decltype(worker_pool_impl_pools_) local;
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + local = std::move(worker_pool_impl_pools_);
|
| + }
|
| + for (const auto& worker_pool_impl_pool : local)
|
| + worker_pool_impl_pool->WaitForAllWorkersIdleForTesting();
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + DCHECK(worker_pool_impl_pools_.empty());
|
| + worker_pool_impl_pools_ = std::move(local);
|
| + }
|
| +}
|
| +
|
| +} // namespace internal
|
| +} // namespace base
|
|
|