Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(438)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2726073002: WILL BE MERGED Change Ownership of Sequence to the Single Thread SchedulerWorker Delegate (Closed)
Patch Set: CR Feedback Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <memory> 8 #include <memory>
9 #include <string> 9 #include <string>
10 10
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
57 : thread_name_(thread_name) {} 57 : thread_name_(thread_name) {}
58 58
59 // SchedulerWorker::Delegate: 59 // SchedulerWorker::Delegate:
60 void OnMainEntry(SchedulerWorker* worker) override { 60 void OnMainEntry(SchedulerWorker* worker) override {
61 thread_ref_checker_.Set(); 61 thread_ref_checker_.Set();
62 PlatformThread::SetName(thread_name_); 62 PlatformThread::SetName(thread_name_);
63 } 63 }
64 64
65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
66 AutoSchedulerLock auto_lock(sequence_lock_); 66 AutoSchedulerLock auto_lock(sequence_lock_);
67 return std::move(sequence_); 67 bool has_work = has_work_;
68 has_work_ = false;
69 return has_work ? sequence_ : nullptr;
68 } 70 }
69 71
70 void DidRunTask() override {} 72 void DidRunTask() override {}
71 73
72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { 74 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
73 AutoSchedulerLock auto_lock(sequence_lock_); 75 AutoSchedulerLock auto_lock(sequence_lock_);
74 DCHECK(!sequence_); 76 // We've shut down, so no-op this work request. Any sequence cleanup will
75 sequence_ = std::move(sequence); 77 // occur in the caller's context.
78 if (!sequence_)
79 return;
80
81 DCHECK_EQ(sequence, sequence_);
82 has_work_ = true;
76 } 83 }
77 84
78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } 85 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
79 86
80 bool CanDetach(SchedulerWorker* worker) override { return false; } 87 bool CanDetach(SchedulerWorker* worker) override { return false; }
81 88
82 void OnDetach() override { NOTREACHED(); } 89 void OnDetach() override { NOTREACHED(); }
83 90
84 bool RunsTasksOnCurrentThread() { 91 bool RunsTasksOnCurrentThread() {
85 // We check the thread ref instead of the sequence for the benefit of COM 92 // We check the thread ref instead of the sequence for the benefit of COM
86 // callbacks which may execute without a sequence context. 93 // callbacks which may execute without a sequence context.
87 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); 94 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
88 } 95 }
89 96
97 void OnMainExit() override {
98 // To reclaim skipped tasks on shutdown, we null out the sequence to allow
99 // the tasks to destroy themselves.
fdoray 2017/03/03 05:31:13 Need to grab the lock to clear the pointer. To avo
robliao 2017/03/03 07:07:14 Nice. Yes indeed!
100 sequence_ = nullptr;
101 }
102
103 // SchedulerWorkerDelegate:
104
105 // Consumers should release their sequence reference as soon as possible to
106 // ensure timely cleanup for general shutdown.
107 scoped_refptr<Sequence> sequence() {
108 AutoSchedulerLock auto_lock(sequence_lock_);
109 return sequence_.get();
fdoray 2017/03/03 05:31:13 no .get();
robliao 2017/03/03 07:07:14 Done.
110 }
111
90 private: 112 private:
91 const std::string thread_name_; 113 const std::string thread_name_;
92 114
93 // Synchronizes access to |sequence_| and handles the fact that 115 // Synchronizes access to |sequence_| and |has_work_|.
94 // ReEnqueueSequence() is called on both the worker thread for reenqueuing
95 // the sequence and off of the worker thread to seed the sequence for
96 // GetWork().
97 SchedulerLock sequence_lock_; 116 SchedulerLock sequence_lock_;
98 scoped_refptr<Sequence> sequence_; 117 scoped_refptr<Sequence> sequence_ = new Sequence;
118 bool has_work_ = false;
99 119
100 AtomicThreadRefChecker thread_ref_checker_; 120 AtomicThreadRefChecker thread_ref_checker_;
101 121
102 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); 122 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
103 }; 123 };
104 124
105 } // namespace 125 } // namespace
106 126
107 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner 127 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
108 : public SingleThreadTaskRunner { 128 : public SingleThreadTaskRunner {
109 public: 129 public:
110 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the 130 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
111 // lifetime of a dedicated |worker| for |traits|. 131 // lifetime of a dedicated |worker| for |traits|.
112 SchedulerSingleThreadTaskRunner( 132 SchedulerSingleThreadTaskRunner(
113 SchedulerSingleThreadTaskRunnerManager* const outer, 133 SchedulerSingleThreadTaskRunnerManager* const outer,
114 const TaskTraits& traits, 134 const TaskTraits& traits,
115 SchedulerWorker* worker) 135 SchedulerWorker* worker)
116 : outer_(outer), traits_(traits), worker_(worker) { 136 : outer_(outer), traits_(traits), worker_(worker) {
117 DCHECK(outer_); 137 DCHECK(outer_);
118 DCHECK(worker_); 138 DCHECK(worker_);
119 } 139 }
120 140
121 // SingleThreadTaskRunner: 141 // SingleThreadTaskRunner:
122 bool PostDelayedTask(const tracked_objects::Location& from_here, 142 bool PostDelayedTask(const tracked_objects::Location& from_here,
123 const Closure& closure, 143 const Closure& closure,
124 TimeDelta delay) override; 144 TimeDelta delay) override {
145 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
146 task->single_thread_task_runner_ref = this;
147
148 if (!outer_->task_tracker_->WillPostTask(task.get()))
149 return false;
150
151 if (task->delayed_run_time.is_null()) {
152 PostTaskNow(std::move(task));
153 } else {
154 outer_->delayed_task_manager_->AddDelayedTask(
155 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow,
156 Unretained(this)));
157 }
158 return true;
159 }
125 160
126 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 161 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
127 const Closure& closure, 162 const Closure& closure,
128 base::TimeDelta delay) override { 163 base::TimeDelta delay) override {
129 // Tasks are never nested within the task scheduler. 164 // Tasks are never nested within the task scheduler.
130 return PostDelayedTask(from_here, closure, delay); 165 return PostDelayedTask(from_here, closure, delay);
131 } 166 }
132 167
133 bool RunsTasksOnCurrentThread() const override { 168 bool RunsTasksOnCurrentThread() const override {
134 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); 169 return GetDelegate()->RunsTasksOnCurrentThread();
135 return delegate->RunsTasksOnCurrentThread();
136 } 170 }
137 171
138 private: 172 private:
139 ~SchedulerSingleThreadTaskRunner() override { 173 ~SchedulerSingleThreadTaskRunner() override {
140 outer_->UnregisterSchedulerWorker(worker_); 174 outer_->UnregisterSchedulerWorker(worker_);
141 } 175 }
142 176
143 void PostTaskNow(std::unique_ptr<Task> task); 177 void PostTaskNow(std::unique_ptr<Task> task) {
178 scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
179 // If |sequence| is null, then the thread is effectively gone (either
180 // shutdonw or joined). We will destroy the task in this context instead.
fdoray 2017/03/03 05:31:13 I don't find "We will destroy the task in this con
fdoray 2017/03/03 05:31:13 s/shutdonw/shutdown/
robliao 2017/03/03 07:07:13 Done.
robliao 2017/03/03 07:07:14 Sounds good. Removed.
181 if (!sequence)
182 return;
144 183
145 // Sequence for all Tasks posted through this TaskRunner. 184 const bool sequence_was_empty = sequence->PushTask(std::move(task));
146 const scoped_refptr<Sequence> sequence_ = new Sequence; 185 if (sequence_was_empty) {
186 GetDelegate()->ReEnqueueSequence(std::move(sequence));
187 worker_->WakeUp();
188 }
fdoray 2017/03/03 05:31:13 The sequence and tasks it contained before the Pus
robliao 2017/03/03 07:07:14 Offline discussion: There is still a potential for
189 }
190
191 SchedulerWorkerDelegate* GetDelegate() const {
192 return static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
193 }
147 194
148 SchedulerSingleThreadTaskRunnerManager* const outer_; 195 SchedulerSingleThreadTaskRunnerManager* const outer_;
149 const TaskTraits traits_; 196 const TaskTraits traits_;
150 SchedulerWorker* const worker_; 197 SchedulerWorker* const worker_;
151 198
152 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); 199 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
153 }; 200 };
154 201
155 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
156 PostDelayedTask(const tracked_objects::Location& from_here,
157 const Closure& closure,
158 TimeDelta delay) {
159 auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
160 task->single_thread_task_runner_ref = this;
161
162 if (!outer_->task_tracker_->WillPostTask(task.get()))
163 return false;
164
165 if (task->delayed_run_time.is_null()) {
166 PostTaskNow(std::move(task));
167 } else {
168 outer_->delayed_task_manager_->AddDelayedTask(
169 std::move(task),
170 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
171 }
172 return true;
173 }
174
175 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
176 PostTaskNow(std::unique_ptr<Task> task) {
177 const bool sequence_was_empty = sequence_->PushTask(std::move(task));
178 if (sequence_was_empty) {
179 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
180 delegate->ReEnqueueSequence(sequence_);
181 worker_->WakeUp();
182 }
183 }
184
185 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( 202 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
186 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, 203 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
187 const TaskScheduler::WorkerPoolIndexForTraitsCallback& 204 const TaskScheduler::WorkerPoolIndexForTraitsCallback&
188 worker_pool_index_for_traits_callback, 205 worker_pool_index_for_traits_callback,
189 TaskTracker* task_tracker, 206 TaskTracker* task_tracker,
190 DelayedTaskManager* delayed_task_manager) 207 DelayedTaskManager* delayed_task_manager)
191 : worker_pool_params_vector_(worker_pool_params_vector), 208 : worker_pool_params_vector_(worker_pool_params_vector),
192 worker_pool_index_for_traits_callback_( 209 worker_pool_index_for_traits_callback_(
193 worker_pool_index_for_traits_callback), 210 worker_pool_index_for_traits_callback),
194 task_tracker_(task_tracker), 211 task_tracker_(task_tracker),
195 delayed_task_manager_(delayed_task_manager) { 212 delayed_task_manager_(delayed_task_manager) {
196 DCHECK_GT(worker_pool_params_vector_.size(), 0U); 213 DCHECK_GT(worker_pool_params_vector_.size(), 0U);
197 DCHECK(worker_pool_index_for_traits_callback_); 214 DCHECK(worker_pool_index_for_traits_callback_);
198 DCHECK(task_tracker_); 215 DCHECK(task_tracker_);
199 DCHECK(delayed_task_manager_); 216 DCHECK(delayed_task_manager_);
200 } 217 }
201 218
202 SchedulerSingleThreadTaskRunnerManager:: 219 SchedulerSingleThreadTaskRunnerManager::
203 ~SchedulerSingleThreadTaskRunnerManager() { 220 ~SchedulerSingleThreadTaskRunnerManager() {
204 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " 221 #if DCHECK_IS_ON()
205 "SchedulerSingleThreadTaskRunnerManager"; 222 size_t workers_unregistered_during_join =
223 subtle::NoBarrier_Load(&workers_unregistered_during_join_);
224 DCHECK_EQ(workers_unregistered_during_join, workers_.size())
225 << "There cannot be outstanding SingleThreadTaskRunners upon destruction"
226 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler";
227 #endif
206 } 228 }
207 229
208 scoped_refptr<SingleThreadTaskRunner> 230 scoped_refptr<SingleThreadTaskRunner>
209 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( 231 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
210 const TaskTraits& traits) { 232 const TaskTraits& traits) {
211 size_t index = worker_pool_index_for_traits_callback_.Run(traits); 233 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
212 DCHECK_LT(index, worker_pool_params_vector_.size()); 234 DCHECK_LT(index, worker_pool_params_vector_.size());
213 return new SchedulerSingleThreadTaskRunner( 235 return new SchedulerSingleThreadTaskRunner(
214 this, traits, 236 this, traits,
215 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); 237 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 } 269 }
248 270
249 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( 271 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
250 SchedulerWorker* worker) { 272 SchedulerWorker* worker) {
251 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing 273 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
252 // |workers_lock_|. 274 // |workers_lock_|.
253 scoped_refptr<SchedulerWorker> worker_to_destroy; 275 scoped_refptr<SchedulerWorker> worker_to_destroy;
254 { 276 {
255 AutoSchedulerLock auto_lock(workers_lock_); 277 AutoSchedulerLock auto_lock(workers_lock_);
256 278
257 // We might be joining, so no-op this if |workers_| is empty. 279 // We might be joining, so record that a worker was unregistered for
258 if (workers_.empty()) 280 // verification at destruction.
281 if (workers_.empty()) {
282 #if DCHECK_IS_ON()
283 subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1);
284 #endif
259 return; 285 return;
286 }
260 287
261 auto worker_iter = 288 auto worker_iter =
262 std::find_if(workers_.begin(), workers_.end(), 289 std::find_if(workers_.begin(), workers_.end(),
263 [worker](const scoped_refptr<SchedulerWorker>& candidate) { 290 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
264 return candidate.get() == worker; 291 return candidate.get() == worker;
265 }); 292 });
266 DCHECK(worker_iter != workers_.end()); 293 DCHECK(worker_iter != workers_.end());
267 worker_to_destroy = std::move(*worker_iter); 294 worker_to_destroy = std::move(*worker_iter);
268 workers_.erase(worker_iter); 295 workers_.erase(worker_iter);
269 } 296 }
270 worker_to_destroy->Cleanup(); 297 worker_to_destroy->Cleanup();
271 } 298 }
272 299
273 } // namespace internal 300 } // namespace internal
274 } // namespace base 301 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698