Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(361)

Side by Side Diff: components/scheduler/child/task_queue_manager.cc

Issue 1223163006: Implement PostDelayedTaskAt for guaranteed timer ordering (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebased. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/scheduler/child/task_queue_manager.h" 5 #include "components/scheduler/child/task_queue_manager.h"
6 6
7 #include <queue> 7 #include <queue>
8 #include <set> 8 #include <set>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/time/default_tick_clock.h" 11 #include "base/time/default_tick_clock.h"
12 #include "base/trace_event/trace_event.h" 12 #include "base/trace_event/trace_event.h"
13 #include "base/trace_event/trace_event_argument.h" 13 #include "base/trace_event/trace_event_argument.h"
14 #include "components/scheduler/child/nestable_single_thread_task_runner.h" 14 #include "components/scheduler/child/nestable_single_thread_task_runner.h"
15 #include "components/scheduler/child/task_queue.h"
15 #include "components/scheduler/child/task_queue_selector.h" 16 #include "components/scheduler/child/task_queue_selector.h"
16 17
17 namespace { 18 namespace {
18 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max(); 19 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
19 } 20 }
20 21
21 namespace scheduler { 22 namespace scheduler {
22 namespace internal { 23 namespace internal {
23 24
24 // Now() is somewhat expensive so it makes sense not to call Now() unless we 25 // Now() is somewhat expensive so it makes sense not to call Now() unless we
(...skipping 12 matching lines...) Expand all
37 if (now_.is_null()) 38 if (now_.is_null())
38 now_ = task_queue_manager_->Now(); 39 now_ = task_queue_manager_->Now();
39 return now_; 40 return now_;
40 } 41 }
41 42
42 private: 43 private:
43 TaskQueueManager* task_queue_manager_; // NOT OWNED 44 TaskQueueManager* task_queue_manager_; // NOT OWNED
44 base::TimeTicks now_; 45 base::TimeTicks now_;
45 }; 46 };
46 47
47 class TaskQueue : public base::SingleThreadTaskRunner { 48 class TaskQueueImpl : public TaskQueue {
48 public: 49 public:
49 TaskQueue(TaskQueueManager* task_queue_manager, 50 TaskQueueImpl(TaskQueueManager* task_queue_manager,
50 const char* disabled_by_default_tracing_category, 51 const char* disabled_by_default_tracing_category,
51 const char* disabled_by_default_verbose_tracing_category); 52 const char* disabled_by_default_verbose_tracing_category);
52 53
53 // base::SingleThreadTaskRunner implementation. 54 // TaskQueue :implementation.
54 bool RunsTasksOnCurrentThread() const override; 55 bool RunsTasksOnCurrentThread() const override;
55 bool PostDelayedTask(const tracked_objects::Location& from_here, 56 bool PostDelayedTask(const tracked_objects::Location& from_here,
56 const base::Closure& task, 57 const base::Closure& task,
57 base::TimeDelta delay) override { 58 base::TimeDelta delay) override {
58 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL); 59 return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(),
60 TaskType::NORMAL);
59 } 61 }
60 62
61 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 63 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
62 const base::Closure& task, 64 const base::Closure& task,
63 base::TimeDelta delay) override { 65 base::TimeDelta delay) override {
64 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE); 66 return PostDelayedTaskImpl(from_here, task, delay, base::TimeTicks(),
67 TaskType::NON_NESTABLE);
68 }
69 bool PostDelayedTaskAt(const tracked_objects::Location& from_here,
70 const base::Closure& task,
71 base::TimeTicks desired_run_time) override {
72 return PostDelayedTaskImpl(from_here, task, base::TimeDelta(),
73 desired_run_time, TaskType::NORMAL);
65 } 74 }
66 75
67 TaskQueueManager::QueueState GetQueueState() const; 76 TaskQueueManager::QueueState GetQueueState() const;
68 77
69 void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy); 78 void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy);
70 void PumpQueue(); 79 void PumpQueue();
71 80
72 bool NextPendingDelayedTaskRunTime( 81 bool NextPendingDelayedTaskRunTime(
73 base::TimeTicks* next_pending_delayed_task); 82 base::TimeTicks* next_pending_delayed_task);
74 83
(...skipping 19 matching lines...) Expand all
94 } 103 }
95 104
96 void AsValueInto(base::trace_event::TracedValue* state) const; 105 void AsValueInto(base::trace_event::TracedValue* state) const;
97 106
98 private: 107 private:
99 enum class TaskType { 108 enum class TaskType {
100 NORMAL, 109 NORMAL,
101 NON_NESTABLE, 110 NON_NESTABLE,
102 }; 111 };
103 112
104 ~TaskQueue() override; 113 ~TaskQueueImpl() override;
105 114
106 bool PostDelayedTaskImpl(const tracked_objects::Location& from_here, 115 bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
107 const base::Closure& task, 116 const base::Closure& task,
108 base::TimeDelta delay, 117 base::TimeDelta delay,
alex clarke (OOO till 29th) 2015/07/16 09:29:00 It feels a bit weird having both a delay and a des
Sami 2015/07/16 12:19:32 Yep I struggled with this interface a bit. The loc
118 base::TimeTicks desired_run_time,
109 TaskType task_type); 119 TaskType task_type);
110 120
111 // Delayed task posted to the underlying run loop, which locks |lock_| and 121 // Delayed task posted to the underlying run loop, which locks |lock_| and
112 // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks 122 // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks
113 // that need to be run now. 123 // that need to be run now.
114 void MoveReadyDelayedTasksToIncomingQueue(); 124 void MoveReadyDelayedTasksToIncomingQueue();
115 125
116 // Enqueues any delayed tasks which should be run now on the incoming_queue_ 126 // Enqueues any delayed tasks which should be run now on the incoming_queue_
117 // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled. 127 // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
118 // Must be called with |lock_| locked. 128 // Must be called with |lock_| locked.
(...skipping 23 matching lines...) Expand all
142 // This lock protects all members except the work queue, the 152 // This lock protects all members except the work queue, the
143 // main_thread_checker_ and wakeup_policy_. 153 // main_thread_checker_ and wakeup_policy_.
144 mutable base::Lock lock_; 154 mutable base::Lock lock_;
145 base::PlatformThreadId thread_id_; 155 base::PlatformThreadId thread_id_;
146 TaskQueueManager* task_queue_manager_; 156 TaskQueueManager* task_queue_manager_;
147 base::TaskQueue incoming_queue_; 157 base::TaskQueue incoming_queue_;
148 TaskQueueManager::PumpPolicy pump_policy_; 158 TaskQueueManager::PumpPolicy pump_policy_;
149 const char* name_; 159 const char* name_;
150 const char* disabled_by_default_tracing_category_; 160 const char* disabled_by_default_tracing_category_;
151 const char* disabled_by_default_verbose_tracing_category_; 161 const char* disabled_by_default_verbose_tracing_category_;
162
163 // Queue-local task sequence number for maintaining the order of delayed
164 // tasks which are posted for the exact same time. Note that this will be
165 // replaced by the global sequence number when the delay has elapsed.
166 int delayed_task_sequence_number_;
152 base::DelayedTaskQueue delayed_task_queue_; 167 base::DelayedTaskQueue delayed_task_queue_;
153 std::set<base::TimeTicks> in_flight_kick_delayed_tasks_; 168 std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;
154 169
155 base::ThreadChecker main_thread_checker_; 170 base::ThreadChecker main_thread_checker_;
156 base::TaskQueue work_queue_; 171 base::TaskQueue work_queue_;
157 TaskQueueManager::WakeupPolicy wakeup_policy_; 172 TaskQueueManager::WakeupPolicy wakeup_policy_;
158 173
159 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 174 DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
160 }; 175 };
161 176
162 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager, 177 TaskQueueImpl::TaskQueueImpl(
163 const char* disabled_by_default_tracing_category, 178 TaskQueueManager* task_queue_manager,
164 const char* disabled_by_default_verbose_tracing_category) 179 const char* disabled_by_default_tracing_category,
180 const char* disabled_by_default_verbose_tracing_category)
165 : thread_id_(base::PlatformThread::CurrentId()), 181 : thread_id_(base::PlatformThread::CurrentId()),
166 task_queue_manager_(task_queue_manager), 182 task_queue_manager_(task_queue_manager),
167 pump_policy_(TaskQueueManager::PumpPolicy::AUTO), 183 pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
168 name_(nullptr), 184 name_(nullptr),
169 disabled_by_default_tracing_category_( 185 disabled_by_default_tracing_category_(
170 disabled_by_default_tracing_category), 186 disabled_by_default_tracing_category),
171 disabled_by_default_verbose_tracing_category_( 187 disabled_by_default_verbose_tracing_category_(
172 disabled_by_default_verbose_tracing_category), 188 disabled_by_default_verbose_tracing_category),
189 delayed_task_sequence_number_(0),
173 wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) { 190 wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) {
174 } 191 }
175 192
176 TaskQueue::~TaskQueue() { 193 TaskQueueImpl::~TaskQueueImpl() {
177 } 194 }
178 195
179 void TaskQueue::WillDeleteTaskQueueManager() { 196 void TaskQueueImpl::WillDeleteTaskQueueManager() {
180 base::AutoLock lock(lock_); 197 base::AutoLock lock(lock_);
181 task_queue_manager_ = nullptr; 198 task_queue_manager_ = nullptr;
182 delayed_task_queue_ = base::DelayedTaskQueue(); 199 delayed_task_queue_ = base::DelayedTaskQueue();
183 incoming_queue_ = base::TaskQueue(); 200 incoming_queue_ = base::TaskQueue();
184 work_queue_ = base::TaskQueue(); 201 work_queue_ = base::TaskQueue();
185 } 202 }
186 203
187 bool TaskQueue::RunsTasksOnCurrentThread() const { 204 bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
188 base::AutoLock lock(lock_); 205 base::AutoLock lock(lock_);
189 return base::PlatformThread::CurrentId() == thread_id_; 206 return base::PlatformThread::CurrentId() == thread_id_;
190 } 207 }
191 208
192 bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, 209 bool TaskQueueImpl::PostDelayedTaskImpl(
193 const base::Closure& task, 210 const tracked_objects::Location& from_here,
194 base::TimeDelta delay, 211 const base::Closure& task,
195 TaskType task_type) { 212 base::TimeDelta delay,
213 base::TimeTicks desired_run_time,
214 TaskType task_type) {
196 base::AutoLock lock(lock_); 215 base::AutoLock lock(lock_);
197 if (!task_queue_manager_) 216 if (!task_queue_manager_)
198 return false; 217 return false;
199 218
200 base::PendingTask pending_task(from_here, task, base::TimeTicks(), 219 base::PendingTask pending_task(from_here, task, base::TimeTicks(),
201 task_type != TaskType::NON_NESTABLE); 220 task_type != TaskType::NON_NESTABLE);
202 task_queue_manager_->DidQueueTask(pending_task); 221 task_queue_manager_->DidQueueTask(pending_task);
203 222
204 if (delay > base::TimeDelta()) { 223 if (delay > base::TimeDelta() || !desired_run_time.is_null()) {
205 base::TimeTicks now = task_queue_manager_->Now(); 224 base::TimeTicks now = task_queue_manager_->Now();
206 pending_task.delayed_run_time = now + delay; 225 if (!desired_run_time.is_null()) {
226 pending_task.delayed_run_time = std::max(now, desired_run_time);
alex clarke (OOO till 29th) 2015/07/16 09:29:00 Should we just enqueue the task if desired runtime
Sami 2015/07/16 12:19:32 I think that might change the ordering. For exampl
227 } else {
228 pending_task.delayed_run_time = now + delay;
229 }
230 pending_task.sequence_num = delayed_task_sequence_number_++;
207 delayed_task_queue_.push(pending_task); 231 delayed_task_queue_.push(pending_task);
208 TraceQueueSize(true); 232 TraceQueueSize(true);
209 // If we changed the topmost task, then it is time to reschedule. 233 // If we changed the topmost task, then it is time to reschedule.
210 if (delayed_task_queue_.top().task.Equals(pending_task.task)) { 234 if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
211 LazyNow lazy_now(now); 235 LazyNow lazy_now(now);
212 ScheduleDelayedWorkLocked(&lazy_now); 236 ScheduleDelayedWorkLocked(&lazy_now);
213 } 237 }
214 return true; 238 return true;
215 } 239 }
216 EnqueueTaskLocked(pending_task); 240 EnqueueTaskLocked(pending_task);
217 return true; 241 return true;
218 } 242 }
219 243
220 void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { 244 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue() {
221 DCHECK(main_thread_checker_.CalledOnValidThread()); 245 DCHECK(main_thread_checker_.CalledOnValidThread());
222 base::AutoLock lock(lock_); 246 base::AutoLock lock(lock_);
223 if (!task_queue_manager_) 247 if (!task_queue_manager_)
224 return; 248 return;
225 249
226 LazyNow lazy_now(task_queue_manager_); 250 LazyNow lazy_now(task_queue_manager_);
227 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); 251 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
228 } 252 }
229 253
230 void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { 254 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
255 LazyNow* lazy_now) {
231 lock_.AssertAcquired(); 256 lock_.AssertAcquired();
232 // Enqueue all delayed tasks that should be running now. 257 // Enqueue all delayed tasks that should be running now.
233 while (!delayed_task_queue_.empty() && 258 while (!delayed_task_queue_.empty() &&
234 delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) { 259 delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
235 in_flight_kick_delayed_tasks_.erase( 260 in_flight_kick_delayed_tasks_.erase(
236 delayed_task_queue_.top().delayed_run_time); 261 delayed_task_queue_.top().delayed_run_time);
237 EnqueueTaskLocked(delayed_task_queue_.top()); 262 EnqueueTaskLocked(delayed_task_queue_.top());
238 delayed_task_queue_.pop(); 263 delayed_task_queue_.pop();
239 } 264 }
240 TraceQueueSize(true); 265 TraceQueueSize(true);
241 ScheduleDelayedWorkLocked(lazy_now); 266 ScheduleDelayedWorkLocked(lazy_now);
242 } 267 }
243 268
244 void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { 269 void TaskQueueImpl::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
245 lock_.AssertAcquired(); 270 lock_.AssertAcquired();
246 // Any remaining tasks are in the future, so queue a task to kick them. 271 // Any remaining tasks are in the future, so queue a task to kick them.
247 if (!delayed_task_queue_.empty()) { 272 if (!delayed_task_queue_.empty()) {
248 base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time; 273 base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
249 DCHECK_GT(next_run_time, lazy_now->Now()); 274 DCHECK_GE(next_run_time, lazy_now->Now());
250 // Make sure we don't have more than one 275 // Make sure we don't have more than one
251 // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled 276 // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
252 // run time (note it's fine to have multiple ones in flight for distinct 277 // run time (note it's fine to have multiple ones in flight for distinct
253 // run times). 278 // run times).
254 if (in_flight_kick_delayed_tasks_.find(next_run_time) == 279 if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
255 in_flight_kick_delayed_tasks_.end()) { 280 in_flight_kick_delayed_tasks_.end()) {
256 in_flight_kick_delayed_tasks_.insert(next_run_time); 281 in_flight_kick_delayed_tasks_.insert(next_run_time);
257 base::TimeDelta delay = next_run_time - lazy_now->Now(); 282 base::TimeDelta delay = next_run_time - lazy_now->Now();
258 task_queue_manager_->PostDelayedTask( 283 task_queue_manager_->PostDelayedTask(
259 FROM_HERE, 284 FROM_HERE,
260 Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay); 285 Bind(&TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue, this),
286 delay);
261 } 287 }
262 } 288 }
263 } 289 }
264 290
265 TaskQueueManager::QueueState TaskQueue::GetQueueState() const { 291 TaskQueueManager::QueueState TaskQueueImpl::GetQueueState() const {
266 DCHECK(main_thread_checker_.CalledOnValidThread()); 292 DCHECK(main_thread_checker_.CalledOnValidThread());
267 if (!work_queue_.empty()) 293 if (!work_queue_.empty())
268 return TaskQueueManager::QueueState::HAS_WORK; 294 return TaskQueueManager::QueueState::HAS_WORK;
269 295
270 { 296 {
271 base::AutoLock lock(lock_); 297 base::AutoLock lock(lock_);
272 if (incoming_queue_.empty()) { 298 if (incoming_queue_.empty()) {
273 return TaskQueueManager::QueueState::EMPTY; 299 return TaskQueueManager::QueueState::EMPTY;
274 } else { 300 } else {
275 return TaskQueueManager::QueueState::NEEDS_PUMPING; 301 return TaskQueueManager::QueueState::NEEDS_PUMPING;
276 } 302 }
277 } 303 }
278 } 304 }
279 305
280 bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { 306 bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
281 lock_.AssertAcquired(); 307 lock_.AssertAcquired();
282 // A null task is passed when UpdateQueue is called before any task is run. 308 // A null task is passed when UpdateQueue is called before any task is run.
283 // In this case we don't want to pump an after_wakeup queue, so return true 309 // In this case we don't want to pump an after_wakeup queue, so return true
284 // here. 310 // here.
285 if (!task) 311 if (!task)
286 return true; 312 return true;
287 313
288 // Return false if there are no task in the incoming queue. 314 // Return false if there are no task in the incoming queue.
289 if (incoming_queue_.empty()) 315 if (incoming_queue_.empty())
290 return false; 316 return false;
291 317
292 base::PendingTask oldest_queued_task = incoming_queue_.front(); 318 base::PendingTask oldest_queued_task = incoming_queue_.front();
293 DCHECK(oldest_queued_task.delayed_run_time.is_null()); 319 DCHECK(oldest_queued_task.delayed_run_time.is_null());
294 DCHECK(task->delayed_run_time.is_null()); 320 DCHECK(task->delayed_run_time.is_null());
295 321
296 // Note: the comparison is correct due to the fact that the PendingTask 322 // Note: the comparison is correct due to the fact that the PendingTask
297 // operator inverts its comparison operation in order to work well in a heap 323 // operator inverts its comparison operation in order to work well in a heap
298 // based priority queue. 324 // based priority queue.
299 return oldest_queued_task < *task; 325 return oldest_queued_task < *task;
300 } 326 }
301 327
302 bool TaskQueue::ShouldAutoPumpQueueLocked( 328 bool TaskQueueImpl::ShouldAutoPumpQueueLocked(
303 bool should_trigger_wakeup, 329 bool should_trigger_wakeup,
304 const base::PendingTask* previous_task) { 330 const base::PendingTask* previous_task) {
305 lock_.AssertAcquired(); 331 lock_.AssertAcquired();
306 if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL) 332 if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL)
307 return false; 333 return false;
308 if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP && 334 if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP &&
309 (!should_trigger_wakeup || TaskIsOlderThanQueuedTasks(previous_task))) 335 (!should_trigger_wakeup || TaskIsOlderThanQueuedTasks(previous_task)))
310 return false; 336 return false;
311 if (incoming_queue_.empty()) 337 if (incoming_queue_.empty())
312 return false; 338 return false;
313 return true; 339 return true;
314 } 340 }
315 341
316 bool TaskQueue::NextPendingDelayedTaskRunTime( 342 bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
317 base::TimeTicks* next_pending_delayed_task) { 343 base::TimeTicks* next_pending_delayed_task) {
318 base::AutoLock lock(lock_); 344 base::AutoLock lock(lock_);
319 if (delayed_task_queue_.empty()) 345 if (delayed_task_queue_.empty())
320 return false; 346 return false;
321 *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time; 347 *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
322 return true; 348 return true;
323 } 349 }
324 350
325 bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, 351 bool TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now,
326 bool should_trigger_wakeup, 352 bool should_trigger_wakeup,
327 const base::PendingTask* previous_task) { 353 const base::PendingTask* previous_task) {
328 if (!work_queue_.empty()) 354 if (!work_queue_.empty())
329 return true; 355 return true;
330 356
331 { 357 {
332 base::AutoLock lock(lock_); 358 base::AutoLock lock(lock_);
333 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup, previous_task)) 359 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup, previous_task))
334 return false; 360 return false;
335 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now); 361 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
336 work_queue_.Swap(&incoming_queue_); 362 work_queue_.Swap(&incoming_queue_);
337 TraceQueueSize(true); 363 TraceQueueSize(true);
338 return true; 364 return true;
339 } 365 }
340 } 366 }
341 367
342 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { 368 base::PendingTask TaskQueueImpl::TakeTaskFromWorkQueue() {
343 base::PendingTask pending_task = work_queue_.front(); 369 base::PendingTask pending_task = work_queue_.front();
344 work_queue_.pop(); 370 work_queue_.pop();
345 TraceQueueSize(false); 371 TraceQueueSize(false);
346 return pending_task; 372 return pending_task;
347 } 373 }
348 374
349 void TaskQueue::TraceQueueSize(bool is_locked) const { 375 void TaskQueueImpl::TraceQueueSize(bool is_locked) const {
350 bool is_tracing; 376 bool is_tracing;
351 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, 377 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
352 &is_tracing); 378 &is_tracing);
353 if (!is_tracing || !name_) 379 if (!is_tracing || !name_)
354 return; 380 return;
355 if (!is_locked) 381 if (!is_locked)
356 lock_.Acquire(); 382 lock_.Acquire();
357 else 383 else
358 lock_.AssertAcquired(); 384 lock_.AssertAcquired();
359 TRACE_COUNTER1( 385 TRACE_COUNTER1(
360 disabled_by_default_tracing_category_, name_, 386 disabled_by_default_tracing_category_, name_,
361 incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size()); 387 incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
362 if (!is_locked) 388 if (!is_locked)
363 lock_.Release(); 389 lock_.Release();
364 } 390 }
365 391
366 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { 392 void TaskQueueImpl::EnqueueTaskLocked(const base::PendingTask& pending_task) {
367 lock_.AssertAcquired(); 393 lock_.AssertAcquired();
368 if (!task_queue_manager_) 394 if (!task_queue_manager_)
369 return; 395 return;
370 if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO && 396 if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO &&
371 incoming_queue_.empty()) 397 incoming_queue_.empty())
372 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 398 task_queue_manager_->MaybePostDoWorkOnMainRunner();
373 incoming_queue_.push(pending_task); 399 incoming_queue_.push(pending_task);
374 incoming_queue_.back().sequence_num = 400 incoming_queue_.back().sequence_num =
375 task_queue_manager_->GetNextSequenceNumber(); 401 task_queue_manager_->GetNextSequenceNumber();
376 402
377 if (!pending_task.delayed_run_time.is_null()) { 403 if (!pending_task.delayed_run_time.is_null()) {
378 // Clear the delayed run time because we've already applied the delay 404 // Clear the delayed run time because we've already applied the delay
379 // before getting here. 405 // before getting here.
380 incoming_queue_.back().delayed_run_time = base::TimeTicks(); 406 incoming_queue_.back().delayed_run_time = base::TimeTicks();
381 } 407 }
382 TraceQueueSize(true); 408 TraceQueueSize(true);
383 } 409 }
384 410
385 void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { 411 void TaskQueueImpl::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
386 base::AutoLock lock(lock_); 412 base::AutoLock lock(lock_);
387 if (pump_policy == TaskQueueManager::PumpPolicy::AUTO && 413 if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
388 pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) { 414 pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
389 PumpQueueLocked(); 415 PumpQueueLocked();
390 } 416 }
391 pump_policy_ = pump_policy; 417 pump_policy_ = pump_policy;
392 } 418 }
393 419
394 void TaskQueue::PumpQueueLocked() { 420 void TaskQueueImpl::PumpQueueLocked() {
395 lock_.AssertAcquired(); 421 lock_.AssertAcquired();
396 if (task_queue_manager_) { 422 if (task_queue_manager_) {
397 LazyNow lazy_now(task_queue_manager_); 423 LazyNow lazy_now(task_queue_manager_);
398 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); 424 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
399 } 425 }
400 while (!incoming_queue_.empty()) { 426 while (!incoming_queue_.empty()) {
401 work_queue_.push(incoming_queue_.front()); 427 work_queue_.push(incoming_queue_.front());
402 incoming_queue_.pop(); 428 incoming_queue_.pop();
403 } 429 }
404 if (!work_queue_.empty()) 430 if (!work_queue_.empty())
405 task_queue_manager_->MaybePostDoWorkOnMainRunner(); 431 task_queue_manager_->MaybePostDoWorkOnMainRunner();
406 } 432 }
407 433
408 void TaskQueue::PumpQueue() { 434 void TaskQueueImpl::PumpQueue() {
409 base::AutoLock lock(lock_); 435 base::AutoLock lock(lock_);
410 PumpQueueLocked(); 436 PumpQueueLocked();
411 } 437 }
412 438
413 void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { 439 void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const {
414 base::AutoLock lock(lock_); 440 base::AutoLock lock(lock_);
415 state->BeginDictionary(); 441 state->BeginDictionary();
416 if (name_) 442 if (name_)
417 state->SetString("name", name_); 443 state->SetString("name", name_);
418 state->SetString("pump_policy", 444 state->SetString("pump_policy",
419 TaskQueueManager::PumpPolicyToString(pump_policy_)); 445 TaskQueueManager::PumpPolicyToString(pump_policy_));
420 state->SetString("wakeup_policy", 446 state->SetString("wakeup_policy",
421 TaskQueueManager::WakeupPolicyToString(wakeup_policy_)); 447 TaskQueueManager::WakeupPolicyToString(wakeup_policy_));
422 bool verbose_tracing_enabled = false; 448 bool verbose_tracing_enabled = false;
423 TRACE_EVENT_CATEGORY_GROUP_ENABLED( 449 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
424 disabled_by_default_verbose_tracing_category_, &verbose_tracing_enabled); 450 disabled_by_default_verbose_tracing_category_, &verbose_tracing_enabled);
425 state->SetInteger("incoming_queue_size", incoming_queue_.size()); 451 state->SetInteger("incoming_queue_size", incoming_queue_.size());
426 state->SetInteger("work_queue_size", work_queue_.size()); 452 state->SetInteger("work_queue_size", work_queue_.size());
427 state->SetInteger("delayed_task_queue_size", delayed_task_queue_.size()); 453 state->SetInteger("delayed_task_queue_size", delayed_task_queue_.size());
428 if (verbose_tracing_enabled) { 454 if (verbose_tracing_enabled) {
429 state->BeginArray("incoming_queue"); 455 state->BeginArray("incoming_queue");
430 QueueAsValueInto(incoming_queue_, state); 456 QueueAsValueInto(incoming_queue_, state);
431 state->EndArray(); 457 state->EndArray();
432 state->BeginArray("work_queue"); 458 state->BeginArray("work_queue");
433 QueueAsValueInto(work_queue_, state); 459 QueueAsValueInto(work_queue_, state);
434 state->EndArray(); 460 state->EndArray();
435 state->BeginArray("delayed_task_queue"); 461 state->BeginArray("delayed_task_queue");
436 QueueAsValueInto(delayed_task_queue_, state); 462 QueueAsValueInto(delayed_task_queue_, state);
437 state->EndArray(); 463 state->EndArray();
438 } 464 }
439 state->EndDictionary(); 465 state->EndDictionary();
440 } 466 }
441 467
442 // static 468 // static
443 void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, 469 void TaskQueueImpl::QueueAsValueInto(const base::TaskQueue& queue,
444 base::trace_event::TracedValue* state) { 470 base::trace_event::TracedValue* state) {
445 base::TaskQueue queue_copy(queue); 471 base::TaskQueue queue_copy(queue);
446 while (!queue_copy.empty()) { 472 while (!queue_copy.empty()) {
447 TaskAsValueInto(queue_copy.front(), state); 473 TaskAsValueInto(queue_copy.front(), state);
448 queue_copy.pop(); 474 queue_copy.pop();
449 } 475 }
450 } 476 }
451 477
452 // static 478 // static
453 void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, 479 void TaskQueueImpl::QueueAsValueInto(const base::DelayedTaskQueue& queue,
454 base::trace_event::TracedValue* state) { 480 base::trace_event::TracedValue* state) {
455 base::DelayedTaskQueue queue_copy(queue); 481 base::DelayedTaskQueue queue_copy(queue);
456 while (!queue_copy.empty()) { 482 while (!queue_copy.empty()) {
457 TaskAsValueInto(queue_copy.top(), state); 483 TaskAsValueInto(queue_copy.top(), state);
458 queue_copy.pop(); 484 queue_copy.pop();
459 } 485 }
460 } 486 }
461 487
462 // static 488 // static
463 void TaskQueue::TaskAsValueInto(const base::PendingTask& task, 489 void TaskQueueImpl::TaskAsValueInto(const base::PendingTask& task,
464 base::trace_event::TracedValue* state) { 490 base::trace_event::TracedValue* state) {
465 state->BeginDictionary(); 491 state->BeginDictionary();
466 state->SetString("posted_from", task.posted_from.ToString()); 492 state->SetString("posted_from", task.posted_from.ToString());
467 state->SetInteger("sequence_num", task.sequence_num); 493 state->SetInteger("sequence_num", task.sequence_num);
468 state->SetBoolean("nestable", task.nestable); 494 state->SetBoolean("nestable", task.nestable);
469 state->SetBoolean("is_high_res", task.is_high_res); 495 state->SetBoolean("is_high_res", task.is_high_res);
470 state->SetDouble( 496 state->SetDouble(
471 "delayed_run_time", 497 "delayed_run_time",
472 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L); 498 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
473 state->EndDictionary(); 499 state->EndDictionary();
474 } 500 }
(...skipping 16 matching lines...) Expand all
491 disabled_by_default_tracing_category), 517 disabled_by_default_tracing_category),
492 deletion_sentinel_(new DeletionSentinel()), 518 deletion_sentinel_(new DeletionSentinel()),
493 weak_factory_(this) { 519 weak_factory_(this) {
494 DCHECK(main_task_runner->RunsTasksOnCurrentThread()); 520 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
495 DCHECK_LE(task_queue_count, sizeof(task_was_run_bitmap_) * CHAR_BIT) 521 DCHECK_LE(task_queue_count, sizeof(task_was_run_bitmap_) * CHAR_BIT)
496 << "You need a bigger int for task_was_run_bitmap_"; 522 << "You need a bigger int for task_was_run_bitmap_";
497 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category, 523 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category,
498 "TaskQueueManager", this); 524 "TaskQueueManager", this);
499 525
500 for (size_t i = 0; i < task_queue_count; i++) { 526 for (size_t i = 0; i < task_queue_count; i++) {
501 scoped_refptr<internal::TaskQueue> queue(make_scoped_refptr( 527 scoped_refptr<internal::TaskQueueImpl> queue(
502 new internal::TaskQueue(this, 528 make_scoped_refptr(new internal::TaskQueueImpl(
503 disabled_by_default_tracing_category, 529 this, disabled_by_default_tracing_category,
504 disabled_by_default_verbose_tracing_category))); 530 disabled_by_default_verbose_tracing_category)));
505 queues_.push_back(queue); 531 queues_.push_back(queue);
506 } 532 }
507 533
508 std::vector<const base::TaskQueue*> work_queues; 534 std::vector<const base::TaskQueue*> work_queues;
509 for (const auto& queue : queues_) 535 for (const auto& queue : queues_)
510 work_queues.push_back(&queue->work_queue()); 536 work_queues.push_back(&queue->work_queue());
511 selector_->RegisterWorkQueues(work_queues); 537 selector_->RegisterWorkQueues(work_queues);
512 selector_->SetTaskQueueSelectorObserver(this); 538 selector_->SetTaskQueueSelectorObserver(this);
513 539
514 do_work_from_main_thread_closure_ = 540 do_work_from_main_thread_closure_ =
515 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true); 541 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true);
516 do_work_from_other_thread_closure_ = 542 do_work_from_other_thread_closure_ =
517 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false); 543 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false);
518 } 544 }
519 545
520 TaskQueueManager::~TaskQueueManager() { 546 TaskQueueManager::~TaskQueueManager() {
521 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_, 547 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_,
522 "TaskQueueManager", this); 548 "TaskQueueManager", this);
523 for (auto& queue : queues_) 549 for (auto& queue : queues_)
524 queue->WillDeleteTaskQueueManager(); 550 queue->WillDeleteTaskQueueManager();
525 selector_->SetTaskQueueSelectorObserver(nullptr); 551 selector_->SetTaskQueueSelectorObserver(nullptr);
526 } 552 }
527 553
528 internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const { 554 internal::TaskQueueImpl* TaskQueueManager::Queue(size_t queue_index) const {
529 DCHECK_LT(queue_index, queues_.size()); 555 DCHECK_LT(queue_index, queues_.size());
530 return queues_[queue_index].get(); 556 return queues_[queue_index].get();
531 } 557 }
532 558
533 scoped_refptr<base::SingleThreadTaskRunner> 559 scoped_refptr<TaskQueue> TaskQueueManager::TaskRunnerForQueue(
534 TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const { 560 size_t queue_index) const {
535 return Queue(queue_index); 561 return Queue(queue_index);
536 } 562 }
537 563
538 bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const { 564 bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
539 return Queue(queue_index)->GetQueueState() == QueueState::EMPTY; 565 return Queue(queue_index)->GetQueueState() == QueueState::EMPTY;
540 } 566 }
541 567
542 TaskQueueManager::QueueState TaskQueueManager::GetQueueState(size_t queue_index) 568 TaskQueueManager::QueueState TaskQueueManager::GetQueueState(size_t queue_index)
543 const { 569 const {
544 return Queue(queue_index)->GetQueueState(); 570 return Queue(queue_index)->GetQueueState();
(...skipping 18 matching lines...) Expand all
563 return base::TimeTicks(); 589 return base::TimeTicks();
564 590
565 DCHECK_NE(next_pending_delayed_task, 591 DCHECK_NE(next_pending_delayed_task,
566 base::TimeTicks::FromInternalValue(kMaxTimeTicks)); 592 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
567 return next_pending_delayed_task; 593 return next_pending_delayed_task;
568 } 594 }
569 595
570 void TaskQueueManager::SetPumpPolicy(size_t queue_index, 596 void TaskQueueManager::SetPumpPolicy(size_t queue_index,
571 PumpPolicy pump_policy) { 597 PumpPolicy pump_policy) {
572 DCHECK(main_thread_checker_.CalledOnValidThread()); 598 DCHECK(main_thread_checker_.CalledOnValidThread());
573 internal::TaskQueue* queue = Queue(queue_index); 599 internal::TaskQueueImpl* queue = Queue(queue_index);
574 queue->SetPumpPolicy(pump_policy); 600 queue->SetPumpPolicy(pump_policy);
575 } 601 }
576 602
577 void TaskQueueManager::SetWakeupPolicy(size_t queue_index, 603 void TaskQueueManager::SetWakeupPolicy(size_t queue_index,
578 WakeupPolicy wakeup_policy) { 604 WakeupPolicy wakeup_policy) {
579 DCHECK(main_thread_checker_.CalledOnValidThread()); 605 DCHECK(main_thread_checker_.CalledOnValidThread());
580 internal::TaskQueue* queue = Queue(queue_index); 606 internal::TaskQueueImpl* queue = Queue(queue_index);
581 queue->set_wakeup_policy(wakeup_policy); 607 queue->set_wakeup_policy(wakeup_policy);
582 } 608 }
583 609
584 void TaskQueueManager::PumpQueue(size_t queue_index) { 610 void TaskQueueManager::PumpQueue(size_t queue_index) {
585 DCHECK(main_thread_checker_.CalledOnValidThread()); 611 DCHECK(main_thread_checker_.CalledOnValidThread());
586 internal::TaskQueue* queue = Queue(queue_index); 612 internal::TaskQueueImpl* queue = Queue(queue_index);
587 queue->PumpQueue(); 613 queue->PumpQueue();
588 } 614 }
589 615
590 bool TaskQueueManager::UpdateWorkQueues( 616 bool TaskQueueManager::UpdateWorkQueues(
591 bool should_trigger_wakeup, 617 bool should_trigger_wakeup,
592 const base::PendingTask* previous_task) { 618 const base::PendingTask* previous_task) {
593 // TODO(skyostil): This is not efficient when the number of queues grows very 619 // TODO(skyostil): This is not efficient when the number of queues grows very
594 // large due to the number of locks taken. Consider optimizing when we get 620 // large due to the number of locks taken. Consider optimizing when we get
595 // there. 621 // there.
596 DCHECK(main_thread_checker_.CalledOnValidThread()); 622 DCHECK(main_thread_checker_.CalledOnValidThread());
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
666 void TaskQueueManager::DidQueueTask(const base::PendingTask& pending_task) { 692 void TaskQueueManager::DidQueueTask(const base::PendingTask& pending_task) {
667 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task); 693 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task);
668 } 694 }
669 695
670 bool TaskQueueManager::ProcessTaskFromWorkQueue( 696 bool TaskQueueManager::ProcessTaskFromWorkQueue(
671 size_t queue_index, 697 size_t queue_index,
672 bool has_previous_task, 698 bool has_previous_task,
673 base::PendingTask* previous_task) { 699 base::PendingTask* previous_task) {
674 DCHECK(main_thread_checker_.CalledOnValidThread()); 700 DCHECK(main_thread_checker_.CalledOnValidThread());
675 scoped_refptr<DeletionSentinel> protect(deletion_sentinel_); 701 scoped_refptr<DeletionSentinel> protect(deletion_sentinel_);
676 internal::TaskQueue* queue = Queue(queue_index); 702 internal::TaskQueueImpl* queue = Queue(queue_index);
677 base::PendingTask pending_task = queue->TakeTaskFromWorkQueue(); 703 base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
678 task_was_run_bitmap_ |= UINT64_C(1) << queue_index; 704 task_was_run_bitmap_ |= UINT64_C(1) << queue_index;
679 if (!pending_task.nestable && main_task_runner_->IsNested()) { 705 if (!pending_task.nestable && main_task_runner_->IsNested()) {
680 // Defer non-nestable work to the main task runner. NOTE these tasks can be 706 // Defer non-nestable work to the main task runner. NOTE these tasks can be
681 // arbitrarily delayed so the additional delay should not be a problem. 707 // arbitrarily delayed so the additional delay should not be a problem.
682 main_task_runner_->PostNonNestableTask(pending_task.posted_from, 708 main_task_runner_->PostNonNestableTask(pending_task.posted_from,
683 pending_task.task); 709 pending_task.task);
684 } else { 710 } else {
685 // Suppress "will" task observer notifications for the first and "did" 711 // Suppress "will" task observer notifications for the first and "did"
686 // notifications for the last task in the batch to avoid duplicate 712 // notifications for the last task in the batch to avoid duplicate
(...skipping 19 matching lines...) Expand all
706 } 732 }
707 733
708 bool TaskQueueManager::RunsTasksOnCurrentThread() const { 734 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
709 return main_task_runner_->RunsTasksOnCurrentThread(); 735 return main_task_runner_->RunsTasksOnCurrentThread();
710 } 736 }
711 737
712 bool TaskQueueManager::PostDelayedTask( 738 bool TaskQueueManager::PostDelayedTask(
713 const tracked_objects::Location& from_here, 739 const tracked_objects::Location& from_here,
714 const base::Closure& task, 740 const base::Closure& task,
715 base::TimeDelta delay) { 741 base::TimeDelta delay) {
716 DCHECK(delay > base::TimeDelta()); 742 DCHECK_GE(delay, base::TimeDelta());
717 return main_task_runner_->PostDelayedTask(from_here, task, delay); 743 return main_task_runner_->PostDelayedTask(from_here, task, delay);
718 } 744 }
719 745
720 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { 746 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
721 DCHECK(main_thread_checker_.CalledOnValidThread()); 747 DCHECK(main_thread_checker_.CalledOnValidThread());
722 internal::TaskQueue* queue = Queue(queue_index); 748 internal::TaskQueueImpl* queue = Queue(queue_index);
723 queue->set_name(name); 749 queue->set_name(name);
724 } 750 }
725 751
726 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { 752 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
727 DCHECK(main_thread_checker_.CalledOnValidThread()); 753 DCHECK(main_thread_checker_.CalledOnValidThread());
728 DCHECK_GE(work_batch_size, 1); 754 DCHECK_GE(work_batch_size, 1);
729 work_batch_size_ = work_batch_size; 755 work_batch_size_ = work_batch_size;
730 } 756 }
731 757
732 void TaskQueueManager::AddTaskObserver( 758 void TaskQueueManager::AddTaskObserver(
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
810 return nullptr; 836 return nullptr;
811 } 837 }
812 } 838 }
813 839
814 void TaskQueueManager::OnTaskQueueEnabled() { 840 void TaskQueueManager::OnTaskQueueEnabled() {
815 DCHECK(main_thread_checker_.CalledOnValidThread()); 841 DCHECK(main_thread_checker_.CalledOnValidThread());
816 MaybePostDoWorkOnMainRunner(); 842 MaybePostDoWorkOnMainRunner();
817 } 843 }
818 844
819 } // namespace scheduler 845 } // namespace scheduler
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698