OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
212 | 212 |
213 // Create a process-wide unique ID to represent this task in trace events. This | 213 // Create a process-wide unique ID to represent this task in trace events. This |
214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding | 214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding |
215 // with MessageLoop pointers on other processes. | 215 // with MessageLoop pointers on other processes. |
216 uint64 GetTaskTraceID(const SequencedTask& task, | 216 uint64 GetTaskTraceID(const SequencedTask& task, |
217 void* pool) { | 217 void* pool) { |
218 return (static_cast<uint64>(task.trace_id) << 32) | | 218 return (static_cast<uint64>(task.trace_id) << 32) | |
219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); | 219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); |
220 } | 220 } |
221 | 221 |
222 base::LazyInstance<base::ThreadLocalPointer< | |
223 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = | |
224 LAZY_INSTANCE_INITIALIZER; | |
225 | |
226 } // namespace | 222 } // namespace |
227 | 223 |
228 // Worker --------------------------------------------------------------------- | 224 // Worker --------------------------------------------------------------------- |
229 | 225 |
230 class SequencedWorkerPool::Worker : public SimpleThread { | 226 class SequencedWorkerPool::Worker : public SimpleThread { |
231 public: | 227 public: |
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 228 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
233 // around as long as we are running. | 229 // around as long as we are running. |
234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 230 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
235 int thread_number, | 231 int thread_number, |
236 const std::string& thread_name_prefix); | 232 const std::string& thread_name_prefix); |
237 ~Worker() override; | 233 ~Worker() override; |
238 | 234 |
239 // SimpleThread implementation. This actually runs the background thread. | 235 // SimpleThread implementation. This actually runs the background thread. |
240 void Run() override; | 236 void Run() override; |
241 | 237 |
238 // Gets the worker for the current thread out of thread-local storage. | |
239 static Worker* GetForCurrentThread(); | |
240 | |
242 // Indicates that a task is about to be run. The parameters provide | 241 // Indicates that a task is about to be run. The parameters provide |
243 // additional metainformation about the task being run. | 242 // additional metainformation about the task being run. |
244 void set_running_task_info(SequenceToken token, | 243 void set_running_task_info(SequenceToken token, |
245 WorkerShutdown shutdown_behavior) { | 244 WorkerShutdown shutdown_behavior) { |
246 is_processing_task_ = true; | 245 is_processing_task_ = true; |
247 task_sequence_token_ = token; | 246 task_sequence_token_ = token; |
248 task_shutdown_behavior_ = shutdown_behavior; | 247 task_shutdown_behavior_ = shutdown_behavior; |
249 } | 248 } |
250 | 249 |
251 // Indicates that the task has finished running. | 250 // Indicates that the task has finished running. |
252 void reset_running_task_info() { is_processing_task_ = false; } | 251 void reset_running_task_info() { is_processing_task_ = false; } |
253 | 252 |
254 // Whether the worker is processing a task. | 253 // Whether the worker is processing a task. |
255 bool is_processing_task() { return is_processing_task_; } | 254 bool is_processing_task() { return is_processing_task_; } |
256 | 255 |
257 SequenceToken task_sequence_token() const { | 256 SequenceToken task_sequence_token() const { |
258 DCHECK(is_processing_task_); | 257 DCHECK(is_processing_task_); |
259 return task_sequence_token_; | 258 return task_sequence_token_; |
260 } | 259 } |
261 | 260 |
262 WorkerShutdown task_shutdown_behavior() const { | 261 WorkerShutdown task_shutdown_behavior() const { |
263 DCHECK(is_processing_task_); | 262 DCHECK(is_processing_task_); |
264 return task_shutdown_behavior_; | 263 return task_shutdown_behavior_; |
265 } | 264 } |
266 | 265 |
266 const scoped_refptr<SequencedWorkerPool>& worker_pool() const { | |
danakj
2015/10/27 20:03:20
just return type scoped_refptr<SWP>?
Bernhard Bauer
2015/10/28 13:36:28
Done.
| |
267 return worker_pool_; | |
268 } | |
269 | |
267 private: | 270 private: |
271 static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky | |
272 g_lazy_tls_ptr; | |
danakj
2015/10/27 20:03:20
"Data members of classes, both static and non-stat
Bernhard Bauer
2015/10/28 13:36:28
Done.
| |
273 | |
268 scoped_refptr<SequencedWorkerPool> worker_pool_; | 274 scoped_refptr<SequencedWorkerPool> worker_pool_; |
269 // The sequence token of the task being processed. Only valid when | 275 // The sequence token of the task being processed. Only valid when |
270 // is_processing_task_ is true. | 276 // is_processing_task_ is true. |
271 SequenceToken task_sequence_token_; | 277 SequenceToken task_sequence_token_; |
272 // The shutdown behavior of the task being processed. Only valid when | 278 // The shutdown behavior of the task being processed. Only valid when |
273 // is_processing_task_ is true. | 279 // is_processing_task_ is true. |
274 WorkerShutdown task_shutdown_behavior_; | 280 WorkerShutdown task_shutdown_behavior_; |
275 // Whether the Worker is processing a task. | 281 // Whether the Worker is processing a task. |
276 bool is_processing_task_; | 282 bool is_processing_task_; |
277 | 283 |
(...skipping 223 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
501 } | 507 } |
502 | 508 |
503 SequencedWorkerPool::Worker::~Worker() { | 509 SequencedWorkerPool::Worker::~Worker() { |
504 } | 510 } |
505 | 511 |
506 void SequencedWorkerPool::Worker::Run() { | 512 void SequencedWorkerPool::Worker::Run() { |
507 #if defined(OS_WIN) | 513 #if defined(OS_WIN) |
508 win::ScopedCOMInitializer com_initializer; | 514 win::ScopedCOMInitializer com_initializer; |
509 #endif | 515 #endif |
510 | 516 |
511 // Store a pointer to the running sequence in thread local storage for | 517 // Store a pointer to this worker in thread local storage for static function |
512 // static function access. | 518 // access. |
513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_); | 519 DCHECK(!g_lazy_tls_ptr.Get().Get()); |
520 g_lazy_tls_ptr.Get().Set(this); | |
514 | 521 |
515 // Just jump back to the Inner object to run the thread, since it has all the | 522 // Just jump back to the Inner object to run the thread, since it has all the |
516 // tracking information and queues. It might be more natural to implement | 523 // tracking information and queues. It might be more natural to implement |
517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 524 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
518 // having these worker objects at all, but that method lacks the ability to | 525 // having these worker objects at all, but that method lacks the ability to |
519 // send thread-specific information easily to the thread loop. | 526 // send thread-specific information easily to the thread loop. |
520 worker_pool_->inner_->ThreadLoop(this); | 527 worker_pool_->inner_->ThreadLoop(this); |
521 // Release our cyclic reference once we're done. | 528 // Release our cyclic reference once we're done. |
522 worker_pool_ = NULL; | 529 worker_pool_ = nullptr; |
523 } | 530 } |
524 | 531 |
532 // static | |
533 SequencedWorkerPool::Worker* | |
534 SequencedWorkerPool::Worker::GetForCurrentThread() { | |
535 // Don't construct lazy instance on check. | |
536 if (g_lazy_tls_ptr == nullptr) | |
537 return nullptr; | |
538 | |
539 return g_lazy_tls_ptr.Get().Get(); | |
540 } | |
541 | |
542 // static | |
543 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky | |
544 SequencedWorkerPool::Worker::g_lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER; | |
545 | |
525 // Inner definitions --------------------------------------------------------- | 546 // Inner definitions --------------------------------------------------------- |
526 | 547 |
527 SequencedWorkerPool::Inner::Inner( | 548 SequencedWorkerPool::Inner::Inner( |
528 SequencedWorkerPool* worker_pool, | 549 SequencedWorkerPool* worker_pool, |
529 size_t max_threads, | 550 size_t max_threads, |
530 const std::string& thread_name_prefix, | 551 const std::string& thread_name_prefix, |
531 TestingObserver* observer) | 552 TestingObserver* observer) |
532 : worker_pool_(worker_pool), | 553 : worker_pool_(worker_pool), |
533 lock_(), | 554 lock_(), |
534 has_work_cv_(&lock_), | 555 has_work_cv_(&lock_), |
(...skipping 603 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1138 return !thread_being_created_ && | 1159 return !thread_being_created_ && |
1139 blocking_shutdown_thread_count_ == 0 && | 1160 blocking_shutdown_thread_count_ == 0 && |
1140 blocking_shutdown_pending_task_count_ == 0; | 1161 blocking_shutdown_pending_task_count_ == 0; |
1141 } | 1162 } |
1142 | 1163 |
1143 base::StaticAtomicSequenceNumber | 1164 base::StaticAtomicSequenceNumber |
1144 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1165 SequencedWorkerPool::Inner::g_last_sequence_number_; |
1145 | 1166 |
1146 // SequencedWorkerPool -------------------------------------------------------- | 1167 // SequencedWorkerPool -------------------------------------------------------- |
1147 | 1168 |
1169 std::string SequencedWorkerPool::SequenceToken::ToString() const { | |
1170 return base::StringPrintf("[%d]", id_); | |
1171 } | |
1172 | |
1148 // static | 1173 // static |
1149 SequencedWorkerPool::SequenceToken | 1174 SequencedWorkerPool::SequenceToken |
1150 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { | 1175 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { |
1151 // Don't construct lazy instance on check. | 1176 Worker* worker = Worker::GetForCurrentThread(); |
1152 if (g_lazy_tls_ptr == NULL) | 1177 if (!worker) |
1153 return SequenceToken(); | 1178 return SequenceToken(); |
1154 | 1179 |
1155 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); | 1180 return worker->task_sequence_token(); |
1156 if (!token) | 1181 } |
1157 return SequenceToken(); | 1182 |
1158 return *token; | 1183 // static |
1184 scoped_refptr<SequencedWorkerPool> | |
1185 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | |
1186 Worker* worker = Worker::GetForCurrentThread(); | |
1187 if (!worker) | |
1188 return nullptr; | |
1189 | |
1190 return worker->worker_pool(); | |
1159 } | 1191 } |
1160 | 1192 |
1161 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1193 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1162 const std::string& thread_name_prefix) | 1194 const std::string& thread_name_prefix) |
1163 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1195 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1164 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1196 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
1165 } | 1197 } |
1166 | 1198 |
1167 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1199 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1168 const std::string& thread_name_prefix, | 1200 const std::string& thread_name_prefix, |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1298 | 1330 |
1299 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1331 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1300 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1332 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1301 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1333 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1302 } | 1334 } |
1303 | 1335 |
1304 bool SequencedWorkerPool::IsShutdownInProgress() { | 1336 bool SequencedWorkerPool::IsShutdownInProgress() { |
1305 return inner_->IsShutdownInProgress(); | 1337 return inner_->IsShutdownInProgress(); |
1306 } | 1338 } |
1307 | 1339 |
1340 std::ostream& operator<<(std::ostream& out, | |
1341 const SequencedWorkerPool::SequenceToken& token) { | |
1342 return out << token.ToString(); | |
1343 } | |
1344 | |
1308 } // namespace base | 1345 } // namespace base |
OLD | NEW |