Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1708773002: TaskScheduler [7] SchedulerThreadPool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_5_worker_thread
Patch Set: Use std::unique_ptr, rename ThreadPool -> SchedulerThreadPool. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_thread_pool.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/task_scheduler/task_tracker.h"
15
16 namespace base {
17 namespace internal {
18
19 namespace {
20
21 // Shared PriorityQueue of a thread's SchedulerThreadPool. Not set for threads
22 // that don't belong to a SchedulerThreadPool.
23 LazyInstance<ThreadLocalPointer<const PriorityQueue>>::Leaky
24 g_current_shared_priority_queue = LAZY_INSTANCE_INITIALIZER;
25
26 // A task runner that runs tasks with the PARALLEL ExecutionMode.
27 class SchedulerParallelTaskRunner : public TaskRunner {
28 public:
29 SchedulerParallelTaskRunner(const TaskTraits& traits,
30 PriorityQueue* priority_queue,
31 TaskTracker* task_tracker)
32 : traits_(traits),
33 priority_queue_(priority_queue),
34 task_tracker_(task_tracker) {}
35
36 // TaskRunner:
37 bool PostDelayedTask(const tracked_objects::Location& from_here,
38 const Closure& closure,
39 TimeDelta delay) override {
40 // TODO(fdoray): Support delayed tasks.
41 DCHECK(delay.is_zero());
42 PostTaskHelper(WrapUnique(new Task(from_here, closure, traits_)),
43 make_scoped_refptr(new Sequence), priority_queue_,
44 task_tracker_);
45 return true;
46 }
47
48 bool RunsTasksOnCurrentThread() const override {
49 return g_current_shared_priority_queue.Get().Get() == priority_queue_;
50 }
51
52 private:
53 ~SchedulerParallelTaskRunner() override = default;
54
55 const TaskTraits traits_;
56 PriorityQueue* const priority_queue_;
57 TaskTracker* const task_tracker_;
58
59 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
60 };
61
62 void PostTaskCallback(scoped_refptr<Sequence> sequence,
63 PriorityQueue* priority_queue,
64 std::unique_ptr<Task> task) {
65 DCHECK(sequence);
66 DCHECK(priority_queue);
67 DCHECK(task);
68
69 if (sequence->PushTask(std::move(task))) {
70 // |sequence| must be inserted into |priority_queue| because it was empty
71 // before |task| was inserted into it.
72 const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
73 priority_queue->BeginTransaction()->Push(
74 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
75 sequence_sort_key)));
76 }
77 }
78
79 } // namespace
80
81 SchedulerThreadPool::~SchedulerThreadPool() {
82 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_);
83 DCHECK(join_for_testing_returned_);
84 }
85
86 std::unique_ptr<SchedulerThreadPool> SchedulerThreadPool::CreateThreadPool(
87 ThreadPriority thread_priority,
88 size_t max_threads,
89 const SchedulerWorkerThread::RanTaskFromSequenceCallback&
90 ran_task_from_sequence_callback,
91 TaskTracker* task_tracker) {
92 std::unique_ptr<SchedulerThreadPool> thread_pool(
93 new SchedulerThreadPool(thread_priority, max_threads,
94 ran_task_from_sequence_callback, task_tracker));
95
96 if (thread_pool->worker_threads_.empty())
97 return nullptr;
98 return thread_pool;
99 }
100
101 scoped_refptr<TaskRunner> SchedulerThreadPool::CreateTaskRunnerWithTraits(
102 const TaskTraits& traits,
103 ExecutionMode execution_mode) {
104 switch (execution_mode) {
105 case ExecutionMode::PARALLEL:
106 return make_scoped_refptr(new SchedulerParallelTaskRunner(
107 traits, &shared_priority_queue_, task_tracker_));
108
109 case ExecutionMode::SEQUENCED:
110 case ExecutionMode::SINGLE_THREADED:
111 NOTIMPLEMENTED();
112 return nullptr;
113 }
114
115 NOTREACHED();
116 return nullptr;
117 }
118
119 void SchedulerThreadPool::ReinsertSequence(
120 scoped_refptr<Sequence> sequence,
121 const SequenceSortKey& sequence_sort_key) {
122 DCHECK(!g_current_shared_priority_queue.Get().Get());
123
124 // If |worker_thread| belongs to this thread pool, set a flag to avoid waking
robliao 2016/04/01 21:05:51 Update this comment.
fdoray 2016/04/01 21:45:30 Done.
125 // up a SchedulerWorkerThread when |sequence| is reinserted in
126 // |shared_priority_queue_|. In such cases, |worker_thread| will soon get a
127 // Sequence from |shared_priority_queue_| via GetWorkCallback() and there is
128 // no need to wake up another SchedulerWorkerThread to do so.
129 if (g_current_shared_priority_queue.Get().Get() == &shared_priority_queue_)
130 no_wake_up_on_sequence_insertion_.Set(true);
131
132 shared_priority_queue_.BeginTransaction()->Push(
133 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
134 sequence_sort_key)));
135 no_wake_up_on_sequence_insertion_.Set(false);
136 }
137
138 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
139 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
140 while (idle_worker_threads_stack_.size() < worker_threads_.size())
141 idle_worker_threads_stack_cv_->Wait();
142 }
143
144 void SchedulerThreadPool::JoinForTesting() {
145 for (const auto& worker_thread : worker_threads_)
146 worker_thread->JoinForTesting();
147
148 AutoSchedulerLock auto_lock(join_for_testing_returned_lock_);
149 DCHECK(!join_for_testing_returned_);
150 join_for_testing_returned_ = true;
151 }
152
153 SchedulerThreadPool::SchedulerThreadPool(
154 ThreadPriority thread_priority,
155 size_t max_threads,
156 const SchedulerWorkerThread::RanTaskFromSequenceCallback&
157 ran_task_from_sequence_callback,
158 TaskTracker* task_tracker)
159 : shared_priority_queue_(Bind(
160 &SchedulerThreadPool::SequenceInsertedInSharedPriorityQueueCallback,
161 Unretained(this))),
162 idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
163 idle_worker_threads_stack_cv_(
164 idle_worker_threads_stack_lock_.CreateConditionVariable()),
165 task_tracker_(task_tracker) {
166 DCHECK_GT(max_threads, 0U);
167 DCHECK(!ran_task_from_sequence_callback.is_null());
168 DCHECK(task_tracker_);
169
170 // |this| always outlives the worker threads to which these callbacks are
171 // passed.
172 const Closure main_entry_callback(
173 Bind(&SchedulerThreadPool::MainEntryCallback, Unretained(this)));
174 const SchedulerWorkerThread::GetWorkCallback get_work_callback(
175 Bind(&SchedulerThreadPool::GetWorkCallback, Unretained(this)));
176
177 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
178
179 for (size_t i = 0; i < max_threads; ++i) {
180 std::unique_ptr<SchedulerWorkerThread> worker_thread =
181 SchedulerWorkerThread::CreateSchedulerWorkerThread(
182 thread_priority, main_entry_callback, get_work_callback,
183 ran_task_from_sequence_callback, task_tracker);
184 if (!worker_thread)
185 break;
186 idle_worker_threads_stack_.push(worker_thread.get());
187 worker_threads_.push_back(std::move(worker_thread));
188 }
189 }
190
191 void SchedulerThreadPool::WakeUpOneThread() {
192 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
193
194 if (idle_worker_threads_stack_.empty())
195 return;
196
197 SchedulerWorkerThread* worker_thread = idle_worker_threads_stack_.top();
198 idle_worker_threads_stack_.pop();
199 worker_thread->WakeUp();
200 }
201
202 void SchedulerThreadPool::AddToIdleSchedulerWorkerThreadsStack(
203 SchedulerWorkerThread* worker_thread) {
204 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
205 idle_worker_threads_stack_.push(worker_thread);
206 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size());
207
208 if (idle_worker_threads_stack_.size() == worker_threads_.size())
209 idle_worker_threads_stack_cv_->Signal();
210 }
211
212 void SchedulerThreadPool::SequenceInsertedInSharedPriorityQueueCallback() {
213 if (!no_wake_up_on_sequence_insertion_.Get())
214 WakeUpOneThread();
215 }
216
217 void SchedulerThreadPool::MainEntryCallback() const {
218 DCHECK(!g_current_shared_priority_queue.Get().Get());
219 g_current_shared_priority_queue.Get().Set(&shared_priority_queue_);
220 }
221
222 scoped_refptr<Sequence> SchedulerThreadPool::GetWorkCallback(
223 SchedulerWorkerThread* worker_thread) {
224 std::unique_ptr<PriorityQueue::Transaction> transaction(
225 shared_priority_queue_.BeginTransaction());
226 const PriorityQueue::SequenceAndSortKey sequence_and_sort_key(
227 transaction->Peek());
228
229 if (sequence_and_sort_key.is_null()) {
230 // |transaction| is kept alive while |worker_thread| is added to
231 // |idle_worker_threads_stack_| to avoid this race:
232 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
233 // empty and ends the Transaction.
234 // 2. Other thread creates a Transaction, inserts a Sequence into
235 // |shared_priority_queue_| and ends the Transaction. This can't happen
236 // if the Transaction of step 1 is still active.
237 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
238 // |idle_worker_threads_stack_| is empty.
239 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
240 // sleep. No thread runs the Sequence inserted in step 2.
241 AddToIdleSchedulerWorkerThreadsStack(worker_thread);
242 return nullptr;
243 }
244
245 transaction->Pop();
246 return sequence_and_sort_key.sequence;
247 }
248
249 void PostTaskHelper(std::unique_ptr<Task> task,
250 scoped_refptr<Sequence> sequence,
251 PriorityQueue* priority_queue,
252 TaskTracker* task_tracker) {
253 DCHECK(task);
254 DCHECK(sequence);
255 DCHECK(priority_queue);
256 DCHECK(task_tracker);
257
258 task_tracker->PostTask(
259 Bind(&PostTaskCallback, std::move(sequence), priority_queue),
260 std::move(task));
261 }
262
263 } // namespace internal
264 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698