Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: base/task_scheduler/task_scheduler_impl.cc

Issue 2405243003: TaskScheduler: Replace the SchedulerServiceThread with a base::Thread. (Closed)
Patch Set: CR robliao #9 Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/task_scheduler_impl.h" 5 #include "base/task_scheduler/task_scheduler_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/bind_helpers.h" 10 #include "base/bind_helpers.h"
11 #include "base/memory/ptr_util.h" 11 #include "base/memory/ptr_util.h"
12 #include "base/task_scheduler/scheduler_service_thread.h"
13 #include "base/task_scheduler/scheduler_worker_pool_params.h" 12 #include "base/task_scheduler/scheduler_worker_pool_params.h"
14 #include "base/task_scheduler/sequence_sort_key.h" 13 #include "base/task_scheduler/sequence_sort_key.h"
15 #include "base/task_scheduler/task.h" 14 #include "base/task_scheduler/task.h"
16 #include "base/time/time.h" 15 #include "base/time/time.h"
17 16
18 namespace base { 17 namespace base {
19 namespace internal { 18 namespace internal {
20 19
21 // static 20 // static
22 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create( 21 std::unique_ptr<TaskSchedulerImpl> TaskSchedulerImpl::Create(
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 void TaskSchedulerImpl::FlushForTesting() { 59 void TaskSchedulerImpl::FlushForTesting() {
61 task_tracker_.Flush(); 60 task_tracker_.Flush();
62 } 61 }
63 62
64 void TaskSchedulerImpl::JoinForTesting() { 63 void TaskSchedulerImpl::JoinForTesting() {
65 #if DCHECK_IS_ON() 64 #if DCHECK_IS_ON()
66 DCHECK(!join_for_testing_returned_.IsSet()); 65 DCHECK(!join_for_testing_returned_.IsSet());
67 #endif 66 #endif
68 for (const auto& worker_pool : worker_pools_) 67 for (const auto& worker_pool : worker_pools_)
69 worker_pool->JoinForTesting(); 68 worker_pool->JoinForTesting();
70 service_thread_->JoinForTesting(); 69 service_thread_.Stop();
71 #if DCHECK_IS_ON() 70 #if DCHECK_IS_ON()
72 join_for_testing_returned_.Set(); 71 join_for_testing_returned_.Set();
73 #endif 72 #endif
74 } 73 }
75 74
76 TaskSchedulerImpl::TaskSchedulerImpl(const WorkerPoolIndexForTraitsCallback& 75 TaskSchedulerImpl::TaskSchedulerImpl(const WorkerPoolIndexForTraitsCallback&
77 worker_pool_index_for_traits_callback) 76 worker_pool_index_for_traits_callback)
78 : delayed_task_manager_( 77 : service_thread_("TaskSchedulerServiceThread"),
79 Bind(&TaskSchedulerImpl::OnDelayedRunTimeUpdated, Unretained(this))),
80 worker_pool_index_for_traits_callback_( 78 worker_pool_index_for_traits_callback_(
81 worker_pool_index_for_traits_callback) 79 worker_pool_index_for_traits_callback) {
82 {
83 DCHECK(!worker_pool_index_for_traits_callback_.is_null()); 80 DCHECK(!worker_pool_index_for_traits_callback_.is_null());
84 } 81 }
85 82
86 void TaskSchedulerImpl::Initialize( 83 void TaskSchedulerImpl::Initialize(
87 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector) { 84 const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector) {
88 DCHECK(!worker_pool_params_vector.empty()); 85 DCHECK(!worker_pool_params_vector.empty());
89 86
87 // Start the service thread.
88 constexpr MessageLoop::Type kServiceThreadMessageLoopType =
89 #if defined(OS_POSIX)
90 MessageLoop::TYPE_IO;
gab 2016/10/17 19:09:19 Document why this is required
fdoray 2016/10/18 20:10:49 Done.
91 #else
92 MessageLoop::TYPE_DEFAULT;
93 #endif
94 constexpr size_t kDefaultStackSize = 0;
95 CHECK(service_thread_.StartWithOptions(
96 Thread::Options(kServiceThreadMessageLoopType, kDefaultStackSize)));
97
90 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback 98 const SchedulerWorkerPoolImpl::ReEnqueueSequenceCallback
gab 2016/10/17 19:09:19 Move this below |delayed_task_manager_| instantiat
fdoray 2016/10/18 20:10:49 Done.
91 re_enqueue_sequence_callback = 99 re_enqueue_sequence_callback =
92 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this)); 100 Bind(&TaskSchedulerImpl::ReEnqueueSequenceCallback, Unretained(this));
93 101
102 // Instantiate the DelayedTaskManager. The service thread must be started
103 // before its TaskRunner is available.
104 delayed_task_manager_ =
105 base::MakeUnique<DelayedTaskManager>(service_thread_.task_runner());
106
107 // Start worker pools.
94 for (const auto& worker_pool_params : worker_pool_params_vector) { 108 for (const auto& worker_pool_params : worker_pool_params_vector) {
95 // Passing pointers to objects owned by |this| to 109 // Passing pointers to objects owned by |this| to
96 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl 110 // SchedulerWorkerPoolImpl::Create() is safe because a TaskSchedulerImpl
97 // can't be deleted before all its worker pools have been joined. 111 // can't be deleted before all its worker pools have been joined.
98 worker_pools_.push_back(SchedulerWorkerPoolImpl::Create( 112 worker_pools_.push_back(SchedulerWorkerPoolImpl::Create(
99 worker_pool_params, re_enqueue_sequence_callback, &task_tracker_, 113 worker_pool_params, re_enqueue_sequence_callback, &task_tracker_,
100 &delayed_task_manager_)); 114 delayed_task_manager_.get()));
101 CHECK(worker_pools_.back()); 115 CHECK(worker_pools_.back());
102 } 116 }
103
104 service_thread_ = SchedulerServiceThread::Create(&task_tracker_,
105 &delayed_task_manager_);
106 CHECK(service_thread_);
107 } 117 }
108 118
109 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits( 119 SchedulerWorkerPool* TaskSchedulerImpl::GetWorkerPoolForTraits(
110 const TaskTraits& traits) { 120 const TaskTraits& traits) {
111 const size_t index = worker_pool_index_for_traits_callback_.Run(traits); 121 const size_t index = worker_pool_index_for_traits_callback_.Run(traits);
112 DCHECK_LT(index, worker_pools_.size()); 122 DCHECK_LT(index, worker_pools_.size());
113 return worker_pools_[index].get(); 123 return worker_pools_[index].get();
114 } 124 }
115 125
116 void TaskSchedulerImpl::ReEnqueueSequenceCallback( 126 void TaskSchedulerImpl::ReEnqueueSequenceCallback(
117 scoped_refptr<Sequence> sequence) { 127 scoped_refptr<Sequence> sequence) {
118 DCHECK(sequence); 128 DCHECK(sequence);
119 129
120 const SequenceSortKey sort_key = sequence->GetSortKey(); 130 const SequenceSortKey sort_key = sequence->GetSortKey();
121 131
122 // The next task in |sequence| should run in a worker pool suited for its 132 // The next task in |sequence| should run in a worker pool suited for its
123 // traits, except for the priority which is adjusted to the highest priority 133 // traits, except for the priority which is adjusted to the highest priority
124 // in |sequence|. 134 // in |sequence|.
125 const TaskTraits traits = 135 const TaskTraits traits =
126 sequence->PeekTaskTraits().WithPriority(sort_key.priority()); 136 sequence->PeekTaskTraits().WithPriority(sort_key.priority());
127 137
128 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence), 138 GetWorkerPoolForTraits(traits)->ReEnqueueSequence(std::move(sequence),
129 sort_key); 139 sort_key);
130 } 140 }
131 141
132 void TaskSchedulerImpl::OnDelayedRunTimeUpdated() {
133 service_thread_->WakeUp();
134 }
135
136 } // namespace internal 142 } // namespace internal
137 } // namespace base 143 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698