Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4181)

Unified Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2686593003: DESIGN DISCUSSION ONLY Task Scheduler Single Thread Task Runner Manager for Dedicated Threads per S… (Closed)
Patch Set: Wait for Detached Thread to Complete Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c3c9a557395a5762536c3a04e8c0c3e1f2d56b0f
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
@@ -0,0 +1,266 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/memory/ptr_util.h"
+#include "base/single_thread_task_runner.h"
+#include "base/strings/stringprintf.h"
+#include "base/task_scheduler/delayed_task_manager.h"
+#include "base/task_scheduler/scheduler_worker.h"
+#include "base/task_scheduler/sequence.h"
+#include "base/task_scheduler/task.h"
+#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+
+namespace base {
+namespace internal {
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
+ : public SingleThreadTaskRunner {
+ public:
+ // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
+ // tasks so long as |worker_pool| and |worker| are alive.
fdoray 2017/02/08 17:59:01 Update comment (no more |worker_pool|).
robliao 2017/02/10 00:04:04 Done.
+ // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
+ // and |worker|.
+ SchedulerSingleThreadTaskRunner(
+ SchedulerSingleThreadTaskRunnerManager* const outer,
+ const TaskTraits& traits,
+ SchedulerWorker* worker)
+ : outer_(outer), traits_(traits), worker_(worker) {
+ DCHECK(outer_);
+ DCHECK(worker_);
+ }
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override;
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ // Even though this is a SingleThreadTaskRunner, test the actual sequence
+ // instead of the assigned worker so that another task randomly assigned
+ // to the same worker doesn't return true by happenstance.
+ return sequence_->token() == SequenceToken::GetForCurrentThread();
fdoray 2017/02/08 17:59:01 "another task randomly assigned to the same worker
robliao 2017/02/10 00:04:04 Done.
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override {
+ outer_->UnregisterSchedulerWorker(worker_);
+ }
+
+ void PostTaskNow(std::unique_ptr<Task> task);
+
+ // Sequence for all Tasks posted through this TaskRunner.
+ const scoped_refptr<Sequence> sequence_ = new Sequence;
+
+ SchedulerSingleThreadTaskRunnerManager* const outer_;
+ const TaskTraits traits_;
+ SchedulerWorker* const worker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate
+ : public SchedulerWorker::Delegate {
+ public:
+ SchedulerWorkerDelegate(SchedulerSingleThreadTaskRunnerManager* const outer,
+ const std::string& thread_name)
+ : outer_(outer), thread_name_(thread_name) {}
+
+ // SchedulerWorker::Delegate:
+ void OnMainEntry(SchedulerWorker* worker) override {
+ PlatformThread::SetName(thread_name_);
+ }
+
+ scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
+ if (!sequence_)
+ outer_->IncrementIdleWorkerCount();
+
+ return std::move(sequence_);
+ }
+
+ void DidRunTask() override {}
+
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
+ AutoSchedulerLock auto_lock(lock_);
+ DCHECK(!sequence_);
+ sequence_ = std::move(sequence);
+ }
+
+ TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
+
+ bool CanDetach(SchedulerWorker* worker) override {
+ AutoSchedulerLock auto_lock(can_detach_lock_);
+ return can_detach_;
+ }
+
+ void OnDetach() override {}
+
+ void AllowDetach() {
+ AutoSchedulerLock auto_lock(can_detach_lock_);
+ can_detach_ = true;
+ }
+
+ private:
+ SchedulerSingleThreadTaskRunnerManager* const outer_;
+ const std::string thread_name_;
+
+ SchedulerLock lock_;
fdoray 2017/02/08 17:59:01 // Contains a reference to the Sequence from which
robliao 2017/02/10 00:04:04 Yep. I'll be setting these comments in the next CL
+ scoped_refptr<Sequence> sequence_;
+
+ SchedulerLock can_detach_lock_;
fdoray 2017/02/08 17:59:01 Replace bool+lock with AtomicFlag.
robliao 2017/02/10 00:04:04 AtomicFlag has a sequence checker that doesn't qui
+ bool can_detach_ = false;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
+};
+
+bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
+ PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) {
+ std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
fdoray 2017/02/08 17:59:01 MakeUnique
robliao 2017/02/10 00:04:04 Done.
+ task->single_thread_task_runner_ref = this;
+
+ if (!outer_->task_tracker_->WillPostTask(task.get()))
+ return false;
+
+ if (task->delayed_run_time.is_null()) {
+ PostTaskNow(std::move(task));
+ } else {
+ outer_->delayed_task_manager_->AddDelayedTask(
+ std::move(task),
+ Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
+ }
+ return true;
+}
+
+void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
+ PostTaskNow(std::unique_ptr<Task> task) {
+ const bool sequence_was_empty = sequence_->PushTask(std::move(task));
+ if (sequence_was_empty) {
+ auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
+ delegate->ReEnqueueSequence(sequence_);
+ outer_->DecrementIdleWorkerCount();
+ worker_->WakeUp();
+ }
+}
+
+SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
+ const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+ const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+ worker_pool_index_for_traits_callback,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : worker_pool_params_vector_(worker_pool_params_vector),
+ worker_pool_index_for_traits_callback_(
+ worker_pool_index_for_traits_callback),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager),
+ lock_(),
+ idle_workers_cv_for_testing_(lock_.CreateConditionVariable()) {
+ DCHECK_GT(worker_pool_params_vector_.size(), 0U);
+ DCHECK(worker_pool_index_for_traits_callback_);
+ DCHECK(task_tracker_);
+ DCHECK(delayed_task_manager_);
+}
+
+SchedulerSingleThreadTaskRunnerManager::
+ ~SchedulerSingleThreadTaskRunnerManager() = default;
+
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+ DCHECK_LT(index, worker_pool_params_vector_.size());
+ return new SchedulerSingleThreadTaskRunner(
+ this, traits,
+ CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
+}
+
+void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
+ decltype(workers_) local_workers;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ local_workers = std::move(workers_);
+ }
+ for (const auto& worker : local_workers)
+ worker->JoinForTesting();
+ {
+ AutoSchedulerLock auto_lock(lock_);
fdoray 2017/02/08 17:59:01 DCHECK(workers_.empty());
robliao 2017/02/10 00:04:04 Done.
+ workers_ = std::move(local_workers);
+ }
+}
+
+void SchedulerSingleThreadTaskRunnerManager::WaitForAllWorkersIdleForTesting() {
+ AutoSchedulerLock auto_lock(lock_);
+ while (static_cast<size_t>(subtle::NoBarrier_Load(&idle_workers_)) <
+ workers_.size()) {
+ idle_workers_cv_for_testing_->Wait();
+ }
+}
+
+SchedulerWorker*
+SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
+ const SchedulerWorkerPoolParams& params) {
+ AutoSchedulerLock auto_lock(lock_);
+ int count = workers_.size();
+ auto delegate = MakeUnique<SchedulerWorkerDelegate>(
+ this, base::StringPrintf("TaskSchedulerSingleThreadWorker%d%s", count,
+ params.name().c_str()));
+ workers_.emplace_back(SchedulerWorker::Create(
+ params.priority_hint(), std::move(delegate), task_tracker_,
+ SchedulerWorker::InitialState::DETACHED));
+ // This avoids calling IncrementIdleWorkerCount as we're adding a worker but
+ // we don't want to signal to any CVs that we're done.
+ subtle::NoBarrier_AtomicIncrement(&idle_workers_, 1);
+ return workers_.back().get();
+}
+
+void SchedulerSingleThreadTaskRunnerManager::IncrementIdleWorkerCount() {
+ subtle::Atomic32 idle_workers =
+ subtle::NoBarrier_AtomicIncrement(&idle_workers_, 1);
+ AutoSchedulerLock auto_lock(lock_);
+ if (static_cast<size_t>(idle_workers) == workers_.size())
+ idle_workers_cv_for_testing_->Broadcast();
+}
+
+void SchedulerSingleThreadTaskRunnerManager::DecrementIdleWorkerCount() {
+ subtle::NoBarrier_AtomicIncrement(&idle_workers_, -1);
+}
+
+void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
+ SchedulerWorker* worker) {
+ std::unique_ptr<SchedulerWorker> worker_to_destroy;
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ auto worker_iter = std::find_if(
+ workers_.begin(), workers_.end(),
+ [worker](const std::unique_ptr<SchedulerWorker>& candidate) {
+ return candidate.get() == worker;
+ });
+ DCHECK(worker_iter != workers_.end());
+ worker_to_destroy = std::move(*worker_iter);
+ workers_.erase(worker_iter);
fdoray 2017/02/08 17:59:01 If we erase a worker in the middle of the vector,
robliao 2017/02/10 00:04:04 That sounds fine to me. We'll have a monotonically
+ DecrementIdleWorkerCount();
+ }
+ auto delegate = static_cast<SchedulerWorkerDelegate*>(worker->delegate());
+ delegate->AllowDetach();
+ SchedulerWorker::DestroyAfterDetachment(std::move(worker_to_destroy));
+}
+
+} // namespace internal
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698