Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(202)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2721553003: Remove SingleThreadTaskRunner Support from SchedulerWorkerPoolImpl (Closed)
Patch Set: Rebase to 08266b3 Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/atomicops.h" 12 #include "base/atomicops.h"
13 #include "base/bind.h" 13 #include "base/bind.h"
14 #include "base/bind_helpers.h" 14 #include "base/bind_helpers.h"
15 #include "base/lazy_instance.h" 15 #include "base/lazy_instance.h"
16 #include "base/memory/ptr_util.h" 16 #include "base/memory/ptr_util.h"
17 #include "base/metrics/histogram.h" 17 #include "base/metrics/histogram.h"
18 #include "base/sequence_token.h" 18 #include "base/sequence_token.h"
19 #include "base/sequenced_task_runner.h" 19 #include "base/sequenced_task_runner.h"
20 #include "base/single_thread_task_runner.h"
21 #include "base/strings/stringprintf.h" 20 #include "base/strings/stringprintf.h"
22 #include "base/task_runner.h" 21 #include "base/task_runner.h"
23 #include "base/task_scheduler/delayed_task_manager.h" 22 #include "base/task_scheduler/delayed_task_manager.h"
24 #include "base/task_scheduler/scheduler_worker_pool_params.h" 23 #include "base/task_scheduler/scheduler_worker_pool_params.h"
25 #include "base/task_scheduler/task_tracker.h" 24 #include "base/task_scheduler/task_tracker.h"
26 #include "base/task_scheduler/task_traits.h" 25 #include "base/task_scheduler/task_traits.h"
27 #include "base/threading/platform_thread.h" 26 #include "base/threading/platform_thread.h"
28 #include "base/threading/thread_local.h" 27 #include "base/threading/thread_local.h"
29 #include "base/threading/thread_restrictions.h" 28 #include "base/threading/thread_restrictions.h"
30 29
(...skipping 26 matching lines...) Expand all
57 DCHECK(worker_pool_); 56 DCHECK(worker_pool_);
58 } 57 }
59 58
60 // TaskRunner: 59 // TaskRunner:
61 bool PostDelayedTask(const tracked_objects::Location& from_here, 60 bool PostDelayedTask(const tracked_objects::Location& from_here,
62 const Closure& closure, 61 const Closure& closure,
63 TimeDelta delay) override { 62 TimeDelta delay) override {
64 // Post the task as part of a one-off single-task Sequence. 63 // Post the task as part of a one-off single-task Sequence.
65 return worker_pool_->PostTaskWithSequence( 64 return worker_pool_->PostTaskWithSequence(
66 MakeUnique<Task>(from_here, closure, traits_, delay), 65 MakeUnique<Task>(from_here, closure, traits_, delay),
67 make_scoped_refptr(new Sequence), nullptr); 66 make_scoped_refptr(new Sequence));
68 } 67 }
69 68
70 bool RunsTasksOnCurrentThread() const override { 69 bool RunsTasksOnCurrentThread() const override {
71 return tls_current_worker_pool.Get().Get() == worker_pool_; 70 return tls_current_worker_pool.Get().Get() == worker_pool_;
72 } 71 }
73 72
74 private: 73 private:
75 ~SchedulerParallelTaskRunner() override = default; 74 ~SchedulerParallelTaskRunner() override = default;
76 75
77 const TaskTraits traits_; 76 const TaskTraits traits_;
(...skipping 15 matching lines...) Expand all
93 } 92 }
94 93
95 // SequencedTaskRunner: 94 // SequencedTaskRunner:
96 bool PostDelayedTask(const tracked_objects::Location& from_here, 95 bool PostDelayedTask(const tracked_objects::Location& from_here,
97 const Closure& closure, 96 const Closure& closure,
98 TimeDelta delay) override { 97 TimeDelta delay) override {
99 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 98 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
100 task->sequenced_task_runner_ref = this; 99 task->sequenced_task_runner_ref = this;
101 100
102 // Post the task as part of |sequence_|. 101 // Post the task as part of |sequence_|.
103 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 102 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
104 nullptr);
105 } 103 }
106 104
107 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 105 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
108 const Closure& closure, 106 const Closure& closure,
109 base::TimeDelta delay) override { 107 base::TimeDelta delay) override {
110 // Tasks are never nested within the task scheduler. 108 // Tasks are never nested within the task scheduler.
111 return PostDelayedTask(from_here, closure, delay); 109 return PostDelayedTask(from_here, closure, delay);
112 } 110 }
113 111
114 bool RunsTasksOnCurrentThread() const override { 112 bool RunsTasksOnCurrentThread() const override {
(...skipping 19 matching lines...) Expand all
134 const SchedulerWorker* worker) { 132 const SchedulerWorker* worker) {
135 auto it = std::find_if(workers.begin(), workers.end(), 133 auto it = std::find_if(workers.begin(), workers.end(),
136 [worker](const scoped_refptr<SchedulerWorker>& i) { 134 [worker](const scoped_refptr<SchedulerWorker>& i) {
137 return i.get() == worker; 135 return i.get() == worker;
138 }); 136 });
139 return it != workers.end(); 137 return it != workers.end();
140 } 138 }
141 139
142 } // namespace 140 } // namespace
143 141
144 // TODO(http://crbug.com/694823): Remove this and supporting framework.
145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
147 public SingleThreadTaskRunner {
148 public:
149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
150 // tasks so long as |worker_pool| and |worker| are alive.
151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
152 // and |worker|.
153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
154 SchedulerWorkerPool* worker_pool,
155 SchedulerWorker* worker);
156
157 // SingleThreadTaskRunner:
158 bool PostDelayedTask(const tracked_objects::Location& from_here,
159 const Closure& closure,
160 TimeDelta delay) override {
161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
162 task->single_thread_task_runner_ref = this;
163
164 // Post the task to be executed by |worker_| as part of |sequence_|.
165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
166 worker_);
167 }
168
169 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
170 const Closure& closure,
171 base::TimeDelta delay) override {
172 // Tasks are never nested within the task scheduler.
173 return PostDelayedTask(from_here, closure, delay);
174 }
175
176 bool RunsTasksOnCurrentThread() const override {
177 // Even though this is a SingleThreadTaskRunner, test the actual sequence
178 // instead of the assigned worker so that another task randomly assigned
179 // to the same worker doesn't return true by happenstance.
180 return sequence_->token() == SequenceToken::GetForCurrentThread();
181 }
182
183 private:
184 ~SchedulerSingleThreadTaskRunner() override;
185
186 // Sequence for all Tasks posted through this TaskRunner.
187 const scoped_refptr<Sequence> sequence_ = new Sequence;
188
189 const TaskTraits traits_;
190 SchedulerWorkerPool* const worker_pool_;
191 SchedulerWorker* const worker_;
192
193 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
194 };
195
196 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl 142 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
197 : public SchedulerWorker::Delegate { 143 : public SchedulerWorker::Delegate {
198 public: 144 public:
199 // |outer| owns the worker for which this delegate is constructed. 145 // |outer| owns the worker for which this delegate is constructed.
200 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 146 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
201 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 147 // called. |index| will be appended to the pool name to label the underlying
202 // PriorityQueue whose transactions may overlap with the worker's 148 // worker threads.
203 // single-threaded PriorityQueue's transactions. |index| will be appended to
204 // the pool name to label the underlying worker threads.
205 SchedulerWorkerDelegateImpl( 149 SchedulerWorkerDelegateImpl(
206 SchedulerWorkerPoolImpl* outer, 150 SchedulerWorkerPoolImpl* outer,
207 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 151 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
208 const PriorityQueue* shared_priority_queue,
209 int index); 152 int index);
210 ~SchedulerWorkerDelegateImpl() override; 153 ~SchedulerWorkerDelegateImpl() override;
211 154
212 PriorityQueue* single_threaded_priority_queue() {
213 return &single_threaded_priority_queue_;
214 }
215
216 // SchedulerWorker::Delegate: 155 // SchedulerWorker::Delegate:
217 void OnMainEntry(SchedulerWorker* worker) override; 156 void OnMainEntry(SchedulerWorker* worker) override;
218 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; 157 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
219 void DidRunTask() override; 158 void DidRunTask() override;
220 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; 159 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
221 TimeDelta GetSleepTimeout() override; 160 TimeDelta GetSleepTimeout() override;
222 bool CanDetach(SchedulerWorker* worker) override; 161 bool CanDetach(SchedulerWorker* worker) override;
223 void OnDetach() override; 162 void OnDetach() override;
224 163
225 void RegisterSingleThreadTaskRunner() {
226 // No barrier as barriers only affect sequential consistency which is
227 // irrelevant in a single variable use case (they don't force an immediate
228 // flush anymore than atomics do by default).
229 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
230 }
231
232 void UnregisterSingleThreadTaskRunner() {
233 subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
234 }
235
236 private: 164 private:
237 SchedulerWorkerPoolImpl* outer_; 165 SchedulerWorkerPoolImpl* outer_;
238 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; 166 const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
239 167
240 // Single-threaded PriorityQueue for the worker.
241 PriorityQueue single_threaded_priority_queue_;
242
243 // True if the last Sequence returned by GetWork() was extracted from
244 // |single_threaded_priority_queue_|.
245 bool last_sequence_is_single_threaded_ = false;
246
247 // Time of the last detach. 168 // Time of the last detach.
248 TimeTicks last_detach_time_; 169 TimeTicks last_detach_time_;
249 170
250 // Time when GetWork() first returned nullptr. 171 // Time when GetWork() first returned nullptr.
251 TimeTicks idle_start_time_; 172 TimeTicks idle_start_time_;
252 173
253 // Indicates whether the last call to GetWork() returned nullptr. 174 // Indicates whether the last call to GetWork() returned nullptr.
254 bool last_get_work_returned_nullptr_ = false; 175 bool last_get_work_returned_nullptr_ = false;
255 176
256 // Indicates whether the SchedulerWorker was detached since the last call to 177 // Indicates whether the SchedulerWorker was detached since the last call to
257 // GetWork(). 178 // GetWork().
258 bool did_detach_since_last_get_work_ = false; 179 bool did_detach_since_last_get_work_ = false;
259 180
260 // Number of tasks executed since the last time the 181 // Number of tasks executed since the last time the
261 // TaskScheduler.NumTasksBetweenWaits histogram was recorded. 182 // TaskScheduler.NumTasksBetweenWaits histogram was recorded.
262 size_t num_tasks_since_last_wait_ = 0; 183 size_t num_tasks_since_last_wait_ = 0;
263 184
264 // Number of tasks executed since the last time the 185 // Number of tasks executed since the last time the
265 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. 186 // TaskScheduler.NumTasksBeforeDetach histogram was recorded.
266 size_t num_tasks_since_last_detach_ = 0; 187 size_t num_tasks_since_last_detach_ = 0;
267 188
268 subtle::Atomic32 num_single_threaded_runners_ = 0;
269
270 const int index_; 189 const int index_;
271 190
272 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); 191 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
273 }; 192 };
274 193
275 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { 194 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
276 // SchedulerWorkerPool should never be deleted in production unless its 195 // SchedulerWorkerPool should never be deleted in production unless its
277 // initialization failed. 196 // initialization failed.
278 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); 197 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
279 } 198 }
(...skipping 15 matching lines...) Expand all
295 const TaskTraits& traits) { 214 const TaskTraits& traits) {
296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); 215 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
297 } 216 }
298 217
299 scoped_refptr<SequencedTaskRunner> 218 scoped_refptr<SequencedTaskRunner>
300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( 219 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
301 const TaskTraits& traits) { 220 const TaskTraits& traits) {
302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); 221 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
303 } 222 }
304 223
305 scoped_refptr<SingleThreadTaskRunner>
306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits(
307 const TaskTraits& traits) {
308 // TODO(fdoray): Find a way to take load into account when assigning a
309 // SchedulerWorker to a SingleThreadTaskRunner.
310 size_t worker_index;
311 {
312 AutoSchedulerLock auto_lock(next_worker_index_lock_);
313 worker_index = next_worker_index_;
314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
315 }
316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
317 traits, this, workers_[worker_index].get()));
318 }
319
320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( 224 void SchedulerWorkerPoolImpl::ReEnqueueSequence(
321 scoped_refptr<Sequence> sequence, 225 scoped_refptr<Sequence> sequence,
322 const SequenceSortKey& sequence_sort_key) { 226 const SequenceSortKey& sequence_sort_key) {
323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), 227 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
324 sequence_sort_key); 228 sequence_sort_key);
325 229
326 // The thread calling this method just ran a Task from |sequence| and will 230 // The thread calling this method just ran a Task from |sequence| and will
327 // soon try to get another Sequence from which to run a Task. If the thread 231 // soon try to get another Sequence from which to run a Task. If the thread
328 // belongs to this pool, it will get that Sequence from 232 // belongs to this pool, it will get that Sequence from
329 // |shared_priority_queue_|. When that's the case, there is no need to wake up 233 // |shared_priority_queue_|. When that's the case, there is no need to wake up
330 // another worker after |sequence| is inserted in |shared_priority_queue_|. If 234 // another worker after |sequence| is inserted in |shared_priority_queue_|. If
331 // we did wake up another worker, we would waste resources by having more 235 // we did wake up another worker, we would waste resources by having more
332 // workers trying to get a Sequence from |shared_priority_queue_| than the 236 // workers trying to get a Sequence from |shared_priority_queue_| than the
333 // number of Sequences in it. 237 // number of Sequences in it.
334 if (tls_current_worker_pool.Get().Get() != this) 238 if (tls_current_worker_pool.Get().Get() != this)
335 WakeUpOneWorker(); 239 WakeUpOneWorker();
336 } 240 }
337 241
338 bool SchedulerWorkerPoolImpl::PostTaskWithSequence( 242 bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
339 std::unique_ptr<Task> task, 243 std::unique_ptr<Task> task,
340 scoped_refptr<Sequence> sequence, 244 scoped_refptr<Sequence> sequence) {
341 SchedulerWorker* worker) {
342 DCHECK(task); 245 DCHECK(task);
343 DCHECK(sequence); 246 DCHECK(sequence);
344 DCHECK(!worker || ContainsWorker(workers_, worker));
345 247
346 if (!task_tracker_->WillPostTask(task.get())) 248 if (!task_tracker_->WillPostTask(task.get()))
347 return false; 249 return false;
348 250
349 if (task->delayed_run_time.is_null()) { 251 if (task->delayed_run_time.is_null()) {
350 PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); 252 PostTaskWithSequenceNow(std::move(task), std::move(sequence));
351 } else { 253 } else {
352 delayed_task_manager_->AddDelayedTask( 254 delayed_task_manager_->AddDelayedTask(
353 std::move(task), 255 std::move(task),
354 Bind( 256 Bind(
355 [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker, 257 [](scoped_refptr<Sequence> sequence,
356 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) { 258 SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) {
357 worker_pool->PostTaskWithSequenceNow(std::move(task), 259 worker_pool->PostTaskWithSequenceNow(std::move(task),
358 std::move(sequence), worker); 260 std::move(sequence));
359 }, 261 },
360 std::move(sequence), Unretained(worker), Unretained(this))); 262 std::move(sequence), Unretained(this)));
361 } 263 }
362 264
363 return true; 265 return true;
364 } 266 }
365 267
366 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( 268 void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
367 std::unique_ptr<Task> task, 269 std::unique_ptr<Task> task,
368 scoped_refptr<Sequence> sequence, 270 scoped_refptr<Sequence> sequence) {
369 SchedulerWorker* worker) {
370 DCHECK(task); 271 DCHECK(task);
371 DCHECK(sequence); 272 DCHECK(sequence);
372 DCHECK(!worker || ContainsWorker(workers_, worker));
373 273
374 // Confirm that |task| is ready to run (its delayed run time is either null or 274 // Confirm that |task| is ready to run (its delayed run time is either null or
375 // in the past). 275 // in the past).
376 DCHECK_LE(task->delayed_run_time, TimeTicks::Now()); 276 DCHECK_LE(task->delayed_run_time, TimeTicks::Now());
377 277
378 // Because |worker| belongs to this worker pool, we know that the type
379 // of its delegate is SchedulerWorkerDelegateImpl.
380 PriorityQueue* const priority_queue =
381 worker
382 ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
383 ->single_threaded_priority_queue()
384 : &shared_priority_queue_;
385 DCHECK(priority_queue);
386
387 const bool sequence_was_empty = sequence->PushTask(std::move(task)); 278 const bool sequence_was_empty = sequence->PushTask(std::move(task));
388 if (sequence_was_empty) { 279 if (sequence_was_empty) {
389 // Insert |sequence| in |priority_queue| if it was empty before |task| was 280 // Insert |sequence| in |shared_priority_queue_| if it was empty before
390 // inserted into it. Otherwise, one of these must be true: 281 // |task| was inserted into it. Otherwise, one of these must be true:
391 // - |sequence| is already in a PriorityQueue (not necessarily 282 // - |sequence| is already in a PriorityQueue, or,
392 // |shared_priority_queue_|), or,
393 // - A worker is running a Task from |sequence|. It will insert |sequence| 283 // - A worker is running a Task from |sequence|. It will insert |sequence|
394 // in a PriorityQueue once it's done running the Task. 284 // in a PriorityQueue once it's done running the Task.
395 const auto sequence_sort_key = sequence->GetSortKey(); 285 const auto sequence_sort_key = sequence->GetSortKey();
396 priority_queue->BeginTransaction()->Push(std::move(sequence), 286 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
397 sequence_sort_key); 287 sequence_sort_key);
398 288
399 // Wake up a worker to process |sequence|. 289 // Wake up a worker to process |sequence|.
gab 2017/03/15 20:20:58 rm now redundant comment
robliao 2017/03/15 20:46:44 This comment is consistent with the current style
gab 2017/03/16 15:34:46 Hmm ok, we at least shouldn't add more (and I'm ha
robliao 2017/03/16 22:49:03 While I agree with your opinion, the style guide i
400 if (worker) 290 WakeUpOneWorker();
401 WakeUpWorker(worker);
402 else
403 WakeUpOneWorker();
404 } 291 }
405 } 292 }
406 293
407 void SchedulerWorkerPoolImpl::GetHistograms( 294 void SchedulerWorkerPoolImpl::GetHistograms(
408 std::vector<const HistogramBase*>* histograms) const { 295 std::vector<const HistogramBase*>* histograms) const {
409 histograms->push_back(detach_duration_histogram_); 296 histograms->push_back(detach_duration_histogram_);
410 histograms->push_back(num_tasks_between_waits_histogram_); 297 histograms->push_back(num_tasks_between_waits_histogram_);
411 } 298 }
412 299
413 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { 300 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const {
(...skipping 22 matching lines...) Expand all
436 323
437 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { 324 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
438 size_t num_alive_workers = 0; 325 size_t num_alive_workers = 0;
439 for (const auto& worker : workers_) { 326 for (const auto& worker : workers_) {
440 if (worker->ThreadAliveForTesting()) 327 if (worker->ThreadAliveForTesting())
441 ++num_alive_workers; 328 ++num_alive_workers;
442 } 329 }
443 return num_alive_workers; 330 return num_alive_workers;
444 } 331 }
445 332
446 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
447 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
448 SchedulerWorkerPool* worker_pool,
449 SchedulerWorker* worker)
450 : traits_(traits),
451 worker_pool_(worker_pool),
452 worker_(worker) {
453 DCHECK(worker_pool_);
454 DCHECK(worker_);
455 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
456 RegisterSingleThreadTaskRunner();
457 }
458
459 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
460 ~SchedulerSingleThreadTaskRunner() {
461 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
462 UnregisterSingleThreadTaskRunner();
463 }
464
465 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 333 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
466 SchedulerWorkerDelegateImpl( 334 SchedulerWorkerDelegateImpl(
467 SchedulerWorkerPoolImpl* outer, 335 SchedulerWorkerPoolImpl* outer,
468 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 336 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
469 const PriorityQueue* shared_priority_queue,
470 int index) 337 int index)
471 : outer_(outer), 338 : outer_(outer),
472 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 339 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
473 single_threaded_priority_queue_(shared_priority_queue),
474 index_(index) {} 340 index_(index) {}
475 341
476 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 342 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
477 ~SchedulerWorkerDelegateImpl() = default; 343 ~SchedulerWorkerDelegateImpl() = default;
478 344
479 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( 345 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
480 SchedulerWorker* worker) { 346 SchedulerWorker* worker) {
481 #if DCHECK_IS_ON() 347 #if DCHECK_IS_ON()
482 // Wait for |outer_->workers_created_| to avoid traversing 348 // Wait for |outer_->workers_created_| to avoid traversing
483 // |outer_->workers_| while it is being filled by Initialize(). 349 // |outer_->workers_| while it is being filled by Initialize().
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 // SchedulerWorker didn't wait on its WaitableEvent since the last time the 386 // SchedulerWorker didn't wait on its WaitableEvent since the last time the
521 // histogram was recorded). 387 // histogram was recorded).
522 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) { 388 if (last_get_work_returned_nullptr_ && !did_detach_since_last_get_work_) {
523 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); 389 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_);
524 num_tasks_since_last_wait_ = 0; 390 num_tasks_since_last_wait_ = 0;
525 } 391 }
526 392
527 scoped_refptr<Sequence> sequence; 393 scoped_refptr<Sequence> sequence;
528 { 394 {
529 std::unique_ptr<PriorityQueue::Transaction> shared_transaction( 395 std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
530 outer_->shared_priority_queue_.BeginTransaction()); 396 outer_->shared_priority_queue_.BeginTransaction());
gab 2017/03/15 20:20:58 This was the only use case of transaction I believ
robliao 2017/03/15 20:46:44 Correct. This is focused on just SchedulerWorkerPo
gab 2017/03/16 15:34:45 Unless you have a CL doing this now I still prefer
robliao 2017/03/16 22:49:03 In progress at the moment!
gab 2017/03/20 16:40:58 Ok but for future reference a comment in code refe
531 std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
532 single_threaded_priority_queue_.BeginTransaction());
533 397
534 if (shared_transaction->IsEmpty() && 398 if (shared_transaction->IsEmpty()) {
535 single_threaded_transaction->IsEmpty()) {
536 single_threaded_transaction.reset();
537
538 // |shared_transaction| is kept alive while |worker| is added to 399 // |shared_transaction| is kept alive while |worker| is added to
539 // |idle_workers_stack_| to avoid this race: 400 // |idle_workers_stack_| to avoid this race:
540 // 1. This thread creates a Transaction, finds |shared_priority_queue_| 401 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
541 // empty and ends the Transaction. 402 // empty and ends the Transaction.
542 // 2. Other thread creates a Transaction, inserts a Sequence into 403 // 2. Other thread creates a Transaction, inserts a Sequence into
543 // |shared_priority_queue_| and ends the Transaction. This can't happen 404 // |shared_priority_queue_| and ends the Transaction. This can't happen
544 // if the Transaction of step 1 is still active because because there 405 // if the Transaction of step 1 is still active because because there
545 // can only be one active Transaction per PriorityQueue at a time. 406 // can only be one active Transaction per PriorityQueue at a time.
546 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because 407 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because
547 // |idle_workers_stack_| is empty. 408 // |idle_workers_stack_| is empty.
548 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. 409 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
549 // No thread runs the Sequence inserted in step 2. 410 // No thread runs the Sequence inserted in step 2.
550 outer_->AddToIdleWorkersStack(worker); 411 outer_->AddToIdleWorkersStack(worker);
551 if (idle_start_time_.is_null()) 412 if (idle_start_time_.is_null())
552 idle_start_time_ = TimeTicks::Now(); 413 idle_start_time_ = TimeTicks::Now();
553 did_detach_since_last_get_work_ = false; 414 did_detach_since_last_get_work_ = false;
554 last_get_work_returned_nullptr_ = true; 415 last_get_work_returned_nullptr_ = true;
555 return nullptr; 416 return nullptr;
556 } 417 }
557 418
558 // True if both PriorityQueues have Sequences and the Sequence at the top of 419 sequence = shared_transaction->PopSequence();
559 // the shared PriorityQueue is more important.
560 const bool shared_sequence_is_more_important =
561 !shared_transaction->IsEmpty() &&
562 !single_threaded_transaction->IsEmpty() &&
563 shared_transaction->PeekSortKey() >
564 single_threaded_transaction->PeekSortKey();
565
566 if (single_threaded_transaction->IsEmpty() ||
567 shared_sequence_is_more_important) {
568 sequence = shared_transaction->PopSequence();
569 last_sequence_is_single_threaded_ = false;
570 } else {
571 DCHECK(!single_threaded_transaction->IsEmpty());
572 sequence = single_threaded_transaction->PopSequence();
573 last_sequence_is_single_threaded_ = true;
574 }
575 } 420 }
576 DCHECK(sequence); 421 DCHECK(sequence);
577 422
578 outer_->RemoveFromIdleWorkersStack(worker); 423 outer_->RemoveFromIdleWorkersStack(worker);
579 idle_start_time_ = TimeTicks(); 424 idle_start_time_ = TimeTicks();
580 did_detach_since_last_get_work_ = false; 425 did_detach_since_last_get_work_ = false;
581 last_get_work_returned_nullptr_ = false; 426 last_get_work_returned_nullptr_ = false;
582 427
583 return sequence; 428 return sequence;
584 } 429 }
585 430
586 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { 431 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
587 ++num_tasks_since_last_wait_; 432 ++num_tasks_since_last_wait_;
588 ++num_tasks_since_last_detach_; 433 ++num_tasks_since_last_detach_;
589 } 434 }
590 435
591 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 436 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
592 ReEnqueueSequence(scoped_refptr<Sequence> sequence) { 437 ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
593 if (last_sequence_is_single_threaded_) { 438 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
594 // A single-threaded Sequence is always re-enqueued in the single-threaded 439 // |sequence| must be enqueued.
595 // PriorityQueue from which it was extracted. 440 re_enqueue_sequence_callback_.Run(std::move(sequence));
596 const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
597 single_threaded_priority_queue_.BeginTransaction()->Push(
598 std::move(sequence), sequence_sort_key);
599 } else {
600 // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
601 // |sequence| must be enqueued.
602 re_enqueue_sequence_callback_.Run(std::move(sequence));
603 }
604 } 441 }
605 442
606 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 443 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
607 GetSleepTimeout() { 444 GetSleepTimeout() {
608 return outer_->suggested_reclaim_time_; 445 return outer_->suggested_reclaim_time_;
609 } 446 }
610 447
611 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( 448 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
612 SchedulerWorker* worker) { 449 SchedulerWorker* worker) {
613 // It's not an issue if |num_single_threaded_runners_| is incremented after
614 // this because the newly created SingleThreadTaskRunner (from which no task
615 // has run yet) will simply run all its tasks on the next physical thread
616 // created by the worker.
617 const bool can_detach = 450 const bool can_detach =
618 !idle_start_time_.is_null() && 451 !idle_start_time_.is_null() &&
619 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && 452 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
620 worker != outer_->PeekAtIdleWorkersStack() && 453 worker != outer_->PeekAtIdleWorkersStack() &&
621 !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
622 outer_->CanWorkerDetachForTesting(); 454 outer_->CanWorkerDetachForTesting();
623 return can_detach; 455 return can_detach;
624 } 456 }
625 457
626 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { 458 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
627 DCHECK(!did_detach_since_last_get_work_); 459 DCHECK(!did_detach_since_last_get_work_);
628 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); 460 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
629 num_tasks_since_last_detach_ = 0; 461 num_tasks_since_last_detach_ = 0;
630 did_detach_since_last_get_work_ = true; 462 did_detach_since_last_get_work_ = true;
631 last_detach_time_ = TimeTicks::Now(); 463 last_detach_time_ = TimeTicks::Now();
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
693 const bool is_standby_lazy = 525 const bool is_standby_lazy =
694 params.standby_thread_policy() == 526 params.standby_thread_policy() ==
695 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; 527 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY;
696 const SchedulerWorker::InitialState initial_state = 528 const SchedulerWorker::InitialState initial_state =
697 (index == 0 && !is_standby_lazy) 529 (index == 0 && !is_standby_lazy)
698 ? SchedulerWorker::InitialState::ALIVE 530 ? SchedulerWorker::InitialState::ALIVE
699 : SchedulerWorker::InitialState::DETACHED; 531 : SchedulerWorker::InitialState::DETACHED;
700 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( 532 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create(
701 params.priority_hint(), 533 params.priority_hint(),
702 MakeUnique<SchedulerWorkerDelegateImpl>( 534 MakeUnique<SchedulerWorkerDelegateImpl>(
703 this, re_enqueue_sequence_callback, &shared_priority_queue_, index), 535 this, re_enqueue_sequence_callback, index),
704 task_tracker_, initial_state, params.backward_compatibility()); 536 task_tracker_, initial_state, params.backward_compatibility());
705 if (!worker) 537 if (!worker)
706 break; 538 break;
707 idle_workers_stack_.Push(worker.get()); 539 idle_workers_stack_.Push(worker.get());
708 workers_[index] = std::move(worker); 540 workers_[index] = std::move(worker);
709 } 541 }
710 542
711 #if DCHECK_IS_ON() 543 #if DCHECK_IS_ON()
712 workers_created_.Signal(); 544 workers_created_.Signal();
713 #endif 545 #endif
714 546
715 return !workers_.empty(); 547 return !workers_.empty();
716 } 548 }
717 549
718 void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) {
719 DCHECK(worker);
720 RemoveFromIdleWorkersStack(worker);
721 worker->WakeUp();
722 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
723 // hysteresis to the CanDetach check. See https://crbug.com/666041.
724 }
725
726 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { 550 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
727 SchedulerWorker* worker; 551 SchedulerWorker* worker;
728 { 552 {
729 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 553 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
730 worker = idle_workers_stack_.Pop(); 554 worker = idle_workers_stack_.Pop();
731 } 555 }
732 if (worker) 556 if (worker)
733 worker->WakeUp(); 557 worker->WakeUp();
558 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
gab 2017/03/15 20:20:58 Shouldn't this TODO be in CanDetach()?
robliao 2017/03/15 20:46:44 This was the best place to put it that respected t
559 // hysteresis to the CanDetach check. See https://crbug.com/666041.
734 } 560 }
735 561
736 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( 562 void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
737 SchedulerWorker* worker) { 563 SchedulerWorker* worker) {
738 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 564 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
739 // Detachment may cause multiple attempts to add because the delegate cannot 565 // Detachment may cause multiple attempts to add because the delegate cannot
740 // determine who woke it up. As a result, when it wakes up, it may conclude 566 // determine who woke it up. As a result, when it wakes up, it may conclude
741 // there's no work to be done and attempt to add itself to the idle stack 567 // there's no work to be done and attempt to add itself to the idle stack
742 // again. 568 // again.
743 if (!idle_workers_stack_.Contains(worker)) 569 if (!idle_workers_stack_.Contains(worker))
(...skipping 15 matching lines...) Expand all
759 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 585 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
760 idle_workers_stack_.Remove(worker); 586 idle_workers_stack_.Remove(worker);
761 } 587 }
762 588
763 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 589 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
764 return !worker_detachment_disallowed_.IsSet(); 590 return !worker_detachment_disallowed_.IsSet();
765 } 591 }
766 592
767 } // namespace internal 593 } // namespace internal
768 } // namespace base 594 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698