Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1034)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h"
25
26 namespace base {
27 namespace internal {
28
29 namespace {
30
31 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
32 public:
33 SchedulerWorkerDelegate(const std::string& thread_name)
34 : thread_name_(thread_name) {}
35
36 // SchedulerWorker::Delegate:
37 void OnMainEntry(SchedulerWorker* worker) override {
38 {
39 AutoSchedulerLock auto_lock(thread_ref_lock_);
40 thread_ref_ = PlatformThread::CurrentRef();
41 }
42 PlatformThread::SetName(thread_name_);
43 }
44
45 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
46 AutoSchedulerLock auto_lock(sequence_lock_);
47 return std::move(sequence_);
48 }
49
50 void DidRunTask() override {}
51
52 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
53 AutoSchedulerLock auto_lock(sequence_lock_);
54 DCHECK(!sequence_);
55 sequence_ = std::move(sequence);
56 }
57
58 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
59
60 bool CanDetach(SchedulerWorker* worker) override {
61 return can_detach_.IsSet();
62 }
63
64 void OnDetach() override {
65 AutoSchedulerLock auto_lock(thread_ref_lock_);
66 thread_ref_ = PlatformThreadRef();
gab 2017/02/23 19:48:10 Actually, shouldn't we NOTREACHED() here? It doesn
robliao 2017/02/23 21:26:06 Indeed. You found a vestige of the std::unique_ptr
67 }
68
69 // SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate:
70 void AllowDetach() { can_detach_.Set(); }
71
72 bool RunsTasksOnCurrentThread() {
73 AutoSchedulerLock auto_lock(thread_ref_lock_);
74 return thread_ref_ == PlatformThread::CurrentRef();
75 }
76
77 private:
78 const std::string thread_name_;
79
80 // Synchronizes access to |sequence_| and handles the fact that
81 // ReEnqueueSequence() is called on both the worker thread for reenqueuing
82 // the sequence and off of the worker thread to seed the sequence for
83 // GetWork().
84 SchedulerLock sequence_lock_;
85 scoped_refptr<Sequence> sequence_;
86
87 AtomicFlag can_detach_;
88
89 // Synchronizes access to |thread_ref_|.
90 SchedulerLock thread_ref_lock_;
gab 2017/02/23 19:48:10 Meh, I don't like locking for this. But, also, I
robliao 2017/02/23 21:26:06 Done via AtomicFlag instead of WaitableEvent. (Wai
91 PlatformThreadRef thread_ref_;
92
93 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
94 };
95
96 } // namespace
97
98 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
99 : public SingleThreadTaskRunner {
100 public:
101 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
102 // lifetime of a dedicated |worker| for |traits|.
103 SchedulerSingleThreadTaskRunner(
104 SchedulerSingleThreadTaskRunnerManager* const outer,
105 const TaskTraits& traits,
106 SchedulerWorker* worker)
107 : outer_(outer), traits_(traits), worker_(worker) {
108 DCHECK(outer_);
109 DCHECK(worker_);
110 }
111
112 // SingleThreadTaskRunner:
113 bool PostDelayedTask(const tracked_objects::Location& from_here,
114 const Closure& closure,
115 TimeDelta delay) override;
116
117 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
118 const Closure& closure,
119 base::TimeDelta delay) override {
120 // Tasks are never nested within the task scheduler.
121 return PostDelayedTask(from_here, closure, delay);
122 }
123
124 bool RunsTasksOnCurrentThread() const override {
125 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
126 return delegate->RunsTasksOnCurrentThread();
127 }
128
129 private:
130 ~SchedulerSingleThreadTaskRunner() override {
131 outer_->UnregisterSchedulerWorker(worker_);
132 }
133
134 void PostTaskNow(std::unique_ptr<Task> task);
135
136 // Sequence for all Tasks posted through this TaskRunner.
137 const scoped_refptr<Sequence> sequence_ = new Sequence;
138
139 SchedulerSingleThreadTaskRunnerManager* const outer_;
140 const TaskTraits traits_;
141 SchedulerWorker* const worker_;
142
143 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
144 };
145
146 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
147 PostDelayedTask(const tracked_objects::Location& from_here,
148 const Closure& closure,
149 TimeDelta delay) {
150 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
151 task->single_thread_task_runner_ref = this;
152
153 if (!outer_->task_tracker_->WillPostTask(task.get()))
154 return false;
155
156 if (task->delayed_run_time.is_null()) {
157 PostTaskNow(std::move(task));
158 } else {
159 outer_->delayed_task_manager_->AddDelayedTask(
160 std::move(task),
161 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
162 }
163 return true;
164 }
165
166 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
167 PostTaskNow(std::unique_ptr<Task> task) {
168 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
169 if (sequence_was_empty) {
170 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
171 delegate->ReEnqueueSequence(sequence_);
172 worker_->WakeUp();
173 }
174 }
175
176 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
177 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
178 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
179 worker_pool_index_for_traits_callback,
180 TaskTracker* task_tracker,
181 DelayedTaskManager* delayed_task_manager)
182 : worker_pool_params_vector_(worker_pool_params_vector),
183 worker_pool_index_for_traits_callback_(
184 worker_pool_index_for_traits_callback),
185 task_tracker_(task_tracker),
186 delayed_task_manager_(delayed_task_manager) {
187 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
188 DCHECK(worker_pool_index_for_traits_callback_);
189 DCHECK(task_tracker_);
190 DCHECK(delayed_task_manager_);
191 }
192
193 SchedulerSingleThreadTaskRunnerManager::
194 ~SchedulerSingleThreadTaskRunnerManager() {
195 DCHECK(workers_.empty()) << "Deleting SchedulerSingleThreadTaskRunnerManager "
196 "when there are still active "
197 "SingleThreadTaskRunners. Future calls to those "
198 "SingleThreadTaskRunners may crash.";
gab 2017/02/23 19:48:10 "may crash" is too loose IMO, suggest: "Scheduler
robliao 2017/02/23 21:26:06 How about SchedulerSingleThreadTaskRunners must ou
gab 2017/02/23 21:52:10 I think "otherwise the outstanding SchedulerSingle
robliao 2017/02/24 18:33:59 Done.
199 }
200
201 scoped_refptr<SingleThreadTaskRunner>
202 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
203 const TaskTraits& traits) {
204 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
205 DCHECK_LT(index, worker_pool_params_vector_.size());
206 return new SchedulerSingleThreadTaskRunner(
207 this, traits,
208 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
209 }
210
211 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
212 decltype(workers_) local_workers;
213 {
214 AutoSchedulerLock auto_lock(workers_lock_);
215 local_workers = std::move(workers_);
216 }
217
218 for (const auto& worker : local_workers)
219 worker->JoinForTesting();
220
221 {
222 AutoSchedulerLock auto_lock(workers_lock_);
223 DCHECK(workers_.empty())
224 << "New worker(s) unexpectedly registered during join.";
225 workers_ = std::move(local_workers);
226 }
227 }
228
229 SchedulerWorker*
230 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
231 const SchedulerWorkerPoolParams& params) {
232 AutoSchedulerLock auto_lock(workers_lock_);
233 int id = next_worker_id_++;
234 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
235 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
236 workers_.emplace_back(SchedulerWorker::Create(
237 params.priority_hint(), std::move(delegate), task_tracker_,
238 SchedulerWorker::InitialState::DETACHED));
239 return workers_.back().get();
240 }
241
242 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
243 SchedulerWorker* worker) {
244 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
245 // |workers_lock_|.
246 scoped_refptr<SchedulerWorker> worker_to_destroy;
247 {
248 AutoSchedulerLock auto_lock(workers_lock_);
249
250 // We might be joining, so no-op this if |workers_| is empty.
251 if (workers_.empty())
252 return;
253
254 auto worker_iter =
255 std::find_if(workers_.begin(), workers_.end(),
256 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
257 return candidate.get() == worker;
258 });
259 DCHECK(worker_iter != workers_.end());
260 worker_to_destroy = std::move(*worker_iter);
261 workers_.erase(worker_iter);
262 }
263 worker_to_destroy->Cleanup();
264 }
265
266 } // namespace internal
267 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698