Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4045)

Unified Diff: base/task_scheduler/scheduler_worker_thread.cc

Issue 1876363004: TaskScheduler [11] Support ExecutionMode::SINGLE_THREADED. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@8_delayed
Patch Set: self review (remove unused headers) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_thread.cc
diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc
index 60d05e1ca7b97ec213e8c24f86534e54395f2282..3565bd5e6b9f816cb1cf1d33074aea606e2255e0 100644
--- a/base/task_scheduler/scheduler_worker_thread.cc
+++ b/base/task_scheduler/scheduler_worker_thread.cc
@@ -8,19 +8,82 @@
#include <utility>
+#include "base/lazy_instance.h"
#include "base/logging.h"
+#include "base/memory/ptr_util.h"
#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/utils.h"
+#include "base/threading/thread_local.h"
+#include "base/time/time.h"
namespace base {
namespace internal {
+namespace {
+
+// SchedulerWorkerThread that owns the current thread, if any.
+LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky
+ tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER;
+
+// A task runner that runs tasks on a single SchedulerWorkerThread.
+class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
+ public:
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
+ SchedulerTaskExecutor* executor,
gab 2016/04/18 18:04:02 TODO on memory management for this one as well for
fdoray 2016/04/18 19:04:50 Done.
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : traits_(traits),
+ executor_(executor),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager) {}
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override {
+ // Post the task as part of |sequence|.
+ return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_,
+ executor_, task_tracker_, delayed_task_manager_);
+ }
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ return tls_current_worker_thread.Get().Get() == executor_;
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override = default;
+
+ // Sequence for all Tasks posted through this TaskRunner.
+ const scoped_refptr<Sequence> sequence_ = new Sequence;
+
+ const TaskTraits traits_;
+ SchedulerTaskExecutor* const executor_;
+ TaskTracker* const task_tracker_;
+ DelayedTaskManager* const delayed_task_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+} // namespace
+
std::unique_ptr<SchedulerWorkerThread>
SchedulerWorkerThread::CreateSchedulerWorkerThread(
ThreadPriority thread_priority,
Delegate* delegate,
- TaskTracker* task_tracker) {
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager,
+ const PriorityQueue* predecessor_priority_queue) {
std::unique_ptr<SchedulerWorkerThread> worker_thread(
- new SchedulerWorkerThread(thread_priority, delegate, task_tracker));
+ new SchedulerWorkerThread(thread_priority, delegate, task_tracker,
+ delayed_task_manager,
+ predecessor_priority_queue));
if (worker_thread->thread_handle_.is_null())
return nullptr;
@@ -31,6 +94,12 @@ SchedulerWorkerThread::~SchedulerWorkerThread() {
DCHECK(ShouldExitForTesting());
}
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) {
+ return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
+ traits, this, task_tracker_, delayed_task_manager_));
+}
+
void SchedulerWorkerThread::WakeUp() {
wake_up_event_.Signal();
}
@@ -44,14 +113,36 @@ void SchedulerWorkerThread::JoinForTesting() {
PlatformThread::Join(thread_handle_);
}
-SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
- Delegate* delegate,
- TaskTracker* task_tracker)
+void SchedulerWorkerThread::PostTaskWithSequence(
+ std::unique_ptr<Task> task,
+ scoped_refptr<Sequence> sequence) {
+ DCHECK(task);
+ DCHECK(sequence);
+
+ const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue(
+ std::move(task), std::move(sequence), &single_threaded_priority_queue_);
+
+ // If |sequence| wasn't empty before |task| was inserted into it, the worker
+ // thread has already been woken up to run it.
gab 2016/04/18 18:04:02 How about s/run/process/ (at first I thought "it"
fdoray 2016/04/18 19:04:50 Done.
+ if (sequence_was_empty)
+ WakeUp();
gab 2016/04/18 18:04:02 Add a TODO to check for removal from idle stack (a
fdoray 2016/04/18 19:04:50 Done.
+}
+
+SchedulerWorkerThread::SchedulerWorkerThread(
+ ThreadPriority thread_priority,
+ Delegate* delegate,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager,
+ const PriorityQueue* predecessor_priority_queue)
: wake_up_event_(false, false),
+ single_threaded_priority_queue_(predecessor_priority_queue),
delegate_(delegate),
- task_tracker_(task_tracker) {
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager) {
DCHECK(delegate_);
DCHECK(task_tracker_);
+ DCHECK(delayed_task_manager_);
+ DCHECK(predecessor_priority_queue);
const size_t kDefaultStackSize = 0;
PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
@@ -60,13 +151,16 @@ SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
void SchedulerWorkerThread::ThreadMain() {
delegate_->OnMainEntry();
+ tls_current_worker_thread.Get().Set(this);
// A SchedulerWorkerThread starts out sleeping.
wake_up_event_.Wait();
while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
// Get the sequence containing the next task to execute.
- scoped_refptr<Sequence> sequence = delegate_->GetWork(this);
+ bool is_single_threaded_sequence = false;
+ scoped_refptr<Sequence> sequence = delegate_->GetWork(
+ this, &single_threaded_priority_queue_, &is_single_threaded_sequence);
if (!sequence) {
wake_up_event_.Wait();
@@ -82,8 +176,16 @@ void SchedulerWorkerThread::ThreadMain() {
// either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
// there are live references to it, it will be enqueued when a Task is added
// to it. Otherwise, it will be destroyed at the end of this scope.
- if (!sequence_became_empty)
- delegate_->EnqueueSequence(std::move(sequence));
+ if (!sequence_became_empty) {
+ if (is_single_threaded_sequence) {
+ const auto sort_key = sequence->GetSortKey();
+ single_threaded_priority_queue_.BeginTransaction()->Push(
+ WrapUnique(new PriorityQueue::SequenceAndSortKey(
+ std::move(sequence), sort_key)));
+ } else {
+ delegate_->EnqueueSequence(std::move(sequence));
+ }
+ }
// Calling WakeUp() guarantees that this SchedulerWorkerThread will run
// Tasks from Sequences returned by the GetWork() method of |delegate_|

Powered by Google App Engine
This is Rietveld 408576698