OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/atomicops.h" | 12 #include "base/atomicops.h" |
13 #include "base/bind.h" | 13 #include "base/bind.h" |
14 #include "base/bind_helpers.h" | 14 #include "base/bind_helpers.h" |
15 #include "base/lazy_instance.h" | 15 #include "base/lazy_instance.h" |
16 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
17 #include "base/metrics/histogram.h" | 17 #include "base/metrics/histogram.h" |
18 #include "base/sequence_token.h" | 18 #include "base/sequence_token.h" |
19 #include "base/sequenced_task_runner.h" | 19 #include "base/sequenced_task_runner.h" |
20 #include "base/single_thread_task_runner.h" | |
21 #include "base/strings/stringprintf.h" | 20 #include "base/strings/stringprintf.h" |
22 #include "base/task_runner.h" | 21 #include "base/task_runner.h" |
23 #include "base/task_scheduler/delayed_task_manager.h" | 22 #include "base/task_scheduler/delayed_task_manager.h" |
24 #include "base/task_scheduler/scheduler_worker_pool_params.h" | 23 #include "base/task_scheduler/scheduler_worker_pool_params.h" |
25 #include "base/task_scheduler/task_tracker.h" | 24 #include "base/task_scheduler/task_tracker.h" |
26 #include "base/task_scheduler/task_traits.h" | 25 #include "base/task_scheduler/task_traits.h" |
27 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
28 #include "base/threading/thread_local.h" | 27 #include "base/threading/thread_local.h" |
29 #include "base/threading/thread_restrictions.h" | 28 #include "base/threading/thread_restrictions.h" |
30 | 29 |
(...skipping 26 matching lines...) Expand all Loading... | |
57 DCHECK(worker_pool_); | 56 DCHECK(worker_pool_); |
58 } | 57 } |
59 | 58 |
60 // TaskRunner: | 59 // TaskRunner: |
61 bool PostDelayedTask(const tracked_objects::Location& from_here, | 60 bool PostDelayedTask(const tracked_objects::Location& from_here, |
62 const Closure& closure, | 61 const Closure& closure, |
63 TimeDelta delay) override { | 62 TimeDelta delay) override { |
64 // Post the task as part of a one-off single-task Sequence. | 63 // Post the task as part of a one-off single-task Sequence. |
65 return worker_pool_->PostTaskWithSequence( | 64 return worker_pool_->PostTaskWithSequence( |
66 MakeUnique<Task>(from_here, closure, traits_, delay), | 65 MakeUnique<Task>(from_here, closure, traits_, delay), |
67 make_scoped_refptr(new Sequence), nullptr); | 66 make_scoped_refptr(new Sequence)); |
68 } | 67 } |
69 | 68 |
70 bool RunsTasksOnCurrentThread() const override { | 69 bool RunsTasksOnCurrentThread() const override { |
71 return tls_current_worker_pool.Get().Get() == worker_pool_; | 70 return tls_current_worker_pool.Get().Get() == worker_pool_; |
72 } | 71 } |
73 | 72 |
74 private: | 73 private: |
75 ~SchedulerParallelTaskRunner() override = default; | 74 ~SchedulerParallelTaskRunner() override = default; |
76 | 75 |
77 const TaskTraits traits_; | 76 const TaskTraits traits_; |
(...skipping 15 matching lines...) Expand all Loading... | |
93 } | 92 } |
94 | 93 |
95 // SequencedTaskRunner: | 94 // SequencedTaskRunner: |
96 bool PostDelayedTask(const tracked_objects::Location& from_here, | 95 bool PostDelayedTask(const tracked_objects::Location& from_here, |
97 const Closure& closure, | 96 const Closure& closure, |
98 TimeDelta delay) override { | 97 TimeDelta delay) override { |
99 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 98 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
100 task->sequenced_task_runner_ref = this; | 99 task->sequenced_task_runner_ref = this; |
101 | 100 |
102 // Post the task as part of |sequence_|. | 101 // Post the task as part of |sequence_|. |
103 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 102 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); |
104 nullptr); | |
105 } | 103 } |
106 | 104 |
107 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 105 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
108 const Closure& closure, | 106 const Closure& closure, |
109 base::TimeDelta delay) override { | 107 base::TimeDelta delay) override { |
110 // Tasks are never nested within the task scheduler. | 108 // Tasks are never nested within the task scheduler. |
111 return PostDelayedTask(from_here, closure, delay); | 109 return PostDelayedTask(from_here, closure, delay); |
112 } | 110 } |
113 | 111 |
114 bool RunsTasksOnCurrentThread() const override { | 112 bool RunsTasksOnCurrentThread() const override { |
(...skipping 19 matching lines...) Expand all Loading... | |
134 const SchedulerWorker* worker) { | 132 const SchedulerWorker* worker) { |
135 auto it = std::find_if(workers.begin(), workers.end(), | 133 auto it = std::find_if(workers.begin(), workers.end(), |
136 [worker](const scoped_refptr<SchedulerWorker>& i) { | 134 [worker](const scoped_refptr<SchedulerWorker>& i) { |
137 return i.get() == worker; | 135 return i.get() == worker; |
138 }); | 136 }); |
139 return it != workers.end(); | 137 return it != workers.end(); |
140 } | 138 } |
141 | 139 |
142 } // namespace | 140 } // namespace |
143 | 141 |
144 // TODO(http://crbug.com/694823): Remove this and supporting framework. | |
145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | |
146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : | |
147 public SingleThreadTaskRunner { | |
148 public: | |
149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | |
150 // tasks so long as |worker_pool| and |worker| are alive. | |
151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | |
152 // and |worker|. | |
153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
154 SchedulerWorkerPool* worker_pool, | |
155 SchedulerWorker* worker); | |
156 | |
157 // SingleThreadTaskRunner: | |
158 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
159 const Closure& closure, | |
160 TimeDelta delay) override { | |
161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | |
162 task->single_thread_task_runner_ref = this; | |
163 | |
164 // Post the task to be executed by |worker_| as part of |sequence_|. | |
165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | |
166 worker_); | |
167 } | |
168 | |
169 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
170 const Closure& closure, | |
171 base::TimeDelta delay) override { | |
172 // Tasks are never nested within the task scheduler. | |
173 return PostDelayedTask(from_here, closure, delay); | |
174 } | |
175 | |
176 bool RunsTasksOnCurrentThread() const override { | |
177 // Even though this is a SingleThreadTaskRunner, test the actual sequence | |
178 // instead of the assigned worker so that another task randomly assigned | |
179 // to the same worker doesn't return true by happenstance. | |
180 return sequence_->token() == SequenceToken::GetForCurrentThread(); | |
181 } | |
182 | |
183 private: | |
184 ~SchedulerSingleThreadTaskRunner() override; | |
185 | |
186 // Sequence for all Tasks posted through this TaskRunner. | |
187 const scoped_refptr<Sequence> sequence_ = new Sequence; | |
188 | |
189 const TaskTraits traits_; | |
190 SchedulerWorkerPool* const worker_pool_; | |
191 SchedulerWorker* const worker_; | |
192 | |
193 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
194 }; | |
195 | |
196 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 142 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
197 : public SchedulerWorker::Delegate { | 143 : public SchedulerWorker::Delegate { |
198 public: | 144 public: |
199 // |outer| owns the worker for which this delegate is constructed. | 145 // |outer| owns the worker for which this delegate is constructed. |
200 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 146 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
201 // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 147 // called. |index| will be appended to the pool name to label the underlying |
202 // PriorityQueue whose transactions may overlap with the worker's | 148 // worker threads. |
203 // single-threaded PriorityQueue's transactions. |index| will be appended to | |
204 // the pool name to label the underlying worker threads. | |
205 SchedulerWorkerDelegateImpl( | 149 SchedulerWorkerDelegateImpl( |
206 SchedulerWorkerPoolImpl* outer, | 150 SchedulerWorkerPoolImpl* outer, |
207 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 151 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
208 const PriorityQueue* shared_priority_queue, | |
209 int index); | 152 int index); |
210 ~SchedulerWorkerDelegateImpl() override; | 153 ~SchedulerWorkerDelegateImpl() override; |
211 | 154 |
212 PriorityQueue* single_threaded_priority_queue() { | |
213 return &single_threaded_priority_queue_; | |
214 } | |
215 | |
216 // SchedulerWorker::Delegate: | 155 // SchedulerWorker::Delegate: |
217 void OnMainEntry(SchedulerWorker* worker) override; | 156 void OnMainEntry(SchedulerWorker* worker) override; |
218 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 157 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
219 void DidRunTask() override; | 158 void DidRunTask() override; |
220 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 159 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
221 TimeDelta GetSleepTimeout() override; | 160 TimeDelta GetSleepTimeout() override; |
222 bool CanDetach(SchedulerWorker* worker) override; | 161 bool CanDetach(SchedulerWorker* worker) override; |
223 void OnDetach() override; | 162 void OnDetach() override; |
224 | 163 |
225 void RegisterSingleThreadTaskRunner() { | |
226 // No barrier as barriers only affect sequential consistency which is | |
227 // irrelevant in a single variable use case (they don't force an immediate | |
228 // flush anymore than atomics do by default). | |
229 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); | |
230 } | |
231 | |
232 void UnregisterSingleThreadTaskRunner() { | |
233 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); | |
234 } | |
235 | |
236 private: | 164 private: |
237 SchedulerWorkerPoolImpl* outer_; | 165 SchedulerWorkerPoolImpl* outer_; |
238 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 166 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
239 | 167 |
240 // Single-threaded PriorityQueue for the worker. | |
241 PriorityQueue single_threaded_priority_queue_; | |
242 | |
243 // True if the last Sequence returned by GetWork() was extracted from | |
244 // |single_threaded_priority_queue_|. | |
245 bool last_sequence_is_single_threaded_ = false; | |
246 | |
247 // Time of the last detach. | 168 // Time of the last detach. |
248 TimeTicks last_detach_time_; | 169 TimeTicks last_detach_time_; |
249 | 170 |
250 // Time when GetWork() first returned nullptr. | 171 // Time when GetWork() first returned nullptr. |
251 TimeTicks idle_start_time_; | 172 TimeTicks idle_start_time_; |
252 | 173 |
253 // Indicates whether the last call to GetWork() returned nullptr. | 174 // Indicates whether the last call to GetWork() returned nullptr. |
254 bool last_get_work_returned_nullptr_ = false; | 175 bool last_get_work_returned_nullptr_ = false; |
255 | 176 |
256 // Indicates whether the SchedulerWorker was detached since the last call to | 177 // Indicates whether the SchedulerWorker was detached since the last call to |
257 // GetWork(). | 178 // GetWork(). |
258 bool did_detach_since_last_get_work_ = false; | 179 bool did_detach_since_last_get_work_ = false; |
259 | 180 |
260 // Number of tasks executed since the last time the | 181 // Number of tasks executed since the last time the |
261 // TaskScheduler.NumTasksBetweenWaits histogram was recorded. | 182 // TaskScheduler.NumTasksBetweenWaits histogram was recorded. |
262 size_t num_tasks_since_last_wait_ = 0; | 183 size_t num_tasks_since_last_wait_ = 0; |
263 | 184 |
264 // Number of tasks executed since the last time the | 185 // Number of tasks executed since the last time the |
265 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. | 186 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
266 size_t num_tasks_since_last_detach_ = 0; | 187 size_t num_tasks_since_last_detach_ = 0; |
267 | 188 |
268 subtle::Atomic32 num_single_threaded_runners_ = 0; | |
269 | |
270 const int index_; | 189 const int index_; |
271 | 190 |
272 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 191 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
273 }; | 192 }; |
274 | 193 |
275 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 194 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
276 // SchedulerWorkerPool should never be deleted in production unless its | 195 // SchedulerWorkerPool should never be deleted in production unless its |
277 // initialization failed. | 196 // initialization failed. |
278 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 197 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
279 } | 198 } |
(...skipping 15 matching lines...) Expand all Loading... | |
295 const TaskTraits& traits) { | 214 const TaskTraits& traits) { |
296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 215 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
297 } | 216 } |
298 | 217 |
299 scoped_refptr<SequencedTaskRunner> | 218 scoped_refptr<SequencedTaskRunner> |
300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 219 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
301 const TaskTraits& traits) { | 220 const TaskTraits& traits) { |
302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 221 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
303 } | 222 } |
304 | 223 |
305 scoped_refptr<SingleThreadTaskRunner> | |
306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( | |
307 const TaskTraits& traits) { | |
308 // TODO(fdoray): Find a way to take load into account when assigning a | |
309 // SchedulerWorker to a SingleThreadTaskRunner. | |
310 size_t worker_index; | |
311 { | |
312 AutoSchedulerLock auto_lock(next_worker_index_lock_); | |
313 worker_index = next_worker_index_; | |
314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); | |
315 } | |
316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | |
317 traits, this, workers_[worker_index].get())); | |
318 } | |
319 | |
320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | 224 void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
321 scoped_refptr<Sequence> sequence, | 225 scoped_refptr<Sequence> sequence, |
322 const SequenceSortKey& sequence_sort_key) { | 226 const SequenceSortKey& sequence_sort_key) { |
323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | 227 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
324 sequence_sort_key); | 228 sequence_sort_key); |
325 | 229 |
326 // The thread calling this method just ran a Task from |sequence| and will | 230 // The thread calling this method just ran a Task from |sequence| and will |
327 // soon try to get another Sequence from which to run a Task. If the thread | 231 // soon try to get another Sequence from which to run a Task. If the thread |
328 // belongs to this pool, it will get that Sequence from | 232 // belongs to this pool, it will get that Sequence from |
329 // |shared_priority_queue_|. When that's the case, there is no need to wake up | 233 // |shared_priority_queue_|. When that's the case, there is no need to wake up |
330 // another worker after |sequence| is inserted in |shared_priority_queue_|. If | 234 // another worker after |sequence| is inserted in |shared_priority_queue_|. If |
331 // we did wake up another worker, we would waste resources by having more | 235 // we did wake up another worker, we would waste resources by having more |
332 // workers trying to get a Sequence from |shared_priority_queue_| than the | 236 // workers trying to get a Sequence from |shared_priority_queue_| than the |
333 // number of Sequences in it. | 237 // number of Sequences in it. |
334 if (tls_current_worker_pool.Get().Get() != this) | 238 if (tls_current_worker_pool.Get().Get() != this) |
335 WakeUpOneWorker(); | 239 WakeUpOneWorker(); |
336 } | 240 } |
337 | 241 |
338 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 242 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
339 std::unique_ptr<Task> task, | 243 std::unique_ptr<Task> task, |
340 scoped_refptr<Sequence> sequence, | 244 scoped_refptr<Sequence> sequence) { |
341 SchedulerWorker* worker) { | |
342 DCHECK(task); | 245 DCHECK(task); |
343 DCHECK(sequence); | 246 DCHECK(sequence); |
344 DCHECK(!worker || ContainsWorker(workers_, worker)); | |
345 | 247 |
346 if (!task_tracker_->WillPostTask(task.get())) | 248 if (!task_tracker_->WillPostTask(task.get())) |
347 return false; | 249 return false; |
348 | 250 |
349 if (task->delayed_run_time.is_null()) { | 251 if (task->delayed_run_time.is_null()) { |
350 PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); | 252 PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
351 } else { | 253 } else { |
352 delayed_task_manager_->AddDelayedTask( | 254 delayed_task_manager_->AddDelayedTask( |
353 std::move(task), | 255 std::move(task), |
354 Bind( | 256 Bind( |
355 [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker, | 257 [](scoped_refptr<Sequence> sequence, |
356 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { | 258 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { |
357 worker_pool->PostTaskWithSequenceNow(std::move(task), | 259 worker_pool->PostTaskWithSequenceNow(std::move(task), |
358 std::move(sequence), worker); | 260 std::move(sequence)); |
359 }, | 261 }, |
360 std::move(sequence), Unretained(worker), Unretained(this))); | 262 std::move(sequence), Unretained(this))); |
361 } | 263 } |
362 | 264 |
363 return true; | 265 return true; |
364 } | 266 } |
365 | 267 |
366 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( | 268 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
367 std::unique_ptr<Task> task, | 269 std::unique_ptr<Task> task, |
368 scoped_refptr<Sequence> sequence, | 270 scoped_refptr<Sequence> sequence) { |
369 SchedulerWorker* worker) { | |
370 DCHECK(task); | 271 DCHECK(task); |
371 DCHECK(sequence); | 272 DCHECK(sequence); |
372 DCHECK(!worker || ContainsWorker(workers_, worker)); | |
373 | 273 |
374 // Confirm that |task| is ready to run (its delayed run time is either null or | 274 // Confirm that |task| is ready to run (its delayed run time is either null or |
375 // in the past). | 275 // in the past). |
376 DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); | 276 DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); |
377 | 277 |
378 // Because |worker| belongs to this worker pool, we know that the type | |
379 // of its delegate is SchedulerWorkerDelegateImpl. | |
380 PriorityQueue* const priority_queue = | |
381 worker | |
382 ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) | |
383 ->single_threaded_priority_queue() | |
384 : &shared_priority_queue_; | |
385 DCHECK(priority_queue); | |
386 | |
387 const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 278 const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
388 if (sequence_was_empty) { | 279 if (sequence_was_empty) { |
389 // Insert |sequence| in |priority_queue| if it was empty before |task| was | 280 // Insert |sequence| in |shared_priority_queue_| if it was empty before |
390 // inserted into it. Otherwise, one of these must be true: | 281 // |task| was inserted into it. Otherwise, one of these must be true: |
391 // - |sequence| is already in a PriorityQueue (not necessarily | 282 // - |sequence| is already in a PriorityQueue, or, |
392 // |shared_priority_queue_|), or, | |
393 // - A worker is running a Task from |sequence|. It will insert |sequence| | 283 // - A worker is running a Task from |sequence|. It will insert |sequence| |
394 // in a PriorityQueue once it's done running the Task. | 284 // in a PriorityQueue once it's done running the Task. |
395 const auto sequence_sort_key = sequence->GetSortKey(); | 285 const auto sequence_sort_key = sequence->GetSortKey(); |
396 priority_queue->BeginTransaction()->Push(std::move(sequence), | 286 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
397 sequence_sort_key); | 287 sequence_sort_key); |
398 | 288 |
399 // Wake up a worker to process |sequence|. | 289 // Wake up a worker to process |sequence|. |
gab
2017/03/15 20:20:58
rm now redundant comment
robliao
2017/03/15 20:46:44
This comment is consistent with the current style
gab
2017/03/16 15:34:46
Hmm ok, we at least shouldn't add more (and I'm ha
robliao
2017/03/16 22:49:03
While I agree with your opinion, the style guide i
| |
400 if (worker) | 290 WakeUpOneWorker(); |
401 WakeUpWorker(worker); | |
402 else | |
403 WakeUpOneWorker(); | |
404 } | 291 } |
405 } | 292 } |
406 | 293 |
407 void SchedulerWorkerPoolImpl::GetHistograms( | 294 void SchedulerWorkerPoolImpl::GetHistograms( |
408 std::vector<const HistogramBase*>* histograms) const { | 295 std::vector<const HistogramBase*>* histograms) const { |
409 histograms->push_back(detach_duration_histogram_); | 296 histograms->push_back(detach_duration_histogram_); |
410 histograms->push_back(num_tasks_between_waits_histogram_); | 297 histograms->push_back(num_tasks_between_waits_histogram_); |
411 } | 298 } |
412 | 299 |
413 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 300 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
(...skipping 22 matching lines...) Expand all Loading... | |
436 | 323 |
437 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { | 324 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
438 size_t num_alive_workers = 0; | 325 size_t num_alive_workers = 0; |
439 for (const auto& worker : workers_) { | 326 for (const auto& worker : workers_) { |
440 if (worker->ThreadAliveForTesting()) | 327 if (worker->ThreadAliveForTesting()) |
441 ++num_alive_workers; | 328 ++num_alive_workers; |
442 } | 329 } |
443 return num_alive_workers; | 330 return num_alive_workers; |
444 } | 331 } |
445 | 332 |
446 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
447 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
448 SchedulerWorkerPool* worker_pool, | |
449 SchedulerWorker* worker) | |
450 : traits_(traits), | |
451 worker_pool_(worker_pool), | |
452 worker_(worker) { | |
453 DCHECK(worker_pool_); | |
454 DCHECK(worker_); | |
455 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
456 RegisterSingleThreadTaskRunner(); | |
457 } | |
458 | |
459 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | |
460 ~SchedulerSingleThreadTaskRunner() { | |
461 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | |
462 UnregisterSingleThreadTaskRunner(); | |
463 } | |
464 | |
465 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 333 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
466 SchedulerWorkerDelegateImpl( | 334 SchedulerWorkerDelegateImpl( |
467 SchedulerWorkerPoolImpl* outer, | 335 SchedulerWorkerPoolImpl* outer, |
468 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 336 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
469 const PriorityQueue* shared_priority_queue, | |
470 int index) | 337 int index) |
471 : outer_(outer), | 338 : outer_(outer), |
472 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 339 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
473 single_threaded_priority_queue_(shared_priority_queue), | |
474 index_(index) {} | 340 index_(index) {} |
475 | 341 |
476 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 342 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
477 ~SchedulerWorkerDelegateImpl() = default; | 343 ~SchedulerWorkerDelegateImpl() = default; |
478 | 344 |
479 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 345 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
480 SchedulerWorker* worker) { | 346 SchedulerWorker* worker) { |
481 #if DCHECK_IS_ON() | 347 #if DCHECK_IS_ON() |
482 // Wait for |outer_->workers_created_| to avoid traversing | 348 // Wait for |outer_->workers_created_| to avoid traversing |
483 // |outer_->workers_| while it is being filled by Initialize(). | 349 // |outer_->workers_| while it is being filled by Initialize(). |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
520 // SchedulerWorker didn't wait on its WaitableEvent since the last time the | 386 // SchedulerWorker didn't wait on its WaitableEvent since the last time the |
521 // histogram was recorded). | 387 // histogram was recorded). |
522 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) { | 388 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) { |
523 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); | 389 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); |
524 num_tasks_since_last_wait_ = 0; | 390 num_tasks_since_last_wait_ = 0; |
525 } | 391 } |
526 | 392 |
527 scoped_refptr<Sequence> sequence; | 393 scoped_refptr<Sequence> sequence; |
528 { | 394 { |
529 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( | 395 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
530 outer_->shared_priority_queue_.BeginTransaction()); | 396 outer_->shared_priority_queue_.BeginTransaction()); |
gab
2017/03/15 20:20:58
This was the only use case of transaction I believ
robliao
2017/03/15 20:46:44
Correct. This is focused on just SchedulerWorkerPo
gab
2017/03/16 15:34:45
Unless you have a CL doing this now I still prefer
robliao
2017/03/16 22:49:03
In progress at the moment!
gab
2017/03/20 16:40:58
Ok but for future reference a comment in code refe
| |
531 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( | |
532 single_threaded_priority_queue_.BeginTransaction()); | |
533 | 397 |
534 if (shared_transaction->IsEmpty() && | 398 if (shared_transaction->IsEmpty()) { |
535 single_threaded_transaction->IsEmpty()) { | |
536 single_threaded_transaction.reset(); | |
537 | |
538 // |shared_transaction| is kept alive while |worker| is added to | 399 // |shared_transaction| is kept alive while |worker| is added to |
539 // |idle_workers_stack_| to avoid this race: | 400 // |idle_workers_stack_| to avoid this race: |
540 // 1. This thread creates a Transaction, finds |shared_priority_queue_| | 401 // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
541 // empty and ends the Transaction. | 402 // empty and ends the Transaction. |
542 // 2. Other thread creates a Transaction, inserts a Sequence into | 403 // 2. Other thread creates a Transaction, inserts a Sequence into |
543 // |shared_priority_queue_| and ends the Transaction. This can't happen | 404 // |shared_priority_queue_| and ends the Transaction. This can't happen |
544 // if the Transaction of step 1 is still active because because there | 405 // if the Transaction of step 1 is still active because because there |
545 // can only be one active Transaction per PriorityQueue at a time. | 406 // can only be one active Transaction per PriorityQueue at a time. |
546 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because | 407 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because |
547 // |idle_workers_stack_| is empty. | 408 // |idle_workers_stack_| is empty. |
548 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. | 409 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
549 // No thread runs the Sequence inserted in step 2. | 410 // No thread runs the Sequence inserted in step 2. |
550 outer_->AddToIdleWorkersStack(worker); | 411 outer_->AddToIdleWorkersStack(worker); |
551 if (idle_start_time_.is_null()) | 412 if (idle_start_time_.is_null()) |
552 idle_start_time_ = TimeTicks::Now(); | 413 idle_start_time_ = TimeTicks::Now(); |
553 did_detach_since_last_get_work_ = false; | 414 did_detach_since_last_get_work_ = false; |
554 last_get_work_returned_nullptr_ = true; | 415 last_get_work_returned_nullptr_ = true; |
555 return nullptr; | 416 return nullptr; |
556 } | 417 } |
557 | 418 |
558 // True if both PriorityQueues have Sequences and the Sequence at the top of | 419 sequence = shared_transaction->PopSequence(); |
559 // the shared PriorityQueue is more important. | |
560 const bool shared_sequence_is_more_important = | |
561 !shared_transaction->IsEmpty() && | |
562 !single_threaded_transaction->IsEmpty() && | |
563 shared_transaction->PeekSortKey() > | |
564 single_threaded_transaction->PeekSortKey(); | |
565 | |
566 if (single_threaded_transaction->IsEmpty() || | |
567 shared_sequence_is_more_important) { | |
568 sequence = shared_transaction->PopSequence(); | |
569 last_sequence_is_single_threaded_ = false; | |
570 } else { | |
571 DCHECK(!single_threaded_transaction->IsEmpty()); | |
572 sequence = single_threaded_transaction->PopSequence(); | |
573 last_sequence_is_single_threaded_ = true; | |
574 } | |
575 } | 420 } |
576 DCHECK(sequence); | 421 DCHECK(sequence); |
577 | 422 |
578 outer_->RemoveFromIdleWorkersStack(worker); | 423 outer_->RemoveFromIdleWorkersStack(worker); |
579 idle_start_time_ = TimeTicks(); | 424 idle_start_time_ = TimeTicks(); |
580 did_detach_since_last_get_work_ = false; | 425 did_detach_since_last_get_work_ = false; |
581 last_get_work_returned_nullptr_ = false; | 426 last_get_work_returned_nullptr_ = false; |
582 | 427 |
583 return sequence; | 428 return sequence; |
584 } | 429 } |
585 | 430 |
586 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { | 431 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { |
587 ++num_tasks_since_last_wait_; | 432 ++num_tasks_since_last_wait_; |
588 ++num_tasks_since_last_detach_; | 433 ++num_tasks_since_last_detach_; |
589 } | 434 } |
590 | 435 |
591 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 436 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
592 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 437 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
593 if (last_sequence_is_single_threaded_) { | 438 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
594 // A single-threaded Sequence is always re-enqueued in the single-threaded | 439 // |sequence| must be enqueued. |
595 // PriorityQueue from which it was extracted. | 440 re_enqueue_sequence_callback_.Run(std::move(sequence)); |
596 const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | |
597 single_threaded_priority_queue_.BeginTransaction()->Push( | |
598 std::move(sequence), sequence_sort_key); | |
599 } else { | |
600 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | |
601 // |sequence| must be enqueued. | |
602 re_enqueue_sequence_callback_.Run(std::move(sequence)); | |
603 } | |
604 } | 441 } |
605 | 442 |
606 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 443 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
607 GetSleepTimeout() { | 444 GetSleepTimeout() { |
608 return outer_->suggested_reclaim_time_; | 445 return outer_->suggested_reclaim_time_; |
609 } | 446 } |
610 | 447 |
611 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 448 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
612 SchedulerWorker* worker) { | 449 SchedulerWorker* worker) { |
613 // It's not an issue if |num_single_threaded_runners_| is incremented after | |
614 // this because the newly created SingleThreadTaskRunner (from which no task | |
615 // has run yet) will simply run all its tasks on the next physical thread | |
616 // created by the worker. | |
617 const bool can_detach = | 450 const bool can_detach = |
618 !idle_start_time_.is_null() && | 451 !idle_start_time_.is_null() && |
619 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && | 452 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
620 worker != outer_->PeekAtIdleWorkersStack() && | 453 worker != outer_->PeekAtIdleWorkersStack() && |
621 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && | |
622 outer_->CanWorkerDetachForTesting(); | 454 outer_->CanWorkerDetachForTesting(); |
623 return can_detach; | 455 return can_detach; |
624 } | 456 } |
625 | 457 |
626 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { | 458 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { |
627 DCHECK(!did_detach_since_last_get_work_); | 459 DCHECK(!did_detach_since_last_get_work_); |
628 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); | 460 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); |
629 num_tasks_since_last_detach_ = 0; | 461 num_tasks_since_last_detach_ = 0; |
630 did_detach_since_last_get_work_ = true; | 462 did_detach_since_last_get_work_ = true; |
631 last_detach_time_ = TimeTicks::Now(); | 463 last_detach_time_ = TimeTicks::Now(); |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
693 const bool is_standby_lazy = | 525 const bool is_standby_lazy = |
694 params.standby_thread_policy() == | 526 params.standby_thread_policy() == |
695 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; | 527 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; |
696 const SchedulerWorker::InitialState initial_state = | 528 const SchedulerWorker::InitialState initial_state = |
697 (index == 0 && !is_standby_lazy) | 529 (index == 0 && !is_standby_lazy) |
698 ? SchedulerWorker::InitialState::ALIVE | 530 ? SchedulerWorker::InitialState::ALIVE |
699 : SchedulerWorker::InitialState::DETACHED; | 531 : SchedulerWorker::InitialState::DETACHED; |
700 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | 532 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( |
701 params.priority_hint(), | 533 params.priority_hint(), |
702 MakeUnique<SchedulerWorkerDelegateImpl>( | 534 MakeUnique<SchedulerWorkerDelegateImpl>( |
703 this, re_enqueue_sequence_callback, &shared_priority_queue_, index), | 535 this, re_enqueue_sequence_callback, index), |
704 task_tracker_, initial_state, params.backward_compatibility()); | 536 task_tracker_, initial_state, params.backward_compatibility()); |
705 if (!worker) | 537 if (!worker) |
706 break; | 538 break; |
707 idle_workers_stack_.Push(worker.get()); | 539 idle_workers_stack_.Push(worker.get()); |
708 workers_[index] = std::move(worker); | 540 workers_[index] = std::move(worker); |
709 } | 541 } |
710 | 542 |
711 #if DCHECK_IS_ON() | 543 #if DCHECK_IS_ON() |
712 workers_created_.Signal(); | 544 workers_created_.Signal(); |
713 #endif | 545 #endif |
714 | 546 |
715 return !workers_.empty(); | 547 return !workers_.empty(); |
716 } | 548 } |
717 | 549 |
718 void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) { | |
719 DCHECK(worker); | |
720 RemoveFromIdleWorkersStack(worker); | |
721 worker->WakeUp(); | |
722 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | |
723 // hysteresis to the CanDetach check. See https://crbug.com/666041. | |
724 } | |
725 | |
726 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 550 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
727 SchedulerWorker* worker; | 551 SchedulerWorker* worker; |
728 { | 552 { |
729 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 553 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
730 worker = idle_workers_stack_.Pop(); | 554 worker = idle_workers_stack_.Pop(); |
731 } | 555 } |
732 if (worker) | 556 if (worker) |
733 worker->WakeUp(); | 557 worker->WakeUp(); |
558 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | |
gab
2017/03/15 20:20:58
Shouldn't this TODO be in CanDetach()?
robliao
2017/03/15 20:46:44
This was the best place to put it that respected t
| |
559 // hysteresis to the CanDetach check. See https://crbug.com/666041. | |
734 } | 560 } |
735 | 561 |
736 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 562 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
737 SchedulerWorker* worker) { | 563 SchedulerWorker* worker) { |
738 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 564 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
739 // Detachment may cause multiple attempts to add because the delegate cannot | 565 // Detachment may cause multiple attempts to add because the delegate cannot |
740 // determine who woke it up. As a result, when it wakes up, it may conclude | 566 // determine who woke it up. As a result, when it wakes up, it may conclude |
741 // there's no work to be done and attempt to add itself to the idle stack | 567 // there's no work to be done and attempt to add itself to the idle stack |
742 // again. | 568 // again. |
743 if (!idle_workers_stack_.Contains(worker)) | 569 if (!idle_workers_stack_.Contains(worker)) |
(...skipping 15 matching lines...) Expand all Loading... | |
759 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 585 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
760 idle_workers_stack_.Remove(worker); | 586 idle_workers_stack_.Remove(worker); |
761 } | 587 } |
762 | 588 |
763 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 589 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
764 return !worker_detachment_disallowed_.IsSet(); | 590 return !worker_detachment_disallowed_.IsSet(); |
765 } | 591 } |
766 | 592 |
767 } // namespace internal | 593 } // namespace internal |
768 } // namespace base | 594 } // namespace base |
OLD | NEW |