Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1806473002: TaskScheduler [9] Delayed Task Manager (Closed) Base URL: https://luckyluke-private.googlesource.com/src@s_5_worker_thread
Patch Set: CR danakj #29 Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
(...skipping 13 matching lines...) Expand all
24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky 24 LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER; 25 tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
26 26
27 // A task runner that runs tasks with the PARALLEL ExecutionMode. 27 // A task runner that runs tasks with the PARALLEL ExecutionMode.
28 class SchedulerParallelTaskRunner : public TaskRunner { 28 class SchedulerParallelTaskRunner : public TaskRunner {
29 public: 29 public:
30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 30 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
31 // long as |executor| is alive. 31 // long as |executor| is alive.
32 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 32 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
33 SchedulerParallelTaskRunner(const TaskTraits& traits, 33 SchedulerParallelTaskRunner(const TaskTraits& traits,
34 SchedulerTaskExecutor* executor,
34 TaskTracker* task_tracker, 35 TaskTracker* task_tracker,
35 SchedulerTaskExecutor* executor) 36 DelayedTaskManager* delayed_task_manager)
36 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} 37 : traits_(traits),
38 executor_(executor),
39 task_tracker_(task_tracker),
40 delayed_task_manager_(delayed_task_manager) {}
37 41
38 // TaskRunner: 42 // TaskRunner:
39 bool PostDelayedTask(const tracked_objects::Location& from_here, 43 bool PostDelayedTask(const tracked_objects::Location& from_here,
40 const Closure& closure, 44 const Closure& closure,
41 TimeDelta delay) override { 45 TimeDelta delay) override {
42 // Post the task as part of a one-off single-task Sequence. 46 // Post the task as part of a one-off single-task Sequence.
43 return PostTaskToExecutor( 47 return PostTaskToExecutor(
44 WrapUnique( 48 WrapUnique(
45 new Task(from_here, closure, traits_, 49 new Task(from_here, closure, traits_,
46 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), 50 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
47 make_scoped_refptr(new Sequence), executor_, task_tracker_); 51 make_scoped_refptr(new Sequence), executor_, task_tracker_,
52 delayed_task_manager_);
48 } 53 }
49 54
50 bool RunsTasksOnCurrentThread() const override { 55 bool RunsTasksOnCurrentThread() const override {
51 return tls_current_thread_pool.Get().Get() == executor_; 56 return tls_current_thread_pool.Get().Get() == executor_;
52 } 57 }
53 58
54 private: 59 private:
55 ~SchedulerParallelTaskRunner() override = default; 60 ~SchedulerParallelTaskRunner() override = default;
56 61
57 const TaskTraits traits_; 62 const TaskTraits traits_;
63 SchedulerTaskExecutor* const executor_;
58 TaskTracker* const task_tracker_; 64 TaskTracker* const task_tracker_;
59 SchedulerTaskExecutor* const executor_; 65 DelayedTaskManager* const delayed_task_manager_;
60 66
61 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); 67 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
62 }; 68 };
63 69
64 // A task runner that runs tasks with the SEQUENCED ExecutionMode. 70 // A task runner that runs tasks with the SEQUENCED ExecutionMode.
65 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 71 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
66 public: 72 public:
67 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 73 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
68 // long as |executor| is alive. 74 // long as |executor| is alive.
69 // TODO(robliao): Find a concrete way to manage |executor|'s memory. 75 // TODO(robliao): Find a concrete way to manage |executor|'s memory.
70 SchedulerSequencedTaskRunner(const TaskTraits& traits, 76 SchedulerSequencedTaskRunner(const TaskTraits& traits,
77 SchedulerTaskExecutor* executor,
71 TaskTracker* task_tracker, 78 TaskTracker* task_tracker,
72 SchedulerTaskExecutor* executor) 79 DelayedTaskManager* delayed_task_manager)
73 : traits_(traits), task_tracker_(task_tracker), executor_(executor) {} 80 : traits_(traits),
81 executor_(executor),
82 task_tracker_(task_tracker),
83 delayed_task_manager_(delayed_task_manager) {}
74 84
75 // SequencedTaskRunner: 85 // SequencedTaskRunner:
76 bool PostDelayedTask(const tracked_objects::Location& from_here, 86 bool PostDelayedTask(const tracked_objects::Location& from_here,
77 const Closure& closure, 87 const Closure& closure,
78 TimeDelta delay) override { 88 TimeDelta delay) override {
79 // Post the task as part of |sequence|. 89 // Post the task as part of |sequence|.
80 return PostTaskToExecutor( 90 return PostTaskToExecutor(
81 WrapUnique( 91 WrapUnique(
82 new Task(from_here, closure, traits_, 92 new Task(from_here, closure, traits_,
83 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)), 93 delay.is_zero() ? TimeTicks() : TimeTicks::Now() + delay)),
84 sequence_, executor_, task_tracker_); 94 sequence_, executor_, task_tracker_, delayed_task_manager_);
85 } 95 }
86 96
87 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 97 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
88 const Closure& closure, 98 const Closure& closure,
89 base::TimeDelta delay) override { 99 base::TimeDelta delay) override {
90 // Tasks are never nested within the task scheduler. 100 // Tasks are never nested within the task scheduler.
91 return PostDelayedTask(from_here, closure, delay); 101 return PostDelayedTask(from_here, closure, delay);
92 } 102 }
93 103
94 bool RunsTasksOnCurrentThread() const override { 104 bool RunsTasksOnCurrentThread() const override {
95 return tls_current_thread_pool.Get().Get() == executor_; 105 return tls_current_thread_pool.Get().Get() == executor_;
96 } 106 }
97 107
98 private: 108 private:
99 ~SchedulerSequencedTaskRunner() override = default; 109 ~SchedulerSequencedTaskRunner() override = default;
100 110
101 // Sequence for all Tasks posted through this TaskRunner. 111 // Sequence for all Tasks posted through this TaskRunner.
102 const scoped_refptr<Sequence> sequence_ = new Sequence; 112 const scoped_refptr<Sequence> sequence_ = new Sequence;
103 113
104 const TaskTraits traits_; 114 const TaskTraits traits_;
115 SchedulerTaskExecutor* const executor_;
105 TaskTracker* const task_tracker_; 116 TaskTracker* const task_tracker_;
106 SchedulerTaskExecutor* const executor_; 117 DelayedTaskManager* const delayed_task_manager_;
107 118
108 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); 119 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
109 }; 120 };
110 121
111 } // namespace 122 } // namespace
112 123
113 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl 124 class SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl
114 : public SchedulerWorkerThread::Delegate { 125 : public SchedulerWorkerThread::Delegate {
115 public: 126 public:
116 SchedulerWorkerThreadDelegateImpl( 127 SchedulerWorkerThreadDelegateImpl(
(...skipping 17 matching lines...) Expand all
134 SchedulerThreadPool::~SchedulerThreadPool() { 145 SchedulerThreadPool::~SchedulerThreadPool() {
135 // SchedulerThreadPool should never be deleted in production unless its 146 // SchedulerThreadPool should never be deleted in production unless its
136 // initialization failed. 147 // initialization failed.
137 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty()); 148 DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
138 } 149 }
139 150
140 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool( 151 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
141 ThreadPriority thread_priority, 152 ThreadPriority thread_priority,
142 size_t max_threads, 153 size_t max_threads,
143 const EnqueueSequenceCallback& enqueue_sequence_callback, 154 const EnqueueSequenceCallback& enqueue_sequence_callback,
144 TaskTracker* task_tracker) { 155 TaskTracker* task_tracker,
145 std::unique_ptr<SchedulerThreadPool> thread_pool( 156 DelayedTaskManager* delayed_task_manager) {
146 new SchedulerThreadPool(enqueue_sequence_callback, task_tracker)); 157 std::unique_ptr<SchedulerThreadPool> thread_pool(new SchedulerThreadPool(
158 enqueue_sequence_callback, task_tracker, delayed_task_manager));
147 if (thread_pool->Initialize(thread_priority, max_threads)) 159 if (thread_pool->Initialize(thread_priority, max_threads))
148 return thread_pool; 160 return thread_pool;
149 return nullptr; 161 return nullptr;
150 } 162 }
151 163
152 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits( 164 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
153 const TaskTraits& traits, 165 const TaskTraits& traits,
154 ExecutionMode execution_mode) { 166 ExecutionMode execution_mode) {
155 switch (execution_mode) { 167 switch (execution_mode) {
156 case ExecutionMode::PARALLEL: 168 case ExecutionMode::PARALLEL:
157 return make_scoped_refptr( 169 return make_scoped_refptr(new SchedulerParallelTaskRunner(
158 new SchedulerParallelTaskRunner(traits, task_tracker_, this)); 170 traits, this, task_tracker_, delayed_task_manager_));
159 171
160 case ExecutionMode::SEQUENCED: 172 case ExecutionMode::SEQUENCED:
161 return make_scoped_refptr( 173 return make_scoped_refptr(new SchedulerSequencedTaskRunner(
162 new SchedulerSequencedTaskRunner(traits, task_tracker_, this)); 174 traits, this, task_tracker_, delayed_task_manager_));
163 175
164 case ExecutionMode::SINGLE_THREADED: 176 case ExecutionMode::SINGLE_THREADED:
165 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 177 // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
166 NOTREACHED(); 178 NOTREACHED();
167 return nullptr; 179 return nullptr;
168 } 180 }
169 181
170 NOTREACHED(); 182 NOTREACHED();
171 return nullptr; 183 return nullptr;
172 } 184 }
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
261 return sequence_and_sort_key.sequence; 273 return sequence_and_sort_key.sequence;
262 } 274 }
263 275
264 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 276 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
265 scoped_refptr<Sequence> sequence) { 277 scoped_refptr<Sequence> sequence) {
266 enqueue_sequence_callback_.Run(std::move(sequence)); 278 enqueue_sequence_callback_.Run(std::move(sequence));
267 } 279 }
268 280
269 SchedulerThreadPool::SchedulerThreadPool( 281 SchedulerThreadPool::SchedulerThreadPool(
270 const EnqueueSequenceCallback& enqueue_sequence_callback, 282 const EnqueueSequenceCallback& enqueue_sequence_callback,
271 TaskTracker* task_tracker) 283 TaskTracker* task_tracker,
284 DelayedTaskManager* delayed_task_manager)
272 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()), 285 : idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
273 idle_worker_threads_stack_cv_for_testing_( 286 idle_worker_threads_stack_cv_for_testing_(
274 idle_worker_threads_stack_lock_.CreateConditionVariable()), 287 idle_worker_threads_stack_lock_.CreateConditionVariable()),
275 join_for_testing_returned_(true, false), 288 join_for_testing_returned_(true, false),
276 worker_thread_delegate_( 289 worker_thread_delegate_(
277 new SchedulerWorkerThreadDelegateImpl(this, 290 new SchedulerWorkerThreadDelegateImpl(this,
278 enqueue_sequence_callback)), 291 enqueue_sequence_callback)),
279 task_tracker_(task_tracker) { 292 task_tracker_(task_tracker),
293 delayed_task_manager_(delayed_task_manager) {
280 DCHECK(task_tracker_); 294 DCHECK(task_tracker_);
295 DCHECK(delayed_task_manager_);
281 } 296 }
282 297
283 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority, 298 bool SchedulerThreadPool::Initialize(ThreadPriority thread_priority,
284 size_t max_threads) { 299 size_t max_threads) {
285 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 300 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
286 301
287 DCHECK(worker_threads_.empty()); 302 DCHECK(worker_threads_.empty());
288 303
289 for (size_t i = 0; i < max_threads; ++i) { 304 for (size_t i = 0; i < max_threads; ++i) {
290 std::unique_ptr<SchedulerWorkerThread> worker_thread = 305 std::unique_ptr<SchedulerWorkerThread> worker_thread =
(...skipping 30 matching lines...) Expand all
321 if (idle_worker_threads_stack_.empty()) 336 if (idle_worker_threads_stack_.empty())
322 return nullptr; 337 return nullptr;
323 338
324 auto worker_thread = idle_worker_threads_stack_.top(); 339 auto worker_thread = idle_worker_threads_stack_.top();
325 idle_worker_threads_stack_.pop(); 340 idle_worker_threads_stack_.pop();
326 return worker_thread; 341 return worker_thread;
327 } 342 }
328 343
329 } // namespace internal 344 } // namespace internal
330 } // namespace base 345 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool.h ('k') | base/task_scheduler/scheduler_thread_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698