Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: trunk/src/base/threading/sequenced_worker_pool.cc

Issue 18861008: Revert 210434 "Revert 210433 "Revert 210423 "base: Make Sequence..." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomic_sequence_num.h" 13 #include "base/atomicops.h"
14 #include "base/callback.h" 14 #include "base/callback.h"
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h" 16 #include "base/critical_closure.h"
17 #include "base/debug/trace_event.h" 17 #include "base/debug/trace_event.h"
18 #include "base/lazy_instance.h"
19 #include "base/logging.h" 18 #include "base/logging.h"
20 #include "base/memory/linked_ptr.h" 19 #include "base/memory/linked_ptr.h"
21 #include "base/message_loop/message_loop_proxy.h" 20 #include "base/message_loop/message_loop_proxy.h"
21 #include "base/metrics/histogram.h"
22 #include "base/stl_util.h" 22 #include "base/stl_util.h"
23 #include "base/strings/stringprintf.h" 23 #include "base/strings/stringprintf.h"
24 #include "base/synchronization/condition_variable.h" 24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/lock.h" 25 #include "base/synchronization/lock.h"
26 #include "base/threading/platform_thread.h" 26 #include "base/threading/platform_thread.h"
27 #include "base/threading/simple_thread.h" 27 #include "base/threading/simple_thread.h"
28 #include "base/threading/thread_local.h"
29 #include "base/threading/thread_restrictions.h" 28 #include "base/threading/thread_restrictions.h"
30 #include "base/time/time.h" 29 #include "base/time/time.h"
31 #include "base/tracked_objects.h" 30 #include "base/tracked_objects.h"
32 31
33 #if defined(OS_MACOSX) 32 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h" 33 #include "base/mac/scoped_nsautorelease_pool.h"
35 #endif 34 #endif
36 35
37 #if !defined(OS_NACL)
38 #include "base/metrics/histogram.h"
39 #endif
40
41 namespace base { 36 namespace base {
42 37
43 namespace { 38 namespace {
44 39
45 struct SequencedTask : public TrackingInfo { 40 struct SequencedTask : public TrackingInfo {
46 SequencedTask() 41 SequencedTask()
47 : sequence_token_id(0), 42 : sequence_token_id(0),
48 trace_id(0), 43 trace_id(0),
49 sequence_task_number(0), 44 sequence_task_number(0),
50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 45 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 206
212 // Create a process-wide unique ID to represent this task in trace events. This 207 // Create a process-wide unique ID to represent this task in trace events. This
213 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 208 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
214 // with MessageLoop pointers on other processes. 209 // with MessageLoop pointers on other processes.
215 uint64 GetTaskTraceID(const SequencedTask& task, 210 uint64 GetTaskTraceID(const SequencedTask& task,
216 void* pool) { 211 void* pool) {
217 return (static_cast<uint64>(task.trace_id) << 32) | 212 return (static_cast<uint64>(task.trace_id) << 32) |
218 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 213 static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
219 } 214 }
220 215
221 base::LazyInstance<base::ThreadLocalPointer<
222 SequencedWorkerPool::SequenceToken> > g_lazy_tls_ptr =
223 LAZY_INSTANCE_INITIALIZER;
224
225 } // namespace 216 } // namespace
226 217
227 // Worker --------------------------------------------------------------------- 218 // Worker ---------------------------------------------------------------------
228 219
229 class SequencedWorkerPool::Worker : public SimpleThread { 220 class SequencedWorkerPool::Worker : public SimpleThread {
230 public: 221 public:
231 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 222 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
232 // around as long as we are running. 223 // around as long as we are running.
233 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 224 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
234 int thread_number, 225 int thread_number,
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after
384 void SignalHasWork(); 375 void SignalHasWork();
385 376
386 // Checks whether there is work left that's blocking shutdown. Must be 377 // Checks whether there is work left that's blocking shutdown. Must be
387 // called inside the lock. 378 // called inside the lock.
388 bool CanShutdown() const; 379 bool CanShutdown() const;
389 380
390 SequencedWorkerPool* const worker_pool_; 381 SequencedWorkerPool* const worker_pool_;
391 382
392 // The last sequence number used. Managed by GetSequenceToken, since this 383 // The last sequence number used. Managed by GetSequenceToken, since this
393 // only does threadsafe increment operations, you do not need to hold the 384 // only does threadsafe increment operations, you do not need to hold the
394 // lock. This is class-static to make SequenceTokens issued by 385 // lock.
395 // GetSequenceToken unique across SequencedWorkerPool instances. 386 volatile subtle::Atomic32 last_sequence_number_;
396 static base::StaticAtomicSequenceNumber g_last_sequence_number_;
397 387
398 // This lock protects |everything in this class|. Do not read or modify 388 // This lock protects |everything in this class|. Do not read or modify
399 // anything without holding this lock. Do not block while holding this 389 // anything without holding this lock. Do not block while holding this
400 // lock. 390 // lock.
401 mutable Lock lock_; 391 mutable Lock lock_;
402 392
403 // Condition variable that is waited on by worker threads until new 393 // Condition variable that is waited on by worker threads until new
404 // tasks are posted or shutdown starts. 394 // tasks are posted or shutdown starts.
405 ConditionVariable has_work_cv_; 395 ConditionVariable has_work_cv_;
406 396
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
482 prefix + StringPrintf("Worker%d", thread_number).c_str()), 472 prefix + StringPrintf("Worker%d", thread_number).c_str()),
483 worker_pool_(worker_pool), 473 worker_pool_(worker_pool),
484 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 474 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
485 Start(); 475 Start();
486 } 476 }
487 477
488 SequencedWorkerPool::Worker::~Worker() { 478 SequencedWorkerPool::Worker::~Worker() {
489 } 479 }
490 480
491 void SequencedWorkerPool::Worker::Run() { 481 void SequencedWorkerPool::Worker::Run() {
492 // Store a pointer to the running sequence in thread local storage for
493 // static function access.
494 g_lazy_tls_ptr.Get().Set(&running_sequence_);
495
496 // Just jump back to the Inner object to run the thread, since it has all the 482 // Just jump back to the Inner object to run the thread, since it has all the
497 // tracking information and queues. It might be more natural to implement 483 // tracking information and queues. It might be more natural to implement
498 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 484 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
499 // having these worker objects at all, but that method lacks the ability to 485 // having these worker objects at all, but that method lacks the ability to
500 // send thread-specific information easily to the thread loop. 486 // send thread-specific information easily to the thread loop.
501 worker_pool_->inner_->ThreadLoop(this); 487 worker_pool_->inner_->ThreadLoop(this);
502 // Release our cyclic reference once we're done. 488 // Release our cyclic reference once we're done.
503 worker_pool_ = NULL; 489 worker_pool_ = NULL;
504 } 490 }
505 491
506 // Inner definitions --------------------------------------------------------- 492 // Inner definitions ---------------------------------------------------------
507 493
508 SequencedWorkerPool::Inner::Inner( 494 SequencedWorkerPool::Inner::Inner(
509 SequencedWorkerPool* worker_pool, 495 SequencedWorkerPool* worker_pool,
510 size_t max_threads, 496 size_t max_threads,
511 const std::string& thread_name_prefix, 497 const std::string& thread_name_prefix,
512 TestingObserver* observer) 498 TestingObserver* observer)
513 : worker_pool_(worker_pool), 499 : worker_pool_(worker_pool),
500 last_sequence_number_(0),
514 lock_(), 501 lock_(),
515 has_work_cv_(&lock_), 502 has_work_cv_(&lock_),
516 can_shutdown_cv_(&lock_), 503 can_shutdown_cv_(&lock_),
517 max_threads_(max_threads), 504 max_threads_(max_threads),
518 thread_name_prefix_(thread_name_prefix), 505 thread_name_prefix_(thread_name_prefix),
519 thread_being_created_(false), 506 thread_being_created_(false),
520 waiting_thread_count_(0), 507 waiting_thread_count_(0),
521 blocking_shutdown_thread_count_(0), 508 blocking_shutdown_thread_count_(0),
522 next_sequence_task_number_(0), 509 next_sequence_task_number_(0),
523 blocking_shutdown_pending_task_count_(0), 510 blocking_shutdown_pending_task_count_(0),
(...skipping 14 matching lines...) Expand all
538 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
539 it->second->Join(); 526 it->second->Join();
540 threads_.clear(); 527 threads_.clear();
541 528
542 if (testing_observer_) 529 if (testing_observer_)
543 testing_observer_->OnDestruct(); 530 testing_observer_->OnDestruct();
544 } 531 }
545 532
546 SequencedWorkerPool::SequenceToken 533 SequencedWorkerPool::SequenceToken
547 SequencedWorkerPool::Inner::GetSequenceToken() { 534 SequencedWorkerPool::Inner::GetSequenceToken() {
548 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 535 subtle::Atomic32 result =
549 // is used as a sentinel value in SequenceTokens. 536 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
550 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 537 return SequenceToken(static_cast<int>(result));
551 } 538 }
552 539
553 SequencedWorkerPool::SequenceToken 540 SequencedWorkerPool::SequenceToken
554 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 541 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
555 AutoLock lock(lock_); 542 AutoLock lock(lock_);
556 return SequenceToken(LockedGetNamedTokenID(name)); 543 return SequenceToken(LockedGetNamedTokenID(name));
557 } 544 }
558 545
559 bool SequencedWorkerPool::Inner::PostTask( 546 bool SequencedWorkerPool::Inner::PostTask(
560 const std::string* optional_token_name, 547 const std::string* optional_token_name,
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
621 AutoLock lock(lock_); 608 AutoLock lock(lock_);
622 return ContainsKey(threads_, PlatformThread::CurrentId()); 609 return ContainsKey(threads_, PlatformThread::CurrentId());
623 } 610 }
624 611
625 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
626 SequenceToken sequence_token) const { 613 SequenceToken sequence_token) const {
627 AutoLock lock(lock_); 614 AutoLock lock(lock_);
628 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
629 if (found == threads_.end()) 616 if (found == threads_.end())
630 return false; 617 return false;
631 return sequence_token.Equals(found->second->running_sequence()); 618 return found->second->running_sequence().Equals(sequence_token);
632 } 619 }
633 620
634 // See https://code.google.com/p/chromium/issues/detail?id=168415 621 // See https://code.google.com/p/chromium/issues/detail?id=168415
635 void SequencedWorkerPool::Inner::CleanupForTesting() { 622 void SequencedWorkerPool::Inner::CleanupForTesting() {
636 DCHECK(!RunsTasksOnCurrentThread()); 623 DCHECK(!RunsTasksOnCurrentThread());
637 base::ThreadRestrictions::ScopedAllowWait allow_wait; 624 base::ThreadRestrictions::ScopedAllowWait allow_wait;
638 AutoLock lock(lock_); 625 AutoLock lock(lock_);
639 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
640 if (shutdown_called_) 627 if (shutdown_called_)
641 return; 628 return;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
680 testing_observer_->WillWaitForShutdown(); 667 testing_observer_->WillWaitForShutdown();
681 668
682 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 669 TimeTicks shutdown_wait_begin = TimeTicks::Now();
683 670
684 { 671 {
685 base::ThreadRestrictions::ScopedAllowWait allow_wait; 672 base::ThreadRestrictions::ScopedAllowWait allow_wait;
686 AutoLock lock(lock_); 673 AutoLock lock(lock_);
687 while (!CanShutdown()) 674 while (!CanShutdown())
688 can_shutdown_cv_.Wait(); 675 can_shutdown_cv_.Wait();
689 } 676 }
690 #if !defined(OS_NACL)
691 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 677 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
692 TimeTicks::Now() - shutdown_wait_begin); 678 TimeTicks::Now() - shutdown_wait_begin);
693 #endif
694 } 679 }
695 680
696 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 681 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
697 { 682 {
698 AutoLock lock(lock_); 683 AutoLock lock(lock_);
699 DCHECK(thread_being_created_); 684 DCHECK(thread_being_created_);
700 thread_being_created_ = false; 685 thread_being_created_ = false;
701 std::pair<ThreadMap::iterator, bool> result = 686 std::pair<ThreadMap::iterator, bool> result =
702 threads_.insert( 687 threads_.insert(
703 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 688 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
(...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after
880 return CONTINUE_ON_SHUTDOWN; 865 return CONTINUE_ON_SHUTDOWN;
881 return found->second->running_shutdown_behavior(); 866 return found->second->running_shutdown_behavior();
882 } 867 }
883 868
884 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 869 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
885 SequencedTask* task, 870 SequencedTask* task,
886 TimeDelta* wait_time, 871 TimeDelta* wait_time,
887 std::vector<Closure>* delete_these_outside_lock) { 872 std::vector<Closure>* delete_these_outside_lock) {
888 lock_.AssertAcquired(); 873 lock_.AssertAcquired();
889 874
890 #if !defined(OS_NACL)
891 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 875 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
892 static_cast<int>(pending_tasks_.size())); 876 static_cast<int>(pending_tasks_.size()));
893 #endif
894 877
895 // Find the next task with a sequence token that's not currently in use. 878 // Find the next task with a sequence token that's not currently in use.
896 // If the token is in use, that means another thread is running something 879 // If the token is in use, that means another thread is running something
897 // in that sequence, and we can't run it without going out-of-order. 880 // in that sequence, and we can't run it without going out-of-order.
898 // 881 //
899 // This algorithm is simple and fair, but inefficient in some cases. For 882 // This algorithm is simple and fair, but inefficient in some cases. For
900 // example, say somebody schedules 1000 slow tasks with the same sequence 883 // example, say somebody schedules 1000 slow tasks with the same sequence
901 // number. We'll have to go through all those tasks each time we feel like 884 // number. We'll have to go through all those tasks each time we feel like
902 // there might be work to schedule. If this proves to be a problem, we 885 // there might be work to schedule. If this proves to be a problem, we
903 // should make this more efficient. 886 // should make this more efficient.
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
968 blocking_shutdown_pending_task_count_--; 951 blocking_shutdown_pending_task_count_--;
969 } 952 }
970 953
971 status = GET_WORK_FOUND; 954 status = GET_WORK_FOUND;
972 break; 955 break;
973 } 956 }
974 957
975 // Track the number of tasks we had to skip over to see if we should be 958 // Track the number of tasks we had to skip over to see if we should be
976 // making this more efficient. If this number ever becomes large or is 959 // making this more efficient. If this number ever becomes large or is
977 // frequently "some", we should consider the optimization above. 960 // frequently "some", we should consider the optimization above.
978 #if !defined(OS_NACL)
979 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 961 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
980 unrunnable_tasks); 962 unrunnable_tasks);
981 #endif
982 return status; 963 return status;
983 } 964 }
984 965
985 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 966 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
986 lock_.AssertAcquired(); 967 lock_.AssertAcquired();
987 968
988 // Mark the task's sequence number as in use. 969 // Mark the task's sequence number as in use.
989 if (task.sequence_token_id) 970 if (task.sequence_token_id)
990 current_sequences_.insert(task.sequence_token_id); 971 current_sequences_.insert(task.sequence_token_id);
991 972
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
1100 } 1081 }
1101 1082
1102 bool SequencedWorkerPool::Inner::CanShutdown() const { 1083 bool SequencedWorkerPool::Inner::CanShutdown() const {
1103 lock_.AssertAcquired(); 1084 lock_.AssertAcquired();
1104 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1085 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1105 return !thread_being_created_ && 1086 return !thread_being_created_ &&
1106 blocking_shutdown_thread_count_ == 0 && 1087 blocking_shutdown_thread_count_ == 0 &&
1107 blocking_shutdown_pending_task_count_ == 0; 1088 blocking_shutdown_pending_task_count_ == 0;
1108 } 1089 }
1109 1090
1110 base::StaticAtomicSequenceNumber
1111 SequencedWorkerPool::Inner::g_last_sequence_number_;
1112
1113 // SequencedWorkerPool -------------------------------------------------------- 1091 // SequencedWorkerPool --------------------------------------------------------
1114 1092
1115 // static
1116 SequencedWorkerPool::SequenceToken
1117 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1118 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1119 if (!token)
1120 return SequenceToken();
1121 return *token;
1122 }
1123
1124 SequencedWorkerPool::SequencedWorkerPool( 1093 SequencedWorkerPool::SequencedWorkerPool(
1125 size_t max_threads, 1094 size_t max_threads,
1126 const std::string& thread_name_prefix) 1095 const std::string& thread_name_prefix)
1127 : constructor_message_loop_(MessageLoopProxy::current()), 1096 : constructor_message_loop_(MessageLoopProxy::current()),
1128 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1097 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1129 } 1098 }
1130 1099
1131 SequencedWorkerPool::SequencedWorkerPool( 1100 SequencedWorkerPool::SequencedWorkerPool(
1132 size_t max_threads, 1101 size_t max_threads,
1133 const std::string& thread_name_prefix, 1102 const std::string& thread_name_prefix,
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
1261 void SequencedWorkerPool::SignalHasWorkForTesting() { 1230 void SequencedWorkerPool::SignalHasWorkForTesting() {
1262 inner_->SignalHasWorkForTesting(); 1231 inner_->SignalHasWorkForTesting();
1263 } 1232 }
1264 1233
1265 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1234 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1266 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1235 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1267 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1236 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1268 } 1237 }
1269 1238
1270 } // namespace base 1239 } // namespace base
OLDNEW
« no previous file with comments | « trunk/src/base/threading/sequenced_worker_pool.h ('k') | trunk/src/base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698