Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2122543002: Replace Closure in TaskRunner::PostTask with OneShotCallback (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@07_oneshot
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/worker_pool.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
82 sequence_task_number(0), 82 sequence_task_number(0),
83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
84 84
85 explicit SequencedTask(const tracked_objects::Location& from_here) 85 explicit SequencedTask(const tracked_objects::Location& from_here)
86 : base::TrackingInfo(from_here, TimeTicks()), 86 : base::TrackingInfo(from_here, TimeTicks()),
87 sequence_token_id(0), 87 sequence_token_id(0),
88 trace_id(0), 88 trace_id(0),
89 sequence_task_number(0), 89 sequence_task_number(0),
90 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 90 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
91 91
92 SequencedTask(SequencedTask&&) = default;
93 SequencedTask& operator=(SequencedTask&&) = default;
94
92 ~SequencedTask() {} 95 ~SequencedTask() {}
93 96
94 int sequence_token_id; 97 int sequence_token_id;
95 int trace_id; 98 int trace_id;
96 int64_t sequence_task_number; 99 int64_t sequence_task_number;
97 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 100 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
98 tracked_objects::Location posted_from; 101 tracked_objects::Location posted_from;
99 Closure task; 102 mutable OnceClosure task;
100 103
101 // Non-delayed tasks and delayed tasks are managed together by time-to-run 104 // Non-delayed tasks and delayed tasks are managed together by time-to-run
102 // order. We calculate the time by adding the posted time and the given delay. 105 // order. We calculate the time by adding the posted time and the given delay.
103 TimeTicks time_to_run; 106 TimeTicks time_to_run;
104 }; 107 };
105 108
106 struct SequencedTaskLessThan { 109 struct SequencedTaskLessThan {
107 public: 110 public:
108 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 111 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
109 if (lhs.time_to_run < rhs.time_to_run) 112 if (lhs.time_to_run < rhs.time_to_run)
(...skipping 13 matching lines...) Expand all
123 // 126 //
124 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 127 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
125 class SequencedWorkerPoolTaskRunner : public TaskRunner { 128 class SequencedWorkerPoolTaskRunner : public TaskRunner {
126 public: 129 public:
127 SequencedWorkerPoolTaskRunner( 130 SequencedWorkerPoolTaskRunner(
128 scoped_refptr<SequencedWorkerPool> pool, 131 scoped_refptr<SequencedWorkerPool> pool,
129 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 132 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
130 133
131 // TaskRunner implementation 134 // TaskRunner implementation
132 bool PostDelayedTask(const tracked_objects::Location& from_here, 135 bool PostDelayedTask(const tracked_objects::Location& from_here,
133 const Closure& task, 136 OnceClosure task,
134 TimeDelta delay) override; 137 TimeDelta delay) override;
135 bool RunsTasksOnCurrentThread() const override; 138 bool RunsTasksOnCurrentThread() const override;
136 139
137 private: 140 private:
138 ~SequencedWorkerPoolTaskRunner() override; 141 ~SequencedWorkerPoolTaskRunner() override;
139 142
140 const scoped_refptr<SequencedWorkerPool> pool_; 143 const scoped_refptr<SequencedWorkerPool> pool_;
141 144
142 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 145 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
143 146
144 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 147 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
145 }; 148 };
146 149
147 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 150 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
148 scoped_refptr<SequencedWorkerPool> pool, 151 scoped_refptr<SequencedWorkerPool> pool,
149 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 152 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
150 : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {} 153 : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {}
151 154
152 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 155 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
153 } 156 }
154 157
155 bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 158 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
156 const tracked_objects::Location& from_here, 159 const tracked_objects::Location& from_here,
157 const Closure& task, 160 OnceClosure task,
158 TimeDelta delay) { 161 TimeDelta delay) {
159 if (delay.is_zero()) { 162 if (delay.is_zero()) {
160 return pool_->PostWorkerTaskWithShutdownBehavior( 163 return pool_->PostWorkerTaskWithShutdownBehavior(from_here, std::move(task),
161 from_here, task, shutdown_behavior_); 164 shutdown_behavior_);
162 } 165 }
163 return pool_->PostDelayedWorkerTask(from_here, task, delay); 166 return pool_->PostDelayedWorkerTask(from_here, std::move(task), delay);
164 } 167 }
165 168
166 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 169 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
167 return pool_->RunsTasksOnCurrentThread(); 170 return pool_->RunsTasksOnCurrentThread();
168 } 171 }
169 172
170 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 173 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
171 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 174 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
172 // fixed sequence token. 175 // fixed sequence token.
173 // 176 //
174 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 177 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
175 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 178 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
176 public: 179 public:
177 SequencedWorkerPoolSequencedTaskRunner( 180 SequencedWorkerPoolSequencedTaskRunner(
178 scoped_refptr<SequencedWorkerPool> pool, 181 scoped_refptr<SequencedWorkerPool> pool,
179 SequencedWorkerPool::SequenceToken token, 182 SequencedWorkerPool::SequenceToken token,
180 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 183 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
181 184
182 // TaskRunner implementation 185 // TaskRunner implementation
183 bool PostDelayedTask(const tracked_objects::Location& from_here, 186 bool PostDelayedTask(const tracked_objects::Location& from_here,
184 const Closure& task, 187 OnceClosure task,
185 TimeDelta delay) override; 188 TimeDelta delay) override;
186 bool RunsTasksOnCurrentThread() const override; 189 bool RunsTasksOnCurrentThread() const override;
187 190
188 // SequencedTaskRunner implementation 191 // SequencedTaskRunner implementation
189 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 192 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
190 const Closure& task, 193 OnceClosure task,
191 TimeDelta delay) override; 194 TimeDelta delay) override;
192 195
193 private: 196 private:
194 ~SequencedWorkerPoolSequencedTaskRunner() override; 197 ~SequencedWorkerPoolSequencedTaskRunner() override;
195 198
196 const scoped_refptr<SequencedWorkerPool> pool_; 199 const scoped_refptr<SequencedWorkerPool> pool_;
197 200
198 const SequencedWorkerPool::SequenceToken token_; 201 const SequencedWorkerPool::SequenceToken token_;
199 202
200 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 203 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
201 204
202 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 205 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
203 }; 206 };
204 207
205 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 208 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
206 scoped_refptr<SequencedWorkerPool> pool, 209 scoped_refptr<SequencedWorkerPool> pool,
207 SequencedWorkerPool::SequenceToken token, 210 SequencedWorkerPool::SequenceToken token,
208 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 211 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
209 : pool_(std::move(pool)), 212 : pool_(std::move(pool)),
210 token_(token), 213 token_(token),
211 shutdown_behavior_(shutdown_behavior) {} 214 shutdown_behavior_(shutdown_behavior) {}
212 215
213 SequencedWorkerPoolSequencedTaskRunner:: 216 SequencedWorkerPoolSequencedTaskRunner::
214 ~SequencedWorkerPoolSequencedTaskRunner() { 217 ~SequencedWorkerPoolSequencedTaskRunner() {
215 } 218 }
216 219
217 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 220 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
218 const tracked_objects::Location& from_here, 221 const tracked_objects::Location& from_here,
219 const Closure& task, 222 OnceClosure task,
220 TimeDelta delay) { 223 TimeDelta delay) {
221 if (delay.is_zero()) { 224 if (delay.is_zero()) {
222 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 225 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
223 token_, from_here, task, shutdown_behavior_); 226 token_, from_here, std::move(task), shutdown_behavior_);
224 } 227 }
225 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 228 return pool_->PostDelayedSequencedWorkerTask(token_, from_here,
229 std::move(task), delay);
226 } 230 }
227 231
228 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 232 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
229 return pool_->IsRunningSequenceOnCurrentThread(token_); 233 return pool_->IsRunningSequenceOnCurrentThread(token_);
230 } 234 }
231 235
232 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 236 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
233 const tracked_objects::Location& from_here, 237 const tracked_objects::Location& from_here,
234 const Closure& task, 238 OnceClosure task,
235 TimeDelta delay) { 239 TimeDelta delay) {
236 // There's no way to run nested tasks, so simply forward to 240 // There's no way to run nested tasks, so simply forward to
237 // PostDelayedTask. 241 // PostDelayedTask.
238 return PostDelayedTask(from_here, task, delay); 242 return PostDelayedTask(from_here, std::move(task), delay);
239 } 243 }
240 244
241 // Create a process-wide unique ID to represent this task in trace events. This 245 // Create a process-wide unique ID to represent this task in trace events. This
242 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 246 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
243 // with MessageLoop pointers on other processes. 247 // with MessageLoop pointers on other processes.
244 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) { 248 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
245 return (static_cast<uint64_t>(task.trace_id) << 32) | 249 return (static_cast<uint64_t>(task.trace_id) << 32) |
246 static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool)); 250 static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
247 } 251 }
248 252
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
337 341
338 SequenceToken GetNamedSequenceToken(const std::string& name); 342 SequenceToken GetNamedSequenceToken(const std::string& name);
339 343
340 // This function accepts a name and an ID. If the name is null, the 344 // This function accepts a name and an ID. If the name is null, the
341 // token ID is used. This allows us to implement the optional name lookup 345 // token ID is used. This allows us to implement the optional name lookup
342 // from a single function without having to enter the lock a separate time. 346 // from a single function without having to enter the lock a separate time.
343 bool PostTask(const std::string* optional_token_name, 347 bool PostTask(const std::string* optional_token_name,
344 SequenceToken sequence_token, 348 SequenceToken sequence_token,
345 WorkerShutdown shutdown_behavior, 349 WorkerShutdown shutdown_behavior,
346 const tracked_objects::Location& from_here, 350 const tracked_objects::Location& from_here,
347 const Closure& task, 351 OnceClosure task,
348 TimeDelta delay); 352 TimeDelta delay);
349 353
350 bool RunsTasksOnCurrentThread() const; 354 bool RunsTasksOnCurrentThread() const;
351 355
352 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 356 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
353 357
354 void CleanupForTesting(); 358 void CleanupForTesting();
355 359
356 void SignalHasWorkForTesting(); 360 void SignalHasWorkForTesting();
357 361
(...skipping 16 matching lines...) Expand all
374 enum CleanupState { 378 enum CleanupState {
375 CLEANUP_REQUESTED, 379 CLEANUP_REQUESTED,
376 CLEANUP_STARTING, 380 CLEANUP_STARTING,
377 CLEANUP_RUNNING, 381 CLEANUP_RUNNING,
378 CLEANUP_FINISHING, 382 CLEANUP_FINISHING,
379 CLEANUP_DONE, 383 CLEANUP_DONE,
380 }; 384 };
381 385
382 // Helper used by PostTask() to complete the work when redirection is on. 386 // Helper used by PostTask() to complete the work when redirection is on.
383 // Coalesce upon resolution of http://crbug.com/622400. 387 // Coalesce upon resolution of http://crbug.com/622400.
384 void PostTaskToTaskScheduler(const SequencedTask& sequenced); 388 void PostTaskToTaskScheduler(SequencedTask sequenced);
385 389
386 // Called from within the lock, this converts the given token name into a 390 // Called from within the lock, this converts the given token name into a
387 // token ID, creating a new one if necessary. 391 // token ID, creating a new one if necessary.
388 int LockedGetNamedTokenID(const std::string& name); 392 int LockedGetNamedTokenID(const std::string& name);
389 393
390 // Called from within the lock, this returns the next sequence task number. 394 // Called from within the lock, this returns the next sequence task number.
391 int64_t LockedGetNextSequenceTaskNumber(); 395 int64_t LockedGetNextSequenceTaskNumber();
392 396
393 // Gets new task. There are 3 cases depending on the return value: 397 // Gets new task. There are 3 cases depending on the return value:
394 // 398 //
395 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 399 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
396 // be run immediately. 400 // be run immediately.
397 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 401 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
398 // and |task| is not filled in. In this case, the caller should wait until 402 // and |task| is not filled in. In this case, the caller should wait until
399 // a task is posted. 403 // a task is posted.
400 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 404 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
401 // immediately, and |task| is not filled in. Likewise, |wait_time| is 405 // immediately, and |task| is not filled in. Likewise, |wait_time| is
402 // filled in the time to wait until the next task to run. In this case, the 406 // filled in the time to wait until the next task to run. In this case, the
403 // caller should wait the time. 407 // caller should wait the time.
404 // 408 //
405 // In any case, the calling code should clear the given 409 // In any case, the calling code should clear the given
406 // delete_these_outside_lock vector the next time the lock is released. 410 // delete_these_outside_lock vector the next time the lock is released.
407 // See the implementation for a more detailed description. 411 // See the implementation for a more detailed description.
408 GetWorkStatus GetWork(SequencedTask* task, 412 GetWorkStatus GetWork(SequencedTask* task,
409 TimeDelta* wait_time, 413 TimeDelta* wait_time,
410 std::vector<Closure>* delete_these_outside_lock); 414 std::vector<OnceClosure>* delete_these_outside_lock);
411 415
412 void HandleCleanup(); 416 void HandleCleanup();
413 417
414 // Peforms init and cleanup around running the given task. WillRun... 418 // Peforms init and cleanup around running the given task. WillRun...
415 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 419 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
416 // The calling code should call FinishStartingAdditionalThread once the 420 // The calling code should call FinishStartingAdditionalThread once the
417 // lock is released if the return values is nonzero. 421 // lock is released if the return values is nonzero.
418 int WillRunWorkerTask(const SequencedTask& task); 422 int WillRunWorkerTask(const SequencedTask& task);
419 void DidRunWorkerTask(const SequencedTask& task); 423 void DidRunWorkerTask(const SequencedTask& task);
420 424
(...skipping 238 matching lines...) Expand 10 before | Expand all | Expand 10 after
659 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 663 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
660 AutoLock lock(lock_); 664 AutoLock lock(lock_);
661 return SequenceToken(LockedGetNamedTokenID(name)); 665 return SequenceToken(LockedGetNamedTokenID(name));
662 } 666 }
663 667
664 bool SequencedWorkerPool::Inner::PostTask( 668 bool SequencedWorkerPool::Inner::PostTask(
665 const std::string* optional_token_name, 669 const std::string* optional_token_name,
666 SequenceToken sequence_token, 670 SequenceToken sequence_token,
667 WorkerShutdown shutdown_behavior, 671 WorkerShutdown shutdown_behavior,
668 const tracked_objects::Location& from_here, 672 const tracked_objects::Location& from_here,
669 const Closure& task, 673 OnceClosure task,
670 TimeDelta delay) { 674 TimeDelta delay) {
671 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 675 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
672 SequencedTask sequenced(from_here); 676 SequencedTask sequenced(from_here);
673 sequenced.sequence_token_id = sequence_token.id_; 677 sequenced.sequence_token_id = sequence_token.id_;
674 sequenced.shutdown_behavior = shutdown_behavior; 678 sequenced.shutdown_behavior = shutdown_behavior;
675 sequenced.posted_from = from_here; 679 sequenced.posted_from = from_here;
676 sequenced.task = 680 sequenced.task = shutdown_behavior == BLOCK_SHUTDOWN
677 shutdown_behavior == BLOCK_SHUTDOWN ? 681 ? base::MakeCriticalClosure(std::move(task))
678 base::MakeCriticalClosure(task) : task; 682 : std::move(task);
679 sequenced.time_to_run = TimeTicks::Now() + delay; 683 sequenced.time_to_run = TimeTicks::Now() + delay;
680 684
681 int create_thread_id = 0; 685 int create_thread_id = 0;
682 { 686 {
683 AutoLock lock(lock_); 687 AutoLock lock(lock_);
684 688
685 if (shutdown_called_) { 689 if (shutdown_called_) {
686 // Don't allow a new task to be posted if it doesn't block shutdown. 690 // Don't allow a new task to be posted if it doesn't block shutdown.
687 if (shutdown_behavior != BLOCK_SHUTDOWN) 691 if (shutdown_behavior != BLOCK_SHUTDOWN)
688 return false; 692 return false;
(...skipping 23 matching lines...) Expand all
712 TRACE_EVENT_FLAG_FLOW_OUT); 716 TRACE_EVENT_FLAG_FLOW_OUT);
713 717
714 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 718 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
715 719
716 // Now that we have the lock, apply the named token rules. 720 // Now that we have the lock, apply the named token rules.
717 if (optional_token_name) 721 if (optional_token_name)
718 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 722 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
719 723
720 if (subtle::NoBarrier_Load(&g_all_pools_state) == 724 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
721 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 725 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
722 PostTaskToTaskScheduler(sequenced); 726 PostTaskToTaskScheduler(std::move(sequenced));
723 } else { 727 } else {
724 pending_tasks_.insert(sequenced); 728 SequencedWorkerPool::WorkerShutdown shutdown_behavior =
729 sequenced.shutdown_behavior;
730 pending_tasks_.insert(std::move(sequenced));
725 731
726 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 732 if (shutdown_behavior == BLOCK_SHUTDOWN)
727 blocking_shutdown_pending_task_count_++; 733 blocking_shutdown_pending_task_count_++;
728 734
729 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 735 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
730 } 736 }
731 } 737 }
732 738
733 if (subtle::NoBarrier_Load(&g_all_pools_state) != 739 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
734 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 740 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
735 // Actually start the additional thread or signal an existing one outside 741 // Actually start the additional thread or signal an existing one outside
736 // the lock. 742 // the lock.
(...skipping 15 matching lines...) Expand all
752 } else { 758 } else {
753 DCHECK(sequenced_task_runner_map_.empty()); 759 DCHECK(sequenced_task_runner_map_.empty());
754 } 760 }
755 } 761 }
756 #endif // DCHECK_IS_ON() 762 #endif // DCHECK_IS_ON()
757 763
758 return true; 764 return true;
759 } 765 }
760 766
761 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 767 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
762 const SequencedTask& sequenced) { 768 SequencedTask sequenced) {
763 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 769 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
764 subtle::NoBarrier_Load(&g_all_pools_state)); 770 subtle::NoBarrier_Load(&g_all_pools_state));
765 771
766 lock_.AssertAcquired(); 772 lock_.AssertAcquired();
767 773
768 // Confirm that the TaskScheduler's shutdown behaviors use the same 774 // Confirm that the TaskScheduler's shutdown behaviors use the same
769 // underlying values as SequencedWorkerPool. 775 // underlying values as SequencedWorkerPool.
770 static_assert( 776 static_assert(
771 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 777 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
772 static_cast<int>(CONTINUE_ON_SHUTDOWN), 778 static_cast<int>(CONTINUE_ON_SHUTDOWN),
(...skipping 26 matching lines...) Expand all
799 const ExecutionMode execution_mode = 805 const ExecutionMode execution_mode =
800 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED 806 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
801 : ExecutionMode::SEQUENCED; 807 : ExecutionMode::SEQUENCED;
802 *sequenced_task_runner = 808 *sequenced_task_runner =
803 CreateTaskRunnerWithTraits(pool_traits, execution_mode); 809 CreateTaskRunnerWithTraits(pool_traits, execution_mode);
804 } 810 }
805 } 811 }
806 812
807 if (sequenced_task_runner) { 813 if (sequenced_task_runner) {
808 (*sequenced_task_runner) 814 (*sequenced_task_runner)
809 ->PostTask(sequenced.posted_from, sequenced.task); 815 ->PostTask(sequenced.posted_from, std::move(sequenced.task));
810 } else { 816 } else {
811 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are 817 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
812 // however a few pools that use only one thread and therefore can currently 818 // however a few pools that use only one thread and therefore can currently
813 // legitimatelly assume thread affinity despite using SequencedWorkerPool. 819 // legitimatelly assume thread affinity despite using SequencedWorkerPool.
814 // Such pools typically only give access to their TaskRunner which will be 820 // Such pools typically only give access to their TaskRunner which will be
815 // SINGLE_THREADED per nature of the pool having only one thread but this 821 // SINGLE_THREADED per nature of the pool having only one thread but this
816 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() 822 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
817 // directly. 823 // directly.
818 DCHECK_GT(max_threads_, 1U); 824 DCHECK_GT(max_threads_, 1U);
819 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, 825 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
820 sequenced.task); 826 std::move(sequenced.task));
821 } 827 }
822 } 828 }
823 829
824 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 830 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
825 AutoLock lock(lock_); 831 AutoLock lock(lock_);
826 if (subtle::NoBarrier_Load(&g_all_pools_state) == 832 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
827 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 833 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
828 if (!runs_tasks_on_verifier_) { 834 if (!runs_tasks_on_verifier_) {
829 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( 835 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
830 TaskTraits().WithFileIO().WithPriority(task_priority_), 836 TaskTraits().WithFileIO().WithPriority(task_priority_),
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
947 while (true) { 953 while (true) {
948 #if defined(OS_MACOSX) 954 #if defined(OS_MACOSX)
949 base::mac::ScopedNSAutoreleasePool autorelease_pool; 955 base::mac::ScopedNSAutoreleasePool autorelease_pool;
950 #endif 956 #endif
951 957
952 HandleCleanup(); 958 HandleCleanup();
953 959
954 // See GetWork for what delete_these_outside_lock is doing. 960 // See GetWork for what delete_these_outside_lock is doing.
955 SequencedTask task; 961 SequencedTask task;
956 TimeDelta wait_time; 962 TimeDelta wait_time;
957 std::vector<Closure> delete_these_outside_lock; 963 std::vector<OnceClosure> delete_these_outside_lock;
958 GetWorkStatus status = 964 GetWorkStatus status =
959 GetWork(&task, &wait_time, &delete_these_outside_lock); 965 GetWork(&task, &wait_time, &delete_these_outside_lock);
960 if (status == GET_WORK_FOUND) { 966 if (status == GET_WORK_FOUND) {
961 TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 967 TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
962 "SequencedWorkerPool::Inner::ThreadLoop", 968 "SequencedWorkerPool::Inner::ThreadLoop",
963 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), 969 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
964 TRACE_EVENT_FLAG_FLOW_IN, 970 TRACE_EVENT_FLAG_FLOW_IN,
965 "src_file", task.posted_from.file_name(), 971 "src_file", task.posted_from.file_name(),
966 "src_func", task.posted_from.function_name()); 972 "src_func", task.posted_from.function_name());
967 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event( 973 TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event(
(...skipping 10 matching lines...) Expand all
978 984
979 // Complete thread creation outside the lock if necessary. 985 // Complete thread creation outside the lock if necessary.
980 if (new_thread_id) 986 if (new_thread_id)
981 FinishStartingAdditionalThread(new_thread_id); 987 FinishStartingAdditionalThread(new_thread_id);
982 988
983 this_worker->set_running_task_info( 989 this_worker->set_running_task_info(
984 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 990 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
985 991
986 tracked_objects::TaskStopwatch stopwatch; 992 tracked_objects::TaskStopwatch stopwatch;
987 stopwatch.Start(); 993 stopwatch.Start();
988 task.task.Run(); 994 std::move(task.task).Run();
989 stopwatch.Stop(); 995 stopwatch.Stop();
990 996
991 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 997 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
992 task, stopwatch); 998 task, stopwatch);
993 999
994 // Make sure our task is erased outside the lock for the 1000 // Make sure our task is erased outside the lock for the
995 // same reason we do this with delete_these_oustide_lock. 1001 // same reason we do this with delete_these_oustide_lock.
996 // Also, do it before calling reset_running_task_info() so 1002 // Also, do it before calling reset_running_task_info() so
997 // that sequence-checking from within the task's destructor 1003 // that sequence-checking from within the task's destructor
998 // still works. 1004 // still works.
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
1130 1136
1131 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1137 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
1132 lock_.AssertAcquired(); 1138 lock_.AssertAcquired();
1133 // We assume that we never create enough tasks to wrap around. 1139 // We assume that we never create enough tasks to wrap around.
1134 return next_sequence_task_number_++; 1140 return next_sequence_task_number_++;
1135 } 1141 }
1136 1142
1137 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1143 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
1138 SequencedTask* task, 1144 SequencedTask* task,
1139 TimeDelta* wait_time, 1145 TimeDelta* wait_time,
1140 std::vector<Closure>* delete_these_outside_lock) { 1146 std::vector<OnceClosure>* delete_these_outside_lock) {
1141 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1147 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1142 subtle::NoBarrier_Load(&g_all_pools_state)); 1148 subtle::NoBarrier_Load(&g_all_pools_state));
1143 1149
1144 lock_.AssertAcquired(); 1150 lock_.AssertAcquired();
1145 1151
1146 // Find the next task with a sequence token that's not currently in use. 1152 // Find the next task with a sequence token that's not currently in use.
1147 // If the token is in use, that means another thread is running something 1153 // If the token is in use, that means another thread is running something
1148 // in that sequence, and we can't run it without going out-of-order. 1154 // in that sequence, and we can't run it without going out-of-order.
1149 // 1155 //
1150 // This algorithm is simple and fair, but inefficient in some cases. For 1156 // This algorithm is simple and fair, but inefficient in some cases. For
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
1188 // task that's supposed to run after one that's currently running could 1194 // task that's supposed to run after one that's currently running could
1189 // cause an obscure crash. 1195 // cause an obscure crash.
1190 // 1196 //
1191 // We really want to delete these tasks outside the lock in case the 1197 // We really want to delete these tasks outside the lock in case the
1192 // closures are holding refs to objects that want to post work from 1198 // closures are holding refs to objects that want to post work from
1193 // their destructorss (which would deadlock). The closures are 1199 // their destructorss (which would deadlock). The closures are
1194 // internally refcounted, so we just need to keep a copy of them alive 1200 // internally refcounted, so we just need to keep a copy of them alive
1195 // until the lock is exited. The calling code can just clear() the 1201 // until the lock is exited. The calling code can just clear() the
1196 // vector they passed to us once the lock is exited to make this 1202 // vector they passed to us once the lock is exited to make this
1197 // happen. 1203 // happen.
1198 delete_these_outside_lock->push_back(i->task); 1204 OnceClosure cb = std::move(i->task);
1205 delete_these_outside_lock->push_back(std::move(cb));
1199 pending_tasks_.erase(i++); 1206 pending_tasks_.erase(i++);
1200 continue; 1207 continue;
1201 } 1208 }
1202 1209
1203 if (i->time_to_run > current_time) { 1210 if (i->time_to_run > current_time) {
1204 // The time to run has not come yet. 1211 // The time to run has not come yet.
1205 *wait_time = i->time_to_run - current_time; 1212 *wait_time = i->time_to_run - current_time;
1206 status = GET_WORK_WAIT; 1213 status = GET_WORK_WAIT;
1207 if (cleanup_state_ == CLEANUP_RUNNING) { 1214 if (cleanup_state_ == CLEANUP_RUNNING) {
1208 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 1215 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1209 delete_these_outside_lock->push_back(i->task); 1216 delete_these_outside_lock->push_back(std::move(i->task));
1210 pending_tasks_.erase(i); 1217 pending_tasks_.erase(i);
1211 } 1218 }
1212 break; 1219 break;
1213 } 1220 }
1214 1221
1215 // Found a runnable task. 1222 // Found a runnable task.o
1216 *task = *i; 1223 *task = std::move(const_cast<SequencedTask&>(*i));
1217 pending_tasks_.erase(i); 1224 pending_tasks_.erase(i);
1218 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 1225 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1219 blocking_shutdown_pending_task_count_--; 1226 blocking_shutdown_pending_task_count_--;
1220 } 1227 }
1221 1228
1222 status = GET_WORK_FOUND; 1229 status = GET_WORK_FOUND;
1223 break; 1230 break;
1224 } 1231 }
1225 1232
1226 return status; 1233 return status;
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
1476 } 1483 }
1477 1484
1478 scoped_refptr<TaskRunner> 1485 scoped_refptr<TaskRunner>
1479 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1486 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1480 WorkerShutdown shutdown_behavior) { 1487 WorkerShutdown shutdown_behavior) {
1481 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1488 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1482 } 1489 }
1483 1490
1484 bool SequencedWorkerPool::PostWorkerTask( 1491 bool SequencedWorkerPool::PostWorkerTask(
1485 const tracked_objects::Location& from_here, 1492 const tracked_objects::Location& from_here,
1486 const Closure& task) { 1493 OnceClosure task) {
1487 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1494 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, from_here,
1488 from_here, task, TimeDelta()); 1495 std::move(task), TimeDelta());
1489 } 1496 }
1490 1497
1491 bool SequencedWorkerPool::PostDelayedWorkerTask( 1498 bool SequencedWorkerPool::PostDelayedWorkerTask(
1492 const tracked_objects::Location& from_here, 1499 const tracked_objects::Location& from_here,
1493 const Closure& task, 1500 OnceClosure task,
1494 TimeDelta delay) { 1501 TimeDelta delay) {
1495 WorkerShutdown shutdown_behavior = 1502 WorkerShutdown shutdown_behavior =
1496 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1503 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1497 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1504 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here,
1498 from_here, task, delay); 1505 std::move(task), delay);
1499 } 1506 }
1500 1507
1501 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1508 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1502 const tracked_objects::Location& from_here, 1509 const tracked_objects::Location& from_here,
1503 const Closure& task, 1510 OnceClosure task,
1504 WorkerShutdown shutdown_behavior) { 1511 WorkerShutdown shutdown_behavior) {
1505 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1512 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here,
1506 from_here, task, TimeDelta()); 1513 std::move(task), TimeDelta());
1507 } 1514 }
1508 1515
1509 bool SequencedWorkerPool::PostSequencedWorkerTask( 1516 bool SequencedWorkerPool::PostSequencedWorkerTask(
1510 SequenceToken sequence_token, 1517 SequenceToken sequence_token,
1511 const tracked_objects::Location& from_here, 1518 const tracked_objects::Location& from_here,
1512 const Closure& task) { 1519 OnceClosure task) {
1513 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1520 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, from_here,
1514 from_here, task, TimeDelta()); 1521 std::move(task), TimeDelta());
1515 } 1522 }
1516 1523
1517 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1524 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1518 SequenceToken sequence_token, 1525 SequenceToken sequence_token,
1519 const tracked_objects::Location& from_here, 1526 const tracked_objects::Location& from_here,
1520 const Closure& task, 1527 OnceClosure task,
1521 TimeDelta delay) { 1528 TimeDelta delay) {
1522 WorkerShutdown shutdown_behavior = 1529 WorkerShutdown shutdown_behavior =
1523 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1530 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1524 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1531 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here,
1525 from_here, task, delay); 1532 std::move(task), delay);
1526 } 1533 }
1527 1534
1528 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1535 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1529 const std::string& token_name, 1536 const std::string& token_name,
1530 const tracked_objects::Location& from_here, 1537 const tracked_objects::Location& from_here,
1531 const Closure& task) { 1538 OnceClosure task) {
1532 DCHECK(!token_name.empty()); 1539 DCHECK(!token_name.empty());
1533 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1540 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1534 from_here, task, TimeDelta()); 1541 from_here, std::move(task), TimeDelta());
1535 } 1542 }
1536 1543
1537 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1544 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1538 SequenceToken sequence_token, 1545 SequenceToken sequence_token,
1539 const tracked_objects::Location& from_here, 1546 const tracked_objects::Location& from_here,
1540 const Closure& task, 1547 OnceClosure task,
1541 WorkerShutdown shutdown_behavior) { 1548 WorkerShutdown shutdown_behavior) {
1542 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1549 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here,
1543 from_here, task, TimeDelta()); 1550 std::move(task), TimeDelta());
1544 } 1551 }
1545 1552
1546 bool SequencedWorkerPool::PostDelayedTask( 1553 bool SequencedWorkerPool::PostDelayedTask(
1547 const tracked_objects::Location& from_here, 1554 const tracked_objects::Location& from_here,
1548 const Closure& task, 1555 OnceClosure task,
1549 TimeDelta delay) { 1556 TimeDelta delay) {
1550 return PostDelayedWorkerTask(from_here, task, delay); 1557 return PostDelayedWorkerTask(from_here, std::move(task), delay);
1551 } 1558 }
1552 1559
1553 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1560 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1554 return inner_->RunsTasksOnCurrentThread(); 1561 return inner_->RunsTasksOnCurrentThread();
1555 } 1562 }
1556 1563
1557 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1564 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1558 SequenceToken sequence_token) const { 1565 SequenceToken sequence_token) const {
1559 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1566 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1560 } 1567 }
1561 1568
1562 void SequencedWorkerPool::FlushForTesting() { 1569 void SequencedWorkerPool::FlushForTesting() {
1563 inner_->CleanupForTesting(); 1570 inner_->CleanupForTesting();
1564 } 1571 }
1565 1572
1566 void SequencedWorkerPool::SignalHasWorkForTesting() { 1573 void SequencedWorkerPool::SignalHasWorkForTesting() {
1567 inner_->SignalHasWorkForTesting(); 1574 inner_->SignalHasWorkForTesting();
1568 } 1575 }
1569 1576
1570 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1577 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1571 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1578 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1572 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1579 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1573 } 1580 }
1574 1581
1575 bool SequencedWorkerPool::IsShutdownInProgress() { 1582 bool SequencedWorkerPool::IsShutdownInProgress() {
1576 return inner_->IsShutdownInProgress(); 1583 return inner_->IsShutdownInProgress();
1577 } 1584 }
1578 1585
1579 } // namespace base 1586 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/worker_pool.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698