Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2637843002: Migrate base::TaskRunner from Closure to OnceClosure (Closed)
Patch Set: rebase without dcheck_in_ref_count Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
86 86
87 explicit SequencedTask(const tracked_objects::Location& from_here) 87 explicit SequencedTask(const tracked_objects::Location& from_here)
88 : base::TrackingInfo(from_here, TimeTicks()), 88 : base::TrackingInfo(from_here, TimeTicks()),
89 sequence_token_id(0), 89 sequence_token_id(0),
90 trace_id(0), 90 trace_id(0),
91 sequence_task_number(0), 91 sequence_task_number(0),
92 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 92 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
93 93
94 ~SequencedTask() {} 94 ~SequencedTask() {}
95 95
96 SequencedTask(SequencedTask&&) = default;
97 SequencedTask& operator=(SequencedTask&&) = default;
98
96 int sequence_token_id; 99 int sequence_token_id;
97 int trace_id; 100 int trace_id;
98 int64_t sequence_task_number; 101 int64_t sequence_task_number;
99 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 102 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
100 tracked_objects::Location posted_from; 103 tracked_objects::Location posted_from;
101 Closure task; 104 OnceClosure task;
102 105
103 // Non-delayed tasks and delayed tasks are managed together by time-to-run 106 // Non-delayed tasks and delayed tasks are managed together by time-to-run
104 // order. We calculate the time by adding the posted time and the given delay. 107 // order. We calculate the time by adding the posted time and the given delay.
105 TimeTicks time_to_run; 108 TimeTicks time_to_run;
106 }; 109 };
107 110
108 struct SequencedTaskLessThan { 111 struct SequencedTaskLessThan {
109 public: 112 public:
110 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 113 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
111 if (lhs.time_to_run < rhs.time_to_run) 114 if (lhs.time_to_run < rhs.time_to_run)
(...skipping 21 matching lines...) Expand all
133 // 136 //
134 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 137 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
135 class SequencedWorkerPoolTaskRunner : public TaskRunner { 138 class SequencedWorkerPoolTaskRunner : public TaskRunner {
136 public: 139 public:
137 SequencedWorkerPoolTaskRunner( 140 SequencedWorkerPoolTaskRunner(
138 scoped_refptr<SequencedWorkerPool> pool, 141 scoped_refptr<SequencedWorkerPool> pool,
139 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 142 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
140 143
141 // TaskRunner implementation 144 // TaskRunner implementation
142 bool PostDelayedTask(const tracked_objects::Location& from_here, 145 bool PostDelayedTask(const tracked_objects::Location& from_here,
143 Closure task, 146 OnceClosure task,
144 TimeDelta delay) override; 147 TimeDelta delay) override;
145 bool RunsTasksOnCurrentThread() const override; 148 bool RunsTasksOnCurrentThread() const override;
146 149
147 private: 150 private:
148 ~SequencedWorkerPoolTaskRunner() override; 151 ~SequencedWorkerPoolTaskRunner() override;
149 152
150 const scoped_refptr<SequencedWorkerPool> pool_; 153 const scoped_refptr<SequencedWorkerPool> pool_;
151 154
152 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 155 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
153 156
154 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 157 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
155 }; 158 };
156 159
157 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 160 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
158 scoped_refptr<SequencedWorkerPool> pool, 161 scoped_refptr<SequencedWorkerPool> pool,
159 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 162 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
160 : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {} 163 : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {}
161 164
162 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 165 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
163 } 166 }
164 167
165 bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 168 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
166 const tracked_objects::Location& from_here, 169 const tracked_objects::Location& from_here,
167 Closure task, 170 OnceClosure task,
168 TimeDelta delay) { 171 TimeDelta delay) {
169 if (delay.is_zero()) { 172 if (delay.is_zero()) {
170 return pool_->PostWorkerTaskWithShutdownBehavior(from_here, std::move(task), 173 return pool_->PostWorkerTaskWithShutdownBehavior(from_here, std::move(task),
171 shutdown_behavior_); 174 shutdown_behavior_);
172 } 175 }
173 return pool_->PostDelayedWorkerTask(from_here, std::move(task), delay); 176 return pool_->PostDelayedWorkerTask(from_here, std::move(task), delay);
174 } 177 }
175 178
176 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 179 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
177 return pool_->RunsTasksOnCurrentThread(); 180 return pool_->RunsTasksOnCurrentThread();
178 } 181 }
179 182
180 } // namespace 183 } // namespace
181 184
182 // SequencedWorkerPool::PoolSequencedTaskRunner ------------------------------ 185 // SequencedWorkerPool::PoolSequencedTaskRunner ------------------------------
183 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 186 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
184 // fixed sequence token. 187 // fixed sequence token.
185 // 188 //
186 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 189 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
187 class SequencedWorkerPool::PoolSequencedTaskRunner 190 class SequencedWorkerPool::PoolSequencedTaskRunner
188 : public SequencedTaskRunner { 191 : public SequencedTaskRunner {
189 public: 192 public:
190 PoolSequencedTaskRunner( 193 PoolSequencedTaskRunner(
191 scoped_refptr<SequencedWorkerPool> pool, 194 scoped_refptr<SequencedWorkerPool> pool,
192 SequencedWorkerPool::SequenceToken token, 195 SequencedWorkerPool::SequenceToken token,
193 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 196 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
194 197
195 // TaskRunner implementation 198 // TaskRunner implementation
196 bool PostDelayedTask(const tracked_objects::Location& from_here, 199 bool PostDelayedTask(const tracked_objects::Location& from_here,
197 Closure task, 200 OnceClosure task,
198 TimeDelta delay) override; 201 TimeDelta delay) override;
199 bool RunsTasksOnCurrentThread() const override; 202 bool RunsTasksOnCurrentThread() const override;
200 203
201 // SequencedTaskRunner implementation 204 // SequencedTaskRunner implementation
202 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 205 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
203 Closure task, 206 OnceClosure task,
204 TimeDelta delay) override; 207 TimeDelta delay) override;
205 208
206 private: 209 private:
207 ~PoolSequencedTaskRunner() override; 210 ~PoolSequencedTaskRunner() override;
208 211
209 const scoped_refptr<SequencedWorkerPool> pool_; 212 const scoped_refptr<SequencedWorkerPool> pool_;
210 213
211 const SequencedWorkerPool::SequenceToken token_; 214 const SequencedWorkerPool::SequenceToken token_;
212 215
213 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 216 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
214 217
215 DISALLOW_COPY_AND_ASSIGN(PoolSequencedTaskRunner); 218 DISALLOW_COPY_AND_ASSIGN(PoolSequencedTaskRunner);
216 }; 219 };
217 220
218 SequencedWorkerPool::PoolSequencedTaskRunner:: 221 SequencedWorkerPool::PoolSequencedTaskRunner::
219 PoolSequencedTaskRunner( 222 PoolSequencedTaskRunner(
220 scoped_refptr<SequencedWorkerPool> pool, 223 scoped_refptr<SequencedWorkerPool> pool,
221 SequencedWorkerPool::SequenceToken token, 224 SequencedWorkerPool::SequenceToken token,
222 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 225 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
223 : pool_(std::move(pool)), 226 : pool_(std::move(pool)),
224 token_(token), 227 token_(token),
225 shutdown_behavior_(shutdown_behavior) {} 228 shutdown_behavior_(shutdown_behavior) {}
226 229
227 SequencedWorkerPool::PoolSequencedTaskRunner:: 230 SequencedWorkerPool::PoolSequencedTaskRunner::
228 ~PoolSequencedTaskRunner() = default; 231 ~PoolSequencedTaskRunner() = default;
229 232
230 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostDelayedTask( 233 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostDelayedTask(
231 const tracked_objects::Location& from_here, 234 const tracked_objects::Location& from_here,
232 Closure task, 235 OnceClosure task,
233 TimeDelta delay) { 236 TimeDelta delay) {
234 if (delay.is_zero()) { 237 if (delay.is_zero()) {
235 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 238 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
236 token_, from_here, std::move(task), shutdown_behavior_); 239 token_, from_here, std::move(task), shutdown_behavior_);
237 } 240 }
238 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, 241 return pool_->PostDelayedSequencedWorkerTask(token_, from_here,
239 std::move(task), delay); 242 std::move(task), delay);
240 } 243 }
241 244
242 bool SequencedWorkerPool::PoolSequencedTaskRunner:: 245 bool SequencedWorkerPool::PoolSequencedTaskRunner::
243 RunsTasksOnCurrentThread() const { 246 RunsTasksOnCurrentThread() const {
244 return pool_->IsRunningSequenceOnCurrentThread(token_); 247 return pool_->IsRunningSequenceOnCurrentThread(token_);
245 } 248 }
246 249
247 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostNonNestableDelayedTask( 250 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostNonNestableDelayedTask(
248 const tracked_objects::Location& from_here, 251 const tracked_objects::Location& from_here,
249 Closure task, 252 OnceClosure task,
250 TimeDelta delay) { 253 TimeDelta delay) {
251 // There's no way to run nested tasks, so simply forward to 254 // There's no way to run nested tasks, so simply forward to
252 // PostDelayedTask. 255 // PostDelayedTask.
253 return PostDelayedTask(from_here, std::move(task), delay); 256 return PostDelayedTask(from_here, std::move(task), delay);
254 } 257 }
255 258
256 // Worker --------------------------------------------------------------------- 259 // Worker ---------------------------------------------------------------------
257 260
258 class SequencedWorkerPool::Worker : public SimpleThread { 261 class SequencedWorkerPool::Worker : public SimpleThread {
259 public: 262 public:
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
342 345
343 SequenceToken GetNamedSequenceToken(const std::string& name); 346 SequenceToken GetNamedSequenceToken(const std::string& name);
344 347
345 // This function accepts a name and an ID. If the name is null, the 348 // This function accepts a name and an ID. If the name is null, the
346 // token ID is used. This allows us to implement the optional name lookup 349 // token ID is used. This allows us to implement the optional name lookup
347 // from a single function without having to enter the lock a separate time. 350 // from a single function without having to enter the lock a separate time.
348 bool PostTask(const std::string* optional_token_name, 351 bool PostTask(const std::string* optional_token_name,
349 SequenceToken sequence_token, 352 SequenceToken sequence_token,
350 WorkerShutdown shutdown_behavior, 353 WorkerShutdown shutdown_behavior,
351 const tracked_objects::Location& from_here, 354 const tracked_objects::Location& from_here,
352 Closure task, 355 OnceClosure task,
353 TimeDelta delay); 356 TimeDelta delay);
354 357
355 bool RunsTasksOnCurrentThread() const; 358 bool RunsTasksOnCurrentThread() const;
356 359
357 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 360 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
358 361
359 void CleanupForTesting(); 362 void CleanupForTesting();
360 363
361 void SignalHasWorkForTesting(); 364 void SignalHasWorkForTesting();
362 365
(...skipping 24 matching lines...) Expand all
387 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that 390 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that
388 // |this_worker| has the desired task info context during ~ScheduledTask() to 391 // |this_worker| has the desired task info context during ~ScheduledTask() to
389 // allow sequence-checking. 392 // allow sequence-checking.
390 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, 393 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete,
391 Worker* this_worker); 394 Worker* this_worker);
392 395
393 // Helper used by PostTask() to complete the work when redirection is on. 396 // Helper used by PostTask() to complete the work when redirection is on.
394 // Returns true if the task may run at some point in the future and false if 397 // Returns true if the task may run at some point in the future and false if
395 // it will definitely not run. 398 // it will definitely not run.
396 // Coalesce upon resolution of http://crbug.com/622400. 399 // Coalesce upon resolution of http://crbug.com/622400.
397 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, 400 bool PostTaskToTaskScheduler(SequencedTask sequenced, const TimeDelta& delay);
398 const TimeDelta& delay);
399 401
400 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| 402 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id|
401 // and |traits|. 403 // and |traits|.
402 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( 404 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner(
403 int sequence_token_id, 405 int sequence_token_id,
404 const TaskTraits& traits); 406 const TaskTraits& traits);
405 407
406 // Called from within the lock, this converts the given token name into a 408 // Called from within the lock, this converts the given token name into a
407 // token ID, creating a new one if necessary. 409 // token ID, creating a new one if necessary.
408 int LockedGetNamedTokenID(const std::string& name); 410 int LockedGetNamedTokenID(const std::string& name);
(...skipping 277 matching lines...) Expand 10 before | Expand all | Expand 10 after
686 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 688 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
687 AutoLock lock(lock_); 689 AutoLock lock(lock_);
688 return SequenceToken(LockedGetNamedTokenID(name)); 690 return SequenceToken(LockedGetNamedTokenID(name));
689 } 691 }
690 692
691 bool SequencedWorkerPool::Inner::PostTask( 693 bool SequencedWorkerPool::Inner::PostTask(
692 const std::string* optional_token_name, 694 const std::string* optional_token_name,
693 SequenceToken sequence_token, 695 SequenceToken sequence_token,
694 WorkerShutdown shutdown_behavior, 696 WorkerShutdown shutdown_behavior,
695 const tracked_objects::Location& from_here, 697 const tracked_objects::Location& from_here,
696 Closure task, 698 OnceClosure task,
697 TimeDelta delay) { 699 TimeDelta delay) {
698 // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a 700 // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a
699 // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the 701 // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the
700 // waterfall. https://crbug.com/622400 702 // waterfall. https://crbug.com/622400
701 // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 703 // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
702 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) 704 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
703 debug::DumpWithoutCrashing(); 705 debug::DumpWithoutCrashing();
704 706
705 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 707 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
706 SequencedTask sequenced(from_here); 708 SequencedTask sequenced(from_here);
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
745 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 747 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
746 TRACE_EVENT_FLAG_FLOW_OUT); 748 TRACE_EVENT_FLAG_FLOW_OUT);
747 749
748 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 750 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
749 751
750 // Now that we have the lock, apply the named token rules. 752 // Now that we have the lock, apply the named token rules.
751 if (optional_token_name) 753 if (optional_token_name)
752 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 754 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
753 755
754 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 756 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
755 if (!PostTaskToTaskScheduler(sequenced, delay)) 757 if (!PostTaskToTaskScheduler(std::move(sequenced), delay))
756 return false; 758 return false;
757 } else { 759 } else {
758 pending_tasks_.insert(sequenced); 760 SequencedWorkerPool::WorkerShutdown shutdown_behavior =
761 sequenced.shutdown_behavior;
762 pending_tasks_.insert(std::move(sequenced));
759 763
760 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 764 if (shutdown_behavior == BLOCK_SHUTDOWN)
761 blocking_shutdown_pending_task_count_++; 765 blocking_shutdown_pending_task_count_++;
762 766
763 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 767 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
764 } 768 }
765 } 769 }
766 770
767 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure 771 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
768 // correct behavior if a task is posted to a SequencedWorkerPool before 772 // correct behavior if a task is posted to a SequencedWorkerPool before
769 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. 773 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
770 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 774 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
(...skipping 16 matching lines...) Expand all
787 } else { 791 } else {
788 DCHECK(sequenced_task_runner_map_.empty()); 792 DCHECK(sequenced_task_runner_map_.empty());
789 } 793 }
790 } 794 }
791 #endif // DCHECK_IS_ON() 795 #endif // DCHECK_IS_ON()
792 796
793 return true; 797 return true;
794 } 798 }
795 799
796 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 800 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
797 const SequencedTask& sequenced, 801 SequencedTask sequenced,
798 const TimeDelta& delay) { 802 const TimeDelta& delay) {
799 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 803 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
800 804
801 lock_.AssertAcquired(); 805 lock_.AssertAcquired();
802 806
803 // Confirm that the TaskScheduler's shutdown behaviors use the same 807 // Confirm that the TaskScheduler's shutdown behaviors use the same
804 // underlying values as SequencedWorkerPool. 808 // underlying values as SequencedWorkerPool.
805 static_assert( 809 static_assert(
806 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 810 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
807 static_cast<int>(CONTINUE_ON_SHUTDOWN), 811 static_cast<int>(CONTINUE_ON_SHUTDOWN),
808 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 812 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
809 "CONTINUE_ON_SHUTDOWN."); 813 "CONTINUE_ON_SHUTDOWN.");
810 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 814 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
811 static_cast<int>(SKIP_ON_SHUTDOWN), 815 static_cast<int>(SKIP_ON_SHUTDOWN),
812 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 816 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
813 "SKIP_ON_SHUTDOWN."); 817 "SKIP_ON_SHUTDOWN.");
814 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 818 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
815 static_cast<int>(BLOCK_SHUTDOWN), 819 static_cast<int>(BLOCK_SHUTDOWN),
816 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 820 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
817 "BLOCK_SHUTDOWN."); 821 "BLOCK_SHUTDOWN.");
818 822
819 const TaskShutdownBehavior task_shutdown_behavior = 823 const TaskShutdownBehavior task_shutdown_behavior =
820 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); 824 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
821 const TaskTraits traits = TaskTraits() 825 const TaskTraits traits = TaskTraits()
822 .MayBlock() 826 .MayBlock()
823 .WithBaseSyncPrimitives() 827 .WithBaseSyncPrimitives()
824 .WithPriority(task_priority_) 828 .WithPriority(task_priority_)
825 .WithShutdownBehavior(task_shutdown_behavior); 829 .WithShutdownBehavior(task_shutdown_behavior);
826 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) 830 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits)
827 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); 831 ->PostDelayedTask(sequenced.posted_from, std::move(sequenced.task),
832 delay);
828 } 833 }
829 834
830 scoped_refptr<TaskRunner> 835 scoped_refptr<TaskRunner>
831 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( 836 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
832 int sequence_token_id, 837 int sequence_token_id,
833 const TaskTraits& traits) { 838 const TaskTraits& traits) {
834 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 839 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
835 840
836 lock_.AssertAcquired(); 841 lock_.AssertAcquired();
837 842
(...skipping 371 matching lines...) Expand 10 before | Expand all | Expand 10 after
1209 // This may lead to starvation if there are sufficient numbers of sequences 1214 // This may lead to starvation if there are sufficient numbers of sequences
1210 // in use. To alleviate this, we could add an incrementing priority counter 1215 // in use. To alleviate this, we could add an incrementing priority counter
1211 // to each SequencedTask. Then maintain a priority_queue of all runnable 1216 // to each SequencedTask. Then maintain a priority_queue of all runnable
1212 // tasks, sorted by priority counter. When a sequenced task is completed 1217 // tasks, sorted by priority counter. When a sequenced task is completed
1213 // we would pop the head element off of that tasks pending list and add it 1218 // we would pop the head element off of that tasks pending list and add it
1214 // to the priority queue. Then we would run the first item in the priority 1219 // to the priority queue. Then we would run the first item in the priority
1215 // queue. 1220 // queue.
1216 1221
1217 GetWorkStatus status = GET_WORK_NOT_FOUND; 1222 GetWorkStatus status = GET_WORK_NOT_FOUND;
1218 int unrunnable_tasks = 0; 1223 int unrunnable_tasks = 0;
1219 PendingTaskSet::iterator i = pending_tasks_.begin(); 1224 PendingTaskSet::iterator i = pending_tasks_.begin();
gab 2017/03/29 16:57:35 This iterator is non-const, why do we even need co
tzik 2017/03/29 18:17:43 PendingTaskSet is a std::set, and its iterator is
1220 // We assume that the loop below doesn't take too long and so we can just do 1225 // We assume that the loop below doesn't take too long and so we can just do
1221 // a single call to TimeTicks::Now(). 1226 // a single call to TimeTicks::Now().
1222 const TimeTicks current_time = TimeTicks::Now(); 1227 const TimeTicks current_time = TimeTicks::Now();
1223 while (i != pending_tasks_.end()) { 1228 while (i != pending_tasks_.end()) {
1224 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 1229 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
1225 unrunnable_tasks++; 1230 unrunnable_tasks++;
1226 ++i; 1231 ++i;
1227 continue; 1232 continue;
1228 } 1233 }
1229 1234
1230 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 1235 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
1231 // We're shutting down and the task we just found isn't blocking 1236 // We're shutting down and the task we just found isn't blocking
1232 // shutdown. Delete it and get more work. 1237 // shutdown. Delete it and get more work.
1233 // 1238 //
1234 // Note that we do not want to delete unrunnable tasks. Deleting a task 1239 // Note that we do not want to delete unrunnable tasks. Deleting a task
1235 // can have side effects (like freeing some objects) and deleting a task 1240 // can have side effects (like freeing some objects) and deleting a task
1236 // that's supposed to run after one that's currently running could cause 1241 // that's supposed to run after one that's currently running could cause
1237 // an obscure crash. 1242 // an obscure crash.
1238 // 1243 //
1239 // We really want to delete these tasks outside the lock in case the 1244 // We really want to delete these tasks outside the lock in case the
1240 // closures are holding refs to objects that want to post work from their 1245 // closures are holding refs to objects that want to post work from their
1241 // destructors (which would deadlock). The closures are internally 1246 // destructors (which would deadlock). The closures are internally
1242 // refcounted, so we just need to keep a copy of them alive until the lock 1247 // refcounted, so we just need to keep a copy of them alive until the lock
1243 // is exited. The calling code can just clear() the vector they passed to 1248 // is exited. The calling code can just clear() the vector they passed to
1244 // us once the lock is exited to make this happen. 1249 // us once the lock is exited to make this happen.
1245 delete_these_outside_lock->push_back(*i); 1250 //
1251 // The const_cast here is safe since the object is erased from
1252 // |pending_tasks_| soon after the move.
1253 delete_these_outside_lock->push_back(
1254 std::move(const_cast<SequencedTask&>(*i)));
1246 pending_tasks_.erase(i++); 1255 pending_tasks_.erase(i++);
1247 continue; 1256 continue;
1248 } 1257 }
1249 1258
1250 if (i->time_to_run > current_time) { 1259 if (i->time_to_run > current_time) {
1251 // The time to run has not come yet. 1260 // The time to run has not come yet.
1252 *wait_time = i->time_to_run - current_time; 1261 *wait_time = i->time_to_run - current_time;
1253 status = GET_WORK_WAIT; 1262 status = GET_WORK_WAIT;
1254 if (cleanup_state_ == CLEANUP_RUNNING) { 1263 if (cleanup_state_ == CLEANUP_RUNNING) {
1255 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 1264 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1256 delete_these_outside_lock->push_back(*i); 1265 // The const_cast here is safe since the object is erased from
1266 // |pending_tasks_| soon after the move.
1267 delete_these_outside_lock->push_back(
1268 std::move(const_cast<SequencedTask&>(*i)));
1257 pending_tasks_.erase(i); 1269 pending_tasks_.erase(i);
1258 } 1270 }
1259 break; 1271 break;
1260 } 1272 }
1261 1273
1262 // Found a runnable task. 1274 // Found a runnable task. The const_cast is safe here since the object is
1263 *task = *i; 1275 // erased from |pending_tasks_| soon after the move.
1276 *task = std::move(const_cast<SequencedTask&>(*i));
1264 pending_tasks_.erase(i); 1277 pending_tasks_.erase(i);
1265 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 1278 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1266 blocking_shutdown_pending_task_count_--; 1279 blocking_shutdown_pending_task_count_--;
1267 } 1280 }
1268 1281
1269 status = GET_WORK_FOUND; 1282 status = GET_WORK_FOUND;
1270 break; 1283 break;
1271 } 1284 }
1272 1285
1273 return status; 1286 return status;
(...skipping 252 matching lines...) Expand 10 before | Expand all | Expand 10 after
1526 } 1539 }
1527 1540
1528 scoped_refptr<TaskRunner> 1541 scoped_refptr<TaskRunner>
1529 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1542 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1530 WorkerShutdown shutdown_behavior) { 1543 WorkerShutdown shutdown_behavior) {
1531 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1544 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1532 } 1545 }
1533 1546
1534 bool SequencedWorkerPool::PostWorkerTask( 1547 bool SequencedWorkerPool::PostWorkerTask(
1535 const tracked_objects::Location& from_here, 1548 const tracked_objects::Location& from_here,
1536 Closure task) { 1549 OnceClosure task) {
1537 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, from_here, 1550 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, from_here,
1538 std::move(task), TimeDelta()); 1551 std::move(task), TimeDelta());
1539 } 1552 }
1540 1553
1541 bool SequencedWorkerPool::PostDelayedWorkerTask( 1554 bool SequencedWorkerPool::PostDelayedWorkerTask(
1542 const tracked_objects::Location& from_here, 1555 const tracked_objects::Location& from_here,
1543 Closure task, 1556 OnceClosure task,
1544 TimeDelta delay) { 1557 TimeDelta delay) {
1545 WorkerShutdown shutdown_behavior = 1558 WorkerShutdown shutdown_behavior =
1546 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1559 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1547 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here, 1560 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here,
1548 std::move(task), delay); 1561 std::move(task), delay);
1549 } 1562 }
1550 1563
1551 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1564 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1552 const tracked_objects::Location& from_here, 1565 const tracked_objects::Location& from_here,
1553 Closure task, 1566 OnceClosure task,
1554 WorkerShutdown shutdown_behavior) { 1567 WorkerShutdown shutdown_behavior) {
1555 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here, 1568 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here,
1556 std::move(task), TimeDelta()); 1569 std::move(task), TimeDelta());
1557 } 1570 }
1558 1571
1559 bool SequencedWorkerPool::PostSequencedWorkerTask( 1572 bool SequencedWorkerPool::PostSequencedWorkerTask(
1560 SequenceToken sequence_token, 1573 SequenceToken sequence_token,
1561 const tracked_objects::Location& from_here, 1574 const tracked_objects::Location& from_here,
1562 Closure task) { 1575 OnceClosure task) {
1563 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, from_here, 1576 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, from_here,
1564 std::move(task), TimeDelta()); 1577 std::move(task), TimeDelta());
1565 } 1578 }
1566 1579
1567 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1580 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1568 SequenceToken sequence_token, 1581 SequenceToken sequence_token,
1569 const tracked_objects::Location& from_here, 1582 const tracked_objects::Location& from_here,
1570 Closure task, 1583 OnceClosure task,
1571 TimeDelta delay) { 1584 TimeDelta delay) {
1572 WorkerShutdown shutdown_behavior = 1585 WorkerShutdown shutdown_behavior =
1573 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1586 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1574 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here, 1587 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here,
1575 std::move(task), delay); 1588 std::move(task), delay);
1576 } 1589 }
1577 1590
1578 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1591 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1579 const std::string& token_name, 1592 const std::string& token_name,
1580 const tracked_objects::Location& from_here, 1593 const tracked_objects::Location& from_here,
1581 Closure task) { 1594 OnceClosure task) {
1582 DCHECK(!token_name.empty()); 1595 DCHECK(!token_name.empty());
1583 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1596 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1584 from_here, std::move(task), TimeDelta()); 1597 from_here, std::move(task), TimeDelta());
1585 } 1598 }
1586 1599
1587 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1600 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1588 SequenceToken sequence_token, 1601 SequenceToken sequence_token,
1589 const tracked_objects::Location& from_here, 1602 const tracked_objects::Location& from_here,
1590 Closure task, 1603 OnceClosure task,
1591 WorkerShutdown shutdown_behavior) { 1604 WorkerShutdown shutdown_behavior) {
1592 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here, 1605 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here,
1593 std::move(task), TimeDelta()); 1606 std::move(task), TimeDelta());
1594 } 1607 }
1595 1608
1596 bool SequencedWorkerPool::PostDelayedTask( 1609 bool SequencedWorkerPool::PostDelayedTask(
1597 const tracked_objects::Location& from_here, 1610 const tracked_objects::Location& from_here,
1598 Closure task, 1611 OnceClosure task,
1599 TimeDelta delay) { 1612 TimeDelta delay) {
1600 return PostDelayedWorkerTask(from_here, std::move(task), delay); 1613 return PostDelayedWorkerTask(from_here, std::move(task), delay);
1601 } 1614 }
1602 1615
1603 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1616 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1604 return inner_->RunsTasksOnCurrentThread(); 1617 return inner_->RunsTasksOnCurrentThread();
1605 } 1618 }
1606 1619
1607 void SequencedWorkerPool::FlushForTesting() { 1620 void SequencedWorkerPool::FlushForTesting() {
1608 DCHECK(!RunsTasksOnCurrentThread()); 1621 DCHECK(!RunsTasksOnCurrentThread());
(...skipping 18 matching lines...) Expand all
1627 bool SequencedWorkerPool::IsShutdownInProgress() { 1640 bool SequencedWorkerPool::IsShutdownInProgress() {
1628 return inner_->IsShutdownInProgress(); 1641 return inner_->IsShutdownInProgress();
1629 } 1642 }
1630 1643
1631 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1644 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1632 SequenceToken sequence_token) const { 1645 SequenceToken sequence_token) const {
1633 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1646 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1634 } 1647 }
1635 1648
1636 } // namespace base 1649 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698