Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(330)

Side by Side Diff: content/renderer/scheduler/task_queue_manager.cc

Issue 637303003: content: Add task queue manager (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Back to content. Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/renderer/scheduler/task_queue_manager.h"
6
7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "content/renderer/scheduler/task_queue_selector.h"
10
11 namespace content {
12
13 TaskQueueManager::TaskRunner::TaskRunner(
14 base::WeakPtr<TaskQueueManager> task_queue_manager,
15 size_t queue_index)
16 : task_queue_manager_(task_queue_manager), queue_index_(queue_index) {
17 }
18
19 TaskQueueManager::TaskRunner::~TaskRunner() {
20 }
21
22 bool TaskQueueManager::TaskRunner::RunsTasksOnCurrentThread() const {
23 if (!task_queue_manager_)
24 return false;
25 return task_queue_manager_->RunsTasksOnCurrentThread();
26 }
27
28 bool TaskQueueManager::TaskRunner::PostDelayedTask(
29 const tracked_objects::Location& from_here,
30 const base::Closure& task,
31 base::TimeDelta delay) {
32 if (!task_queue_manager_)
33 return false;
34 return task_queue_manager_->PostDelayedTask(
35 queue_index_, from_here, task, delay);
36 }
37
38 bool TaskQueueManager::TaskRunner::PostNonNestableDelayedTask(
39 const tracked_objects::Location& from_here,
40 const base::Closure& task,
41 base::TimeDelta delay) {
42 if (!task_queue_manager_)
43 return false;
44 return task_queue_manager_->PostNonNestableDelayedTask(
45 queue_index_, from_here, task, delay);
46 }
47
48 TaskQueueManager::InternalTaskQueue::InternalTaskQueue() : auto_pump(true) {
49 }
50
51 TaskQueueManager::InternalTaskQueue::~InternalTaskQueue() {
52 }
53
54 TaskQueueManager::TaskQueueManager(
55 size_t task_queue_count,
56 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
57 TaskQueueSelector* selector)
58 : main_task_runner_(main_task_runner),
59 selector_(selector),
60 weak_factory_(this) {
61 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
62
63 for (size_t i = 0; i < task_queue_count; i++) {
64 scoped_ptr<InternalTaskQueue> queue(new InternalTaskQueue());
65 queue->task_runner =
66 make_scoped_refptr(new TaskRunner(weak_factory_.GetWeakPtr(), i));
67 queues_.push_back(queue.release());
68 }
69
70 std::vector<const base::TaskQueue*> work_queues;
71 for (size_t i = 0; i < queues_.size(); i++)
72 work_queues.push_back(&queues_[i]->work_queue);
73 selector_->RegisterWorkQueues(work_queues);
74 }
75
76 TaskQueueManager::~TaskQueueManager() {
77 }
78
79 TaskQueueManager::InternalTaskQueue* TaskQueueManager::Queue(
80 size_t queue_index) const {
81 DCHECK_LT(queue_index, queues_.size());
82 return queues_[queue_index];
83 }
84
85 scoped_refptr<base::SingleThreadTaskRunner>
86 TaskQueueManager::TaskRunnerForQueue(size_t queue_index) {
87 return Queue(queue_index)->task_runner;
88 }
89
90 bool TaskQueueManager::PollQueue(size_t queue_index) {
91 InternalTaskQueue* queue = Queue(queue_index);
92 if (!queue->work_queue.empty())
93 return true;
94 base::AutoLock lock(queue->incoming_queue_lock);
95 return !queue->incoming_queue.empty();
96 }
97
98 bool TaskQueueManager::ReloadWorkQueue(size_t queue_index) {
99 main_thread_checker_.CalledOnValidThread();
100 InternalTaskQueue* queue = Queue(queue_index);
101 DCHECK(queue->work_queue.empty());
102 base::AutoLock lock(queue->incoming_queue_lock);
103 if (!queue->auto_pump)
104 return false;
105 queue->work_queue.Swap(&queue->incoming_queue);
106 return !queue->work_queue.empty();
107 }
108
109 void TaskQueueManager::EnqueueTask(size_t queue_index,
110 const base::PendingTask& pending_task) {
111 InternalTaskQueue* queue = Queue(queue_index);
112 base::AutoLock lock(queue->incoming_queue_lock);
113 if (queue->auto_pump && queue->incoming_queue.empty())
114 PostDoWorkOnMainRunner();
115 queue->incoming_queue.push(pending_task);
116 }
117
118 void TaskQueueManager::SetAutoPump(size_t queue_index, bool auto_pump) {
119 InternalTaskQueue* queue = Queue(queue_index);
120 base::AutoLock lock(queue->incoming_queue_lock);
121 if (auto_pump) {
122 queue->auto_pump = true;
123 PumpQueueLocked(queue);
124 } else {
125 queue->auto_pump = false;
126 }
127 }
128
129 void TaskQueueManager::PumpQueueLocked(InternalTaskQueue* queue) {
130 queue->incoming_queue_lock.AssertAcquired();
131 while (!queue->incoming_queue.empty()) {
132 queue->work_queue.push(queue->incoming_queue.front());
133 queue->incoming_queue.pop();
134 }
135 if (!queue->work_queue.empty())
136 PostDoWorkOnMainRunner();
137 }
138
139 void TaskQueueManager::PumpQueue(size_t queue_index) {
140 InternalTaskQueue* queue = Queue(queue_index);
141 base::AutoLock lock(queue->incoming_queue_lock);
142 PumpQueueLocked(queue);
143 }
144
145 bool TaskQueueManager::UpdateWorkQueues() {
146 // TODO(skyostil): This is not efficient when the number of queues grows very
147 // large due to the number of locks taken. Consider optimizing when we get
148 // there.
149 bool has_work = false;
150 for (size_t i = 0; i < queues_.size(); i++) {
151 if (!queues_[i]->work_queue.empty())
152 has_work = true;
153 else if (ReloadWorkQueue(i))
154 has_work = true;
155 }
156 return has_work;
157 }
158
159 void TaskQueueManager::PostDoWorkOnMainRunner() {
160 main_task_runner_->PostTask(
161 FROM_HERE, Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr()));
162 }
163
164 void TaskQueueManager::DoWork() {
165 main_thread_checker_.CalledOnValidThread();
166 if (!UpdateWorkQueues())
167 return;
168
169 size_t queue_index;
170 if (!selector_->SelectWorkQueueToService(&queue_index))
171 return;
172 PostDoWorkOnMainRunner();
173 RunTaskFromWorkQueue(queue_index);
174 }
175
176 void TaskQueueManager::RunTaskFromWorkQueue(size_t queue_index) {
177 main_thread_checker_.CalledOnValidThread();
178 InternalTaskQueue* queue = Queue(queue_index);
179 DCHECK(!queue->work_queue.empty());
180 base::PendingTask pending_task = queue->work_queue.front();
181 queue->work_queue.pop();
182 task_annotator_.RunTask(
183 "TaskQueueManager::PostTask", "TaskQueueManager::RunTask", pending_task);
184 }
185
186 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
187 return main_task_runner_->RunsTasksOnCurrentThread();
188 }
189
190 bool TaskQueueManager::PostDelayedTask(
191 size_t queue_index,
192 const tracked_objects::Location& from_here,
193 const base::Closure& task,
194 base::TimeDelta delay) {
195 int sequence_num = task_sequence_num_.GetNext();
196
197 base::PendingTask pending_task(from_here, task);
198 pending_task.sequence_num = sequence_num;
199
200 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task);
201 if (delay > base::TimeDelta()) {
202 return main_task_runner_->PostDelayedTask(
203 from_here,
204 Bind(&TaskQueueManager::EnqueueTask,
205 weak_factory_.GetWeakPtr(),
206 queue_index,
207 pending_task),
208 delay);
209 }
210 EnqueueTask(queue_index, pending_task);
211 return true;
212 }
213
214 bool TaskQueueManager::PostNonNestableDelayedTask(
215 size_t queue_index,
216 const tracked_objects::Location& from_here,
217 const base::Closure& task,
218 base::TimeDelta delay) {
219 // Defer non-nestable work to the main task runner.
220 return main_task_runner_->PostNonNestableDelayedTask(from_here, task, delay);
221 }
222
223 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698