Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(401)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1423773003: Add SequencedTaskRunnerHandle to get a SequencedTaskRunner for the current thread / sequence. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: export SequenceToken Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
212 212
213 // Create a process-wide unique ID to represent this task in trace events. This 213 // Create a process-wide unique ID to represent this task in trace events. This
214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
215 // with MessageLoop pointers on other processes. 215 // with MessageLoop pointers on other processes.
216 uint64 GetTaskTraceID(const SequencedTask& task, 216 uint64 GetTaskTraceID(const SequencedTask& task,
217 void* pool) { 217 void* pool) {
218 return (static_cast<uint64>(task.trace_id) << 32) | 218 return (static_cast<uint64>(task.trace_id) << 32) |
219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
220 } 220 }
221 221
222 base::LazyInstance<base::ThreadLocalPointer<
223 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
224 LAZY_INSTANCE_INITIALIZER;
225
226 } // namespace 222 } // namespace
227 223
228 // Worker --------------------------------------------------------------------- 224 // Worker ---------------------------------------------------------------------
229 225
230 class SequencedWorkerPool::Worker : public SimpleThread { 226 class SequencedWorkerPool::Worker : public SimpleThread {
231 public: 227 public:
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 228 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
233 // around as long as we are running. 229 // around as long as we are running.
234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 230 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
235 int thread_number, 231 int thread_number,
236 const std::string& thread_name_prefix); 232 const std::string& thread_name_prefix);
237 ~Worker() override; 233 ~Worker() override;
238 234
239 // SimpleThread implementation. This actually runs the background thread. 235 // SimpleThread implementation. This actually runs the background thread.
240 void Run() override; 236 void Run() override;
241 237
238 // Gets the worker for the current thread out of thread-local storage.
239 static Worker* GetForCurrentThread();
240
242 // Indicates that a task is about to be run. The parameters provide 241 // Indicates that a task is about to be run. The parameters provide
243 // additional metainformation about the task being run. 242 // additional metainformation about the task being run.
244 void set_running_task_info(SequenceToken token, 243 void set_running_task_info(SequenceToken token,
245 WorkerShutdown shutdown_behavior) { 244 WorkerShutdown shutdown_behavior) {
246 is_processing_task_ = true; 245 is_processing_task_ = true;
247 task_sequence_token_ = token; 246 task_sequence_token_ = token;
248 task_shutdown_behavior_ = shutdown_behavior; 247 task_shutdown_behavior_ = shutdown_behavior;
249 } 248 }
250 249
251 // Indicates that the task has finished running. 250 // Indicates that the task has finished running.
252 void reset_running_task_info() { is_processing_task_ = false; } 251 void reset_running_task_info() { is_processing_task_ = false; }
253 252
254 // Whether the worker is processing a task. 253 // Whether the worker is processing a task.
255 bool is_processing_task() { return is_processing_task_; } 254 bool is_processing_task() { return is_processing_task_; }
256 255
257 SequenceToken task_sequence_token() const { 256 SequenceToken task_sequence_token() const {
258 DCHECK(is_processing_task_); 257 DCHECK(is_processing_task_);
259 return task_sequence_token_; 258 return task_sequence_token_;
260 } 259 }
261 260
262 WorkerShutdown task_shutdown_behavior() const { 261 WorkerShutdown task_shutdown_behavior() const {
263 DCHECK(is_processing_task_); 262 DCHECK(is_processing_task_);
264 return task_shutdown_behavior_; 263 return task_shutdown_behavior_;
265 } 264 }
266 265
266 scoped_refptr<SequencedWorkerPool> worker_pool() const {
267 return worker_pool_;
268 }
269
267 private: 270 private:
271 static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
272 lazy_tls_ptr_;
273
268 scoped_refptr<SequencedWorkerPool> worker_pool_; 274 scoped_refptr<SequencedWorkerPool> worker_pool_;
269 // The sequence token of the task being processed. Only valid when 275 // The sequence token of the task being processed. Only valid when
270 // is_processing_task_ is true. 276 // is_processing_task_ is true.
271 SequenceToken task_sequence_token_; 277 SequenceToken task_sequence_token_;
272 // The shutdown behavior of the task being processed. Only valid when 278 // The shutdown behavior of the task being processed. Only valid when
273 // is_processing_task_ is true. 279 // is_processing_task_ is true.
274 WorkerShutdown task_shutdown_behavior_; 280 WorkerShutdown task_shutdown_behavior_;
275 // Whether the Worker is processing a task. 281 // Whether the Worker is processing a task.
276 bool is_processing_task_; 282 bool is_processing_task_;
277 283
(...skipping 223 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 } 507 }
502 508
503 SequencedWorkerPool::Worker::~Worker() { 509 SequencedWorkerPool::Worker::~Worker() {
504 } 510 }
505 511
506 void SequencedWorkerPool::Worker::Run() { 512 void SequencedWorkerPool::Worker::Run() {
507 #if defined(OS_WIN) 513 #if defined(OS_WIN)
508 win::ScopedCOMInitializer com_initializer; 514 win::ScopedCOMInitializer com_initializer;
509 #endif 515 #endif
510 516
511 // Store a pointer to the running sequence in thread local storage for 517 // Store a pointer to this worker in thread local storage for static function
512 // static function access. 518 // access.
513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_); 519 DCHECK(!lazy_tls_ptr_.Get().Get());
520 lazy_tls_ptr_.Get().Set(this);
514 521
515 // Just jump back to the Inner object to run the thread, since it has all the 522 // Just jump back to the Inner object to run the thread, since it has all the
516 // tracking information and queues. It might be more natural to implement 523 // tracking information and queues. It might be more natural to implement
517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 524 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
518 // having these worker objects at all, but that method lacks the ability to 525 // having these worker objects at all, but that method lacks the ability to
519 // send thread-specific information easily to the thread loop. 526 // send thread-specific information easily to the thread loop.
520 worker_pool_->inner_->ThreadLoop(this); 527 worker_pool_->inner_->ThreadLoop(this);
521 // Release our cyclic reference once we're done. 528 // Release our cyclic reference once we're done.
522 worker_pool_ = NULL; 529 worker_pool_ = nullptr;
523 } 530 }
524 531
532 // static
533 SequencedWorkerPool::Worker*
534 SequencedWorkerPool::Worker::GetForCurrentThread() {
535 // Don't construct lazy instance on check.
536 if (lazy_tls_ptr_ == nullptr)
537 return nullptr;
538
539 return lazy_tls_ptr_.Get().Get();
540 }
541
542 // static
543 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
544 SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
545
525 // Inner definitions --------------------------------------------------------- 546 // Inner definitions ---------------------------------------------------------
526 547
527 SequencedWorkerPool::Inner::Inner( 548 SequencedWorkerPool::Inner::Inner(
528 SequencedWorkerPool* worker_pool, 549 SequencedWorkerPool* worker_pool,
529 size_t max_threads, 550 size_t max_threads,
530 const std::string& thread_name_prefix, 551 const std::string& thread_name_prefix,
531 TestingObserver* observer) 552 TestingObserver* observer)
532 : worker_pool_(worker_pool), 553 : worker_pool_(worker_pool),
533 lock_(), 554 lock_(),
534 has_work_cv_(&lock_), 555 has_work_cv_(&lock_),
(...skipping 603 matching lines...) Expand 10 before | Expand all | Expand 10 after
1138 return !thread_being_created_ && 1159 return !thread_being_created_ &&
1139 blocking_shutdown_thread_count_ == 0 && 1160 blocking_shutdown_thread_count_ == 0 &&
1140 blocking_shutdown_pending_task_count_ == 0; 1161 blocking_shutdown_pending_task_count_ == 0;
1141 } 1162 }
1142 1163
1143 base::StaticAtomicSequenceNumber 1164 base::StaticAtomicSequenceNumber
1144 SequencedWorkerPool::Inner::g_last_sequence_number_; 1165 SequencedWorkerPool::Inner::g_last_sequence_number_;
1145 1166
1146 // SequencedWorkerPool -------------------------------------------------------- 1167 // SequencedWorkerPool --------------------------------------------------------
1147 1168
1169 std::string SequencedWorkerPool::SequenceToken::ToString() const {
1170 return base::StringPrintf("[%d]", id_);
1171 }
1172
1148 // static 1173 // static
1149 SequencedWorkerPool::SequenceToken 1174 SequencedWorkerPool::SequenceToken
1150 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1151 // Don't construct lazy instance on check. 1176 Worker* worker = Worker::GetForCurrentThread();
1152 if (g_lazy_tls_ptr == NULL) 1177 if (!worker)
1153 return SequenceToken(); 1178 return SequenceToken();
1154 1179
1155 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); 1180 return worker->task_sequence_token();
1156 if (!token) 1181 }
1157 return SequenceToken(); 1182
1158 return *token; 1183 // static
1184 scoped_refptr<SequencedWorkerPool>
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1186 Worker* worker = Worker::GetForCurrentThread();
1187 if (!worker)
1188 return nullptr;
1189
1190 return worker->worker_pool();
1159 } 1191 }
1160 1192
1161 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1162 const std::string& thread_name_prefix) 1194 const std::string& thread_name_prefix)
1163 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1164 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1165 } 1197 }
1166 1198
1167 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1168 const std::string& thread_name_prefix, 1200 const std::string& thread_name_prefix,
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
1299 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1300 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1301 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1302 } 1334 }
1303 1335
1304 bool SequencedWorkerPool::IsShutdownInProgress() { 1336 bool SequencedWorkerPool::IsShutdownInProgress() {
1305 return inner_->IsShutdownInProgress(); 1337 return inner_->IsShutdownInProgress();
1306 } 1338 }
1307 1339
1308 } // namespace base 1340 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698