OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_thread_pool.h" | 5 #include "base/task_scheduler/scheduler_thread_pool.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/bind_helpers.h" | 10 #include "base/bind_helpers.h" |
(...skipping 13 matching lines...) Expand all Loading... |
24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky | 24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; | 25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; |
26 | 26 |
27 // A task runner that runs tasks with the PARALLEL ExecutionMode. | 27 // A task runner that runs tasks with the PARALLEL ExecutionMode. |
28 class SchedulerParallelTaskRunner : public TaskRunner { | 28 class SchedulerParallelTaskRunner : public TaskRunner { |
29 public: | 29 public: |
30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
31 // long as |executor| is alive. | 31 // long as |executor| is alive. |
32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | 32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
33 SchedulerParallelTaskRunner(const TaskTraits& traits, | 33 SchedulerParallelTaskRunner(const TaskTraits& traits, |
| 34 SchedulerTaskExecutor* executor, |
34 TaskTracker* task_tracker, | 35 TaskTracker* task_tracker, |
35 SchedulerTaskExecutor* executor) | 36 DelayedTaskManager* delayed_task_manager) |
36 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} | 37 : traits_(traits), |
| 38 executor_(executor), |
| 39 task_tracker_(task_tracker), |
| 40 delayed_task_manager_(delayed_task_manager) {} |
37 | 41 |
38 // TaskRunner: | 42 // TaskRunner: |
39 bool PostDelayedTask(const tracked_objects::Location& from_here, | 43 bool PostDelayedTask(const tracked_objects::Location& from_here, |
40 const Closure& closure, | 44 const Closure& closure, |
41 TimeDelta delay) override { | 45 TimeDelta delay) override { |
42 // Post the task as part of a one-off single-task Sequence. | 46 // Post the task as part of a one-off single-task Sequence. |
43 return PostTaskToExecutor( | 47 return PostTaskToExecutor( |
44 WrapUnique( | 48 WrapUnique( |
45 new Task(from_here, closure, traits_, | 49 new Task(from_here, closure, traits_, |
46 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), | 50 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
47 make_scoped_refptr(new Sequence), executor_, task_tracker_); | 51 make_scoped_refptr(new Sequence), executor_, task_tracker_, |
| 52 delayed_task_manager_); |
48 } | 53 } |
49 | 54 |
50 bool RunsTasksOnCurrentThread() const override { | 55 bool RunsTasksOnCurrentThread() const override { |
51 return tls_current_thread_pool.Get().Get() == executor_; | 56 return tls_current_thread_pool.Get().Get() == executor_; |
52 } | 57 } |
53 | 58 |
54 private: | 59 private: |
55 ~SchedulerParallelTaskRunner() override = default; | 60 ~SchedulerParallelTaskRunner() override = default; |
56 | 61 |
57 const TaskTraits traits_; | 62 const TaskTraits traits_; |
| 63 SchedulerTaskExecutor* const executor_; |
58 TaskTracker* const task_tracker_; | 64 TaskTracker* const task_tracker_; |
59 SchedulerTaskExecutor* const executor_; | 65 DelayedTaskManager* const delayed_task_manager_; |
60 | 66 |
61 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 67 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
62 }; | 68 }; |
63 | 69 |
64 // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 70 // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
65 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 71 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
66 public: | 72 public: |
67 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 73 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
68 // long as |executor| is alive. | 74 // long as |executor| is alive. |
69 // TODO(robliao): Find a concrete way to manage |executor|'s memory. | 75 // TODO(robliao): Find a concrete way to manage |executor|'s memory. |
70 SchedulerSequencedTaskRunner(const TaskTraits& traits, | 76 SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| 77 SchedulerTaskExecutor* executor, |
71 TaskTracker* task_tracker, | 78 TaskTracker* task_tracker, |
72 SchedulerTaskExecutor* executor) | 79 DelayedTaskManager* delayed_task_manager) |
73 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} | 80 : traits_(traits), |
| 81 executor_(executor), |
| 82 task_tracker_(task_tracker), |
| 83 delayed_task_manager_(delayed_task_manager) {} |
74 | 84 |
75 // SequencedTaskRunner: | 85 // SequencedTaskRunner: |
76 bool PostDelayedTask(const tracked_objects::Location& from_here, | 86 bool PostDelayedTask(const tracked_objects::Location& from_here, |
77 const Closure& closure, | 87 const Closure& closure, |
78 TimeDelta delay) override { | 88 TimeDelta delay) override { |
79 // Post the task as part of |sequence|. | 89 // Post the task as part of |sequence|. |
80 return PostTaskToExecutor( | 90 return PostTaskToExecutor( |
81 WrapUnique( | 91 WrapUnique( |
82 new Task(from_here, closure, traits_, | 92 new Task(from_here, closure, traits_, |
83 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), | 93 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), |
84 sequence_, executor_, task_tracker_); | 94 sequence_, executor_, task_tracker_, delayed_task_manager_); |
85 } | 95 } |
86 | 96 |
87 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 97 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
88 const Closure& closure, | 98 const Closure& closure, |
89 base::TimeDelta delay) override { | 99 base::TimeDelta delay) override { |
90 // Tasks are never nested within the task scheduler. | 100 // Tasks are never nested within the task scheduler. |
91 return PostDelayedTask(from_here, closure, delay); | 101 return PostDelayedTask(from_here, closure, delay); |
92 } | 102 } |
93 | 103 |
94 bool RunsTasksOnCurrentThread() const override { | 104 bool RunsTasksOnCurrentThread() const override { |
95 return tls_current_thread_pool.Get().Get() == executor_; | 105 return tls_current_thread_pool.Get().Get() == executor_; |
96 } | 106 } |
97 | 107 |
98 private: | 108 private: |
99 ~SchedulerSequencedTaskRunner() override = default; | 109 ~SchedulerSequencedTaskRunner() override = default; |
100 | 110 |
101 // Sequence for all Tasks posted through this TaskRunner. | 111 // Sequence for all Tasks posted through this TaskRunner. |
102 const scoped_refptr<Sequence> sequence_ = new Sequence; | 112 const scoped_refptr<Sequence> sequence_ = new Sequence; |
103 | 113 |
104 const TaskTraits traits_; | 114 const TaskTraits traits_; |
| 115 SchedulerTaskExecutor* const executor_; |
105 TaskTracker* const task_tracker_; | 116 TaskTracker* const task_tracker_; |
106 SchedulerTaskExecutor* const executor_; | 117 DelayedTaskManager* const delayed_task_manager_; |
107 | 118 |
108 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 119 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
109 }; | 120 }; |
110 | 121 |
111 } // namespace | 122 } // namespace |
112 | 123 |
113 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl | 124 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl |
114 : public SchedulerWorkerThread::Delegate { | 125 : public SchedulerWorkerThread::Delegate { |
115 public: | 126 public: |
116 SchedulerWorkerThreadDelegateImpl( | 127 SchedulerWorkerThreadDelegateImpl( |
(...skipping 17 matching lines...) Expand all Loading... |
134 SchedulerThreadPool::~SchedulerThreadPool() { | 145 SchedulerThreadPool::~SchedulerThreadPool() { |
135 // SchedulerThreadPool should never be deleted in production unless its | 146 // SchedulerThreadPool should never be deleted in production unless its |
136 // initialization failed. | 147 // initialization failed. |
137 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); | 148 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); |
138 } | 149 } |
139 | 150 |
140 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( | 151 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( |
141 ThreadPriority thread_priority, | 152 ThreadPriority thread_priority, |
142 size_t max_threads, | 153 size_t max_threads, |
143 const EnqueueSequenceCallback& enqueue_sequence_callback, | 154 const EnqueueSequenceCallback& enqueue_sequence_callback, |
144 TaskTracker* task_tracker) { | 155 TaskTracker* task_tracker, |
145 std::unique_ptr<SchedulerThreadPool> thread_pool( | 156 DelayedTaskManager* delayed_task_manager) { |
146 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); | 157 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool( |
| 158 enqueue_sequence_callback, task_tracker, delayed_task_manager)); |
147 if (thread_pool->Initialize(thread_priority, max_threads)) | 159 if (thread_pool->Initialize(thread_priority, max_threads)) |
148 return thread_pool; | 160 return thread_pool; |
149 return nullptr; | 161 return nullptr; |
150 } | 162 } |
151 | 163 |
152 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( | 164 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( |
153 const TaskTraits& traits, | 165 const TaskTraits& traits, |
154 ExecutionMode execution_mode) { | 166 ExecutionMode execution_mode) { |
155 switch (execution_mode) { | 167 switch (execution_mode) { |
156 case ExecutionMode::PARALLEL: | 168 case ExecutionMode::PARALLEL: |
157 return make_scoped_refptr( | 169 return make_scoped_refptr(new SchedulerParallelTaskRunner( |
158 new SchedulerParallelTaskRunner(traits, task_tracker_, this)); | 170 traits, this, task_tracker_, delayed_task_manager_)); |
159 | 171 |
160 case ExecutionMode::SEQUENCED: | 172 case ExecutionMode::SEQUENCED: |
161 return make_scoped_refptr( | 173 return make_scoped_refptr(new SchedulerSequencedTaskRunner( |
162 new SchedulerSequencedTaskRunner(traits, task_tracker_, this)); | 174 traits, this, task_tracker_, delayed_task_manager_)); |
163 | 175 |
164 case ExecutionMode::SINGLE_THREADED: | 176 case ExecutionMode::SINGLE_THREADED: |
165 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. | 177 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. |
166 NOTREACHED(); | 178 NOTREACHED(); |
167 return nullptr; | 179 return nullptr; |
168 } | 180 } |
169 | 181 |
170 NOTREACHED(); | 182 NOTREACHED(); |
171 return nullptr; | 183 return nullptr; |
172 } | 184 } |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
261 return sequence_and_sort_key.sequence; | 273 return sequence_and_sort_key.sequence; |
262 } | 274 } |
263 | 275 |
264 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( | 276 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( |
265 scoped_refptr<Sequence> sequence) { | 277 scoped_refptr<Sequence> sequence) { |
266 enqueue_sequence_callback_.Run(std::move(sequence)); | 278 enqueue_sequence_callback_.Run(std::move(sequence)); |
267 } | 279 } |
268 | 280 |
269 SchedulerThreadPool::SchedulerThreadPool( | 281 SchedulerThreadPool::SchedulerThreadPool( |
270 const EnqueueSequenceCallback& enqueue_sequence_callback, | 282 const EnqueueSequenceCallback& enqueue_sequence_callback, |
271 TaskTracker* task_tracker) | 283 TaskTracker* task_tracker, |
| 284 DelayedTaskManager* delayed_task_manager) |
272 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), | 285 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), |
273 idle_worker_threads_stack_cv_for_testing_( | 286 idle_worker_threads_stack_cv_for_testing_( |
274 idle_worker_threads_stack_lock_.CreateConditionVariable()), | 287 idle_worker_threads_stack_lock_.CreateConditionVariable()), |
275 join_for_testing_returned_(true, false), | 288 join_for_testing_returned_(true, false), |
276 worker_thread_delegate_( | 289 worker_thread_delegate_( |
277 new SchedulerWorkerThreadDelegateImpl(this, | 290 new SchedulerWorkerThreadDelegateImpl(this, |
278 enqueue_sequence_callback)), | 291 enqueue_sequence_callback)), |
279 task_tracker_(task_tracker) { | 292 task_tracker_(task_tracker), |
| 293 delayed_task_manager_(delayed_task_manager) { |
280 DCHECK(task_tracker_); | 294 DCHECK(task_tracker_); |
| 295 DCHECK(delayed_task_manager_); |
281 } | 296 } |
282 | 297 |
283 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, | 298 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, |
284 size_t max_threads) { | 299 size_t max_threads) { |
285 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); | 300 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); |
286 | 301 |
287 DCHECK(worker_threads_.empty()); | 302 DCHECK(worker_threads_.empty()); |
288 | 303 |
289 for (size_t i = 0; i < max_threads; ++i) { | 304 for (size_t i = 0; i < max_threads; ++i) { |
290 std::unique_ptr<SchedulerWorkerThread> worker_thread = | 305 std::unique_ptr<SchedulerWorkerThread> worker_thread = |
(...skipping 30 matching lines...) Expand all Loading... |
321 if (idle_worker_threads_stack_.empty()) | 336 if (idle_worker_threads_stack_.empty()) |
322 return nullptr; | 337 return nullptr; |
323 | 338 |
324 auto worker_thread = idle_worker_threads_stack_.top(); | 339 auto worker_thread = idle_worker_threads_stack_.top(); |
325 idle_worker_threads_stack_.pop(); | 340 idle_worker_threads_stack_.pop(); |
326 return worker_thread; | 341 return worker_thread; |
327 } | 342 } |
328 | 343 |
329 } // namespace internal | 344 } // namespace internal |
330 } // namespace base | 345 } // namespace base |
OLD | NEW |