Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3259)

Unified Diff: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: CR Feedback Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ba6e8f0720e0f21c9b56c578564398154010e345
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
@@ -0,0 +1,223 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h"
+
+#include <string>
+
+#include "base/atomicops.h"
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/memory/ptr_util.h"
+#include "base/strings/stringprintf.h"
+#include "base/task_scheduler/scheduler_worker_pool_params.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/thread.h"
+
+namespace base {
+namespace internal {
+
+class SchedulerSingleThreadWorkerPoolManager::
+ SingleThreadSchedulerWorkerPoolImplPool {
+ public:
+ SingleThreadSchedulerWorkerPoolImplPool(
+ const SchedulerWorkerPoolParams& params,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : params_(params),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager),
+ unregister_single_thread_worker_pool_callback_(
+ Bind(&SingleThreadSchedulerWorkerPoolImplPool::
+ UnregisterSingleThreadWorkerPool,
+ Unretained(this))) {}
+ ~SingleThreadSchedulerWorkerPoolImplPool() = default;
+
+ scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ return GetAvailableOrCreateSchedulerWorkerPool()
+ ->CreateSingleThreadTaskRunnerWithTraits(traits);
+ }
+
+ void JoinForTesting() {
+ // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we
+ // move |worker_pools_| locally, disallow detachment and join, and then
+ // move it back once we're done.
+ decltype(worker_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pools_);
+ }
+ for (const auto& worker_pool : local)
+ worker_pool->DisallowWorkerDetachmentForTesting();
+ for (const auto& worker_pool : local)
+ worker_pool->JoinForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pools_.empty());
+ worker_pools_ = std::move(local);
+ }
+ }
+
+ void WaitForAllWorkersIdleForTesting() {
+ // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a
+ // SchedulerLock, so we move |worker_pools_| locally, request the wait, and
+ // then move it back once we're done.
+ decltype(worker_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pools_);
+ }
+ for (const auto& worker_pool : local)
+ worker_pool->WaitForAllWorkersIdleForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pools_.empty());
+ worker_pools_ = std::move(local);
+ }
+ }
+
+ private:
+ SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() {
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ if (!free_worker_pool_indexes_.empty()) {
+ size_t index = free_worker_pool_indexes_.back();
+ free_worker_pool_indexes_.pop_back();
+ return worker_pools_[index].get();
+ }
+ }
+
+ int num_worker_pools = static_cast<int>(
+ subtle::NoBarrier_AtomicIncrement(&num_worker_pools_, 1));
+ // Subtract 1 so we can index our worker pools at 0.
+ int worker_pool_index = num_worker_pools - 1;
+
+ SchedulerWorkerPoolParams worker_pool_params(
+ base::StringPrintf("Single%s%d", params_.name().c_str(),
+ worker_pool_index),
+ params_.priority_hint(),
+ SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1,
+ params_.suggested_reclaim_time(), params_.backward_compatibility());
+ // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to
+ // construct the new SchedulerWorkerPoolImpl outside of
+ // |lock_|.
+ std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool =
+ SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
+ worker_pool_params, unregister_single_thread_worker_pool_callback_,
+ task_tracker_, delayed_task_manager_);
+ DCHECK(worker_pool);
+ AutoSchedulerLock auto_lock(lock_);
+ worker_pools_.emplace_back(std::move(worker_pool));
+ return worker_pools_.back().get();
+ }
+
+ void UnregisterSingleThreadWorkerPool(
+ const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
+ // Reuse is best effort. If the last SingleThreadTaskRunner is released
+ // during either JoinForTesting() or WaitForAllWorkersIdleForTesting(), it
+ // will not be eligible for reuse.
+ AutoSchedulerLock auto_lock(lock_);
+ size_t worker_pool_count = worker_pools_.size();
+ for (size_t i = 0; i < worker_pool_count; ++i) {
+ if (scheduler_worker_pool == worker_pools_[i].get()) {
+ free_worker_pool_indexes_.push_back(i);
+ return;
+ }
+ }
+ DCHECK_EQ(0U, worker_pool_count);
+ }
+
+ const SchedulerWorkerPoolParams params_;
+ TaskTracker* const task_tracker_;
+ DelayedTaskManager* const delayed_task_manager_;
+ const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback
+ unregister_single_thread_worker_pool_callback_;
+
+ // Used to provide an 0-based index in the worker pool name. We can't rely on
+ // |worker_pools_| for this because of the lock and data dance we do for
+ // SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(). We need the count
+ // for construction but we can't hold the lock during construction. As a
+ // result, the count could change during construction!
+ subtle::Atomic32 num_worker_pools_ = 0;
+
+ // |lock_| synchronizes the items below.
+ SchedulerLock lock_;
+ std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_;
+ std::vector<size_t> free_worker_pool_indexes_;
+
+ DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool);
+};
+
+SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager(
+ const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+ const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+ worker_pool_index_for_traits_callback,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : worker_pool_index_for_traits_callback_(
+ worker_pool_index_for_traits_callback) {
+ size_t worker_pool_count = worker_pool_params_vector.size();
+ worker_pool_impl_pools_.resize(worker_pool_count);
+ for (size_t i = 0; i < worker_pool_count; i++) {
+ worker_pool_impl_pools_[i] =
+ MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>(
+ worker_pool_params_vector[i], task_tracker, delayed_task_manager);
+ }
+}
+
+SchedulerSingleThreadWorkerPoolManager::
+ ~SchedulerSingleThreadWorkerPoolManager() = default;
+
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+ SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ worker_pool_impl_pool = worker_pool_impl_pools_[index].get();
+ }
+ return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits);
+}
+
+void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() {
+ WaitForAllWorkersIdleForTesting();
+ // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a
+ // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the
+ // join, and move it back once we're done.
+ decltype(worker_pool_impl_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pool_impl_pools_);
+ }
+ for (const auto& worker_pool_impl_pool : local)
+ worker_pool_impl_pool->JoinForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pool_impl_pools_.empty());
+ worker_pool_impl_pools_ = std::move(local);
+ }
+}
+
+void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() {
+ // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting()
+ // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally,
+ // request the wait, and move it back once we're done.
+ decltype(worker_pool_impl_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pool_impl_pools_);
+ }
+ for (const auto& worker_pool_impl_pool : local)
+ worker_pool_impl_pool->WaitForAllWorkersIdleForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pool_impl_pools_.empty());
+ worker_pool_impl_pools_ = std::move(local);
+ }
+}
+
+} // namespace internal
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698