Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(988)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_worker_pool_manager.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_worker_pool_manager.h"
6
7 #include <string>
8
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/task_scheduler/scheduler_worker_pool_params.h"
14 #include "base/task_scheduler/task_traits.h"
15 #include "base/threading/thread.h"
16
17 namespace base {
18 namespace internal {
19
20 class SchedulerSingleThreadWorkerPoolManager::
21 SingleThreadSchedulerWorkerPoolImplPool {
22 public:
23 SingleThreadSchedulerWorkerPoolImplPool(
24 const SchedulerWorkerPoolParams& params,
25 TaskTracker* task_tracker,
26 DelayedTaskManager* delayed_task_manager)
27 : params_(params),
28 task_tracker_(task_tracker),
29 delayed_task_manager_(delayed_task_manager),
30 unregister_single_thread_worker_pool_callback_(
31 Bind(&SingleThreadSchedulerWorkerPoolImplPool::
32 UnregisterSingleThreadWorkerPool,
33 Unretained(this))) {}
34 ~SingleThreadSchedulerWorkerPoolImplPool() = default;
35
36 scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
37 const TaskTraits& traits) {
38 return GetAvailableOrCreateSchedulerWorkerPool()
39 ->CreateSingleThreadTaskRunnerWithTraits(traits);
40 }
41
42 void JoinForTesting() {
fdoray 2017/01/27 16:47:35 Set a |join_for_testing_called_| flag and DCHECK t
robliao 2017/01/27 21:25:41 I'm going to make worker pool cleanup best-effort
43 // SchedulerWorkerPoolImpl::JoinForTesting() acquires a SchedulerLock, so we
44 // move |worker_pools_| locally, request the detachment and join, and then
fdoray 2017/01/27 16:47:35 s/request the detachment/disallow detachment/
robliao 2017/01/27 21:25:40 Done.
45 // move it back once we're done.
46 decltype(worker_pools_) local;
47 {
48 AutoSchedulerLock auto_lock(lock_);
49 local = std::move(worker_pools_);
50 }
51 for (const auto& worker_pool : local)
52 worker_pool->DisallowWorkerDetachmentForTesting();
53 for (const auto& worker_pool : local)
54 worker_pool->JoinForTesting();
55 {
56 AutoSchedulerLock auto_lock(lock_);
57 DCHECK(worker_pools_.empty());
58 worker_pools_ = std::move(local);
59 }
60 }
61
62 void WaitForAllWorkersIdleForTesting() {
63 // SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() acquires a
64 // SchedulerLock, so we move |worker_pools_| locally, request the wait, and
65 // then move it back once we're done.
66 decltype(worker_pools_) local;
67 {
68 AutoSchedulerLock auto_lock(lock_);
69 local = std::move(worker_pools_);
70 }
71 for (const auto& worker_pool : local)
72 worker_pool->WaitForAllWorkersIdleForTesting();
73 {
74 AutoSchedulerLock auto_lock(lock_);
75 DCHECK(worker_pools_.empty());
76 worker_pools_ = std::move(local);
77 }
78 }
79
80 private:
81 SchedulerWorkerPoolImpl* GetAvailableOrCreateSchedulerWorkerPool() {
82 int num_worker_pools = 0;
83 {
84 AutoSchedulerLock auto_lock(lock_);
85 if (free_worker_pool_indexes_.empty()) {
86 num_worker_pools = worker_pools_.size();
fdoray 2017/01/27 16:47:34 A concurrent call to GetAvailableOrCreateScheduler
robliao 2017/01/27 21:25:40 Nice catch! Indeed! Fixed.
87 } else {
88 size_t index = free_worker_pool_indexes_.back();
fdoray 2017/01/27 16:47:35 Optional: We create SingleThreadTaskRunners for b
robliao 2017/01/27 21:25:41 While skipping the cleanup code would keep things
89 free_worker_pool_indexes_.pop_back();
90 return worker_pools_[index].get();
91 }
92 }
93
94 SchedulerWorkerPoolParams worker_pool_params(
95 base::StringPrintf("Single%s%d", params_.name().c_str(),
96 num_worker_pools),
97 params_.priority_hint(),
98 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY, 1,
99 params_.suggested_reclaim_time(), params_.backward_compatibility());
100 // SchedulerWorkerPoolImpl::Create acquires locks underneath, so we have to
101 // construct the new SchedulerWorkerPoolImpl outside of
102 // |single_thread_worker_pools_lock_|.
fdoray 2017/01/27 16:47:34 s/single_thread_worker_pools_lock_/lock_/
robliao 2017/01/27 21:25:40 Done.
103 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool =
104 SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
105 worker_pool_params, unregister_single_thread_worker_pool_callback_,
106 task_tracker_, delayed_task_manager_);
107 DCHECK(worker_pool);
108 AutoSchedulerLock auto_lock(lock_);
109 worker_pools_.emplace_back(std::move(worker_pool));
110 return worker_pools_.back().get();
111 }
112
113 void UnregisterSingleThreadWorkerPool(
114 const SchedulerWorkerPoolImpl* scheduler_worker_pool) {
115 AutoSchedulerLock auto_lock(lock_);
116 size_t worker_pool_count = worker_pools_.size();
117 DCHECK_NE(0U, worker_pool_count);
118 for (size_t i = 0; i < worker_pool_count; ++i) {
119 if (scheduler_worker_pool == worker_pools_[i].get()) {
120 free_worker_pool_indexes_.push_back(i);
121 return;
122 }
123 }
124 NOTREACHED();
125 }
126
127 const SchedulerWorkerPoolParams params_;
128 TaskTracker* const task_tracker_;
129 DelayedTaskManager* const delayed_task_manager_;
130 const SchedulerWorkerPoolImpl::UnregisterSingleThreadWorkerPoolCallback
131 unregister_single_thread_worker_pool_callback_;
132
133 // |lock_| synchronizes the items below.
134 SchedulerLock lock_;
135 std::vector<std::unique_ptr<SchedulerWorkerPoolImpl>> worker_pools_;
136 std::vector<size_t> free_worker_pool_indexes_;
137
138 DISALLOW_COPY_AND_ASSIGN(SingleThreadSchedulerWorkerPoolImplPool);
139 };
140
141 SchedulerSingleThreadWorkerPoolManager::SchedulerSingleThreadWorkerPoolManager(
142 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
143 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
144 worker_pool_index_for_traits_callback,
145 TaskTracker* task_tracker,
146 DelayedTaskManager* delayed_task_manager)
147 : worker_pool_params_vector_(worker_pool_params_vector),
148 worker_pool_index_for_traits_callback_(
149 worker_pool_index_for_traits_callback) {
150 size_t worker_pool_count = worker_pool_params_vector_.size();
151 worker_pool_impl_pools_.resize(worker_pool_count);
152 for (size_t i = 0; i < worker_pool_count; i++) {
153 worker_pool_impl_pools_[i] =
154 MakeUnique<SingleThreadSchedulerWorkerPoolImplPool>(
155 worker_pool_params_vector[i], task_tracker, delayed_task_manager);
156 }
157 }
158
159 SchedulerSingleThreadWorkerPoolManager::
160 ~SchedulerSingleThreadWorkerPoolManager() = default;
161
162 scoped_refptr<SingleThreadTaskRunner>
163 SchedulerSingleThreadWorkerPoolManager::CreateSingleThreadTaskRunnerWithTraits(
164 const TaskTraits& traits) {
165 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
166 SingleThreadSchedulerWorkerPoolImplPool* worker_pool_impl_pool;
167 {
168 AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:34 No lock required. |worker_pool_impl_pools_| is onl
robliao 2017/01/27 21:25:41 While it is an error to call JoinForTesting() or W
169 worker_pool_impl_pool = worker_pool_impl_pools_[index].get();
170 }
171 return worker_pool_impl_pool->CreateSingleThreadTaskRunnerWithTraits(traits);
172 }
173
174 void SchedulerSingleThreadWorkerPoolManager::JoinForTesting() {
175 WaitForAllWorkersIdleForTesting();
176 // SingleThreadSchedulerWorkerPoolImplPool::JoinForTesting() acquires a
177 // SchedulerLock, so we move |worker_pool_impl_pools| locally, request the
178 // join, and move it back once we're done.
179 decltype(worker_pool_impl_pools_) local;
180 {
181 AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:34 No lock / moving to a local variable required. |wo
robliao 2017/01/27 21:25:40 The following line changes worker_pool_impl_pools_
182 local = std::move(worker_pool_impl_pools_);
183 }
184 for (const auto& worker_pool_impl_pool : local)
185 worker_pool_impl_pool->JoinForTesting();
186 {
187 AutoSchedulerLock auto_lock(lock_);
188 DCHECK(worker_pool_impl_pools_.empty());
189 worker_pool_impl_pools_ = std::move(local);
190 }
191 }
192
193 void SchedulerSingleThreadWorkerPoolManager::WaitForAllWorkersIdleForTesting() {
194 // SingleThreadSchedulerWorkerPoolImplPool::WaitForAllWorkersIdleForTesting()
195 // acquires a SchedulerLock, so we move |worker_pool_impl_pools| locally,
196 // request the wait, and move it back once we're done.
197 decltype(worker_pool_impl_pools_) local;
198 {
199 AutoSchedulerLock auto_lock(lock_);
fdoray 2017/01/27 16:47:35 No lock / moving to a local variable required. |wo
robliao 2017/01/27 21:25:40 See above.
200 local = std::move(worker_pool_impl_pools_);
201 }
202 for (const auto& worker_pool_impl_pool : local)
203 worker_pool_impl_pool->WaitForAllWorkersIdleForTesting();
204 {
205 AutoSchedulerLock auto_lock(lock_);
206 DCHECK(worker_pool_impl_pools_.empty());
207 worker_pool_impl_pools_ = std::move(local);
208 }
209 }
210
211 } // namespace internal
212 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698