Chromium Code Reviews

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2517443002: Revert of Disallow posting tasks to SequencedWorkerPools by default. (Closed)
Patch Set: Created 4 years, 1 month ago
OLD | NEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <unordered_map> 13 #include <unordered_map>
14 #include <utility> 14 #include <utility>
15 #include <vector> 15 #include <vector>
16 16
17 #include "base/atomic_sequence_num.h" 17 #include "base/atomic_sequence_num.h"
18 #include "base/atomicops.h"
18 #include "base/callback.h" 19 #include "base/callback.h"
19 #include "base/compiler_specific.h" 20 #include "base/compiler_specific.h"
20 #include "base/critical_closure.h" 21 #include "base/critical_closure.h"
21 #include "base/debug/dump_without_crashing.h"
22 #include "base/lazy_instance.h" 22 #include "base/lazy_instance.h"
23 #include "base/logging.h" 23 #include "base/logging.h"
24 #include "base/macros.h" 24 #include "base/macros.h"
25 #include "base/memory/ptr_util.h" 25 #include "base/memory/ptr_util.h"
26 #include "base/stl_util.h" 26 #include "base/stl_util.h"
27 #include "base/strings/stringprintf.h" 27 #include "base/strings/stringprintf.h"
28 #include "base/synchronization/condition_variable.h" 28 #include "base/synchronization/condition_variable.h"
29 #include "base/synchronization/lock.h" 29 #include "base/synchronization/lock.h"
30 #include "base/task_scheduler/post_task.h" 30 #include "base/task_scheduler/post_task.h"
31 #include "base/task_scheduler/task_scheduler.h" 31 #include "base/task_scheduler/task_scheduler.h"
(...skipping 15 matching lines...)
47 #endif 47 #endif
48 48
49 #if !defined(OS_NACL) 49 #if !defined(OS_NACL)
50 #include "base/metrics/histogram_macros.h" 50 #include "base/metrics/histogram_macros.h"
51 #endif 51 #endif
52 52
53 namespace base { 53 namespace base {
54 54
55 namespace { 55 namespace {
56 56
57 // An enum representing the state of all pools. A non-test process should only 57 // An enum representing the state of all pools. Any given non-test process
58 // ever transition from POST_TASK_DISABLED to one of the active states. A test 58 // should only ever transition from NONE_ACTIVE to one of the active states.
59 // process may transition from one of the active states to POST_TASK_DISABLED 59 // Transitions between active states are unexpected. The
60 // when DisableForProcessForTesting() is called. 60 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when
61 // RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
62 // occurs when a Worker needs to be created because the first task was posted
63 // and the state is still NONE_ACTIVE. In a test process, a transition to
64 // NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
65 // called.
61 // 66 //
62 // External memory synchronization is required to call a method that reads 67 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race
63 // |g_all_pools_state| after calling a method that modifies it. 68 // between reads/writes, strict memory ordering isn't required per no other
69 // state being inferred from its value. Explicit synchronization (e.g. locks or
70 // events) would be overkill (it's fine for other threads to still see
71 // NONE_ACTIVE after the first Worker was created -- this is not possible for
72 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
73 // other threads are active).
64 // 74 //
65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool 75 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
66 // will be phased out completely otherwise). 76 // will be phased out completely otherwise).
67 enum class AllPoolsState { 77 enum AllPoolsState : subtle::Atomic32 {
68 POST_TASK_DISABLED, 78 NONE_ACTIVE,
69 USE_WORKER_POOL, 79 WORKER_CREATED,
70 REDIRECTED_TO_TASK_SCHEDULER, 80 REDIRECTED_TO_TASK_SCHEDULER,
71 }; 81 };
72 AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; 82 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
73 83
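
The new right-hand side reads and writes |g_all_pools_state| with subtle::NoBarrier_Load / NoBarrier_Store, i.e. relaxed atomic ordering, which is sufficient because no other data is published through this flag (see the comment above the enum). A minimal standalone sketch of the same pattern in standard C++, for illustration only (made-up names, not Chromium's base/atomicops API):

#include <atomic>
#include <cstdint>

// Relaxed loads/stores give atomicity without any ordering guarantees,
// mirroring subtle::NoBarrier_Load/NoBarrier_Store on a subtle::Atomic32.
enum PoolsState : int32_t { kNoneActive, kWorkerCreated, kRedirectedToTaskScheduler };

std::atomic<int32_t> g_pools_state{kNoneActive};

bool IsRedirectedToTaskScheduler() {
  return g_pools_state.load(std::memory_order_relaxed) ==
         kRedirectedToTaskScheduler;
}

void MarkFirstWorkerCreated() {
  // No barrier needed: it is fine for other threads to briefly keep seeing
  // kNoneActive after the first worker exists.
  g_pools_state.store(kWorkerCreated, std::memory_order_relaxed);
}
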
74 struct SequencedTask : public TrackingInfo { 84 struct SequencedTask : public TrackingInfo {
75 SequencedTask() 85 SequencedTask()
76 : sequence_token_id(0), 86 : sequence_token_id(0),
77 trace_id(0), 87 trace_id(0),
78 sequence_task_number(0), 88 sequence_task_number(0),
79 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 89 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
80 90
81 explicit SequencedTask(const tracked_objects::Location& from_here) 91 explicit SequencedTask(const tracked_objects::Location& from_here)
82 : base::TrackingInfo(from_here, TimeTicks()), 92 : base::TrackingInfo(from_here, TimeTicks()),
(...skipping 482 matching lines...)
565 // Worker definitions --------------------------------------------------------- 575 // Worker definitions ---------------------------------------------------------
566 576
567 SequencedWorkerPool::Worker::Worker( 577 SequencedWorkerPool::Worker::Worker(
568 scoped_refptr<SequencedWorkerPool> worker_pool, 578 scoped_refptr<SequencedWorkerPool> worker_pool,
569 int thread_number, 579 int thread_number,
570 const std::string& prefix) 580 const std::string& prefix)
571 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 581 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
572 worker_pool_(std::move(worker_pool)), 582 worker_pool_(std::move(worker_pool)),
573 task_shutdown_behavior_(BLOCK_SHUTDOWN), 583 task_shutdown_behavior_(BLOCK_SHUTDOWN),
574 is_processing_task_(false) { 584 is_processing_task_(false) {
575 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 585 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
586 subtle::NoBarrier_Load(&g_all_pools_state));
576 Start(); 587 Start();
577 } 588 }
578 589
579 SequencedWorkerPool::Worker::~Worker() { 590 SequencedWorkerPool::Worker::~Worker() {
580 } 591 }
581 592
582 void SequencedWorkerPool::Worker::Run() { 593 void SequencedWorkerPool::Worker::Run() {
583 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 594 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
595 subtle::NoBarrier_Load(&g_all_pools_state));
584 596
585 #if defined(OS_WIN) 597 #if defined(OS_WIN)
586 win::ScopedCOMInitializer com_initializer; 598 win::ScopedCOMInitializer com_initializer;
587 #endif 599 #endif
588 600
589 // Store a pointer to this worker in thread local storage for static function 601 // Store a pointer to this worker in thread local storage for static function
590 // access. 602 // access.
591 DCHECK(!lazy_tls_ptr_.Get().Get()); 603 DCHECK(!lazy_tls_ptr_.Get().Get());
592 lazy_tls_ptr_.Get().Set(this); 604 lazy_tls_ptr_.Get().Set(this);
593 605
(...skipping 78 matching lines...)
672 return SequenceToken(LockedGetNamedTokenID(name)); 684 return SequenceToken(LockedGetNamedTokenID(name));
673 } 685 }
674 686
675 bool SequencedWorkerPool::Inner::PostTask( 687 bool SequencedWorkerPool::Inner::PostTask(
676 const std::string* optional_token_name, 688 const std::string* optional_token_name,
677 SequenceToken sequence_token, 689 SequenceToken sequence_token,
678 WorkerShutdown shutdown_behavior, 690 WorkerShutdown shutdown_behavior,
679 const tracked_objects::Location& from_here, 691 const tracked_objects::Location& from_here,
680 const Closure& task, 692 const Closure& task,
681 TimeDelta delay) { 693 TimeDelta delay) {
682 DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
683 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
684 debug::DumpWithoutCrashing();
685
686 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 694 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
687 SequencedTask sequenced(from_here); 695 SequencedTask sequenced(from_here);
688 sequenced.sequence_token_id = sequence_token.id_; 696 sequenced.sequence_token_id = sequence_token.id_;
689 sequenced.shutdown_behavior = shutdown_behavior; 697 sequenced.shutdown_behavior = shutdown_behavior;
690 sequenced.posted_from = from_here; 698 sequenced.posted_from = from_here;
691 sequenced.task = 699 sequenced.task =
692 shutdown_behavior == BLOCK_SHUTDOWN ? 700 shutdown_behavior == BLOCK_SHUTDOWN ?
693 base::MakeCriticalClosure(task) : task; 701 base::MakeCriticalClosure(task) : task;
694 sequenced.time_to_run = TimeTicks::Now() + delay; 702 sequenced.time_to_run = TimeTicks::Now() + delay;
695 703
(...skipping 29 matching lines...)
725 "SequencedWorkerPool::Inner::PostTask", 733 "SequencedWorkerPool::Inner::PostTask",
726 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 734 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
727 TRACE_EVENT_FLAG_FLOW_OUT); 735 TRACE_EVENT_FLAG_FLOW_OUT);
728 736
729 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 737 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
730 738
731 // Now that we have the lock, apply the named token rules. 739 // Now that we have the lock, apply the named token rules.
732 if (optional_token_name) 740 if (optional_token_name)
733 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 741 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
734 742
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 743 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
744 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
736 if (!PostTaskToTaskScheduler(sequenced, delay)) 745 if (!PostTaskToTaskScheduler(sequenced, delay))
737 return false; 746 return false;
738 } else { 747 } else {
739 pending_tasks_.insert(sequenced); 748 pending_tasks_.insert(sequenced);
740 749
741 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 750 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
742 blocking_shutdown_pending_task_count_++; 751 blocking_shutdown_pending_task_count_++;
743 752
744 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 753 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
745 } 754 }
746 } 755 }
747 756
748 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure 757 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
749 // correct behavior if a task is posted to a SequencedWorkerPool before 758 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
750 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
751 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
752 // Actually start the additional thread or signal an existing one outside 759 // Actually start the additional thread or signal an existing one outside
753 // the lock. 760 // the lock.
754 if (create_thread_id) 761 if (create_thread_id)
755 FinishStartingAdditionalThread(create_thread_id); 762 FinishStartingAdditionalThread(create_thread_id);
756 else 763 else
757 SignalHasWork(); 764 SignalHasWork();
758 } 765 }
759 766
760 #if DCHECK_IS_ON() 767 #if DCHECK_IS_ON()
761 { 768 {
762 AutoLock lock_for_dcheck(lock_); 769 AutoLock lock_for_dcheck(lock_);
763 // Some variables are exposed in both modes for convenience but only really 770 // Some variables are exposed in both modes for convenience but only really
764 // intended for one of them at runtime, confirm exclusive usage here. 771 // intended for one of them at runtime, confirm exclusive usage here.
765 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 772 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
773 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
766 DCHECK(pending_tasks_.empty()); 774 DCHECK(pending_tasks_.empty());
767 DCHECK_EQ(0, create_thread_id); 775 DCHECK_EQ(0, create_thread_id);
768 } else { 776 } else {
769 DCHECK(sequenced_task_runner_map_.empty()); 777 DCHECK(sequenced_task_runner_map_.empty());
770 } 778 }
771 } 779 }
772 #endif // DCHECK_IS_ON() 780 #endif // DCHECK_IS_ON()
773 781
774 return true; 782 return true;
775 } 783 }
776 784
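
Inner::PostTask() above makes all of its decisions under |lock_| (queue the task, bump the blocking-shutdown count, reserve an extra worker if helpful), but the expensive or signalling work -- FinishStartingAdditionalThread() or SignalHasWork() -- happens only after the lock is released; in redirected mode it instead hands the task to the TaskScheduler. A simplified, hypothetical sketch of the queue-and-start half of that shape in standard C++ (not the real implementation):

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical mini-pool: decide under the lock, create threads and signal
// condition variables only after releasing it.
class MiniPool {
 public:
  void Post(std::function<void()> task) {
    bool start_thread = false;
    {
      std::lock_guard<std::mutex> guard(lock_);
      pending_.push_back(std::move(task));
      // Stand-in for PrepareToStartAdditionalThreadIfHelpful().
      if (!worker_started_) {
        worker_started_ = true;
        start_thread = true;
      }
    }
    // Thread creation and wake-ups deliberately happen outside the lock.
    if (start_thread)
      worker_ = std::thread([this] { RunLoop(); });
    else
      has_work_.notify_one();
  }

 private:
  void RunLoop() {
    std::unique_lock<std::mutex> guard(lock_);
    for (;;) {
      has_work_.wait(guard, [this] { return !pending_.empty(); });
      std::function<void()> task = std::move(pending_.front());
      pending_.pop_front();
      guard.unlock();
      task();  // never run user code while holding the pool lock
      guard.lock();
    }
  }

  std::mutex lock_;
  std::condition_variable has_work_;
  std::deque<std::function<void()>> pending_;
  bool worker_started_ = false;
  std::thread worker_;  // no shutdown/join handling in this sketch
};
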
777 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 785 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
778 const SequencedTask& sequenced, 786 const SequencedTask& sequenced,
779 const TimeDelta& delay) { 787 const TimeDelta& delay) {
780 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 788 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
789 subtle::NoBarrier_Load(&g_all_pools_state));
781 790
782 lock_.AssertAcquired(); 791 lock_.AssertAcquired();
783 792
784 // Confirm that the TaskScheduler's shutdown behaviors use the same 793 // Confirm that the TaskScheduler's shutdown behaviors use the same
785 // underlying values as SequencedWorkerPool. 794 // underlying values as SequencedWorkerPool.
786 static_assert( 795 static_assert(
787 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 796 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
788 static_cast<int>(CONTINUE_ON_SHUTDOWN), 797 static_cast<int>(CONTINUE_ON_SHUTDOWN),
789 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 798 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
790 "CONTINUE_ON_SHUTDOWN."); 799 "CONTINUE_ON_SHUTDOWN.");
(...skipping 13 matching lines...)
804 .WithPriority(task_priority_) 813 .WithPriority(task_priority_)
805 .WithShutdownBehavior(task_shutdown_behavior); 814 .WithShutdownBehavior(task_shutdown_behavior);
806 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) 815 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits)
807 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); 816 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay);
808 } 817 }
809 818
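
PostTaskToTaskScheduler() converts the pool's WorkerShutdown value into a TaskShutdownBehavior with a static_cast, and the static_asserts above exist to make that conversion break at compile time if either enum is ever reordered. The same idiom, reduced to a standalone sketch with hypothetical local enums mirroring the names used here:

// Hypothetical local copies of the two enums, only to show the idiom.
enum WorkerShutdown { CONTINUE_ON_SHUTDOWN, SKIP_ON_SHUTDOWN, BLOCK_SHUTDOWN };
enum class TaskShutdownBehavior { CONTINUE_ON_SHUTDOWN, SKIP_ON_SHUTDOWN, BLOCK_SHUTDOWN };

static_assert(static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
                  static_cast<int>(CONTINUE_ON_SHUTDOWN),
              "enum mismatch for CONTINUE_ON_SHUTDOWN");
static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
                  static_cast<int>(SKIP_ON_SHUTDOWN),
              "enum mismatch for SKIP_ON_SHUTDOWN");
static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
                  static_cast<int>(BLOCK_SHUTDOWN),
              "enum mismatch for BLOCK_SHUTDOWN");

// Safe only because the asserts above pin the underlying values.
TaskShutdownBehavior ToTaskShutdownBehavior(WorkerShutdown behavior) {
  return static_cast<TaskShutdownBehavior>(behavior);
}
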
810 scoped_refptr<TaskRunner> 819 scoped_refptr<TaskRunner>
811 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( 820 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
812 int sequence_token_id, 821 int sequence_token_id,
813 const TaskTraits& traits) { 822 const TaskTraits& traits) {
814 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 823 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
824 subtle::NoBarrier_Load(&g_all_pools_state));
815 825
816 lock_.AssertAcquired(); 826 lock_.AssertAcquired();
817 827
818 static_assert( 828 static_assert(
819 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, 829 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0,
820 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " 830 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be "
821 "used as an index in |unsequenced_task_runners_|."); 831 "used as an index in |unsequenced_task_runners_|.");
822 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, 832 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1,
823 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " 833 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to "
824 "be used as an index in |unsequenced_task_runners_|."); 834 "be used as an index in |unsequenced_task_runners_|.");
(...skipping 16 matching lines...)
841 task_runner = sequence_token_id 851 task_runner = sequence_token_id
842 ? CreateSequencedTaskRunnerWithTraits(traits) 852 ? CreateSequencedTaskRunnerWithTraits(traits)
843 : CreateTaskRunnerWithTraits(traits); 853 : CreateTaskRunnerWithTraits(traits);
844 } 854 }
845 855
846 return task_runner; 856 return task_runner;
847 } 857 }
848 858
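
GetTaskSchedulerTaskRunner() creates a task runner the first time a given sequence token is seen and caches it under |lock_|, so later posts against the same token reuse the same runner (unsequenced tasks get one cached runner per shutdown behavior instead). The caching idiom, reduced to a generic sketch with hypothetical types -- the real code stores scoped_refptr<TaskRunner>, not shared_ptr:

#include <map>
#include <memory>
#include <mutex>

struct Runner {};  // stand-in for a TaskRunner

class RunnerCache {
 public:
  // Returns the runner for |sequence_token_id|, creating it on first use.
  std::shared_ptr<Runner> Get(int sequence_token_id) {
    std::lock_guard<std::mutex> guard(lock_);
    std::shared_ptr<Runner>& slot = runners_[sequence_token_id];
    if (!slot) {
      // Stand-in for CreateSequencedTaskRunnerWithTraits(traits).
      slot = std::make_shared<Runner>();
    }
    return slot;
  }

 private:
  std::mutex lock_;
  std::map<int, std::shared_ptr<Runner>> runners_;
};
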
849 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 859 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
850 AutoLock lock(lock_); 860 AutoLock lock(lock_);
851 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 861 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
862 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
852 if (!runs_tasks_on_verifier_) { 863 if (!runs_tasks_on_verifier_) {
853 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( 864 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
854 TaskTraits().WithFileIO().WithPriority(task_priority_)); 865 TaskTraits().WithFileIO().WithPriority(task_priority_));
855 } 866 }
856 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); 867 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
857 } else { 868 } else {
858 return ContainsKey(threads_, PlatformThread::CurrentId()); 869 return ContainsKey(threads_, PlatformThread::CurrentId());
859 } 870 }
860 } 871 }
861 872
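
In the non-redirected branch, RunsTasksOnCurrentThread() boils down to looking up the calling thread's id in |threads_| while holding the lock. A minimal equivalent with standard threads, purely for illustration:

#include <mutex>
#include <set>
#include <thread>

class ThreadRegistry {
 public:
  // Called by each worker thread once it has started.
  void RegisterCurrentThread() {
    std::lock_guard<std::mutex> guard(lock_);
    ids_.insert(std::this_thread::get_id());
  }

  // True iff the calling thread is one of the registered workers.
  bool RunsTasksOnCurrentThread() const {
    std::lock_guard<std::mutex> guard(lock_);
    return ids_.count(std::this_thread::get_id()) != 0;
  }

 private:
  mutable std::mutex lock_;
  std::set<std::thread::id> ids_;
};
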
862 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 873 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
863 SequenceToken sequence_token) const { 874 SequenceToken sequence_token) const {
864 DCHECK(sequence_token.IsValid()); 875 DCHECK(sequence_token.IsValid());
865 876
866 AutoLock lock(lock_); 877 AutoLock lock(lock_);
867 878
868 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 879 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
880 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
869 const auto sequenced_task_runner_it = 881 const auto sequenced_task_runner_it =
870 sequenced_task_runner_map_.find(sequence_token.id_); 882 sequenced_task_runner_map_.find(sequence_token.id_);
871 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && 883 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
872 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); 884 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
873 } else { 885 } else {
874 ThreadMap::const_iterator found = 886 ThreadMap::const_iterator found =
875 threads_.find(PlatformThread::CurrentId()); 887 threads_.find(PlatformThread::CurrentId());
876 return found != threads_.end() && found->second->is_processing_task() && 888 return found != threads_.end() && found->second->is_processing_task() &&
877 sequence_token.Equals(found->second->task_sequence_token()); 889 sequence_token.Equals(found->second->task_sequence_token());
878 } 890 }
879 } 891 }
880 892
881 // See https://code.google.com/p/chromium/issues/detail?id=168415 893 // See https://code.google.com/p/chromium/issues/detail?id=168415
882 void SequencedWorkerPool::Inner::CleanupForTesting() { 894 void SequencedWorkerPool::Inner::CleanupForTesting() {
883 DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); 895 DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state),
896 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
884 AutoLock lock(lock_); 897 AutoLock lock(lock_);
885 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 898 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
886 if (shutdown_called_) 899 if (shutdown_called_)
887 return; 900 return;
888 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 901 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
889 return; 902 return;
890 cleanup_state_ = CLEANUP_REQUESTED; 903 cleanup_state_ = CLEANUP_REQUESTED;
891 cleanup_idlers_ = 0; 904 cleanup_idlers_ = 0;
892 has_work_cv_.Signal(); 905 has_work_cv_.Signal();
893 while (cleanup_state_ != CLEANUP_DONE) 906 while (cleanup_state_ != CLEANUP_DONE)
(...skipping 10 matching lines...)
904 { 917 {
905 AutoLock lock(lock_); 918 AutoLock lock(lock_);
906 // Cleanup and Shutdown should not be called concurrently. 919 // Cleanup and Shutdown should not be called concurrently.
907 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 920 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
908 if (shutdown_called_) 921 if (shutdown_called_)
909 return; 922 return;
910 shutdown_called_ = true; 923 shutdown_called_ = true;
911 924
912 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 925 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
913 926
914 if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) 927 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
928 AllPoolsState::WORKER_CREATED) {
915 return; 929 return;
930 }
916 931
917 // Tickle the threads. This will wake up a waiting one so it will know that 932 // Tickle the threads. This will wake up a waiting one so it will know that
918 // it can exit, which in turn will wake up any other waiting ones. 933 // it can exit, which in turn will wake up any other waiting ones.
919 SignalHasWork(); 934 SignalHasWork();
920 935
921 // There are no pending or running tasks blocking shutdown, we're done. 936 // There are no pending or running tasks blocking shutdown, we're done.
922 if (CanShutdown()) 937 if (CanShutdown())
923 return; 938 return;
924 } 939 }
925 940
(...skipping 18 matching lines...)
944 TimeTicks::Now() - shutdown_wait_begin); 959 TimeTicks::Now() - shutdown_wait_begin);
945 #endif 960 #endif
946 } 961 }
947 962
948 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 963 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
949 AutoLock lock(lock_); 964 AutoLock lock(lock_);
950 return shutdown_called_; 965 return shutdown_called_;
951 } 966 }
952 967
953 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 968 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
954 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 969 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
970 subtle::NoBarrier_Load(&g_all_pools_state));
955 { 971 {
956 AutoLock lock(lock_); 972 AutoLock lock(lock_);
957 DCHECK(thread_being_created_); 973 DCHECK(thread_being_created_);
958 thread_being_created_ = false; 974 thread_being_created_ = false;
959 auto result = threads_.insert( 975 auto result = threads_.insert(
960 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 976 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
961 DCHECK(result.second); 977 DCHECK(result.second);
962 978
963 while (true) { 979 while (true) {
964 #if defined(OS_MACOSX) 980 #if defined(OS_MACOSX)
(...skipping 111 matching lines...)
1076 1092
1077 // We noticed we should exit. Wake up the next worker so it knows it should 1093 // We noticed we should exit. Wake up the next worker so it knows it should
1078 // exit as well (because the Shutdown() code only signals once). 1094 // exit as well (because the Shutdown() code only signals once).
1079 SignalHasWork(); 1095 SignalHasWork();
1080 1096
1081 // Possibly unblock shutdown. 1097 // Possibly unblock shutdown.
1082 can_shutdown_cv_.Signal(); 1098 can_shutdown_cv_.Signal();
1083 } 1099 }
1084 1100
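
One subtlety in the shutdown path visible above: Shutdown() signals |has_work_cv_| only once, and each worker that wakes up and decides to exit re-signals before returning ("Wake up the next worker so it knows it should exit as well"), so a single wake-up cascades through every idle worker. A reduced sketch of that daisy-chain pattern (hypothetical, standard C++):

#include <condition_variable>
#include <mutex>

struct PoolState {
  std::mutex lock;
  std::condition_variable has_work;
  int pending_tasks = 0;
  bool shutdown_called = false;
};

void WorkerLoop(PoolState& s) {
  std::unique_lock<std::mutex> guard(s.lock);
  for (;;) {
    s.has_work.wait(guard,
                    [&] { return s.shutdown_called || s.pending_tasks > 0; });
    if (s.shutdown_called && s.pending_tasks == 0) {
      // We noticed we should exit: wake the next idle worker so the single
      // signal sent by Shutdown() propagates to all of them.
      s.has_work.notify_one();
      return;
    }
    --s.pending_tasks;  // stand-in for taking and running one task
  }
}

void Shutdown(PoolState& s) {
  {
    std::lock_guard<std::mutex> guard(s.lock);
    s.shutdown_called = true;
  }
  s.has_work.notify_one();  // signalled once; exiting workers pass it along
}
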
1085 void SequencedWorkerPool::Inner::HandleCleanup() { 1101 void SequencedWorkerPool::Inner::HandleCleanup() {
1086 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1102 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1103 subtle::NoBarrier_Load(&g_all_pools_state));
1087 1104
1088 lock_.AssertAcquired(); 1105 lock_.AssertAcquired();
1089 if (cleanup_state_ == CLEANUP_DONE) 1106 if (cleanup_state_ == CLEANUP_DONE)
1090 return; 1107 return;
1091 if (cleanup_state_ == CLEANUP_REQUESTED) { 1108 if (cleanup_state_ == CLEANUP_REQUESTED) {
1092 // We win, we get to do the cleanup as soon as the others wise up and idle. 1109 // We win, we get to do the cleanup as soon as the others wise up and idle.
1093 cleanup_state_ = CLEANUP_STARTING; 1110 cleanup_state_ = CLEANUP_STARTING;
1094 while (thread_being_created_ || 1111 while (thread_being_created_ ||
1095 cleanup_idlers_ != threads_.size() - 1) { 1112 cleanup_idlers_ != threads_.size() - 1) {
1096 has_work_cv_.Signal(); 1113 has_work_cv_.Signal();
(...skipping 46 matching lines...)
1143 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1160 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
1144 lock_.AssertAcquired(); 1161 lock_.AssertAcquired();
1145 // We assume that we never create enough tasks to wrap around. 1162 // We assume that we never create enough tasks to wrap around.
1146 return next_sequence_task_number_++; 1163 return next_sequence_task_number_++;
1147 } 1164 }
1148 1165
1149 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1166 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
1150 SequencedTask* task, 1167 SequencedTask* task,
1151 TimeDelta* wait_time, 1168 TimeDelta* wait_time,
1152 std::vector<Closure>* delete_these_outside_lock) { 1169 std::vector<Closure>* delete_these_outside_lock) {
1153 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1170 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1171 subtle::NoBarrier_Load(&g_all_pools_state));
1154 1172
1155 lock_.AssertAcquired(); 1173 lock_.AssertAcquired();
1156 1174
1157 // Find the next task with a sequence token that's not currently in use. 1175 // Find the next task with a sequence token that's not currently in use.
1158 // If the token is in use, that means another thread is running something 1176 // If the token is in use, that means another thread is running something
1159 // in that sequence, and we can't run it without going out-of-order. 1177 // in that sequence, and we can't run it without going out-of-order.
1160 // 1178 //
1161 // This algorithm is simple and fair, but inefficient in some cases. For 1179 // This algorithm is simple and fair, but inefficient in some cases. For
1162 // example, say somebody schedules 1000 slow tasks with the same sequence 1180 // example, say somebody schedules 1000 slow tasks with the same sequence
1163 // number. We'll have to go through all those tasks each time we feel like 1181 // number. We'll have to go through all those tasks each time we feel like
(...skipping 67 matching lines...)
1231 } 1249 }
1232 1250
1233 status = GET_WORK_FOUND; 1251 status = GET_WORK_FOUND;
1234 break; 1252 break;
1235 } 1253 }
1236 1254
1237 return status; 1255 return status;
1238 } 1256 }
1239 1257
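
GetWork() implements the policy described in its comment: walk the pending queue in order and take the first task whose sequence token is not currently held by another worker -- simple and fair, but O(n) when many queued tasks share one busy token. Distilled into a standalone sketch with hypothetical types (the real pending queue is a sorted container with its own ordering, and it also handles delayed tasks and shutdown filtering):

#include <list>
#include <set>

struct PendingTask {
  int sequence_token_id = 0;  // 0 means "unsequenced"
  // ... closure, post time, shutdown behavior, etc. omitted
};

// Returns the first runnable task, i.e. the first whose sequence token is not
// in |current_sequences|; returns pending.end() if every sequence is busy.
std::list<PendingTask>::iterator FindRunnableTask(
    std::list<PendingTask>& pending,
    const std::set<int>& current_sequences) {
  for (auto it = pending.begin(); it != pending.end(); ++it) {
    if (it->sequence_token_id == 0 ||
        current_sequences.count(it->sequence_token_id) == 0) {
      return it;  // no other worker is running this sequence
    }
  }
  return pending.end();
}
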
1240 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1258 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1241 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1259 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1260 subtle::NoBarrier_Load(&g_all_pools_state));
1242 1261
1243 lock_.AssertAcquired(); 1262 lock_.AssertAcquired();
1244 1263
1245 // Mark the task's sequence number as in use. 1264 // Mark the task's sequence number as in use.
1246 if (task.sequence_token_id) 1265 if (task.sequence_token_id)
1247 current_sequences_.insert(task.sequence_token_id); 1266 current_sequences_.insert(task.sequence_token_id);
1248 1267
1249 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1268 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1250 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1269 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1251 // completes. 1270 // completes.
(...skipping 12 matching lines...)
1264 // if there is one waiting to pick up the next task. 1283 // if there is one waiting to pick up the next task.
1265 // 1284 //
1266 // Note that we really need to do this *before* running the task, not 1285 // Note that we really need to do this *before* running the task, not
1267 // after. Otherwise, if more than one task is posted, the creation of the 1286 // after. Otherwise, if more than one task is posted, the creation of the
1268 // second thread (since we only create one at a time) will be blocked by 1287 // second thread (since we only create one at a time) will be blocked by
1269 // the execution of the first task, which could be arbitrarily long. 1288 // the execution of the first task, which could be arbitrarily long.
1270 return PrepareToStartAdditionalThreadIfHelpful(); 1289 return PrepareToStartAdditionalThreadIfHelpful();
1271 } 1290 }
1272 1291
1273 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1292 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1274 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1293 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1294 subtle::NoBarrier_Load(&g_all_pools_state));
1275 1295
1276 lock_.AssertAcquired(); 1296 lock_.AssertAcquired();
1277 1297
1278 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1298 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1279 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1299 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1280 blocking_shutdown_thread_count_--; 1300 blocking_shutdown_thread_count_--;
1281 } 1301 }
1282 1302
1283 if (task.sequence_token_id) 1303 if (task.sequence_token_id)
1284 current_sequences_.erase(task.sequence_token_id); 1304 current_sequences_.erase(task.sequence_token_id);
1285 } 1305 }
1286 1306
1287 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1307 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1288 int sequence_token_id) const { 1308 int sequence_token_id) const {
1289 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1309 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1310 subtle::NoBarrier_Load(&g_all_pools_state));
1290 1311
1291 lock_.AssertAcquired(); 1312 lock_.AssertAcquired();
1292 return !sequence_token_id || 1313 return !sequence_token_id ||
1293 current_sequences_.find(sequence_token_id) == 1314 current_sequences_.find(sequence_token_id) ==
1294 current_sequences_.end(); 1315 current_sequences_.end();
1295 } 1316 }
1296 1317
1297 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1318 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1298 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1319 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1320 subtle::NoBarrier_Load(&g_all_pools_state));
1299 1321
1300 lock_.AssertAcquired(); 1322 lock_.AssertAcquired();
1301 // How thread creation works: 1323 // How thread creation works:
1302 // 1324 //
1303 // We'd like to avoid creating threads with the lock held. However, we 1325 // We'd like to avoid creating threads with the lock held. However, we
1304 // need to be sure that we have an accurate accounting of the threads for 1326 // need to be sure that we have an accurate accounting of the threads for
1305 // proper joining and deletion on shutdown. 1327 // proper joining and deletion on shutdown.
1306 // 1328 //
1307 // We need to figure out if we need another thread with the lock held, which 1329 // We need to figure out if we need another thread with the lock held, which
1308 // is what this function does. It then marks us as in the process of creating 1330 // is what this function does. It then marks us as in the process of creating
(...skipping 31 matching lines...)
1340 thread_being_created_ = true; 1362 thread_being_created_ = true;
1341 return static_cast<int>(threads_.size() + 1); 1363 return static_cast<int>(threads_.size() + 1);
1342 } 1364 }
1343 } 1365 }
1344 } 1366 }
1345 return 0; 1367 return 0;
1346 } 1368 }
1347 1369
1348 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1370 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1349 int thread_number) { 1371 int thread_number) {
1350 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1372 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1373 subtle::NoBarrier_Load(&g_all_pools_state));
1351 1374
1352 // Called outside of the lock. 1375 // Called outside of the lock.
1353 DCHECK_GT(thread_number, 0); 1376 DCHECK_GT(thread_number, 0);
1354 1377
1378 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
1379 AllPoolsState::WORKER_CREATED) {
1380 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1381 subtle::NoBarrier_Load(&g_all_pools_state));
1382 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
1383 }
1384
1355 // The worker is assigned to the list when the thread actually starts, which 1385 // The worker is assigned to the list when the thread actually starts, which
1356 // will manage the memory of the pointer. 1386 // will manage the memory of the pointer.
1357 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1387 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1358 } 1388 }
1359 1389
1360 void SequencedWorkerPool::Inner::SignalHasWork() { 1390 void SequencedWorkerPool::Inner::SignalHasWork() {
1361 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1391 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER,
1392 subtle::NoBarrier_Load(&g_all_pools_state));
1362 1393
1363 has_work_cv_.Signal(); 1394 has_work_cv_.Signal();
1364 if (testing_observer_) { 1395 if (testing_observer_) {
1365 testing_observer_->OnHasWork(); 1396 testing_observer_->OnHasWork();
1366 } 1397 }
1367 } 1398 }
1368 1399
1369 bool SequencedWorkerPool::Inner::CanShutdown() const { 1400 bool SequencedWorkerPool::Inner::CanShutdown() const {
1370 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1401 DCHECK_EQ(AllPoolsState::WORKER_CREATED,
1402 subtle::NoBarrier_Load(&g_all_pools_state));
1371 lock_.AssertAcquired(); 1403 lock_.AssertAcquired();
1372 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1404 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1373 return !thread_being_created_ && 1405 return !thread_being_created_ &&
1374 blocking_shutdown_thread_count_ == 0 && 1406 blocking_shutdown_thread_count_ == 0 &&
1375 blocking_shutdown_pending_task_count_ == 0; 1407 blocking_shutdown_pending_task_count_ == 0;
1376 } 1408 }
1377 1409
1378 base::StaticAtomicSequenceNumber 1410 base::StaticAtomicSequenceNumber
1379 SequencedWorkerPool::Inner::g_last_sequence_number_; 1411 SequencedWorkerPool::Inner::g_last_sequence_number_;
1380 1412
(...skipping 17 matching lines...)
1398 scoped_refptr<SequencedWorkerPool> 1430 scoped_refptr<SequencedWorkerPool>
1399 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1431 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1400 Worker* worker = Worker::GetForCurrentThread(); 1432 Worker* worker = Worker::GetForCurrentThread();
1401 if (!worker) 1433 if (!worker)
1402 return nullptr; 1434 return nullptr;
1403 1435
1404 return worker->worker_pool(); 1436 return worker->worker_pool();
1405 } 1437 }
1406 1438
1407 // static 1439 // static
1408 void SequencedWorkerPool::EnableForProcess() { 1440 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
1409 DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 1441 DCHECK(TaskScheduler::GetInstance());
1410 g_all_pools_state = AllPoolsState::USE_WORKER_POOL; 1442 // Hitting this DCHECK indicates that a task was posted to a
1443 // SequencedWorkerPool before the TaskScheduler was initialized and
1444 // redirected; posting tasks to SequencedWorkerPools needs to at least be
1445 // delayed until after that point.
1446 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1447 subtle::NoBarrier_Load(&g_all_pools_state));
1448 subtle::NoBarrier_Store(&g_all_pools_state,
1449 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
1411 } 1450 }
1412 1451
1413 // static 1452 // static
1414 void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() { 1453 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
1415 DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 1454 // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER
1416 DCHECK(TaskScheduler::GetInstance()); 1455 // to stop redirecting tasks. It can also be called when the current state is
1417 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; 1456 // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called
1418 } 1457 // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has
1419 1458 // been created if this isn't called).
1420 // static 1459 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE);
1421 void SequencedWorkerPool::DisableForProcessForTesting() {
1422 g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
1423 }
1424
1425 // static
1426 bool SequencedWorkerPool::IsEnabled() {
1427 return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED;
1428 } 1460 }
1429 1461
1430 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1462 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1431 const std::string& thread_name_prefix, 1463 const std::string& thread_name_prefix,
1432 base::TaskPriority task_priority) 1464 base::TaskPriority task_priority)
1433 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), 1465 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()),
1434 inner_(new Inner(this, 1466 inner_(new Inner(this,
1435 max_threads, 1467 max_threads,
1436 thread_name_prefix, 1468 thread_name_prefix,
1437 task_priority, 1469 task_priority,
(...skipping 118 matching lines...)
1556 return PostDelayedWorkerTask(from_here, task, delay); 1588 return PostDelayedWorkerTask(from_here, task, delay);
1557 } 1589 }
1558 1590
1559 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1591 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1560 return inner_->RunsTasksOnCurrentThread(); 1592 return inner_->RunsTasksOnCurrentThread();
1561 } 1593 }
1562 1594
1563 void SequencedWorkerPool::FlushForTesting() { 1595 void SequencedWorkerPool::FlushForTesting() {
1564 DCHECK(!RunsTasksOnCurrentThread()); 1596 DCHECK(!RunsTasksOnCurrentThread());
1565 base::ThreadRestrictions::ScopedAllowWait allow_wait; 1597 base::ThreadRestrictions::ScopedAllowWait allow_wait;
1566 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 1598 if (subtle::NoBarrier_Load(&g_all_pools_state) ==
1599 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
1567 // TODO(gab): Remove this if http://crbug.com/622400 fails. 1600 // TODO(gab): Remove this if http://crbug.com/622400 fails.
1568 TaskScheduler::GetInstance()->FlushForTesting(); 1601 TaskScheduler::GetInstance()->FlushForTesting();
1569 } else { 1602 } else {
1570 inner_->CleanupForTesting(); 1603 inner_->CleanupForTesting();
1571 } 1604 }
1572 } 1605 }
1573 1606
1574 void SequencedWorkerPool::SignalHasWorkForTesting() { 1607 void SequencedWorkerPool::SignalHasWorkForTesting() {
1575 inner_->SignalHasWorkForTesting(); 1608 inner_->SignalHasWorkForTesting();
1576 } 1609 }
1577 1610
1578 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1611 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1579 DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread()); 1612 DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread());
1580 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1613 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1581 } 1614 }
1582 1615
1583 bool SequencedWorkerPool::IsShutdownInProgress() { 1616 bool SequencedWorkerPool::IsShutdownInProgress() {
1584 return inner_->IsShutdownInProgress(); 1617 return inner_->IsShutdownInProgress();
1585 } 1618 }
1586 1619
1587 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1620 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1588 SequenceToken sequence_token) const { 1621 SequenceToken sequence_token) const {
1589 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1622 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1590 } 1623 }
1591 1624
1592 } // namespace base 1625 } // namespace base