Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. | 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" | 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <memory> | 8 #include <memory> |
| 9 #include <string> | 9 #include <string> |
| 10 | 10 |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 57 : thread_name_(thread_name) {} | 57 : thread_name_(thread_name) {} |
| 58 | 58 |
| 59 // SchedulerWorker::Delegate: | 59 // SchedulerWorker::Delegate: |
| 60 void OnMainEntry(SchedulerWorker* worker) override { | 60 void OnMainEntry(SchedulerWorker* worker) override { |
| 61 thread_ref_checker_.Set(); | 61 thread_ref_checker_.Set(); |
| 62 PlatformThread::SetName(thread_name_); | 62 PlatformThread::SetName(thread_name_); |
| 63 } | 63 } |
| 64 | 64 |
| 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { | 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| 66 AutoSchedulerLock auto_lock(sequence_lock_); | 66 AutoSchedulerLock auto_lock(sequence_lock_); |
| 67 return std::move(sequence_); | 67 bool has_work = has_work_; |
| 68 has_work_ = false; | |
| 69 return has_work ? sequence_ : nullptr; | |
| 68 } | 70 } |
| 69 | 71 |
| 70 void DidRunTask() override {} | 72 void DidRunTask() override {} |
| 71 | 73 |
| 72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { | 74 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
| 73 AutoSchedulerLock auto_lock(sequence_lock_); | 75 AutoSchedulerLock auto_lock(sequence_lock_); |
| 74 DCHECK(!sequence_); | 76 DCHECK_EQ(sequence, sequence_); |
| 75 sequence_ = std::move(sequence); | 77 has_work_ = true; |
| 76 } | 78 } |
| 77 | 79 |
| 78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } | 80 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| 79 | 81 |
| 80 bool CanDetach(SchedulerWorker* worker) override { return false; } | 82 bool CanDetach(SchedulerWorker* worker) override { return false; } |
| 81 | 83 |
| 82 void OnDetach() override { NOTREACHED(); } | 84 void OnDetach() override { NOTREACHED(); } |
| 83 | 85 |
| 84 bool RunsTasksOnCurrentThread() { | 86 bool RunsTasksOnCurrentThread() { |
| 85 // We check the thread ref instead of the sequence for the benefit of COM | 87 // We check the thread ref instead of the sequence for the benefit of COM |
| 86 // callbacks which may execute without a sequence context. | 88 // callbacks which may execute without a sequence context. |
| 87 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); | 89 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
| 88 } | 90 } |
| 89 | 91 |
| 92 void OnMainExit() override { | |
| 93 // To reclaim skipped tasks on shutdown, we null out the sequence to allow | |
| 94 // the tasks to destroy themselves. | |
|
fdoray
2017/03/02 14:37:48
DCHECK(sequence->HasOneRef());
robliao
2017/03/03 04:24:17
In general for refcounted things, it's a dangerous
| |
| 95 sequence_ = nullptr; | |
| 96 } | |
| 97 | |
| 98 // SchedulerWorkerDelegate: | |
| 99 Sequence* sequence() { return sequence_.get(); } | |
| 100 | |
| 90 private: | 101 private: |
| 91 const std::string thread_name_; | 102 const std::string thread_name_; |
| 92 | 103 |
| 93 // Synchronizes access to |sequence_| and handles the fact that | 104 // Synchronizes access to |sequence_| and |has_work_|. |
| 94 // ReEnqueueSequence() is called on both the worker thread for reenqueuing | |
| 95 // the sequence and off of the worker thread to seed the sequence for | |
| 96 // GetWork(). | |
| 97 SchedulerLock sequence_lock_; | 105 SchedulerLock sequence_lock_; |
| 98 scoped_refptr<Sequence> sequence_; | 106 scoped_refptr<Sequence> sequence_ = new Sequence; |
| 107 bool has_work_ = false; | |
| 99 | 108 |
| 100 AtomicThreadRefChecker thread_ref_checker_; | 109 AtomicThreadRefChecker thread_ref_checker_; |
| 101 | 110 |
| 102 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); | 111 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| 103 }; | 112 }; |
| 104 | 113 |
| 105 } // namespace | 114 } // namespace |
| 106 | 115 |
| 107 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner | 116 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| 108 : public SingleThreadTaskRunner { | 117 : public SingleThreadTaskRunner { |
| 109 public: | 118 public: |
| 110 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the | 119 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| 111 // lifetime of a dedicated |worker| for |traits|. | 120 // lifetime of a dedicated |worker| for |traits|. |
| 112 SchedulerSingleThreadTaskRunner( | 121 SchedulerSingleThreadTaskRunner( |
| 113 SchedulerSingleThreadTaskRunnerManager* const outer, | 122 SchedulerSingleThreadTaskRunnerManager* const outer, |
| 114 const TaskTraits& traits, | 123 const TaskTraits& traits, |
| 115 SchedulerWorker* worker) | 124 SchedulerWorker* worker) |
| 116 : outer_(outer), traits_(traits), worker_(worker) { | 125 : outer_(outer), traits_(traits), worker_(worker) { |
| 117 DCHECK(outer_); | 126 DCHECK(outer_); |
| 118 DCHECK(worker_); | 127 DCHECK(worker_); |
| 119 } | 128 } |
| 120 | 129 |
| 121 // SingleThreadTaskRunner: | 130 // SingleThreadTaskRunner: |
| 122 bool PostDelayedTask(const tracked_objects::Location& from_here, | 131 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 123 const Closure& closure, | 132 const Closure& closure, |
| 124 TimeDelta delay) override; | 133 TimeDelta delay) override { |
| 134 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); | |
| 135 task->single_thread_task_runner_ref = this; | |
| 136 | |
| 137 if (!outer_->task_tracker_->WillPostTask(task.get())) | |
| 138 return false; | |
| 139 | |
| 140 if (task->delayed_run_time.is_null()) { | |
| 141 PostTaskNow(std::move(task)); | |
| 142 } else { | |
| 143 outer_->delayed_task_manager_->AddDelayedTask( | |
| 144 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, | |
| 145 Unretained(this))); | |
| 146 } | |
| 147 return true; | |
| 148 } | |
| 125 | 149 |
| 126 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 150 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 127 const Closure& closure, | 151 const Closure& closure, |
| 128 base::TimeDelta delay) override { | 152 base::TimeDelta delay) override { |
| 129 // Tasks are never nested within the task scheduler. | 153 // Tasks are never nested within the task scheduler. |
| 130 return PostDelayedTask(from_here, closure, delay); | 154 return PostDelayedTask(from_here, closure, delay); |
| 131 } | 155 } |
| 132 | 156 |
| 133 bool RunsTasksOnCurrentThread() const override { | 157 bool RunsTasksOnCurrentThread() const override { |
| 134 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | 158 return GetDelegate()->RunsTasksOnCurrentThread(); |
| 135 return delegate->RunsTasksOnCurrentThread(); | |
| 136 } | 159 } |
| 137 | 160 |
| 138 private: | 161 private: |
| 139 ~SchedulerSingleThreadTaskRunner() override { | 162 ~SchedulerSingleThreadTaskRunner() override { |
| 140 outer_->UnregisterSchedulerWorker(worker_); | 163 outer_->UnregisterSchedulerWorker(worker_); |
| 141 } | 164 } |
| 142 | 165 |
| 143 void PostTaskNow(std::unique_ptr<Task> task); | 166 void PostTaskNow(std::unique_ptr<Task> task) { |
| 167 Sequence* sequence = GetDelegate()->sequence(); | |
|
fdoray
2017/03/02 14:37:48
When shutdown completes, the thread exits and the
robliao
2017/03/03 04:24:17
Handled by making this a scoped_refptr. At this po
| |
| 168 const bool sequence_was_empty = sequence->PushTask(std::move(task)); | |
| 169 if (sequence_was_empty) { | |
| 170 GetDelegate()->ReEnqueueSequence(sequence); | |
| 171 worker_->WakeUp(); | |
| 172 } | |
| 173 } | |
| 144 | 174 |
| 145 // Sequence for all Tasks posted through this TaskRunner. | 175 SchedulerWorkerDelegate* GetDelegate() const { |
| 146 const scoped_refptr<Sequence> sequence_ = new Sequence; | 176 return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| 177 } | |
| 147 | 178 |
| 148 SchedulerSingleThreadTaskRunnerManager* const outer_; | 179 SchedulerSingleThreadTaskRunnerManager* const outer_; |
| 149 const TaskTraits traits_; | 180 const TaskTraits traits_; |
| 150 SchedulerWorker* const worker_; | 181 SchedulerWorker* const worker_; |
| 151 | 182 |
| 152 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 183 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| 153 }; | 184 }; |
| 154 | 185 |
| 155 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: | |
| 156 PostDelayedTask(const tracked_objects::Location& from_here, | |
| 157 const Closure& closure, | |
| 158 TimeDelta delay) { | |
| 159 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); | |
| 160 task->single_thread_task_runner_ref = this; | |
| 161 | |
| 162 if (!outer_->task_tracker_->WillPostTask(task.get())) | |
| 163 return false; | |
| 164 | |
| 165 if (task->delayed_run_time.is_null()) { | |
| 166 PostTaskNow(std::move(task)); | |
| 167 } else { | |
| 168 outer_->delayed_task_manager_->AddDelayedTask( | |
| 169 std::move(task), | |
| 170 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); | |
| 171 } | |
| 172 return true; | |
| 173 } | |
| 174 | |
| 175 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: | |
| 176 PostTaskNow(std::unique_ptr<Task> task) { | |
| 177 const bool sequence_was_empty = sequence_->PushTask(std::move(task)); | |
| 178 if (sequence_was_empty) { | |
| 179 auto* delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | |
| 180 delegate->ReEnqueueSequence(sequence_); | |
| 181 worker_->WakeUp(); | |
| 182 } | |
| 183 } | |
| 184 | |
| 185 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( | 186 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| 186 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, | 187 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| 187 const TaskScheduler::WorkerPoolIndexForTraitsCallback& | 188 const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| 188 worker_pool_index_for_traits_callback, | 189 worker_pool_index_for_traits_callback, |
| 189 TaskTracker* task_tracker, | 190 TaskTracker* task_tracker, |
| 190 DelayedTaskManager* delayed_task_manager) | 191 DelayedTaskManager* delayed_task_manager) |
| 191 : worker_pool_params_vector_(worker_pool_params_vector), | 192 : worker_pool_params_vector_(worker_pool_params_vector), |
| 192 worker_pool_index_for_traits_callback_( | 193 worker_pool_index_for_traits_callback_( |
| 193 worker_pool_index_for_traits_callback), | 194 worker_pool_index_for_traits_callback), |
| 194 task_tracker_(task_tracker), | 195 task_tracker_(task_tracker), |
| 195 delayed_task_manager_(delayed_task_manager) { | 196 delayed_task_manager_(delayed_task_manager) { |
| 196 DCHECK_GT(worker_pool_params_vector_.size(), 0U); | 197 DCHECK_GT(worker_pool_params_vector_.size(), 0U); |
| 197 DCHECK(worker_pool_index_for_traits_callback_); | 198 DCHECK(worker_pool_index_for_traits_callback_); |
| 198 DCHECK(task_tracker_); | 199 DCHECK(task_tracker_); |
| 199 DCHECK(delayed_task_manager_); | 200 DCHECK(delayed_task_manager_); |
| 200 } | 201 } |
| 201 | 202 |
| 202 SchedulerSingleThreadTaskRunnerManager:: | 203 SchedulerSingleThreadTaskRunnerManager:: |
| 203 ~SchedulerSingleThreadTaskRunnerManager() { | 204 ~SchedulerSingleThreadTaskRunnerManager() { |
| 204 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " | 205 size_t workers_unregistered_during_join = |
| 205 "SchedulerSingleThreadTaskRunnerManager"; | 206 subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
| 207 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) | |
| 208 << "SchedulerSingleThreadTaskRunners must outlive " | |
|
fdoray
2017/03/02 14:37:48
I think this would be clearer from a user point of
robliao
2017/03/03 04:24:17
While that's true, it doesn't make sense in the co
| |
| 209 "SchedulerSingleThreadTaskRunnerManager"; | |
| 206 } | 210 } |
| 207 | 211 |
| 208 scoped_refptr<SingleThreadTaskRunner> | 212 scoped_refptr<SingleThreadTaskRunner> |
| 209 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( | 213 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| 210 const TaskTraits& traits) { | 214 const TaskTraits& traits) { |
| 211 size_t index = worker_pool_index_for_traits_callback_.Run(traits); | 215 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| 212 DCHECK_LT(index, worker_pool_params_vector_.size()); | 216 DCHECK_LT(index, worker_pool_params_vector_.size()); |
| 213 return new SchedulerSingleThreadTaskRunner( | 217 return new SchedulerSingleThreadTaskRunner( |
| 214 this, traits, | 218 this, traits, |
| 215 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); | 219 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 247 } | 251 } |
| 248 | 252 |
| 249 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( | 253 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| 250 SchedulerWorker* worker) { | 254 SchedulerWorker* worker) { |
| 251 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing | 255 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| 252 // |workers_lock_|. | 256 // |workers_lock_|. |
| 253 scoped_refptr<SchedulerWorker> worker_to_destroy; | 257 scoped_refptr<SchedulerWorker> worker_to_destroy; |
| 254 { | 258 { |
| 255 AutoSchedulerLock auto_lock(workers_lock_); | 259 AutoSchedulerLock auto_lock(workers_lock_); |
| 256 | 260 |
| 257 // We might be joining, so no-op this if |workers_| is empty. | 261 // We might be joining, so record that a worker was unregistered for |
| 258 if (workers_.empty()) | 262 // verification at destruction. |
| 263 if (workers_.empty()) { | |
| 264 subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); | |
|
fdoray
2017/03/02 14:37:48
Use base::AtomicRefCountInc and base::AtomicRefCou
robliao
2017/03/03 04:24:17
Those functions are intended for reference countin
fdoray
2017/03/03 05:31:13
They want people to use base/memory/ref_counted.h
fdoray
2017/03/03 07:22:39
ping
robliao
2017/03/03 07:50:05
The intent of atomic_ref_count.h as written is to
| |
| 259 return; | 265 return; |
| 266 } | |
| 260 | 267 |
| 261 auto worker_iter = | 268 auto worker_iter = |
| 262 std::find_if(workers_.begin(), workers_.end(), | 269 std::find_if(workers_.begin(), workers_.end(), |
| 263 [worker](const scoped_refptr<SchedulerWorker>& candidate) { | 270 [worker](const scoped_refptr<SchedulerWorker>& candidate) { |
| 264 return candidate.get() == worker; | 271 return candidate.get() == worker; |
| 265 }); | 272 }); |
| 266 DCHECK(worker_iter != workers_.end()); | 273 DCHECK(worker_iter != workers_.end()); |
| 267 worker_to_destroy = std::move(*worker_iter); | 274 worker_to_destroy = std::move(*worker_iter); |
| 268 workers_.erase(worker_iter); | 275 workers_.erase(worker_iter); |
| 269 } | 276 } |
| 270 worker_to_destroy->Cleanup(); | 277 worker_to_destroy->Cleanup(); |
| 271 } | 278 } |
| 272 | 279 |
| 273 } // namespace internal | 280 } // namespace internal |
| 274 } // namespace base | 281 } // namespace base |
| OLD | NEW |