Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(18)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/synchronization/atomic_flag.h"
16 #include "base/task_scheduler/delayed_task_manager.h"
17 #include "base/task_scheduler/scheduler_worker.h"
18 #include "base/task_scheduler/sequence.h"
19 #include "base/task_scheduler/task.h"
20 #include "base/task_scheduler/task_tracker.h"
21 #include "base/task_scheduler/task_traits.h"
22 #include "base/threading/platform_thread.h"
23 #include "base/time/time.h"
24
25 namespace base {
26 namespace internal {
27
28 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
29 : public SingleThreadTaskRunner {
gab 2017/02/21 21:39:50 Cleanup up of previous one will be in a follow-up
robliao 2017/02/22 01:04:18 Added a TODO and a new bug instead of the CL descr
30 public:
31 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
32 // lifetime of a dedicated |worker| for |traits|.
33 SchedulerSingleThreadTaskRunner(
34 SchedulerSingleThreadTaskRunnerManager* const outer,
35 const TaskTraits& traits,
36 SchedulerWorker* worker)
37 : outer_(outer), traits_(traits), worker_(worker) {
38 DCHECK(outer_);
39 DCHECK(worker_);
40 }
41
42 // SingleThreadTaskRunner:
43 bool PostDelayedTask(const tracked_objects::Location& from_here,
44 const Closure& closure,
45 TimeDelta delay) override;
46
47 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
48 const Closure& closure,
49 base::TimeDelta delay) override {
50 // Tasks are never nested within the task scheduler.
51 return PostDelayedTask(from_here, closure, delay);
52 }
53
54 bool RunsTasksOnCurrentThread() const override {
55 return sequence_->token() == SequenceToken::GetForCurrentThread();
56 }
57
58 private:
59 ~SchedulerSingleThreadTaskRunner() override {
60 outer_->UnregisterSchedulerWorker(worker_);
61 }
62
63 void PostTaskNow(std::unique_ptr<Task> task);
64
65 // Sequence for all Tasks posted through this TaskRunner.
66 const scoped_refptr<Sequence> sequence_ = new Sequence;
67
68 SchedulerSingleThreadTaskRunnerManager* const outer_;
69 const TaskTraits traits_;
70 SchedulerWorker* const worker_;
71
72 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
73 };
74
75 class SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate
76 : public SchedulerWorker::Delegate {
77 public:
78 SchedulerWorkerDelegate(const std::string& thread_name)
79 : thread_name_(thread_name) {}
80
81 // SchedulerWorker::Delegate:
82 void OnMainEntry(SchedulerWorker* worker) override {
83 PlatformThread::SetName(thread_name_);
84 }
85
86 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
87 AutoSchedulerLock auto_lock(sequence_lock_);
88 return std::move(sequence_);
89 }
90
91 void DidRunTask() override {}
92
93 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
94 AutoSchedulerLock auto_lock(sequence_lock_);
95 DCHECK(!sequence_);
96 sequence_ = std::move(sequence);
97 }
98
99 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
100
101 bool CanDetach(SchedulerWorker* worker) override {
102 return can_detach_.IsSet();
103 }
104
105 void OnDetach() override {}
106
107 // SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate:
108 void AllowDetach() {
109 can_detach_.Set();
110 }
111
112 private:
113 const std::string thread_name_;
114
115 SchedulerLock sequence_lock_;
gab 2017/02/21 21:39:50 Initially thought this wasn't necessary but then r
robliao 2017/02/22 01:04:18 Done. I emphasized that ReEnqueueSequence() in thi
gab 2017/02/22 18:18:21 I don't think this is unlike the other delegates.
robliao 2017/02/22 21:50:28 We specify in the delegate documentation that all
116 scoped_refptr<Sequence> sequence_;
117
118 AtomicFlag can_detach_;
119
120 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
121 };
122
123 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
124 PostDelayedTask(const tracked_objects::Location& from_here,
125 const Closure& closure,
126 TimeDelta delay) {
127 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
128 task->single_thread_task_runner_ref = this;
129
130 if (!outer_->task_tracker_->WillPostTask(task.get()))
131 return false;
132
133 if (task->delayed_run_time.is_null()) {
134 PostTaskNow(std::move(task));
135 } else {
136 outer_->delayed_task_manager_->AddDelayedTask(
137 std::move(task),
138 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
139 }
140 return true;
141 }
142
143 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
144 PostTaskNow(std::unique_ptr<Task> task) {
145 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
146 if (sequence_was_empty) {
147 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
148 delegate->ReEnqueueSequence(sequence_);
149 worker_->WakeUp();
150 }
151 }
152
153 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
154 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
155 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
156 worker_pool_index_for_traits_callback,
157 TaskTracker* task_tracker,
158 DelayedTaskManager* delayed_task_manager)
159 : worker_pool_params_vector_(worker_pool_params_vector),
160 worker_pool_index_for_traits_callback_(
161 worker_pool_index_for_traits_callback),
162 task_tracker_(task_tracker),
163 delayed_task_manager_(delayed_task_manager) {
164 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
165 DCHECK(worker_pool_index_for_traits_callback_);
166 DCHECK(task_tracker_);
167 DCHECK(delayed_task_manager_);
168 }
169
170 SchedulerSingleThreadTaskRunnerManager::
171 ~SchedulerSingleThreadTaskRunnerManager() = default;
172
173 scoped_refptr<SingleThreadTaskRunner>
174 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
175 const TaskTraits& traits) {
176 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
177 DCHECK_LT(index, worker_pool_params_vector_.size());
178 return new SchedulerSingleThreadTaskRunner(
179 this, traits,
180 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
181 }
182
183 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
184 decltype(workers_) local_workers;
185 {
186 AutoSchedulerLock auto_lock(workers_lock_);
187 local_workers = std::move(workers_);
188 }
189 for (const auto& worker : local_workers)
190 worker->JoinForTesting();
gab 2017/02/21 21:39:50 Empty lines above and below this loop (otherwise i
robliao 2017/02/22 01:04:18 My preference at this point is just to use bracket
191 {
192 AutoSchedulerLock auto_lock(workers_lock_);
193 DCHECK(workers_.empty());
gab 2017/02/21 21:39:50 << "New worker(s) unexpectedly registered during j
robliao 2017/02/22 01:04:18 Done.
194 workers_ = std::move(local_workers);
195 }
196 }
197
198 SchedulerWorker*
199 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
200 const SchedulerWorkerPoolParams& params) {
201 AutoSchedulerLock auto_lock(workers_lock_);
202 int id = next_worker_id_++;
203 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
204 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
205 workers_.emplace_back(SchedulerWorker::Create(
206 params.priority_hint(), std::move(delegate), task_tracker_,
207 SchedulerWorker::InitialState::DETACHED));
208 return workers_.back().get();
209 }
210
211 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
212 SchedulerWorker* worker) {
213 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
214 // |workers_lock_|.
215 scoped_refptr<SchedulerWorker> worker_to_destroy;
216 {
217 AutoSchedulerLock auto_lock(workers_lock_);
218
219 // We might be joining, so no-op this if |workers_| is empty.
220 if (workers_.empty())
221 return;
222
223 auto worker_iter =
224 std::find_if(workers_.begin(), workers_.end(),
225 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
226 return candidate.get() == worker;
227 });
228 DCHECK(worker_iter != workers_.end());
229 worker_to_destroy = std::move(*worker_iter);
230 workers_.erase(worker_iter);
231 }
232 worker_to_destroy->Cleanup();
233 }
234
235 } // namespace internal
236 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698