Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2722113006: Revert of Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h"
25
26 namespace base {
27 namespace internal {
28
29 namespace {
30
31 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker {
34 public:
35 AtomicThreadRefChecker() = default;
36 ~AtomicThreadRefChecker() = default;
37
38 void Set() {
39 thread_ref_ = PlatformThread::CurrentRef();
40 is_set_.Set();
41 }
42
43 bool IsCurrentThreadSameAsSetThread() {
44 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
45 }
46
47 private:
48 AtomicFlag is_set_;
49 PlatformThreadRef thread_ref_;
50
51 DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
52 };
53
54 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
55 public:
56 SchedulerWorkerDelegate(const std::string& thread_name)
57 : thread_name_(thread_name) {}
58
59 // SchedulerWorker::Delegate:
60 void OnMainEntry(SchedulerWorker* worker) override {
61 thread_ref_checker_.Set();
62 PlatformThread::SetName(thread_name_);
63 }
64
65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
66 AutoSchedulerLock auto_lock(sequence_lock_);
67 return std::move(sequence_);
68 }
69
70 void DidRunTask() override {}
71
72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
73 AutoSchedulerLock auto_lock(sequence_lock_);
74 DCHECK(!sequence_);
75 sequence_ = std::move(sequence);
76 }
77
78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
79
80 bool CanDetach(SchedulerWorker* worker) override { return false; }
81
82 void OnDetach() override { NOTREACHED(); }
83
84 bool RunsTasksOnCurrentThread() {
85 // We check the thread ref instead of the sequence for the benefit of COM
86 // callbacks which may execute without a sequence context.
87 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
88 }
89
90 private:
91 const std::string thread_name_;
92
93 // Synchronizes access to |sequence_| and handles the fact that
94 // ReEnqueueSequence() is called on both the worker thread for reenqueuing
95 // the sequence and off of the worker thread to seed the sequence for
96 // GetWork().
97 SchedulerLock sequence_lock_;
98 scoped_refptr<Sequence> sequence_;
99
100 AtomicThreadRefChecker thread_ref_checker_;
101
102 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
103 };
104
105 } // namespace
106
107 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
108 : public SingleThreadTaskRunner {
109 public:
110 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
111 // lifetime of a dedicated |worker| for |traits|.
112 SchedulerSingleThreadTaskRunner(
113 SchedulerSingleThreadTaskRunnerManager* const outer,
114 const TaskTraits& traits,
115 SchedulerWorker* worker)
116 : outer_(outer), traits_(traits), worker_(worker) {
117 DCHECK(outer_);
118 DCHECK(worker_);
119 }
120
121 // SingleThreadTaskRunner:
122 bool PostDelayedTask(const tracked_objects::Location& from_here,
123 const Closure& closure,
124 TimeDelta delay) override;
125
126 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
127 const Closure& closure,
128 base::TimeDelta delay) override {
129 // Tasks are never nested within the task scheduler.
130 return PostDelayedTask(from_here, closure, delay);
131 }
132
133 bool RunsTasksOnCurrentThread() const override {
134 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
135 return delegate->RunsTasksOnCurrentThread();
136 }
137
138 private:
139 ~SchedulerSingleThreadTaskRunner() override {
140 outer_->UnregisterSchedulerWorker(worker_);
141 }
142
143 void PostTaskNow(std::unique_ptr<Task> task);
144
145 // Sequence for all Tasks posted through this TaskRunner.
146 const scoped_refptr<Sequence> sequence_ = new Sequence;
147
148 SchedulerSingleThreadTaskRunnerManager* const outer_;
149 const TaskTraits traits_;
150 SchedulerWorker* const worker_;
151
152 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
153 };
154
155 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
156 PostDelayedTask(const tracked_objects::Location& from_here,
157 const Closure& closure,
158 TimeDelta delay) {
159 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
160 task->single_thread_task_runner_ref = this;
161
162 if (!outer_->task_tracker_->WillPostTask(task.get()))
163 return false;
164
165 if (task->delayed_run_time.is_null()) {
166 PostTaskNow(std::move(task));
167 } else {
168 outer_->delayed_task_manager_->AddDelayedTask(
169 std::move(task),
170 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
171 }
172 return true;
173 }
174
175 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
176 PostTaskNow(std::unique_ptr<Task> task) {
177 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
178 if (sequence_was_empty) {
179 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
180 delegate->ReEnqueueSequence(sequence_);
181 worker_->WakeUp();
182 }
183 }
184
185 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
186 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
187 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
188 worker_pool_index_for_traits_callback,
189 TaskTracker* task_tracker,
190 DelayedTaskManager* delayed_task_manager)
191 : worker_pool_params_vector_(worker_pool_params_vector),
192 worker_pool_index_for_traits_callback_(
193 worker_pool_index_for_traits_callback),
194 task_tracker_(task_tracker),
195 delayed_task_manager_(delayed_task_manager) {
196 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
197 DCHECK(worker_pool_index_for_traits_callback_);
198 DCHECK(task_tracker_);
199 DCHECK(delayed_task_manager_);
200 }
201
202 SchedulerSingleThreadTaskRunnerManager::
203 ~SchedulerSingleThreadTaskRunnerManager() {
204 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive "
205 "SchedulerSingleThreadTaskRunnerManager";
206 }
207
208 scoped_refptr<SingleThreadTaskRunner>
209 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
210 const TaskTraits& traits) {
211 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
212 DCHECK_LT(index, worker_pool_params_vector_.size());
213 return new SchedulerSingleThreadTaskRunner(
214 this, traits,
215 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
216 }
217
218 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
219 decltype(workers_) local_workers;
220 {
221 AutoSchedulerLock auto_lock(workers_lock_);
222 local_workers = std::move(workers_);
223 }
224
225 for (const auto& worker : local_workers)
226 worker->JoinForTesting();
227
228 {
229 AutoSchedulerLock auto_lock(workers_lock_);
230 DCHECK(workers_.empty())
231 << "New worker(s) unexpectedly registered during join.";
232 workers_ = std::move(local_workers);
233 }
234 }
235
236 SchedulerWorker*
237 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
238 const SchedulerWorkerPoolParams& params) {
239 AutoSchedulerLock auto_lock(workers_lock_);
240 int id = next_worker_id_++;
241 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
242 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
243 workers_.emplace_back(SchedulerWorker::Create(
244 params.priority_hint(), std::move(delegate), task_tracker_,
245 SchedulerWorker::InitialState::DETACHED));
246 return workers_.back().get();
247 }
248
249 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
250 SchedulerWorker* worker) {
251 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
252 // |workers_lock_|.
253 scoped_refptr<SchedulerWorker> worker_to_destroy;
254 {
255 AutoSchedulerLock auto_lock(workers_lock_);
256
257 // We might be joining, so no-op this if |workers_| is empty.
258 if (workers_.empty())
259 return;
260
261 auto worker_iter =
262 std::find_if(workers_.begin(), workers_.end(),
263 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
264 return candidate.get() == worker;
265 });
266 DCHECK(worker_iter != workers_.end());
267 worker_to_destroy = std::move(*worker_iter);
268 workers_.erase(worker_iter);
269 }
270 worker_to_destroy->Cleanup();
271 }
272
273 } // namespace internal
274 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698