Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 212 | 212 |
| 213 // Create a process-wide unique ID to represent this task in trace events. This | 213 // Create a process-wide unique ID to represent this task in trace events. This |
| 214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding | 214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding |
| 215 // with MessageLoop pointers on other processes. | 215 // with MessageLoop pointers on other processes. |
| 216 uint64 GetTaskTraceID(const SequencedTask& task, | 216 uint64 GetTaskTraceID(const SequencedTask& task, |
| 217 void* pool) { | 217 void* pool) { |
| 218 return (static_cast<uint64>(task.trace_id) << 32) | | 218 return (static_cast<uint64>(task.trace_id) << 32) | |
| 219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); | 219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); |
| 220 } | 220 } |
| 221 | 221 |
| 222 base::LazyInstance<base::ThreadLocalPointer< | |
| 223 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = | |
| 224 LAZY_INSTANCE_INITIALIZER; | |
| 225 | |
| 226 } // namespace | 222 } // namespace |
| 227 | 223 |
| 228 // Worker --------------------------------------------------------------------- | 224 // Worker --------------------------------------------------------------------- |
| 229 | 225 |
| 230 class SequencedWorkerPool::Worker : public SimpleThread { | 226 class SequencedWorkerPool::Worker : public SimpleThread { |
| 231 public: | 227 public: |
| 232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 228 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
| 233 // around as long as we are running. | 229 // around as long as we are running. |
| 234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 230 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| 235 int thread_number, | 231 int thread_number, |
| 236 const std::string& thread_name_prefix); | 232 const std::string& thread_name_prefix); |
| 237 ~Worker() override; | 233 ~Worker() override; |
| 238 | 234 |
| 239 // SimpleThread implementation. This actually runs the background thread. | 235 // SimpleThread implementation. This actually runs the background thread. |
| 240 void Run() override; | 236 void Run() override; |
| 241 | 237 |
| 238 // Gets the worker for the current thread out of thread-local storage. | |
| 239 static Worker* GetForCurrentThread(); | |
| 240 | |
| 242 // Indicates that a task is about to be run. The parameters provide | 241 // Indicates that a task is about to be run. The parameters provide |
| 243 // additional metainformation about the task being run. | 242 // additional metainformation about the task being run. |
| 244 void set_running_task_info(SequenceToken token, | 243 void set_running_task_info(SequenceToken token, |
| 245 WorkerShutdown shutdown_behavior) { | 244 WorkerShutdown shutdown_behavior) { |
| 246 is_processing_task_ = true; | 245 is_processing_task_ = true; |
| 247 task_sequence_token_ = token; | 246 task_sequence_token_ = token; |
| 248 task_shutdown_behavior_ = shutdown_behavior; | 247 task_shutdown_behavior_ = shutdown_behavior; |
| 249 } | 248 } |
| 250 | 249 |
| 251 // Indicates that the task has finished running. | 250 // Indicates that the task has finished running. |
| 252 void reset_running_task_info() { is_processing_task_ = false; } | 251 void reset_running_task_info() { is_processing_task_ = false; } |
| 253 | 252 |
| 254 // Whether the worker is processing a task. | 253 // Whether the worker is processing a task. |
| 255 bool is_processing_task() { return is_processing_task_; } | 254 bool is_processing_task() { return is_processing_task_; } |
| 256 | 255 |
| 257 SequenceToken task_sequence_token() const { | 256 SequenceToken task_sequence_token() const { |
| 258 DCHECK(is_processing_task_); | 257 DCHECK(is_processing_task_); |
| 259 return task_sequence_token_; | 258 return task_sequence_token_; |
| 260 } | 259 } |
| 261 | 260 |
| 262 WorkerShutdown task_shutdown_behavior() const { | 261 WorkerShutdown task_shutdown_behavior() const { |
| 263 DCHECK(is_processing_task_); | 262 DCHECK(is_processing_task_); |
| 264 return task_shutdown_behavior_; | 263 return task_shutdown_behavior_; |
| 265 } | 264 } |
| 266 | 265 |
| 266 const scoped_refptr<SequencedWorkerPool>& worker_pool() const { | |
|
danakj
2015/10/27 20:03:20
just return type scoped_refptr<SWP>?
Bernhard Bauer
2015/10/28 13:36:28
Done.
| |
| 267 return worker_pool_; | |
| 268 } | |
| 269 | |
| 267 private: | 270 private: |
| 271 static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky | |
| 272 g_lazy_tls_ptr; | |
|
danakj
2015/10/27 20:03:20
"Data members of classes, both static and non-stat
Bernhard Bauer
2015/10/28 13:36:28
Done.
| |
| 273 | |
| 268 scoped_refptr<SequencedWorkerPool> worker_pool_; | 274 scoped_refptr<SequencedWorkerPool> worker_pool_; |
| 269 // The sequence token of the task being processed. Only valid when | 275 // The sequence token of the task being processed. Only valid when |
| 270 // is_processing_task_ is true. | 276 // is_processing_task_ is true. |
| 271 SequenceToken task_sequence_token_; | 277 SequenceToken task_sequence_token_; |
| 272 // The shutdown behavior of the task being processed. Only valid when | 278 // The shutdown behavior of the task being processed. Only valid when |
| 273 // is_processing_task_ is true. | 279 // is_processing_task_ is true. |
| 274 WorkerShutdown task_shutdown_behavior_; | 280 WorkerShutdown task_shutdown_behavior_; |
| 275 // Whether the Worker is processing a task. | 281 // Whether the Worker is processing a task. |
| 276 bool is_processing_task_; | 282 bool is_processing_task_; |
| 277 | 283 |
| (...skipping 223 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 501 } | 507 } |
| 502 | 508 |
| 503 SequencedWorkerPool::Worker::~Worker() { | 509 SequencedWorkerPool::Worker::~Worker() { |
| 504 } | 510 } |
| 505 | 511 |
| 506 void SequencedWorkerPool::Worker::Run() { | 512 void SequencedWorkerPool::Worker::Run() { |
| 507 #if defined(OS_WIN) | 513 #if defined(OS_WIN) |
| 508 win::ScopedCOMInitializer com_initializer; | 514 win::ScopedCOMInitializer com_initializer; |
| 509 #endif | 515 #endif |
| 510 | 516 |
| 511 // Store a pointer to the running sequence in thread local storage for | 517 // Store a pointer to this worker in thread local storage for static function |
| 512 // static function access. | 518 // access. |
| 513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_); | 519 DCHECK(!g_lazy_tls_ptr.Get().Get()); |
| 520 g_lazy_tls_ptr.Get().Set(this); | |
| 514 | 521 |
| 515 // Just jump back to the Inner object to run the thread, since it has all the | 522 // Just jump back to the Inner object to run the thread, since it has all the |
| 516 // tracking information and queues. It might be more natural to implement | 523 // tracking information and queues. It might be more natural to implement |
| 517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 524 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
| 518 // having these worker objects at all, but that method lacks the ability to | 525 // having these worker objects at all, but that method lacks the ability to |
| 519 // send thread-specific information easily to the thread loop. | 526 // send thread-specific information easily to the thread loop. |
| 520 worker_pool_->inner_->ThreadLoop(this); | 527 worker_pool_->inner_->ThreadLoop(this); |
| 521 // Release our cyclic reference once we're done. | 528 // Release our cyclic reference once we're done. |
| 522 worker_pool_ = NULL; | 529 worker_pool_ = nullptr; |
| 523 } | 530 } |
| 524 | 531 |
| 532 // static | |
| 533 SequencedWorkerPool::Worker* | |
| 534 SequencedWorkerPool::Worker::GetForCurrentThread() { | |
| 535 // Don't construct lazy instance on check. | |
| 536 if (g_lazy_tls_ptr == nullptr) | |
| 537 return nullptr; | |
| 538 | |
| 539 return g_lazy_tls_ptr.Get().Get(); | |
| 540 } | |
| 541 | |
| 542 // static | |
| 543 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky | |
| 544 SequencedWorkerPool::Worker::g_lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER; | |
| 545 | |
| 525 // Inner definitions --------------------------------------------------------- | 546 // Inner definitions --------------------------------------------------------- |
| 526 | 547 |
| 527 SequencedWorkerPool::Inner::Inner( | 548 SequencedWorkerPool::Inner::Inner( |
| 528 SequencedWorkerPool* worker_pool, | 549 SequencedWorkerPool* worker_pool, |
| 529 size_t max_threads, | 550 size_t max_threads, |
| 530 const std::string& thread_name_prefix, | 551 const std::string& thread_name_prefix, |
| 531 TestingObserver* observer) | 552 TestingObserver* observer) |
| 532 : worker_pool_(worker_pool), | 553 : worker_pool_(worker_pool), |
| 533 lock_(), | 554 lock_(), |
| 534 has_work_cv_(&lock_), | 555 has_work_cv_(&lock_), |
| (...skipping 603 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1138 return !thread_being_created_ && | 1159 return !thread_being_created_ && |
| 1139 blocking_shutdown_thread_count_ == 0 && | 1160 blocking_shutdown_thread_count_ == 0 && |
| 1140 blocking_shutdown_pending_task_count_ == 0; | 1161 blocking_shutdown_pending_task_count_ == 0; |
| 1141 } | 1162 } |
| 1142 | 1163 |
| 1143 base::StaticAtomicSequenceNumber | 1164 base::StaticAtomicSequenceNumber |
| 1144 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1165 SequencedWorkerPool::Inner::g_last_sequence_number_; |
| 1145 | 1166 |
| 1146 // SequencedWorkerPool -------------------------------------------------------- | 1167 // SequencedWorkerPool -------------------------------------------------------- |
| 1147 | 1168 |
| 1169 std::string SequencedWorkerPool::SequenceToken::ToString() const { | |
| 1170 return base::StringPrintf("[%d]", id_); | |
| 1171 } | |
| 1172 | |
| 1148 // static | 1173 // static |
| 1149 SequencedWorkerPool::SequenceToken | 1174 SequencedWorkerPool::SequenceToken |
| 1150 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { | 1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { |
| 1151 // Don't construct lazy instance on check. | 1176 Worker* worker = Worker::GetForCurrentThread(); |
| 1152 if (g_lazy_tls_ptr == NULL) | 1177 if (!worker) |
| 1153 return SequenceToken(); | 1178 return SequenceToken(); |
| 1154 | 1179 |
| 1155 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); | 1180 return worker->task_sequence_token(); |
| 1156 if (!token) | 1181 } |
| 1157 return SequenceToken(); | 1182 |
| 1158 return *token; | 1183 // static |
| 1184 scoped_refptr<SequencedWorkerPool> | |
| 1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | |
| 1186 Worker* worker = Worker::GetForCurrentThread(); | |
| 1187 if (!worker) | |
| 1188 return nullptr; | |
| 1189 | |
| 1190 return worker->worker_pool(); | |
| 1159 } | 1191 } |
| 1160 | 1192 |
| 1161 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1162 const std::string& thread_name_prefix) | 1194 const std::string& thread_name_prefix) |
| 1163 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1164 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
| 1165 } | 1197 } |
| 1166 | 1198 |
| 1167 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1168 const std::string& thread_name_prefix, | 1200 const std::string& thread_name_prefix, |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1298 | 1330 |
| 1299 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1300 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1301 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1302 } | 1334 } |
| 1303 | 1335 |
| 1304 bool SequencedWorkerPool::IsShutdownInProgress() { | 1336 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1305 return inner_->IsShutdownInProgress(); | 1337 return inner_->IsShutdownInProgress(); |
| 1306 } | 1338 } |
| 1307 | 1339 |
| 1340 std::ostream& operator<<(std::ostream& out, | |
| 1341 const SequencedWorkerPool::SequenceToken& token) { | |
| 1342 return out << token.ToString(); | |
| 1343 } | |
| 1344 | |
| 1308 } // namespace base | 1345 } // namespace base |
| OLD | NEW |