Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(300)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h"
25
26 namespace base {
27 namespace internal {
28
29 namespace {
30
31 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker {
34 public:
35 AtomicThreadRefChecker() = default;
36 ~AtomicThreadRefChecker() = default;
37
38 void Set() {
39 thread_ref_ = PlatformThread::CurrentRef();
40 is_set_.Set();
41 }
42
43 bool IsCurrentThreadSameAsSetThread() {
44 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
45 }
46
47 private:
48 AtomicFlag is_set_;
49 PlatformThreadRef thread_ref_;
50
51 DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
52 };
53
54 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
55 public:
56 SchedulerWorkerDelegate(const std::string& thread_name)
57 : thread_name_(thread_name) {}
58
59 // SchedulerWorker::Delegate:
60 void OnMainEntry(SchedulerWorker* worker) override {
61 thread_ref_checker_.Set();
62 PlatformThread::SetName(thread_name_);
63 }
64
65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
66 AutoSchedulerLock auto_lock(sequence_lock_);
67 return std::move(sequence_);
68 }
69
70 void DidRunTask() override {}
71
72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
73 AutoSchedulerLock auto_lock(sequence_lock_);
74 DCHECK(!sequence_);
75 sequence_ = std::move(sequence);
76 }
77
78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
79
80 bool CanDetach(SchedulerWorker* worker) override { return false; }
81
82 void OnDetach() override { NOTREACHED(); }
83
84 bool RunsTasksOnCurrentThread() {
85 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
86 }
87
88 private:
89 const std::string thread_name_;
90
91 // Synchronizes access to |sequence_| and handles the fact that
92 // ReEnqueueSequence() is called on both the worker thread for reenqueuing
93 // the sequence and off of the worker thread to seed the sequence for
94 // GetWork().
95 SchedulerLock sequence_lock_;
96 scoped_refptr<Sequence> sequence_;
97
98 AtomicThreadRefChecker thread_ref_checker_;
99
100 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
101 };
102
103 } // namespace
104
105 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
106 : public SingleThreadTaskRunner {
107 public:
108 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
109 // lifetime of a dedicated |worker| for |traits|.
110 SchedulerSingleThreadTaskRunner(
111 SchedulerSingleThreadTaskRunnerManager* const outer,
112 const TaskTraits& traits,
113 SchedulerWorker* worker)
114 : outer_(outer), traits_(traits), worker_(worker) {
115 DCHECK(outer_);
116 DCHECK(worker_);
117 }
118
119 // SingleThreadTaskRunner:
120 bool PostDelayedTask(const tracked_objects::Location& from_here,
121 const Closure& closure,
122 TimeDelta delay) override;
123
124 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
125 const Closure& closure,
126 base::TimeDelta delay) override {
127 // Tasks are never nested within the task scheduler.
128 return PostDelayedTask(from_here, closure, delay);
129 }
130
131 bool RunsTasksOnCurrentThread() const override {
132 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
133 return delegate->RunsTasksOnCurrentThread();
134 }
135
136 private:
137 ~SchedulerSingleThreadTaskRunner() override {
138 outer_->UnregisterSchedulerWorker(worker_);
139 }
140
141 void PostTaskNow(std::unique_ptr<Task> task);
142
143 // Sequence for all Tasks posted through this TaskRunner.
144 const scoped_refptr<Sequence> sequence_ = new Sequence;
145
146 SchedulerSingleThreadTaskRunnerManager* const outer_;
147 const TaskTraits traits_;
148 SchedulerWorker* const worker_;
149
150 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
151 };
152
153 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
154 PostDelayedTask(const tracked_objects::Location& from_here,
155 const Closure& closure,
156 TimeDelta delay) {
157 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
158 task->single_thread_task_runner_ref = this;
159
160 if (!outer_->task_tracker_->WillPostTask(task.get()))
161 return false;
162
163 if (task->delayed_run_time.is_null()) {
164 PostTaskNow(std::move(task));
165 } else {
166 outer_->delayed_task_manager_->AddDelayedTask(
167 std::move(task),
168 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
169 }
170 return true;
171 }
172
173 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
174 PostTaskNow(std::unique_ptr<Task> task) {
175 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
176 if (sequence_was_empty) {
177 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
178 delegate->ReEnqueueSequence(sequence_);
179 worker_->WakeUp();
180 }
181 }
182
183 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
184 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
185 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
186 worker_pool_index_for_traits_callback,
187 TaskTracker* task_tracker,
188 DelayedTaskManager* delayed_task_manager)
189 : worker_pool_params_vector_(worker_pool_params_vector),
190 worker_pool_index_for_traits_callback_(
191 worker_pool_index_for_traits_callback),
192 task_tracker_(task_tracker),
193 delayed_task_manager_(delayed_task_manager) {
194 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
195 DCHECK(worker_pool_index_for_traits_callback_);
196 DCHECK(task_tracker_);
197 DCHECK(delayed_task_manager_);
198 }
199
200 SchedulerSingleThreadTaskRunnerManager::
201 ~SchedulerSingleThreadTaskRunnerManager() {
202 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive "
203 "SchedulerSingleThreadTaskRunnerManager";
204 }
205
206 scoped_refptr<SingleThreadTaskRunner>
207 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
208 const TaskTraits& traits) {
209 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
210 DCHECK_LT(index, worker_pool_params_vector_.size());
211 return new SchedulerSingleThreadTaskRunner(
212 this, traits,
213 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
214 }
215
216 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
217 decltype(workers_) local_workers;
218 {
219 AutoSchedulerLock auto_lock(workers_lock_);
220 local_workers = std::move(workers_);
221 }
222
223 for (const auto& worker : local_workers)
224 worker->JoinForTesting();
225
226 {
227 AutoSchedulerLock auto_lock(workers_lock_);
228 DCHECK(workers_.empty())
229 << "New worker(s) unexpectedly registered during join.";
230 workers_ = std::move(local_workers);
231 }
232 }
233
234 SchedulerWorker*
235 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
236 const SchedulerWorkerPoolParams& params) {
237 AutoSchedulerLock auto_lock(workers_lock_);
238 int id = next_worker_id_++;
239 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
240 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
241 workers_.emplace_back(SchedulerWorker::Create(
242 params.priority_hint(), std::move(delegate), task_tracker_,
243 SchedulerWorker::InitialState::DETACHED));
244 return workers_.back().get();
245 }
246
247 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
248 SchedulerWorker* worker) {
249 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
250 // |workers_lock_|.
251 scoped_refptr<SchedulerWorker> worker_to_destroy;
252 {
253 AutoSchedulerLock auto_lock(workers_lock_);
254
255 // We might be joining, so no-op this if |workers_| is empty.
256 if (workers_.empty())
257 return;
258
259 auto worker_iter =
260 std::find_if(workers_.begin(), workers_.end(),
261 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
262 return candidate.get() == worker;
263 });
264 DCHECK(worker_iter != workers_.end());
265 worker_to_destroy = std::move(*worker_iter);
266 workers_.erase(worker_iter);
267 }
268 worker_to_destroy->Cleanup();
269 }
270
271 } // namespace internal
272 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698