Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(341)

Side by Side Diff: base/task_scheduler/thread_pool.cc

Issue 1708773002: TaskScheduler [7] SchedulerThreadPool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_5_worker_thread
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/thread_pool.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/task_scheduler/task_tracker.h"
14
15 namespace base {
16 namespace internal {
17
18 namespace {
19
20 // Shared PriorityQueue of a thread's ThreadPool. Not set for threads that don't
21 // belong to a ThreadPool.
22 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky
23 g_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER;
24
25 void PostTaskCallback(scoped_refptr<Sequence> sequence,
robliao 2016/03/31 22:48:56 Wasn't this and the below going to go into utils.h
fdoray 2016/04/01 16:02:51 Yes, it was previously in utils.h|.cc because it w
robliao 2016/04/01 19:14:49 Given that PostTaskHelper is somewhat like a priva
fdoray 2016/04/01 20:16:45 PostTaskHelper is only one statement, but it indir
26 PriorityQueue* priority_queue,
27 scoped_ptr<Task> task) {
robliao 2016/03/31 22:48:55 Should DCHECK(task) as it's not immediately derefe
fdoray 2016/04/01 16:02:51 Done. DCHECK everything to be consistent.
28 if (sequence->PushTask(std::move(task))) {
29 // |sequence| must be inserted into |priority_queue| because it was empty
30 // before |task| was inserted into it.
31 const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
32 priority_queue->BeginTransaction()->Push(
33 make_scoped_ptr(new PriorityQueue::SequenceAndSortKey(
34 std::move(sequence), sequence_sort_key)));
35 }
36 }
37
38 // Helper for posting |task| to the provided |sequence| and |priority_queue|
39 // conditional on |task_tracker|. Used by all TaskRunners defined in this file.
40 void PostTaskHelper(scoped_ptr<Task> task,
41 scoped_refptr<Sequence> sequence,
robliao 2016/03/31 22:48:55 sequence, priority_queue, and task should be DCHEC
fdoray 2016/04/01 16:02:52 Done.
42 PriorityQueue* priority_queue,
43 TaskTracker* task_tracker) {
44 task_tracker->PostTask(
45 Bind(&PostTaskCallback, std::move(sequence), priority_queue),
46 std::move(task));
47 }
48
49 // A task runner that runs tasks with the PARALLEL ExecutionMode.
50 class SchedulerParallelTaskRunner : public TaskRunner {
51 public:
52 SchedulerParallelTaskRunner(const TaskTraits& traits,
53 PriorityQueue* priority_queue,
54 TaskTracker* task_tracker)
55 : traits_(traits),
56 priority_queue_(priority_queue),
57 task_tracker_(task_tracker) {}
58
59 // TaskRunner:
60 bool PostDelayedTask(const tracked_objects::Location& from_here,
61 const Closure& closure,
62 TimeDelta delay) override {
63 // TODO(fdoray): Support delayed tasks.
64 DCHECK(delay.is_zero());
65 PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)),
66 make_scoped_refptr(new Sequence), priority_queue_,
67 task_tracker_);
68 return true;
69 }
70
71 bool RunsTasksOnCurrentThread() const override {
72 return g_current_shared_priority_queue.Get().Get() == priority_queue_;
73 }
74
75 private:
76 ~SchedulerParallelTaskRunner() override = default;
77
78 const TaskTraits traits_;
79 PriorityQueue* const priority_queue_;
80 TaskTracker* const task_tracker_;
81
82 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
83 };
84
85 } // namespace
86
87 ThreadPool::~ThreadPool() {
88 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_);
89 DCHECK(join_for_testing_returned_);
90 }
91
92 scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool(
93 ThreadPriority thread_priority,
94 size_t num_threads,
95 const SchedulerWorkerThread::RanTaskFromSequenceCallback&
96 ran_task_from_sequence_callback,
97 TaskTracker* task_tracker) {
98 scoped_ptr<ThreadPool> thread_pool(
99 new ThreadPool(thread_priority, num_threads,
100 ran_task_from_sequence_callback, task_tracker));
101
102 if (thread_pool->worker_threads_.empty())
robliao 2016/03/31 22:48:56 Does checking the thread count == num_threads make
fdoray 2016/04/01 16:02:51 gab@ said that we should return a ThreadPool w/o e
robliao 2016/04/01 19:14:49 A histogram would make sense if we were able to ta
fdoray 2016/04/01 20:16:45 Done. Renamed num_threads -> max_threads. I think
103 return nullptr;
104 return thread_pool;
105 }
106
107 scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits(
108 const TaskTraits& traits,
109 ExecutionMode execution_mode) {
110 switch (execution_mode) {
111 case ExecutionMode::PARALLEL:
112 return make_scoped_refptr(new SchedulerParallelTaskRunner(
113 traits, &shared_priority_queue_, task_tracker_));
114
115 case ExecutionMode::SEQUENCED:
116 case ExecutionMode::SINGLE_THREADED:
117 NOTIMPLEMENTED();
118 return nullptr;
119 }
120 }
121
122 void ThreadPool::ReinsertSequence(const SchedulerWorkerThread* worker_thread,
123 scoped_refptr<Sequence> sequence,
124 const SequenceSortKey& sequence_sort_key) {
125 DCHECK(!g_current_shared_priority_queue.Get().Get());
126
127 // If |worker_thread| belongs to this ThreadPool, set a flag to avoid waking
128 // up a SchedulerWorkerThread when |sequence| is reinserted in
129 // |shared_priority_queue_|. In such cases, |worker_thread| will soon pop a
130 // Sequence from |shared_priority_queue_| which means that there is no need to
131 // wake up another SchedulerWorkerThread to do so.
robliao 2016/03/31 22:48:56 Update this comment with the delegated GetWork.
fdoray 2016/04/01 16:02:51 Done.
132 if (g_current_shared_priority_queue.Get().Get() == &shared_priority_queue_)
133 no_wake_up_on_sequence_insertion_.Set(true);
134
135 shared_priority_queue_.BeginTransaction()->Push(
136 make_scoped_ptr(new PriorityQueue::SequenceAndSortKey(
137 std::move(sequence), sequence_sort_key)));
138 no_wake_up_on_sequence_insertion_.Set(false);
139 }
140
141 void ThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
142 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
143 while (idle_worker_threads_stack_.size() < worker_threads_.size())
144 idle_worker_threads_stack_cv_->Wait();
145 }
146
147 void ThreadPool::JoinForTesting() {
148 for (const auto& worker_thread : worker_threads_)
149 worker_thread->JoinForTesting();
150
151 {
robliao 2016/03/31 22:48:55 Is this scope necessary?
fdoray 2016/04/01 16:02:52 No. Removed it.
152 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_);
153 DCHECK(!join_for_testing_returned_);
154 join_for_testing_returned_ = true;
155 }
156 }
157
158 ThreadPool::ThreadPool(ThreadPriority thread_priority,
159 size_t num_threads,
160 const SchedulerWorkerThread::RanTaskFromSequenceCallback&
161 ran_task_from_sequence_callback,
162 TaskTracker* task_tracker)
163 : shared_priority_queue_(
164 Bind(&ThreadPool::SequenceInsertedInSharedPriorityQueueCallback,
165 Unretained(this))),
166 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
167 idle_worker_threads_stack_cv_(
168 idle_worker_threads_stack_lock_.CreateConditionVariable()),
169 task_tracker_(task_tracker) {
170 DCHECK_GT(num_threads, 0U);
171 DCHECK(!ran_task_from_sequence_callback.is_null());
172 DCHECK(task_tracker_);
173
174 // |this| always outlives the worker threads to which these callbacks are
175 // passed.
176 const Closure main_entry_callback(
177 Bind(&ThreadPool::MainEntryCallback, Unretained(this)));
178 const SchedulerWorkerThread::GetWorkCallback get_work_callback(
179 Bind(&ThreadPool::GetWorkCallback, Unretained(this)));
180
181 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
182
183 for (size_t i = 0; i < num_threads; ++i) {
184 scoped_ptr<SchedulerWorkerThread> worker_thread =
185 SchedulerWorkerThread::CreateSchedulerWorkerThread(
186 thread_priority, main_entry_callback, get_work_callback,
187 ran_task_from_sequence_callback, task_tracker);
188 if (worker_thread) {
robliao 2016/03/31 22:48:55 If we decide to fail when we couldn't create all t
fdoray 2016/04/01 16:02:52 I think it makes sense to fail-fast because it is
189 idle_worker_threads_stack_.push(worker_thread.get());
190 worker_threads_.push_back(std::move(worker_thread));
191 }
192 }
193 }
194
195 void ThreadPool::WakeUpOneThread() {
196 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
197
198 if (idle_worker_threads_stack_.empty())
199 return;
200
201 idle_worker_threads_stack_.top()->WakeUp();
robliao 2016/03/31 22:48:56 Optional: It might be more robust to wake up the t
fdoray 2016/04/01 16:02:51 Done. I don't like the fact that it requires one m
202 idle_worker_threads_stack_.pop();
203 }
204
205 void ThreadPool::AddToIdleSchedulerWorkerThreadsStack(
206 SchedulerWorkerThread* worker_thread) {
207 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
208 idle_worker_threads_stack_.push(worker_thread);
209 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size());
210
211 if (idle_worker_threads_stack_.size() == worker_threads_.size())
212 idle_worker_threads_stack_cv_->Signal();
213 }
214
215 void ThreadPool::SequenceInsertedInSharedPriorityQueueCallback() {
216 if (!no_wake_up_on_sequence_insertion_.Get())
217 WakeUpOneThread();
218 }
219
220 void ThreadPool::MainEntryCallback() const {
221 DCHECK(!g_current_shared_priority_queue.Get().Get());
222 g_current_shared_priority_queue.Get().Set(&shared_priority_queue_);
223 }
224
225 scoped_refptr<Sequence> ThreadPool::GetWorkCallback(
226 SchedulerWorkerThread* worker_thread) {
227 scoped_ptr<PriorityQueue::Transaction> transaction(
228 shared_priority_queue_.BeginTransaction());
229 const PriorityQueue::SequenceAndSortKey sequence_and_sort_key(
230 transaction->Peek());
231
232 if (sequence_and_sort_key.is_null()) {
233 // |transaction| is kept alive while |worker_thread| is added to
234 // |idle_worker_threads_stack_| to avoid this scenario:
robliao 2016/03/31 22:48:55 Nit: scenario -> race
fdoray 2016/04/01 16:02:52 Done.
235 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
236 // empty and ends the Transaction.
237 // 2. Other thread creates a Transaction, inserts a Sequence into
238 // |shared_priority_queue_| and ends the Transaction. This couldn't
robliao 2016/03/31 22:48:55 Nit: couldn't -> can't
fdoray 2016/04/01 16:02:51 Done.
239 // happen if the Transaction of step 1 was still active.
240 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
241 // |idle_worker_threads_stack_| is empty.
242 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
243 // sleep. No thread runs the Sequence inserted in step 2.
244 AddToIdleSchedulerWorkerThreadsStack(worker_thread);
245 return nullptr;
246 }
247
248 transaction->Pop();
249 return sequence_and_sort_key.sequence;
250 }
251
252 } // namespace internal
253 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698