| Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc | 
| diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..0d9d55dbfab7f40183ff5f95b154c0b395868c8c | 
| --- /dev/null | 
| +++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc | 
| @@ -0,0 +1,308 @@ | 
| +// Copyright 2017 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" | 
| + | 
| +#include <algorithm> | 
| +#include <memory> | 
| +#include <string> | 
| + | 
| +#include "base/bind.h" | 
| +#include "base/callback.h" | 
| +#include "base/memory/ptr_util.h" | 
| +#include "base/single_thread_task_runner.h" | 
| +#include "base/strings/stringprintf.h" | 
| +#include "base/synchronization/atomic_flag.h" | 
| +#include "base/task_scheduler/delayed_task_manager.h" | 
| +#include "base/task_scheduler/scheduler_worker.h" | 
| +#include "base/task_scheduler/sequence.h" | 
| +#include "base/task_scheduler/task.h" | 
| +#include "base/task_scheduler/task_tracker.h" | 
| +#include "base/task_scheduler/task_traits.h" | 
| +#include "base/threading/platform_thread.h" | 
| +#include "base/time/time.h" | 
| + | 
| +namespace base { | 
| +namespace internal { | 
| + | 
| +namespace { | 
| + | 
| +// Allows for checking the PlatformThread::CurrentRef() against a set | 
| +// PlatformThreadRef atomically without using locks. | 
| +class AtomicThreadRefChecker { | 
| + public: | 
| +  AtomicThreadRefChecker() = default; | 
| +  ~AtomicThreadRefChecker() = default; | 
| + | 
| +  void Set() { | 
| +    thread_ref_ = PlatformThread::CurrentRef(); | 
| +    is_set_.Set(); | 
| +  } | 
| + | 
| +  bool IsCurrentThreadSameAsSetThread() { | 
| +    return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef(); | 
| +  } | 
| + | 
| + private: | 
| +  AtomicFlag is_set_; | 
| +  PlatformThreadRef thread_ref_; | 
| + | 
| +  DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker); | 
| +}; | 
| + | 
| +class SchedulerWorkerDelegate : public SchedulerWorker::Delegate { | 
| + public: | 
| +  SchedulerWorkerDelegate(const std::string& thread_name) | 
| +      : thread_name_(thread_name) {} | 
| + | 
| +  // SchedulerWorker::Delegate: | 
| +  void OnMainEntry(SchedulerWorker* worker) override { | 
| +    thread_ref_checker_.Set(); | 
| +    PlatformThread::SetName(thread_name_); | 
| +  } | 
| + | 
| +  scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { | 
| +    AutoSchedulerLock auto_lock(sequence_lock_); | 
| +    bool has_work = has_work_; | 
| +    has_work_ = false; | 
| +    return has_work ? sequence_ : nullptr; | 
| +  } | 
| + | 
| +  void DidRunTask() override {} | 
| + | 
| +  void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override { | 
| +    AutoSchedulerLock auto_lock(sequence_lock_); | 
| +    // We've shut down, so no-op this work request. Any sequence cleanup will | 
| +    // occur in the caller's context. | 
| +    if (!sequence_) | 
| +      return; | 
| + | 
| +    DCHECK_EQ(sequence, sequence_); | 
| +    has_work_ = true; | 
| +  } | 
| + | 
| +  TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } | 
| + | 
| +  bool CanDetach(SchedulerWorker* worker) override { return false; } | 
| + | 
| +  void OnDetach() override { NOTREACHED(); } | 
| + | 
| +  bool RunsTasksOnCurrentThread() { | 
| +    // We check the thread ref instead of the sequence for the benefit of COM | 
| +    // callbacks which may execute without a sequence context. | 
| +    return thread_ref_checker_.IsCurrentThreadSameAsSetThread(); | 
| +  } | 
| + | 
| +  void OnMainExit() override { | 
| +    // Move |sequence_| to |local_sequence| so that if we have the last | 
| +    // reference to the sequence we don't destroy it (and its tasks) within | 
| +    // |sequence_lock_|. | 
| +    scoped_refptr<Sequence> local_sequence; | 
| +    { | 
| +      AutoSchedulerLock auto_lock(sequence_lock_); | 
| +      // To reclaim skipped tasks on shutdown, we null out the sequence to allow | 
| +      // the tasks to destroy themselves. | 
| +      local_sequence = std::move(sequence_); | 
| +    } | 
| +  } | 
| + | 
| +  // SchedulerWorkerDelegate: | 
| + | 
| +  // Consumers should release their sequence reference as soon as possible to | 
| +  // ensure timely cleanup for general shutdown. | 
| +  scoped_refptr<Sequence> sequence() { | 
| +    AutoSchedulerLock auto_lock(sequence_lock_); | 
| +    return sequence_; | 
| +  } | 
| + | 
| + private: | 
| +  const std::string thread_name_; | 
| + | 
| +  // Synchronizes access to |sequence_| and |has_work_|. | 
| +  SchedulerLock sequence_lock_; | 
| +  scoped_refptr<Sequence> sequence_ = new Sequence; | 
| +  bool has_work_ = false; | 
| + | 
| +  AtomicThreadRefChecker thread_ref_checker_; | 
| + | 
| +  DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); | 
| +}; | 
| + | 
| +}  // namespace | 
| + | 
| +class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner | 
| +    : public SingleThreadTaskRunner { | 
| + public: | 
| +  // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the | 
| +  // lifetime of a dedicated |worker| for |traits|. | 
| +  SchedulerSingleThreadTaskRunner( | 
| +      SchedulerSingleThreadTaskRunnerManager* const outer, | 
| +      const TaskTraits& traits, | 
| +      SchedulerWorker* worker) | 
| +      : outer_(outer), traits_(traits), worker_(worker) { | 
| +    DCHECK(outer_); | 
| +    DCHECK(worker_); | 
| +  } | 
| + | 
| +  // SingleThreadTaskRunner: | 
| +  bool PostDelayedTask(const tracked_objects::Location& from_here, | 
| +                       const Closure& closure, | 
| +                       TimeDelta delay) override { | 
| +    auto task = MakeUnique<Task>(from_here, closure, traits_, delay); | 
| +    task->single_thread_task_runner_ref = this; | 
| + | 
| +    if (!outer_->task_tracker_->WillPostTask(task.get())) | 
| +      return false; | 
| + | 
| +    if (task->delayed_run_time.is_null()) { | 
| +      PostTaskNow(std::move(task)); | 
| +    } else { | 
| +      outer_->delayed_task_manager_->AddDelayedTask( | 
| +          std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, | 
| +                                Unretained(this))); | 
| +    } | 
| +    return true; | 
| +  } | 
| + | 
| +  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 
| +                                  const Closure& closure, | 
| +                                  base::TimeDelta delay) override { | 
| +    // Tasks are never nested within the task scheduler. | 
| +    return PostDelayedTask(from_here, closure, delay); | 
| +  } | 
| + | 
| +  bool RunsTasksOnCurrentThread() const override { | 
| +    return GetDelegate()->RunsTasksOnCurrentThread(); | 
| +  } | 
| + | 
| + private: | 
| +  ~SchedulerSingleThreadTaskRunner() override { | 
| +    outer_->UnregisterSchedulerWorker(worker_); | 
| +  } | 
| + | 
| +  void PostTaskNow(std::unique_ptr<Task> task) { | 
| +    scoped_refptr<Sequence> sequence = GetDelegate()->sequence(); | 
| +    // If |sequence| is null, then the thread is effectively gone (either | 
| +    // shutdown or joined). | 
| +    if (!sequence) | 
| +      return; | 
| + | 
| +    const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 
| +    if (sequence_was_empty) { | 
| +      GetDelegate()->ReEnqueueSequence(std::move(sequence)); | 
| +      worker_->WakeUp(); | 
| +    } | 
| +  } | 
| + | 
| +  SchedulerWorkerDelegate* GetDelegate() const { | 
| +    return static_cast<SchedulerWorkerDelegate*>(worker_->delegate()); | 
| +  } | 
| + | 
| +  SchedulerSingleThreadTaskRunnerManager* const outer_; | 
| +  const TaskTraits traits_; | 
| +  SchedulerWorker* const worker_; | 
| + | 
| +  DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 
| +}; | 
| + | 
| +SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager( | 
| +    const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector, | 
| +    const TaskScheduler::WorkerPoolIndexForTraitsCallback& | 
| +        worker_pool_index_for_traits_callback, | 
| +    TaskTracker* task_tracker, | 
| +    DelayedTaskManager* delayed_task_manager) | 
| +    : worker_pool_params_vector_(worker_pool_params_vector), | 
| +      worker_pool_index_for_traits_callback_( | 
| +          worker_pool_index_for_traits_callback), | 
| +      task_tracker_(task_tracker), | 
| +      delayed_task_manager_(delayed_task_manager) { | 
| +  DCHECK_GT(worker_pool_params_vector_.size(), 0U); | 
| +  DCHECK(worker_pool_index_for_traits_callback_); | 
| +  DCHECK(task_tracker_); | 
| +  DCHECK(delayed_task_manager_); | 
| +} | 
| + | 
| +SchedulerSingleThreadTaskRunnerManager:: | 
| +    ~SchedulerSingleThreadTaskRunnerManager() { | 
| +#if DCHECK_IS_ON() | 
| +  size_t workers_unregistered_during_join = | 
| +      subtle::NoBarrier_Load(&workers_unregistered_during_join_); | 
| +  DCHECK_EQ(workers_unregistered_during_join, workers_.size()) | 
| +      << "There cannot be outstanding SingleThreadTaskRunners upon destruction" | 
| +         "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; | 
| +#endif | 
| +} | 
| + | 
| +scoped_refptr<SingleThreadTaskRunner> | 
| +SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( | 
| +    const TaskTraits& traits) { | 
| +  size_t index = worker_pool_index_for_traits_callback_.Run(traits); | 
| +  DCHECK_LT(index, worker_pool_params_vector_.size()); | 
| +  return new SchedulerSingleThreadTaskRunner( | 
| +      this, traits, | 
| +      CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); | 
| +} | 
| + | 
| +void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { | 
| +  decltype(workers_) local_workers; | 
| +  { | 
| +    AutoSchedulerLock auto_lock(workers_lock_); | 
| +    local_workers = std::move(workers_); | 
| +  } | 
| + | 
| +  for (const auto& worker : local_workers) | 
| +    worker->JoinForTesting(); | 
| + | 
| +  { | 
| +    AutoSchedulerLock auto_lock(workers_lock_); | 
| +    DCHECK(workers_.empty()) | 
| +        << "New worker(s) unexpectedly registered during join."; | 
| +    workers_ = std::move(local_workers); | 
| +  } | 
| +} | 
| + | 
| +SchedulerWorker* | 
| +SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( | 
| +    const SchedulerWorkerPoolParams& params) { | 
| +  AutoSchedulerLock auto_lock(workers_lock_); | 
| +  int id = next_worker_id_++; | 
| +  auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( | 
| +      "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); | 
| +  workers_.emplace_back(SchedulerWorker::Create( | 
| +      params.priority_hint(), std::move(delegate), task_tracker_, | 
| +      SchedulerWorker::InitialState::DETACHED)); | 
| +  return workers_.back().get(); | 
| +} | 
| + | 
| +void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( | 
| +    SchedulerWorker* worker) { | 
| +  // Cleanup uses a SchedulerLock, so call Cleanup() after releasing | 
| +  // |workers_lock_|. | 
| +  scoped_refptr<SchedulerWorker> worker_to_destroy; | 
| +  { | 
| +    AutoSchedulerLock auto_lock(workers_lock_); | 
| + | 
| +    // We might be joining, so record that a worker was unregistered for | 
| +    // verification at destruction. | 
| +    if (workers_.empty()) { | 
| +#if DCHECK_IS_ON() | 
| +      subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1); | 
| +#endif | 
| +      return; | 
| +    } | 
| + | 
| +    auto worker_iter = | 
| +        std::find_if(workers_.begin(), workers_.end(), | 
| +                     [worker](const scoped_refptr<SchedulerWorker>& candidate) { | 
| +                       return candidate.get() == worker; | 
| +                     }); | 
| +    DCHECK(worker_iter != workers_.end()); | 
| +    worker_to_destroy = std::move(*worker_iter); | 
| +    workers_.erase(worker_iter); | 
| +  } | 
| +  worker_to_destroy->Cleanup(); | 
| +} | 
| + | 
| +}  // namespace internal | 
| +}  // namespace base | 
|  |