Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2743)

Unified Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..5d8486399f1ffba0dab0717847cde7ad4a3dc9d8
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
@@ -0,0 +1,236 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
+
+#include <algorithm>
+#include <memory>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/memory/ptr_util.h"
+#include "base/single_thread_task_runner.h"
+#include "base/strings/stringprintf.h"
+#include "base/synchronization/atomic_flag.h"
+#include "base/task_scheduler/delayed_task_manager.h"
+#include "base/task_scheduler/scheduler_worker.h"
+#include "base/task_scheduler/sequence.h"
+#include "base/task_scheduler/task.h"
+#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+
+namespace base {
+namespace internal {
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
+ : public SingleThreadTaskRunner {
gab 2017/02/21 21:39:50 Cleanup up of previous one will be in a follow-up
robliao 2017/02/22 01:04:18 Added a TODO and a new bug instead of the CL descr
+ public:
+ // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
+ // lifetime of a dedicated |worker| for |traits|.
+ SchedulerSingleThreadTaskRunner(
+ SchedulerSingleThreadTaskRunnerManager* const outer,
+ const TaskTraits& traits,
+ SchedulerWorker* worker)
+ : outer_(outer), traits_(traits), worker_(worker) {
+ DCHECK(outer_);
+ DCHECK(worker_);
+ }
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override;
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ return sequence_->token() == SequenceToken::GetForCurrentThread();
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override {
+ outer_->UnregisterSchedulerWorker(worker_);
+ }
+
+ void PostTaskNow(std::unique_ptr<Task> task);
+
+ // Sequence for all Tasks posted through this TaskRunner.
+ const scoped_refptr<Sequence> sequence_ = new Sequence;
+
+ SchedulerSingleThreadTaskRunnerManager* const outer_;
+ const TaskTraits traits_;
+ SchedulerWorker* const worker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate
+ : public SchedulerWorker::Delegate {
+ public:
+ SchedulerWorkerDelegate(const std::string& thread_name)
+ : thread_name_(thread_name) {}
+
+ // SchedulerWorker::Delegate:
+ void OnMainEntry(SchedulerWorker* worker) override {
+ PlatformThread::SetName(thread_name_);
+ }
+
+ scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ return std::move(sequence_);
+ }
+
+ void DidRunTask() override {}
+
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ DCHECK(!sequence_);
+ sequence_ = std::move(sequence);
+ }
+
+ TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
+
+ bool CanDetach(SchedulerWorker* worker) override {
+ return can_detach_.IsSet();
+ }
+
+ void OnDetach() override {}
+
+ // SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate:
+ void AllowDetach() {
+ can_detach_.Set();
+ }
+
+ private:
+ const std::string thread_name_;
+
+ SchedulerLock sequence_lock_;
gab 2017/02/21 21:39:50 Initially thought this wasn't necessary but then r
robliao 2017/02/22 01:04:18 Done. I emphasized that ReEnqueueSequence() in thi
gab 2017/02/22 18:18:21 I don't think this is unlike the other delegates.
robliao 2017/02/22 21:50:28 We specify in the delegate documentation that all
+ scoped_refptr<Sequence> sequence_;
+
+ AtomicFlag can_detach_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
+};
+
+bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
+ PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) {
+ auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
+ task->single_thread_task_runner_ref = this;
+
+ if (!outer_->task_tracker_->WillPostTask(task.get()))
+ return false;
+
+ if (task->delayed_run_time.is_null()) {
+ PostTaskNow(std::move(task));
+ } else {
+ outer_->delayed_task_manager_->AddDelayedTask(
+ std::move(task),
+ Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this)));
+ }
+ return true;
+}
+
+void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner::
+ PostTaskNow(std::unique_ptr<Task> task) {
+ const bool sequence_was_empty = sequence_->PushTask(std::move(task));
+ if (sequence_was_empty) {
+ auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
+ delegate->ReEnqueueSequence(sequence_);
+ worker_->WakeUp();
+ }
+}
+
+SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
+ const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+ const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+ worker_pool_index_for_traits_callback,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : worker_pool_params_vector_(worker_pool_params_vector),
+ worker_pool_index_for_traits_callback_(
+ worker_pool_index_for_traits_callback),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager) {
+ DCHECK_GT(worker_pool_params_vector_.size(), 0U);
+ DCHECK(worker_pool_index_for_traits_callback_);
+ DCHECK(task_tracker_);
+ DCHECK(delayed_task_manager_);
+}
+
+SchedulerSingleThreadTaskRunnerManager::
+ ~SchedulerSingleThreadTaskRunnerManager() = default;
+
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+ DCHECK_LT(index, worker_pool_params_vector_.size());
+ return new SchedulerSingleThreadTaskRunner(
+ this, traits,
+ CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
+}
+
+void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
+ decltype(workers_) local_workers;
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ local_workers = std::move(workers_);
+ }
+ for (const auto& worker : local_workers)
+ worker->JoinForTesting();
gab 2017/02/21 21:39:50 Empty lines above and below this loop (otherwise i
robliao 2017/02/22 01:04:18 My preference at this point is just to use bracket
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ DCHECK(workers_.empty());
gab 2017/02/21 21:39:50 << "New worker(s) unexpectedly registered during j
robliao 2017/02/22 01:04:18 Done.
+ workers_ = std::move(local_workers);
+ }
+}
+
+SchedulerWorker*
+SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
+ const SchedulerWorkerPoolParams& params) {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ int id = next_worker_id_++;
+ auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
+ "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
+ workers_.emplace_back(SchedulerWorker::Create(
+ params.priority_hint(), std::move(delegate), task_tracker_,
+ SchedulerWorker::InitialState::DETACHED));
+ return workers_.back().get();
+}
+
+void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
+ SchedulerWorker* worker) {
+ // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
+ // |workers_lock_|.
+ scoped_refptr<SchedulerWorker> worker_to_destroy;
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+
+ // We might be joining, so no-op this if |workers_| is empty.
+ if (workers_.empty())
+ return;
+
+ auto worker_iter =
+ std::find_if(workers_.begin(), workers_.end(),
+ [worker](const scoped_refptr<SchedulerWorker>& candidate) {
+ return candidate.get() == worker;
+ });
+ DCHECK(worker_iter != workers_.end());
+ worker_to_destroy = std::move(*worker_iter);
+ workers_.erase(worker_iter);
+ }
+ worker_to_destroy->Cleanup();
+}
+
+} // namespace internal
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698