OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| 6 |
| 7 #include <algorithm> |
| 8 #include <memory> |
| 9 #include <string> |
| 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/callback.h" |
| 13 #include "base/memory/ptr_util.h" |
| 14 #include "base/single_thread_task_runner.h" |
| 15 #include "base/strings/stringprintf.h" |
| 16 #include "base/synchronization/atomic_flag.h" |
| 17 #include "base/task_scheduler/delayed_task_manager.h" |
| 18 #include "base/task_scheduler/scheduler_worker.h" |
| 19 #include "base/task_scheduler/sequence.h" |
| 20 #include "base/task_scheduler/task.h" |
| 21 #include "base/task_scheduler/task_tracker.h" |
| 22 #include "base/task_scheduler/task_traits.h" |
| 23 #include "base/threading/platform_thread.h" |
| 24 #include "base/time/time.h" |
| 25 |
| 26 namespace base { |
| 27 namespace internal { |
| 28 |
| 29 namespace { |
| 30 |
| 31 // Allows for checking the PlatformThread::CurrentRef() against a set |
| 32 // PlatformThreadRef atomically without using locks. |
| 33 class AtomicThreadRefChecker { |
| 34 public: |
| 35 AtomicThreadRefChecker() = default; |
| 36 ~AtomicThreadRefChecker() = default; |
| 37 |
| 38 void Set() { |
| 39 thread_ref_ = PlatformThread::CurrentRef(); |
| 40 is_set_.Set(); |
| 41 } |
| 42 |
| 43 bool IsCurrentThreadSameAsSetThread() { |
| 44 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef(); |
| 45 } |
| 46 |
| 47 private: |
| 48 AtomicFlag is_set_; |
| 49 PlatformThreadRef thread_ref_; |
| 50 |
| 51 DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker); |
| 52 }; |
| 53 |
| 54 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
| 55 public: |
| 56 SchedulerWorkerDelegate(const std::string& thread_name) |
| 57 : thread_name_(thread_name) {} |
| 58 |
| 59 // SchedulerWorker::Delegate: |
| 60 void OnMainEntry(SchedulerWorker* worker) override { |
| 61 thread_ref_checker_.Set(); |
| 62 PlatformThread::SetName(thread_name_); |
| 63 } |
| 64 |
| 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| 66 AutoSchedulerLock auto_lock(sequence_lock_); |
| 67 bool has_work = has_work_; |
| 68 has_work_ = false; |
| 69 return has_work ? sequence_ : nullptr; |
| 70 } |
| 71 |
| 72 void DidRunTask() override {} |
| 73 |
| 74 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
| 75 AutoSchedulerLock auto_lock(sequence_lock_); |
| 76 // We've shut down, so no-op this work request. Any sequence cleanup will |
| 77 // occur in the caller's context. |
| 78 if (!sequence_) |
| 79 return; |
| 80 |
| 81 DCHECK_EQ(sequence, sequence_); |
| 82 has_work_ = true; |
| 83 } |
| 84 |
| 85 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| 86 |
| 87 bool CanDetach(SchedulerWorker* worker) override { return false; } |
| 88 |
| 89 void OnDetach() override { NOTREACHED(); } |
| 90 |
| 91 bool RunsTasksOnCurrentThread() { |
| 92 // We check the thread ref instead of the sequence for the benefit of COM |
| 93 // callbacks which may execute without a sequence context. |
| 94 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
| 95 } |
| 96 |
| 97 void OnMainExit() override { |
| 98 // Move |sequence_| to |local_sequence| so that if we have the last |
| 99 // reference to the sequence we don't destroy it (and its tasks) within |
| 100 // |sequence_lock_|. |
| 101 scoped_refptr<Sequence> local_sequence; |
| 102 { |
| 103 AutoSchedulerLock auto_lock(sequence_lock_); |
| 104 // To reclaim skipped tasks on shutdown, we null out the sequence to allow |
| 105 // the tasks to destroy themselves. |
| 106 local_sequence = std::move(sequence_); |
| 107 } |
| 108 } |
| 109 |
| 110 // SchedulerWorkerDelegate: |
| 111 |
| 112 // Consumers should release their sequence reference as soon as possible to |
| 113 // ensure timely cleanup for general shutdown. |
| 114 scoped_refptr<Sequence> sequence() { |
| 115 AutoSchedulerLock auto_lock(sequence_lock_); |
| 116 return sequence_; |
| 117 } |
| 118 |
| 119 private: |
| 120 const std::string thread_name_; |
| 121 |
| 122 // Synchronizes access to |sequence_| and |has_work_|. |
| 123 SchedulerLock sequence_lock_; |
| 124 scoped_refptr<Sequence> sequence_ = new Sequence; |
| 125 bool has_work_ = false; |
| 126 |
| 127 AtomicThreadRefChecker thread_ref_checker_; |
| 128 |
| 129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| 130 }; |
| 131 |
| 132 } // namespace |
| 133 |
| 134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| 135 : public SingleThreadTaskRunner { |
| 136 public: |
| 137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| 138 // lifetime of a dedicated |worker| for |traits|. |
| 139 SchedulerSingleThreadTaskRunner( |
| 140 SchedulerSingleThreadTaskRunnerManager* const outer, |
| 141 const TaskTraits& traits, |
| 142 SchedulerWorker* worker) |
| 143 : outer_(outer), traits_(traits), worker_(worker) { |
| 144 DCHECK(outer_); |
| 145 DCHECK(worker_); |
| 146 } |
| 147 |
| 148 // SingleThreadTaskRunner: |
| 149 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 150 const Closure& closure, |
| 151 TimeDelta delay) override { |
| 152 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
| 153 task->single_thread_task_runner_ref = this; |
| 154 |
| 155 if (!outer_->task_tracker_->WillPostTask(task.get())) |
| 156 return false; |
| 157 |
| 158 if (task->delayed_run_time.is_null()) { |
| 159 PostTaskNow(std::move(task)); |
| 160 } else { |
| 161 outer_->delayed_task_manager_->AddDelayedTask( |
| 162 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, |
| 163 Unretained(this))); |
| 164 } |
| 165 return true; |
| 166 } |
| 167 |
| 168 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 169 const Closure& closure, |
| 170 base::TimeDelta delay) override { |
| 171 // Tasks are never nested within the task scheduler. |
| 172 return PostDelayedTask(from_here, closure, delay); |
| 173 } |
| 174 |
| 175 bool RunsTasksOnCurrentThread() const override { |
| 176 return GetDelegate()->RunsTasksOnCurrentThread(); |
| 177 } |
| 178 |
| 179 private: |
| 180 ~SchedulerSingleThreadTaskRunner() override { |
| 181 outer_->UnregisterSchedulerWorker(worker_); |
| 182 } |
| 183 |
| 184 void PostTaskNow(std::unique_ptr<Task> task) { |
| 185 scoped_refptr<Sequence> sequence = GetDelegate()->sequence(); |
| 186 // If |sequence| is null, then the thread is effectively gone (either |
| 187 // shutdown or joined). |
| 188 if (!sequence) |
| 189 return; |
| 190 |
| 191 const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| 192 if (sequence_was_empty) { |
| 193 GetDelegate()->ReEnqueueSequence(std::move(sequence)); |
| 194 worker_->WakeUp(); |
| 195 } |
| 196 } |
| 197 |
| 198 SchedulerWorkerDelegate* GetDelegate() const { |
| 199 return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| 200 } |
| 201 |
| 202 SchedulerSingleThreadTaskRunnerManager* const outer_; |
| 203 const TaskTraits traits_; |
| 204 SchedulerWorker* const worker_; |
| 205 |
| 206 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| 207 }; |
| 208 |
| 209 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| 210 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| 211 const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| 212 worker_pool_index_for_traits_callback, |
| 213 TaskTracker* task_tracker, |
| 214 DelayedTaskManager* delayed_task_manager) |
| 215 : worker_pool_params_vector_(worker_pool_params_vector), |
| 216 worker_pool_index_for_traits_callback_( |
| 217 worker_pool_index_for_traits_callback), |
| 218 task_tracker_(task_tracker), |
| 219 delayed_task_manager_(delayed_task_manager) { |
| 220 DCHECK_GT(worker_pool_params_vector_.size(), 0U); |
| 221 DCHECK(worker_pool_index_for_traits_callback_); |
| 222 DCHECK(task_tracker_); |
| 223 DCHECK(delayed_task_manager_); |
| 224 } |
| 225 |
| 226 SchedulerSingleThreadTaskRunnerManager:: |
| 227 ~SchedulerSingleThreadTaskRunnerManager() { |
| 228 #if DCHECK_IS_ON() |
| 229 size_t workers_unregistered_during_join = |
| 230 subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
| 231 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) |
| 232 << "There cannot be outstanding SingleThreadTaskRunners upon destruction" |
| 233 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; |
| 234 #endif |
| 235 } |
| 236 |
| 237 scoped_refptr<SingleThreadTaskRunner> |
| 238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| 239 const TaskTraits& traits) { |
| 240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| 241 DCHECK_LT(index, worker_pool_params_vector_.size()); |
| 242 return new SchedulerSingleThreadTaskRunner( |
| 243 this, traits, |
| 244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); |
| 245 } |
| 246 |
| 247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { |
| 248 decltype(workers_) local_workers; |
| 249 { |
| 250 AutoSchedulerLock auto_lock(workers_lock_); |
| 251 local_workers = std::move(workers_); |
| 252 } |
| 253 |
| 254 for (const auto& worker : local_workers) |
| 255 worker->JoinForTesting(); |
| 256 |
| 257 { |
| 258 AutoSchedulerLock auto_lock(workers_lock_); |
| 259 DCHECK(workers_.empty()) |
| 260 << "New worker(s) unexpectedly registered during join."; |
| 261 workers_ = std::move(local_workers); |
| 262 } |
| 263 } |
| 264 |
| 265 SchedulerWorker* |
| 266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( |
| 267 const SchedulerWorkerPoolParams& params) { |
| 268 AutoSchedulerLock auto_lock(workers_lock_); |
| 269 int id = next_worker_id_++; |
| 270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( |
| 271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); |
| 272 workers_.emplace_back(SchedulerWorker::Create( |
| 273 params.priority_hint(), std::move(delegate), task_tracker_, |
| 274 SchedulerWorker::InitialState::DETACHED)); |
| 275 return workers_.back().get(); |
| 276 } |
| 277 |
| 278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| 279 SchedulerWorker* worker) { |
| 280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| 281 // |workers_lock_|. |
| 282 scoped_refptr<SchedulerWorker> worker_to_destroy; |
| 283 { |
| 284 AutoSchedulerLock auto_lock(workers_lock_); |
| 285 |
| 286 // We might be joining, so record that a worker was unregistered for |
| 287 // verification at destruction. |
| 288 if (workers_.empty()) { |
| 289 #if DCHECK_IS_ON() |
| 290 subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); |
| 291 #endif |
| 292 return; |
| 293 } |
| 294 |
| 295 auto worker_iter = |
| 296 std::find_if(workers_.begin(), workers_.end(), |
| 297 [worker](const scoped_refptr<SchedulerWorker>& candidate) { |
| 298 return candidate.get() == worker; |
| 299 }); |
| 300 DCHECK(worker_iter != workers_.end()); |
| 301 worker_to_destroy = std::move(*worker_iter); |
| 302 workers_.erase(worker_iter); |
| 303 } |
| 304 worker_to_destroy->Cleanup(); |
| 305 } |
| 306 |
| 307 } // namespace internal |
| 308 } // namespace base |
OLD | NEW |