Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1806473002: TaskScheduler [9] Delayed Task Manager (Closed) Base URL: https://luckyluke-private.googlesource.com/src@s_5_worker_thread
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
(...skipping 12 matching lines...) Expand all
23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads 23 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads
24 // that don't belong to a SchedulerThreadPool. 24 // that don't belong to a SchedulerThreadPool.
25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky 25 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky
26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER; 26 tls_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER;
27 27
28 // A task runner that runs tasks with the PARALLEL ExecutionMode. 28 // A task runner that runs tasks with the PARALLEL ExecutionMode.
29 class SchedulerParallelTaskRunner : public TaskRunner { 29 class SchedulerParallelTaskRunner : public TaskRunner {
30 public: 30 public:
31 SchedulerParallelTaskRunner(const TaskTraits& traits, 31 SchedulerParallelTaskRunner(const TaskTraits& traits,
32 PriorityQueue* priority_queue, 32 PriorityQueue* priority_queue,
33 TaskTracker* task_tracker) 33 TaskTracker* task_tracker,
34 DelayedTaskManager* delayed_task_manager)
34 : traits_(traits), 35 : traits_(traits),
35 priority_queue_(priority_queue), 36 priority_queue_(priority_queue),
36 task_tracker_(task_tracker) {} 37 task_tracker_(task_tracker),
38 delayed_task_manager_(delayed_task_manager) {}
37 39
38 // TaskRunner: 40 // TaskRunner:
39 bool PostDelayedTask(const tracked_objects::Location& from_here, 41 bool PostDelayedTask(const tracked_objects::Location& from_here,
40 const Closure& closure, 42 const Closure& closure,
41 TimeDelta delay) override { 43 TimeDelta delay) override {
42 // TODO(fdoray): Support delayed tasks. 44 // TODO(fdoray): Support delayed tasks.
gab 2016/04/11 18:28:30 Remove TODO
fdoray 2016/04/11 19:57:06 Delayed tasks are not supported until we have the
gab 2016/04/12 13:35:47 Ah right, but from this component's POV it *is* po
fdoray 2016/04/12 14:43:56 Done. Removed the DCHECK here and put it above Pos
43 DCHECK(delay.is_zero()); 45 DCHECK(delay.is_zero());
44 46
45 // Post the task as part of a one-off single-task Sequence. 47 // Post the task as part of a one-off single-task Sequence.
46 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), 48 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)),
47 make_scoped_refptr(new Sequence), priority_queue_, 49 make_scoped_refptr(new Sequence), priority_queue_,
48 task_tracker_); 50 task_tracker_, delayed_task_manager_);
49 } 51 }
50 52
51 bool RunsTasksOnCurrentThread() const override { 53 bool RunsTasksOnCurrentThread() const override {
52 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; 54 return tls_current_shared_priority_queue.Get().Get() == priority_queue_;
53 } 55 }
54 56
55 private: 57 private:
56 ~SchedulerParallelTaskRunner() override = default; 58 ~SchedulerParallelTaskRunner() override = default;
57 59
58 const TaskTraits traits_; 60 const TaskTraits traits_;
59 PriorityQueue* const priority_queue_; 61 PriorityQueue* const priority_queue_;
60 TaskTracker* const task_tracker_; 62 TaskTracker* const task_tracker_;
63 DelayedTaskManager* const delayed_task_manager_;
61 64
62 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); 65 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
63 }; 66 };
64 67
65 // A task runner that runs tasks with the SEQUENCED ExecutionMode. 68 // A task runner that runs tasks with the SEQUENCED ExecutionMode.
66 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 69 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
67 public: 70 public:
68 SchedulerSequencedTaskRunner(const TaskTraits& traits, 71 SchedulerSequencedTaskRunner(const TaskTraits& traits,
69 PriorityQueue* priority_queue, 72 PriorityQueue* priority_queue,
70 TaskTracker* task_tracker) 73 TaskTracker* task_tracker,
74 DelayedTaskManager* delayed_task_manager)
71 : traits_(traits), 75 : traits_(traits),
72 priority_queue_(priority_queue), 76 priority_queue_(priority_queue),
73 task_tracker_(task_tracker) {} 77 task_tracker_(task_tracker),
78 delayed_task_manager_(delayed_task_manager) {}
74 79
75 // SequencedTaskRunner: 80 // SequencedTaskRunner:
76 bool PostDelayedTask(const tracked_objects::Location& from_here, 81 bool PostDelayedTask(const tracked_objects::Location& from_here,
77 const Closure& closure, 82 const Closure& closure,
78 TimeDelta delay) override { 83 TimeDelta delay) override {
79 // TODO(fdoray): Support delayed tasks. 84 // TODO(fdoray): Support delayed tasks.
80 DCHECK(delay.is_zero()); 85 DCHECK(delay.is_zero());
81 86
82 // Post the task as part of |sequence_|. 87 // Post the task as part of |sequence_|.
83 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)), 88 return PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)),
84 sequence_, priority_queue_, task_tracker_); 89 sequence_, priority_queue_, task_tracker_,
90 delayed_task_manager_);
85 } 91 }
86 92
87 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 93 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
88 const Closure& closure, 94 const Closure& closure,
89 base::TimeDelta delay) override { 95 base::TimeDelta delay) override {
90 // Tasks are never nested within the task scheduler. 96 // Tasks are never nested within the task scheduler.
91 return PostDelayedTask(from_here, closure, delay); 97 return PostDelayedTask(from_here, closure, delay);
92 } 98 }
93 99
94 bool RunsTasksOnCurrentThread() const override { 100 bool RunsTasksOnCurrentThread() const override {
95 return tls_current_shared_priority_queue.Get().Get() == priority_queue_; 101 return tls_current_shared_priority_queue.Get().Get() == priority_queue_;
96 } 102 }
97 103
98 private: 104 private:
99 ~SchedulerSequencedTaskRunner() override = default; 105 ~SchedulerSequencedTaskRunner() override = default;
100 106
101 // Sequence in which all Tasks posted through this TaskRunner are inserted. 107 // Sequence in which all Tasks posted through this TaskRunner are inserted.
102 const scoped_refptr<Sequence> sequence_ = new Sequence; 108 const scoped_refptr<Sequence> sequence_ = new Sequence;
103 109
104 const TaskTraits traits_; 110 const TaskTraits traits_;
105 PriorityQueue* const priority_queue_; 111 PriorityQueue* const priority_queue_;
106 TaskTracker* const task_tracker_; 112 TaskTracker* const task_tracker_;
113 DelayedTaskManager* const delayed_task_manager_;
107 114
108 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); 115 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
109 }; 116 };
110 117
111 } // namespace 118 } // namespace
112 119
113 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl 120 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
114 : public SchedulerWorkerThread::Delegate { 121 : public SchedulerWorkerThread::Delegate {
115 public: 122 public:
116 SchedulerWorkerThreadDelegateImpl( 123 SchedulerWorkerThreadDelegateImpl(
(...skipping 11 matching lines...) Expand all
128 SchedulerThreadPool* outer_; 135 SchedulerThreadPool* outer_;
129 const EnqueueSequenceCallback enqueue_sequence_callback_; 136 const EnqueueSequenceCallback enqueue_sequence_callback_;
130 137
131 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl); 138 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
132 }; 139 };
133 140
134 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( 141 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
135 ThreadPriority thread_priority, 142 ThreadPriority thread_priority,
136 size_t max_threads, 143 size_t max_threads,
137 const EnqueueSequenceCallback& enqueue_sequence_callback, 144 const EnqueueSequenceCallback& enqueue_sequence_callback,
138 TaskTracker* task_tracker) { 145 TaskTracker* task_tracker,
139 std::unique_ptr<SchedulerThreadPool> thread_pool( 146 DelayedTaskManager* delayed_task_manager) {
140 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); 147 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
148 enqueue_sequence_callback, task_tracker, delayed_task_manager));
141 if (thread_pool->Initialize(thread_priority, max_threads)) 149 if (thread_pool->Initialize(thread_priority, max_threads))
142 return thread_pool; 150 return thread_pool;
143 return nullptr; 151 return nullptr;
144 } 152 }
145 153
146 SchedulerThreadPool::~SchedulerThreadPool() { 154 SchedulerThreadPool::~SchedulerThreadPool() {
147 #if DCHECK_IS_ON() 155 #if DCHECK_IS_ON()
148 // SchedulerThreadPool should never be deleted in production unless its 156 // SchedulerThreadPool should never be deleted in production unless its
149 // initialization failed. 157 // initialization failed.
150 AutoSchedulerLock auto_lock(worker_threads_lock_); 158 AutoSchedulerLock auto_lock(worker_threads_lock_);
151 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 159 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
152 #endif // DCHECK_IS_ON() 160 #endif // DCHECK_IS_ON()
153 } 161 }
154 162
155 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( 163 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
156 const TaskTraits& traits, 164 const TaskTraits& traits,
157 ExecutionMode execution_mode) { 165 ExecutionMode execution_mode) {
158 switch (execution_mode) { 166 switch (execution_mode) {
159 case ExecutionMode::PARALLEL: 167 case ExecutionMode::PARALLEL:
160 return make_scoped_refptr(new SchedulerParallelTaskRunner( 168 return make_scoped_refptr(new SchedulerParallelTaskRunner(
161 traits, &shared_priority_queue_, task_tracker_)); 169 traits, &shared_priority_queue_, task_tracker_,
170 delayed_task_manager_));
162 171
163 case ExecutionMode::SEQUENCED: 172 case ExecutionMode::SEQUENCED:
164 return make_scoped_refptr(new SchedulerSequencedTaskRunner( 173 return make_scoped_refptr(new SchedulerSequencedTaskRunner(
165 traits, &shared_priority_queue_, task_tracker_)); 174 traits, &shared_priority_queue_, task_tracker_,
175 delayed_task_manager_));
166 176
167 case ExecutionMode::SINGLE_THREADED: 177 case ExecutionMode::SINGLE_THREADED:
168 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 178 // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
169 NOTREACHED(); 179 NOTREACHED();
170 return nullptr; 180 return nullptr;
171 } 181 }
172 182
173 NOTREACHED(); 183 NOTREACHED();
174 return nullptr; 184 return nullptr;
175 } 185 }
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 return sequence_and_sort_key.sequence; 261 return sequence_and_sort_key.sequence;
252 } 262 }
253 263
254 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 264 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
255 scoped_refptr<Sequence> sequence) { 265 scoped_refptr<Sequence> sequence) {
256 enqueue_sequence_callback_.Run(std::move(sequence)); 266 enqueue_sequence_callback_.Run(std::move(sequence));
257 } 267 }
258 268
259 SchedulerThreadPool::SchedulerThreadPool( 269 SchedulerThreadPool::SchedulerThreadPool(
260 const EnqueueSequenceCallback& enqueue_sequence_callback, 270 const EnqueueSequenceCallback& enqueue_sequence_callback,
261 TaskTracker* task_tracker) 271 TaskTracker* task_tracker,
272 DelayedTaskManager* delayed_task_manager)
262 : shared_priority_queue_( 273 : shared_priority_queue_(
263 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))), 274 Bind(&SchedulerThreadPool::WakeUpOneThread, Unretained(this))),
264 worker_threads_lock_(shared_priority_queue_.container_lock()), 275 worker_threads_lock_(shared_priority_queue_.container_lock()),
265 idle_worker_threads_stack_cv_for_testing_( 276 idle_worker_threads_stack_cv_for_testing_(
266 worker_threads_lock_.CreateConditionVariable()), 277 worker_threads_lock_.CreateConditionVariable()),
267 join_for_testing_returned_(true, false), 278 join_for_testing_returned_(true, false),
268 worker_thread_delegate_( 279 worker_thread_delegate_(
269 new SchedulerWorkerThreadDelegateImpl(this, 280 new SchedulerWorkerThreadDelegateImpl(this,
270 enqueue_sequence_callback)), 281 enqueue_sequence_callback)),
271 task_tracker_(task_tracker) { 282 task_tracker_(task_tracker),
283 delayed_task_manager_(delayed_task_manager) {
272 DCHECK(task_tracker_); 284 DCHECK(task_tracker_);
285 DCHECK(delayed_task_manager_);
273 } 286 }
274 287
275 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, 288 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
276 size_t max_threads) { 289 size_t max_threads) {
277 AutoSchedulerLock auto_lock(worker_threads_lock_); 290 AutoSchedulerLock auto_lock(worker_threads_lock_);
278 291
279 DCHECK(worker_threads_.empty()); 292 DCHECK(worker_threads_.empty());
280 293
281 for (size_t i = 0; i < max_threads; ++i) { 294 for (size_t i = 0; i < max_threads; ++i) {
282 std::unique_ptr<SchedulerWorkerThread> worker_thread = 295 std::unique_ptr<SchedulerWorkerThread> worker_thread =
(...skipping 27 matching lines...) Expand all
310 AutoSchedulerLock auto_lock(worker_threads_lock_); 323 AutoSchedulerLock auto_lock(worker_threads_lock_);
311 idle_worker_threads_stack_.push(worker_thread); 324 idle_worker_threads_stack_.push(worker_thread);
312 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); 325 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size());
313 326
314 if (idle_worker_threads_stack_.size() == worker_threads_.size()) 327 if (idle_worker_threads_stack_.size() == worker_threads_.size())
315 idle_worker_threads_stack_cv_for_testing_->Broadcast(); 328 idle_worker_threads_stack_cv_for_testing_->Broadcast();
316 } 329 }
317 330
318 } // namespace internal 331 } // namespace internal
319 } // namespace base 332 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698