Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(869)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2686593003: DESIGN DISCUSSION ONLY Task Scheduler Single Thread Task Runner Manager for Dedicated Threads per S… (Closed)
Patch Set: Wait for Detached Thread to Complete Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/task_scheduler/delayed_task_manager.h"
14 #include "base/task_scheduler/scheduler_worker.h"
15 #include "base/task_scheduler/sequence.h"
16 #include "base/task_scheduler/task.h"
17 #include "base/task_scheduler/task_tracker.h"
18 #include "base/task_scheduler/task_traits.h"
19 #include "base/threading/platform_thread.h"
20 #include "base/time/time.h"
21
22 namespace base {
23 namespace internal {
24
25 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
26 : public SingleThreadTaskRunner {
27 public:
28 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
29 // tasks so long as |worker_pool| and |worker| are alive.
fdoray 2017/02/08 17:59:01 Update comment (no more |worker_pool|).
robliao 2017/02/10 00:04:04 Done.
30 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
31 // and |worker|.
32 SchedulerSingleThreadTaskRunner(
33 SchedulerSingleThreadTaskRunnerManager* const outer,
34 const TaskTraits& traits,
35 SchedulerWorker* worker)
36 : outer_(outer), traits_(traits), worker_(worker) {
37 DCHECK(outer_);
38 DCHECK(worker_);
39 }
40
41 // SingleThreadTaskRunner:
42 bool PostDelayedTask(const tracked_objects::Location& from_here,
43 const Closure& closure,
44 TimeDelta delay) override;
45
46 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
47 const Closure& closure,
48 base::TimeDelta delay) override {
49 // Tasks are never nested within the task scheduler.
50 return PostDelayedTask(from_here, closure, delay);
51 }
52
53 bool RunsTasksOnCurrentThread() const override {
54 // Even though this is a SingleThreadTaskRunner, test the actual sequence
55 // instead of the assigned worker so that another task randomly assigned
56 // to the same worker doesn't return true by happenstance.
57 return sequence_->token() == SequenceToken::GetForCurrentThread();
fdoray 2017/02/08 17:59:01 "another task randomly assigned to the same worker
robliao 2017/02/10 00:04:04 Done.
58 }
59
60 private:
61 ~SchedulerSingleThreadTaskRunner() override {
62 outer_->UnregisterSchedulerWorker(worker_);
63 }
64
65 void PostTaskNow(std::unique_ptr<Task> task);
66
67 // Sequence for all Tasks posted through this TaskRunner.
68 const scoped_refptr<Sequence> sequence_ = new Sequence;
69
70 SchedulerSingleThreadTaskRunnerManager* const outer_;
71 const TaskTraits traits_;
72 SchedulerWorker* const worker_;
73
74 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
75 };
76
77 class SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate
78 : public SchedulerWorker::Delegate {
79 public:
80 SchedulerWorkerDelegate(SchedulerSingleThreadTaskRunnerManager* const outer,
81 const std::string& thread_name)
82 : outer_(outer), thread_name_(thread_name) {}
83
84 // SchedulerWorker::Delegate:
85 void OnMainEntry(SchedulerWorker* worker) override {
86 PlatformThread::SetName(thread_name_);
87 }
88
89 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
90 if (!sequence_)
91 outer_->IncrementIdleWorkerCount();
92
93 return std::move(sequence_);
94 }
95
96 void DidRunTask() override {}
97
98 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
99 AutoSchedulerLock auto_lock(lock_);
100 DCHECK(!sequence_);
101 sequence_ = std::move(sequence);
102 }
103
104 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
105
106 bool CanDetach(SchedulerWorker* worker) override {
107 AutoSchedulerLock auto_lock(can_detach_lock_);
108 return can_detach_;
109 }
110
111 void OnDetach() override {}
112
113 void AllowDetach() {
114 AutoSchedulerLock auto_lock(can_detach_lock_);
115 can_detach_ = true;
116 }
117
118 private:
119 SchedulerSingleThreadTaskRunnerManager* const outer_;
120 const std::string thread_name_;
121
122 SchedulerLock lock_;
fdoray 2017/02/08 17:59:01 // Contains a reference to the Sequence from which
robliao 2017/02/10 00:04:04 Yep. I'll be setting these comments in the next CL
123 scoped_refptr<Sequence> sequence_;
124
125 SchedulerLock can_detach_lock_;
fdoray 2017/02/08 17:59:01 Replace bool+lock with AtomicFlag.
robliao 2017/02/10 00:04:04 AtomicFlag has a sequence checker that doesn't qui
126 bool can_detach_ = false;
127
128 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
129 };
130
131 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
132 PostDelayedTask(const tracked_objects::Location& from_here,
133 const Closure& closure,
134 TimeDelta delay) {
135 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
fdoray 2017/02/08 17:59:01 MakeUnique
robliao 2017/02/10 00:04:04 Done.
136 task->single_thread_task_runner_ref = this;
137
138 if (!outer_->task_tracker_->WillPostTask(task.get()))
139 return false;
140
141 if (task->delayed_run_time.is_null()) {
142 PostTaskNow(std::move(task));
143 } else {
144 outer_->delayed_task_manager_->AddDelayedTask(
145 std::move(task),
146 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
147 }
148 return true;
149 }
150
151 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
152 PostTaskNow(std::unique_ptr<Task> task) {
153 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
154 if (sequence_was_empty) {
155 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
156 delegate->ReEnqueueSequence(sequence_);
157 outer_->DecrementIdleWorkerCount();
158 worker_->WakeUp();
159 }
160 }
161
162 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
163 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
164 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
165 worker_pool_index_for_traits_callback,
166 TaskTracker* task_tracker,
167 DelayedTaskManager* delayed_task_manager)
168 : worker_pool_params_vector_(worker_pool_params_vector),
169 worker_pool_index_for_traits_callback_(
170 worker_pool_index_for_traits_callback),
171 task_tracker_(task_tracker),
172 delayed_task_manager_(delayed_task_manager),
173 lock_(),
174 idle_workers_cv_for_testing_(lock_.CreateConditionVariable()) {
175 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
176 DCHECK(worker_pool_index_for_traits_callback_);
177 DCHECK(task_tracker_);
178 DCHECK(delayed_task_manager_);
179 }
180
181 SchedulerSingleThreadTaskRunnerManager::
182 ~SchedulerSingleThreadTaskRunnerManager() = default;
183
184 scoped_refptr<SingleThreadTaskRunner>
185 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
186 const TaskTraits& traits) {
187 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
188 DCHECK_LT(index, worker_pool_params_vector_.size());
189 return new SchedulerSingleThreadTaskRunner(
190 this, traits,
191 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
192 }
193
194 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
195 decltype(workers_) local_workers;
196 {
197 AutoSchedulerLock auto_lock(lock_);
198 local_workers = std::move(workers_);
199 }
200 for (const auto& worker : local_workers)
201 worker->JoinForTesting();
202 {
203 AutoSchedulerLock auto_lock(lock_);
fdoray 2017/02/08 17:59:01 DCHECK(workers_.empty());
robliao 2017/02/10 00:04:04 Done.
204 workers_ = std::move(local_workers);
205 }
206 }
207
208 void SchedulerSingleThreadTaskRunnerManager::WaitForAllWorkersIdleForTesting() {
209 AutoSchedulerLock auto_lock(lock_);
210 while (static_cast<size_t>(subtle::NoBarrier_Load(&idle_workers_)) <
211 workers_.size()) {
212 idle_workers_cv_for_testing_->Wait();
213 }
214 }
215
216 SchedulerWorker*
217 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
218 const SchedulerWorkerPoolParams& params) {
219 AutoSchedulerLock auto_lock(lock_);
220 int count = workers_.size();
221 auto delegate = MakeUnique<SchedulerWorkerDelegate>(
222 this, base::StringPrintf("TaskSchedulerSingleThreadWorker%d%s", count,
223 params.name().c_str()));
224 workers_.emplace_back(SchedulerWorker::Create(
225 params.priority_hint(), std::move(delegate), task_tracker_,
226 SchedulerWorker::InitialState::DETACHED));
227 // This avoids calling IncrementIdleWorkerCount as we're adding a worker but
228 // we don't want to signal to any CVs that we're done.
229 subtle::NoBarrier_AtomicIncrement(&idle_workers_, 1);
230 return workers_.back().get();
231 }
232
233 void SchedulerSingleThreadTaskRunnerManager::IncrementIdleWorkerCount() {
234 subtle::Atomic32 idle_workers =
235 subtle::NoBarrier_AtomicIncrement(&idle_workers_, 1);
236 AutoSchedulerLock auto_lock(lock_);
237 if (static_cast<size_t>(idle_workers) == workers_.size())
238 idle_workers_cv_for_testing_->Broadcast();
239 }
240
241 void SchedulerSingleThreadTaskRunnerManager::DecrementIdleWorkerCount() {
242 subtle::NoBarrier_AtomicIncrement(&idle_workers_, -1);
243 }
244
245 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
246 SchedulerWorker* worker) {
247 std::unique_ptr<SchedulerWorker> worker_to_destroy;
248 {
249 AutoSchedulerLock auto_lock(lock_);
250 auto worker_iter = std::find_if(
251 workers_.begin(), workers_.end(),
252 [worker](const std::unique_ptr<SchedulerWorker>& candidate) {
253 return candidate.get() == worker;
254 });
255 DCHECK(worker_iter != workers_.end());
256 worker_to_destroy = std::move(*worker_iter);
257 workers_.erase(worker_iter);
fdoray 2017/02/08 17:59:01 If we erase a worker in the middle of the vector,
robliao 2017/02/10 00:04:04 That sounds fine to me. We'll have a monotonically
258 DecrementIdleWorkerCount();
259 }
260 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker->delegate());
261 delegate->AllowDetach();
262 SchedulerWorker::DestroyAfterDetachment(std::move(worker_to_destroy));
263 }
264
265 } // namespace internal
266 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698