Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(947)

Side by Side Diff: base/task_scheduler/thread_pool.cc

Issue 1698183005: Reference CL for the new task scheduler. (Closed) Base URL: https://luckyluke-private.googlesource.com/src@bigmaster2
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/task_scheduler/thread_pool.h ('k') | base/task_scheduler/thread_pool_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/thread_pool.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/task_scheduler/utils.h"
12
13 namespace base {
14 namespace internal {
15
16 namespace {
17
18 bool WorkerThreadIsInVector(
19 const WorkerThread* worker_thread,
20 const std::vector<scoped_ptr<WorkerThread>>& worker_threads) {
21 for (const auto& current_worker_thread : worker_threads) {
22 if (current_worker_thread.get() == worker_thread)
23 return true;
24 }
25 return false;
26 }
27
28 // A task runner that runs tasks with the PARALLEL strategy.
29 class SchedulerParallelTaskRunner : public TaskRunner {
30 public:
31 // Tasks posted through this task runner have |traits| and are inserted in
32 // |shared_priority_queue|. |delayed_task_manager| is used to post delayed
33 // tasks. |shutdown_manager| is notified when a task is posted.
34 SchedulerParallelTaskRunner(const TaskTraits& traits,
35 PriorityQueue* priority_queue,
36 DelayedTaskManager* delayed_task_manager,
37 ShutdownManager* shutdown_manager);
38
39 // TaskRunner:
40 bool PostDelayedTask(const tracked_objects::Location& from_here,
41 const Closure& closure,
42 TimeDelta delay) override;
43 bool RunsTasksOnCurrentThread() const override;
44
45 private:
46 ~SchedulerParallelTaskRunner() override;
47
48 TaskTraits traits_;
49 PriorityQueue* priority_queue_;
50 DelayedTaskManager* delayed_task_manager_;
51 ShutdownManager* shutdown_manager_;
52
53 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
54 };
55
56 SchedulerParallelTaskRunner::SchedulerParallelTaskRunner(
57 const TaskTraits& traits,
58 PriorityQueue* priority_queue,
59 DelayedTaskManager* delayed_task_manager,
60 ShutdownManager* shutdown_manager)
61 : traits_(traits),
62 priority_queue_(priority_queue),
63 delayed_task_manager_(delayed_task_manager),
64 shutdown_manager_(shutdown_manager) {}
65
66 bool SchedulerParallelTaskRunner::PostDelayedTask(
67 const tracked_objects::Location& from_here,
68 const Closure& closure,
69 TimeDelta delay) {
70 Task task(from_here, closure, traits_, TimeTicks::Now());
71 if (!delay.is_zero())
72 task.delayed_run_time = task.post_time + delay;
73 PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_,
74 shutdown_manager_, delayed_task_manager_);
75 return true;
76 }
77
78 bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const {
79 // TODO(fdoray): Return true only if tasks posted may actually run on the
80 // current thread. It is valid, but not ideal, to always return true.
81 return true;
82 }
83
84 SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default;
85
86 // A task runner that runs tasks in with the SEQUENCED strategy.
87 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
88 public:
89 // Tasks posted through this task runner have |traits| and are inserted in
90 // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|.
91 // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is
92 // notified when a task is posted.
93 SchedulerSequencedTaskRunner(const TaskTraits& traits,
94 scoped_refptr<Sequence> sequence,
95 PriorityQueue* priority_queue,
96 DelayedTaskManager* delayed_task_manager,
97 ShutdownManager* shutdown_manager);
98
99 // SequencedTaskRunner:
100 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
101 const Closure& task,
102 TimeDelta delay) override;
103 bool PostDelayedTask(const tracked_objects::Location& from_here,
104 const Closure& closure,
105 TimeDelta delay) override;
106 bool RunsTasksOnCurrentThread() const override;
107
108 private:
109 ~SchedulerSequencedTaskRunner() override;
110
111 TaskTraits traits_;
112 scoped_refptr<Sequence> sequence_;
113 PriorityQueue* priority_queue_;
114 DelayedTaskManager* delayed_task_manager_;
115 ShutdownManager* shutdown_manager_;
116
117 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
118 };
119
120 SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
121 const TaskTraits& traits,
122 scoped_refptr<Sequence> sequence,
123 PriorityQueue* priority_queue,
124 DelayedTaskManager* delayed_task_manager,
125 ShutdownManager* shutdown_manager)
126 : traits_(traits),
127 sequence_(sequence),
128 priority_queue_(priority_queue),
129 delayed_task_manager_(delayed_task_manager),
130 shutdown_manager_(shutdown_manager) {}
131
132 bool SchedulerSequencedTaskRunner::PostDelayedTask(
133 const tracked_objects::Location& from_here,
134 const Closure& closure,
135 TimeDelta delay) {
136 Task task(from_here, closure, traits_, TimeTicks::Now());
137 if (!delay.is_zero())
138 task.delayed_run_time = task.post_time + delay;
139 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
140 delayed_task_manager_);
141 return true;
142 }
143
144 bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const {
145 // TODO(fdoray): Return true only if tasks posted may actually run on the
146 // current thread. It is valid, but not ideal, to always return true.
147 return true;
148 }
149
150 bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask(
151 const tracked_objects::Location& from_here,
152 const Closure& task,
153 TimeDelta delay) {
154 return PostDelayedTask(from_here, task, delay);
155 }
156
157 SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
158
159 } // namespace
160
161
162 ThreadPool::~ThreadPool() = default;
163
164 scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool(
165 ThreadPriority thread_priority,
166 size_t num_threads,
167 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
168 ShutdownManager* shutdown_manager) {
169 scoped_ptr<ThreadPool> thread_pool(
170 new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback,
171 shutdown_manager));
172 return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool)
173 : scoped_ptr<ThreadPool>();
174 }
175
176 size_t ThreadPool::GetNumThreads() const {
177 return worker_threads_.size();
178 }
179
180 scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits(
181 const TaskTraits& traits,
182 ExecutionMode execution_mode) {
183 switch (execution_mode) {
184 case ExecutionMode::PARALLEL: {
185 return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner(
186 traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_));
187 }
188
189 case ExecutionMode::SEQUENCED: {
190 return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner(
191 traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_,
192 &delayed_task_manager_, shutdown_manager_));
193 }
194
195 case ExecutionMode::SINGLE_THREADED: {
196 DCHECK(!worker_threads_.empty());
197 // TODO(fdoray): Better thread assignment.
198 return scoped_refptr<TaskRunner>(
199 worker_threads_.front()
200 ->CreateTaskRunnerWithTraits(traits, execution_mode)
201 .get());
202 }
203
204 #if defined(OS_WIN)
205 case ExecutionMode::SINGLE_THREADED_COM_STA: {
206 // TODO(fdoray): Implement COM.
207 NOTIMPLEMENTED();
208 return scoped_refptr<TaskRunner>();
209 }
210 #endif
211
212 default: {
213 NOTREACHED();
214 return scoped_refptr<TaskRunner>();
215 }
216 }
217 }
218
219 void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence,
220 const SequenceSortKey& sequence_sort_key,
221 const WorkerThread* worker_thread) {
222 DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get());
223
224 // Set a flag to avoid waking up a thread when reinserting |sequence| in
225 // |priority_queue_| if the thread doing the reinsertion:
226 // - Can run tasks from |priority_queue_|, and,
227 // - Doesn't have pending single-threaded tasks.
228 // If these conditions are met, the thread doing the reinsertion will soon
229 // pop a sequence from |priority_queue_|. There is no need to wake up a new
230 // thread to do it.
231 if (worker_thread->shared_priority_queue() == &priority_queue_ &&
232 !worker_thread->HasSingleThreadedTasks()) {
233 disable_wake_up_thread_on_sequence_insertion_.Set(true);
234 }
235
236 // Insert the sequence in the priority queue.
237 priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key);
238
239 disable_wake_up_thread_on_sequence_insertion_.Set(false);
240 }
241
242 void ThreadPool::ShutdownAndJoinAllThreadsForTesting() {
243 for (const auto& worker_thread : worker_threads_)
244 worker_thread->ShutdownAndJoinForTesting();
245 }
246
247 ThreadPool::ThreadPool(
248 ThreadPriority thread_priority,
249 size_t num_threads,
250 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
251 ShutdownManager* shutdown_manager)
252 : thread_pool_ready_(true, false),
253 priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue,
254 Unretained(this))),
255 shutdown_manager_(shutdown_manager),
256 delayed_task_manager_(
257 Bind(&ThreadPool::WakeUpOneThread, Unretained(this)),
258 shutdown_manager_) {
259 DCHECK_GT(num_threads, 0u);
260 DCHECK(shutdown_manager);
261
262 // The platform threads reference thread pool data structures and there's
263 // currently no way for us to create them in a suspended state. We'll use the
264 // main entry callback to have the threads wait and then signal them when
265 // we're ready.
266 const WorkerThread::MainEntryCallback main_entry_callback =
267 Bind(&WaitableEvent::Wait, Unretained(&thread_pool_ready_));
268
269 const WorkerThread::BecomesIdleCallback becomes_idle_callback =
270 Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this));
271 worker_threads_.reserve(num_threads);
272
273 for (size_t i = 0; i < num_threads; ++i) {
274 scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread(
275 thread_priority, &priority_queue_, main_entry_callback,
276 reinsert_sequence_callback, becomes_idle_callback,
277 &delayed_task_manager_, shutdown_manager_);
278 if (worker_thread.get() != nullptr)
279 worker_threads_.push_back(std::move(worker_thread));
280 }
281
282 thread_pool_ready_.Signal();
283 }
284
285 void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) {
286 DCHECK(WorkerThreadIsInVector(worker_thread, worker_threads_));
287
288 AutoSchedulerLock auto_lock_(idle_worker_threads_lock_);
289
290 if (idle_worker_threads_set_.find(worker_thread) !=
291 idle_worker_threads_set_.end()) {
292 // The worker thread is already on the stack of idle threads.
293 return;
294 }
295
296 // Add the worker thread to the stack of idle threads.
297 idle_worker_threads_stack_.push(worker_thread);
298 idle_worker_threads_set_.insert(worker_thread);
299 }
300
301 void ThreadPool::WakeUpOneThread() {
302 // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't
303 // have pending or running single-threaded tasks.
304 AutoSchedulerLock auto_lock(idle_worker_threads_lock_);
305 while (!idle_worker_threads_stack_.empty()) {
306 WorkerThread* worker_thread = idle_worker_threads_stack_.top();
307
308 idle_worker_threads_stack_.pop();
309 idle_worker_threads_set_.erase(worker_thread);
310
311 // HasSingleThreadedTasks() can return stale results. However, when it
312 // returns true below, it is guaranteed that |worker_thread| is either awake
313 // or about to be woken up and that it will not enter
314 // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This
315 // is important because if all threads in |idle_worker_threads_stack_|
316 // report that they have single-threaded tasks, no thread is woken up by
317 // this method. If these threads don't check |priority_queue_| before
318 // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could
319 // end up never being done. The guarantee works because between the moment
320 // HasSingleThreadedTasks() goes from true to false and the moment
321 // |worker_thread| enters WaitUntilWorkIsAvailable(),
322 // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool.
323 // Both WorkerThreadBecomesIdleCallback() and the current method acquire
324 // |idle_worker_threads_lock_|, which synchronizes the value returned by
325 // HasSingleThreadedTasks().
326 //
327 // TODO(fdoray): A single-threaded task can be posted to |worker_thread|
328 // immediately after HasSingleThreadedTasks() has returned false. Ideally,
329 // when this happens, another worker thread should be woken up.
330 if (!worker_thread->HasSingleThreadedTasks()) {
331 worker_thread->WakeUp();
332 break;
333 }
334 }
335 }
336
337 void ThreadPool::OnSequenceInsertedInPriorityQueue() {
338 if (disable_wake_up_thread_on_sequence_insertion_.Get())
339 return;
340
341 WakeUpOneThread();
342 }
343
344 } // namespace internal
345 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/thread_pool.h ('k') | base/task_scheduler/thread_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698