Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: base/task_scheduler/scheduler_thread_pool.cc

Issue 1895513002: TaskScheduler [12] Support SINGLE_THREADED in SchedulerThreadPool DO NOT SUBMIT (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@10_superstack
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_thread_pool.h" 5 #include "base/task_scheduler/scheduler_thread_pool.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
161 ExecutionMode execution_mode) { 161 ExecutionMode execution_mode) {
162 switch (execution_mode) { 162 switch (execution_mode) {
163 case ExecutionMode::PARALLEL: 163 case ExecutionMode::PARALLEL:
164 return make_scoped_refptr(new SchedulerParallelTaskRunner( 164 return make_scoped_refptr(new SchedulerParallelTaskRunner(
165 traits, this, task_tracker_, delayed_task_manager_)); 165 traits, this, task_tracker_, delayed_task_manager_));
166 166
167 case ExecutionMode::SEQUENCED: 167 case ExecutionMode::SEQUENCED:
168 return make_scoped_refptr(new SchedulerSequencedTaskRunner( 168 return make_scoped_refptr(new SchedulerSequencedTaskRunner(
169 traits, this, task_tracker_, delayed_task_manager_)); 169 traits, this, task_tracker_, delayed_task_manager_));
170 170
171 case ExecutionMode::SINGLE_THREADED: 171 case ExecutionMode::SINGLE_THREADED: {
172 // TODO(fdoray): Support SINGLE_THREADED TaskRunners. 172 // Acquire a lock to ensure that the number of single-thread TaskRunners
173 NOTREACHED(); 173 // for a given SchedulerWorkerThread doesn't increase between the calls to
174 return nullptr; 174 // GetNumSingleThreadTaskRunners() and CreateTaskRunnerWithTraits() below.
175 // This number can still decrease.
176 AutoSchedulerLock auto_lock(single_thread_task_runner_creation_lock_);
177
178 SchedulerWorkerThread* worker_thread = nullptr;
179 size_t min_num_single_thread_task_runners = -1;
180 for (const auto& current_worker_thread : worker_threads_) {
gab 2016/04/18 19:06:35 Add a TODO to make this work with dynamic thread c
181 const size_t current_num_single_thread_task_runners =
182 current_worker_thread->GetNumSingleThreadTaskRunners();
183 if (current_num_single_thread_task_runners <
184 min_num_single_thread_task_runners) {
185 worker_thread = current_worker_thread.get();
186 min_num_single_thread_task_runners =
187 current_num_single_thread_task_runners;
188 if (min_num_single_thread_task_runners == 0)
189 break;
gab 2016/04/18 19:06:35 If we assume we always assign in a loop then there
fdoray 2016/04/18 19:47:14 SingleThreadTaskRunners can be destroyed and thus
190 }
191 }
192
193 // |worker_thread| is the SchedulerWorkerThread with the fewer single-
194 // thread TaskRunners.
195 DCHECK(worker_thread);
196 return worker_thread->CreateTaskRunnerWithTraits(traits);
197 }
175 } 198 }
176 199
177 NOTREACHED(); 200 NOTREACHED();
178 return nullptr; 201 return nullptr;
179 } 202 }
180 203
181 void SchedulerThreadPool::EnqueueSequence( 204 void SchedulerThreadPool::EnqueueSequence(
182 scoped_refptr<Sequence> sequence, 205 scoped_refptr<Sequence> sequence,
183 const SequenceSortKey& sequence_sort_key) { 206 const SequenceSortKey& sequence_sort_key) {
184 shared_priority_queue_.BeginTransaction()->Push( 207 shared_priority_queue_.BeginTransaction()->Push(
185 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence), 208 WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
186 sequence_sort_key))); 209 sequence_sort_key)));
187 210
188 // The thread calling this method just ran a Task from |sequence| and will 211 // The thread calling this method just ran a Task from |sequence| and will
189 // soon try to get another Sequence from which to run a Task. If the thread 212 // soon try to get another Sequence from which to run a Task. If the thread
190 // belongs to this pool, it will get that Sequence from 213 // belongs to this pool, it will get that Sequence from
191 // |shared_priority_queue_|. When that's the case, there is no need to wake up 214 // |shared_priority_queue_|. When that's the case, there is no need to wake up
192 // another thread after |sequence| is inserted in |shared_priority_queue_|. If 215 // another thread after |sequence| is inserted in |shared_priority_queue_|. If
193 // we did wake up another thread, we would waste resources by having more 216 // we did wake up another thread, we would waste resources by having more
194 // threads trying to get a Sequence from |shared_priority_queue_| than the 217 // threads trying to get a Sequence from |shared_priority_queue_| than the
195 // number of Sequences in it. 218 // number of Sequences in it.
196 if (tls_current_thread_pool.Get().Get() != this) 219 if (tls_current_thread_pool.Get().Get() != this)
197 WakeUpOneThread(); 220 WakeUpOneThread();
198 } 221 }
199 222
200 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() { 223 void SchedulerThreadPool::WaitForAllWorkerThreadsIdleForTesting() {
201 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 224 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
202 while (idle_worker_threads_stack_.size() < worker_threads_.size()) 225 while (idle_worker_threads_stack_.Size() < worker_threads_.size())
203 idle_worker_threads_stack_cv_for_testing_->Wait(); 226 idle_worker_threads_stack_cv_for_testing_->Wait();
204 } 227 }
205 228
206 void SchedulerThreadPool::JoinForTesting() { 229 void SchedulerThreadPool::JoinForTesting() {
207 for (const auto& worker_thread : worker_threads_) 230 for (const auto& worker_thread : worker_threads_)
208 worker_thread->JoinForTesting(); 231 worker_thread->JoinForTesting();
209 232
210 DCHECK(!join_for_testing_returned_.IsSignaled()); 233 DCHECK(!join_for_testing_returned_.IsSignaled());
211 join_for_testing_returned_.Signal(); 234 join_for_testing_returned_.Signal();
212 } 235 }
(...skipping 25 matching lines...) Expand all
238 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() { 261 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
239 DCHECK(!tls_current_thread_pool.Get().Get()); 262 DCHECK(!tls_current_thread_pool.Get().Get());
240 tls_current_thread_pool.Get().Set(outer_); 263 tls_current_thread_pool.Get().Set(outer_);
241 } 264 }
242 265
243 scoped_refptr<Sequence> 266 scoped_refptr<Sequence>
244 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork( 267 SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::GetWork(
245 SchedulerWorkerThread* worker_thread, 268 SchedulerWorkerThread* worker_thread,
246 PriorityQueue* single_threaded_priority_queue, 269 PriorityQueue* single_threaded_priority_queue,
247 bool* is_single_threaded_sequence) { 270 bool* is_single_threaded_sequence) {
248 // TODO(fdoray): Return a Sequence from |single_threaded_priority_queue| when 271 DCHECK(worker_thread);
249 // appropriate. 272 DCHECK(single_threaded_priority_queue);
273 DCHECK(is_single_threaded_sequence);
250 274
251 std::unique_ptr<PriorityQueue::Transaction> transaction( 275 *is_single_threaded_sequence = false;
276
277 std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
252 outer_->shared_priority_queue_.BeginTransaction()); 278 outer_->shared_priority_queue_.BeginTransaction());
253 const auto sequence_and_sort_key = transaction->Peek(); 279 const auto shared_sequence_and_sort_key = shared_transaction->Peek();
254 280
255 if (sequence_and_sort_key.is_null()) { 281 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
256 // |transaction| is kept alive while |worker_thread| is added to 282 single_threaded_priority_queue->BeginTransaction());
283 const auto single_threaded_sequence_and_sort_key =
284 single_threaded_transaction->Peek();
285
286 if (shared_sequence_and_sort_key.is_null() &&
287 single_threaded_sequence_and_sort_key.is_null()) {
288 single_threaded_transaction.reset();
289
290 // |shared_transaction| is kept alive while |worker_thread| is added to
257 // |idle_worker_threads_stack_| to avoid this race: 291 // |idle_worker_threads_stack_| to avoid this race:
258 // 1. This thread creates a Transaction, finds |shared_priority_queue_| 292 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
259 // empty and ends the Transaction. 293 // empty and ends the Transaction.
260 // 2. Other thread creates a Transaction, inserts a Sequence into 294 // 2. Other thread creates a Transaction, inserts a Sequence into
261 // |shared_priority_queue_| and ends the Transaction. This can't happen 295 // |shared_priority_queue_| and ends the Transaction. This can't happen
262 // if the Transaction of step 1 is still active because because there can 296 // if the Transaction of step 1 is still active because because there can
263 // only be one active Transaction per PriorityQueue at a time. 297 // only be one active Transaction per PriorityQueue at a time.
264 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because 298 // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
265 // |idle_worker_threads_stack_| is empty. 299 // |idle_worker_threads_stack_| is empty.
266 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to 300 // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
267 // sleep. No thread runs the Sequence inserted in step 2. 301 // sleep. No thread runs the Sequence inserted in step 2.
268 outer_->AddToIdleWorkerThreadsStack(worker_thread); 302 outer_->AddToIdleWorkerThreadsStack(worker_thread);
269 return nullptr; 303 return nullptr;
270 } 304 }
271 305
272 transaction->Pop(); 306 scoped_refptr<Sequence> sequence;
273 return sequence_and_sort_key.sequence; 307
308 if (single_threaded_sequence_and_sort_key.is_null() ||
309 (!shared_sequence_and_sort_key.is_null() &&
310 single_threaded_sequence_and_sort_key.sort_key <
311 shared_sequence_and_sort_key.sort_key)) {
312 shared_transaction->Pop();
313 sequence = std::move(shared_sequence_and_sort_key.sequence);
314 } else {
315 DCHECK(!single_threaded_sequence_and_sort_key.is_null());
316 single_threaded_transaction->Pop();
317 sequence = std::move(single_threaded_sequence_and_sort_key.sequence);
318 *is_single_threaded_sequence = true;
319 }
320
321 single_threaded_transaction.reset();
322 shared_transaction.reset();
323 outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
324
325 return sequence;
274 } 326 }
275 327
276 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence( 328 void SchedulerThreadPool::SchedulerWorkerThreadDelegateImpl::EnqueueSequence(
277 scoped_refptr<Sequence> sequence) { 329 scoped_refptr<Sequence> sequence) {
278 enqueue_sequence_callback_.Run(std::move(sequence)); 330 enqueue_sequence_callback_.Run(std::move(sequence));
279 } 331 }
280 332
281 SchedulerThreadPool::SchedulerThreadPool( 333 SchedulerThreadPool::SchedulerThreadPool(
282 const EnqueueSequenceCallback& enqueue_sequence_callback, 334 const EnqueueSequenceCallback& enqueue_sequence_callback,
283 TaskTracker* task_tracker, 335 TaskTracker* task_tracker,
(...skipping 17 matching lines...) Expand all
301 353
302 DCHECK(worker_threads_.empty()); 354 DCHECK(worker_threads_.empty());
303 355
304 for (size_t i = 0; i < max_threads; ++i) { 356 for (size_t i = 0; i < max_threads; ++i) {
305 std::unique_ptr<SchedulerWorkerThread> worker_thread = 357 std::unique_ptr<SchedulerWorkerThread> worker_thread =
306 SchedulerWorkerThread::CreateSchedulerWorkerThread( 358 SchedulerWorkerThread::CreateSchedulerWorkerThread(
307 thread_priority, worker_thread_delegate_.get(), task_tracker_, 359 thread_priority, worker_thread_delegate_.get(), task_tracker_,
308 delayed_task_manager_, &shared_priority_queue_); 360 delayed_task_manager_, &shared_priority_queue_);
309 if (!worker_thread) 361 if (!worker_thread)
310 break; 362 break;
311 idle_worker_threads_stack_.push(worker_thread.get()); 363 idle_worker_threads_stack_.Push(worker_thread.get());
312 worker_threads_.push_back(std::move(worker_thread)); 364 worker_threads_.push_back(std::move(worker_thread));
313 } 365 }
314 366
315 return !worker_threads_.empty(); 367 return !worker_threads_.empty();
316 } 368 }
317 369
318 void SchedulerThreadPool::WakeUpOneThread() { 370 void SchedulerThreadPool::WakeUpOneThread() {
319 SchedulerWorkerThread* worker_thread = PopOneIdleWorkerThread(); 371 SchedulerWorkerThread* worker_thread = nullptr;
372 {
373 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
374 if (!idle_worker_threads_stack_.Empty())
375 worker_thread = idle_worker_threads_stack_.Pop();
376 }
377
320 if (worker_thread) 378 if (worker_thread)
321 worker_thread->WakeUp(); 379 worker_thread->WakeUp();
322 } 380 }
323 381
324 void SchedulerThreadPool::AddToIdleWorkerThreadsStack( 382 void SchedulerThreadPool::AddToIdleWorkerThreadsStack(
325 SchedulerWorkerThread* worker_thread) { 383 SchedulerWorkerThread* worker_thread) {
326 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 384 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
327 idle_worker_threads_stack_.push(worker_thread); 385 idle_worker_threads_stack_.Push(worker_thread);
328 DCHECK_LE(idle_worker_threads_stack_.size(), worker_threads_.size()); 386 DCHECK_LE(idle_worker_threads_stack_.Size(), worker_threads_.size());
329 387
330 if (idle_worker_threads_stack_.size() == worker_threads_.size()) 388 if (idle_worker_threads_stack_.Size() == worker_threads_.size())
331 idle_worker_threads_stack_cv_for_testing_->Broadcast(); 389 idle_worker_threads_stack_cv_for_testing_->Broadcast();
332 } 390 }
333 391
334 SchedulerWorkerThread* SchedulerThreadPool::PopOneIdleWorkerThread() { 392 void SchedulerThreadPool::RemoveFromIdleWorkerThreadsStack(
393 SchedulerWorkerThread* worker_thread) {
335 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_); 394 AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
336 395 idle_worker_threads_stack_.Remove(worker_thread);
337 if (idle_worker_threads_stack_.empty())
338 return nullptr;
339
340 auto worker_thread = idle_worker_threads_stack_.top();
341 idle_worker_threads_stack_.pop();
342 return worker_thread;
343 } 396 }
344 397
345 } // namespace internal 398 } // namespace internal
346 } // namespace base 399 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool.h ('k') | base/task_scheduler/scheduler_thread_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698