Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/atomicops.h" | 12 #include "base/atomicops.h" |
| 13 #include "base/bind.h" | 13 #include "base/bind.h" |
| 14 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
| 15 #include "base/lazy_instance.h" | 15 #include "base/lazy_instance.h" |
| 16 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
| 17 #include "base/metrics/histogram.h" | 17 #include "base/metrics/histogram.h" |
| 18 #include "base/sequence_token.h" | 18 #include "base/sequence_token.h" |
| 19 #include "base/sequenced_task_runner.h" | 19 #include "base/sequenced_task_runner.h" |
| 20 #include "base/single_thread_task_runner.h" | |
| 21 #include "base/strings/stringprintf.h" | 20 #include "base/strings/stringprintf.h" |
| 22 #include "base/task_runner.h" | 21 #include "base/task_runner.h" |
| 23 #include "base/task_scheduler/delayed_task_manager.h" | 22 #include "base/task_scheduler/delayed_task_manager.h" |
| 24 #include "base/task_scheduler/scheduler_worker_pool_params.h" | 23 #include "base/task_scheduler/scheduler_worker_pool_params.h" |
| 25 #include "base/task_scheduler/task_tracker.h" | 24 #include "base/task_scheduler/task_tracker.h" |
| 26 #include "base/task_scheduler/task_traits.h" | 25 #include "base/task_scheduler/task_traits.h" |
| 27 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
| 28 #include "base/threading/thread_local.h" | 27 #include "base/threading/thread_local.h" |
| 29 #include "base/threading/thread_restrictions.h" | 28 #include "base/threading/thread_restrictions.h" |
| 30 | 29 |
| (...skipping 26 matching lines...) | |
| 57 DCHECK(worker_pool_); | 56 DCHECK(worker_pool_); |
| 58 } | 57 } |
| 59 | 58 |
| 60 // TaskRunner: | 59 // TaskRunner: |
| 61 bool PostDelayedTask(const tracked_objects::Location& from_here, | 60 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 62 const Closure& closure, | 61 const Closure& closure, |
| 63 TimeDelta delay) override { | 62 TimeDelta delay) override { |
| 64 // Post the task as part of a one-off single-task Sequence. | 63 // Post the task as part of a one-off single-task Sequence. |
| 65 return worker_pool_->PostTaskWithSequence( | 64 return worker_pool_->PostTaskWithSequence( |
| 66 MakeUnique<Task>(from_here, closure, traits_, delay), | 65 MakeUnique<Task>(from_here, closure, traits_, delay), |
| 67 make_scoped_refptr(new Sequence), nullptr); | 66 make_scoped_refptr(new Sequence)); |
| 68 } | 67 } |
| 69 | 68 |
| 70 bool RunsTasksOnCurrentThread() const override { | 69 bool RunsTasksOnCurrentThread() const override { |
| 71 return tls_current_worker_pool.Get().Get() == worker_pool_; | 70 return tls_current_worker_pool.Get().Get() == worker_pool_; |
| 72 } | 71 } |
| 73 | 72 |
| 74 private: | 73 private: |
| 75 ~SchedulerParallelTaskRunner() override = default; | 74 ~SchedulerParallelTaskRunner() override = default; |
| 76 | 75 |
| 77 const TaskTraits traits_; | 76 const TaskTraits traits_; |
| (...skipping 15 matching lines...) | |
| 93 } | 92 } |
| 94 | 93 |
| 95 // SequencedTaskRunner: | 94 // SequencedTaskRunner: |
| 96 bool PostDelayedTask(const tracked_objects::Location& from_here, | 95 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 97 const Closure& closure, | 96 const Closure& closure, |
| 98 TimeDelta delay) override { | 97 TimeDelta delay) override { |
| 99 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 98 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| 100 task->sequenced_task_runner_ref = this; | 99 task->sequenced_task_runner_ref = this; |
| 101 | 100 |
| 102 // Post the task as part of |sequence_|. | 101 // Post the task as part of |sequence_|. |
| 103 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 102 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); |
| 104 nullptr); | |
| 105 } | 103 } |
| 106 | 104 |
| 107 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 105 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 108 const Closure& closure, | 106 const Closure& closure, |
| 109 base::TimeDelta delay) override { | 107 base::TimeDelta delay) override { |
| 110 // Tasks are never nested within the task scheduler. | 108 // Tasks are never nested within the task scheduler. |
| 111 return PostDelayedTask(from_here, closure, delay); | 109 return PostDelayedTask(from_here, closure, delay); |
| 112 } | 110 } |
| 113 | 111 |
| 114 bool RunsTasksOnCurrentThread() const override { | 112 bool RunsTasksOnCurrentThread() const override { |
| (...skipping 19 matching lines...) | |
| 134 const SchedulerWorker* worker) { | 132 const SchedulerWorker* worker) { |
| 135 auto it = std::find_if(workers.begin(), workers.end(), | 133 auto it = std::find_if(workers.begin(), workers.end(), |
| 136 [worker](const scoped_refptr<SchedulerWorker>& i) { | 134 [worker](const scoped_refptr<SchedulerWorker>& i) { |
| 137 return i.get() == worker; | 135 return i.get() == worker; |
| 138 }); | 136 }); |
| 139 return it != workers.end(); | 137 return it != workers.end(); |
| 140 } | 138 } |
| 141 | 139 |
| 142 } // namespace | 140 } // namespace |
| 143 | 141 |
| 144 // TODO(http://crbug.com/694823): Remove this and supporting framework. | |
| 145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | |
| 146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : | |
| 147 public SingleThreadTaskRunner { | |
| 148 public: | |
| 149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | |
| 150 // tasks so long as |worker_pool| and |worker| are alive. | |
| 151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | |
| 152 // and |worker|. | |
| 153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
| 154 SchedulerWorkerPool* worker_pool, | |
| 155 SchedulerWorker* worker); | |
| 156 | |
| 157 // SingleThreadTaskRunner: | |
| 158 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 159 const Closure& closure, | |
| 160 TimeDelta delay) override { | |
| 161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | |
| 162 task->single_thread_task_runner_ref = this; | |
| 163 | |
| 164 // Post the task to be executed by |worker_| as part of |sequence_|. | |
| 165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | |
| 166 worker_); | |
| 167 } | |
| 168 | |
| 169 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
| 170 const Closure& closure, | |
| 171 base::TimeDelta delay) override { | |
| 172 // Tasks are never nested within the task scheduler. | |
| 173 return PostDelayedTask(from_here, closure, delay); | |
| 174 } | |
| 175 | |
| 176 bool RunsTasksOnCurrentThread() const override { | |
| 177 // Even though this is a SingleThreadTaskRunner, test the actual sequence | |
| 178 // instead of the assigned worker so that another task randomly assigned | |
| 179 // to the same worker doesn't return true by happenstance. | |
| 180 return sequence_->token() == SequenceToken::GetForCurrentThread(); | |
| 181 } | |
| 182 | |
| 183 private: | |
| 184 ~SchedulerSingleThreadTaskRunner() override; | |
| 185 | |
| 186 // Sequence for all Tasks posted through this TaskRunner. | |
| 187 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
| 188 | |
| 189 const TaskTraits traits_; | |
| 190 SchedulerWorkerPool* const worker_pool_; | |
| 191 SchedulerWorker* const worker_; | |
| 192 | |
| 193 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
| 194 }; | |
| 195 | |
| 196 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 142 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| 197 : public SchedulerWorker::Delegate { | 143 : public SchedulerWorker::Delegate { |
| 198 public: | 144 public: |
| 199 // |outer| owns the worker for which this delegate is constructed. | 145 // |outer| owns the worker for which this delegate is constructed. |
| 200 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 146 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| 201 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 147 // called. |index| will be appended to the pool name to label the underlying |
| 202 // PriorityQueue whose transactions may overlap with the worker's | 148 // worker threads. |
| 203 // single-threaded PriorityQueue's transactions. |index| will be appended to | |
| 204 // the pool name to label the underlying worker threads. | |
| 205 SchedulerWorkerDelegateImpl( | 149 SchedulerWorkerDelegateImpl( |
| 206 SchedulerWorkerPoolImpl* outer, | 150 SchedulerWorkerPoolImpl* outer, |
| 207 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 151 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 208 const PriorityQueue* shared_priority_queue, | |
| 209 int index); | 152 int index); |
| 210 ~SchedulerWorkerDelegateImpl() override; | 153 ~SchedulerWorkerDelegateImpl() override; |
| 211 | 154 |
| 212 PriorityQueue* single_threaded_priority_queue() { | |
| 213 return &single_threaded_priority_queue_; | |
| 214 } | |
| 215 | |
| 216 // SchedulerWorker::Delegate: | 155 // SchedulerWorker::Delegate: |
| 217 void OnMainEntry(SchedulerWorker* worker) override; | 156 void OnMainEntry(SchedulerWorker* worker) override; |
| 218 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 157 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
| 219 void DidRunTask() override; | 158 void DidRunTask() override; |
| 220 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 159 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| 221 TimeDelta GetSleepTimeout() override; | 160 TimeDelta GetSleepTimeout() override; |
| 222 bool CanDetach(SchedulerWorker* worker) override; | 161 bool CanDetach(SchedulerWorker* worker) override; |
| 223 void OnDetach() override; | 162 void OnDetach() override; |
| 224 | 163 |
| 225 void RegisterSingleThreadTaskRunner() { | |
| 226 // No barrier as barriers only affect sequential consistency which is | |
| 227 // irrelevant in a single variable use case (they don't force an immediate | |
| 228 // flush anymore than atomics do by default). | |
| 229 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); | |
| 230 } | |
| 231 | |
| 232 void UnregisterSingleThreadTaskRunner() { | |
| 233 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); | |
| 234 } | |
| 235 | |
| 236 private: | 164 private: |
| 237 SchedulerWorkerPoolImpl* outer_; | 165 SchedulerWorkerPoolImpl* outer_; |
| 238 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 166 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| 239 | 167 |
| 240 // Single-threaded PriorityQueue for the worker. | |
| 241 PriorityQueue single_threaded_priority_queue_; | |
| 242 | |
| 243 // True if the last Sequence returned by GetWork() was extracted from | |
| 244 // |single_threaded_priority_queue_|. | |
| 245 bool last_sequence_is_single_threaded_ = false; | |
| 246 | |
| 247 // Time of the last detach. | 168 // Time of the last detach. |
| 248 TimeTicks last_detach_time_; | 169 TimeTicks last_detach_time_; |
| 249 | 170 |
| 250 // Time when GetWork() first returned nullptr. | 171 // Time when GetWork() first returned nullptr. |
| 251 TimeTicks idle_start_time_; | 172 TimeTicks idle_start_time_; |
| 252 | 173 |
| 253 // Indicates whether the last call to GetWork() returned nullptr. | 174 // Indicates whether the last call to GetWork() returned nullptr. |
| 254 bool last_get_work_returned_nullptr_ = false; | 175 bool last_get_work_returned_nullptr_ = false; |
| 255 | 176 |
| 256 // Indicates whether the SchedulerWorker was detached since the last call to | 177 // Indicates whether the SchedulerWorker was detached since the last call to |
| 257 // GetWork(). | 178 // GetWork(). |
| 258 bool did_detach_since_last_get_work_ = false; | 179 bool did_detach_since_last_get_work_ = false; |
| 259 | 180 |
| 260 // Number of tasks executed since the last time the | 181 // Number of tasks executed since the last time the |
| 261 // TaskScheduler.NumTasksBetweenWaits histogram was recorded. | 182 // TaskScheduler.NumTasksBetweenWaits histogram was recorded. |
| 262 size_t num_tasks_since_last_wait_ = 0; | 183 size_t num_tasks_since_last_wait_ = 0; |
| 263 | 184 |
| 264 // Number of tasks executed since the last time the | 185 // Number of tasks executed since the last time the |
| 265 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. | 186 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
| 266 size_t num_tasks_since_last_detach_ = 0; | 187 size_t num_tasks_since_last_detach_ = 0; |
| 267 | 188 |
| 268 subtle::Atomic32 num_single_threaded_runners_ = 0; | |
| 269 | |
| 270 const int index_; | 189 const int index_; |
| 271 | 190 |
| 272 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 191 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| 273 }; | 192 }; |
| 274 | 193 |
| 275 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 194 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| 276 // SchedulerWorkerPool should never be deleted in production unless its | 195 // SchedulerWorkerPool should never be deleted in production unless its |
| 277 // initialization failed. | 196 // initialization failed. |
| 278 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 197 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| 279 } | 198 } |
| (...skipping 15 matching lines...) | |
| 295 const TaskTraits& traits) { | 214 const TaskTraits& traits) { |
| 296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 215 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| 297 } | 216 } |
| 298 | 217 |
| 299 scoped_refptr<SequencedTaskRunner> | 218 scoped_refptr<SequencedTaskRunner> |
| 300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 219 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
| 301 const TaskTraits& traits) { | 220 const TaskTraits& traits) { |
| 302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 221 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| 303 } | 222 } |
| 304 | 223 |
| 305 scoped_refptr<SingleThreadTaskRunner> | |
| 306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( | |
| 307 const TaskTraits& traits) { | |
| 308 // TODO(fdoray): Find a way to take load into account when assigning a | |
| 309 // SchedulerWorker to a SingleThreadTaskRunner. | |
| 310 size_t worker_index; | |
| 311 { | |
| 312 AutoSchedulerLock auto_lock(next_worker_index_lock_); | |
| 313 worker_index = next_worker_index_; | |
| 314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); | |
| 315 } | |
| 316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
| 317 traits, this, workers_[worker_index].get())); | |
| 318 } | |
| 319 | |
| 320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | 224 void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
| 321 scoped_refptr<Sequence> sequence, | 225 scoped_refptr<Sequence> sequence, |
| 322 const SequenceSortKey& sequence_sort_key) { | 226 const SequenceSortKey& sequence_sort_key) { |
| 323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | 227 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| 324 sequence_sort_key); | 228 sequence_sort_key); |
| 325 | 229 |
| 326 // The thread calling this method just ran a Task from |sequence| and will | 230 // The thread calling this method just ran a Task from |sequence| and will |
| 327 // soon try to get another Sequence from which to run a Task. If the thread | 231 // soon try to get another Sequence from which to run a Task. If the thread |
| 328 // belongs to this pool, it will get that Sequence from | 232 // belongs to this pool, it will get that Sequence from |
| 329 // |shared_priority_queue_|. When that's the case, there is no need to wake up | 233 // |shared_priority_queue_|. When that's the case, there is no need to wake up |
| 330 // another worker after |sequence| is inserted in |shared_priority_queue_|. If | 234 // another worker after |sequence| is inserted in |shared_priority_queue_|. If |
| 331 // we did wake up another worker, we would waste resources by having more | 235 // we did wake up another worker, we would waste resources by having more |
| 332 // workers trying to get a Sequence from |shared_priority_queue_| than the | 236 // workers trying to get a Sequence from |shared_priority_queue_| than the |
| 333 // number of Sequences in it. | 237 // number of Sequences in it. |
| 334 if (tls_current_worker_pool.Get().Get() != this) | 238 if (tls_current_worker_pool.Get().Get() != this) |
| 335 WakeUpOneWorker(); | 239 WakeUpOneWorker(); |
| 336 } | 240 } |
| 337 | 241 |
| 338 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 242 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
| 339 std::unique_ptr<Task> task, | 243 std::unique_ptr<Task> task, |
| 340 scoped_refptr<Sequence> sequence, | 244 scoped_refptr<Sequence> sequence) { |
| 341 SchedulerWorker* worker) { | |
| 342 DCHECK(task); | 245 DCHECK(task); |
| 343 DCHECK(sequence); | 246 DCHECK(sequence); |
| 344 DCHECK(!worker || ContainsWorker(workers_, worker)); | |
| 345 | 247 |
| 346 if (!task_tracker_->WillPostTask(task.get())) | 248 if (!task_tracker_->WillPostTask(task.get())) |
| 347 return false; | 249 return false; |
| 348 | 250 |
| 349 if (task->delayed_run_time.is_null()) { | 251 if (task->delayed_run_time.is_null()) { |
| 350 PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); | 252 PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
| 351 } else { | 253 } else { |
| 352 delayed_task_manager_->AddDelayedTask( | 254 delayed_task_manager_->AddDelayedTask( |
| 353 std::move(task), | 255 std::move(task), |
| 354 Bind( | 256 Bind( |
| 355 [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker, | 257 [](scoped_refptr<Sequence> sequence, |
| 356 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { | 258 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { |
| 357 worker_pool->PostTaskWithSequenceNow(std::move(task), | 259 worker_pool->PostTaskWithSequenceNow(std::move(task), |
| 358 std::move(sequence), worker); | 260 std::move(sequence)); |
| 359 }, | 261 }, |
| 360 std::move(sequence), Unretained(worker), Unretained(this))); | 262 std::move(sequence), Unretained(this))); |
| 361 } | 263 } |
| 362 | 264 |
| 363 return true; | 265 return true; |
| 364 } | 266 } |
| 365 | 267 |
| 366 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( | 268 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| 367 std::unique_ptr<Task> task, | 269 std::unique_ptr<Task> task, |
| 368 scoped_refptr<Sequence> sequence, | 270 scoped_refptr<Sequence> sequence) { |
| 369 SchedulerWorker* worker) { | |
| 370 DCHECK(task); | 271 DCHECK(task); |
| 371 DCHECK(sequence); | 272 DCHECK(sequence); |
| 372 DCHECK(!worker || ContainsWorker(workers_, worker)); | |
| 373 | 273 |
| 374 // Confirm that |task| is ready to run (its delayed run time is either null or | 274 // Confirm that |task| is ready to run (its delayed run time is either null or |
| 375 // in the past). | 275 // in the past). |
| 376 DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); | 276 DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); |
| 377 | 277 |
| 378 // Because |worker| belongs to this worker pool, we know that the type | |
| 379 // of its delegate is SchedulerWorkerDelegateImpl. | |
| 380 PriorityQueue* const priority_queue = | |
| 381 worker | |
| 382 ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) | |
| 383 ->single_threaded_priority_queue() | |
| 384 : &shared_priority_queue_; | |
| 385 DCHECK(priority_queue); | |
| 386 | |
| 387 const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 278 const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| 388 if (sequence_was_empty) { | 279 if (sequence_was_empty) { |
| 389 // Insert |sequence| in |priority_queue| if it was empty before |task| was | 280 // Insert |sequence| in |shared_priority_queue_| if it was empty before |
| 390 // inserted into it. Otherwise, one of these must be true: | 281 // |task| was inserted into it. Otherwise, one of these must be true: |
| 391 // - |sequence| is already in a PriorityQueue (not necessarily | 282 // - |sequence| is already in a PriorityQueue, or, |
| 392 // |shared_priority_queue_|), or, | |
| 393 // - A worker is running a Task from |sequence|. It will insert |sequence| | 283 // - A worker is running a Task from |sequence|. It will insert |sequence| |
| 394 // in a PriorityQueue once it's done running the Task. | 284 // in a PriorityQueue once it's done running the Task. |
| 395 const auto sequence_sort_key = sequence->GetSortKey(); | 285 const auto sequence_sort_key = sequence->GetSortKey(); |
| 396 priority_queue->BeginTransaction()->Push(std::move(sequence), | 286 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| 397 sequence_sort_key); | 287 sequence_sort_key); |
| 398 | 288 |
| 399 // Wake up a worker to process |sequence|. | 289 // Wake up a worker to process |sequence|. |
|
gab 2017/03/15 20:20:58: rm now redundant comment
robliao 2017/03/15 20:46:44: This comment is consistent with the current style
gab 2017/03/16 15:34:46: Hmm ok, we at least shouldn't add more (and I'm ha
robliao 2017/03/16 22:49:03: While I agree with your opinion, the style guide i
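To make the interleaved columns easier to follow, here is a condensed sketch of the new-side tail of PostTaskWithSequenceNow() once the per-worker branch shown in the next rows is removed. It only restates calls that appear in the right-hand column of this diff and is not a verbatim excerpt.

```cpp
// Illustrative condensation of the NEW column: every Sequence now goes to
// the shared PriorityQueue and any idle worker may pick it up.
const bool sequence_was_empty = sequence->PushTask(std::move(task));
if (sequence_was_empty) {
  // |sequence| was not in any PriorityQueue yet; enqueue it and wake a worker.
  const auto sequence_sort_key = sequence->GetSortKey();
  shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
                                                  sequence_sort_key);
  // Wake up a worker to process |sequence|.
  WakeUpOneWorker();
}
```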
| 400 if (worker) | 290 WakeUpOneWorker(); |
| 401 WakeUpWorker(worker); | |
| 402 else | |
| 403 WakeUpOneWorker(); | |
| 404 } | 291 } |
| 405 } | 292 } |
| 406 | 293 |
| 407 void SchedulerWorkerPoolImpl::GetHistograms( | 294 void SchedulerWorkerPoolImpl::GetHistograms( |
| 408 std::vector<const HistogramBase*>* histograms) const { | 295 std::vector<const HistogramBase*>* histograms) const { |
| 409 histograms->push_back(detach_duration_histogram_); | 296 histograms->push_back(detach_duration_histogram_); |
| 410 histograms->push_back(num_tasks_between_waits_histogram_); | 297 histograms->push_back(num_tasks_between_waits_histogram_); |
| 411 } | 298 } |
| 412 | 299 |
| 413 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 300 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
| (...skipping 22 matching lines...) | |
| 436 | 323 |
| 437 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { | 324 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
| 438 size_t num_alive_workers = 0; | 325 size_t num_alive_workers = 0; |
| 439 for (const auto& worker : workers_) { | 326 for (const auto& worker : workers_) { |
| 440 if (worker->ThreadAliveForTesting()) | 327 if (worker->ThreadAliveForTesting()) |
| 441 ++num_alive_workers; | 328 ++num_alive_workers; |
| 442 } | 329 } |
| 443 return num_alive_workers; | 330 return num_alive_workers; |
| 444 } | 331 } |
| 445 | 332 |
| 446 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
| 447 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
| 448 SchedulerWorkerPool* worker_pool, | |
| 449 SchedulerWorker* worker) | |
| 450 : traits_(traits), | |
| 451 worker_pool_(worker_pool), | |
| 452 worker_(worker) { | |
| 453 DCHECK(worker_pool_); | |
| 454 DCHECK(worker_); | |
| 455 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
| 456 RegisterSingleThreadTaskRunner(); | |
| 457 } | |
| 458 | |
| 459 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
| 460 ~SchedulerSingleThreadTaskRunner() { | |
| 461 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
| 462 UnregisterSingleThreadTaskRunner(); | |
| 463 } | |
| 464 | |
| 465 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 333 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 466 SchedulerWorkerDelegateImpl( | 334 SchedulerWorkerDelegateImpl( |
| 467 SchedulerWorkerPoolImpl* outer, | 335 SchedulerWorkerPoolImpl* outer, |
| 468 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 336 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 469 const PriorityQueue* shared_priority_queue, | |
| 470 int index) | 337 int index) |
| 471 : outer_(outer), | 338 : outer_(outer), |
| 472 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 339 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| 473 single_threaded_priority_queue_(shared_priority_queue), | |
| 474 index_(index) {} | 340 index_(index) {} |
| 475 | 341 |
| 476 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 342 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 477 ~SchedulerWorkerDelegateImpl() = default; | 343 ~SchedulerWorkerDelegateImpl() = default; |
| 478 | 344 |
| 479 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 345 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| 480 SchedulerWorker* worker) { | 346 SchedulerWorker* worker) { |
| 481 #if DCHECK_IS_ON() | 347 #if DCHECK_IS_ON() |
| 482 // Wait for |outer_->workers_created_| to avoid traversing | 348 // Wait for |outer_->workers_created_| to avoid traversing |
| 483 // |outer_->workers_| while it is being filled by Initialize(). | 349 // |outer_->workers_| while it is being filled by Initialize(). |
| (...skipping 36 matching lines...) | |
| 520 // SchedulerWorker didn't wait on its WaitableEvent since the last time the | 386 // SchedulerWorker didn't wait on its WaitableEvent since the last time the |
| 521 // histogram was recorded). | 387 // histogram was recorded). |
| 522 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) { | 388 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) { |
| 523 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); | 389 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); |
| 524 num_tasks_since_last_wait_ = 0; | 390 num_tasks_since_last_wait_ = 0; |
| 525 } | 391 } |
| 526 | 392 |
| 527 scoped_refptr<Sequence> sequence; | 393 scoped_refptr<Sequence> sequence; |
| 528 { | 394 { |
| 529 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( | 395 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
| 530 outer_->shared_priority_queue_.BeginTransaction()); | 396 outer_->shared_priority_queue_.BeginTransaction()); |
|
gab 2017/03/15 20:20:58: This was the only use case of transaction I believ
robliao 2017/03/15 20:46:44: Correct. This is focused on just SchedulerWorkerPo
gab 2017/03/16 15:34:45: Unless you have a CL doing this now I still prefer
robliao 2017/03/16 22:49:03: In progress at the moment!
gab 2017/03/20 16:40:58: Ok but for future reference a comment in code refe
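Since the discussion above concerns dropping the second, per-worker Transaction, a condensed sketch of the new-side GetWork() flow may help: a single Transaction on |shared_priority_queue_| is opened, and it is deliberately kept alive while the worker parks itself on the idle stack, which is what closes the race spelled out in the comment rows that follow. Only calls visible in this diff are used; bookkeeping of the idle/detach flags is omitted.

```cpp
// Illustrative condensation of the NEW column of GetWork(): one Transaction
// on |shared_priority_queue_| now suffices.
scoped_refptr<Sequence> sequence;
{
  std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
      outer_->shared_priority_queue_.BeginTransaction());
  if (shared_transaction->IsEmpty()) {
    // Still under the Transaction, so a concurrent Push() + WakeUpOneWorker()
    // cannot slip in between the emptiness check and going idle.
    outer_->AddToIdleWorkersStack(worker);
    return nullptr;
  }
  sequence = shared_transaction->PopSequence();
}
return sequence;
```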
| 531 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( | |
| 532 single_threaded_priority_queue_.BeginTransaction()); | |
| 533 | 397 |
| 534 if (shared_transaction->IsEmpty() && | 398 if (shared_transaction->IsEmpty()) { |
| 535 single_threaded_transaction->IsEmpty()) { | |
| 536 single_threaded_transaction.reset(); | |
| 537 | |
| 538 // |shared_transaction| is kept alive while |worker| is added to | 399 // |shared_transaction| is kept alive while |worker| is added to |
| 539 // |idle_workers_stack_| to avoid this race: | 400 // |idle_workers_stack_| to avoid this race: |
| 540 // 1. This thread creates a Transaction, finds |shared_priority_queue_| | 401 // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| 541 // empty and ends the Transaction. | 402 // empty and ends the Transaction. |
| 542 // 2. Other thread creates a Transaction, inserts a Sequence into | 403 // 2. Other thread creates a Transaction, inserts a Sequence into |
| 543 // |shared_priority_queue_| and ends the Transaction. This can't happen | 404 // |shared_priority_queue_| and ends the Transaction. This can't happen |
| 544 // if the Transaction of step 1 is still active because there | 405 // if the Transaction of step 1 is still active because there |
| 545 // can only be one active Transaction per PriorityQueue at a time. | 406 // can only be one active Transaction per PriorityQueue at a time. |
| 546 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because | 407 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because |
| 547 // |idle_workers_stack_| is empty. | 408 // |idle_workers_stack_| is empty. |
| 548 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. | 409 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
| 549 // No thread runs the Sequence inserted in step 2. | 410 // No thread runs the Sequence inserted in step 2. |
| 550 outer_->AddToIdleWorkersStack(worker); | 411 outer_->AddToIdleWorkersStack(worker); |
| 551 if (idle_start_time_.is_null()) | 412 if (idle_start_time_.is_null()) |
| 552 idle_start_time_ = TimeTicks::Now(); | 413 idle_start_time_ = TimeTicks::Now(); |
| 553 did_detach_since_last_get_work_ = false; | 414 did_detach_since_last_get_work_ = false; |
| 554 last_get_work_returned_nullptr_ = true; | 415 last_get_work_returned_nullptr_ = true; |
| 555 return nullptr; | 416 return nullptr; |
| 556 } | 417 } |
| 557 | 418 |
| 558 // True if both PriorityQueues have Sequences and the Sequence at the top of | 419 sequence = shared_transaction->PopSequence(); |
| 559 // the shared PriorityQueue is more important. | |
| 560 const bool shared_sequence_is_more_important = | |
| 561 !shared_transaction->IsEmpty() && | |
| 562 !single_threaded_transaction->IsEmpty() && | |
| 563 shared_transaction->PeekSortKey() > | |
| 564 single_threaded_transaction->PeekSortKey(); | |
| 565 | |
| 566 if (single_threaded_transaction->IsEmpty() || | |
| 567 shared_sequence_is_more_important) { | |
| 568 sequence = shared_transaction->PopSequence(); | |
| 569 last_sequence_is_single_threaded_ = false; | |
| 570 } else { | |
| 571 DCHECK(!single_threaded_transaction->IsEmpty()); | |
| 572 sequence = single_threaded_transaction->PopSequence(); | |
| 573 last_sequence_is_single_threaded_ = true; | |
| 574 } | |
| 575 } | 420 } |
| 576 DCHECK(sequence); | 421 DCHECK(sequence); |
| 577 | 422 |
| 578 outer_->RemoveFromIdleWorkersStack(worker); | 423 outer_->RemoveFromIdleWorkersStack(worker); |
| 579 idle_start_time_ = TimeTicks(); | 424 idle_start_time_ = TimeTicks(); |
| 580 did_detach_since_last_get_work_ = false; | 425 did_detach_since_last_get_work_ = false; |
| 581 last_get_work_returned_nullptr_ = false; | 426 last_get_work_returned_nullptr_ = false; |
| 582 | 427 |
| 583 return sequence; | 428 return sequence; |
| 584 } | 429 } |
| 585 | 430 |
| 586 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { | 431 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
| 587 ++num_tasks_since_last_wait_; | 432 ++num_tasks_since_last_wait_; |
| 588 ++num_tasks_since_last_detach_; | 433 ++num_tasks_since_last_detach_; |
| 589 } | 434 } |
| 590 | 435 |
| 591 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 436 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 592 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 437 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| 593 if (last_sequence_is_single_threaded_) { | 438 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| 594 // A single-threaded Sequence is always re-enqueued in the single-threaded | 439 // |sequence| must be enqueued. |
| 595 // PriorityQueue from which it was extracted. | 440 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| 596 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | |
| 597 single_threaded_priority_queue_.BeginTransaction()->Push( | |
| 598 std::move(sequence), sequence_sort_key); | |
| 599 } else { | |
| 600 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | |
| 601 // |sequence| must be enqueued. | |
| 602 re_enqueue_sequence_callback_.Run(std::move(sequence)); | |
| 603 } | |
| 604 } | 441 } |
| 605 | 442 |
| 606 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 443 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 607 GetSleepTimeout() { | 444 GetSleepTimeout() { |
| 608 return outer_->suggested_reclaim_time_; | 445 return outer_->suggested_reclaim_time_; |
| 609 } | 446 } |
| 610 | 447 |
| 611 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 448 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| 612 SchedulerWorker* worker) { | 449 SchedulerWorker* worker) { |
| 613 // It's not an issue if |num_single_threaded_runners_| is incremented after | |
| 614 // this because the newly created SingleThreadTaskRunner (from which no task | |
| 615 // has run yet) will simply run all its tasks on the next physical thread | |
| 616 // created by the worker. | |
| 617 const bool can_detach = | 450 const bool can_detach = |
| 618 !idle_start_time_.is_null() && | 451 !idle_start_time_.is_null() && |
| 619 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && | 452 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
| 620 worker != outer_->PeekAtIdleWorkersStack() && | 453 worker != outer_->PeekAtIdleWorkersStack() && |
| 621 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && | |
| 622 outer_->CanWorkerDetachForTesting(); | 454 outer_->CanWorkerDetachForTesting(); |
| 623 return can_detach; | 455 return can_detach; |
| 624 } | 456 } |
| 625 | 457 |
| 626 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { | 458 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { |
| 627 DCHECK(!did_detach_since_last_get_work_); | 459 DCHECK(!did_detach_since_last_get_work_); |
| 628 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); | 460 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); |
| 629 num_tasks_since_last_detach_ = 0; | 461 num_tasks_since_last_detach_ = 0; |
| 630 did_detach_since_last_get_work_ = true; | 462 did_detach_since_last_get_work_ = true; |
| 631 last_detach_time_ = TimeTicks::Now(); | 463 last_detach_time_ = TimeTicks::Now(); |
| (...skipping 61 matching lines...) | |
| 693 const bool is_standby_lazy = | 525 const bool is_standby_lazy = |
| 694 params.standby_thread_policy() == | 526 params.standby_thread_policy() == |
| 695 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; | 527 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; |
| 696 const SchedulerWorker::InitialState initial_state = | 528 const SchedulerWorker::InitialState initial_state = |
| 697 (index == 0 && !is_standby_lazy) | 529 (index == 0 && !is_standby_lazy) |
| 698 ? SchedulerWorker::InitialState::ALIVE | 530 ? SchedulerWorker::InitialState::ALIVE |
| 699 : SchedulerWorker::InitialState::DETACHED; | 531 : SchedulerWorker::InitialState::DETACHED; |
| 700 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | 532 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( |
| 701 params.priority_hint(), | 533 params.priority_hint(), |
| 702 MakeUnique<SchedulerWorkerDelegateImpl>( | 534 MakeUnique<SchedulerWorkerDelegateImpl>( |
| 703 this, re_enqueue_sequence_callback, &shared_priority_queue_, index), | 535 this, re_enqueue_sequence_callback, index), |
| 704 task_tracker_, initial_state, params.backward_compatibility()); | 536 task_tracker_, initial_state, params.backward_compatibility()); |
| 705 if (!worker) | 537 if (!worker) |
| 706 break; | 538 break; |
| 707 idle_workers_stack_.Push(worker.get()); | 539 idle_workers_stack_.Push(worker.get()); |
| 708 workers_[index] = std::move(worker); | 540 workers_[index] = std::move(worker); |
| 709 } | 541 } |
| 710 | 542 |
| 711 #if DCHECK_IS_ON() | 543 #if DCHECK_IS_ON() |
| 712 workers_created_.Signal(); | 544 workers_created_.Signal(); |
| 713 #endif | 545 #endif |
| 714 | 546 |
| 715 return !workers_.empty(); | 547 return !workers_.empty(); |
| 716 } | 548 } |
| 717 | 549 |
| 718 void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) { | |
| 719 DCHECK(worker); | |
| 720 RemoveFromIdleWorkersStack(worker); | |
| 721 worker->WakeUp(); | |
| 722 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | |
| 723 // hysteresis to the CanDetach check. See https://crbug.com/666041. | |
| 724 } | |
| 725 | |
| 726 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 550 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| 727 SchedulerWorker* worker; | 551 SchedulerWorker* worker; |
| 728 { | 552 { |
| 729 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 553 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 730 worker = idle_workers_stack_.Pop(); | 554 worker = idle_workers_stack_.Pop(); |
| 731 } | 555 } |
| 732 if (worker) | 556 if (worker) |
| 733 worker->WakeUp(); | 557 worker->WakeUp(); |
| 558 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | |
|
gab 2017/03/15 20:20:58: Shouldn't this TODO be in CanDetach()?
robliao 2017/03/15 20:46:44: This was the best place to put it that respected t
| 559 // hysteresis to the CanDetach check. See https://crbug.com/666041. | |
| 734 } | 560 } |
| 735 | 561 |
| 736 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 562 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| 737 SchedulerWorker* worker) { | 563 SchedulerWorker* worker) { |
| 738 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 564 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 739 // Detachment may cause multiple attempts to add because the delegate cannot | 565 // Detachment may cause multiple attempts to add because the delegate cannot |
| 740 // determine who woke it up. As a result, when it wakes up, it may conclude | 566 // determine who woke it up. As a result, when it wakes up, it may conclude |
| 741 // there's no work to be done and attempt to add itself to the idle stack | 567 // there's no work to be done and attempt to add itself to the idle stack |
| 742 // again. | 568 // again. |
| 743 if (!idle_workers_stack_.Contains(worker)) | 569 if (!idle_workers_stack_.Contains(worker)) |
| (...skipping 15 matching lines...) | |
| 759 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 585 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 760 idle_workers_stack_.Remove(worker); | 586 idle_workers_stack_.Remove(worker); |
| 761 } | 587 } |
| 762 | 588 |
| 763 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 589 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 764 return !worker_detachment_disallowed_.IsSet(); | 590 return !worker_detachment_disallowed_.IsSet(); |
| 765 } | 591 } |
| 766 | 592 |
| 767 } // namespace internal | 593 } // namespace internal |
| 768 } // namespace base | 594 } // namespace base |
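The net effect of this CL on the pool's posting API is that PostTaskWithSequence() and PostTaskWithSequenceNow() lose their SchedulerWorker* parameter. A hypothetical call site, with |my_closure| and |my_traits| standing in for values not taken from this CL, would now read:

```cpp
// Hypothetical caller of the narrowed two-argument PostTaskWithSequence();
// |my_closure| and |my_traits| are placeholders, not names from this CL.
worker_pool->PostTaskWithSequence(
    MakeUnique<Task>(FROM_HERE, my_closure, my_traits, TimeDelta()),
    make_scoped_refptr(new Sequence));
```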