OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| 6 |
| 7 #include <algorithm> |
| 8 #include <memory> |
| 9 #include <string> |
| 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/callback.h" |
| 13 #include "base/memory/ptr_util.h" |
| 14 #include "base/single_thread_task_runner.h" |
| 15 #include "base/strings/stringprintf.h" |
| 16 #include "base/synchronization/atomic_flag.h" |
| 17 #include "base/task_scheduler/delayed_task_manager.h" |
| 18 #include "base/task_scheduler/scheduler_worker.h" |
| 19 #include "base/task_scheduler/sequence.h" |
| 20 #include "base/task_scheduler/task.h" |
| 21 #include "base/task_scheduler/task_tracker.h" |
| 22 #include "base/task_scheduler/task_traits.h" |
| 23 #include "base/threading/platform_thread.h" |
| 24 #include "base/time/time.h" |
| 25 |
| 26 namespace base { |
| 27 namespace internal { |
| 28 |
| 29 namespace { |
| 30 |
| 31 // Allows for checking the PlatformThread::CurrentRef() against a set |
| 32 // PlatformThreadRef atomically without using locks. |
| 33 class AtomicThreadRefChecker { |
| 34 public: |
| 35 AtomicThreadRefChecker() = default; |
| 36 ~AtomicThreadRefChecker() = default; |
| 37 |
| 38 void Set() { |
| 39 thread_ref_ = PlatformThread::CurrentRef(); |
| 40 is_set_.Set(); |
| 41 } |
| 42 |
| 43 bool IsCurrentThreadSameAsSetThread() { |
| 44 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef(); |
| 45 } |
| 46 |
| 47 private: |
| 48 AtomicFlag is_set_; |
| 49 PlatformThreadRef thread_ref_; |
| 50 |
| 51 DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker); |
| 52 }; |
| 53 |
| 54 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
| 55 public: |
| 56 SchedulerWorkerDelegate(const std::string& thread_name) |
| 57 : thread_name_(thread_name) {} |
| 58 |
| 59 // SchedulerWorker::Delegate: |
| 60 void OnMainEntry(SchedulerWorker* worker) override { |
| 61 thread_ref_checker_.Set(); |
| 62 PlatformThread::SetName(thread_name_); |
| 63 } |
| 64 |
| 65 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| 66 AutoSchedulerLock auto_lock(sequence_lock_); |
| 67 return std::move(sequence_); |
| 68 } |
| 69 |
| 70 void DidRunTask() override {} |
| 71 |
| 72 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
| 73 AutoSchedulerLock auto_lock(sequence_lock_); |
| 74 DCHECK(!sequence_); |
| 75 sequence_ = std::move(sequence); |
| 76 } |
| 77 |
| 78 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| 79 |
| 80 bool CanDetach(SchedulerWorker* worker) override { return false; } |
| 81 |
| 82 void OnDetach() override { NOTREACHED(); } |
| 83 |
| 84 bool RunsTasksOnCurrentThread() { |
| 85 return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); |
| 86 } |
| 87 |
| 88 private: |
| 89 const std::string thread_name_; |
| 90 |
| 91 // Synchronizes access to |sequence_| and handles the fact that |
| 92 // ReEnqueueSequence() is called on both the worker thread for reenqueuing |
| 93 // the sequence and off of the worker thread to seed the sequence for |
| 94 // GetWork(). |
| 95 SchedulerLock sequence_lock_; |
| 96 scoped_refptr<Sequence> sequence_; |
| 97 |
| 98 AtomicThreadRefChecker thread_ref_checker_; |
| 99 |
| 100 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| 101 }; |
| 102 |
| 103 } // namespace |
| 104 |
| 105 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| 106 : public SingleThreadTaskRunner { |
| 107 public: |
| 108 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| 109 // lifetime of a dedicated |worker| for |traits|. |
| 110 SchedulerSingleThreadTaskRunner( |
| 111 SchedulerSingleThreadTaskRunnerManager* const outer, |
| 112 const TaskTraits& traits, |
| 113 SchedulerWorker* worker) |
| 114 : outer_(outer), traits_(traits), worker_(worker) { |
| 115 DCHECK(outer_); |
| 116 DCHECK(worker_); |
| 117 } |
| 118 |
| 119 // SingleThreadTaskRunner: |
| 120 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 121 const Closure& closure, |
| 122 TimeDelta delay) override; |
| 123 |
| 124 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 125 const Closure& closure, |
| 126 base::TimeDelta delay) override { |
| 127 // Tasks are never nested within the task scheduler. |
| 128 return PostDelayedTask(from_here, closure, delay); |
| 129 } |
| 130 |
| 131 bool RunsTasksOnCurrentThread() const override { |
| 132 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| 133 return delegate->RunsTasksOnCurrentThread(); |
| 134 } |
| 135 |
| 136 private: |
| 137 ~SchedulerSingleThreadTaskRunner() override { |
| 138 outer_->UnregisterSchedulerWorker(worker_); |
| 139 } |
| 140 |
| 141 void PostTaskNow(std::unique_ptr<Task> task); |
| 142 |
| 143 // Sequence for all Tasks posted through this TaskRunner. |
| 144 const scoped_refptr<Sequence> sequence_ = new Sequence; |
| 145 |
| 146 SchedulerSingleThreadTaskRunnerManager* const outer_; |
| 147 const TaskTraits traits_; |
| 148 SchedulerWorker* const worker_; |
| 149 |
| 150 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| 151 }; |
| 152 |
| 153 bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| 154 PostDelayedTask(const tracked_objects::Location& from_here, |
| 155 const Closure& closure, |
| 156 TimeDelta delay) { |
| 157 auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
| 158 task->single_thread_task_runner_ref = this; |
| 159 |
| 160 if (!outer_->task_tracker_->WillPostTask(task.get())) |
| 161 return false; |
| 162 |
| 163 if (task->delayed_run_time.is_null()) { |
| 164 PostTaskNow(std::move(task)); |
| 165 } else { |
| 166 outer_->delayed_task_manager_->AddDelayedTask( |
| 167 std::move(task), |
| 168 Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); |
| 169 } |
| 170 return true; |
| 171 } |
| 172 |
| 173 void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| 174 PostTaskNow(std::unique_ptr<Task> task) { |
| 175 const bool sequence_was_empty = sequence_->PushTask(std::move(task)); |
| 176 if (sequence_was_empty) { |
| 177 auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| 178 delegate->ReEnqueueSequence(sequence_); |
| 179 worker_->WakeUp(); |
| 180 } |
| 181 } |
| 182 |
| 183 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| 184 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| 185 const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| 186 worker_pool_index_for_traits_callback, |
| 187 TaskTracker* task_tracker, |
| 188 DelayedTaskManager* delayed_task_manager) |
| 189 : worker_pool_params_vector_(worker_pool_params_vector), |
| 190 worker_pool_index_for_traits_callback_( |
| 191 worker_pool_index_for_traits_callback), |
| 192 task_tracker_(task_tracker), |
| 193 delayed_task_manager_(delayed_task_manager) { |
| 194 DCHECK_GT(worker_pool_params_vector_.size(), 0U); |
| 195 DCHECK(worker_pool_index_for_traits_callback_); |
| 196 DCHECK(task_tracker_); |
| 197 DCHECK(delayed_task_manager_); |
| 198 } |
| 199 |
| 200 SchedulerSingleThreadTaskRunnerManager:: |
| 201 ~SchedulerSingleThreadTaskRunnerManager() { |
| 202 DCHECK(workers_.empty()) << "SchedulerSingleThreadTaskRunners must outlive " |
| 203 "SchedulerSingleThreadTaskRunnerManager"; |
| 204 } |
| 205 |
| 206 scoped_refptr<SingleThreadTaskRunner> |
| 207 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| 208 const TaskTraits& traits) { |
| 209 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| 210 DCHECK_LT(index, worker_pool_params_vector_.size()); |
| 211 return new SchedulerSingleThreadTaskRunner( |
| 212 this, traits, |
| 213 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); |
| 214 } |
| 215 |
| 216 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { |
| 217 decltype(workers_) local_workers; |
| 218 { |
| 219 AutoSchedulerLock auto_lock(workers_lock_); |
| 220 local_workers = std::move(workers_); |
| 221 } |
| 222 |
| 223 for (const auto& worker : local_workers) |
| 224 worker->JoinForTesting(); |
| 225 |
| 226 { |
| 227 AutoSchedulerLock auto_lock(workers_lock_); |
| 228 DCHECK(workers_.empty()) |
| 229 << "New worker(s) unexpectedly registered during join."; |
| 230 workers_ = std::move(local_workers); |
| 231 } |
| 232 } |
| 233 |
| 234 SchedulerWorker* |
| 235 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( |
| 236 const SchedulerWorkerPoolParams& params) { |
| 237 AutoSchedulerLock auto_lock(workers_lock_); |
| 238 int id = next_worker_id_++; |
| 239 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( |
| 240 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); |
| 241 workers_.emplace_back(SchedulerWorker::Create( |
| 242 params.priority_hint(), std::move(delegate), task_tracker_, |
| 243 SchedulerWorker::InitialState::DETACHED)); |
| 244 return workers_.back().get(); |
| 245 } |
| 246 |
| 247 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| 248 SchedulerWorker* worker) { |
| 249 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| 250 // |workers_lock_|. |
| 251 scoped_refptr<SchedulerWorker> worker_to_destroy; |
| 252 { |
| 253 AutoSchedulerLock auto_lock(workers_lock_); |
| 254 |
| 255 // We might be joining, so no-op this if |workers_| is empty. |
| 256 if (workers_.empty()) |
| 257 return; |
| 258 |
| 259 auto worker_iter = |
| 260 std::find_if(workers_.begin(), workers_.end(), |
| 261 [worker](const scoped_refptr<SchedulerWorker>& candidate) { |
| 262 return candidate.get() == worker; |
| 263 }); |
| 264 DCHECK(worker_iter != workers_.end()); |
| 265 worker_to_destroy = std::move(*worker_iter); |
| 266 workers_.erase(worker_iter); |
| 267 } |
| 268 worker_to_destroy->Cleanup(); |
| 269 } |
| 270 |
| 271 } // namespace internal |
| 272 } // namespace base |
OLD | NEW |