Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(424)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2445763002: Disallow posting tasks to SequencedWorkerPools by default. (Closed)
Patch Set: self-review Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <unordered_map> 13 #include <unordered_map>
14 #include <utility> 14 #include <utility>
15 #include <vector> 15 #include <vector>
16 16
17 #include "base/atomic_sequence_num.h" 17 #include "base/atomic_sequence_num.h"
18 #include "base/atomicops.h"
19 #include "base/callback.h" 18 #include "base/callback.h"
20 #include "base/compiler_specific.h" 19 #include "base/compiler_specific.h"
21 #include "base/critical_closure.h" 20 #include "base/critical_closure.h"
21 #include "base/debug/dump_without_crashing.h"
22 #include "base/lazy_instance.h" 22 #include "base/lazy_instance.h"
23 #include "base/logging.h" 23 #include "base/logging.h"
24 #include "base/macros.h" 24 #include "base/macros.h"
25 #include "base/memory/ptr_util.h" 25 #include "base/memory/ptr_util.h"
26 #include "base/stl_util.h" 26 #include "base/stl_util.h"
27 #include "base/strings/stringprintf.h" 27 #include "base/strings/stringprintf.h"
28 #include "base/synchronization/condition_variable.h" 28 #include "base/synchronization/condition_variable.h"
29 #include "base/synchronization/lock.h" 29 #include "base/synchronization/lock.h"
30 #include "base/task_scheduler/post_task.h" 30 #include "base/task_scheduler/post_task.h"
31 #include "base/task_scheduler/task_scheduler.h" 31 #include "base/task_scheduler/task_scheduler.h"
(...skipping 15 matching lines...) Expand all
47 #endif 47 #endif
48 48
49 #if !defined(OS_NACL) 49 #if !defined(OS_NACL)
50 #include "base/metrics/histogram_macros.h" 50 #include "base/metrics/histogram_macros.h"
51 #endif 51 #endif
52 52
53 namespace base { 53 namespace base {
54 54
55 namespace { 55 namespace {
56 56
57 // An enum representing the state of all pools. Any given non-test process 57 // An enum representing the state of all pools. A non-test process should only
58 // should only ever transition from NONE_ACTIVE to one of the active states. 58 // ever transition from POST_TASK_DISABLED to one of the active states. A test
59 // Transitions between actives states are unexpected. The 59 // process may transition from one of the active states to POST_TASK_DISABLED
60 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when 60 // when DisableForProcessForTesting() is called.
61 // RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition
62 // occurs when a Worker needs to be created because the first task was posted
63 // and the state is still NONE_ACTIVE. In a test process, a transition to
64 // NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is
65 // called.
66 // 61 //
67 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race 62 // External memory synchronization is required to call a method that reads
68 // between reads/writes, strict memory ordering isn't required per no other 63 // |g_all_pools_state| after calling a method that modifies it.
69 // state being inferred from its value. Explicit synchronization (e.g. locks or
70 // events) would be overkill (it's fine for other threads to still see
71 // NONE_ACTIVE after the first Worker was created -- this is not possible for
72 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no
73 // other threads are active).
74 // 64 //
75 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool 65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
76 // will be phased out completely otherwise). 66 // will be phased out completely otherwise).
77 enum AllPoolsState : subtle::Atomic32 { 67 enum class AllPoolsState {
78 NONE_ACTIVE, 68 POST_TASK_DISABLED,
79 WORKER_CREATED, 69 USE_WORKER_POOL,
80 REDIRECTED_TO_TASK_SCHEDULER, 70 REDIRECTED_TO_TASK_SCHEDULER,
81 }; 71 };
82 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; 72 AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
83 73
84 struct SequencedTask : public TrackingInfo { 74 struct SequencedTask : public TrackingInfo {
85 SequencedTask() 75 SequencedTask()
86 : sequence_token_id(0), 76 : sequence_token_id(0),
87 trace_id(0), 77 trace_id(0),
88 sequence_task_number(0), 78 sequence_task_number(0),
89 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 79 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
90 80
91 explicit SequencedTask(const tracked_objects::Location& from_here) 81 explicit SequencedTask(const tracked_objects::Location& from_here)
92 : base::TrackingInfo(from_here, TimeTicks()), 82 : base::TrackingInfo(from_here, TimeTicks()),
(...skipping 482 matching lines...) Expand 10 before | Expand all | Expand 10 after
575 // Worker definitions --------------------------------------------------------- 565 // Worker definitions ---------------------------------------------------------
576 566
577 SequencedWorkerPool::Worker::Worker( 567 SequencedWorkerPool::Worker::Worker(
578 scoped_refptr<SequencedWorkerPool> worker_pool, 568 scoped_refptr<SequencedWorkerPool> worker_pool,
579 int thread_number, 569 int thread_number,
580 const std::string& prefix) 570 const std::string& prefix)
581 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 571 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
582 worker_pool_(std::move(worker_pool)), 572 worker_pool_(std::move(worker_pool)),
583 task_shutdown_behavior_(BLOCK_SHUTDOWN), 573 task_shutdown_behavior_(BLOCK_SHUTDOWN),
584 is_processing_task_(false) { 574 is_processing_task_(false) {
585 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 575 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
586 subtle::NoBarrier_Load(&g_all_pools_state));
587 Start(); 576 Start();
588 } 577 }
589 578
590 SequencedWorkerPool::Worker::~Worker() { 579 SequencedWorkerPool::Worker::~Worker() {
591 } 580 }
592 581
593 void SequencedWorkerPool::Worker::Run() { 582 void SequencedWorkerPool::Worker::Run() {
594 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 583 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
595 subtle::NoBarrier_Load(&g_all_pools_state));
596 584
597 #if defined(OS_WIN) 585 #if defined(OS_WIN)
598 win::ScopedCOMInitializer com_initializer; 586 win::ScopedCOMInitializer com_initializer;
599 #endif 587 #endif
600 588
601 // Store a pointer to this worker in thread local storage for static function 589 // Store a pointer to this worker in thread local storage for static function
602 // access. 590 // access.
603 DCHECK(!lazy_tls_ptr_.Get().Get()); 591 DCHECK(!lazy_tls_ptr_.Get().Get());
604 lazy_tls_ptr_.Get().Set(this); 592 lazy_tls_ptr_.Get().Set(this);
605 593
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
684 return SequenceToken(LockedGetNamedTokenID(name)); 672 return SequenceToken(LockedGetNamedTokenID(name));
685 } 673 }
686 674
687 bool SequencedWorkerPool::Inner::PostTask( 675 bool SequencedWorkerPool::Inner::PostTask(
688 const std::string* optional_token_name, 676 const std::string* optional_token_name,
689 SequenceToken sequence_token, 677 SequenceToken sequence_token,
690 WorkerShutdown shutdown_behavior, 678 WorkerShutdown shutdown_behavior,
691 const tracked_objects::Location& from_here, 679 const tracked_objects::Location& from_here,
692 const Closure& task, 680 const Closure& task,
693 TimeDelta delay) { 681 TimeDelta delay) {
682 DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
683 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
684 debug::DumpWithoutCrashing();
685
694 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 686 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
695 SequencedTask sequenced(from_here); 687 SequencedTask sequenced(from_here);
696 sequenced.sequence_token_id = sequence_token.id_; 688 sequenced.sequence_token_id = sequence_token.id_;
697 sequenced.shutdown_behavior = shutdown_behavior; 689 sequenced.shutdown_behavior = shutdown_behavior;
698 sequenced.posted_from = from_here; 690 sequenced.posted_from = from_here;
699 sequenced.task = 691 sequenced.task =
700 shutdown_behavior == BLOCK_SHUTDOWN ? 692 shutdown_behavior == BLOCK_SHUTDOWN ?
701 base::MakeCriticalClosure(task) : task; 693 base::MakeCriticalClosure(task) : task;
702 sequenced.time_to_run = TimeTicks::Now() + delay; 694 sequenced.time_to_run = TimeTicks::Now() + delay;
703 695
(...skipping 29 matching lines...) Expand all
733 "SequencedWorkerPool::Inner::PostTask", 725 "SequencedWorkerPool::Inner::PostTask",
734 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 726 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
735 TRACE_EVENT_FLAG_FLOW_OUT); 727 TRACE_EVENT_FLAG_FLOW_OUT);
736 728
737 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 729 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
738 730
739 // Now that we have the lock, apply the named token rules. 731 // Now that we have the lock, apply the named token rules.
740 if (optional_token_name) 732 if (optional_token_name)
741 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 733 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
742 734
743 if (subtle::NoBarrier_Load(&g_all_pools_state) == 735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
744 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
745 if (!PostTaskToTaskScheduler(sequenced, delay)) 736 if (!PostTaskToTaskScheduler(sequenced, delay))
746 return false; 737 return false;
747 } else { 738 } else {
748 pending_tasks_.insert(sequenced); 739 pending_tasks_.insert(sequenced);
749 740
750 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 741 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
751 blocking_shutdown_pending_task_count_++; 742 blocking_shutdown_pending_task_count_++;
752 743
753 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 744 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
754 } 745 }
755 } 746 }
756 747
757 if (subtle::NoBarrier_Load(&g_all_pools_state) != 748 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
758 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 749 // correct behavior if a task is posted to a SequencedWorkerPool before
750 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
751 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
759 // Actually start the additional thread or signal an existing one outside 752 // Actually start the additional thread or signal an existing one outside
760 // the lock. 753 // the lock.
761 if (create_thread_id) 754 if (create_thread_id)
762 FinishStartingAdditionalThread(create_thread_id); 755 FinishStartingAdditionalThread(create_thread_id);
763 else 756 else
764 SignalHasWork(); 757 SignalHasWork();
765 } 758 }
766 759
767 #if DCHECK_IS_ON() 760 #if DCHECK_IS_ON()
768 { 761 {
769 AutoLock lock_for_dcheck(lock_); 762 AutoLock lock_for_dcheck(lock_);
770 // Some variables are exposed in both modes for convenience but only really 763 // Some variables are exposed in both modes for convenience but only really
771 // intended for one of them at runtime, confirm exclusive usage here. 764 // intended for one of them at runtime, confirm exclusive usage here.
772 if (subtle::NoBarrier_Load(&g_all_pools_state) == 765 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
773 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
774 DCHECK(pending_tasks_.empty()); 766 DCHECK(pending_tasks_.empty());
775 DCHECK_EQ(0, create_thread_id); 767 DCHECK_EQ(0, create_thread_id);
776 } else { 768 } else {
777 DCHECK(sequenced_task_runner_map_.empty()); 769 DCHECK(sequenced_task_runner_map_.empty());
778 } 770 }
779 } 771 }
780 #endif // DCHECK_IS_ON() 772 #endif // DCHECK_IS_ON()
781 773
782 return true; 774 return true;
783 } 775 }
784 776
785 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 777 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
786 const SequencedTask& sequenced, 778 const SequencedTask& sequenced,
787 const TimeDelta& delay) { 779 const TimeDelta& delay) {
788 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 780 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
789 subtle::NoBarrier_Load(&g_all_pools_state));
790 781
791 lock_.AssertAcquired(); 782 lock_.AssertAcquired();
792 783
793 // Confirm that the TaskScheduler's shutdown behaviors use the same 784 // Confirm that the TaskScheduler's shutdown behaviors use the same
794 // underlying values as SequencedWorkerPool. 785 // underlying values as SequencedWorkerPool.
795 static_assert( 786 static_assert(
796 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 787 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
797 static_cast<int>(CONTINUE_ON_SHUTDOWN), 788 static_cast<int>(CONTINUE_ON_SHUTDOWN),
798 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 789 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
799 "CONTINUE_ON_SHUTDOWN."); 790 "CONTINUE_ON_SHUTDOWN.");
(...skipping 13 matching lines...) Expand all
813 .WithPriority(task_priority_) 804 .WithPriority(task_priority_)
814 .WithShutdownBehavior(task_shutdown_behavior); 805 .WithShutdownBehavior(task_shutdown_behavior);
815 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) 806 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits)
816 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); 807 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay);
817 } 808 }
818 809
819 scoped_refptr<TaskRunner> 810 scoped_refptr<TaskRunner>
820 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( 811 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
821 int sequence_token_id, 812 int sequence_token_id,
822 const TaskTraits& traits) { 813 const TaskTraits& traits) {
823 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 814 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
824 subtle::NoBarrier_Load(&g_all_pools_state));
825 815
826 lock_.AssertAcquired(); 816 lock_.AssertAcquired();
827 817
828 static_assert( 818 static_assert(
829 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, 819 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0,
830 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " 820 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be "
831 "used as an index in |unsequenced_task_runners_|."); 821 "used as an index in |unsequenced_task_runners_|.");
832 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, 822 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1,
833 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " 823 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to "
834 "be used as an index in |unsequenced_task_runners_|."); 824 "be used as an index in |unsequenced_task_runners_|.");
(...skipping 16 matching lines...) Expand all
851 ExecutionMode execution_mode = 841 ExecutionMode execution_mode =
852 sequence_token_id ? ExecutionMode::SEQUENCED : ExecutionMode::PARALLEL; 842 sequence_token_id ? ExecutionMode::SEQUENCED : ExecutionMode::PARALLEL;
853 task_runner = CreateTaskRunnerWithTraits(traits, execution_mode); 843 task_runner = CreateTaskRunnerWithTraits(traits, execution_mode);
854 } 844 }
855 845
856 return task_runner; 846 return task_runner;
857 } 847 }
858 848
859 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 849 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
860 AutoLock lock(lock_); 850 AutoLock lock(lock_);
861 if (subtle::NoBarrier_Load(&g_all_pools_state) == 851 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
862 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
863 if (!runs_tasks_on_verifier_) { 852 if (!runs_tasks_on_verifier_) {
864 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( 853 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
865 TaskTraits().WithFileIO().WithPriority(task_priority_), 854 TaskTraits().WithFileIO().WithPriority(task_priority_),
866 ExecutionMode::PARALLEL); 855 ExecutionMode::PARALLEL);
867 } 856 }
868 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); 857 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
869 } else { 858 } else {
870 return ContainsKey(threads_, PlatformThread::CurrentId()); 859 return ContainsKey(threads_, PlatformThread::CurrentId());
871 } 860 }
872 } 861 }
873 862
874 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 863 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
875 SequenceToken sequence_token) const { 864 SequenceToken sequence_token) const {
876 DCHECK(sequence_token.IsValid()); 865 DCHECK(sequence_token.IsValid());
877 866
878 AutoLock lock(lock_); 867 AutoLock lock(lock_);
879 868
880 if (subtle::NoBarrier_Load(&g_all_pools_state) == 869 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
881 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
882 const auto sequenced_task_runner_it = 870 const auto sequenced_task_runner_it =
883 sequenced_task_runner_map_.find(sequence_token.id_); 871 sequenced_task_runner_map_.find(sequence_token.id_);
884 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && 872 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
885 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); 873 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
886 } else { 874 } else {
887 ThreadMap::const_iterator found = 875 ThreadMap::const_iterator found =
888 threads_.find(PlatformThread::CurrentId()); 876 threads_.find(PlatformThread::CurrentId());
889 return found != threads_.end() && found->second->is_processing_task() && 877 return found != threads_.end() && found->second->is_processing_task() &&
890 sequence_token.Equals(found->second->task_sequence_token()); 878 sequence_token.Equals(found->second->task_sequence_token());
891 } 879 }
892 } 880 }
893 881
894 // See https://code.google.com/p/chromium/issues/detail?id=168415 882 // See https://code.google.com/p/chromium/issues/detail?id=168415
895 void SequencedWorkerPool::Inner::CleanupForTesting() { 883 void SequencedWorkerPool::Inner::CleanupForTesting() {
896 DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state), 884 DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
897 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
898 AutoLock lock(lock_); 885 AutoLock lock(lock_);
899 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 886 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
900 if (shutdown_called_) 887 if (shutdown_called_)
901 return; 888 return;
902 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 889 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
903 return; 890 return;
904 cleanup_state_ = CLEANUP_REQUESTED; 891 cleanup_state_ = CLEANUP_REQUESTED;
905 cleanup_idlers_ = 0; 892 cleanup_idlers_ = 0;
906 has_work_cv_.Signal(); 893 has_work_cv_.Signal();
907 while (cleanup_state_ != CLEANUP_DONE) 894 while (cleanup_state_ != CLEANUP_DONE)
(...skipping 10 matching lines...) Expand all
918 { 905 {
919 AutoLock lock(lock_); 906 AutoLock lock(lock_);
920 // Cleanup and Shutdown should not be called concurrently. 907 // Cleanup and Shutdown should not be called concurrently.
921 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 908 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
922 if (shutdown_called_) 909 if (shutdown_called_)
923 return; 910 return;
924 shutdown_called_ = true; 911 shutdown_called_ = true;
925 912
926 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 913 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
927 914
928 if (subtle::NoBarrier_Load(&g_all_pools_state) != 915 if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL)
929 AllPoolsState::WORKER_CREATED) {
930 return; 916 return;
931 }
932 917
933 // Tickle the threads. This will wake up a waiting one so it will know that 918 // Tickle the threads. This will wake up a waiting one so it will know that
934 // it can exit, which in turn will wake up any other waiting ones. 919 // it can exit, which in turn will wake up any other waiting ones.
935 SignalHasWork(); 920 SignalHasWork();
936 921
937 // There are no pending or running tasks blocking shutdown, we're done. 922 // There are no pending or running tasks blocking shutdown, we're done.
938 if (CanShutdown()) 923 if (CanShutdown())
939 return; 924 return;
940 } 925 }
941 926
(...skipping 18 matching lines...) Expand all
960 TimeTicks::Now() - shutdown_wait_begin); 945 TimeTicks::Now() - shutdown_wait_begin);
961 #endif 946 #endif
962 } 947 }
963 948
964 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 949 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
965 AutoLock lock(lock_); 950 AutoLock lock(lock_);
966 return shutdown_called_; 951 return shutdown_called_;
967 } 952 }
968 953
969 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 954 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
970 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 955 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
971 subtle::NoBarrier_Load(&g_all_pools_state));
972 { 956 {
973 AutoLock lock(lock_); 957 AutoLock lock(lock_);
974 DCHECK(thread_being_created_); 958 DCHECK(thread_being_created_);
975 thread_being_created_ = false; 959 thread_being_created_ = false;
976 auto result = threads_.insert( 960 auto result = threads_.insert(
977 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 961 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
978 DCHECK(result.second); 962 DCHECK(result.second);
979 963
980 while (true) { 964 while (true) {
981 #if defined(OS_MACOSX) 965 #if defined(OS_MACOSX)
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
1093 1077
1094 // We noticed we should exit. Wake up the next worker so it knows it should 1078 // We noticed we should exit. Wake up the next worker so it knows it should
1095 // exit as well (because the Shutdown() code only signals once). 1079 // exit as well (because the Shutdown() code only signals once).
1096 SignalHasWork(); 1080 SignalHasWork();
1097 1081
1098 // Possibly unblock shutdown. 1082 // Possibly unblock shutdown.
1099 can_shutdown_cv_.Signal(); 1083 can_shutdown_cv_.Signal();
1100 } 1084 }
1101 1085
1102 void SequencedWorkerPool::Inner::HandleCleanup() { 1086 void SequencedWorkerPool::Inner::HandleCleanup() {
1103 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1087 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1104 subtle::NoBarrier_Load(&g_all_pools_state));
1105 1088
1106 lock_.AssertAcquired(); 1089 lock_.AssertAcquired();
1107 if (cleanup_state_ == CLEANUP_DONE) 1090 if (cleanup_state_ == CLEANUP_DONE)
1108 return; 1091 return;
1109 if (cleanup_state_ == CLEANUP_REQUESTED) { 1092 if (cleanup_state_ == CLEANUP_REQUESTED) {
1110 // We win, we get to do the cleanup as soon as the others wise up and idle. 1093 // We win, we get to do the cleanup as soon as the others wise up and idle.
1111 cleanup_state_ = CLEANUP_STARTING; 1094 cleanup_state_ = CLEANUP_STARTING;
1112 while (thread_being_created_ || 1095 while (thread_being_created_ ||
1113 cleanup_idlers_ != threads_.size() - 1) { 1096 cleanup_idlers_ != threads_.size() - 1) {
1114 has_work_cv_.Signal(); 1097 has_work_cv_.Signal();
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
1161 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1144 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
1162 lock_.AssertAcquired(); 1145 lock_.AssertAcquired();
1163 // We assume that we never create enough tasks to wrap around. 1146 // We assume that we never create enough tasks to wrap around.
1164 return next_sequence_task_number_++; 1147 return next_sequence_task_number_++;
1165 } 1148 }
1166 1149
1167 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1150 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
1168 SequencedTask* task, 1151 SequencedTask* task,
1169 TimeDelta* wait_time, 1152 TimeDelta* wait_time,
1170 std::vector<Closure>* delete_these_outside_lock) { 1153 std::vector<Closure>* delete_these_outside_lock) {
1171 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1154 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1172 subtle::NoBarrier_Load(&g_all_pools_state));
1173 1155
1174 lock_.AssertAcquired(); 1156 lock_.AssertAcquired();
1175 1157
1176 // Find the next task with a sequence token that's not currently in use. 1158 // Find the next task with a sequence token that's not currently in use.
1177 // If the token is in use, that means another thread is running something 1159 // If the token is in use, that means another thread is running something
1178 // in that sequence, and we can't run it without going out-of-order. 1160 // in that sequence, and we can't run it without going out-of-order.
1179 // 1161 //
1180 // This algorithm is simple and fair, but inefficient in some cases. For 1162 // This algorithm is simple and fair, but inefficient in some cases. For
1181 // example, say somebody schedules 1000 slow tasks with the same sequence 1163 // example, say somebody schedules 1000 slow tasks with the same sequence
1182 // number. We'll have to go through all those tasks each time we feel like 1164 // number. We'll have to go through all those tasks each time we feel like
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
1250 } 1232 }
1251 1233
1252 status = GET_WORK_FOUND; 1234 status = GET_WORK_FOUND;
1253 break; 1235 break;
1254 } 1236 }
1255 1237
1256 return status; 1238 return status;
1257 } 1239 }
1258 1240
1259 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1241 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1260 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1242 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1261 subtle::NoBarrier_Load(&g_all_pools_state));
1262 1243
1263 lock_.AssertAcquired(); 1244 lock_.AssertAcquired();
1264 1245
1265 // Mark the task's sequence number as in use. 1246 // Mark the task's sequence number as in use.
1266 if (task.sequence_token_id) 1247 if (task.sequence_token_id)
1267 current_sequences_.insert(task.sequence_token_id); 1248 current_sequences_.insert(task.sequence_token_id);
1268 1249
1269 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1250 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1270 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1251 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1271 // completes. 1252 // completes.
(...skipping 12 matching lines...) Expand all
1284 // if there is one waiting to pick up the next task. 1265 // if there is one waiting to pick up the next task.
1285 // 1266 //
1286 // Note that we really need to do this *before* running the task, not 1267 // Note that we really need to do this *before* running the task, not
1287 // after. Otherwise, if more than one task is posted, the creation of the 1268 // after. Otherwise, if more than one task is posted, the creation of the
1288 // second thread (since we only create one at a time) will be blocked by 1269 // second thread (since we only create one at a time) will be blocked by
1289 // the execution of the first task, which could be arbitrarily long. 1270 // the execution of the first task, which could be arbitrarily long.
1290 return PrepareToStartAdditionalThreadIfHelpful(); 1271 return PrepareToStartAdditionalThreadIfHelpful();
1291 } 1272 }
1292 1273
1293 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1274 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1294 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1275 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1295 subtle::NoBarrier_Load(&g_all_pools_state));
1296 1276
1297 lock_.AssertAcquired(); 1277 lock_.AssertAcquired();
1298 1278
1299 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1279 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1300 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1280 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1301 blocking_shutdown_thread_count_--; 1281 blocking_shutdown_thread_count_--;
1302 } 1282 }
1303 1283
1304 if (task.sequence_token_id) 1284 if (task.sequence_token_id)
1305 current_sequences_.erase(task.sequence_token_id); 1285 current_sequences_.erase(task.sequence_token_id);
1306 } 1286 }
1307 1287
1308 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1288 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1309 int sequence_token_id) const { 1289 int sequence_token_id) const {
1310 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 1290 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1311 subtle::NoBarrier_Load(&g_all_pools_state));
1312 1291
1313 lock_.AssertAcquired(); 1292 lock_.AssertAcquired();
1314 return !sequence_token_id || 1293 return !sequence_token_id ||
1315 current_sequences_.find(sequence_token_id) == 1294 current_sequences_.find(sequence_token_id) ==
1316 current_sequences_.end(); 1295 current_sequences_.end();
1317 } 1296 }
1318 1297
1319 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1298 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1320 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 1299 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1321 subtle::NoBarrier_Load(&g_all_pools_state));
1322 1300
1323 lock_.AssertAcquired(); 1301 lock_.AssertAcquired();
1324 // How thread creation works: 1302 // How thread creation works:
1325 // 1303 //
1326 // We'd like to avoid creating threads with the lock held. However, we 1304 // We'd like to avoid creating threads with the lock held. However, we
1327 // need to be sure that we have an accurate accounting of the threads for 1305 // need to be sure that we have an accurate accounting of the threads for
1328 // proper Joining and deletion on shutdown. 1306 // proper Joining and deletion on shutdown.
1329 // 1307 //
1330 // We need to figure out if we need another thread with the lock held, which 1308 // We need to figure out if we need another thread with the lock held, which
1331 // is what this function does. It then marks us as in the process of creating 1309 // is what this function does. It then marks us as in the process of creating
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1363 thread_being_created_ = true; 1341 thread_being_created_ = true;
1364 return static_cast<int>(threads_.size() + 1); 1342 return static_cast<int>(threads_.size() + 1);
1365 } 1343 }
1366 } 1344 }
1367 } 1345 }
1368 return 0; 1346 return 0;
1369 } 1347 }
1370 1348
1371 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1349 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1372 int thread_number) { 1350 int thread_number) {
1373 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 1351 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1374 subtle::NoBarrier_Load(&g_all_pools_state));
1375 1352
1376 // Called outside of the lock. 1353 // Called outside of the lock.
1377 DCHECK_GT(thread_number, 0); 1354 DCHECK_GT(thread_number, 0);
1378 1355
1379 if (subtle::NoBarrier_Load(&g_all_pools_state) !=
1380 AllPoolsState::WORKER_CREATED) {
1381 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1382 subtle::NoBarrier_Load(&g_all_pools_state));
1383 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED);
1384 }
1385
1386 // The worker is assigned to the list when the thread actually starts, which 1356 // The worker is assigned to the list when the thread actually starts, which
1387 // will manage the memory of the pointer. 1357 // will manage the memory of the pointer.
1388 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1358 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1389 } 1359 }
1390 1360
1391 void SequencedWorkerPool::Inner::SignalHasWork() { 1361 void SequencedWorkerPool::Inner::SignalHasWork() {
1392 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, 1362 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1393 subtle::NoBarrier_Load(&g_all_pools_state));
1394 1363
1395 has_work_cv_.Signal(); 1364 has_work_cv_.Signal();
1396 if (testing_observer_) { 1365 if (testing_observer_) {
1397 testing_observer_->OnHasWork(); 1366 testing_observer_->OnHasWork();
1398 } 1367 }
1399 } 1368 }
1400 1369
1401 bool SequencedWorkerPool::Inner::CanShutdown() const { 1370 bool SequencedWorkerPool::Inner::CanShutdown() const {
1402 DCHECK_EQ(AllPoolsState::WORKER_CREATED, 1371 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
1403 subtle::NoBarrier_Load(&g_all_pools_state));
1404 lock_.AssertAcquired(); 1372 lock_.AssertAcquired();
1405 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1373 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1406 return !thread_being_created_ && 1374 return !thread_being_created_ &&
1407 blocking_shutdown_thread_count_ == 0 && 1375 blocking_shutdown_thread_count_ == 0 &&
1408 blocking_shutdown_pending_task_count_ == 0; 1376 blocking_shutdown_pending_task_count_ == 0;
1409 } 1377 }
1410 1378
1411 base::StaticAtomicSequenceNumber 1379 base::StaticAtomicSequenceNumber
1412 SequencedWorkerPool::Inner::g_last_sequence_number_; 1380 SequencedWorkerPool::Inner::g_last_sequence_number_;
1413 1381
(...skipping 17 matching lines...) Expand all
1431 scoped_refptr<SequencedWorkerPool> 1399 scoped_refptr<SequencedWorkerPool>
1432 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1400 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1433 Worker* worker = Worker::GetForCurrentThread(); 1401 Worker* worker = Worker::GetForCurrentThread();
1434 if (!worker) 1402 if (!worker)
1435 return nullptr; 1403 return nullptr;
1436 1404
1437 return worker->worker_pool(); 1405 return worker->worker_pool();
1438 } 1406 }
1439 1407
1440 // static 1408 // static
1441 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { 1409 void SequencedWorkerPool::EnableForProcess() {
gab 2016/11/01 16:09:47 DCHECK_EQ(POST_TASK_DISABLED, ...)?
fdoray 2016/11/01 20:40:28 Done.
1442 DCHECK(TaskScheduler::GetInstance()); 1410 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
1443 // Hitting this DCHECK indicates that a task was posted to a 1411 g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
1444 // SequencedWorkerPool before the TaskScheduler was initialized and
1445 // redirected, posting task to SequencedWorkerPools needs to at least be
1446 // delayed until after that point.
1447 DCHECK_EQ(AllPoolsState::NONE_ACTIVE,
1448 subtle::NoBarrier_Load(&g_all_pools_state));
1449 subtle::NoBarrier_Store(&g_all_pools_state,
1450 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
1451 } 1412 }
1452 1413
1453 // static 1414 // static
1454 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { 1415 void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() {
1455 // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER 1416 DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
gab 2016/11/01 16:09:47 Also DCHECK(TaskScheduler::GetInstance()); as pe
fdoray 2016/11/01 20:40:28 Done.
1456 // to stop redirecting tasks. It can also be called when the current state is 1417 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
1457 // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called 1418 }
1458 // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has 1419
1459 // been created if this isn't called). 1420 // static
1460 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE); 1421 void SequencedWorkerPool::DisableForProcessForTesting() {
1422 g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
1461 } 1423 }
1462 1424
1463 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1425 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1464 const std::string& thread_name_prefix, 1426 const std::string& thread_name_prefix,
1465 base::TaskPriority task_priority) 1427 base::TaskPriority task_priority)
1466 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1428 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1467 inner_(new Inner(this, 1429 inner_(new Inner(this,
1468 max_threads, 1430 max_threads,
1469 thread_name_prefix, 1431 thread_name_prefix,
1470 task_priority, 1432 task_priority,
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after
1589 return PostDelayedWorkerTask(from_here, task, delay); 1551 return PostDelayedWorkerTask(from_here, task, delay);
1590 } 1552 }
1591 1553
1592 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1554 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1593 return inner_->RunsTasksOnCurrentThread(); 1555 return inner_->RunsTasksOnCurrentThread();
1594 } 1556 }
1595 1557
1596 void SequencedWorkerPool::FlushForTesting() { 1558 void SequencedWorkerPool::FlushForTesting() {
1597 DCHECK(!RunsTasksOnCurrentThread()); 1559 DCHECK(!RunsTasksOnCurrentThread());
1598 base::ThreadRestrictions::ScopedAllowWait allow_wait; 1560 base::ThreadRestrictions::ScopedAllowWait allow_wait;
1599 if (subtle::NoBarrier_Load(&g_all_pools_state) == 1561 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
1600 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
1601 // TODO(gab): Remove this if http://crbug.com/622400 fails. 1562 // TODO(gab): Remove this if http://crbug.com/622400 fails.
1602 TaskScheduler::GetInstance()->FlushForTesting(); 1563 TaskScheduler::GetInstance()->FlushForTesting();
1603 } else { 1564 } else {
1604 inner_->CleanupForTesting(); 1565 inner_->CleanupForTesting();
1605 } 1566 }
1606 } 1567 }
1607 1568
1608 void SequencedWorkerPool::SignalHasWorkForTesting() { 1569 void SequencedWorkerPool::SignalHasWorkForTesting() {
1609 inner_->SignalHasWorkForTesting(); 1570 inner_->SignalHasWorkForTesting();
1610 } 1571 }
1611 1572
1612 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1573 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1613 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1574 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1614 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1575 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1615 } 1576 }
1616 1577
1617 bool SequencedWorkerPool::IsShutdownInProgress() { 1578 bool SequencedWorkerPool::IsShutdownInProgress() {
1618 return inner_->IsShutdownInProgress(); 1579 return inner_->IsShutdownInProgress();
1619 } 1580 }
1620 1581
1621 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1582 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1622 SequenceToken sequence_token) const { 1583 SequenceToken sequence_token) const {
1623 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1584 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1624 } 1585 }
1625 1586
1626 } // namespace base 1587 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698