Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(860)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h"
25
26 namespace base {
27 namespace internal {
28
29 namespace {
30
31 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker {
gab 2017/02/23 21:52:10 Don't think this warrants a separate class.
robliao 2017/02/24 18:33:59 The joy of lock-free algorithms is that they're ea
34 public:
35 AtomicThreadRefChecker() = default;
36 ~AtomicThreadRefChecker() = default;
37
38 void Set() {
39 thread_ref_ = PlatformThread::CurrentRef();
40 is_set_.Set();
41 }
42
43 bool IsCurrentThreadSameAsSetThread() {
44 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
45 }
46
47 private:
48 AtomicFlag is_set_;
49 PlatformThreadRef thread_ref_;
50 };
51
52 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
53 public:
54 SchedulerWorkerDelegate(const std::string& thread_name)
55 : thread_name_(thread_name) {}
56
57 // SchedulerWorker::Delegate:
58 void OnMainEntry(SchedulerWorker* worker) override {
59 thread_ref_checker_.Set();
60 PlatformThread::SetName(thread_name_);
61 }
62
63 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
64 AutoSchedulerLock auto_lock(sequence_lock_);
65 return std::move(sequence_);
66 }
67
68 void DidRunTask() override {}
69
70 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
71 AutoSchedulerLock auto_lock(sequence_lock_);
72 DCHECK(!sequence_);
73 sequence_ = std::move(sequence);
74 }
75
76 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
77
78 bool CanDetach(SchedulerWorker* worker) override { return false; }
79
80 void OnDetach() override { NOTREACHED(); }
81
82 bool RunsTasksOnCurrentThread() {
83 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
84 }
85
86 private:
87 const std::string thread_name_;
88
89 // Synchronizes access to |sequence_| and handles the fact that
90 // ReEnqueueSequence() is called on both the worker thread for reenqueuing
91 // the sequence and off of the worker thread to seed the sequence for
92 // GetWork().
93 SchedulerLock sequence_lock_;
94 scoped_refptr<Sequence> sequence_;
95
96 AtomicThreadRefChecker thread_ref_checker_;
97
98 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
99 };
100
101 } // namespace
102
103 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
104 : public SingleThreadTaskRunner {
105 public:
106 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
107 // lifetime of a dedicated |worker| for |traits|.
108 SchedulerSingleThreadTaskRunner(
109 SchedulerSingleThreadTaskRunnerManager* const outer,
110 const TaskTraits& traits,
111 SchedulerWorker* worker)
112 : outer_(outer), traits_(traits), worker_(worker) {
113 DCHECK(outer_);
114 DCHECK(worker_);
115 }
116
117 // SingleThreadTaskRunner:
118 bool PostDelayedTask(const tracked_objects::Location& from_here,
119 const Closure& closure,
120 TimeDelta delay) override;
121
122 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
123 const Closure& closure,
124 base::TimeDelta delay) override {
125 // Tasks are never nested within the task scheduler.
126 return PostDelayedTask(from_here, closure, delay);
127 }
128
129 bool RunsTasksOnCurrentThread() const override {
130 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
131 return delegate->RunsTasksOnCurrentThread();
132 }
133
134 private:
135 ~SchedulerSingleThreadTaskRunner() override {
136 outer_->UnregisterSchedulerWorker(worker_);
137 }
138
139 void PostTaskNow(std::unique_ptr<Task> task);
140
141 // Sequence for all Tasks posted through this TaskRunner.
142 const scoped_refptr<Sequence> sequence_ = new Sequence;
143
144 SchedulerSingleThreadTaskRunnerManager* const outer_;
145 const TaskTraits traits_;
146 SchedulerWorker* const worker_;
147
148 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
149 };
150
151 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
152 PostDelayedTask(const tracked_objects::Location& from_here,
153 const Closure& closure,
154 TimeDelta delay) {
155 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
156 task->single_thread_task_runner_ref = this;
157
158 if (!outer_->task_tracker_->WillPostTask(task.get()))
159 return false;
160
161 if (task->delayed_run_time.is_null()) {
162 PostTaskNow(std::move(task));
163 } else {
164 outer_->delayed_task_manager_->AddDelayedTask(
165 std::move(task),
166 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
167 }
168 return true;
169 }
170
171 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
172 PostTaskNow(std::unique_ptr<Task> task) {
173 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
174 if (sequence_was_empty) {
175 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
176 delegate->ReEnqueueSequence(sequence_);
177 worker_->WakeUp();
178 }
179 }
180
181 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
182 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
183 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
184 worker_pool_index_for_traits_callback,
185 TaskTracker* task_tracker,
186 DelayedTaskManager* delayed_task_manager)
187 : worker_pool_params_vector_(worker_pool_params_vector),
188 worker_pool_index_for_traits_callback_(
189 worker_pool_index_for_traits_callback),
190 task_tracker_(task_tracker),
191 delayed_task_manager_(delayed_task_manager) {
192 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
193 DCHECK(worker_pool_index_for_traits_callback_);
194 DCHECK(task_tracker_);
195 DCHECK(delayed_task_manager_);
196 }
197
198 SchedulerSingleThreadTaskRunnerManager::
199 ~SchedulerSingleThreadTaskRunnerManager() {
200 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive "
201 "SchedulerSingleThreadTaskRunnerManager "
202 "otherwise the outstanding "
203 "SchedulerSingleThreadTaskRunners will trigger "
204 "crashes.";
205 }
206
207 scoped_refptr<SingleThreadTaskRunner>
208 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
209 const TaskTraits& traits) {
210 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
211 DCHECK_LT(index, worker_pool_params_vector_.size());
212 return new SchedulerSingleThreadTaskRunner(
213 this, traits,
214 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
215 }
216
217 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
218 decltype(workers_) local_workers;
219 {
220 AutoSchedulerLock auto_lock(workers_lock_);
221 local_workers = std::move(workers_);
222 }
223
224 for (const auto& worker : local_workers)
225 worker->JoinForTesting();
226
227 {
228 AutoSchedulerLock auto_lock(workers_lock_);
229 DCHECK(workers_.empty())
230 << "New worker(s) unexpectedly registered during join.";
231 workers_ = std::move(local_workers);
232 }
233 }
234
235 SchedulerWorker*
236 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
237 const SchedulerWorkerPoolParams& params) {
238 AutoSchedulerLock auto_lock(workers_lock_);
239 int id = next_worker_id_++;
240 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
241 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
242 workers_.emplace_back(SchedulerWorker::Create(
243 params.priority_hint(), std::move(delegate), task_tracker_,
244 SchedulerWorker::InitialState::DETACHED));
245 return workers_.back().get();
246 }
247
248 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
249 SchedulerWorker* worker) {
250 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
251 // |workers_lock_|.
252 scoped_refptr<SchedulerWorker> worker_to_destroy;
253 {
254 AutoSchedulerLock auto_lock(workers_lock_);
255
256 // We might be joining, so no-op this if |workers_| is empty.
257 if (workers_.empty())
258 return;
259
260 auto worker_iter =
261 std::find_if(workers_.begin(), workers_.end(),
262 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
263 return candidate.get() == worker;
264 });
265 DCHECK(worker_iter != workers_.end());
266 worker_to_destroy = std::move(*worker_iter);
267 workers_.erase(worker_iter);
268 }
269 worker_to_destroy->Cleanup();
270 }
271
272 } // namespace internal
273 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698