OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
13 #include "base/atomicops.h" | 13 #include "base/atomicops.h" |
14 #include "base/callback.h" | 14 #include "base/callback.h" |
15 #include "base/compiler_specific.h" | 15 #include "base/compiler_specific.h" |
16 #include "base/critical_closure.h" | 16 #include "base/critical_closure.h" |
17 #include "base/debug/trace_event.h" | 17 #include "base/debug/trace_event.h" |
| 18 #include "base/lazy_instance.h" |
18 #include "base/logging.h" | 19 #include "base/logging.h" |
19 #include "base/memory/linked_ptr.h" | 20 #include "base/memory/linked_ptr.h" |
20 #include "base/message_loop/message_loop_proxy.h" | 21 #include "base/message_loop/message_loop_proxy.h" |
21 #include "base/metrics/histogram.h" | 22 #include "base/metrics/histogram.h" |
22 #include "base/stl_util.h" | 23 #include "base/stl_util.h" |
23 #include "base/strings/stringprintf.h" | 24 #include "base/strings/stringprintf.h" |
24 #include "base/synchronization/condition_variable.h" | 25 #include "base/synchronization/condition_variable.h" |
25 #include "base/synchronization/lock.h" | 26 #include "base/synchronization/lock.h" |
26 #include "base/threading/platform_thread.h" | 27 #include "base/threading/platform_thread.h" |
27 #include "base/threading/simple_thread.h" | 28 #include "base/threading/simple_thread.h" |
| 29 #include "base/threading/thread_local.h" |
28 #include "base/threading/thread_restrictions.h" | 30 #include "base/threading/thread_restrictions.h" |
29 #include "base/time/time.h" | 31 #include "base/time/time.h" |
30 #include "base/tracked_objects.h" | 32 #include "base/tracked_objects.h" |
31 | 33 |
32 #if defined(OS_MACOSX) | 34 #if defined(OS_MACOSX) |
33 #include "base/mac/scoped_nsautorelease_pool.h" | 35 #include "base/mac/scoped_nsautorelease_pool.h" |
34 #endif | 36 #endif |
35 | 37 |
36 namespace base { | 38 namespace base { |
37 | 39 |
38 namespace { | 40 namespace { |
39 | 41 |
| 42 base::LazyInstance<base::ThreadLocalPointer<SequencedWorkerPool> > |
| 43 lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER; |
| 44 |
40 struct SequencedTask : public TrackingInfo { | 45 struct SequencedTask : public TrackingInfo { |
41 SequencedTask() | 46 SequencedTask() |
42 : sequence_token_id(0), | 47 : sequence_token_id(0), |
43 trace_id(0), | 48 trace_id(0), |
44 sequence_task_number(0), | 49 sequence_task_number(0), |
45 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
46 | 51 |
47 explicit SequencedTask(const tracked_objects::Location& from_here) | 52 explicit SequencedTask(const tracked_objects::Location& from_here) |
48 : base::TrackingInfo(from_here, TimeTicks()), | 53 : base::TrackingInfo(from_here, TimeTicks()), |
49 sequence_token_id(0), | 54 sequence_token_id(0), |
(...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
260 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 265 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
261 const std::string& thread_name_prefix, | 266 const std::string& thread_name_prefix, |
262 TestingObserver* observer); | 267 TestingObserver* observer); |
263 | 268 |
264 ~Inner(); | 269 ~Inner(); |
265 | 270 |
266 SequenceToken GetSequenceToken(); | 271 SequenceToken GetSequenceToken(); |
267 | 272 |
268 SequenceToken GetNamedSequenceToken(const std::string& name); | 273 SequenceToken GetNamedSequenceToken(const std::string& name); |
269 | 274 |
| 275 bool GetCurrentThreadSequenceToken(SequenceToken* result_token) const; |
| 276 |
270 // This function accepts a name and an ID. If the name is null, the | 277 // This function accepts a name and an ID. If the name is null, the |
271 // token ID is used. This allows us to implement the optional name lookup | 278 // token ID is used. This allows us to implement the optional name lookup |
272 // from a single function without having to enter the lock a separate time. | 279 // from a single function without having to enter the lock a separate time. |
273 bool PostTask(const std::string* optional_token_name, | 280 bool PostTask(const std::string* optional_token_name, |
274 SequenceToken sequence_token, | 281 SequenceToken sequence_token, |
275 WorkerShutdown shutdown_behavior, | 282 WorkerShutdown shutdown_behavior, |
276 const tracked_objects::Location& from_here, | 283 const tracked_objects::Location& from_here, |
277 const Closure& task, | 284 const Closure& task, |
278 TimeDelta delay); | 285 TimeDelta delay); |
279 | 286 |
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
472 prefix + StringPrintf("Worker%d", thread_number).c_str()), | 479 prefix + StringPrintf("Worker%d", thread_number).c_str()), |
473 worker_pool_(worker_pool), | 480 worker_pool_(worker_pool), |
474 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { | 481 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { |
475 Start(); | 482 Start(); |
476 } | 483 } |
477 | 484 |
478 SequencedWorkerPool::Worker::~Worker() { | 485 SequencedWorkerPool::Worker::~Worker() { |
479 } | 486 } |
480 | 487 |
481 void SequencedWorkerPool::Worker::Run() { | 488 void SequencedWorkerPool::Worker::Run() { |
| 489 // Store the SequencedWorkerPool associated with this worker thread. |
| 490 lazy_tls_ptr.Get().Set(worker_pool_.get()); |
| 491 |
482 // Just jump back to the Inner object to run the thread, since it has all the | 492 // Just jump back to the Inner object to run the thread, since it has all the |
483 // tracking information and queues. It might be more natural to implement | 493 // tracking information and queues. It might be more natural to implement |
484 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 494 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
485 // having these worker objects at all, but that method lacks the ability to | 495 // having these worker objects at all, but that method lacks the ability to |
486 // send thread-specific information easily to the thread loop. | 496 // send thread-specific information easily to the thread loop. |
487 worker_pool_->inner_->ThreadLoop(this); | 497 worker_pool_->inner_->ThreadLoop(this); |
488 // Release our cyclic reference once we're done. | 498 // Release our cyclic reference once we're done. |
489 worker_pool_ = NULL; | 499 worker_pool_ = NULL; |
490 } | 500 } |
491 | 501 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
536 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | 546 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
537 return SequenceToken(static_cast<int>(result)); | 547 return SequenceToken(static_cast<int>(result)); |
538 } | 548 } |
539 | 549 |
540 SequencedWorkerPool::SequenceToken | 550 SequencedWorkerPool::SequenceToken |
541 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 551 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
542 AutoLock lock(lock_); | 552 AutoLock lock(lock_); |
543 return SequenceToken(LockedGetNamedTokenID(name)); | 553 return SequenceToken(LockedGetNamedTokenID(name)); |
544 } | 554 } |
545 | 555 |
| 556 bool SequencedWorkerPool::Inner::GetCurrentThreadSequenceToken( |
| 557 SequenceToken* result_token) const { |
| 558 DCHECK(result_token); |
| 559 AutoLock lock(lock_); |
| 560 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 561 if (found == threads_.end()) |
| 562 return false; |
| 563 *result_token = found->second->running_sequence(); |
| 564 return true; |
| 565 } |
| 566 |
546 bool SequencedWorkerPool::Inner::PostTask( | 567 bool SequencedWorkerPool::Inner::PostTask( |
547 const std::string* optional_token_name, | 568 const std::string* optional_token_name, |
548 SequenceToken sequence_token, | 569 SequenceToken sequence_token, |
549 WorkerShutdown shutdown_behavior, | 570 WorkerShutdown shutdown_behavior, |
550 const tracked_objects::Location& from_here, | 571 const tracked_objects::Location& from_here, |
551 const Closure& task, | 572 const Closure& task, |
552 TimeDelta delay) { | 573 TimeDelta delay) { |
553 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); | 574 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
554 SequencedTask sequenced(from_here); | 575 SequencedTask sequenced(from_here); |
555 sequenced.sequence_token_id = sequence_token.id_; | 576 sequenced.sequence_token_id = sequence_token.id_; |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
604 return true; | 625 return true; |
605 } | 626 } |
606 | 627 |
607 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 628 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
608 AutoLock lock(lock_); | 629 AutoLock lock(lock_); |
609 return ContainsKey(threads_, PlatformThread::CurrentId()); | 630 return ContainsKey(threads_, PlatformThread::CurrentId()); |
610 } | 631 } |
611 | 632 |
612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 633 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
613 SequenceToken sequence_token) const { | 634 SequenceToken sequence_token) const { |
614 AutoLock lock(lock_); | 635 SequenceToken current_thread_token; |
615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 636 if (!GetCurrentThreadSequenceToken(¤t_thread_token)) |
616 if (found == threads_.end()) | |
617 return false; | 637 return false; |
618 return found->second->running_sequence().Equals(sequence_token); | 638 return current_thread_token.Equals(sequence_token); |
619 } | 639 } |
620 | 640 |
621 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 641 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
622 void SequencedWorkerPool::Inner::CleanupForTesting() { | 642 void SequencedWorkerPool::Inner::CleanupForTesting() { |
623 DCHECK(!RunsTasksOnCurrentThread()); | 643 DCHECK(!RunsTasksOnCurrentThread()); |
624 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 644 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
625 AutoLock lock(lock_); | 645 AutoLock lock(lock_); |
626 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 646 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
627 if (shutdown_called_) | 647 if (shutdown_called_) |
628 return; | 648 return; |
(...skipping 454 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1083 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1103 bool SequencedWorkerPool::Inner::CanShutdown() const { |
1084 lock_.AssertAcquired(); | 1104 lock_.AssertAcquired(); |
1085 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1105 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
1086 return !thread_being_created_ && | 1106 return !thread_being_created_ && |
1087 blocking_shutdown_thread_count_ == 0 && | 1107 blocking_shutdown_thread_count_ == 0 && |
1088 blocking_shutdown_pending_task_count_ == 0; | 1108 blocking_shutdown_pending_task_count_ == 0; |
1089 } | 1109 } |
1090 | 1110 |
1091 // SequencedWorkerPool -------------------------------------------------------- | 1111 // SequencedWorkerPool -------------------------------------------------------- |
1092 | 1112 |
| 1113 // static |
| 1114 SequencedWorkerPool* SequencedWorkerPool::Get() { |
| 1115 return lazy_tls_ptr.Get().Get(); |
| 1116 } |
| 1117 |
1093 SequencedWorkerPool::SequencedWorkerPool( | 1118 SequencedWorkerPool::SequencedWorkerPool( |
1094 size_t max_threads, | 1119 size_t max_threads, |
1095 const std::string& thread_name_prefix) | 1120 const std::string& thread_name_prefix) |
1096 : constructor_message_loop_(MessageLoopProxy::current()), | 1121 : constructor_message_loop_(MessageLoopProxy::current()), |
1097 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1122 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
1098 } | 1123 } |
1099 | 1124 |
1100 SequencedWorkerPool::SequencedWorkerPool( | 1125 SequencedWorkerPool::SequencedWorkerPool( |
1101 size_t max_threads, | 1126 size_t max_threads, |
1102 const std::string& thread_name_prefix, | 1127 const std::string& thread_name_prefix, |
(...skipping 17 matching lines...) Expand all Loading... |
1120 | 1145 |
1121 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1146 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
1122 return inner_->GetSequenceToken(); | 1147 return inner_->GetSequenceToken(); |
1123 } | 1148 } |
1124 | 1149 |
1125 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1150 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
1126 const std::string& name) { | 1151 const std::string& name) { |
1127 return inner_->GetNamedSequenceToken(name); | 1152 return inner_->GetNamedSequenceToken(name); |
1128 } | 1153 } |
1129 | 1154 |
| 1155 bool SequencedWorkerPool::GetCurrentThreadSequenceToken( |
| 1156 SequenceToken* result_token) const { |
| 1157 return inner_->GetCurrentThreadSequenceToken(result_token); |
| 1158 } |
| 1159 |
1130 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1160 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
1131 SequenceToken token) { | 1161 SequenceToken token) { |
1132 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1162 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
1133 } | 1163 } |
1134 | 1164 |
1135 scoped_refptr<SequencedTaskRunner> | 1165 scoped_refptr<SequencedTaskRunner> |
1136 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( | 1166 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( |
1137 SequenceToken token, WorkerShutdown shutdown_behavior) { | 1167 SequenceToken token, WorkerShutdown shutdown_behavior) { |
1138 return new SequencedWorkerPoolSequencedTaskRunner( | 1168 return new SequencedWorkerPoolSequencedTaskRunner( |
1139 this, token, shutdown_behavior); | 1169 this, token, shutdown_behavior); |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1230 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1260 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1231 inner_->SignalHasWorkForTesting(); | 1261 inner_->SignalHasWorkForTesting(); |
1232 } | 1262 } |
1233 | 1263 |
1234 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1264 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1235 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1265 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
1236 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1266 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1237 } | 1267 } |
1238 | 1268 |
1239 } // namespace base | 1269 } // namespace base |
OLD | NEW |