Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(765)

Unified Diff: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..cf03e60db710b1cfbc307b161c3202055d7f9e31
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc
@@ -0,0 +1,212 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h"
+
+#include <string>
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/memory/ptr_util.h"
+#include "base/strings/stringprintf.h"
+#include "base/task_scheduler/scheduler_worker_pool_params.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/thread.h"
+
+namespace base {
+namespace internal {
+
+class SchedulerSingleThreadWorkerPoolManager::
+ SingleThreadSchedulerWorkerPoolImplPool {
+ public:
+ SingleThreadSchedulerWorkerPoolImplPool(
+ const SchedulerWorkerPoolParams& params,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : params_(params),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager),
+ unregister_single_thread_worker_pool_callback_(
+ Bind(&SingleThreadSchedulerWorkerPoolImplPool::
+ UnregisterSingleThreadWorkerPool,
+ Unretained(this))) {}
+ ~SingleThreadSchedulerWorkerPoolImplPool() = default;
+
+ scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ return GetAvailableOrCreateSchedulerWorkerPool()
+ ->CreateSingleThreadTaskRunnerWithTraits(traits);
+ }
+
+ void JoinForTesting() {
fdoray 2017/01/27 16:47:35 Set a |join_for_testing_called_| flag and DCHECK t
robliao 2017/01/27 21:25:41 I'm going to make worker pool cleanup best-effort
+ // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we
+ // move |worker_pools_| locally, request the detachment and join, and then
fdoray 2017/01/27 16:47:35 s/request the detachment/disallow detachment/
robliao 2017/01/27 21:25:40 Done.
+ // move it back once we're done.
+ decltype(worker_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pools_);
+ }
+ for (const auto& worker_pool : local)
+ worker_pool->DisallowWorkerDetachmentForTesting();
+ for (const auto& worker_pool : local)
+ worker_pool->JoinForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pools_.empty());
+ worker_pools_ = std::move(local);
+ }
+ }
+
+ void WaitForAllWorkersIdleForTesting() {
+ // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a
+ // SchedulerLock, so we move |worker_pools_| locally, request the wait, and
+ // then move it back once we're done.
+ decltype(worker_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local = std::move(worker_pools_);
+ }
+ for (const auto& worker_pool : local)
+ worker_pool->WaitForAllWorkersIdleForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pools_.empty());
+ worker_pools_ = std::move(local);
+ }
+ }
+
+ private:
+ SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() {
+ int num_worker_pools = 0;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ if (free_worker_pool_indexes_.empty()) {
+ num_worker_pools = worker_pools_.size();
fdoray 2017/01/27 16:47:34 A concurrent call to GetAvailableOrCreateScheduler
robliao 2017/01/27 21:25:40 Nice catch! Indeed! Fixed.
+ } else {
+ size_t index = free_worker_pool_indexes_.back();
fdoray 2017/01/27 16:47:35 Optional: We create SingleThreadTaskRunners for b
robliao 2017/01/27 21:25:41 While skipping the cleanup code would keep things
+ free_worker_pool_indexes_.pop_back();
+ return worker_pools_[index].get();
+ }
+ }
+
+ SchedulerWorkerPoolParams worker_pool_params(
+ base::StringPrintf("Single%s%d", params_.name().c_str(),
+ num_worker_pools),
+ params_.priority_hint(),
+ SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1,
+ params_.suggested_reclaim_time(), params_.backward_compatibility());
+ // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to
+ // construct the new SchedulerWorkerPoolImpl outside of
+ // |single_thread_worker_pools_lock_|.
fdoray 2017/01/27 16:47:34 s/single_thread_worker_pools_lock_/lock_/
robliao 2017/01/27 21:25:40 Done.
+ std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool =
+ SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
+ worker_pool_params, unregister_single_thread_worker_pool_callback_,
+ task_tracker_, delayed_task_manager_);
+ DCHECK(worker_pool);
+ AutoSchedulerLock auto_lock(lock_);
+ worker_pools_.emplace_back(std::move(worker_pool));
+ return worker_pools_.back().get();
+ }
+
+ void UnregisterSingleThreadWorkerPool(
+ const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
+ AutoSchedulerLock auto_lock(lock_);
+ size_t worker_pool_count = worker_pools_.size();
+ DCHECK_NE(0U, worker_pool_count);
+ for (size_t i = 0; i < worker_pool_count; ++i) {
+ if (scheduler_worker_pool == worker_pools_[i].get()) {
+ free_worker_pool_indexes_.push_back(i);
+ return;
+ }
+ }
+ NOTREACHED();
+ }
+
+ const SchedulerWorkerPoolParams params_;
+ TaskTracker* const task_tracker_;
+ DelayedTaskManager* const delayed_task_manager_;
+ const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback
+ unregister_single_thread_worker_pool_callback_;
+
+ // |lock_| synchronizes the items below.
+ SchedulerLock lock_;
+ std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_;
+ std::vector<size_t> free_worker_pool_indexes_;
+
+ DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool);
+};
+
+SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager(
+ const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+ const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+ worker_pool_index_for_traits_callback,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : worker_pool_params_vector_(worker_pool_params_vector),
+ worker_pool_index_for_traits_callback_(
+ worker_pool_index_for_traits_callback) {
+ size_t worker_pool_count = worker_pool_params_vector_.size();
+ worker_pool_impl_pools_.resize(worker_pool_count);
+ for (size_t i = 0; i < worker_pool_count; i++) {
+ worker_pool_impl_pools_[i] =
+ MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>(
+ worker_pool_params_vector[i], task_tracker, delayed_task_manager);
+ }
+}
+
+SchedulerSingleThreadWorkerPoolManager::
+ ~SchedulerSingleThreadWorkerPoolManager() = default;
+
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+ SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool;
+ {
+ AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:34 No lock required. |worker_pool_impl_pools_| is onl
robliao 2017/01/27 21:25:41 While it is an error to call JoinForTesting() or W
+ worker_pool_impl_pool = worker_pool_impl_pools_[index].get();
+ }
+ return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits);
+}
+
+void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() {
+ WaitForAllWorkersIdleForTesting();
+ // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a
+ // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the
+ // join, and move it back once we're done.
+ decltype(worker_pool_impl_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:34 No lock / moving to a local variable required. |wo
robliao 2017/01/27 21:25:40 The following line changes worker_pool_impl_pools_
+ local = std::move(worker_pool_impl_pools_);
+ }
+ for (const auto& worker_pool_impl_pool : local)
+ worker_pool_impl_pool->JoinForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pool_impl_pools_.empty());
+ worker_pool_impl_pools_ = std::move(local);
+ }
+}
+
+void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() {
+ // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting()
+ // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally,
+ // request the wait, and move it back once we're done.
+ decltype(worker_pool_impl_pools_) local;
+ {
+ AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:35 No lock / moving to a local variable required. |wo
robliao 2017/01/27 21:25:40 See above.
+ local = std::move(worker_pool_impl_pools_);
+ }
+ for (const auto& worker_pool_impl_pool : local)
+ worker_pool_impl_pool->WaitForAllWorkersIdleForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(worker_pool_impl_pools_.empty());
+ worker_pool_impl_pools_ = std::move(local);
+ }
+}
+
+} // namespace internal
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698