Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1405)

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2721553003: Remove SingleThreadTaskRunner Support from SchedulerWorkerPoolImpl (Closed)
Patch Set: Rebase to 08266b3 Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index 9a6a20b741097adf42e8ff7a5ebb6921ceaaeecd..56e1f7b54eb5e55522e94f047dddf0a7333a62ee 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -17,7 +17,6 @@
#include "base/metrics/histogram.h"
#include "base/sequence_token.h"
#include "base/sequenced_task_runner.h"
-#include "base/single_thread_task_runner.h"
#include "base/strings/stringprintf.h"
#include "base/task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
@@ -64,7 +63,7 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// Post the task as part of a one-off single-task Sequence.
return worker_pool_->PostTaskWithSequence(
MakeUnique<Task>(from_here, closure, traits_, delay),
- make_scoped_refptr(new Sequence), nullptr);
+ make_scoped_refptr(new Sequence));
}
bool RunsTasksOnCurrentThread() const override {
@@ -100,8 +99,7 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
task->sequenced_task_runner_ref = this;
// Post the task as part of |sequence_|.
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
- nullptr);
+ return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
}
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
@@ -141,78 +139,19 @@ bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
} // namespace
-// TODO(http://crbug.com/694823): Remove this and supporting framework.
-// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
-class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
- public SingleThreadTaskRunner {
- public:
- // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
- // tasks so long as |worker_pool| and |worker| are alive.
- // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
- // and |worker|.
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
- SchedulerWorkerPool* worker_pool,
- SchedulerWorker* worker);
-
- // SingleThreadTaskRunner:
- bool PostDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- TimeDelta delay) override {
- std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
- task->single_thread_task_runner_ref = this;
-
- // Post the task to be executed by |worker_| as part of |sequence_|.
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
- worker_);
- }
-
- bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- base::TimeDelta delay) override {
- // Tasks are never nested within the task scheduler.
- return PostDelayedTask(from_here, closure, delay);
- }
-
- bool RunsTasksOnCurrentThread() const override {
- // Even though this is a SingleThreadTaskRunner, test the actual sequence
- // instead of the assigned worker so that another task randomly assigned
- // to the same worker doesn't return true by happenstance.
- return sequence_->token() == SequenceToken::GetForCurrentThread();
- }
-
- private:
- ~SchedulerSingleThreadTaskRunner() override;
-
- // Sequence for all Tasks posted through this TaskRunner.
- const scoped_refptr<Sequence> sequence_ = new Sequence;
-
- const TaskTraits traits_;
- SchedulerWorkerPool* const worker_pool_;
- SchedulerWorker* const worker_;
-
- DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
-};
-
class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
: public SchedulerWorker::Delegate {
public:
// |outer| owns the worker for which this delegate is constructed.
// |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
- // called with a non-single-threaded Sequence. |shared_priority_queue| is a
- // PriorityQueue whose transactions may overlap with the worker's
- // single-threaded PriorityQueue's transactions. |index| will be appended to
- // the pool name to label the underlying worker threads.
+ // called. |index| will be appended to the pool name to label the underlying
+ // worker threads.
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
- const PriorityQueue* shared_priority_queue,
int index);
~SchedulerWorkerDelegateImpl() override;
- PriorityQueue* single_threaded_priority_queue() {
- return &single_threaded_priority_queue_;
- }
-
// SchedulerWorker::Delegate:
void OnMainEntry(SchedulerWorker* worker) override;
scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
@@ -222,28 +161,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
bool CanDetach(SchedulerWorker* worker) override;
void OnDetach() override;
- void RegisterSingleThreadTaskRunner() {
- // No barrier as barriers only affect sequential consistency which is
- // irrelevant in a single variable use case (they don't force an immediate
- // flush anymore than atomics do by default).
- subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
- }
-
- void UnregisterSingleThreadTaskRunner() {
- subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
- }
-
private:
SchedulerWorkerPoolImpl* outer_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
- // Single-threaded PriorityQueue for the worker.
- PriorityQueue single_threaded_priority_queue_;
-
- // True if the last Sequence returned by GetWork() was extracted from
- // |single_threaded_priority_queue_|.
- bool last_sequence_is_single_threaded_ = false;
-
// Time of the last detach.
TimeTicks last_detach_time_;
@@ -265,8 +186,6 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// TaskScheduler.NumTasksBeforeDetach histogram was recorded.
size_t num_tasks_since_last_detach_ = 0;
- subtle::Atomic32 num_single_threaded_runners_ = 0;
-
const int index_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
@@ -302,21 +221,6 @@ SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
}
-scoped_refptr<SingleThreadTaskRunner>
-SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits(
- const TaskTraits& traits) {
- // TODO(fdoray): Find a way to take load into account when assigning a
- // SchedulerWorker to a SingleThreadTaskRunner.
- size_t worker_index;
- {
- AutoSchedulerLock auto_lock(next_worker_index_lock_);
- worker_index = next_worker_index_;
- next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
- }
- return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
- traits, this, workers_[worker_index].get()));
-}
-
void SchedulerWorkerPoolImpl::ReEnqueueSequence(
scoped_refptr<Sequence> sequence,
const SequenceSortKey& sequence_sort_key) {
@@ -337,27 +241,25 @@ void SchedulerWorkerPoolImpl::ReEnqueueSequence(
bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
std::unique_ptr<Task> task,
- scoped_refptr<Sequence> sequence,
- SchedulerWorker* worker) {
+ scoped_refptr<Sequence> sequence) {
DCHECK(task);
DCHECK(sequence);
- DCHECK(!worker || ContainsWorker(workers_, worker));
if (!task_tracker_->WillPostTask(task.get()))
return false;
if (task->delayed_run_time.is_null()) {
- PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker);
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
delayed_task_manager_->AddDelayedTask(
std::move(task),
Bind(
- [](scoped_refptr<Sequence> sequence, SchedulerWorker* worker,
+ [](scoped_refptr<Sequence> sequence,
SchedulerWorkerPool* worker_pool, std::unique_ptr<Task> task) {
worker_pool->PostTaskWithSequenceNow(std::move(task),
- std::move(sequence), worker);
+ std::move(sequence));
},
- std::move(sequence), Unretained(worker), Unretained(this)));
+ std::move(sequence), Unretained(this)));
}
return true;
@@ -365,42 +267,27 @@ bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
std::unique_ptr<Task> task,
- scoped_refptr<Sequence> sequence,
- SchedulerWorker* worker) {
+ scoped_refptr<Sequence> sequence) {
DCHECK(task);
DCHECK(sequence);
- DCHECK(!worker || ContainsWorker(workers_, worker));
// Confirm that |task| is ready to run (its delayed run time is either null or
// in the past).
DCHECK_LE(task->delayed_run_time, TimeTicks::Now());
- // Because |worker| belongs to this worker pool, we know that the type
- // of its delegate is SchedulerWorkerDelegateImpl.
- PriorityQueue* const priority_queue =
- worker
- ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
- ->single_threaded_priority_queue()
- : &shared_priority_queue_;
- DCHECK(priority_queue);
-
const bool sequence_was_empty = sequence->PushTask(std::move(task));
if (sequence_was_empty) {
- // Insert |sequence| in |priority_queue| if it was empty before |task| was
- // inserted into it. Otherwise, one of these must be true:
- // - |sequence| is already in a PriorityQueue (not necessarily
- // |shared_priority_queue_|), or,
+ // Insert |sequence| in |shared_priority_queue_| if it was empty before
+ // |task| was inserted into it. Otherwise, one of these must be true:
+ // - |sequence| is already in a PriorityQueue, or,
// - A worker is running a Task from |sequence|. It will insert |sequence|
// in a PriorityQueue once it's done running the Task.
const auto sequence_sort_key = sequence->GetSortKey();
- priority_queue->BeginTransaction()->Push(std::move(sequence),
- sequence_sort_key);
+ shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
+ sequence_sort_key);
// Wake up a worker to process |sequence|.
gab 2017/03/15 20:20:58 rm now redundant comment
robliao 2017/03/15 20:46:44 This comment is consistent with the current style
gab 2017/03/16 15:34:46 Hmm ok, we at least shouldn't add more (and I'm ha
robliao 2017/03/16 22:49:03 While I agree with your opinion, the style guide i
- if (worker)
- WakeUpWorker(worker);
- else
- WakeUpOneWorker();
+ WakeUpOneWorker();
}
}
@@ -443,34 +330,13 @@ size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
return num_alive_workers;
}
-SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
- SchedulerWorkerPool* worker_pool,
- SchedulerWorker* worker)
- : traits_(traits),
- worker_pool_(worker_pool),
- worker_(worker) {
- DCHECK(worker_pool_);
- DCHECK(worker_);
- static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
- RegisterSingleThreadTaskRunner();
-}
-
-SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
- ~SchedulerSingleThreadTaskRunner() {
- static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
- UnregisterSingleThreadTaskRunner();
-}
-
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
- const PriorityQueue* shared_priority_queue,
int index)
: outer_(outer),
re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
- single_threaded_priority_queue_(shared_priority_queue),
index_(index) {}
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
@@ -528,13 +394,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
{
std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
outer_->shared_priority_queue_.BeginTransaction());
gab 2017/03/15 20:20:58 This was the only use case of transaction I believ
robliao 2017/03/15 20:46:44 Correct. This is focused on just SchedulerWorkerPo
gab 2017/03/16 15:34:45 Unless you have a CL doing this now I still prefer
robliao 2017/03/16 22:49:03 In progress at the moment!
gab 2017/03/20 16:40:58 Ok but for future reference a comment in code refe
- std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
- single_threaded_priority_queue_.BeginTransaction());
-
- if (shared_transaction->IsEmpty() &&
- single_threaded_transaction->IsEmpty()) {
- single_threaded_transaction.reset();
+ if (shared_transaction->IsEmpty()) {
// |shared_transaction| is kept alive while |worker| is added to
// |idle_workers_stack_| to avoid this race:
// 1. This thread creates a Transaction, finds |shared_priority_queue_|
@@ -555,23 +416,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
return nullptr;
}
- // True if both PriorityQueues have Sequences and the Sequence at the top of
- // the shared PriorityQueue is more important.
- const bool shared_sequence_is_more_important =
- !shared_transaction->IsEmpty() &&
- !single_threaded_transaction->IsEmpty() &&
- shared_transaction->PeekSortKey() >
- single_threaded_transaction->PeekSortKey();
-
- if (single_threaded_transaction->IsEmpty() ||
- shared_sequence_is_more_important) {
- sequence = shared_transaction->PopSequence();
- last_sequence_is_single_threaded_ = false;
- } else {
- DCHECK(!single_threaded_transaction->IsEmpty());
- sequence = single_threaded_transaction->PopSequence();
- last_sequence_is_single_threaded_ = true;
- }
+ sequence = shared_transaction->PopSequence();
}
DCHECK(sequence);
@@ -590,17 +435,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
- if (last_sequence_is_single_threaded_) {
- // A single-threaded Sequence is always re-enqueued in the single-threaded
- // PriorityQueue from which it was extracted.
- const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
- single_threaded_priority_queue_.BeginTransaction()->Push(
- std::move(sequence), sequence_sort_key);
- } else {
- // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
- // |sequence| must be enqueued.
- re_enqueue_sequence_callback_.Run(std::move(sequence));
- }
+ // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
+ // |sequence| must be enqueued.
+ re_enqueue_sequence_callback_.Run(std::move(sequence));
}
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
@@ -610,15 +447,10 @@ TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
SchedulerWorker* worker) {
- // It's not an issue if |num_single_threaded_runners_| is incremented after
- // this because the newly created SingleThreadTaskRunner (from which no task
- // has run yet) will simply run all its tasks on the next physical thread
- // created by the worker.
const bool can_detach =
!idle_start_time_.is_null() &&
(TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
worker != outer_->PeekAtIdleWorkersStack() &&
- !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
outer_->CanWorkerDetachForTesting();
return can_detach;
}
@@ -700,7 +532,7 @@ bool SchedulerWorkerPoolImpl::Initialize(
scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create(
params.priority_hint(),
MakeUnique<SchedulerWorkerDelegateImpl>(
- this, re_enqueue_sequence_callback, &shared_priority_queue_, index),
+ this, re_enqueue_sequence_callback, index),
task_tracker_, initial_state, params.backward_compatibility());
if (!worker)
break;
@@ -715,14 +547,6 @@ bool SchedulerWorkerPoolImpl::Initialize(
return !workers_.empty();
}
-void SchedulerWorkerPoolImpl::WakeUpWorker(SchedulerWorker* worker) {
- DCHECK(worker);
- RemoveFromIdleWorkersStack(worker);
- worker->WakeUp();
- // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
- // hysteresis to the CanDetach check. See https://crbug.com/666041.
-}
-
void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
SchedulerWorker* worker;
{
@@ -731,6 +555,8 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
}
if (worker)
worker->WakeUp();
+ // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding
gab 2017/03/15 20:20:58 Shouldn't this TODO be in CanDetach()?
robliao 2017/03/15 20:46:44 This was the best place to put it that respected t
+ // hysteresis to the CanDetach check. See https://crbug.com/666041.
}
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(

Powered by Google App Engine
This is Rietveld 408576698