Index: base/task_scheduler/scheduler_worker_thread.cc |
diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc |
index 60d05e1ca7b97ec213e8c24f86534e54395f2282..3565bd5e6b9f816cb1cf1d33074aea606e2255e0 100644 |
--- a/base/task_scheduler/scheduler_worker_thread.cc |
+++ b/base/task_scheduler/scheduler_worker_thread.cc |
@@ -8,19 +8,82 @@ |
#include <utility> |
+#include "base/lazy_instance.h" |
#include "base/logging.h" |
+#include "base/memory/ptr_util.h" |
#include "base/task_scheduler/task_tracker.h" |
+#include "base/task_scheduler/utils.h" |
+#include "base/threading/thread_local.h" |
+#include "base/time/time.h" |
namespace base { |
namespace internal { |
+namespace { |
+ |
+// SchedulerWorkerThread that owns the current thread, if any. |
+LazyInstance<ThreadLocalPointer<const SchedulerTaskExecutor>>::Leaky |
+ tls_current_worker_thread = LAZY_INSTANCE_INITIALIZER; |
+ |
+// A task runner that runs tasks on a single SchedulerWorkerThread. |
+class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
+ public: |
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
+ SchedulerTaskExecutor* executor, |
gab
2016/04/18 18:04:02
TODO on memory management for this one as well for
fdoray
2016/04/18 19:04:50
Done.
|
+ TaskTracker* task_tracker, |
+ DelayedTaskManager* delayed_task_manager) |
+ : traits_(traits), |
+ executor_(executor), |
+ task_tracker_(task_tracker), |
+ delayed_task_manager_(delayed_task_manager) {} |
+ |
+ // SingleThreadTaskRunner: |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) override { |
+ // Post the task as part of |sequence|. |
+ return PostTaskToExecutor(from_here, closure, traits_, delay, sequence_, |
+ executor_, task_tracker_, delayed_task_manager_); |
+ } |
+ |
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ base::TimeDelta delay) override { |
+ // Tasks are never nested within the task scheduler. |
+ return PostDelayedTask(from_here, closure, delay); |
+ } |
+ |
+ bool RunsTasksOnCurrentThread() const override { |
+ return tls_current_worker_thread.Get().Get() == executor_; |
+ } |
+ |
+ private: |
+ ~SchedulerSingleThreadTaskRunner() override = default; |
+ |
+ // Sequence for all Tasks posted through this TaskRunner. |
+ const scoped_refptr<Sequence> sequence_ = new Sequence; |
+ |
+ const TaskTraits traits_; |
+ SchedulerTaskExecutor* const executor_; |
+ TaskTracker* const task_tracker_; |
+ DelayedTaskManager* const delayed_task_manager_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
+}; |
+ |
+} // namespace |
+ |
std::unique_ptr<SchedulerWorkerThread> |
SchedulerWorkerThread::CreateSchedulerWorkerThread( |
ThreadPriority thread_priority, |
Delegate* delegate, |
- TaskTracker* task_tracker) { |
+ TaskTracker* task_tracker, |
+ DelayedTaskManager* delayed_task_manager, |
+ const PriorityQueue* predecessor_priority_queue) { |
std::unique_ptr<SchedulerWorkerThread> worker_thread( |
- new SchedulerWorkerThread(thread_priority, delegate, task_tracker)); |
+ new SchedulerWorkerThread(thread_priority, delegate, task_tracker, |
+ delayed_task_manager, |
+ predecessor_priority_queue)); |
if (worker_thread->thread_handle_.is_null()) |
return nullptr; |
@@ -31,6 +94,12 @@ SchedulerWorkerThread::~SchedulerWorkerThread() { |
DCHECK(ShouldExitForTesting()); |
} |
+scoped_refptr<SingleThreadTaskRunner> |
+SchedulerWorkerThread::CreateTaskRunnerWithTraits(const TaskTraits& traits) { |
+ return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
+ traits, this, task_tracker_, delayed_task_manager_)); |
+} |
+ |
void SchedulerWorkerThread::WakeUp() { |
wake_up_event_.Signal(); |
} |
@@ -44,14 +113,36 @@ void SchedulerWorkerThread::JoinForTesting() { |
PlatformThread::Join(thread_handle_); |
} |
-SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, |
- Delegate* delegate, |
- TaskTracker* task_tracker) |
+void SchedulerWorkerThread::PostTaskWithSequence( |
+ std::unique_ptr<Task> task, |
+ scoped_refptr<Sequence> sequence) { |
+ DCHECK(task); |
+ DCHECK(sequence); |
+ |
+ const bool sequence_was_empty = AddTaskToSequenceAndPriorityQueue( |
+ std::move(task), std::move(sequence), &single_threaded_priority_queue_); |
+ |
+ // If |sequence| wasn't empty before |task| was inserted into it, the worker |
+ // thread has already been woken up to run it. |
gab
2016/04/18 18:04:02
How about s/run/process/ (at first I thought "it"
fdoray
2016/04/18 19:04:50
Done.
|
+ if (sequence_was_empty) |
+ WakeUp(); |
gab
2016/04/18 18:04:02
Add a TODO to check for removal from idle stack (a
fdoray
2016/04/18 19:04:50
Done.
|
+} |
+ |
+SchedulerWorkerThread::SchedulerWorkerThread( |
+ ThreadPriority thread_priority, |
+ Delegate* delegate, |
+ TaskTracker* task_tracker, |
+ DelayedTaskManager* delayed_task_manager, |
+ const PriorityQueue* predecessor_priority_queue) |
: wake_up_event_(false, false), |
+ single_threaded_priority_queue_(predecessor_priority_queue), |
delegate_(delegate), |
- task_tracker_(task_tracker) { |
+ task_tracker_(task_tracker), |
+ delayed_task_manager_(delayed_task_manager) { |
DCHECK(delegate_); |
DCHECK(task_tracker_); |
+ DCHECK(delayed_task_manager_); |
+ DCHECK(predecessor_priority_queue); |
const size_t kDefaultStackSize = 0; |
PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
@@ -60,13 +151,16 @@ SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, |
void SchedulerWorkerThread::ThreadMain() { |
delegate_->OnMainEntry(); |
+ tls_current_worker_thread.Get().Set(this); |
// A SchedulerWorkerThread starts out sleeping. |
wake_up_event_.Wait(); |
while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
// Get the sequence containing the next task to execute. |
- scoped_refptr<Sequence> sequence = delegate_->GetWork(this); |
+ bool is_single_threaded_sequence = false; |
+ scoped_refptr<Sequence> sequence = delegate_->GetWork( |
+ this, &single_threaded_priority_queue_, &is_single_threaded_sequence); |
if (!sequence) { |
wake_up_event_.Wait(); |
@@ -82,8 +176,16 @@ void SchedulerWorkerThread::ThreadMain() { |
// either a PriorityQueue or a SchedulerWorkerThread. If it is empty and |
// there are live references to it, it will be enqueued when a Task is added |
// to it. Otherwise, it will be destroyed at the end of this scope. |
- if (!sequence_became_empty) |
- delegate_->EnqueueSequence(std::move(sequence)); |
+ if (!sequence_became_empty) { |
+ if (is_single_threaded_sequence) { |
+ const auto sort_key = sequence->GetSortKey(); |
+ single_threaded_priority_queue_.BeginTransaction()->Push( |
+ WrapUnique(new PriorityQueue::SequenceAndSortKey( |
+ std::move(sequence), sort_key))); |
+ } else { |
+ delegate_->EnqueueSequence(std::move(sequence)); |
+ } |
+ } |
// Calling WakeUp() guarantees that this SchedulerWorkerThread will run |
// Tasks from Sequences returned by the GetWork() method of |delegate_| |