Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h"
6
7 #include <string>
8
9 #include "base/atomicops.h"
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/task_scheduler/scheduler_worker_pool_params.h"
15 #include "base/task_scheduler/task_traits.h"
16 #include "base/threading/thread.h"
17
18 namespace base {
19 namespace internal {
20
21 class SchedulerSingleThreadWorkerPoolManager::
22 SingleThreadSchedulerWorkerPoolImplPool {
23 public:
24 SingleThreadSchedulerWorkerPoolImplPool(
25 const SchedulerWorkerPoolParams& params,
26 TaskTracker* task_tracker,
27 DelayedTaskManager* delayed_task_manager)
28 : params_(params),
29 task_tracker_(task_tracker),
30 delayed_task_manager_(delayed_task_manager),
31 unregister_single_thread_worker_pool_callback_(
32 Bind(&SingleThreadSchedulerWorkerPoolImplPool::
33 UnregisterSingleThreadWorkerPool,
34 Unretained(this))) {}
35 ~SingleThreadSchedulerWorkerPoolImplPool() = default;
36
37 scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
38 const TaskTraits& traits) {
39 return GetAvailableOrCreateSchedulerWorkerPool()
40 ->CreateSingleThreadTaskRunnerWithTraits(traits);
41 }
42
43 void JoinForTesting() {
44 // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we
45 // move |worker_pools_| locally, disallow detachment and join, and then
46 // move it back once we're done.
47 decltype(worker_pools_) local;
48 {
49 AutoSchedulerLock auto_lock(lock_);
50 local = std::move(worker_pools_);
51 }
52 for (const auto& worker_pool : local)
53 worker_pool->DisallowWorkerDetachmentForTesting();
54 for (const auto& worker_pool : local)
55 worker_pool->JoinForTesting();
56 {
57 AutoSchedulerLock auto_lock(lock_);
58 DCHECK(worker_pools_.empty());
59 worker_pools_ = std::move(local);
60 }
61 }
62
63 void WaitForAllWorkersIdleForTesting() {
64 // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a
65 // SchedulerLock, so we move |worker_pools_| locally, request the wait, and
66 // then move it back once we're done.
67 decltype(worker_pools_) local;
68 {
69 AutoSchedulerLock auto_lock(lock_);
70 local = std::move(worker_pools_);
71 }
72 for (const auto& worker_pool : local)
73 worker_pool->WaitForAllWorkersIdleForTesting();
74 {
75 AutoSchedulerLock auto_lock(lock_);
76 DCHECK(worker_pools_.empty());
77 worker_pools_ = std::move(local);
78 }
79 }
80
81 private:
82 SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() {
83 {
84 AutoSchedulerLock auto_lock(lock_);
85 if (!free_worker_pool_indexes_.empty()) {
86 size_t index = free_worker_pool_indexes_.back();
87 free_worker_pool_indexes_.pop_back();
88 return worker_pools_[index].get();
89 }
90 }
91
92 int num_worker_pools = static_cast<int>(
93 subtle::NoBarrier_AtomicIncrement(&num_worker_pools_, 1));
94 // Subtract 1 so we can index our worker pools at 0.
95 int worker_pool_index = num_worker_pools - 1;
96
97 SchedulerWorkerPoolParams worker_pool_params(
98 base::StringPrintf("Single%s%d", params_.name().c_str(),
99 worker_pool_index),
100 params_.priority_hint(),
101 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1,
102 params_.suggested_reclaim_time(), params_.backward_compatibility());
103 // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to
104 // construct the new SchedulerWorkerPoolImpl outside of
105 // |lock_|.
106 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool =
107 SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
108 worker_pool_params, unregister_single_thread_worker_pool_callback_,
109 task_tracker_, delayed_task_manager_);
110 DCHECK(worker_pool);
111 AutoSchedulerLock auto_lock(lock_);
112 worker_pools_.emplace_back(std::move(worker_pool));
113 return worker_pools_.back().get();
114 }
115
116 void UnregisterSingleThreadWorkerPool(
117 const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
118 // Reuse is best effort. If the last SingleThreadTaskRunner is released
119 // during either JoinForTesting() or WaitForAllWorkersIdleForTesting(), it
120 // will not be eligible for reuse.
121 AutoSchedulerLock auto_lock(lock_);
122 size_t worker_pool_count = worker_pools_.size();
123 for (size_t i = 0; i < worker_pool_count; ++i) {
124 if (scheduler_worker_pool == worker_pools_[i].get()) {
125 free_worker_pool_indexes_.push_back(i);
126 return;
127 }
128 }
129 DCHECK_EQ(0U, worker_pool_count);
130 }
131
132 const SchedulerWorkerPoolParams params_;
133 TaskTracker* const task_tracker_;
134 DelayedTaskManager* const delayed_task_manager_;
135 const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback
136 unregister_single_thread_worker_pool_callback_;
137
138 // Used to provide an 0-based index in the worker pool name. We can't rely on
139 // |worker_pools_| for this because of the lock and data dance we do for
140 // SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(). We need the count
141 // for construction but we can't hold the lock during construction. As a
142 // result, the count could change during construction!
143 subtle::Atomic32 num_worker_pools_ = 0;
144
145 // |lock_| synchronizes the items below.
146 SchedulerLock lock_;
147 std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_;
148 std::vector<size_t> free_worker_pool_indexes_;
149
150 DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool);
151 };
152
153 SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager(
154 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
155 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
156 worker_pool_index_for_traits_callback,
157 TaskTracker* task_tracker,
158 DelayedTaskManager* delayed_task_manager)
159 : worker_pool_index_for_traits_callback_(
160 worker_pool_index_for_traits_callback) {
161 size_t worker_pool_count = worker_pool_params_vector.size();
162 worker_pool_impl_pools_.resize(worker_pool_count);
163 for (size_t i = 0; i < worker_pool_count; i++) {
164 worker_pool_impl_pools_[i] =
165 MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>(
166 worker_pool_params_vector[i], task_tracker, delayed_task_manager);
167 }
168 }
169
170 SchedulerSingleThreadWorkerPoolManager::
171 ~SchedulerSingleThreadWorkerPoolManager() = default;
172
173 scoped_refptr<SingleThreadTaskRunner>
174 SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits(
175 const TaskTraits& traits) {
176 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
177 SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool;
178 {
179 AutoSchedulerLock auto_lock(lock_);
180 worker_pool_impl_pool = worker_pool_impl_pools_[index].get();
181 }
182 return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits);
183 }
184
185 void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() {
186 WaitForAllWorkersIdleForTesting();
187 // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a
188 // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the
189 // join, and move it back once we're done.
190 decltype(worker_pool_impl_pools_) local;
191 {
192 AutoSchedulerLock auto_lock(lock_);
193 local = std::move(worker_pool_impl_pools_);
194 }
195 for (const auto& worker_pool_impl_pool : local)
196 worker_pool_impl_pool->JoinForTesting();
197 {
198 AutoSchedulerLock auto_lock(lock_);
199 DCHECK(worker_pool_impl_pools_.empty());
200 worker_pool_impl_pools_ = std::move(local);
201 }
202 }
203
204 void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() {
205 // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting()
206 // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally,
207 // request the wait, and move it back once we're done.
208 decltype(worker_pool_impl_pools_) local;
209 {
210 AutoSchedulerLock auto_lock(lock_);
211 local = std::move(worker_pool_impl_pools_);
212 }
213 for (const auto& worker_pool_impl_pool : local)
214 worker_pool_impl_pool->WaitForAllWorkersIdleForTesting();
215 {
216 AutoSchedulerLock auto_lock(lock_);
217 DCHECK(worker_pool_impl_pools_.empty());
218 worker_pool_impl_pools_ = std::move(local);
219 }
220 }
221
222 } // namespace internal
223 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698