Chromium Code Reviews| Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..1c136592e25c61b7fecd33cc37d0a670dcaa7d30 |
| --- /dev/null |
| +++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc |
| @@ -0,0 +1,267 @@ |
| +// Copyright 2017 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| + |
| +#include <algorithm> |
| +#include <memory> |
| +#include <string> |
| + |
| +#include "base/bind.h" |
| +#include "base/callback.h" |
| +#include "base/memory/ptr_util.h" |
| +#include "base/single_thread_task_runner.h" |
| +#include "base/strings/stringprintf.h" |
| +#include "base/synchronization/atomic_flag.h" |
| +#include "base/task_scheduler/delayed_task_manager.h" |
| +#include "base/task_scheduler/scheduler_worker.h" |
| +#include "base/task_scheduler/sequence.h" |
| +#include "base/task_scheduler/task.h" |
| +#include "base/task_scheduler/task_tracker.h" |
| +#include "base/task_scheduler/task_traits.h" |
| +#include "base/threading/platform_thread.h" |
| +#include "base/time/time.h" |
| + |
| +namespace base { |
| +namespace internal { |
| + |
| +namespace { |
| + |
| +class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { |
| + public: |
| + SchedulerWorkerDelegate(const std::string& thread_name) |
| + : thread_name_(thread_name) {} |
| + |
| + // SchedulerWorker::Delegate: |
| + void OnMainEntry(SchedulerWorker* worker) override { |
| + { |
| + AutoSchedulerLock auto_lock(thread_ref_lock_); |
| + thread_ref_ = PlatformThread::CurrentRef(); |
| + } |
| + PlatformThread::SetName(thread_name_); |
| + } |
| + |
| + scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| + AutoSchedulerLock auto_lock(sequence_lock_); |
| + return std::move(sequence_); |
| + } |
| + |
| + void DidRunTask() override {} |
| + |
| + void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { |
| + AutoSchedulerLock auto_lock(sequence_lock_); |
| + DCHECK(!sequence_); |
| + sequence_ = std::move(sequence); |
| + } |
| + |
| + TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| + |
| + bool CanDetach(SchedulerWorker* worker) override { |
| + return can_detach_.IsSet(); |
| + } |
| + |
| + void OnDetach() override { |
| + AutoSchedulerLock auto_lock(thread_ref_lock_); |
| + thread_ref_ = PlatformThreadRef(); |
|
gab
2017/02/23 19:48:10
Actually, shouldn't we NOTREACHED() here? It doesn
robliao
2017/02/23 21:26:06
Indeed. You found a vestige of the std::unique_ptr
|
| + } |
| + |
| + // SchedulerSingleThreadTaskRunnerManager::SchedulerWorkerDelegate: |
| + void AllowDetach() { can_detach_.Set(); } |
| + |
| + bool RunsTasksOnCurrentThread() { |
| + AutoSchedulerLock auto_lock(thread_ref_lock_); |
| + return thread_ref_ == PlatformThread::CurrentRef(); |
| + } |
| + |
| + private: |
| + const std::string thread_name_; |
| + |
| + // Synchronizes access to |sequence_| and handles the fact that |
| + // ReEnqueueSequence() is called on both the worker thread for reenqueuing |
| + // the sequence and off of the worker thread to seed the sequence for |
| + // GetWork(). |
| + SchedulerLock sequence_lock_; |
| + scoped_refptr<Sequence> sequence_; |
| + |
| + AtomicFlag can_detach_; |
| + |
| + // Synchronizes access to |thread_ref_|. |
| + SchedulerLock thread_ref_lock_; |
|
gab
2017/02/23 19:48:10
Meh, I don't like locking for this.
But, also, I
robliao
2017/02/23 21:26:06
Done via AtomicFlag instead of WaitableEvent. (Wai
|
| + PlatformThreadRef thread_ref_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| +}; |
| + |
| +} // namespace |
| + |
| +class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| + : public SingleThreadTaskRunner { |
| + public: |
| + // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| + // lifetime of a dedicated |worker| for |traits|. |
| + SchedulerSingleThreadTaskRunner( |
| + SchedulerSingleThreadTaskRunnerManager* const outer, |
| + const TaskTraits& traits, |
| + SchedulerWorker* worker) |
| + : outer_(outer), traits_(traits), worker_(worker) { |
| + DCHECK(outer_); |
| + DCHECK(worker_); |
| + } |
| + |
| + // SingleThreadTaskRunner: |
| + bool PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) override; |
| + |
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + base::TimeDelta delay) override { |
| + // Tasks are never nested within the task scheduler. |
| + return PostDelayedTask(from_here, closure, delay); |
| + } |
| + |
| + bool RunsTasksOnCurrentThread() const override { |
| + auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| + return delegate->RunsTasksOnCurrentThread(); |
| + } |
| + |
| + private: |
| + ~SchedulerSingleThreadTaskRunner() override { |
| + outer_->UnregisterSchedulerWorker(worker_); |
| + } |
| + |
| + void PostTaskNow(std::unique_ptr<Task> task); |
| + |
| + // Sequence for all Tasks posted through this TaskRunner. |
| + const scoped_refptr<Sequence> sequence_ = new Sequence; |
| + |
| + SchedulerSingleThreadTaskRunnerManager* const outer_; |
| + const TaskTraits traits_; |
| + SchedulerWorker* const worker_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| +}; |
| + |
| +bool SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| + PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) { |
| + auto task = MakeUnique<Task>(from_here, closure, traits_, delay); |
| + task->single_thread_task_runner_ref = this; |
| + |
| + if (!outer_->task_tracker_->WillPostTask(task.get())) |
| + return false; |
| + |
| + if (task->delayed_run_time.is_null()) { |
| + PostTaskNow(std::move(task)); |
| + } else { |
| + outer_->delayed_task_manager_->AddDelayedTask( |
| + std::move(task), |
| + Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, Unretained(this))); |
| + } |
| + return true; |
| +} |
| + |
| +void SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner:: |
| + PostTaskNow(std::unique_ptr<Task> task) { |
| + const bool sequence_was_empty = sequence_->PushTask(std::move(task)); |
| + if (sequence_was_empty) { |
| + auto delegate = static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); |
| + delegate->ReEnqueueSequence(sequence_); |
| + worker_->WakeUp(); |
| + } |
| +} |
| + |
| +SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( |
| + const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, |
| + const TaskScheduler::WorkerPoolIndexForTraitsCallback& |
| + worker_pool_index_for_traits_callback, |
| + TaskTracker* task_tracker, |
| + DelayedTaskManager* delayed_task_manager) |
| + : worker_pool_params_vector_(worker_pool_params_vector), |
| + worker_pool_index_for_traits_callback_( |
| + worker_pool_index_for_traits_callback), |
| + task_tracker_(task_tracker), |
| + delayed_task_manager_(delayed_task_manager) { |
| + DCHECK_GT(worker_pool_params_vector_.size(), 0U); |
| + DCHECK(worker_pool_index_for_traits_callback_); |
| + DCHECK(task_tracker_); |
| + DCHECK(delayed_task_manager_); |
| +} |
| + |
| +SchedulerSingleThreadTaskRunnerManager:: |
| + ~SchedulerSingleThreadTaskRunnerManager() { |
| + DCHECK(workers_.empty()) << "Deleting SchedulerSingleThreadTaskRunnerManager " |
| + "when there are still active " |
| + "SingleThreadTaskRunners. Future calls to those " |
| + "SingleThreadTaskRunners may crash."; |
|
gab
2017/02/23 19:48:10
"may crash" is too loose IMO, suggest:
"Scheduler
robliao
2017/02/23 21:26:06
How about
SchedulerSingleThreadTaskRunners must ou
gab
2017/02/23 21:52:10
I think "otherwise the outstanding SchedulerSingle
robliao
2017/02/24 18:33:59
Done.
|
| +} |
| + |
| +scoped_refptr<SingleThreadTaskRunner> |
| +SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| + const TaskTraits& traits) { |
| + size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| + DCHECK_LT(index, worker_pool_params_vector_.size()); |
| + return new SchedulerSingleThreadTaskRunner( |
| + this, traits, |
| + CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); |
| +} |
| + |
| +void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { |
| + decltype(workers_) local_workers; |
| + { |
| + AutoSchedulerLock auto_lock(workers_lock_); |
| + local_workers = std::move(workers_); |
| + } |
| + |
| + for (const auto& worker : local_workers) |
| + worker->JoinForTesting(); |
| + |
| + { |
| + AutoSchedulerLock auto_lock(workers_lock_); |
| + DCHECK(workers_.empty()) |
| + << "New worker(s) unexpectedly registered during join."; |
| + workers_ = std::move(local_workers); |
| + } |
| +} |
| + |
| +SchedulerWorker* |
| +SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( |
| + const SchedulerWorkerPoolParams& params) { |
| + AutoSchedulerLock auto_lock(workers_lock_); |
| + int id = next_worker_id_++; |
| + auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( |
| + "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); |
| + workers_.emplace_back(SchedulerWorker::Create( |
| + params.priority_hint(), std::move(delegate), task_tracker_, |
| + SchedulerWorker::InitialState::DETACHED)); |
| + return workers_.back().get(); |
| +} |
| + |
| +void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| + SchedulerWorker* worker) { |
| + // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| + // |workers_lock_|. |
| + scoped_refptr<SchedulerWorker> worker_to_destroy; |
| + { |
| + AutoSchedulerLock auto_lock(workers_lock_); |
| + |
| + // We might be joining, so no-op this if |workers_| is empty. |
| + if (workers_.empty()) |
| + return; |
| + |
| + auto worker_iter = |
| + std::find_if(workers_.begin(), workers_.end(), |
| + [worker](const scoped_refptr<SchedulerWorker>& candidate) { |
| + return candidate.get() == worker; |
| + }); |
| + DCHECK(worker_iter != workers_.end()); |
| + worker_to_destroy = std::move(*worker_iter); |
| + workers_.erase(worker_iter); |
| + } |
| + worker_to_destroy->Cleanup(); |
| +} |
| + |
| +} // namespace internal |
| +} // namespace base |