OLD | NEW |
---|---|
1 // Copyright 2017 The Chromium Authors. All rights reserved. | 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" | 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <memory> | 8 #include <memory> |
9 #include <string> | 9 #include <string> |
10 | 10 |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
57 : thread_name_(thread_name) {} | 57 : thread_name_(thread_name) {} |
58 | 58 |
59 // SchedulerWorker::Delegate: | 59 // SchedulerWorker::Delegate: |
60 void OnMainEntry(SchedulerWorker* worker) override { | 60 void OnMainEntry(SchedulerWorker* worker) override { |
61 thread_ref_checker_.Set(); | 61 thread_ref_checker_.Set(); |
62 PlatformThread::SetName(thread_name_); | 62 PlatformThread::SetName(thread_name_); |
63 } | 63 } |
64 | 64 |
65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { | 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
66 AutoSchedulerLock auto_lock(sequence_lock_); | 66 AutoSchedulerLock auto_lock(sequence_lock_); |
67 return std::move(sequence_); | 67 bool has_work = has_work_; |
68 has_work_ = false; | |
69 return has_work ? sequence_ : nullptr; | |
68 } | 70 } |
69 | 71 |
70 void DidRunTask() override {} | 72 void DidRunTask() override {} |
71 | 73 |
72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { | 74 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
73 AutoSchedulerLock auto_lock(sequence_lock_); | 75 AutoSchedulerLock auto_lock(sequence_lock_); |
74 DCHECK(!sequence_); | 76 // We've shut down, so no-op this work request. Any sequence cleanup will |
75 sequence_ = std::move(sequence); | 77 // occur in the caller's context. |
78 if (!sequence_) | |
79 return; | |
80 | |
81 DCHECK_EQ(sequence, sequence_); | |
82 has_work_ = true; | |
76 } | 83 } |
77 | 84 |
78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } | 85 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
79 | 86 |
80 bool CanDetach(SchedulerWorker* worker) override { return false; } | 87 bool CanDetach(SchedulerWorker* worker) override { return false; } |
81 | 88 |
82 void OnDetach() override { NOTREACHED(); } | 89 void OnDetach() override { NOTREACHED(); } |
83 | 90 |
84 bool RunsTasksOnCurrentThread() { | 91 bool RunsTasksOnCurrentThread() { |
85 // We check the thread ref instead of the sequence for the benefit of COM | 92 // We check the thread ref instead of the sequence for the benefit of COM |
86 // callbacks which may execute without a sequence context. | 93 // callbacks which may execute without a sequence context. |
87 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); | 94 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
88 } | 95 } |
89 | 96 |
97 void OnMainExit() override { | |
98 // To reclaim skipped tasks on shutdown, we null out the sequence to allow | |
99 // the tasks to destroy themselves. | |
fdoray
2017/03/03 05:31:13
Need to grab the lock to clear the pointer. To avo
robliao
2017/03/03 07:07:14
Nice. Yes indeed!
| |
100 sequence_ = nullptr; | |
101 } | |
102 | |
103 // SchedulerWorkerDelegate: | |
104 | |
105 // Consumers should release their sequence reference as soon as possible to | |
106 // ensure timely cleanup for general shutdown. | |
107 scoped_refptr<Sequence> sequence() { | |
108 AutoSchedulerLock auto_lock(sequence_lock_); | |
109 return sequence_.get(); | |
fdoray
2017/03/03 05:31:13
no .get();
robliao
2017/03/03 07:07:14
Done.
| |
110 } | |
111 | |
90 private: | 112 private: |
91 const std::string thread_name_; | 113 const std::string thread_name_; |
92 | 114 |
93 // Synchronizes access to |sequence_| and handles the fact that | 115 // Synchronizes access to |sequence_| and |has_work_|. |
94 // ReEnqueueSequence() is called on both the worker thread for reenqueuing | |
95 // the sequence and off of the worker thread to seed the sequence for | |
96 // GetWork(). | |
97 SchedulerLock sequence_lock_; | 116 SchedulerLock sequence_lock_; |
98 scoped_refptr<Sequence> sequence_; | 117 scoped_refptr<Sequence> sequence_ = new Sequence; |
118 bool has_work_ = false; | |
99 | 119 |
100 AtomicThreadRefChecker thread_ref_checker_; | 120 AtomicThreadRefChecker thread_ref_checker_; |
101 | 121 |
102 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); | 122 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
103 }; | 123 }; |
104 | 124 |
105 } // namespace | 125 } // namespace |
106 | 126 |
107 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner | 127 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
108 : public SingleThreadTaskRunner { | 128 : public SingleThreadTaskRunner { |
109 public: | 129 public: |
110 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the | 130 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
111 // lifetime of a dedicated |worker| for |traits|. | 131 // lifetime of a dedicated |worker| for |traits|. |
112 SchedulerSingleThreadTaskRunner( | 132 SchedulerSingleThreadTaskRunner( |
113 SchedulerSingleThreadTaskRunnerManager* const outer, | 133 SchedulerSingleThreadTaskRunnerManager* const outer, |
114 const TaskTraits& traits, | 134 const TaskTraits& traits, |
115 SchedulerWorker* worker) | 135 SchedulerWorker* worker) |
116 : outer_(outer), traits_(traits), worker_(worker) { | 136 : outer_(outer), traits_(traits), worker_(worker) { |
117 DCHECK(outer_); | 137 DCHECK(outer_); |
118 DCHECK(worker_); | 138 DCHECK(worker_); |
119 } | 139 } |
120 | 140 |
121 // SingleThreadTaskRunner: | 141 // SingleThreadTaskRunner: |
122 bool PostDelayedTask(const tracked_objects::Location& from_here, | 142 bool PostDelayedTask(const tracked_objects::Location& from_here, |
123 const Closure& closure, | 143 const Closure& closure, |
124 TimeDelta delay) override; | 144 TimeDelta delay) override { |
145 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); | |
146 task->single_thread_task_runner_ref = this; | |
147 | |
148 if (!outer_->task_tracker_->WillPostTask(task.get())) | |
149 return false; | |
150 | |
151 if (task->delayed_run_time.is_null()) { | |
152 PostTaskNow(std::move(task)); | |
153 } else { | |
154 outer_->delayed_task_manager_->AddDelayedTask( | |
155 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, | |
156 Unretained(this))); | |
157 } | |
158 return true; | |
159 } | |
125 | 160 |
126 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 161 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
127 const Closure& closure, | 162 const Closure& closure, |
128 base::TimeDelta delay) override { | 163 base::TimeDelta delay) override { |
129 // Tasks are never nested within the task scheduler. | 164 // Tasks are never nested within the task scheduler. |
130 return PostDelayedTask(from_here, closure, delay); | 165 return PostDelayedTask(from_here, closure, delay); |
131 } | 166 } |
132 | 167 |
133 bool RunsTasksOnCurrentThread() const override { | 168 bool RunsTasksOnCurrentThread() const override { |
134 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | 169 return GetDelegate()->RunsTasksOnCurrentThread(); |
135 return delegate->RunsTasksOnCurrentThread(); | |
136 } | 170 } |
137 | 171 |
138 private: | 172 private: |
139 ~SchedulerSingleThreadTaskRunner() override { | 173 ~SchedulerSingleThreadTaskRunner() override { |
140 outer_->UnregisterSchedulerWorker(worker_); | 174 outer_->UnregisterSchedulerWorker(worker_); |
141 } | 175 } |
142 | 176 |
143 void PostTaskNow(std::unique_ptr<Task> task); | 177 void PostTaskNow(std::unique_ptr<Task> task) { |
178 scoped_refptr<Sequence> sequence = GetDelegate()->sequence(); | |
179 // If |sequence| is null, then the thread is effectively gone (either | |
180 // shutdonw or joined). We will destroy the task in this context instead. | |
fdoray
2017/03/03 05:31:13
I don't find "We will destroy the task in this con
fdoray
2017/03/03 05:31:13
s/shutdonw/shutdown/
robliao
2017/03/03 07:07:13
Done.
robliao
2017/03/03 07:07:14
Sounds good. Removed.
| |
181 if (!sequence) | |
182 return; | |
144 | 183 |
145 // Sequence for all Tasks posted through this TaskRunner. | 184 const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
146 const scoped_refptr<Sequence> sequence_ = new Sequence; | 185 if (sequence_was_empty) { |
186 GetDelegate()->ReEnqueueSequence(std::move(sequence)); | |
187 worker_->WakeUp(); | |
188 } | |
fdoray
2017/03/03 05:31:13
The sequence and tasks it contained before the Pus
robliao
2017/03/03 07:07:14
Offline discussion:
There is still a potential for
| |
189 } | |
190 | |
191 SchedulerWorkerDelegate* GetDelegate() const { | |
192 return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | |
193 } | |
147 | 194 |
148 SchedulerSingleThreadTaskRunnerManager* const outer_; | 195 SchedulerSingleThreadTaskRunnerManager* const outer_; |
149 const TaskTraits traits_; | 196 const TaskTraits traits_; |
150 SchedulerWorker* const worker_; | 197 SchedulerWorker* const worker_; |
151 | 198 |
152 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 199 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
153 }; | 200 }; |
154 | 201 |
155 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: | |
156 PostDelayedTask(const tracked_objects::Location& from_here, | |
157 const Closure& closure, | |
158 TimeDelta delay) { | |
159 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); | |
160 task->single_thread_task_runner_ref = this; | |
161 | |
162 if (!outer_->task_tracker_->WillPostTask(task.get())) | |
163 return false; | |
164 | |
165 if (task->delayed_run_time.is_null()) { | |
166 PostTaskNow(std::move(task)); | |
167 } else { | |
168 outer_->delayed_task_manager_->AddDelayedTask( | |
169 std::move(task), | |
170 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); | |
171 } | |
172 return true; | |
173 } | |
174 | |
175 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: | |
176 PostTaskNow(std::unique_ptr<Task> task) { | |
177 const bool sequence_was_empty = sequence_->PushTask(std::move(task)); | |
178 if (sequence_was_empty) { | |
179 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | |
180 delegate->ReEnqueueSequence(sequence_); | |
181 worker_->WakeUp(); | |
182 } | |
183 } | |
184 | |
185 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( | 202 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
186 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, | 203 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
187 const TaskScheduler::WorkerPoolIndexForTraitsCallback& | 204 const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
188 worker_pool_index_for_traits_callback, | 205 worker_pool_index_for_traits_callback, |
189 TaskTracker* task_tracker, | 206 TaskTracker* task_tracker, |
190 DelayedTaskManager* delayed_task_manager) | 207 DelayedTaskManager* delayed_task_manager) |
191 : worker_pool_params_vector_(worker_pool_params_vector), | 208 : worker_pool_params_vector_(worker_pool_params_vector), |
192 worker_pool_index_for_traits_callback_( | 209 worker_pool_index_for_traits_callback_( |
193 worker_pool_index_for_traits_callback), | 210 worker_pool_index_for_traits_callback), |
194 task_tracker_(task_tracker), | 211 task_tracker_(task_tracker), |
195 delayed_task_manager_(delayed_task_manager) { | 212 delayed_task_manager_(delayed_task_manager) { |
196 DCHECK_GT(worker_pool_params_vector_.size(), 0U); | 213 DCHECK_GT(worker_pool_params_vector_.size(), 0U); |
197 DCHECK(worker_pool_index_for_traits_callback_); | 214 DCHECK(worker_pool_index_for_traits_callback_); |
198 DCHECK(task_tracker_); | 215 DCHECK(task_tracker_); |
199 DCHECK(delayed_task_manager_); | 216 DCHECK(delayed_task_manager_); |
200 } | 217 } |
201 | 218 |
202 SchedulerSingleThreadTaskRunnerManager:: | 219 SchedulerSingleThreadTaskRunnerManager:: |
203 ~SchedulerSingleThreadTaskRunnerManager() { | 220 ~SchedulerSingleThreadTaskRunnerManager() { |
204 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " | 221 #if DCHECK_IS_ON() |
205 "SchedulerSingleThreadTaskRunnerManager"; | 222 size_t workers_unregistered_during_join = |
223 subtle::NoBarrier_Load(&workers_unregistered_during_join_); | |
224 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) | |
225 << "There cannot be outstanding SingleThreadTaskRunners upon destruction" | |
226 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; | |
227 #endif | |
206 } | 228 } |
207 | 229 |
208 scoped_refptr<SingleThreadTaskRunner> | 230 scoped_refptr<SingleThreadTaskRunner> |
209 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( | 231 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
210 const TaskTraits& traits) { | 232 const TaskTraits& traits) { |
211 size_t index = worker_pool_index_for_traits_callback_.Run(traits); | 233 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
212 DCHECK_LT(index, worker_pool_params_vector_.size()); | 234 DCHECK_LT(index, worker_pool_params_vector_.size()); |
213 return new SchedulerSingleThreadTaskRunner( | 235 return new SchedulerSingleThreadTaskRunner( |
214 this, traits, | 236 this, traits, |
215 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); | 237 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
247 } | 269 } |
248 | 270 |
249 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( | 271 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
250 SchedulerWorker* worker) { | 272 SchedulerWorker* worker) { |
251 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing | 273 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
252 // |workers_lock_|. | 274 // |workers_lock_|. |
253 scoped_refptr<SchedulerWorker> worker_to_destroy; | 275 scoped_refptr<SchedulerWorker> worker_to_destroy; |
254 { | 276 { |
255 AutoSchedulerLock auto_lock(workers_lock_); | 277 AutoSchedulerLock auto_lock(workers_lock_); |
256 | 278 |
257 // We might be joining, so no-op this if |workers_| is empty. | 279 // We might be joining, so record that a worker was unregistered for |
258 if (workers_.empty()) | 280 // verification at destruction. |
281 if (workers_.empty()) { | |
282 #if DCHECK_IS_ON() | |
283 subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); | |
284 #endif | |
259 return; | 285 return; |
286 } | |
260 | 287 |
261 auto worker_iter = | 288 auto worker_iter = |
262 std::find_if(workers_.begin(), workers_.end(), | 289 std::find_if(workers_.begin(), workers_.end(), |
263 [worker](const scoped_refptr<SchedulerWorker>& candidate) { | 290 [worker](const scoped_refptr<SchedulerWorker>& candidate) { |
264 return candidate.get() == worker; | 291 return candidate.get() == worker; |
265 }); | 292 }); |
266 DCHECK(worker_iter != workers_.end()); | 293 DCHECK(worker_iter != workers_.end()); |
267 worker_to_destroy = std::move(*worker_iter); | 294 worker_to_destroy = std::move(*worker_iter); |
268 workers_.erase(worker_iter); | 295 workers_.erase(worker_iter); |
269 } | 296 } |
270 worker_to_destroy->Cleanup(); | 297 worker_to_destroy->Cleanup(); |
271 } | 298 } |
272 | 299 |
273 } // namespace internal | 300 } // namespace internal |
274 } // namespace base | 301 } // namespace base |
OLD | NEW |