OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | 5 #include "base/task_scheduler/scheduler_thread_pool.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
(...skipping 13 matching lines...) Expand all Loading... |
24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | 24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; | 25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
26 | 26 |
27 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 27 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
28 class SchedulerParallelTaskRunner : public TaskRunner { | 28 class SchedulerParallelTaskRunner : public TaskRunner { |
29 public: | 29 public: |
30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
31 // long as |executor| is alive. | 31 // long as |executor| is alive. |
32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | 32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
33 SchedulerParallelTaskRunner(const TaskTraits& traits, | 33 SchedulerParallelTaskRunner(const TaskTraits& traits, |
| 34 SchedulerTaskExecutor* executor, |
34 TaskTracker* task_tracker, | 35 TaskTracker* task_tracker, |
35 SchedulerTaskExecutor* executor) | 36 DelayedTaskManager* delayed_task_manager) |
36 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} | 37 : traits_(traits), |
| 38 executor_(executor), |
| 39 task_tracker_(task_tracker), |
| 40 delayed_task_manager_(delayed_task_manager) {} |
37 | 41 |
38 // TaskRunner: | 42 // TaskRunner: |
39 bool PostDelayedTask(const tracked_objects::Location& from_here, | 43 bool PostDelayedTask(const tracked_objects::Location& from_here, |
40 const Closure& closure, | 44 const Closure& closure, |
41 TimeDelta delay) override { | 45 TimeDelta delay) override { |
42 // Post the task as part of a one-off single-task Sequence. | 46 // Post the task as part of a one-off single-task Sequence. |
43 return PostTaskToExecutor(from_here, closure, traits_, delay, | 47 return PostTaskToExecutor(from_here, closure, traits_, delay, |
44 make_scoped_refptr(new Sequence), executor_, | 48 make_scoped_refptr(new Sequence), executor_, |
45 task_tracker_); | 49 task_tracker_, delayed_task_manager_); |
46 } | 50 } |
47 | 51 |
48 bool RunsTasksOnCurrentThread() const override { | 52 bool RunsTasksOnCurrentThread() const override { |
49 return tls_current_thread_pool.Get().Get() == executor_; | 53 return tls_current_thread_pool.Get().Get() == executor_; |
50 } | 54 } |
51 | 55 |
52 private: | 56 private: |
53 ~SchedulerParallelTaskRunner() override = default; | 57 ~SchedulerParallelTaskRunner() override = default; |
54 | 58 |
55 const TaskTraits traits_; | 59 const TaskTraits traits_; |
| 60 SchedulerTaskExecutor* const executor_; |
56 TaskTracker* const task_tracker_; | 61 TaskTracker* const task_tracker_; |
57 SchedulerTaskExecutor* const executor_; | 62 DelayedTaskManager* const delayed_task_manager_; |
58 | 63 |
59 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 64 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
60 }; | 65 }; |
61 | 66 |
62 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 67 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
63 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 68 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
64 public: | 69 public: |
65 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 70 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
66 // long as |executor| is alive. | 71 // long as |executor| is alive. |
67 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | 72 // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
68 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 73 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| 74 SchedulerTaskExecutor* executor, |
69 TaskTracker* task_tracker, | 75 TaskTracker* task_tracker, |
70 SchedulerTaskExecutor* executor) | 76 DelayedTaskManager* delayed_task_manager) |
71 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} | 77 : traits_(traits), |
| 78 executor_(executor), |
| 79 task_tracker_(task_tracker), |
| 80 delayed_task_manager_(delayed_task_manager) {} |
72 | 81 |
73 // SequencedTaskRunner: | 82 // SequencedTaskRunner: |
74 bool PostDelayedTask(const tracked_objects::Location& from_here, | 83 bool PostDelayedTask(const tracked_objects::Location& from_here, |
75 const Closure& closure, | 84 const Closure& closure, |
76 TimeDelta delay) override { | 85 TimeDelta delay) override { |
77 // Post the task as part of |sequence|. | 86 // Post the task as part of |sequence|. |
78 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, | 87 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, |
79 executor_, task_tracker_); | 88 executor_, task_tracker_, delayed_task_manager_); |
80 } | 89 } |
81 | 90 |
82 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 91 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
83 const Closure& closure, | 92 const Closure& closure, |
84 base::TimeDelta delay) override { | 93 base::TimeDelta delay) override { |
85 // Tasks are never nested within the task scheduler. | 94 // Tasks are never nested within the task scheduler. |
86 return PostDelayedTask(from_here, closure, delay); | 95 return PostDelayedTask(from_here, closure, delay); |
87 } | 96 } |
88 | 97 |
89 bool RunsTasksOnCurrentThread() const override { | 98 bool RunsTasksOnCurrentThread() const override { |
90 return tls_current_thread_pool.Get().Get() == executor_; | 99 return tls_current_thread_pool.Get().Get() == executor_; |
91 } | 100 } |
92 | 101 |
93 private: | 102 private: |
94 ~SchedulerSequencedTaskRunner() override = default; | 103 ~SchedulerSequencedTaskRunner() override = default; |
95 | 104 |
96 // Sequence for all Tasks posted through this TaskRunner. | 105 // Sequence for all Tasks posted through this TaskRunner. |
97 const scoped_refptr<Sequence> sequence_ = new Sequence; | 106 const scoped_refptr<Sequence> sequence_ = new Sequence; |
98 | 107 |
99 const TaskTraits traits_; | 108 const TaskTraits traits_; |
| 109 SchedulerTaskExecutor* const executor_; |
100 TaskTracker* const task_tracker_; | 110 TaskTracker* const task_tracker_; |
101 SchedulerTaskExecutor* const executor_; | 111 DelayedTaskManager* const delayed_task_manager_; |
102 | 112 |
103 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 113 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
104 }; | 114 }; |
105 | 115 |
106 } // namespace | 116 } // namespace |
107 | 117 |
108 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl | 118 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl |
109 : public SchedulerWorkerThread::Delegate { | 119 : public SchedulerWorkerThread::Delegate { |
110 public: | 120 public: |
111 SchedulerWorkerThreadDelegateImpl( | 121 SchedulerWorkerThreadDelegateImpl( |
(...skipping 17 matching lines...) Expand all Loading... |
129 SchedulerThreadPool::~SchedulerThreadPool() { | 139 SchedulerThreadPool::~SchedulerThreadPool() { |
130 // SchedulerThreadPool should never be deleted in production unless its | 140 // SchedulerThreadPool should never be deleted in production unless its |
131 // initialization failed. | 141 // initialization failed. |
132 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 142 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
133 } | 143 } |
134 | 144 |
135 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( | 145 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( |
136 ThreadPriority thread_priority, | 146 ThreadPriority thread_priority, |
137 size_t max_threads, | 147 size_t max_threads, |
138 const EnqueueSequenceCallback& enqueue_sequence_callback, | 148 const EnqueueSequenceCallback& enqueue_sequence_callback, |
139 TaskTracker* task_tracker) { | 149 TaskTracker* task_tracker, |
140 std::unique_ptr<SchedulerThreadPool> thread_pool( | 150 DelayedTaskManager* delayed_task_manager) { |
141 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); | 151 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( |
| 152 enqueue_sequence_callback, task_tracker, delayed_task_manager)); |
142 if (thread_pool->Initialize(thread_priority, max_threads)) | 153 if (thread_pool->Initialize(thread_priority, max_threads)) |
143 return thread_pool; | 154 return thread_pool; |
144 return nullptr; | 155 return nullptr; |
145 } | 156 } |
146 | 157 |
147 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( | 158 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
148 const TaskTraits& traits, | 159 const TaskTraits& traits, |
149 ExecutionMode execution_mode) { | 160 ExecutionMode execution_mode) { |
150 switch (execution_mode) { | 161 switch (execution_mode) { |
151 case ExecutionMode::PARALLEL: | 162 case ExecutionMode::PARALLEL: |
152 return make_scoped_refptr( | 163 return make_scoped_refptr(new SchedulerParallelTaskRunner( |
153 new SchedulerParallelTaskRunner(traits, task_tracker_, this)); | 164 traits, this, task_tracker_, delayed_task_manager_)); |
154 | 165 |
155 case ExecutionMode::SEQUENCED: | 166 case ExecutionMode::SEQUENCED: |
156 return make_scoped_refptr( | 167 return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
157 new SchedulerSequencedTaskRunner(traits, task_tracker_, this)); | 168 traits, this, task_tracker_, delayed_task_manager_)); |
158 | 169 |
159 case ExecutionMode::SINGLE_THREADED: | 170 case ExecutionMode::SINGLE_THREADED: |
160 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. | 171 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
161 NOTREACHED(); | 172 NOTREACHED(); |
162 return nullptr; | 173 return nullptr; |
163 } | 174 } |
164 | 175 |
165 NOTREACHED(); | 176 NOTREACHED(); |
166 return nullptr; | 177 return nullptr; |
167 } | 178 } |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
256 return sequence_and_sort_key.sequence; | 267 return sequence_and_sort_key.sequence; |
257 } | 268 } |
258 | 269 |
259 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( | 270 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
260 scoped_refptr<Sequence> sequence) { | 271 scoped_refptr<Sequence> sequence) { |
261 enqueue_sequence_callback_.Run(std::move(sequence)); | 272 enqueue_sequence_callback_.Run(std::move(sequence)); |
262 } | 273 } |
263 | 274 |
264 SchedulerThreadPool::SchedulerThreadPool( | 275 SchedulerThreadPool::SchedulerThreadPool( |
265 const EnqueueSequenceCallback& enqueue_sequence_callback, | 276 const EnqueueSequenceCallback& enqueue_sequence_callback, |
266 TaskTracker* task_tracker) | 277 TaskTracker* task_tracker, |
| 278 DelayedTaskManager* delayed_task_manager) |
267 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | 279 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
268 idle_worker_threads_stack_cv_for_testing_( | 280 idle_worker_threads_stack_cv_for_testing_( |
269 idle_worker_threads_stack_lock_.CreateConditionVariable()), | 281 idle_worker_threads_stack_lock_.CreateConditionVariable()), |
270 join_for_testing_returned_(true, false), | 282 join_for_testing_returned_(true, false), |
271 worker_thread_delegate_( | 283 worker_thread_delegate_( |
272 new SchedulerWorkerThreadDelegateImpl(this, | 284 new SchedulerWorkerThreadDelegateImpl(this, |
273 enqueue_sequence_callback)), | 285 enqueue_sequence_callback)), |
274 task_tracker_(task_tracker) { | 286 task_tracker_(task_tracker), |
| 287 delayed_task_manager_(delayed_task_manager) { |
275 DCHECK(task_tracker_); | 288 DCHECK(task_tracker_); |
| 289 DCHECK(delayed_task_manager_); |
276 } | 290 } |
277 | 291 |
278 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, | 292 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
279 size_t max_threads) { | 293 size_t max_threads) { |
280 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 294 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
281 | 295 |
282 DCHECK(worker_threads_.empty()); | 296 DCHECK(worker_threads_.empty()); |
283 | 297 |
284 for (size_t i = 0; i < max_threads; ++i) { | 298 for (size_t i = 0; i < max_threads; ++i) { |
285 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 299 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
(...skipping 30 matching lines...) Expand all Loading... |
316 if (idle_worker_threads_stack_.empty()) | 330 if (idle_worker_threads_stack_.empty()) |
317 return nullptr; | 331 return nullptr; |
318 | 332 |
319 auto worker_thread = idle_worker_threads_stack_.top(); | 333 auto worker_thread = idle_worker_threads_stack_.top(); |
320 idle_worker_threads_stack_.pop(); | 334 idle_worker_threads_stack_.pop(); |
321 return worker_thread; | 335 return worker_thread; |
322 } | 336 } |
323 | 337 |
324 } // namespace internal | 338 } // namespace internal |
325 } // namespace base | 339 } // namespace base |
OLD | NEW |