| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| 11 #include <vector> | 11 #include <vector> |
| 12 | 12 |
| 13 #include "base/atomicops.h" | 13 #include "base/atomicops.h" |
| 14 #include "base/callback.h" | 14 #include "base/callback.h" |
| 15 #include "base/compiler_specific.h" | 15 #include "base/compiler_specific.h" |
| 16 #include "base/critical_closure.h" | 16 #include "base/critical_closure.h" |
| 17 #include "base/debug/trace_event.h" | 17 #include "base/debug/trace_event.h" |
| 18 #include "base/lazy_instance.h" |
| 18 #include "base/logging.h" | 19 #include "base/logging.h" |
| 19 #include "base/memory/linked_ptr.h" | 20 #include "base/memory/linked_ptr.h" |
| 20 #include "base/message_loop/message_loop_proxy.h" | 21 #include "base/message_loop/message_loop_proxy.h" |
| 21 #include "base/metrics/histogram.h" | 22 #include "base/metrics/histogram.h" |
| 22 #include "base/stl_util.h" | 23 #include "base/stl_util.h" |
| 23 #include "base/strings/stringprintf.h" | 24 #include "base/strings/stringprintf.h" |
| 24 #include "base/synchronization/condition_variable.h" | 25 #include "base/synchronization/condition_variable.h" |
| 25 #include "base/synchronization/lock.h" | 26 #include "base/synchronization/lock.h" |
| 26 #include "base/threading/platform_thread.h" | 27 #include "base/threading/platform_thread.h" |
| 27 #include "base/threading/simple_thread.h" | 28 #include "base/threading/simple_thread.h" |
| 29 #include "base/threading/thread_local.h" |
| 28 #include "base/threading/thread_restrictions.h" | 30 #include "base/threading/thread_restrictions.h" |
| 29 #include "base/time/time.h" | 31 #include "base/time/time.h" |
| 30 #include "base/tracked_objects.h" | 32 #include "base/tracked_objects.h" |
| 31 | 33 |
| 32 #if defined(OS_MACOSX) | 34 #if defined(OS_MACOSX) |
| 33 #include "base/mac/scoped_nsautorelease_pool.h" | 35 #include "base/mac/scoped_nsautorelease_pool.h" |
| 34 #endif | 36 #endif |
| 35 | 37 |
| 36 namespace base { | 38 namespace base { |
| 37 | 39 |
| 38 namespace { | 40 namespace { |
| 39 | 41 |
| 42 base::LazyInstance<base::ThreadLocalPointer<SequencedWorkerPool> > |
| 43 lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER; |
| 44 |
| 40 struct SequencedTask : public TrackingInfo { | 45 struct SequencedTask : public TrackingInfo { |
| 41 SequencedTask() | 46 SequencedTask() |
| 42 : sequence_token_id(0), | 47 : sequence_token_id(0), |
| 43 trace_id(0), | 48 trace_id(0), |
| 44 sequence_task_number(0), | 49 sequence_task_number(0), |
| 45 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| 46 | 51 |
| 47 explicit SequencedTask(const tracked_objects::Location& from_here) | 52 explicit SequencedTask(const tracked_objects::Location& from_here) |
| 48 : base::TrackingInfo(from_here, TimeTicks()), | 53 : base::TrackingInfo(from_here, TimeTicks()), |
| 49 sequence_token_id(0), | 54 sequence_token_id(0), |
| (...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 260 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 265 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
| 261 const std::string& thread_name_prefix, | 266 const std::string& thread_name_prefix, |
| 262 TestingObserver* observer); | 267 TestingObserver* observer); |
| 263 | 268 |
| 264 ~Inner(); | 269 ~Inner(); |
| 265 | 270 |
| 266 SequenceToken GetSequenceToken(); | 271 SequenceToken GetSequenceToken(); |
| 267 | 272 |
| 268 SequenceToken GetNamedSequenceToken(const std::string& name); | 273 SequenceToken GetNamedSequenceToken(const std::string& name); |
| 269 | 274 |
| 275 bool GetCurrentThreadSequenceToken(SequenceToken* result_token) const; |
| 276 |
| 270 // This function accepts a name and an ID. If the name is null, the | 277 // This function accepts a name and an ID. If the name is null, the |
| 271 // token ID is used. This allows us to implement the optional name lookup | 278 // token ID is used. This allows us to implement the optional name lookup |
| 272 // from a single function without having to enter the lock a separate time. | 279 // from a single function without having to enter the lock a separate time. |
| 273 bool PostTask(const std::string* optional_token_name, | 280 bool PostTask(const std::string* optional_token_name, |
| 274 SequenceToken sequence_token, | 281 SequenceToken sequence_token, |
| 275 WorkerShutdown shutdown_behavior, | 282 WorkerShutdown shutdown_behavior, |
| 276 const tracked_objects::Location& from_here, | 283 const tracked_objects::Location& from_here, |
| 277 const Closure& task, | 284 const Closure& task, |
| 278 TimeDelta delay); | 285 TimeDelta delay); |
| 279 | 286 |
| (...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 472 prefix + StringPrintf("Worker%d", thread_number).c_str()), | 479 prefix + StringPrintf("Worker%d", thread_number).c_str()), |
| 473 worker_pool_(worker_pool), | 480 worker_pool_(worker_pool), |
| 474 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { | 481 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { |
| 475 Start(); | 482 Start(); |
| 476 } | 483 } |
| 477 | 484 |
| 478 SequencedWorkerPool::Worker::~Worker() { | 485 SequencedWorkerPool::Worker::~Worker() { |
| 479 } | 486 } |
| 480 | 487 |
| 481 void SequencedWorkerPool::Worker::Run() { | 488 void SequencedWorkerPool::Worker::Run() { |
| 489 // Store the SequencedWorkerPool associated with this worker thread. |
| 490 lazy_tls_ptr.Get().Set(worker_pool_.get()); |
| 491 |
| 482 // Just jump back to the Inner object to run the thread, since it has all the | 492 // Just jump back to the Inner object to run the thread, since it has all the |
| 483 // tracking information and queues. It might be more natural to implement | 493 // tracking information and queues. It might be more natural to implement |
| 484 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 494 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
| 485 // having these worker objects at all, but that method lacks the ability to | 495 // having these worker objects at all, but that method lacks the ability to |
| 486 // send thread-specific information easily to the thread loop. | 496 // send thread-specific information easily to the thread loop. |
| 487 worker_pool_->inner_->ThreadLoop(this); | 497 worker_pool_->inner_->ThreadLoop(this); |
| 488 // Release our cyclic reference once we're done. | 498 // Release our cyclic reference once we're done. |
| 489 worker_pool_ = NULL; | 499 worker_pool_ = NULL; |
| 490 } | 500 } |
| 491 | 501 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 536 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | 546 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
| 537 return SequenceToken(static_cast<int>(result)); | 547 return SequenceToken(static_cast<int>(result)); |
| 538 } | 548 } |
| 539 | 549 |
| 540 SequencedWorkerPool::SequenceToken | 550 SequencedWorkerPool::SequenceToken |
| 541 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { | 551 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
| 542 AutoLock lock(lock_); | 552 AutoLock lock(lock_); |
| 543 return SequenceToken(LockedGetNamedTokenID(name)); | 553 return SequenceToken(LockedGetNamedTokenID(name)); |
| 544 } | 554 } |
| 545 | 555 |
| 556 bool SequencedWorkerPool::Inner::GetCurrentThreadSequenceToken( |
| 557 SequenceToken* result_token) const { |
| 558 DCHECK(result_token); |
| 559 AutoLock lock(lock_); |
| 560 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 561 if (found == threads_.end()) |
| 562 return false; |
| 563 *result_token = found->second->running_sequence(); |
| 564 return true; |
| 565 } |
| 566 |
| 546 bool SequencedWorkerPool::Inner::PostTask( | 567 bool SequencedWorkerPool::Inner::PostTask( |
| 547 const std::string* optional_token_name, | 568 const std::string* optional_token_name, |
| 548 SequenceToken sequence_token, | 569 SequenceToken sequence_token, |
| 549 WorkerShutdown shutdown_behavior, | 570 WorkerShutdown shutdown_behavior, |
| 550 const tracked_objects::Location& from_here, | 571 const tracked_objects::Location& from_here, |
| 551 const Closure& task, | 572 const Closure& task, |
| 552 TimeDelta delay) { | 573 TimeDelta delay) { |
| 553 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); | 574 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
| 554 SequencedTask sequenced(from_here); | 575 SequencedTask sequenced(from_here); |
| 555 sequenced.sequence_token_id = sequence_token.id_; | 576 sequenced.sequence_token_id = sequence_token.id_; |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 604 return true; | 625 return true; |
| 605 } | 626 } |
| 606 | 627 |
| 607 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 628 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| 608 AutoLock lock(lock_); | 629 AutoLock lock(lock_); |
| 609 return ContainsKey(threads_, PlatformThread::CurrentId()); | 630 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 610 } | 631 } |
| 611 | 632 |
| 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 633 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 613 SequenceToken sequence_token) const { | 634 SequenceToken sequence_token) const { |
| 614 AutoLock lock(lock_); | 635 SequenceToken current_thread_token; |
| 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 636 if (!GetCurrentThreadSequenceToken(¤t_thread_token)) |
| 616 if (found == threads_.end()) | |
| 617 return false; | 637 return false; |
| 618 return found->second->running_sequence().Equals(sequence_token); | 638 return current_thread_token.Equals(sequence_token); |
| 619 } | 639 } |
| 620 | 640 |
| 621 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 641 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 622 void SequencedWorkerPool::Inner::CleanupForTesting() { | 642 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 623 DCHECK(!RunsTasksOnCurrentThread()); | 643 DCHECK(!RunsTasksOnCurrentThread()); |
| 624 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 644 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 625 AutoLock lock(lock_); | 645 AutoLock lock(lock_); |
| 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 646 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 627 if (shutdown_called_) | 647 if (shutdown_called_) |
| 628 return; | 648 return; |
| (...skipping 454 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1083 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1103 bool SequencedWorkerPool::Inner::CanShutdown() const { |
| 1084 lock_.AssertAcquired(); | 1104 lock_.AssertAcquired(); |
| 1085 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1105 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| 1086 return !thread_being_created_ && | 1106 return !thread_being_created_ && |
| 1087 blocking_shutdown_thread_count_ == 0 && | 1107 blocking_shutdown_thread_count_ == 0 && |
| 1088 blocking_shutdown_pending_task_count_ == 0; | 1108 blocking_shutdown_pending_task_count_ == 0; |
| 1089 } | 1109 } |
| 1090 | 1110 |
| 1091 // SequencedWorkerPool -------------------------------------------------------- | 1111 // SequencedWorkerPool -------------------------------------------------------- |
| 1092 | 1112 |
| 1113 // static |
| 1114 SequencedWorkerPool* SequencedWorkerPool::Get() { |
| 1115 return lazy_tls_ptr.Get().Get(); |
| 1116 } |
| 1117 |
| 1093 SequencedWorkerPool::SequencedWorkerPool( | 1118 SequencedWorkerPool::SequencedWorkerPool( |
| 1094 size_t max_threads, | 1119 size_t max_threads, |
| 1095 const std::string& thread_name_prefix) | 1120 const std::string& thread_name_prefix) |
| 1096 : constructor_message_loop_(MessageLoopProxy::current()), | 1121 : constructor_message_loop_(MessageLoopProxy::current()), |
| 1097 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { | 1122 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { |
| 1098 } | 1123 } |
| 1099 | 1124 |
| 1100 SequencedWorkerPool::SequencedWorkerPool( | 1125 SequencedWorkerPool::SequencedWorkerPool( |
| 1101 size_t max_threads, | 1126 size_t max_threads, |
| 1102 const std::string& thread_name_prefix, | 1127 const std::string& thread_name_prefix, |
| (...skipping 17 matching lines...) Expand all Loading... |
| 1120 | 1145 |
| 1121 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 1146 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| 1122 return inner_->GetSequenceToken(); | 1147 return inner_->GetSequenceToken(); |
| 1123 } | 1148 } |
| 1124 | 1149 |
| 1125 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 1150 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| 1126 const std::string& name) { | 1151 const std::string& name) { |
| 1127 return inner_->GetNamedSequenceToken(name); | 1152 return inner_->GetNamedSequenceToken(name); |
| 1128 } | 1153 } |
| 1129 | 1154 |
| 1155 bool SequencedWorkerPool::GetCurrentThreadSequenceToken( |
| 1156 SequenceToken* result_token) const { |
| 1157 return inner_->GetCurrentThreadSequenceToken(result_token); |
| 1158 } |
| 1159 |
| 1130 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( | 1160 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
| 1131 SequenceToken token) { | 1161 SequenceToken token) { |
| 1132 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); | 1162 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); |
| 1133 } | 1163 } |
| 1134 | 1164 |
| 1135 scoped_refptr<SequencedTaskRunner> | 1165 scoped_refptr<SequencedTaskRunner> |
| 1136 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( | 1166 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( |
| 1137 SequenceToken token, WorkerShutdown shutdown_behavior) { | 1167 SequenceToken token, WorkerShutdown shutdown_behavior) { |
| 1138 return new SequencedWorkerPoolSequencedTaskRunner( | 1168 return new SequencedWorkerPoolSequencedTaskRunner( |
| 1139 this, token, shutdown_behavior); | 1169 this, token, shutdown_behavior); |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1230 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1260 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1231 inner_->SignalHasWorkForTesting(); | 1261 inner_->SignalHasWorkForTesting(); |
| 1232 } | 1262 } |
| 1233 | 1263 |
| 1234 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1264 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1235 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1265 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
| 1236 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1266 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1237 } | 1267 } |
| 1238 | 1268 |
| 1239 } // namespace base | 1269 } // namespace base |
| OLD | NEW |