Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(587)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2405243003: TaskScheduler: Replace the SchedulerServiceThread with a base::Thread. (Closed)
Patch Set: self-review Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/atomicops.h" 12 #include "base/atomicops.h"
13 #include "base/bind.h" 13 #include "base/bind.h"
14 #include "base/bind_helpers.h" 14 #include "base/bind_helpers.h"
15 #include "base/lazy_instance.h" 15 #include "base/lazy_instance.h"
16 #include "base/memory/ptr_util.h" 16 #include "base/memory/ptr_util.h"
17 #include "base/metrics/histogram.h" 17 #include "base/metrics/histogram.h"
18 #include "base/sequence_token.h" 18 #include "base/sequence_token.h"
19 #include "base/sequenced_task_runner.h" 19 #include "base/sequenced_task_runner.h"
20 #include "base/single_thread_task_runner.h" 20 #include "base/single_thread_task_runner.h"
21 #include "base/strings/stringprintf.h" 21 #include "base/strings/stringprintf.h"
22 #include "base/task_scheduler/delayed_task_manager.h"
23 #include "base/task_scheduler/task_tracker.h" 22 #include "base/task_scheduler/task_tracker.h"
24 #include "base/threading/platform_thread.h" 23 #include "base/threading/platform_thread.h"
25 #include "base/threading/thread_local.h" 24 #include "base/threading/thread_local.h"
26 #include "base/threading/thread_restrictions.h" 25 #include "base/threading/thread_restrictions.h"
27 #include "base/time/time.h" 26 #include "base/time/time.h"
28 27
29 namespace base { 28 namespace base {
30 namespace internal { 29 namespace internal {
31 30
32 namespace { 31 namespace {
(...skipping 25 matching lines...) Expand all
58 DCHECK(worker_pool_); 57 DCHECK(worker_pool_);
59 } 58 }
60 59
61 // TaskRunner: 60 // TaskRunner:
62 bool PostDelayedTask(const tracked_objects::Location& from_here, 61 bool PostDelayedTask(const tracked_objects::Location& from_here,
63 const Closure& closure, 62 const Closure& closure,
64 TimeDelta delay) override { 63 TimeDelta delay) override {
65 // Post the task as part of a one-off single-task Sequence. 64 // Post the task as part of a one-off single-task Sequence.
66 return worker_pool_->PostTaskWithSequence( 65 return worker_pool_->PostTaskWithSequence(
67 MakeUnique<Task>(from_here, closure, traits_, delay), 66 MakeUnique<Task>(from_here, closure, traits_, delay),
68 make_scoped_refptr(new Sequence), nullptr); 67 make_scoped_refptr(new Sequence), nullptr, delay);
69 } 68 }
70 69
71 bool RunsTasksOnCurrentThread() const override { 70 bool RunsTasksOnCurrentThread() const override {
72 return tls_current_worker_pool.Get().Get() == worker_pool_; 71 return tls_current_worker_pool.Get().Get() == worker_pool_;
73 } 72 }
74 73
75 private: 74 private:
76 ~SchedulerParallelTaskRunner() override = default; 75 ~SchedulerParallelTaskRunner() override = default;
77 76
78 const TaskTraits traits_; 77 const TaskTraits traits_;
(...skipping 16 matching lines...) Expand all
95 94
96 // SequencedTaskRunner: 95 // SequencedTaskRunner:
97 bool PostDelayedTask(const tracked_objects::Location& from_here, 96 bool PostDelayedTask(const tracked_objects::Location& from_here,
98 const Closure& closure, 97 const Closure& closure,
99 TimeDelta delay) override { 98 TimeDelta delay) override {
100 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 99 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
101 task->sequenced_task_runner_ref = this; 100 task->sequenced_task_runner_ref = this;
102 101
103 // Post the task as part of |sequence_|. 102 // Post the task as part of |sequence_|.
104 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 103 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
105 nullptr); 104 nullptr, delay);
106 } 105 }
107 106
108 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 107 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
109 const Closure& closure, 108 const Closure& closure,
110 base::TimeDelta delay) override { 109 base::TimeDelta delay) override {
111 // Tasks are never nested within the task scheduler. 110 // Tasks are never nested within the task scheduler.
112 return PostDelayedTask(from_here, closure, delay); 111 return PostDelayedTask(from_here, closure, delay);
113 } 112 }
114 113
115 bool RunsTasksOnCurrentThread() const override { 114 bool RunsTasksOnCurrentThread() const override {
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
180 179
181 // SingleThreadTaskRunner: 180 // SingleThreadTaskRunner:
182 bool PostDelayedTask(const tracked_objects::Location& from_here, 181 bool PostDelayedTask(const tracked_objects::Location& from_here,
183 const Closure& closure, 182 const Closure& closure,
184 TimeDelta delay) override { 183 TimeDelta delay) override {
185 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 184 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
186 task->single_thread_task_runner_ref = this; 185 task->single_thread_task_runner_ref = this;
187 186
188 // Post the task to be executed by |worker_| as part of |sequence_|. 187 // Post the task to be executed by |worker_| as part of |sequence_|.
189 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 188 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
190 worker_); 189 worker_, delay);
191 } 190 }
192 191
193 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 192 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
194 const Closure& closure, 193 const Closure& closure,
195 base::TimeDelta delay) override { 194 base::TimeDelta delay) override {
196 // Tasks are never nested within the task scheduler. 195 // Tasks are never nested within the task scheduler.
197 return PostDelayedTask(from_here, closure, delay); 196 return PostDelayedTask(from_here, closure, delay);
198 } 197 }
199 198
200 bool RunsTasksOnCurrentThread() const override { 199 bool RunsTasksOnCurrentThread() const override {
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
291 // SchedulerWorkerPool should never be deleted in production unless its 290 // SchedulerWorkerPool should never be deleted in production unless its
292 // initialization failed. 291 // initialization failed.
293 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); 292 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
294 } 293 }
295 294
296 // static 295 // static
297 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( 296 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
298 const SchedulerWorkerPoolParams& params, 297 const SchedulerWorkerPoolParams& params,
299 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 298 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
300 TaskTracker* task_tracker, 299 TaskTracker* task_tracker,
301 DelayedTaskManager* delayed_task_manager) { 300 scoped_refptr<TaskRunner> service_thread_task_runner) {
302 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( 301 std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
303 new SchedulerWorkerPoolImpl(params.name(), 302 new SchedulerWorkerPoolImpl(params.name(), params.io_restriction(),
304 params.io_restriction(), 303 params.suggested_reclaim_time(), task_tracker,
305 params.suggested_reclaim_time(), 304 std::move(service_thread_task_runner)));
306 task_tracker, delayed_task_manager));
307 if (worker_pool->Initialize(params.priority_hint(), params.max_threads(), 305 if (worker_pool->Initialize(params.priority_hint(), params.max_threads(),
308 re_enqueue_sequence_callback)) { 306 re_enqueue_sequence_callback)) {
309 return worker_pool; 307 return worker_pool;
310 } 308 }
311 return nullptr; 309 return nullptr;
312 } 310 }
313 311
314 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { 312 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
315 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 313 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
316 while (idle_workers_stack_.Size() < workers_.size()) 314 while (idle_workers_stack_.Size() < workers_.size())
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 // we did wake up another worker, we would waste resources by having more 373 // we did wake up another worker, we would waste resources by having more
376 // workers trying to get a Sequence from |shared_priority_queue_| than the 374 // workers trying to get a Sequence from |shared_priority_queue_| than the
377 // number of Sequences in it. 375 // number of Sequences in it.
378 if (tls_current_worker_pool.Get().Get() != this) 376 if (tls_current_worker_pool.Get().Get() != this)
379 WakeUpOneWorker(); 377 WakeUpOneWorker();
380 } 378 }
381 379
382 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( 380 bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
383 std::unique_ptr<Task> task, 381 std::unique_ptr<Task> task,
384 scoped_refptr<Sequence> sequence, 382 scoped_refptr<Sequence> sequence,
385 SchedulerWorker* worker) { 383 SchedulerWorker* worker,
384 TimeDelta delay) {
robliao 2016/10/12 20:55:45 I would be okay with storing the delay on the task
fdoray 2016/10/13 13:51:29 Done. Stored the delay with Task and added TODO.
386 DCHECK(task); 385 DCHECK(task);
387 DCHECK(sequence); 386 DCHECK(sequence);
388 DCHECK(!worker || ContainsWorker(workers_, worker)); 387 DCHECK(!worker || ContainsWorker(workers_, worker));
388 DCHECK_EQ(delay.is_zero(), task->delayed_run_time.is_null());
389 389
390 if (!task_tracker_->WillPostTask(task.get())) 390 if (!task_tracker_->WillPostTask(task.get()))
391 return false; 391 return false;
392 392
393 if (task->delayed_run_time.is_null()) { 393 if (delay.is_zero()) {
394 PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); 394 PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker);
395 } else { 395 } else {
396 delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), 396 service_thread_task_runner_->PostDelayedTask(
397 worker, this); 397 FROM_HERE, Bind(&SchedulerWorkerPoolImpl::PostTaskWithSequenceNow,
398 Unretained(this), Passed(std::move(task)),
399 std::move(sequence), worker),
400 delay);
398 } 401 }
399 402
400 return true; 403 return true;
401 } 404 }
402 405
403 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( 406 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
404 std::unique_ptr<Task> task, 407 std::unique_ptr<Task> task,
405 scoped_refptr<Sequence> sequence, 408 scoped_refptr<Sequence> sequence,
406 SchedulerWorker* worker) { 409 SchedulerWorker* worker) {
407 DCHECK(task); 410 DCHECK(task);
408 DCHECK(sequence); 411 DCHECK(sequence);
409 DCHECK(!worker || ContainsWorker(workers_, worker)); 412 DCHECK(!worker || ContainsWorker(workers_, worker));
410 413
411 // Confirm that |task| is ready to run (its delayed run time is either null or 414 // Confirm that |task| is ready to run (its delayed run time is either null or
412 // in the past). 415 // in the past).
413 DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); 416 DCHECK_LE(task->delayed_run_time, TimeTicks::Now());
414 417
415 // Because |worker| belongs to this worker pool, we know that the type 418 // Because |worker| belongs to this worker pool, we know that the type
416 // of its delegate is SchedulerWorkerDelegateImpl. 419 // of its delegate is SchedulerWorkerDelegateImpl.
417 PriorityQueue* const priority_queue = 420 PriorityQueue* const priority_queue =
418 worker 421 worker
419 ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) 422 ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
420 ->single_threaded_priority_queue() 423 ->single_threaded_priority_queue()
421 : &shared_priority_queue_; 424 : &shared_priority_queue_;
422 DCHECK(priority_queue); 425 DCHECK(priority_queue);
423 426
(...skipping 222 matching lines...) Expand 10 before | Expand all | Expand 10 after
646 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && 649 !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
647 outer_->CanWorkerDetachForTesting(); 650 outer_->CanWorkerDetachForTesting();
648 return can_detach; 651 return can_detach;
649 } 652 }
650 653
651 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( 654 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
652 StringPiece name, 655 StringPiece name,
653 SchedulerWorkerPoolParams::IORestriction io_restriction, 656 SchedulerWorkerPoolParams::IORestriction io_restriction,
654 const TimeDelta& suggested_reclaim_time, 657 const TimeDelta& suggested_reclaim_time,
655 TaskTracker* task_tracker, 658 TaskTracker* task_tracker,
656 DelayedTaskManager* delayed_task_manager) 659 scoped_refptr<TaskRunner> service_thread_task_runner)
657 : name_(name.as_string()), 660 : name_(name.as_string()),
658 io_restriction_(io_restriction), 661 io_restriction_(io_restriction),
659 suggested_reclaim_time_(suggested_reclaim_time), 662 suggested_reclaim_time_(suggested_reclaim_time),
660 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), 663 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
661 idle_workers_stack_cv_for_testing_( 664 idle_workers_stack_cv_for_testing_(
662 idle_workers_stack_lock_.CreateConditionVariable()), 665 idle_workers_stack_lock_.CreateConditionVariable()),
663 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 666 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
664 WaitableEvent::InitialState::NOT_SIGNALED), 667 WaitableEvent::InitialState::NOT_SIGNALED),
665 #if DCHECK_IS_ON() 668 #if DCHECK_IS_ON()
666 workers_created_(WaitableEvent::ResetPolicy::MANUAL, 669 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
(...skipping 10 matching lines...) Expand all
677 // expected to run between zero and a few tens of tasks between waits. 680 // expected to run between zero and a few tens of tasks between waits.
678 // When it runs more than 100 tasks, there is no need to know the exact 681 // When it runs more than 100 tasks, there is no need to know the exact
679 // number of tasks that ran. 682 // number of tasks that ran.
680 num_tasks_between_waits_histogram_(Histogram::FactoryGet( 683 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
681 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, 684 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix,
682 1, 685 1,
683 100, 686 100,
684 50, 687 50,
685 HistogramBase::kUmaTargetedHistogramFlag)), 688 HistogramBase::kUmaTargetedHistogramFlag)),
686 task_tracker_(task_tracker), 689 task_tracker_(task_tracker),
687 delayed_task_manager_(delayed_task_manager) { 690 service_thread_task_runner_(std::move(service_thread_task_runner)) {
688 DCHECK(task_tracker_); 691 DCHECK(task_tracker_);
689 DCHECK(delayed_task_manager_); 692 DCHECK(service_thread_task_runner_);
690 } 693 }
691 694
692 bool SchedulerWorkerPoolImpl::Initialize( 695 bool SchedulerWorkerPoolImpl::Initialize(
693 ThreadPriority priority_hint, 696 ThreadPriority priority_hint,
694 size_t max_threads, 697 size_t max_threads,
695 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 698 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
696 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 699 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
697 700
698 DCHECK(workers_.empty()); 701 DCHECK(workers_.empty());
699 702
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
753 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 756 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
754 idle_workers_stack_.Remove(worker); 757 idle_workers_stack_.Remove(worker);
755 } 758 }
756 759
757 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 760 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
758 return !worker_detachment_disallowed_.IsSet(); 761 return !worker_detachment_disallowed_.IsSet();
759 } 762 }
760 763
761 } // namespace internal 764 } // namespace internal
762 } // namespace base 765 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698