Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(427)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/synchronization/atomic_flag.h"
16 #include "base/task_scheduler/delayed_task_manager.h"
17 #include "base/task_scheduler/scheduler_worker.h"
18 #include "base/task_scheduler/sequence.h"
19 #include "base/task_scheduler/task.h"
20 #include "base/task_scheduler/task_tracker.h"
21 #include "base/task_scheduler/task_traits.h"
22 #include "base/threading/platform_thread.h"
23 #include "base/time/time.h"
24
25 namespace base {
26 namespace internal {
27
28 namespace {
29
30 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
31 public:
32 SchedulerWorkerDelegate(const std::string& thread_name)
fdoray 2017/02/22 17:40:40 #include <string>
robliao 2017/02/22 21:50:28 Done.
33 : thread_name_(thread_name) {}
34
35 // SchedulerWorker::Delegate:
36 void OnMainEntry(SchedulerWorker* worker) override {
37 PlatformThread::SetName(thread_name_);
38 }
39
40 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
41 AutoSchedulerLock auto_lock(sequence_lock_);
42 return std::move(sequence_);
43 }
44
45 void DidRunTask() override {}
46
47 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
48 AutoSchedulerLock auto_lock(sequence_lock_);
49 DCHECK(!sequence_);
50 sequence_ = std::move(sequence);
51 }
52
53 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
54
55 bool CanDetach(SchedulerWorker* worker) override {
56 return can_detach_.IsSet();
57 }
58
59 void OnDetach() override {}
60
61 // SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate:
62 void AllowDetach() { can_detach_.Set(); }
63
64 private:
65 const std::string thread_name_;
66
67 // Synchronizes access to |sequence_| and handles the fact that
68 // ReEnqueueSequence() is called on both on the worker thread for reenqueuing
gab 2017/02/22 18:18:21 s/on both on/both on/ ?
robliao 2017/02/22 21:50:28 ... on both the worker thread ... Done
69 // the sequence and off of the worker thread to seed the sequence for
70 // GetWork().
71 SchedulerLock sequence_lock_;
72 scoped_refptr<Sequence> sequence_;
73
74 AtomicFlag can_detach_;
75
76 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
77 };
78
79 } // namespace
80
81 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
82 : public SingleThreadTaskRunner {
83 public:
84 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
85 // lifetime of a dedicated |worker| for |traits|.
86 SchedulerSingleThreadTaskRunner(
87 SchedulerSingleThreadTaskRunnerManager* const outer,
88 const TaskTraits& traits,
89 SchedulerWorker* worker)
90 : outer_(outer), traits_(traits), worker_(worker) {
91 DCHECK(outer_);
92 DCHECK(worker_);
93 }
94
95 // SingleThreadTaskRunner:
96 bool PostDelayedTask(const tracked_objects::Location& from_here,
97 const Closure& closure,
98 TimeDelta delay) override;
99
100 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
101 const Closure& closure,
102 base::TimeDelta delay) override {
103 // Tasks are never nested within the task scheduler.
104 return PostDelayedTask(from_here, closure, delay);
105 }
106
107 bool RunsTasksOnCurrentThread() const override {
108 return sequence_->token() == SequenceToken::GetForCurrentThread();
fdoray 2017/02/22 17:40:40 Should we check the thread id instead? That way, R
robliao 2017/02/22 21:50:28 Hrm... now that's tricky as SchedulerWorker doesn'
109 }
110
111 private:
112 ~SchedulerSingleThreadTaskRunner() override {
113 outer_->UnregisterSchedulerWorker(worker_);
114 }
115
116 void PostTaskNow(std::unique_ptr<Task> task);
117
118 // Sequence for all Tasks posted through this TaskRunner.
119 const scoped_refptr<Sequence> sequence_ = new Sequence;
120
121 SchedulerSingleThreadTaskRunnerManager* const outer_;
122 const TaskTraits traits_;
123 SchedulerWorker* const worker_;
124
125 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
126 };
127
128 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
129 PostDelayedTask(const tracked_objects::Location& from_here,
130 const Closure& closure,
131 TimeDelta delay) {
132 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
133 task->single_thread_task_runner_ref = this;
134
135 if (!outer_->task_tracker_->WillPostTask(task.get()))
136 return false;
137
138 if (task->delayed_run_time.is_null()) {
139 PostTaskNow(std::move(task));
140 } else {
141 outer_->delayed_task_manager_->AddDelayedTask(
142 std::move(task),
143 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
144 }
145 return true;
146 }
147
148 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
149 PostTaskNow(std::unique_ptr<Task> task) {
150 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
151 if (sequence_was_empty) {
152 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
153 delegate->ReEnqueueSequence(sequence_);
154 worker_->WakeUp();
155 }
156 }
157
158 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
159 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
160 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
161 worker_pool_index_for_traits_callback,
162 TaskTracker* task_tracker,
163 DelayedTaskManager* delayed_task_manager)
164 : worker_pool_params_vector_(worker_pool_params_vector),
165 worker_pool_index_for_traits_callback_(
166 worker_pool_index_for_traits_callback),
167 task_tracker_(task_tracker),
168 delayed_task_manager_(delayed_task_manager) {
169 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
170 DCHECK(worker_pool_index_for_traits_callback_);
171 DCHECK(task_tracker_);
172 DCHECK(delayed_task_manager_);
173 }
174
175 SchedulerSingleThreadTaskRunnerManager::
176 ~SchedulerSingleThreadTaskRunnerManager() = default;
fdoray 2017/02/22 17:40:40 DCHECK(workers_.empty()) << "Deleting SchedulerSin
robliao 2017/02/22 21:50:28 Done.
177
178 scoped_refptr<SingleThreadTaskRunner>
179 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
180 const TaskTraits& traits) {
181 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
182 DCHECK_LT(index, worker_pool_params_vector_.size());
183 return new SchedulerSingleThreadTaskRunner(
184 this, traits,
185 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
186 }
187
188 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
189 decltype(workers_) local_workers;
190 {
191 AutoSchedulerLock auto_lock(workers_lock_);
192 local_workers = std::move(workers_);
193 }
194
195 for (const auto& worker : local_workers)
196 worker->JoinForTesting();
197
198 {
199 AutoSchedulerLock auto_lock(workers_lock_);
200 DCHECK(workers_.empty())
201 << "New worker(s) unexpectedly registered during join.";
202 workers_ = std::move(local_workers);
203 }
204 }
205
206 SchedulerWorker*
207 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
208 const SchedulerWorkerPoolParams& params) {
209 AutoSchedulerLock auto_lock(workers_lock_);
210 int id = next_worker_id_++;
211 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
212 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
213 workers_.emplace_back(SchedulerWorker::Create(
214 params.priority_hint(), std::move(delegate), task_tracker_,
215 SchedulerWorker::InitialState::DETACHED));
216 return workers_.back().get();
217 }
218
219 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
220 SchedulerWorker* worker) {
221 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
222 // |workers_lock_|.
223 scoped_refptr<SchedulerWorker> worker_to_destroy;
224 {
225 AutoSchedulerLock auto_lock(workers_lock_);
226
227 // We might be joining, so no-op this if |workers_| is empty.
228 if (workers_.empty())
229 return;
230
231 auto worker_iter =
232 std::find_if(workers_.begin(), workers_.end(),
233 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
234 return candidate.get() == worker;
235 });
236 DCHECK(worker_iter != workers_.end());
fdoray 2017/02/22 17:40:40 DCHECK_NE? Maybe it doesn't work with iterators.
robliao 2017/02/22 21:50:28 That's right. The template for value types like th
237 worker_to_destroy = std::move(*worker_iter);
238 workers_.erase(worker_iter);
239 }
240 worker_to_destroy->Cleanup();
241 }
242
243 } // namespace internal
244 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698