Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1688)

Unified Diff: base/task_scheduler/scheduler_thread_pool_impl.cc

Issue 1876363004: TaskScheduler [11] Support ExecutionMode::SINGLE_THREADED. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@8_delayed
Patch Set: add #include <algorithm> Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_thread_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_thread_pool_impl.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc
index 599c46be89f514d8e3b4be9a64c36846bf0ce3f0..b64405dd471d50d5e09c8a42ec5d641d0eeac6e6 100644
--- a/base/task_scheduler/scheduler_thread_pool_impl.cc
+++ b/base/task_scheduler/scheduler_thread_pool_impl.cc
@@ -4,14 +4,15 @@
#include "base/task_scheduler/scheduler_thread_pool_impl.h"
+#include <algorithm>
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
-#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/sequenced_task_runner.h"
+#include "base/single_thread_task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/threading/thread_local.h"
@@ -25,6 +26,10 @@ namespace {
LazyInstance<ThreadLocalPointer<const SchedulerThreadPool>>::Leaky
tls_current_thread_pool = LAZY_INSTANCE_INITIALIZER;
+// SchedulerWorkerThread that owns the current thread, if any.
+LazyInstance<ThreadLocalPointer<const SchedulerWorkerThread>>::Leaky
+ tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
+
// A task runner that runs tasks with the PARALLEL ExecutionMode.
class SchedulerParallelTaskRunner : public TaskRunner {
public:
@@ -42,7 +47,7 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// Post the task as part of a one-off single-task Sequence.
return thread_pool_->PostTaskWithSequence(
WrapUnique(new Task(from_here, closure, traits_, delay)),
- make_scoped_refptr(new Sequence));
+ make_scoped_refptr(new Sequence), nullptr);
}
bool RunsTasksOnCurrentThread() const override {
@@ -72,9 +77,10 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& closure,
TimeDelta delay) override {
- // Post the task as part of |sequence|.
+ // Post the task as part of |sequence_|.
return thread_pool_->PostTaskWithSequence(
- WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_);
+ WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_,
+ nullptr);
}
bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
@@ -100,18 +106,88 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
+// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
+class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
+ public:
+ // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
+ // tasks so long as |thread_pool| and |worker_thread| are alive.
+ // TODO(robliao): Find a concrete way to manage the memory of |thread_pool|
+ // and |worker_thread|.
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
+ SchedulerThreadPool* thread_pool,
+ SchedulerWorkerThread* worker_thread)
+ : traits_(traits),
+ thread_pool_(thread_pool),
+ worker_thread_(worker_thread) {}
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override {
+ // Post the task to be executed by |worker_thread_| as part of |sequence_|.
+ return thread_pool_->PostTaskWithSequence(
+ WrapUnique(new Task(from_here, closure, traits_, delay)), sequence_,
+ worker_thread_);
+ }
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ return tls_current_worker_thread.Get().Get() == worker_thread_;
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override = default;
+
+ // Sequence for all Tasks posted through this TaskRunner.
+ const scoped_refptr<Sequence> sequence_ = new Sequence;
+
+ const TaskTraits traits_;
+ SchedulerThreadPool* const thread_pool_;
+ SchedulerWorkerThread* const worker_thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+// Only used in DCHECKs.
+bool ContainsWorkerThread(
+ const std::vector<std::unique_ptr<SchedulerWorkerThread>>& worker_threads,
+ const SchedulerWorkerThread* worker_thread) {
+ auto it = std::find_if(
+ worker_threads.begin(), worker_threads.end(),
+ [worker_thread](const std::unique_ptr<SchedulerWorkerThread>& i) {
+ return i.get() == worker_thread;
+ });
+ return it != worker_threads.end();
+}
+
} // namespace
class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
: public SchedulerWorkerThread::Delegate {
public:
+ // |outer| owns the worker thread for which this delegate is constructed.
+ // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
+ // called with a non-single-threaded Sequence. |shared_priority_queue| is a
+ // PriorityQueue whose transactions may overlap with the worker thread's
+ // single-threaded PriorityQueue's transactions.
SchedulerWorkerThreadDelegateImpl(
SchedulerThreadPoolImpl* outer,
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
+ const PriorityQueue* shared_priority_queue);
~SchedulerWorkerThreadDelegateImpl() override;
+ PriorityQueue* single_threaded_priority_queue() {
+ return &single_threaded_priority_queue_;
+ }
+
// SchedulerWorkerThread::Delegate:
- void OnMainEntry() override;
+ void OnMainEntry(SchedulerWorkerThread* worker_thread) override;
scoped_refptr<Sequence> GetWork(
SchedulerWorkerThread* worker_thread) override;
void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
@@ -120,6 +196,13 @@ class SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl
SchedulerThreadPoolImpl* outer_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
+ // Single-threaded PriorityQueue for the worker thread.
+ PriorityQueue single_threaded_priority_queue_;
+
+ // True if the last Sequence returned by GetWork() was extracted from
+ // |single_threaded_priority_queue_|.
+ bool last_sequence_is_single_threaded_ = false;
+
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerThreadDelegateImpl);
};
@@ -129,6 +212,7 @@ SchedulerThreadPoolImpl::~SchedulerThreadPoolImpl() {
DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
}
+// static
std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create(
ThreadPriority thread_priority,
size_t max_threads,
@@ -168,10 +252,21 @@ scoped_refptr<TaskRunner> SchedulerThreadPoolImpl::CreateTaskRunnerWithTraits(
case ExecutionMode::SEQUENCED:
return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
- case ExecutionMode::SINGLE_THREADED:
- // TODO(fdoray): Support SINGLE_THREADED TaskRunners.
- NOTREACHED();
- return nullptr;
+ case ExecutionMode::SINGLE_THREADED: {
+ // TODO(fdoray): Find a way to take load into account when assigning a
+ // SchedulerWorkerThread to a SingleThreadTaskRunner. Also, this code
+ // assumes that all SchedulerWorkerThreads are alive. Eventually, we might
+ // decide to tear down threads that haven't run tasks for a long time.
+ size_t worker_thread_index;
+ {
+ AutoSchedulerLock auto_lock(next_worker_thread_index_lock_);
+ worker_thread_index = next_worker_thread_index_;
+ next_worker_thread_index_ =
+ (next_worker_thread_index_ + 1) % worker_threads_.size();
+ }
+ return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
+ traits, this, worker_threads_[worker_thread_index].get()));
+ }
}
NOTREACHED();
@@ -199,18 +294,22 @@ void SchedulerThreadPoolImpl::ReEnqueueSequence(
bool SchedulerThreadPoolImpl::PostTaskWithSequence(
std::unique_ptr<Task> task,
- scoped_refptr<Sequence> sequence) {
+ scoped_refptr<Sequence> sequence,
+ SchedulerWorkerThread* worker_thread) {
DCHECK(task);
DCHECK(sequence);
+ DCHECK(!worker_thread ||
+ ContainsWorkerThread(worker_threads_, worker_thread));
if (!task_tracker_->WillPostTask(task.get()))
return false;
if (task->delayed_run_time.is_null()) {
- PostTaskWithSequenceNow(std::move(task), std::move(sequence));
+ PostTaskWithSequenceNow(std::move(task), std::move(sequence),
+ worker_thread);
} else {
delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
- this);
+ worker_thread, this);
}
return true;
@@ -218,79 +317,151 @@ bool SchedulerThreadPoolImpl::PostTaskWithSequence(
void SchedulerThreadPoolImpl::PostTaskWithSequenceNow(
std::unique_ptr<Task> task,
- scoped_refptr<Sequence> sequence) {
+ scoped_refptr<Sequence> sequence,
+ SchedulerWorkerThread* worker_thread) {
DCHECK(task);
DCHECK(sequence);
+ DCHECK(!worker_thread ||
+ ContainsWorkerThread(worker_threads_, worker_thread));
// Confirm that |task| is ready to run (its delayed run time is either null or
// in the past).
DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());
+ // Because |worker_thread| belongs to this thread pool, we know that the type
+ // of its delegate is SchedulerWorkerThreadDelegateImpl.
+ PriorityQueue* const priority_queue =
+ worker_thread
+ ? static_cast<SchedulerWorkerThreadDelegateImpl*>(
+ worker_thread->delegate())
+ ->single_threaded_priority_queue()
+ : &shared_priority_queue_;
+ DCHECK(priority_queue);
+
const bool sequence_was_empty = sequence->PushTask(std::move(task));
if (sequence_was_empty) {
- // Insert |sequence| in |shared_priority_queue_| if it was empty before
- // |task| was inserted into it. Otherwise, one of these must be true:
+ // Insert |sequence| in |priority_queue| if it was empty before |task| was
+ // inserted into it. Otherwise, one of these must be true:
// - |sequence| is already in a PriorityQueue (not necessarily
// |shared_priority_queue_|), or,
// - A worker thread is running a Task from |sequence|. It will insert
// |sequence| in a PriorityQueue once it's done running the Task.
const auto sequence_sort_key = sequence->GetSortKey();
- shared_priority_queue_.BeginTransaction()->Push(
+ priority_queue->BeginTransaction()->Push(
WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
sequence_sort_key)));
// Wake up a worker thread to process |sequence|.
- WakeUpOneThread();
+ if (worker_thread)
+ worker_thread->WakeUp();
+ else
+ WakeUpOneThread();
}
}
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
SchedulerWorkerThreadDelegateImpl(
SchedulerThreadPoolImpl* outer,
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback)
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
+ const PriorityQueue* shared_priority_queue)
: outer_(outer),
- re_enqueue_sequence_callback_(re_enqueue_sequence_callback) {}
+ re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
+ single_threaded_priority_queue_(shared_priority_queue) {}
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
~SchedulerWorkerThreadDelegateImpl() = default;
-void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry() {
+void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::OnMainEntry(
+ SchedulerWorkerThread* worker_thread) {
+#if DCHECK_IS_ON()
+ // Wait for |outer_->threads_created_| to avoid traversing
+ // |outer_->worker_threads_| while it is being filled by Initialize().
+ outer_->threads_created_.Wait();
+ DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
+#endif
+
+ DCHECK(!tls_current_worker_thread.Get().Get());
DCHECK(!tls_current_thread_pool.Get().Get());
+ tls_current_worker_thread.Get().Set(worker_thread);
tls_current_thread_pool.Get().Set(outer_);
}
scoped_refptr<Sequence>
SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::GetWork(
SchedulerWorkerThread* worker_thread) {
- std::unique_ptr<PriorityQueue::Transaction> transaction(
- outer_->shared_priority_queue_.BeginTransaction());
- const auto& sequence_and_sort_key = transaction->Peek();
-
- if (sequence_and_sort_key.is_null()) {
- // |transaction| is kept alive while |worker_thread| is added to
- // |idle_worker_threads_stack_| to avoid this race:
- // 1. This thread creates a Transaction, finds |shared_priority_queue_|
- // empty and ends the Transaction.
- // 2. Other thread creates a Transaction, inserts a Sequence into
- // |shared_priority_queue_| and ends the Transaction. This can't happen
- // if the Transaction of step 1 is still active because because there can
- // only be one active Transaction per PriorityQueue at a time.
- // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
- // |idle_worker_threads_stack_| is empty.
- // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
- // sleep. No thread runs the Sequence inserted in step 2.
- outer_->AddToIdleWorkerThreadsStack(worker_thread);
- return nullptr;
+ DCHECK(ContainsWorkerThread(outer_->worker_threads_, worker_thread));
+
+ scoped_refptr<Sequence> sequence;
+ {
+ std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
+ outer_->shared_priority_queue_.BeginTransaction());
+ const auto& shared_sequence_and_sort_key = shared_transaction->Peek();
+
+ std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
+ single_threaded_priority_queue_.BeginTransaction());
+ const auto& single_threaded_sequence_and_sort_key =
+ single_threaded_transaction->Peek();
+
+ if (shared_sequence_and_sort_key.is_null() &&
+ single_threaded_sequence_and_sort_key.is_null()) {
+ single_threaded_transaction.reset();
+
+ // |shared_transaction| is kept alive while |worker_thread| is added to
+ // |idle_worker_threads_stack_| to avoid this race:
+ // 1. This thread creates a Transaction, finds |shared_priority_queue_|
+ // empty and ends the Transaction.
+ // 2. Other thread creates a Transaction, inserts a Sequence into
+ // |shared_priority_queue_| and ends the Transaction. This can't happen
+ // if the Transaction of step 1 is still active because because there
+ // can only be one active Transaction per PriorityQueue at a time.
+ // 3. Other thread calls WakeUpOneThread(). No thread is woken up because
+ // |idle_worker_threads_stack_| is empty.
+ // 4. This thread adds itself to |idle_worker_threads_stack_| and goes to
+ // sleep. No thread runs the Sequence inserted in step 2.
+ outer_->AddToIdleWorkerThreadsStack(worker_thread);
+ return nullptr;
+ }
+
+ // True if both PriorityQueues have Sequences and the Sequence at the top of
+ // the shared PriorityQueue is more important.
+ const bool shared_sequence_is_more_important =
+ !shared_sequence_and_sort_key.is_null() &&
+ !single_threaded_sequence_and_sort_key.is_null() &&
+ shared_sequence_and_sort_key.sort_key >
+ single_threaded_sequence_and_sort_key.sort_key;
+
+ if (single_threaded_sequence_and_sort_key.is_null() ||
+ shared_sequence_is_more_important) {
+ sequence = shared_sequence_and_sort_key.sequence;
+ shared_transaction->Pop();
+ last_sequence_is_single_threaded_ = false;
+ } else {
+ DCHECK(!single_threaded_sequence_and_sort_key.is_null());
+ sequence = single_threaded_sequence_and_sort_key.sequence;
+ single_threaded_transaction->Pop();
+ last_sequence_is_single_threaded_ = true;
+ }
}
+ DCHECK(sequence);
- scoped_refptr<Sequence> sequence = sequence_and_sort_key.sequence;
- transaction->Pop();
+ outer_->RemoveFromIdleWorkerThreadsStack(worker_thread);
return sequence;
}
void SchedulerThreadPoolImpl::SchedulerWorkerThreadDelegateImpl::
ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
- re_enqueue_sequence_callback_.Run(std::move(sequence));
+ if (last_sequence_is_single_threaded_) {
+ // A single-threaded Sequence is always re-enqueued in the single-threaded
+ // PriorityQueue from which it was extracted.
+ const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
+ single_threaded_priority_queue_.BeginTransaction()->Push(
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(std::move(sequence),
+ sequence_sort_key)));
+ } else {
+ // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
+ // |sequence| must be enqueued.
+ re_enqueue_sequence_callback_.Run(std::move(sequence));
+ }
}
SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
@@ -300,6 +471,9 @@ SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
idle_worker_threads_stack_cv_for_testing_(
idle_worker_threads_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(true, false),
+#if DCHECK_IS_ON()
+ threads_created_(true, false),
+#endif
task_tracker_(task_tracker),
delayed_task_manager_(delayed_task_manager) {
DCHECK(task_tracker_);
@@ -317,8 +491,9 @@ bool SchedulerThreadPoolImpl::Initialize(
for (size_t i = 0; i < max_threads; ++i) {
std::unique_ptr<SchedulerWorkerThread> worker_thread =
SchedulerWorkerThread::Create(
- thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
- this, re_enqueue_sequence_callback)),
+ thread_priority,
+ WrapUnique(new SchedulerWorkerThreadDelegateImpl(
+ this, re_enqueue_sequence_callback, &shared_priority_queue_)),
task_tracker_);
if (!worker_thread)
break;
@@ -326,6 +501,10 @@ bool SchedulerThreadPoolImpl::Initialize(
worker_threads_.push_back(std::move(worker_thread));
}
+#if DCHECK_IS_ON()
+ threads_created_.Signal();
+#endif
+
return !worker_threads_.empty();
}
@@ -349,5 +528,11 @@ void SchedulerThreadPoolImpl::AddToIdleWorkerThreadsStack(
idle_worker_threads_stack_cv_for_testing_->Broadcast();
}
+void SchedulerThreadPoolImpl::RemoveFromIdleWorkerThreadsStack(
+ SchedulerWorkerThread* worker_thread) {
+ AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
+ idle_worker_threads_stack_.Remove(worker_thread);
+}
+
} // namespace internal
} // namespace base
« no previous file with comments | « base/task_scheduler/scheduler_thread_pool_impl.h ('k') | base/task_scheduler/scheduler_thread_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698