| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| 11 #include <memory> | 11 #include <memory> |
| 12 #include <set> | 12 #include <set> |
| 13 #include <utility> | 13 #include <utility> |
| 14 #include <vector> | 14 #include <vector> |
| 15 | 15 |
| 16 #include "base/atomic_sequence_num.h" | 16 #include "base/atomic_sequence_num.h" |
| 17 #include "base/atomicops.h" |
| 17 #include "base/callback.h" | 18 #include "base/callback.h" |
| 18 #include "base/compiler_specific.h" | 19 #include "base/compiler_specific.h" |
| 19 #include "base/critical_closure.h" | 20 #include "base/critical_closure.h" |
| 20 #include "base/lazy_instance.h" | 21 #include "base/lazy_instance.h" |
| 21 #include "base/logging.h" | 22 #include "base/logging.h" |
| 22 #include "base/macros.h" | 23 #include "base/macros.h" |
| 23 #include "base/memory/ptr_util.h" | 24 #include "base/memory/ptr_util.h" |
| 24 #include "base/stl_util.h" | 25 #include "base/stl_util.h" |
| 25 #include "base/strings/stringprintf.h" | 26 #include "base/strings/stringprintf.h" |
| 26 #include "base/synchronization/condition_variable.h" | 27 #include "base/synchronization/condition_variable.h" |
| (...skipping 24 matching lines...) |
| 51 namespace base { | 52 namespace base { |
| 52 | 53 |
| 53 namespace { | 54 namespace { |
| 54 | 55 |
| 55 // An enum representing the state of all pools. Any given process should only | 56 // An enum representing the state of all pools. Any given process should only |
| 56 // ever transition from NONE_ACTIVE to the active states; transitions between | 57 // ever transition from NONE_ACTIVE to the active states; transitions between |
| 57 // active states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition | 58 // active states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition |
| 58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called | 59 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called |
| 59 // and the WORKER_CREATED transition occurs when a Worker needs to be created | 60 // and the WORKER_CREATED transition occurs when a Worker needs to be created |
| 60 // because the first task was posted and the state is still NONE_ACTIVE. | 61 // because the first task was posted and the state is still NONE_ACTIVE. |
| 62 // |g_all_pools_state| uses relaxed atomic operations to ensure there is no |
| 63 // data race between reads and writes; strict memory ordering isn't required |
| 64 // because no other state is inferred from its value. Explicit synchronization |
| 65 // (e.g. locks or events) would be overkill (it's fine for other threads to |
| 66 // still see NONE_ACTIVE after the first Worker was created -- this is not |
| 67 // possible for REDIRECTED_TO_TASK_SCHEDULER because its API requires being |
| 68 // invoked while no other threads are active). |
| 61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | 69 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| 62 // will be phased out completely otherwise). | 70 // will be phased out completely otherwise). |
| 63 enum class AllPoolsState { | 71 enum AllPoolsState : subtle::Atomic32 { |
| 64 NONE_ACTIVE, | 72 NONE_ACTIVE, |
| 65 WORKER_CREATED, | 73 WORKER_CREATED, |
| 66 REDIRECTED_TO_TASK_SCHEDULER, | 74 REDIRECTED_TO_TASK_SCHEDULER, |
| 67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; | 75 }; |
| 76 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
| 68 | 77 |
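The new declaration above replaces a plain enum global with a `subtle::Atomic32` read and written via `NoBarrier_Load`/`NoBarrier_Store`. Below is a minimal, self-contained sketch of the same idea using `std::atomic` with `std::memory_order_relaxed` as an analogue; the names (`PoolsState`, `g_state`, the two helper functions) are illustrative and not from the CL.

```cpp
#include <atomic>
#include <cassert>

// Illustrative analogue of |g_all_pools_state|: a process-wide flag whose
// value only moves forward, read with relaxed ordering because no other
// state is published through it.
enum PoolsState : int { kNoneActive, kWorkerCreated, kRedirected };

std::atomic<int> g_state{kNoneActive};

void RedirectToScheduler() {
  // The caller guarantees no other threads are active yet, so a relaxed
  // store (the analogue of subtle::NoBarrier_Store) is sufficient.
  assert(g_state.load(std::memory_order_relaxed) == kNoneActive);
  g_state.store(kRedirected, std::memory_order_relaxed);
}

bool IsRedirected() {
  // Relaxed load: callers only branch on the value itself; they never assume
  // that other memory writes became visible along with it.
  return g_state.load(std::memory_order_relaxed) == kRedirected;
}
```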
| 69 struct SequencedTask : public TrackingInfo { | 78 struct SequencedTask : public TrackingInfo { |
| 70 SequencedTask() | 79 SequencedTask() |
| 71 : sequence_token_id(0), | 80 : sequence_token_id(0), |
| 72 trace_id(0), | 81 trace_id(0), |
| 73 sequence_task_number(0), | 82 sequence_task_number(0), |
| 74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 83 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| 75 | 84 |
| 76 explicit SequencedTask(const tracked_objects::Location& from_here) | 85 explicit SequencedTask(const tracked_objects::Location& from_here) |
| 77 : base::TrackingInfo(from_here, TimeTicks()), | 86 : base::TrackingInfo(from_here, TimeTicks()), |
| (...skipping 467 matching lines...) |
| 545 // Worker definitions --------------------------------------------------------- | 554 // Worker definitions --------------------------------------------------------- |
| 546 | 555 |
| 547 SequencedWorkerPool::Worker::Worker( | 556 SequencedWorkerPool::Worker::Worker( |
| 548 scoped_refptr<SequencedWorkerPool> worker_pool, | 557 scoped_refptr<SequencedWorkerPool> worker_pool, |
| 549 int thread_number, | 558 int thread_number, |
| 550 const std::string& prefix) | 559 const std::string& prefix) |
| 551 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 560 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
| 552 worker_pool_(std::move(worker_pool)), | 561 worker_pool_(std::move(worker_pool)), |
| 553 task_shutdown_behavior_(BLOCK_SHUTDOWN), | 562 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| 554 is_processing_task_(false) { | 563 is_processing_task_(false) { |
| 555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 564 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 565 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 556 Start(); | 566 Start(); |
| 557 } | 567 } |
| 558 | 568 |
| 559 SequencedWorkerPool::Worker::~Worker() { | 569 SequencedWorkerPool::Worker::~Worker() { |
| 560 } | 570 } |
| 561 | 571 |
| 562 void SequencedWorkerPool::Worker::Run() { | 572 void SequencedWorkerPool::Worker::Run() { |
| 563 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 573 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 574 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 564 | 575 |
| 565 #if defined(OS_WIN) | 576 #if defined(OS_WIN) |
| 566 win::ScopedCOMInitializer com_initializer; | 577 win::ScopedCOMInitializer com_initializer; |
| 567 #endif | 578 #endif |
| 568 | 579 |
| 569 // Store a pointer to this worker in thread local storage for static function | 580 // Store a pointer to this worker in thread local storage for static function |
| 570 // access. | 581 // access. |
| 571 DCHECK(!lazy_tls_ptr_.Get().Get()); | 582 DCHECK(!lazy_tls_ptr_.Get().Get()); |
| 572 lazy_tls_ptr_.Get().Set(this); | 583 lazy_tls_ptr_.Get().Set(this); |
| 573 | 584 |
| (...skipping 125 matching lines...) |
| 699 "SequencedWorkerPool::Inner::PostTask", | 710 "SequencedWorkerPool::Inner::PostTask", |
| 700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 711 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
| 701 TRACE_EVENT_FLAG_FLOW_OUT); | 712 TRACE_EVENT_FLAG_FLOW_OUT); |
| 702 | 713 |
| 703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 714 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
| 704 | 715 |
| 705 // Now that we have the lock, apply the named token rules. | 716 // Now that we have the lock, apply the named token rules. |
| 706 if (optional_token_name) | 717 if (optional_token_name) |
| 707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 718 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| 708 | 719 |
| 709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 720 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 721 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 710 PostTaskToTaskScheduler(sequenced); | 722 PostTaskToTaskScheduler(sequenced); |
| 711 } else { | 723 } else { |
| 712 pending_tasks_.insert(sequenced); | 724 pending_tasks_.insert(sequenced); |
| 713 | 725 |
| 714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) | 726 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
| 715 blocking_shutdown_pending_task_count_++; | 727 blocking_shutdown_pending_task_count_++; |
| 716 | 728 |
| 717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 729 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| 718 } | 730 } |
| 719 } | 731 } |
| 720 | 732 |
| 721 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 733 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| 734 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 722 // Actually start the additional thread or signal an existing one outside | 735 // Actually start the additional thread or signal an existing one outside |
| 723 // the lock. | 736 // the lock. |
| 724 if (create_thread_id) | 737 if (create_thread_id) |
| 725 FinishStartingAdditionalThread(create_thread_id); | 738 FinishStartingAdditionalThread(create_thread_id); |
| 726 else | 739 else |
| 727 SignalHasWork(); | 740 SignalHasWork(); |
| 728 } | 741 } |
| 729 | 742 |
| 730 #if DCHECK_IS_ON() | 743 #if DCHECK_IS_ON() |
| 731 { | 744 { |
| 732 AutoLock lock_for_dcheck(lock_); | 745 AutoLock lock_for_dcheck(lock_); |
| 733 // Some variables are exposed in both modes for convenience but only really | 746 // Some variables are exposed in both modes for convenience but only really |
| 734 // intended for one of them at runtime; confirm exclusive usage here. | 747 // intended for one of them at runtime; confirm exclusive usage here. |
| 735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 748 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 749 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 736 DCHECK(pending_tasks_.empty()); | 750 DCHECK(pending_tasks_.empty()); |
| 737 DCHECK_EQ(0, create_thread_id); | 751 DCHECK_EQ(0, create_thread_id); |
| 738 } else { | 752 } else { |
| 739 DCHECK(sequenced_task_runner_map_.empty()); | 753 DCHECK(sequenced_task_runner_map_.empty()); |
| 740 } | 754 } |
| 741 } | 755 } |
| 742 #endif // DCHECK_IS_ON() | 756 #endif // DCHECK_IS_ON() |
| 743 | 757 |
| 744 return true; | 758 return true; |
| 745 } | 759 } |
| 746 | 760 |
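PostTask() above does its bookkeeping under |lock_| but defers the expensive work (forwarding to the TaskScheduler, starting a thread, or signaling an existing worker) until after the critical section. A condensed sketch of that shape, with hypothetical names and a plain closure queue standing in for the real task bookkeeping:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

class MiniPool {
 public:
  void PostTask(std::function<void()> task) {
    bool need_new_thread = false;
    {
      std::lock_guard<std::mutex> hold(lock_);
      pending_.push(std::move(task));
      // Decide *under the lock* whether another worker would help.
      need_new_thread = ShouldStartThreadLocked();
    }
    // Act *outside* the lock: starting a thread (or signaling an existing
    // one) never blocks other posters contending for |lock_|.
    if (need_new_thread)
      StartThread();
    else
      has_work_.notify_one();
  }

 private:
  bool ShouldStartThreadLocked() { return false; }  // placeholder policy
  void StartThread() {}                             // placeholder

  std::mutex lock_;
  std::condition_variable has_work_;
  std::queue<std::function<void()>> pending_;
};
```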
| 747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | 761 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| 748 const SequencedTask& sequenced) { | 762 const SequencedTask& sequenced) { |
| 749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 763 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 764 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 750 | 765 |
| 751 lock_.AssertAcquired(); | 766 lock_.AssertAcquired(); |
| 752 | 767 |
| 753 // Confirm that the TaskScheduler's shutdown behaviors use the same | 768 // Confirm that the TaskScheduler's shutdown behaviors use the same |
| 754 // underlying values as SequencedWorkerPool. | 769 // underlying values as SequencedWorkerPool. |
| 755 static_assert( | 770 static_assert( |
| 756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | 771 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
| 757 static_cast<int>(CONTINUE_ON_SHUTDOWN), | 772 static_cast<int>(CONTINUE_ON_SHUTDOWN), |
| 758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 773 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| 759 "CONTINUE_ON_SHUTDOWN."); | 774 "CONTINUE_ON_SHUTDOWN."); |
| (...skipping 41 matching lines...) |
| 801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() | 816 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() |
| 802 // directly. | 817 // directly. |
| 803 DCHECK_GT(max_threads_, 1U); | 818 DCHECK_GT(max_threads_, 1U); |
| 804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, | 819 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, |
| 805 sequenced.task); | 820 sequenced.task); |
| 806 } | 821 } |
| 807 } | 822 } |
| 808 | 823 |
| 809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 824 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| 810 AutoLock lock(lock_); | 825 AutoLock lock(lock_); |
| 811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 826 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 827 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 812 if (!runs_tasks_on_verifier_) { | 828 if (!runs_tasks_on_verifier_) { |
| 813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | 829 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| 814 TaskTraits().WithFileIO().WithPriority(task_priority_), | 830 TaskTraits().WithFileIO().WithPriority(task_priority_), |
| 815 ExecutionMode::PARALLEL); | 831 ExecutionMode::PARALLEL); |
| 816 } | 832 } |
| 817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | 833 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| 818 } else { | 834 } else { |
| 819 return ContainsKey(threads_, PlatformThread::CurrentId()); | 835 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 820 } | 836 } |
| 821 } | 837 } |
| 822 | 838 |
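When redirected, the pool cannot check thread identity directly (the TaskScheduler owns the threads), so RunsTasksOnCurrentThread() lazily creates a parallel TaskRunner with the pool's traits and forwards the question to it; otherwise it just checks whether the current thread id is one of its workers. A rough sketch of that dual path, with a hypothetical `VerifierRunner` standing in for the runner created by `CreateTaskRunnerWithTraits(...)`:

```cpp
#include <memory>
#include <mutex>
#include <set>
#include <thread>

// Hypothetical stand-in for the verifier TaskRunner; the real code creates
// one with CreateTaskRunnerWithTraits(...) using the pool's traits.
struct VerifierRunner {
  bool RunsTasksOnCurrentThread() const { return false; }  // placeholder
};

class MiniPool {
 public:
  bool RunsTasksOnCurrentThread() const {
    std::lock_guard<std::mutex> hold(lock_);
    if (redirected_) {
      // Redirected mode: lazily create the verifier the first time anyone
      // asks, then delegate the membership question to it.
      if (!verifier_)
        verifier_ = std::make_unique<VerifierRunner>();
      return verifier_->RunsTasksOnCurrentThread();
    }
    // Direct mode: the pool owns its threads, so a set lookup suffices.
    return worker_ids_.count(std::this_thread::get_id()) != 0;
  }

 private:
  mutable std::mutex lock_;
  bool redirected_ = false;
  mutable std::unique_ptr<VerifierRunner> verifier_;
  std::set<std::thread::id> worker_ids_;
};
```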
| 823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 839 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 824 SequenceToken sequence_token) const { | 840 SequenceToken sequence_token) const { |
| 825 AutoLock lock(lock_); | 841 AutoLock lock(lock_); |
| 826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 842 if (subtle::NoBarrier_Load(&g_all_pools_state) == |
| 843 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 827 // TODO(gab): This currently only verifies that the current thread is a | 844 // TODO(gab): This currently only verifies that the current thread is a |
| 828 // thread on which a task bound to |sequence_token| *could* run, but it | 845 // thread on which a task bound to |sequence_token| *could* run, but it |
| 829 // doesn't verify that the current thread is *currently running* a task | 846 // doesn't verify that the current thread is *currently running* a task |
| 830 // bound to |sequence_token|. | 847 // bound to |sequence_token|. |
| 831 const auto sequenced_task_runner_it = | 848 const auto sequenced_task_runner_it = |
| 832 sequenced_task_runner_map_.find(sequence_token.id_); | 849 sequenced_task_runner_map_.find(sequence_token.id_); |
| 833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | 850 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| 834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | 851 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
| 835 } else { | 852 } else { |
| 836 ThreadMap::const_iterator found = | 853 ThreadMap::const_iterator found = |
| (...skipping 30 matching lines...) |
| 867 int max_new_blocking_tasks_after_shutdown) { | 884 int max_new_blocking_tasks_after_shutdown) { |
| 868 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 885 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
| 869 { | 886 { |
| 870 AutoLock lock(lock_); | 887 AutoLock lock(lock_); |
| 871 // Cleanup and Shutdown should not be called concurrently. | 888 // Cleanup and Shutdown should not be called concurrently. |
| 872 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 889 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 873 if (shutdown_called_) | 890 if (shutdown_called_) |
| 874 return; | 891 return; |
| 875 shutdown_called_ = true; | 892 shutdown_called_ = true; |
| 876 | 893 |
| 877 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) | 894 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| 895 AllPoolsState::WORKER_CREATED) |
| 878 return; | 896 return; |
| 879 | 897 |
| 880 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 898 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| 881 | 899 |
| 882 // Tickle the threads. This will wake up a waiting one so it will know that | 900 // Tickle the threads. This will wake up a waiting one so it will know that |
| 883 // it can exit, which in turn will wake up any other waiting ones. | 901 // it can exit, which in turn will wake up any other waiting ones. |
| 884 SignalHasWork(); | 902 SignalHasWork(); |
| 885 | 903 |
| 886 // There are no pending or running tasks blocking shutdown; we're done. | 904 // There are no pending or running tasks blocking shutdown; we're done. |
| 887 if (CanShutdown()) | 905 if (CanShutdown()) |
| (...skipping 21 matching lines...) |
| 909 TimeTicks::Now() - shutdown_wait_begin); | 927 TimeTicks::Now() - shutdown_wait_begin); |
| 910 #endif | 928 #endif |
| 911 } | 929 } |
| 912 | 930 |
| 913 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { | 931 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
| 914 AutoLock lock(lock_); | 932 AutoLock lock(lock_); |
| 915 return shutdown_called_; | 933 return shutdown_called_; |
| 916 } | 934 } |
| 917 | 935 |
| 918 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 936 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| 919 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 937 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 938 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 920 { | 939 { |
| 921 AutoLock lock(lock_); | 940 AutoLock lock(lock_); |
| 922 DCHECK(thread_being_created_); | 941 DCHECK(thread_being_created_); |
| 923 thread_being_created_ = false; | 942 thread_being_created_ = false; |
| 924 auto result = threads_.insert( | 943 auto result = threads_.insert( |
| 925 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); | 944 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); |
| 926 DCHECK(result.second); | 945 DCHECK(result.second); |
| 927 | 946 |
| 928 while (true) { | 947 while (true) { |
| 929 #if defined(OS_MACOSX) | 948 #if defined(OS_MACOSX) |
| (...skipping 114 matching lines...) |
| 1044 | 1063 |
| 1045 // We noticed we should exit. Wake up the next worker so it knows it should | 1064 // We noticed we should exit. Wake up the next worker so it knows it should |
| 1046 // exit as well (because the Shutdown() code only signals once). | 1065 // exit as well (because the Shutdown() code only signals once). |
| 1047 SignalHasWork(); | 1066 SignalHasWork(); |
| 1048 | 1067 |
| 1049 // Possibly unblock shutdown. | 1068 // Possibly unblock shutdown. |
| 1050 can_shutdown_cv_.Signal(); | 1069 can_shutdown_cv_.Signal(); |
| 1051 } | 1070 } |
| 1052 | 1071 |
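The "wake up the next worker" comment above describes a chained wakeup: Shutdown() signals |has_work_cv_| only once, and each worker that wakes up and decides to exit signals the condition variable again before leaving, so the wakeup propagates through all idle workers. A small sketch of that chaining, assuming a simple shutdown flag and globals with illustrative names:

```cpp
#include <condition_variable>
#include <mutex>

std::mutex lock_;
std::condition_variable has_work_cv_;
bool shutdown_called_ = false;

void WorkerWait() {
  std::unique_lock<std::mutex> hold(lock_);
  has_work_cv_.wait(hold, [] { return shutdown_called_; });
  // We noticed we should exit: pass the signal along so the next idle worker
  // also wakes up, since Shutdown() itself only notifies once.
  has_work_cv_.notify_one();
}

void Shutdown() {
  std::lock_guard<std::mutex> hold(lock_);
  shutdown_called_ = true;
  has_work_cv_.notify_one();  // wakes one worker; the rest chain from it
}
```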
| 1053 void SequencedWorkerPool::Inner::HandleCleanup() { | 1072 void SequencedWorkerPool::Inner::HandleCleanup() { |
| 1054 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1073 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1074 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1055 | 1075 |
| 1056 lock_.AssertAcquired(); | 1076 lock_.AssertAcquired(); |
| 1057 if (cleanup_state_ == CLEANUP_DONE) | 1077 if (cleanup_state_ == CLEANUP_DONE) |
| 1058 return; | 1078 return; |
| 1059 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1079 if (cleanup_state_ == CLEANUP_REQUESTED) { |
| 1060 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1080 // We win, we get to do the cleanup as soon as the others wise up and idle. |
| 1061 cleanup_state_ = CLEANUP_STARTING; | 1081 cleanup_state_ = CLEANUP_STARTING; |
| 1062 while (thread_being_created_ || | 1082 while (thread_being_created_ || |
| 1063 cleanup_idlers_ != threads_.size() - 1) { | 1083 cleanup_idlers_ != threads_.size() - 1) { |
| 1064 has_work_cv_.Signal(); | 1084 has_work_cv_.Signal(); |
| (...skipping 46 matching lines...) |
| 1111 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1131 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| 1112 lock_.AssertAcquired(); | 1132 lock_.AssertAcquired(); |
| 1113 // We assume that we never create enough tasks to wrap around. | 1133 // We assume that we never create enough tasks to wrap around. |
| 1114 return next_sequence_task_number_++; | 1134 return next_sequence_task_number_++; |
| 1115 } | 1135 } |
| 1116 | 1136 |
| 1117 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1137 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| 1118 SequencedTask* task, | 1138 SequencedTask* task, |
| 1119 TimeDelta* wait_time, | 1139 TimeDelta* wait_time, |
| 1120 std::vector<Closure>* delete_these_outside_lock) { | 1140 std::vector<Closure>* delete_these_outside_lock) { |
| 1121 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1141 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1142 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1122 | 1143 |
| 1123 lock_.AssertAcquired(); | 1144 lock_.AssertAcquired(); |
| 1124 | 1145 |
| 1125 // Find the next task with a sequence token that's not currently in use. | 1146 // Find the next task with a sequence token that's not currently in use. |
| 1126 // If the token is in use, that means another thread is running something | 1147 // If the token is in use, that means another thread is running something |
| 1127 // in that sequence, and we can't run it without going out-of-order. | 1148 // in that sequence, and we can't run it without going out-of-order. |
| 1128 // | 1149 // |
| 1129 // This algorithm is simple and fair, but inefficient in some cases. For | 1150 // This algorithm is simple and fair, but inefficient in some cases. For |
| 1130 // example, say somebody schedules 1000 slow tasks with the same sequence | 1151 // example, say somebody schedules 1000 slow tasks with the same sequence |
| 1131 // number. We'll have to go through all those tasks each time we feel like | 1152 // number. We'll have to go through all those tasks each time we feel like |
| (...skipping 67 matching lines...) |
| 1199 } | 1220 } |
| 1200 | 1221 |
| 1201 status = GET_WORK_FOUND; | 1222 status = GET_WORK_FOUND; |
| 1202 break; | 1223 break; |
| 1203 } | 1224 } |
| 1204 | 1225 |
| 1205 return status; | 1226 return status; |
| 1206 } | 1227 } |
| 1207 | 1228 |
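GetWork() walks the pending queue in posting order and takes the first task whose sequence token isn't already running on another worker; tasks sharing a busy token are skipped, which is why a long run of same-token tasks makes the scan inefficient, as the comment above notes. A stripped-down sketch of that selection loop, with hypothetical container and field names:

```cpp
#include <optional>
#include <set>
#include <vector>

struct PendingTask {
  int sequence_token_id = 0;  // 0 means "not part of a sequence"
  int task_number = 0;        // posting order
};

// Picks the earliest task whose sequence token is not in |current_sequences|.
std::optional<PendingTask> GetWork(const std::vector<PendingTask>& pending,
                                   const std::set<int>& current_sequences) {
  for (const PendingTask& task : pending) {  // assumed sorted by task_number
    // An unsequenced task is always runnable; otherwise its token must not be
    // in use by a task currently running on another worker.
    if (task.sequence_token_id == 0 ||
        current_sequences.count(task.sequence_token_id) == 0) {
      return task;
    }
    // Skip: another thread is running this sequence, and running the task now
    // would break the sequence's ordering guarantee.
  }
  return std::nullopt;
}
```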
| 1208 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 1229 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| 1209 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1230 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1231 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1210 | 1232 |
| 1211 lock_.AssertAcquired(); | 1233 lock_.AssertAcquired(); |
| 1212 | 1234 |
| 1213 // Mark the task's sequence number as in use. | 1235 // Mark the task's sequence number as in use. |
| 1214 if (task.sequence_token_id) | 1236 if (task.sequence_token_id) |
| 1215 current_sequences_.insert(task.sequence_token_id); | 1237 current_sequences_.insert(task.sequence_token_id); |
| 1216 | 1238 |
| 1217 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN | 1239 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN |
| 1218 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread | 1240 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread |
| 1219 // completes. | 1241 // completes. |
| (...skipping 12 matching lines...) Expand all Loading... |
| 1232 // if there is one waiting to pick up the next task. | 1254 // if there is one waiting to pick up the next task. |
| 1233 // | 1255 // |
| 1234 // Note that we really need to do this *before* running the task, not | 1256 // Note that we really need to do this *before* running the task, not |
| 1235 // after. Otherwise, if more than one task is posted, the creation of the | 1257 // after. Otherwise, if more than one task is posted, the creation of the |
| 1236 // second thread (since we only create one at a time) will be blocked by | 1258 // second thread (since we only create one at a time) will be blocked by |
| 1237 // the execution of the first task, which could be arbitrarily long. | 1259 // the execution of the first task, which could be arbitrarily long. |
| 1238 return PrepareToStartAdditionalThreadIfHelpful(); | 1260 return PrepareToStartAdditionalThreadIfHelpful(); |
| 1239 } | 1261 } |
| 1240 | 1262 |
| 1241 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 1263 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| 1242 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1264 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1265 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1243 | 1266 |
| 1244 lock_.AssertAcquired(); | 1267 lock_.AssertAcquired(); |
| 1245 | 1268 |
| 1246 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { | 1269 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
| 1247 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 1270 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
| 1248 blocking_shutdown_thread_count_--; | 1271 blocking_shutdown_thread_count_--; |
| 1249 } | 1272 } |
| 1250 | 1273 |
| 1251 if (task.sequence_token_id) | 1274 if (task.sequence_token_id) |
| 1252 current_sequences_.erase(task.sequence_token_id); | 1275 current_sequences_.erase(task.sequence_token_id); |
| 1253 } | 1276 } |
| 1254 | 1277 |
| 1255 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 1278 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| 1256 int sequence_token_id) const { | 1279 int sequence_token_id) const { |
| 1257 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1280 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1281 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1258 | 1282 |
| 1259 lock_.AssertAcquired(); | 1283 lock_.AssertAcquired(); |
| 1260 return !sequence_token_id || | 1284 return !sequence_token_id || |
| 1261 current_sequences_.find(sequence_token_id) == | 1285 current_sequences_.find(sequence_token_id) == |
| 1262 current_sequences_.end(); | 1286 current_sequences_.end(); |
| 1263 } | 1287 } |
| 1264 | 1288 |
| 1265 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 1289 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| 1266 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1290 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1291 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1267 | 1292 |
| 1268 lock_.AssertAcquired(); | 1293 lock_.AssertAcquired(); |
| 1269 // How thread creation works: | 1294 // How thread creation works: |
| 1270 // | 1295 // |
| 1271 // We'd like to avoid creating threads with the lock held. However, we | 1296 // We'd like to avoid creating threads with the lock held. However, we |
| 1272 // need to be sure that we have an accurate accounting of the threads for | 1297 // need to be sure that we have an accurate accounting of the threads for |
| 1273 // proper joining and deletion on shutdown. | 1298 // proper joining and deletion on shutdown. |
| 1274 // | 1299 // |
| 1275 // We need to figure out if we need another thread with the lock held, which | 1300 // We need to figure out if we need another thread with the lock held, which |
| 1276 // is what this function does. It then marks us as in the process of creating | 1301 // is what this function does. It then marks us as in the process of creating |
| (...skipping 31 matching lines...) |
| 1308 thread_being_created_ = true; | 1333 thread_being_created_ = true; |
| 1309 return static_cast<int>(threads_.size() + 1); | 1334 return static_cast<int>(threads_.size() + 1); |
| 1310 } | 1335 } |
| 1311 } | 1336 } |
| 1312 } | 1337 } |
| 1313 return 0; | 1338 return 0; |
| 1314 } | 1339 } |
| 1315 | 1340 |
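As the "How thread creation works" comment explains, growing the pool is split into two phases: PrepareToStartAdditionalThreadIfHelpful() runs under |lock_|, only *reserves* the work (setting |thread_being_created_| and returning the new thread number), and FinishStartingAdditionalThread() constructs the Worker after the lock is released. A compact sketch of that split, with illustrative names and a simple counter for the thread number:

```cpp
#include <mutex>

class MiniPool {
 public:
  void MaybeGrow() {
    int new_thread_number = 0;
    {
      std::lock_guard<std::mutex> hold(lock_);
      new_thread_number = PrepareToStartThreadLocked();
    }
    if (new_thread_number)
      FinishStartingThread(new_thread_number);  // runs outside the lock
  }

 private:
  // Phase 1 (lock held): decide and reserve. Marking |thread_being_created_|
  // keeps the accounting accurate even though the thread doesn't exist yet,
  // and stops two callers from both deciding to create one.
  int PrepareToStartThreadLocked() {
    if (thread_being_created_ || thread_count_ >= max_threads_)
      return 0;
    thread_being_created_ = true;
    return thread_count_ + 1;  // the real pool registers the thread when it starts
  }

  // Phase 2 (lock released): the slow part. In the real pool this constructs
  // a Worker, which clears |thread_being_created_| once it has registered.
  void FinishStartingThread(int thread_number) { (void)thread_number; }

  std::mutex lock_;
  bool thread_being_created_ = false;
  int thread_count_ = 0;
  int max_threads_ = 4;
};
```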
| 1316 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 1341 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| 1317 int thread_number) { | 1342 int thread_number) { |
| 1318 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1343 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1344 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1319 | 1345 |
| 1320 // Called outside of the lock. | 1346 // Called outside of the lock. |
| 1321 DCHECK_GT(thread_number, 0); | 1347 DCHECK_GT(thread_number, 0); |
| 1322 | 1348 |
| 1323 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { | 1349 if (subtle::NoBarrier_Load(&g_all_pools_state) != |
| 1324 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | 1350 AllPoolsState::WORKER_CREATED) { |
| 1325 g_all_pools_state = AllPoolsState::WORKER_CREATED; | 1351 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
| 1352 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1353 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); |
| 1326 } | 1354 } |
| 1327 | 1355 |
| 1328 // The worker is assigned to the list when the thread actually starts, which | 1356 // The worker is assigned to the list when the thread actually starts, which |
| 1329 // will manage the memory of the pointer. | 1357 // will manage the memory of the pointer. |
| 1330 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 1358 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| 1331 } | 1359 } |
| 1332 | 1360 |
| 1333 void SequencedWorkerPool::Inner::SignalHasWork() { | 1361 void SequencedWorkerPool::Inner::SignalHasWork() { |
| 1334 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 1362 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, |
| 1363 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1335 | 1364 |
| 1336 has_work_cv_.Signal(); | 1365 has_work_cv_.Signal(); |
| 1337 if (testing_observer_) { | 1366 if (testing_observer_) { |
| 1338 testing_observer_->OnHasWork(); | 1367 testing_observer_->OnHasWork(); |
| 1339 } | 1368 } |
| 1340 } | 1369 } |
| 1341 | 1370 |
| 1342 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1371 bool SequencedWorkerPool::Inner::CanShutdown() const { |
| 1343 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | 1372 DCHECK_EQ(AllPoolsState::WORKER_CREATED, |
| 1373 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1344 lock_.AssertAcquired(); | 1374 lock_.AssertAcquired(); |
| 1345 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1375 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| 1346 return !thread_being_created_ && | 1376 return !thread_being_created_ && |
| 1347 blocking_shutdown_thread_count_ == 0 && | 1377 blocking_shutdown_thread_count_ == 0 && |
| 1348 blocking_shutdown_pending_task_count_ == 0; | 1378 blocking_shutdown_pending_task_count_ == 0; |
| 1349 } | 1379 } |
| 1350 | 1380 |
| 1351 base::StaticAtomicSequenceNumber | 1381 base::StaticAtomicSequenceNumber |
| 1352 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1382 SequencedWorkerPool::Inner::g_last_sequence_number_; |
| 1353 | 1383 |
| (...skipping 24 matching lines...) |
| 1378 } | 1408 } |
| 1379 | 1409 |
| 1380 // static | 1410 // static |
| 1381 void SequencedWorkerPool:: | 1411 void SequencedWorkerPool:: |
| 1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { | 1412 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { |
| 1383 DCHECK(TaskScheduler::GetInstance()); | 1413 DCHECK(TaskScheduler::GetInstance()); |
| 1384 // Hitting this DCHECK indicates that a task was posted to a | 1414 // Hitting this DCHECK indicates that a task was posted to a |
| 1385 // SequencedWorkerPool before the TaskScheduler was initialized and | 1415 // SequencedWorkerPool before the TaskScheduler was initialized and |
| 1386 // redirected; posting tasks to SequencedWorkerPools needs to at least be | 1416 // redirected; posting tasks to SequencedWorkerPools needs to at least be |
| 1387 // delayed until after that point. | 1417 // delayed until after that point. |
| 1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | 1418 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, |
| 1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; | 1419 subtle::NoBarrier_Load(&g_all_pools_state)); |
| 1420 subtle::NoBarrier_Store(&g_all_pools_state, |
| 1421 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
| 1390 } | 1422 } |
| 1391 | 1423 |
| 1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1424 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1393 const std::string& thread_name_prefix, | 1425 const std::string& thread_name_prefix, |
| 1394 base::TaskPriority task_priority) | 1426 base::TaskPriority task_priority) |
| 1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1427 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1396 inner_(new Inner(this, | 1428 inner_(new Inner(this, |
| 1397 max_threads, | 1429 max_threads, |
| 1398 thread_name_prefix, | 1430 thread_name_prefix, |
| 1399 task_priority, | 1431 task_priority, |
| (...skipping 138 matching lines...) |
| 1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1570 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1571 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1572 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1541 } | 1573 } |
| 1542 | 1574 |
| 1543 bool SequencedWorkerPool::IsShutdownInProgress() { | 1575 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1544 return inner_->IsShutdownInProgress(); | 1576 return inner_->IsShutdownInProgress(); |
| 1545 } | 1577 } |
| 1546 | 1578 |
| 1547 } // namespace base | 1579 } // namespace base |