| Index: base/task_scheduler/scheduler_worker_thread.cc
|
| diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc
|
| index 60d05e1ca7b97ec213e8c24f86534e54395f2282..d96961de5f5ee3e74127b1e9f0c2d4f11767b1eb 100644
|
| --- a/base/task_scheduler/scheduler_worker_thread.cc
|
| +++ b/base/task_scheduler/scheduler_worker_thread.cc
|
| @@ -8,19 +8,85 @@
|
|
|
| #include <utility>
|
|
|
| +#include "base/lazy_instance.h"
|
| #include "base/logging.h"
|
| +#include "base/memory/ptr_util.h"
|
| #include "base/task_scheduler/task_tracker.h"
|
| +#include "base/task_scheduler/utils.h"
|
| +#include "base/threading/thread_local.h"
|
| +#include "base/time/time.h"
|
|
|
| namespace base {
|
| namespace internal {
|
|
|
| +namespace {
|
| +
|
| +// SchedulerWorkerThread that owns the current thread, if any.
|
| +LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
|
| + tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
|
| +
|
| +// A task runner that runs tasks on a single SchedulerWorkerThread.
|
| +class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| + public:
|
| + // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
|
| + // tasks so long as |executor| is alive.
|
| + // TODO(robliao): Find a concrete way to manage |executor|'s memory.
|
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| + SchedulerTaskExecutor* executor,
|
| + TaskTracker* task_tracker,
|
| + DelayedTaskManager* delayed_task_manager)
|
| + : traits_(traits),
|
| + executor_(executor),
|
| + task_tracker_(task_tracker),
|
| + delayed_task_manager_(delayed_task_manager) {}
|
| +
|
| + // SingleThreadTaskRunner:
|
| + bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + TimeDelta delay) override {
|
| + // Post the task as part of |sequence|.
|
| + return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_,
|
| + executor_, task_tracker_, delayed_task_manager_);
|
| + }
|
| +
|
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
|
| + const Closure& closure,
|
| + base::TimeDelta delay) override {
|
| + // Tasks are never nested within the task scheduler.
|
| + return PostDelayedTask(from_here, closure, delay);
|
| + }
|
| +
|
| + bool RunsTasksOnCurrentThread() const override {
|
| + return tls_current_worker_thread.Get().Get() == executor_;
|
| + }
|
| +
|
| + private:
|
| + ~SchedulerSingleThreadTaskRunner() override = default;
|
| +
|
| + // Sequence for all Tasks posted through this TaskRunner.
|
| + const scoped_refptr<Sequence> sequence_ = new Sequence;
|
| +
|
| + const TaskTraits traits_;
|
| + SchedulerTaskExecutor* const executor_;
|
| + TaskTracker* const task_tracker_;
|
| + DelayedTaskManager* const delayed_task_manager_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| std::unique_ptr<SchedulerWorkerThread>
|
| SchedulerWorkerThread::CreateSchedulerWorkerThread(
|
| ThreadPriority thread_priority,
|
| Delegate* delegate,
|
| - TaskTracker* task_tracker) {
|
| + TaskTracker* task_tracker,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + const PriorityQueue* predecessor_priority_queue) {
|
| std::unique_ptr<SchedulerWorkerThread> worker_thread(
|
| - new SchedulerWorkerThread(thread_priority, delegate, task_tracker));
|
| + new SchedulerWorkerThread(thread_priority, delegate, task_tracker,
|
| + delayed_task_manager,
|
| + predecessor_priority_queue));
|
|
|
| if (worker_thread->thread_handle_.is_null())
|
| return nullptr;
|
| @@ -31,6 +97,12 @@ SchedulerWorkerThread::~SchedulerWorkerThread() {
|
| DCHECK(ShouldExitForTesting());
|
| }
|
|
|
| +scoped_refptr<SingleThreadTaskRunner>
|
| +SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) {
|
| + return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
|
| + traits, this, task_tracker_, delayed_task_manager_));
|
| +}
|
| +
|
| void SchedulerWorkerThread::WakeUp() {
|
| wake_up_event_.Signal();
|
| }
|
| @@ -44,14 +116,38 @@ void SchedulerWorkerThread::JoinForTesting() {
|
| PlatformThread::Join(thread_handle_);
|
| }
|
|
|
| -SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
|
| - Delegate* delegate,
|
| - TaskTracker* task_tracker)
|
| +void SchedulerWorkerThread::PostTaskWithSequence(
|
| + std::unique_ptr<Task> task,
|
| + scoped_refptr<Sequence> sequence) {
|
| + DCHECK(task);
|
| + DCHECK(sequence);
|
| +
|
| + const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
|
| + std::move(task), std::move(sequence), &single_threaded_priority_queue_);
|
| +
|
| + // If |sequence| wasn't empty before |task| was inserted into it, the worker
|
| + // thread has already been woken up to process it.
|
| + // TODO(fdoray): Remove the worker thread from the stack of idle threads of
|
| + // its parent thread pool when it is woken up to run single-threaded tasks.
|
| + if (sequence_was_empty)
|
| + WakeUp();
|
| +}
|
| +
|
| +SchedulerWorkerThread::SchedulerWorkerThread(
|
| + ThreadPriority thread_priority,
|
| + Delegate* delegate,
|
| + TaskTracker* task_tracker,
|
| + DelayedTaskManager* delayed_task_manager,
|
| + const PriorityQueue* predecessor_priority_queue)
|
| : wake_up_event_(false, false),
|
| + single_threaded_priority_queue_(predecessor_priority_queue),
|
| delegate_(delegate),
|
| - task_tracker_(task_tracker) {
|
| + task_tracker_(task_tracker),
|
| + delayed_task_manager_(delayed_task_manager) {
|
| DCHECK(delegate_);
|
| DCHECK(task_tracker_);
|
| + DCHECK(delayed_task_manager_);
|
| + DCHECK(predecessor_priority_queue);
|
|
|
| const size_t kDefaultStackSize = 0;
|
| PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
|
| @@ -60,13 +156,16 @@ SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
|
|
|
| void SchedulerWorkerThread::ThreadMain() {
|
| delegate_->OnMainEntry();
|
| + tls_current_worker_thread.Get().Set(this);
|
|
|
| // A SchedulerWorkerThread starts out sleeping.
|
| wake_up_event_.Wait();
|
|
|
| while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
|
| // Get the sequence containing the next task to execute.
|
| - scoped_refptr<Sequence> sequence = delegate_->GetWork(this);
|
| + bool is_single_threaded_sequence = false;
|
| + scoped_refptr<Sequence> sequence = delegate_->GetWork(
|
| + this, &single_threaded_priority_queue_, &is_single_threaded_sequence);
|
|
|
| if (!sequence) {
|
| wake_up_event_.Wait();
|
| @@ -82,8 +181,16 @@ void SchedulerWorkerThread::ThreadMain() {
|
| // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
|
| // there are live references to it, it will be enqueued when a Task is added
|
| // to it. Otherwise, it will be destroyed at the end of this scope.
|
| - if (!sequence_became_empty)
|
| - delegate_->EnqueueSequence(std::move(sequence));
|
| + if (!sequence_became_empty) {
|
| + if (is_single_threaded_sequence) {
|
| + const auto sort_key = sequence->GetSortKey();
|
| + single_threaded_priority_queue_.BeginTransaction()->Push(
|
| + WrapUnique(new PriorityQueue::SequenceAndSortKey(
|
| + std::move(sequence), sort_key)));
|
| + } else {
|
| + delegate_->EnqueueSequence(std::move(sequence));
|
| + }
|
| + }
|
|
|
| // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
|
| // Tasks from Sequences returned by the GetWork() method of |delegate_|
|
|
|