Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/atomicops.h" | |
| 12 #include "base/bind.h" | 13 #include "base/bind.h" |
| 13 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
| 14 #include "base/lazy_instance.h" | 15 #include "base/lazy_instance.h" |
| 15 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
| 16 #include "base/sequenced_task_runner.h" | 17 #include "base/sequenced_task_runner.h" |
| 17 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
| 18 #include "base/strings/stringprintf.h" | 19 #include "base/strings/stringprintf.h" |
| 19 #include "base/task_scheduler/delayed_task_manager.h" | 20 #include "base/task_scheduler/delayed_task_manager.h" |
| 20 #include "base/task_scheduler/task_tracker.h" | 21 #include "base/task_scheduler/task_tracker.h" |
| 21 #include "base/threading/platform_thread.h" | 22 #include "base/threading/platform_thread.h" |
| 22 #include "base/threading/thread_local.h" | 23 #include "base/threading/thread_local.h" |
| 23 #include "base/threading/thread_restrictions.h" | 24 #include "base/threading/thread_restrictions.h" |
| 25 #include "base/time/time.h" | |
| 24 | 26 |
| 25 namespace base { | 27 namespace base { |
| 26 namespace internal { | 28 namespace internal { |
| 27 | 29 |
| 28 namespace { | 30 namespace { |
| 29 | 31 |
| 30 // SchedulerWorker that owns the current thread, if any. | 32 // SchedulerWorker that owns the current thread, if any. |
| 31 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky | 33 LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky |
| 32 tls_current_worker = LAZY_INSTANCE_INITIALIZER; | 34 tls_current_worker = LAZY_INSTANCE_INITIALIZER; |
| 33 | 35 |
| 34 // SchedulerWorkerPool that owns the current thread, if any. | 36 // SchedulerWorkerPool that owns the current thread, if any. |
| 35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 37 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
| 36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 38 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
| 37 | 39 |
| 38 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 40 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
| 39 class SchedulerParallelTaskRunner : public TaskRunner { | 41 class SchedulerParallelTaskRunner : public TaskRunner { |
| 40 public: | 42 public: |
| 41 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 43 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
| 42 // long as |worker_pool| is alive. | 44 // long as |worker_pool| is alive. |
| 43 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 45 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| 44 SchedulerParallelTaskRunner(const TaskTraits& traits, | 46 SchedulerParallelTaskRunner(const TaskTraits& traits, |
| 45 SchedulerWorkerPool* worker_pool) | 47 SchedulerWorkerPool* worker_pool) |
| 46 : traits_(traits), worker_pool_(worker_pool) {} | 48 : traits_(traits), worker_pool_(worker_pool) { |
| 49 DCHECK(worker_pool_); | |
| 50 } | |
| 47 | 51 |
| 48 // TaskRunner: | 52 // TaskRunner: |
| 49 bool PostDelayedTask(const tracked_objects::Location& from_here, | 53 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 50 const Closure& closure, | 54 const Closure& closure, |
| 51 TimeDelta delay) override { | 55 TimeDelta delay) override { |
| 52 // Post the task as part of a one-off single-task Sequence. | 56 // Post the task as part of a one-off single-task Sequence. |
| 53 return worker_pool_->PostTaskWithSequence( | 57 return worker_pool_->PostTaskWithSequence( |
| 54 WrapUnique(new Task(from_here, closure, traits_, delay)), | 58 WrapUnique(new Task(from_here, closure, traits_, delay)), |
| 55 make_scoped_refptr(new Sequence), nullptr); | 59 make_scoped_refptr(new Sequence), nullptr); |
| 56 } | 60 } |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 69 }; | 73 }; |
| 70 | 74 |
| 71 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 75 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
| 72 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 76 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| 73 public: | 77 public: |
| 74 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 78 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
| 75 // so long as |worker_pool| is alive. | 79 // so long as |worker_pool| is alive. |
| 76 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 80 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| 77 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 81 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| 78 SchedulerWorkerPool* worker_pool) | 82 SchedulerWorkerPool* worker_pool) |
| 79 : traits_(traits), worker_pool_(worker_pool) {} | 83 : traits_(traits), worker_pool_(worker_pool) { |
| 84 DCHECK(worker_pool_); | |
| 85 } | |
| 80 | 86 |
| 81 // SequencedTaskRunner: | 87 // SequencedTaskRunner: |
| 82 bool PostDelayedTask(const tracked_objects::Location& from_here, | 88 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 83 const Closure& closure, | 89 const Closure& closure, |
| 84 TimeDelta delay) override { | 90 TimeDelta delay) override { |
| 85 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 91 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| 86 task->sequenced_task_runner_ref = this; | 92 task->sequenced_task_runner_ref = this; |
| 87 | 93 |
| 88 // Post the task as part of |sequence_|. | 94 // Post the task as part of |sequence_|. |
| 89 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 95 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 106 | 112 |
| 107 // Sequence for all Tasks posted through this TaskRunner. | 113 // Sequence for all Tasks posted through this TaskRunner. |
| 108 const scoped_refptr<Sequence> sequence_ = new Sequence; | 114 const scoped_refptr<Sequence> sequence_ = new Sequence; |
| 109 | 115 |
| 110 const TaskTraits traits_; | 116 const TaskTraits traits_; |
| 111 SchedulerWorkerPool* const worker_pool_; | 117 SchedulerWorkerPool* const worker_pool_; |
| 112 | 118 |
| 113 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 119 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| 114 }; | 120 }; |
| 115 | 121 |
| 122 // Only used in DCHECKs. | |
| 123 bool ContainsWorker( | |
| 124 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, | |
| 125 const SchedulerWorker* worker) { | |
| 126 auto it = std::find_if(workers.begin(), workers.end(), | |
| 127 [worker](const std::unique_ptr<SchedulerWorker>& i) { | |
| 128 return i.get() == worker; | |
| 129 }); | |
| 130 return it != workers.end(); | |
| 131 } | |
| 132 | |
| 133 } // namespace | |
| 134 | |
| 116 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | 135 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| 117 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | 136 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| 137 public SingleThreadTaskRunner { | |
| 118 public: | 138 public: |
| 119 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | 139 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| 120 // tasks so long as |worker_pool| and |worker| are alive. | 140 // tasks so long as |worker_pool| and |worker| are alive. |
| 121 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | 141 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| 122 // and |worker|. | 142 // and |worker|. |
| 123 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 143 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| 124 SchedulerWorkerPool* worker_pool, | 144 SchedulerWorkerPoolImpl* worker_pool, |
| 125 SchedulerWorker* worker) | 145 SchedulerWorker* worker); |
| 126 : traits_(traits), | |
| 127 worker_pool_(worker_pool), | |
| 128 worker_(worker) {} | |
| 129 | 146 |
| 130 // SingleThreadTaskRunner: | 147 // SingleThreadTaskRunner: |
| 131 bool PostDelayedTask(const tracked_objects::Location& from_here, | 148 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 132 const Closure& closure, | 149 const Closure& closure, |
| 133 TimeDelta delay) override { | 150 TimeDelta delay) override { |
| 134 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 151 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| 135 task->single_thread_task_runner_ref = this; | 152 task->single_thread_task_runner_ref = this; |
| 136 | 153 |
| 137 // Post the task to be executed by |worker_| as part of |sequence_|. | 154 // Post the task to be executed by |worker_| as part of |sequence_|. |
| 138 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 155 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| 139 worker_); | 156 worker_); |
| 140 } | 157 } |
| 141 | 158 |
| 142 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 159 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 143 const Closure& closure, | 160 const Closure& closure, |
| 144 base::TimeDelta delay) override { | 161 base::TimeDelta delay) override { |
| 145 // Tasks are never nested within the task scheduler. | 162 // Tasks are never nested within the task scheduler. |
| 146 return PostDelayedTask(from_here, closure, delay); | 163 return PostDelayedTask(from_here, closure, delay); |
| 147 } | 164 } |
| 148 | 165 |
| 149 bool RunsTasksOnCurrentThread() const override { | 166 bool RunsTasksOnCurrentThread() const override { |
| 150 return tls_current_worker.Get().Get() == worker_; | 167 return tls_current_worker.Get().Get() == worker_; |
| 151 } | 168 } |
| 152 | 169 |
| 153 private: | 170 private: |
| 154 ~SchedulerSingleThreadTaskRunner() override = default; | 171 ~SchedulerSingleThreadTaskRunner() override; |
| 155 | 172 |
| 156 // Sequence for all Tasks posted through this TaskRunner. | 173 // Sequence for all Tasks posted through this TaskRunner. |
| 157 const scoped_refptr<Sequence> sequence_ = new Sequence; | 174 const scoped_refptr<Sequence> sequence_ = new Sequence; |
| 158 | 175 |
| 159 const TaskTraits traits_; | 176 const TaskTraits traits_; |
| 160 SchedulerWorkerPool* const worker_pool_; | 177 SchedulerWorkerPoolImpl* const worker_pool_; |
| 161 SchedulerWorker* const worker_; | 178 SchedulerWorker* const worker_; |
| 162 | 179 |
| 163 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 180 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| 164 }; | 181 }; |
| 165 | 182 |
| 166 // Only used in DCHECKs. | |
| 167 bool ContainsWorker( | |
| 168 const std::vector<std::unique_ptr<SchedulerWorker>>& workers, | |
| 169 const SchedulerWorker* worker) { | |
| 170 auto it = std::find_if(workers.begin(), workers.end(), | |
| 171 [worker](const std::unique_ptr<SchedulerWorker>& i) { | |
| 172 return i.get() == worker; | |
| 173 }); | |
| 174 return it != workers.end(); | |
| 175 } | |
| 176 | |
| 177 } // namespace | |
| 178 | |
| 179 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 183 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| 180 : public SchedulerWorker::Delegate { | 184 : public SchedulerWorker::Delegate { |
| 181 public: | 185 public: |
| 182 // |outer| owns the worker for which this delegate is constructed. | 186 // |outer| owns the worker for which this delegate is constructed. |
| 183 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 187 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| 184 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 188 // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
| 185 // PriorityQueue whose transactions may overlap with the worker's | 189 // PriorityQueue whose transactions may overlap with the worker's |
| 186 // single-threaded PriorityQueue's transactions. |index| will be appended to | 190 // single-threaded PriorityQueue's transactions. |index| will be appended to |
| 187 // the pool name to label the underlying worker threads. | 191 // the pool name to label the underlying worker threads. |
| 188 SchedulerWorkerDelegateImpl( | 192 SchedulerWorkerDelegateImpl( |
| 189 SchedulerWorkerPoolImpl* outer, | 193 SchedulerWorkerPoolImpl* outer, |
| 190 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 194 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 191 const PriorityQueue* shared_priority_queue, | 195 const PriorityQueue* shared_priority_queue, |
| 192 int index); | 196 int index); |
| 193 ~SchedulerWorkerDelegateImpl() override; | 197 ~SchedulerWorkerDelegateImpl() override; |
| 194 | 198 |
| 195 PriorityQueue* single_threaded_priority_queue() { | 199 PriorityQueue* single_threaded_priority_queue() { |
| 196 return &single_threaded_priority_queue_; | 200 return &single_threaded_priority_queue_; |
| 197 } | 201 } |
| 198 | 202 |
| 199 // SchedulerWorker::Delegate: | 203 // SchedulerWorker::Delegate: |
| 200 void OnMainEntry(SchedulerWorker* worker) override; | 204 void OnMainEntry(SchedulerWorker* worker) override; |
| 201 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 205 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
| 202 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 206 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| 203 TimeDelta GetSleepTimeout() override; | 207 TimeDelta GetSleepTimeout() override; |
| 204 bool CanDetach(SchedulerWorker* worker) override; | 208 bool CanDetach(SchedulerWorker* worker) override; |
| 205 | 209 |
| 210 void RegisterSingleThreadTaskRunner() { | |
| 211 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); | |
|
gab
2016/07/13 18:36:31
Let's wait until Dana/Francois settle the barrier/
robliao
2016/07/13 20:19:46
sgtm. As we are all well aware, atomics are tricky
| |
| 212 } | |
| 213 | |
| 214 void UnregisterSingleThreadTaskRunner() { | |
| 215 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); | |
| 216 } | |
| 217 | |
| 206 private: | 218 private: |
| 207 SchedulerWorkerPoolImpl* outer_; | 219 SchedulerWorkerPoolImpl* outer_; |
| 208 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 220 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| 209 | 221 |
| 210 // Single-threaded PriorityQueue for the worker. | 222 // Single-threaded PriorityQueue for the worker. |
| 211 PriorityQueue single_threaded_priority_queue_; | 223 PriorityQueue single_threaded_priority_queue_; |
| 212 | 224 |
| 213 // True if the last Sequence returned by GetWork() was extracted from | 225 // True if the last Sequence returned by GetWork() was extracted from |
| 214 // |single_threaded_priority_queue_|. | 226 // |single_threaded_priority_queue_|. |
| 215 bool last_sequence_is_single_threaded_ = false; | 227 bool last_sequence_is_single_threaded_ = false; |
| 216 | 228 |
| 229 // True if the worker performed an idle cycle. Workers start out idle. | |
| 230 bool performed_idle_cycle_ = true; | |
|
gab
2016/07/13 18:36:31
is_idle_ ? Then it's true whether it's a new worke
robliao
2016/07/13 20:19:46
The delegate needs to distinguish between a thread
gab
2016/07/13 21:07:55
Ah!! Got it :-). Hadn't fully understood the logic
robliao
2016/07/19 22:03:47
I like performed_idle_cycle_ since it lets us expr
| |
| 231 | |
| 232 subtle::Atomic32 num_single_threaded_runners_ = 0; | |
| 233 | |
| 217 const int index_; | 234 const int index_; |
| 218 | 235 |
| 219 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 236 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| 220 }; | 237 }; |
| 221 | 238 |
| 222 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 239 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| 223 // SchedulerWorkerPool should never be deleted in production unless its | 240 // SchedulerWorkerPool should never be deleted in production unless its |
| 224 // initialization failed. | 241 // initialization failed. |
| 225 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 242 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| 226 } | 243 } |
| 227 | 244 |
| 228 // static | 245 // static |
| 229 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | 246 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| 230 StringPiece name, | 247 StringPiece name, |
| 231 ThreadPriority thread_priority, | 248 ThreadPriority thread_priority, |
| 232 size_t max_threads, | 249 size_t max_threads, |
| 233 IORestriction io_restriction, | 250 IORestriction io_restriction, |
| 251 const TimeDelta& suggested_reclaim_time, | |
| 234 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 252 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 235 TaskTracker* task_tracker, | 253 TaskTracker* task_tracker, |
| 236 DelayedTaskManager* delayed_task_manager) { | 254 DelayedTaskManager* delayed_task_manager) { |
| 237 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( | 255 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
| 238 new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, | 256 new SchedulerWorkerPoolImpl(name, io_restriction, suggested_reclaim_time, |
| 239 delayed_task_manager)); | 257 task_tracker, delayed_task_manager)); |
| 240 if (worker_pool->Initialize(thread_priority, max_threads, | 258 if (worker_pool->Initialize(thread_priority, max_threads, |
| 241 re_enqueue_sequence_callback)) { | 259 re_enqueue_sequence_callback)) { |
| 242 return worker_pool; | 260 return worker_pool; |
| 243 } | 261 } |
| 244 return nullptr; | 262 return nullptr; |
| 245 } | 263 } |
| 246 | 264 |
| 247 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 265 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| 248 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 266 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 249 while (idle_workers_stack_.Size() < workers_.size()) | 267 while (idle_workers_stack_.Size() < workers_.size()) |
| 250 idle_workers_stack_cv_for_testing_->Wait(); | 268 idle_workers_stack_cv_for_testing_->Wait(); |
| 251 } | 269 } |
| 252 | 270 |
| 253 void SchedulerWorkerPoolImpl::JoinForTesting() { | 271 void SchedulerWorkerPoolImpl::JoinForTesting() { |
| 272 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << | |
| 273 "Workers can detach during join."; | |
|
gab
2016/07/13 18:36:31
CanWorker...() versus "Workers" (plural versus sin
robliao
2016/07/13 20:19:46
The act of detaching applies to a single worker, m
| |
| 254 for (const auto& worker : workers_) | 274 for (const auto& worker : workers_) |
| 255 worker->JoinForTesting(); | 275 worker->JoinForTesting(); |
| 256 | 276 |
| 257 DCHECK(!join_for_testing_returned_.IsSignaled()); | 277 DCHECK(!join_for_testing_returned_.IsSignaled()); |
| 258 join_for_testing_returned_.Signal(); | 278 join_for_testing_returned_.Signal(); |
| 259 } | 279 } |
| 260 | 280 |
| 281 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { | |
| 282 AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); | |
| 283 worker_detachment_allowed_ = false; | |
| 284 } | |
| 285 | |
| 261 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 286 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| 262 const TaskTraits& traits, | 287 const TaskTraits& traits, |
| 263 ExecutionMode execution_mode) { | 288 ExecutionMode execution_mode) { |
| 264 switch (execution_mode) { | 289 switch (execution_mode) { |
| 265 case ExecutionMode::PARALLEL: | 290 case ExecutionMode::PARALLEL: |
| 266 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 291 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| 267 | 292 |
| 268 case ExecutionMode::SEQUENCED: | 293 case ExecutionMode::SEQUENCED: |
| 269 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 294 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| 270 | 295 |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 361 sequence_sort_key); | 386 sequence_sort_key); |
| 362 | 387 |
| 363 // Wake up a worker to process |sequence|. | 388 // Wake up a worker to process |sequence|. |
| 364 if (worker) | 389 if (worker) |
| 365 worker->WakeUp(); | 390 worker->WakeUp(); |
| 366 else | 391 else |
| 367 WakeUpOneWorker(); | 392 WakeUpOneWorker(); |
| 368 } | 393 } |
| 369 } | 394 } |
| 370 | 395 |
| 396 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
| 397 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
| 398 SchedulerWorkerPoolImpl* worker_pool, | |
| 399 SchedulerWorker* worker) | |
|
gab
2016/07/13 18:36:31
missing space: git cl format?
robliao
2016/07/13 20:19:46
Done.
| |
| 400 : traits_(traits), | |
| 401 worker_pool_(worker_pool), | |
| 402 worker_(worker) { | |
| 403 DCHECK(worker_pool_); | |
| 404 DCHECK(worker_); | |
| 405 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
| 406 RegisterSingleThreadTaskRunner(); | |
| 407 } | |
| 408 | |
| 409 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
| 410 ~SchedulerSingleThreadTaskRunner() { | |
| 411 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
| 412 UnregisterSingleThreadTaskRunner(); | |
| 413 } | |
| 414 | |
| 371 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 415 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 372 SchedulerWorkerDelegateImpl( | 416 SchedulerWorkerDelegateImpl( |
| 373 SchedulerWorkerPoolImpl* outer, | 417 SchedulerWorkerPoolImpl* outer, |
| 374 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 418 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 375 const PriorityQueue* shared_priority_queue, | 419 const PriorityQueue* shared_priority_queue, |
| 376 int index) | 420 int index) |
| 377 : outer_(outer), | 421 : outer_(outer), |
| 378 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 422 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| 379 single_threaded_priority_queue_(shared_priority_queue), | 423 single_threaded_priority_queue_(shared_priority_queue), |
| 380 index_(index) {} | 424 index_(index) {} |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 392 #endif | 436 #endif |
| 393 | 437 |
| 394 PlatformThread::SetName( | 438 PlatformThread::SetName( |
| 395 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); | 439 StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); |
| 396 | 440 |
| 397 DCHECK(!tls_current_worker.Get().Get()); | 441 DCHECK(!tls_current_worker.Get().Get()); |
| 398 DCHECK(!tls_current_worker_pool.Get().Get()); | 442 DCHECK(!tls_current_worker_pool.Get().Get()); |
| 399 tls_current_worker.Get().Set(worker); | 443 tls_current_worker.Get().Set(worker); |
| 400 tls_current_worker_pool.Get().Set(outer_); | 444 tls_current_worker_pool.Get().Set(outer_); |
| 401 | 445 |
| 446 // Workers start out idle, so this counts as an idle cycle. | |
| 447 performed_idle_cycle_ = true; | |
|
gab
2016/07/13 18:36:31
This is redundant per the value being initialized
robliao
2016/07/13 20:19:46
The delegate is reused across thread instances, so
| |
| 448 | |
| 402 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == | 449 ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
| 403 IORestriction::ALLOWED); | 450 IORestriction::ALLOWED); |
| 404 } | 451 } |
| 405 | 452 |
| 406 scoped_refptr<Sequence> | 453 scoped_refptr<Sequence> |
| 407 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( | 454 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| 408 SchedulerWorker* worker) { | 455 SchedulerWorker* worker) { |
| 409 DCHECK(ContainsWorker(outer_->workers_, worker)); | 456 DCHECK(ContainsWorker(outer_->workers_, worker)); |
| 410 | 457 |
| 411 scoped_refptr<Sequence> sequence; | 458 scoped_refptr<Sequence> sequence; |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 448 sequence = shared_transaction->PopSequence(); | 495 sequence = shared_transaction->PopSequence(); |
| 449 last_sequence_is_single_threaded_ = false; | 496 last_sequence_is_single_threaded_ = false; |
| 450 } else { | 497 } else { |
| 451 DCHECK(!single_threaded_transaction->IsEmpty()); | 498 DCHECK(!single_threaded_transaction->IsEmpty()); |
| 452 sequence = single_threaded_transaction->PopSequence(); | 499 sequence = single_threaded_transaction->PopSequence(); |
| 453 last_sequence_is_single_threaded_ = true; | 500 last_sequence_is_single_threaded_ = true; |
| 454 } | 501 } |
| 455 } | 502 } |
| 456 DCHECK(sequence); | 503 DCHECK(sequence); |
| 457 | 504 |
| 505 // We're doing work, so this is not an idle cycle. | |
| 506 performed_idle_cycle_ = false; | |
|
gab
2016/07/13 18:36:31
is_idle_ = false;
with no comments is sufficient
robliao
2016/07/13 20:19:46
sgtm. Removed.
| |
| 507 | |
| 458 outer_->RemoveFromIdleWorkersStack(worker); | 508 outer_->RemoveFromIdleWorkersStack(worker); |
| 459 return sequence; | 509 return sequence; |
| 460 } | 510 } |
| 461 | 511 |
| 462 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 512 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 463 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 513 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| 464 if (last_sequence_is_single_threaded_) { | 514 if (last_sequence_is_single_threaded_) { |
| 465 // A single-threaded Sequence is always re-enqueued in the single-threaded | 515 // A single-threaded Sequence is always re-enqueued in the single-threaded |
| 466 // PriorityQueue from which it was extracted. | 516 // PriorityQueue from which it was extracted. |
| 467 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | 517 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
| 468 single_threaded_priority_queue_.BeginTransaction()->Push( | 518 single_threaded_priority_queue_.BeginTransaction()->Push( |
| 469 std::move(sequence), sequence_sort_key); | 519 std::move(sequence), sequence_sort_key); |
| 470 } else { | 520 } else { |
| 471 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 521 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| 472 // |sequence| must be enqueued. | 522 // |sequence| must be enqueued. |
| 473 re_enqueue_sequence_callback_.Run(std::move(sequence)); | 523 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| 474 } | 524 } |
| 475 } | 525 } |
| 476 | 526 |
| 477 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 527 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 478 GetSleepTimeout() { | 528 GetSleepTimeout() { |
| 479 return TimeDelta::Max(); | 529 return outer_->suggested_reclaim_time_; |
| 480 } | 530 } |
| 481 | 531 |
| 482 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 532 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| 483 SchedulerWorker* worker) { | 533 SchedulerWorker* worker) { |
| 484 return false; | 534 // It's not an issue if |num_single_threaded_runners_| is incremented after |
| 535 // this because the next single-threaded task will simply pick up a new | |
| 536 // physical thread. | |
| 537 const bool can_detach = | |
| 538 performed_idle_cycle_ && | |
| 539 worker != outer_->PeekAtIdleWorkersStack() && | |
| 540 !subtle::Release_Load(&num_single_threaded_runners_) && | |
| 541 outer_->CanWorkerDetachForTesting(); | |
| 542 // CanDetach is part of a SchedulerWorker's idle cycle. | |
| 543 performed_idle_cycle_ = true; | |
|
gab
2016/07/13 18:36:31
Shouldn't is_idle_ instead be set when about to re
robliao
2016/07/13 20:19:46
See is_idle_ comment.
gab
2016/07/13 21:07:55
Perhaps expand comment above here to explain the d
robliao
2016/07/19 22:03:47
Done.
| |
| 544 return can_detach; | |
| 485 } | 545 } |
| 486 | 546 |
| 487 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 547 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| 488 StringPiece name, | 548 StringPiece name, |
| 489 IORestriction io_restriction, | 549 IORestriction io_restriction, |
| 550 const TimeDelta& suggested_reclaim_time, | |
| 490 TaskTracker* task_tracker, | 551 TaskTracker* task_tracker, |
| 491 DelayedTaskManager* delayed_task_manager) | 552 DelayedTaskManager* delayed_task_manager) |
| 492 : name_(name.as_string()), | 553 : name_(name.as_string()), |
| 493 io_restriction_(io_restriction), | 554 io_restriction_(io_restriction), |
| 555 suggested_reclaim_time_(suggested_reclaim_time), | |
| 494 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 556 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| 495 idle_workers_stack_cv_for_testing_( | 557 idle_workers_stack_cv_for_testing_( |
| 496 idle_workers_stack_lock_.CreateConditionVariable()), | 558 idle_workers_stack_lock_.CreateConditionVariable()), |
| 497 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 559 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| 498 WaitableEvent::InitialState::NOT_SIGNALED), | 560 WaitableEvent::InitialState::NOT_SIGNALED), |
| 561 worker_detachment_allowed_(true), | |
|
gab
2016/07/13 18:36:31
If keeping the bool (ref. WaitableEvent comment),
robliao
2016/07/13 20:19:46
See earlier comment.
| |
| 499 #if DCHECK_IS_ON() | 562 #if DCHECK_IS_ON() |
| 500 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | 563 workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| 501 WaitableEvent::InitialState::NOT_SIGNALED), | 564 WaitableEvent::InitialState::NOT_SIGNALED), |
| 502 #endif | 565 #endif |
| 503 task_tracker_(task_tracker), | 566 task_tracker_(task_tracker), |
| 504 delayed_task_manager_(delayed_task_manager) { | 567 delayed_task_manager_(delayed_task_manager) { |
| 505 DCHECK(task_tracker_); | 568 DCHECK(task_tracker_); |
| 506 DCHECK(delayed_task_manager_); | 569 DCHECK(delayed_task_manager_); |
| 507 } | 570 } |
| 508 | 571 |
| 509 bool SchedulerWorkerPoolImpl::Initialize( | 572 bool SchedulerWorkerPoolImpl::Initialize( |
| 510 ThreadPriority thread_priority, | 573 ThreadPriority thread_priority, |
| 511 size_t max_threads, | 574 size_t max_threads, |
| 512 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 575 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| 513 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 576 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 514 | 577 |
| 515 DCHECK(workers_.empty()); | 578 DCHECK(workers_.empty()); |
| 516 | 579 |
| 517 for (size_t i = 0; i < max_threads; ++i) { | 580 for (size_t i = 0; i < max_threads; ++i) { |
| 518 std::unique_ptr<SchedulerWorker> worker = | 581 std::unique_ptr<SchedulerWorker> worker = |
| 519 SchedulerWorker::Create( | 582 SchedulerWorker::Create( |
| 520 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( | 583 thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
| 521 this, re_enqueue_sequence_callback, | 584 this, re_enqueue_sequence_callback, |
| 522 &shared_priority_queue_, static_cast<int>(i))), | 585 &shared_priority_queue_, static_cast<int>(i))), |
| 523 task_tracker_, | 586 task_tracker_, |
| 524 SchedulerWorker::InitialState::ALIVE); | 587 i == 0 |
| 588 ? SchedulerWorker::InitialState::ALIVE | |
| 589 : SchedulerWorker::InitialState::DETACHED); | |
| 525 if (!worker) | 590 if (!worker) |
| 526 break; | 591 break; |
| 527 idle_workers_stack_.Push(worker.get()); | 592 idle_workers_stack_.Push(worker.get()); |
| 528 workers_.push_back(std::move(worker)); | 593 workers_.push_back(std::move(worker)); |
| 529 } | 594 } |
| 530 | 595 |
| 531 #if DCHECK_IS_ON() | 596 #if DCHECK_IS_ON() |
| 532 workers_created_.Signal(); | 597 workers_created_.Signal(); |
| 533 #endif | 598 #endif |
| 534 | 599 |
| 535 return !workers_.empty(); | 600 return !workers_.empty(); |
| 536 } | 601 } |
| 537 | 602 |
| 538 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 603 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| 539 SchedulerWorker* worker; | 604 SchedulerWorker* worker; |
| 540 { | 605 { |
| 541 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 606 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 542 worker = idle_workers_stack_.Pop(); | 607 worker = idle_workers_stack_.Pop(); |
| 543 } | 608 } |
| 544 if (worker) | 609 if (worker) |
| 545 worker->WakeUp(); | 610 worker->WakeUp(); |
| 546 } | 611 } |
| 547 | 612 |
| 548 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 613 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| 549 SchedulerWorker* worker) { | 614 SchedulerWorker* worker) { |
| 550 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 615 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 551 idle_workers_stack_.Push(worker); | 616 // Detachment may cause multiple attempts to add because the delegate cannot |
|
gab
2016/07/13 18:36:31
Shouldn't a thread that wakes up after it's sleep
robliao
2016/07/13 20:19:46
We can't distinguish between sleep timeouts and ac
| |
| 617 // determine who woke it up. As a result, when it wakes up, it may conclude | |
| 618 // there's no work to be done and attempt to add itself to the idle stack | |
| 619 // again. | |
| 620 if (!idle_workers_stack_.Contains(worker)) | |
| 621 idle_workers_stack_.Push(worker); | |
| 622 | |
| 552 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); | 623 DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| 553 | 624 |
| 554 if (idle_workers_stack_.Size() == workers_.size()) | 625 if (idle_workers_stack_.Size() == workers_.size()) |
| 555 idle_workers_stack_cv_for_testing_->Broadcast(); | 626 idle_workers_stack_cv_for_testing_->Broadcast(); |
| 556 } | 627 } |
| 557 | 628 |
| 629 const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { | |
| 630 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
| 631 return idle_workers_stack_.Peek(); | |
| 632 } | |
| 633 | |
| 558 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( | 634 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| 559 SchedulerWorker* worker) { | 635 SchedulerWorker* worker) { |
| 560 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 636 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 561 idle_workers_stack_.Remove(worker); | 637 idle_workers_stack_.Remove(worker); |
| 562 } | 638 } |
| 563 | 639 |
| 640 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | |
| 641 AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); | |
| 642 return worker_detachment_allowed_; | |
| 643 } | |
| 644 | |
| 564 } // namespace internal | 645 } // namespace internal |
| 565 } // namespace base | 646 } // namespace base |
| OLD | NEW |