Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1806473002: TaskScheduler [9] Delayed Task Manager (Closed) Base URL: https://luckyluke-private.googlesource.com/src@s_5_worker_thread
Patch Set: rebase Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
(...skipping 13 matching lines...) Expand all
24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky 24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; 25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
26 26
27 // A task runner that runs tasks with the PARALLEL ExecutionMode. 27 // A task runner that runs tasks with the PARALLEL ExecutionMode.
28 class SchedulerParallelTaskRunner : public TaskRunner { 28 class SchedulerParallelTaskRunner : public TaskRunner {
29 public: 29 public:
30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
31 // long as |executor| is alive. 31 // long as |executor| is alive.
32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 32 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
33 SchedulerParallelTaskRunner(const TaskTraits& traits, 33 SchedulerParallelTaskRunner(const TaskTraits& traits,
34 SchedulerTaskExecutor* executor,
34 TaskTracker* task_tracker, 35 TaskTracker* task_tracker,
35 SchedulerTaskExecutor* executor) 36 DelayedTaskManager* delayed_task_manager)
36 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} 37 : traits_(traits),
38 executor_(executor),
39 task_tracker_(task_tracker),
40 delayed_task_manager_(delayed_task_manager) {}
37 41
38 // TaskRunner: 42 // TaskRunner:
39 bool PostDelayedTask(const tracked_objects::Location& from_here, 43 bool PostDelayedTask(const tracked_objects::Location& from_here,
40 const Closure& closure, 44 const Closure& closure,
41 TimeDelta delay) override { 45 TimeDelta delay) override {
42 // Post the task as part of a one-off single-task Sequence. 46 // Post the task as part of a one-off single-task Sequence.
43 return PostTaskToExecutor(from_here, closure, traits_, delay, 47 return PostTaskToExecutor(from_here, closure, traits_, delay,
44 make_scoped_refptr(new Sequence), executor_, 48 make_scoped_refptr(new Sequence), executor_,
45 task_tracker_); 49 task_tracker_, delayed_task_manager_);
46 } 50 }
47 51
48 bool RunsTasksOnCurrentThread() const override { 52 bool RunsTasksOnCurrentThread() const override {
49 return tls_current_thread_pool.Get().Get() == executor_; 53 return tls_current_thread_pool.Get().Get() == executor_;
50 } 54 }
51 55
52 private: 56 private:
53 ~SchedulerParallelTaskRunner() override = default; 57 ~SchedulerParallelTaskRunner() override = default;
54 58
55 const TaskTraits traits_; 59 const TaskTraits traits_;
60 SchedulerTaskExecutor* const executor_;
56 TaskTracker* const task_tracker_; 61 TaskTracker* const task_tracker_;
57 SchedulerTaskExecutor* const executor_; 62 DelayedTaskManager* const delayed_task_manager_;
58 63
59 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); 64 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
60 }; 65 };
61 66
62 // A task runner that runs tasks with the SEQUENCED ExecutionMode. 67 // A task runner that runs tasks with the SEQUENCED ExecutionMode.
63 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 68 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
64 public: 69 public:
65 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 70 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
66 // long as |executor| is alive. 71 // long as |executor| is alive.
67 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 72 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
68 SchedulerSequencedTaskRunner(const TaskTraits& traits, 73 SchedulerSequencedTaskRunner(const TaskTraits& traits,
74 SchedulerTaskExecutor* executor,
69 TaskTracker* task_tracker, 75 TaskTracker* task_tracker,
70 SchedulerTaskExecutor* executor) 76 DelayedTaskManager* delayed_task_manager)
71 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} 77 : traits_(traits),
78 executor_(executor),
79 task_tracker_(task_tracker),
80 delayed_task_manager_(delayed_task_manager) {}
72 81
73 // SequencedTaskRunner: 82 // SequencedTaskRunner:
74 bool PostDelayedTask(const tracked_objects::Location& from_here, 83 bool PostDelayedTask(const tracked_objects::Location& from_here,
75 const Closure& closure, 84 const Closure& closure,
76 TimeDelta delay) override { 85 TimeDelta delay) override {
77 // Post the task as part of |sequence|. 86 // Post the task as part of |sequence|.
78 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, 87 return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_,
79 executor_, task_tracker_); 88 executor_, task_tracker_, delayed_task_manager_);
80 } 89 }
81 90
82 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 91 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
83 const Closure& closure, 92 const Closure& closure,
84 base::TimeDelta delay) override { 93 base::TimeDelta delay) override {
85 // Tasks are never nested within the task scheduler. 94 // Tasks are never nested within the task scheduler.
86 return PostDelayedTask(from_here, closure, delay); 95 return PostDelayedTask(from_here, closure, delay);
87 } 96 }
88 97
89 bool RunsTasksOnCurrentThread() const override { 98 bool RunsTasksOnCurrentThread() const override {
90 return tls_current_thread_pool.Get().Get() == executor_; 99 return tls_current_thread_pool.Get().Get() == executor_;
91 } 100 }
92 101
93 private: 102 private:
94 ~SchedulerSequencedTaskRunner() override = default; 103 ~SchedulerSequencedTaskRunner() override = default;
95 104
96 // Sequence for all Tasks posted through this TaskRunner. 105 // Sequence for all Tasks posted through this TaskRunner.
97 const scoped_refptr<Sequence> sequence_ = new Sequence; 106 const scoped_refptr<Sequence> sequence_ = new Sequence;
98 107
99 const TaskTraits traits_; 108 const TaskTraits traits_;
109 SchedulerTaskExecutor* const executor_;
100 TaskTracker* const task_tracker_; 110 TaskTracker* const task_tracker_;
101 SchedulerTaskExecutor* const executor_; 111 DelayedTaskManager* const delayed_task_manager_;
102 112
103 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); 113 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
104 }; 114 };
105 115
106 } // namespace 116 } // namespace
107 117
108 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl 118 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
109 : public SchedulerWorkerThread::Delegate { 119 : public SchedulerWorkerThread::Delegate {
110 public: 120 public:
111 SchedulerWorkerThreadDelegateImpl( 121 SchedulerWorkerThreadDelegateImpl(
(...skipping 17 matching lines...) Expand all
129 SchedulerThreadPool::~SchedulerThreadPool() { 139 SchedulerThreadPool::~SchedulerThreadPool() {
130 // SchedulerThreadPool should never be deleted in production unless its 140 // SchedulerThreadPool should never be deleted in production unless its
131 // initialization failed. 141 // initialization failed.
132 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 142 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
133 } 143 }
134 144
135 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( 145 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
136 ThreadPriority thread_priority, 146 ThreadPriority thread_priority,
137 size_t max_threads, 147 size_t max_threads,
138 const EnqueueSequenceCallback& enqueue_sequence_callback, 148 const EnqueueSequenceCallback& enqueue_sequence_callback,
139 TaskTracker* task_tracker) { 149 TaskTracker* task_tracker,
140 std::unique_ptr<SchedulerThreadPool> thread_pool( 150 DelayedTaskManager* delayed_task_manager) {
141 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); 151 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
152 enqueue_sequence_callback, task_tracker, delayed_task_manager));
142 if (thread_pool->Initialize(thread_priority, max_threads)) 153 if (thread_pool->Initialize(thread_priority, max_threads))
143 return thread_pool; 154 return thread_pool;
144 return nullptr; 155 return nullptr;
145 } 156 }
146 157
147 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( 158 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
148 const TaskTraits& traits, 159 const TaskTraits& traits,
149 ExecutionMode execution_mode) { 160 ExecutionMode execution_mode) {
150 switch (execution_mode) { 161 switch (execution_mode) {
151 case ExecutionMode::PARALLEL: 162 case ExecutionMode::PARALLEL:
152 return make_scoped_refptr( 163 return make_scoped_refptr(new SchedulerParallelTaskRunner(
153 new SchedulerParallelTaskRunner(traits, task_tracker_, this)); 164 traits, this, task_tracker_, delayed_task_manager_));
154 165
155 case ExecutionMode::SEQUENCED: 166 case ExecutionMode::SEQUENCED:
156 return make_scoped_refptr( 167 return make_scoped_refptr(new SchedulerSequencedTaskRunner(
157 new SchedulerSequencedTaskRunner(traits, task_tracker_, this)); 168 traits, this, task_tracker_, delayed_task_manager_));
158 169
159 case ExecutionMode::SINGLE_THREADED: 170 case ExecutionMode::SINGLE_THREADED:
160 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 171 // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
161 NOTREACHED(); 172 NOTREACHED();
162 return nullptr; 173 return nullptr;
163 } 174 }
164 175
165 NOTREACHED(); 176 NOTREACHED();
166 return nullptr; 177 return nullptr;
167 } 178 }
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
256 return sequence_and_sort_key.sequence; 267 return sequence_and_sort_key.sequence;
257 } 268 }
258 269
259 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 270 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
260 scoped_refptr<Sequence> sequence) { 271 scoped_refptr<Sequence> sequence) {
261 enqueue_sequence_callback_.Run(std::move(sequence)); 272 enqueue_sequence_callback_.Run(std::move(sequence));
262 } 273 }
263 274
264 SchedulerThreadPool::SchedulerThreadPool( 275 SchedulerThreadPool::SchedulerThreadPool(
265 const EnqueueSequenceCallback& enqueue_sequence_callback, 276 const EnqueueSequenceCallback& enqueue_sequence_callback,
266 TaskTracker* task_tracker) 277 TaskTracker* task_tracker,
278 DelayedTaskManager* delayed_task_manager)
267 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 279 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
268 idle_worker_threads_stack_cv_for_testing_( 280 idle_worker_threads_stack_cv_for_testing_(
269 idle_worker_threads_stack_lock_.CreateConditionVariable()), 281 idle_worker_threads_stack_lock_.CreateConditionVariable()),
270 join_for_testing_returned_(true, false), 282 join_for_testing_returned_(true, false),
271 worker_thread_delegate_( 283 worker_thread_delegate_(
272 new SchedulerWorkerThreadDelegateImpl(this, 284 new SchedulerWorkerThreadDelegateImpl(this,
273 enqueue_sequence_callback)), 285 enqueue_sequence_callback)),
274 task_tracker_(task_tracker) { 286 task_tracker_(task_tracker),
287 delayed_task_manager_(delayed_task_manager) {
275 DCHECK(task_tracker_); 288 DCHECK(task_tracker_);
289 DCHECK(delayed_task_manager_);
276 } 290 }
277 291
278 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, 292 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
279 size_t max_threads) { 293 size_t max_threads) {
280 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 294 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
281 295
282 DCHECK(worker_threads_.empty()); 296 DCHECK(worker_threads_.empty());
283 297
284 for (size_t i = 0; i < max_threads; ++i) { 298 for (size_t i = 0; i < max_threads; ++i) {
285 std::unique_ptr<SchedulerWorkerThread> worker_thread = 299 std::unique_ptr<SchedulerWorkerThread> worker_thread =
(...skipping 30 matching lines...) Expand all
316 if (idle_worker_threads_stack_.empty()) 330 if (idle_worker_threads_stack_.empty())
317 return nullptr; 331 return nullptr;
318 332
319 auto worker_thread = idle_worker_threads_stack_.top(); 333 auto worker_thread = idle_worker_threads_stack_.top();
320 idle_worker_threads_stack_.pop(); 334 idle_worker_threads_stack_.pop();
321 return worker_thread; 335 return worker_thread;
322 } 336 }
323 337
324 } // namespace internal 338 } // namespace internal
325 } // namespace base 339 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698