| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <list> | 9 #include <list> |
| 10 #include <map> | 10 #include <map> |
| 11 #include <memory> | 11 #include <memory> |
| 12 #include <set> | 12 #include <set> |
| 13 #include <unordered_map> | 13 #include <unordered_map> |
| 14 #include <utility> | 14 #include <utility> |
| 15 #include <vector> | 15 #include <vector> |
| 16 | 16 |
| 17 #include "base/atomic_sequence_num.h" | 17 #include "base/atomic_sequence_num.h" |
| 18 #include "base/atomicops.h" | |
| 19 #include "base/callback.h" | 18 #include "base/callback.h" |
| 20 #include "base/compiler_specific.h" | 19 #include "base/compiler_specific.h" |
| 21 #include "base/critical_closure.h" | 20 #include "base/critical_closure.h" |
| 21 #include "base/debug/dump_without_crashing.h" |
| 22 #include "base/lazy_instance.h" | 22 #include "base/lazy_instance.h" |
| 23 #include "base/logging.h" | 23 #include "base/logging.h" |
| 24 #include "base/macros.h" | 24 #include "base/macros.h" |
| 25 #include "base/memory/ptr_util.h" | 25 #include "base/memory/ptr_util.h" |
| 26 #include "base/stl_util.h" | 26 #include "base/stl_util.h" |
| 27 #include "base/strings/stringprintf.h" | 27 #include "base/strings/stringprintf.h" |
| 28 #include "base/synchronization/condition_variable.h" | 28 #include "base/synchronization/condition_variable.h" |
| 29 #include "base/synchronization/lock.h" | 29 #include "base/synchronization/lock.h" |
| 30 #include "base/task_scheduler/post_task.h" | 30 #include "base/task_scheduler/post_task.h" |
| 31 #include "base/task_scheduler/task_scheduler.h" | 31 #include "base/task_scheduler/task_scheduler.h" |
| (...skipping 15 matching lines...) Expand all Loading... |
| 47 #endif | 47 #endif |
| 48 | 48 |
| 49 #if !defined(OS_NACL) | 49 #if !defined(OS_NACL) |
| 50 #include "base/metrics/histogram_macros.h" | 50 #include "base/metrics/histogram_macros.h" |
| 51 #endif | 51 #endif |
| 52 | 52 |
| 53 namespace base { | 53 namespace base { |
| 54 | 54 |
| 55 namespace { | 55 namespace { |
| 56 | 56 |
| 57 // An enum representing the state of all pools. Any given non-test process | 57 // An enum representing the state of all pools. A non-test process should only |
| 58 // should only ever transition from NONE_ACTIVE to one of the active states. | 58 // ever transition from POST_TASK_DISABLED to one of the active states. A test |
| 59 // Transitions between actives states are unexpected. The | 59 // process may transition from one of the active states to POST_TASK_DISABLED |
| 60 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when | 60 // when DisableForProcessForTesting() is called. |
| 61 // RedirectToTaskSchedulerForProcess() is called. The WORKER_CREATED transition | |
| 62 // occurs when a Worker needs to be created because the first task was posted | |
| 63 // and the state is still NONE_ACTIVE. In a test process, a transition to | |
| 64 // NONE_ACTIVE occurs when ResetRedirectToTaskSchedulerForProcessForTesting() is | |
| 65 // called. | |
| 66 // | 61 // |
| 67 // |g_all_pools_state| uses relaxed atomic operations to ensure no data race | 62 // External memory synchronization is required to call a method that reads |
| 68 // between reads/writes, strict memory ordering isn't required per no other | 63 // |g_all_pools_state| after calling a method that modifies it. |
| 69 // state being inferred from its value. Explicit synchronization (e.g. locks or | |
| 70 // events) would be overkill (it's fine for other threads to still see | |
| 71 // NONE_ACTIVE after the first Worker was created -- this is not possible for | |
| 72 // REDIRECTED_TO_TASK_SCHEDULER per its API requesting to be invoked while no | |
| 73 // other threads are active). | |
| 74 // | 64 // |
| 75 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | 65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
| 76 // will be phased out completely otherwise). | 66 // will be phased out completely otherwise). |
| 77 enum AllPoolsState : subtle::Atomic32 { | 67 enum class AllPoolsState { |
| 78 NONE_ACTIVE, | 68 POST_TASK_DISABLED, |
| 79 WORKER_CREATED, | 69 USE_WORKER_POOL, |
| 80 REDIRECTED_TO_TASK_SCHEDULER, | 70 REDIRECTED_TO_TASK_SCHEDULER, |
| 81 }; | 71 }; |
| 82 subtle::Atomic32 g_all_pools_state = AllPoolsState::NONE_ACTIVE; | 72 AllPoolsState g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
| 83 | 73 |
| 84 struct SequencedTask : public TrackingInfo { | 74 struct SequencedTask : public TrackingInfo { |
| 85 SequencedTask() | 75 SequencedTask() |
| 86 : sequence_token_id(0), | 76 : sequence_token_id(0), |
| 87 trace_id(0), | 77 trace_id(0), |
| 88 sequence_task_number(0), | 78 sequence_task_number(0), |
| 89 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 79 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| 90 | 80 |
| 91 explicit SequencedTask(const tracked_objects::Location& from_here) | 81 explicit SequencedTask(const tracked_objects::Location& from_here) |
| 92 : base::TrackingInfo(from_here, TimeTicks()), | 82 : base::TrackingInfo(from_here, TimeTicks()), |
| (...skipping 482 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 575 // Worker definitions --------------------------------------------------------- | 565 // Worker definitions --------------------------------------------------------- |
| 576 | 566 |
| 577 SequencedWorkerPool::Worker::Worker( | 567 SequencedWorkerPool::Worker::Worker( |
| 578 scoped_refptr<SequencedWorkerPool> worker_pool, | 568 scoped_refptr<SequencedWorkerPool> worker_pool, |
| 579 int thread_number, | 569 int thread_number, |
| 580 const std::string& prefix) | 570 const std::string& prefix) |
| 581 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 571 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
| 582 worker_pool_(std::move(worker_pool)), | 572 worker_pool_(std::move(worker_pool)), |
| 583 task_shutdown_behavior_(BLOCK_SHUTDOWN), | 573 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| 584 is_processing_task_(false) { | 574 is_processing_task_(false) { |
| 585 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 575 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 586 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 587 Start(); | 576 Start(); |
| 588 } | 577 } |
| 589 | 578 |
| 590 SequencedWorkerPool::Worker::~Worker() { | 579 SequencedWorkerPool::Worker::~Worker() { |
| 591 } | 580 } |
| 592 | 581 |
| 593 void SequencedWorkerPool::Worker::Run() { | 582 void SequencedWorkerPool::Worker::Run() { |
| 594 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 583 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 595 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 596 | 584 |
| 597 #if defined(OS_WIN) | 585 #if defined(OS_WIN) |
| 598 win::ScopedCOMInitializer com_initializer; | 586 win::ScopedCOMInitializer com_initializer; |
| 599 #endif | 587 #endif |
| 600 | 588 |
| 601 // Store a pointer to this worker in thread local storage for static function | 589 // Store a pointer to this worker in thread local storage for static function |
| 602 // access. | 590 // access. |
| 603 DCHECK(!lazy_tls_ptr_.Get().Get()); | 591 DCHECK(!lazy_tls_ptr_.Get().Get()); |
| 604 lazy_tls_ptr_.Get().Set(this); | 592 lazy_tls_ptr_.Get().Set(this); |
| 605 | 593 |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 684 return SequenceToken(LockedGetNamedTokenID(name)); | 672 return SequenceToken(LockedGetNamedTokenID(name)); |
| 685 } | 673 } |
| 686 | 674 |
| 687 bool SequencedWorkerPool::Inner::PostTask( | 675 bool SequencedWorkerPool::Inner::PostTask( |
| 688 const std::string* optional_token_name, | 676 const std::string* optional_token_name, |
| 689 SequenceToken sequence_token, | 677 SequenceToken sequence_token, |
| 690 WorkerShutdown shutdown_behavior, | 678 WorkerShutdown shutdown_behavior, |
| 691 const tracked_objects::Location& from_here, | 679 const tracked_objects::Location& from_here, |
| 692 const Closure& task, | 680 const Closure& task, |
| 693 TimeDelta delay) { | 681 TimeDelta delay) { |
| 682 DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
| 683 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) |
| 684 debug::DumpWithoutCrashing(); |
| 685 |
| 694 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); | 686 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
| 695 SequencedTask sequenced(from_here); | 687 SequencedTask sequenced(from_here); |
| 696 sequenced.sequence_token_id = sequence_token.id_; | 688 sequenced.sequence_token_id = sequence_token.id_; |
| 697 sequenced.shutdown_behavior = shutdown_behavior; | 689 sequenced.shutdown_behavior = shutdown_behavior; |
| 698 sequenced.posted_from = from_here; | 690 sequenced.posted_from = from_here; |
| 699 sequenced.task = | 691 sequenced.task = |
| 700 shutdown_behavior == BLOCK_SHUTDOWN ? | 692 shutdown_behavior == BLOCK_SHUTDOWN ? |
| 701 base::MakeCriticalClosure(task) : task; | 693 base::MakeCriticalClosure(task) : task; |
| 702 sequenced.time_to_run = TimeTicks::Now() + delay; | 694 sequenced.time_to_run = TimeTicks::Now() + delay; |
| 703 | 695 |
| (...skipping 29 matching lines...) Expand all Loading... |
| 733 "SequencedWorkerPool::Inner::PostTask", | 725 "SequencedWorkerPool::Inner::PostTask", |
| 734 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 726 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
| 735 TRACE_EVENT_FLAG_FLOW_OUT); | 727 TRACE_EVENT_FLAG_FLOW_OUT); |
| 736 | 728 |
| 737 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 729 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
| 738 | 730 |
| 739 // Now that we have the lock, apply the named token rules. | 731 // Now that we have the lock, apply the named token rules. |
| 740 if (optional_token_name) | 732 if (optional_token_name) |
| 741 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 733 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| 742 | 734 |
| 743 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 744 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
| 745 if (!PostTaskToTaskScheduler(sequenced, delay)) | 736 if (!PostTaskToTaskScheduler(sequenced, delay)) |
| 746 return false; | 737 return false; |
| 747 } else { | 738 } else { |
| 748 pending_tasks_.insert(sequenced); | 739 pending_tasks_.insert(sequenced); |
| 749 | 740 |
| 750 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) | 741 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
| 751 blocking_shutdown_pending_task_count_++; | 742 blocking_shutdown_pending_task_count_++; |
| 752 | 743 |
| 753 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 744 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| 754 } | 745 } |
| 755 } | 746 } |
| 756 | 747 |
| 757 if (subtle::NoBarrier_Load(&g_all_pools_state) != | 748 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure |
| 758 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 749 // correct behavior if a task is posted to a SequencedWorkerPool before |
| 750 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. |
| 751 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 759 // Actually start the additional thread or signal an existing one outside | 752 // Actually start the additional thread or signal an existing one outside |
| 760 // the lock. | 753 // the lock. |
| 761 if (create_thread_id) | 754 if (create_thread_id) |
| 762 FinishStartingAdditionalThread(create_thread_id); | 755 FinishStartingAdditionalThread(create_thread_id); |
| 763 else | 756 else |
| 764 SignalHasWork(); | 757 SignalHasWork(); |
| 765 } | 758 } |
| 766 | 759 |
| 767 #if DCHECK_IS_ON() | 760 #if DCHECK_IS_ON() |
| 768 { | 761 { |
| 769 AutoLock lock_for_dcheck(lock_); | 762 AutoLock lock_for_dcheck(lock_); |
| 770 // Some variables are exposed in both modes for convenience but only really | 763 // Some variables are exposed in both modes for convenience but only really |
| 771 // intended for one of them at runtime, confirm exclusive usage here. | 764 // intended for one of them at runtime, confirm exclusive usage here. |
| 772 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 765 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 773 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
| 774 DCHECK(pending_tasks_.empty()); | 766 DCHECK(pending_tasks_.empty()); |
| 775 DCHECK_EQ(0, create_thread_id); | 767 DCHECK_EQ(0, create_thread_id); |
| 776 } else { | 768 } else { |
| 777 DCHECK(sequenced_task_runner_map_.empty()); | 769 DCHECK(sequenced_task_runner_map_.empty()); |
| 778 } | 770 } |
| 779 } | 771 } |
| 780 #endif // DCHECK_IS_ON() | 772 #endif // DCHECK_IS_ON() |
| 781 | 773 |
| 782 return true; | 774 return true; |
| 783 } | 775 } |
| 784 | 776 |
| 785 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | 777 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( |
| 786 const SequencedTask& sequenced, | 778 const SequencedTask& sequenced, |
| 787 const TimeDelta& delay) { | 779 const TimeDelta& delay) { |
| 788 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 780 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| 789 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 790 | 781 |
| 791 lock_.AssertAcquired(); | 782 lock_.AssertAcquired(); |
| 792 | 783 |
| 793 // Confirm that the TaskScheduler's shutdown behaviors use the same | 784 // Confirm that the TaskScheduler's shutdown behaviors use the same |
| 794 // underlying values as SequencedWorkerPool. | 785 // underlying values as SequencedWorkerPool. |
| 795 static_assert( | 786 static_assert( |
| 796 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | 787 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
| 797 static_cast<int>(CONTINUE_ON_SHUTDOWN), | 788 static_cast<int>(CONTINUE_ON_SHUTDOWN), |
| 798 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 789 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
| 799 "CONTINUE_ON_SHUTDOWN."); | 790 "CONTINUE_ON_SHUTDOWN."); |
| (...skipping 13 matching lines...) Expand all Loading... |
| 813 .WithPriority(task_priority_) | 804 .WithPriority(task_priority_) |
| 814 .WithShutdownBehavior(task_shutdown_behavior); | 805 .WithShutdownBehavior(task_shutdown_behavior); |
| 815 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) | 806 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) |
| 816 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); | 807 ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); |
| 817 } | 808 } |
| 818 | 809 |
| 819 scoped_refptr<TaskRunner> | 810 scoped_refptr<TaskRunner> |
| 820 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( | 811 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( |
| 821 int sequence_token_id, | 812 int sequence_token_id, |
| 822 const TaskTraits& traits) { | 813 const TaskTraits& traits) { |
| 823 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 814 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| 824 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 825 | 815 |
| 826 lock_.AssertAcquired(); | 816 lock_.AssertAcquired(); |
| 827 | 817 |
| 828 static_assert( | 818 static_assert( |
| 829 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, | 819 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, |
| 830 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " | 820 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " |
| 831 "used as an index in |unsequenced_task_runners_|."); | 821 "used as an index in |unsequenced_task_runners_|."); |
| 832 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, | 822 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, |
| 833 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " | 823 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " |
| 834 "be used as an index in |unsequenced_task_runners_|."); | 824 "be used as an index in |unsequenced_task_runners_|."); |
| (...skipping 16 matching lines...) Expand all Loading... |
| 851 ExecutionMode execution_mode = | 841 ExecutionMode execution_mode = |
| 852 sequence_token_id ? ExecutionMode::SEQUENCED : ExecutionMode::PARALLEL; | 842 sequence_token_id ? ExecutionMode::SEQUENCED : ExecutionMode::PARALLEL; |
| 853 task_runner = CreateTaskRunnerWithTraits(traits, execution_mode); | 843 task_runner = CreateTaskRunnerWithTraits(traits, execution_mode); |
| 854 } | 844 } |
| 855 | 845 |
| 856 return task_runner; | 846 return task_runner; |
| 857 } | 847 } |
| 858 | 848 |
| 859 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 849 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| 860 AutoLock lock(lock_); | 850 AutoLock lock(lock_); |
| 861 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 851 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 862 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
| 863 if (!runs_tasks_on_verifier_) { | 852 if (!runs_tasks_on_verifier_) { |
| 864 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | 853 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( |
| 865 TaskTraits().WithFileIO().WithPriority(task_priority_), | 854 TaskTraits().WithFileIO().WithPriority(task_priority_), |
| 866 ExecutionMode::PARALLEL); | 855 ExecutionMode::PARALLEL); |
| 867 } | 856 } |
| 868 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | 857 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); |
| 869 } else { | 858 } else { |
| 870 return ContainsKey(threads_, PlatformThread::CurrentId()); | 859 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 871 } | 860 } |
| 872 } | 861 } |
| 873 | 862 |
| 874 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 863 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 875 SequenceToken sequence_token) const { | 864 SequenceToken sequence_token) const { |
| 876 DCHECK(sequence_token.IsValid()); | 865 DCHECK(sequence_token.IsValid()); |
| 877 | 866 |
| 878 AutoLock lock(lock_); | 867 AutoLock lock(lock_); |
| 879 | 868 |
| 880 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 869 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 881 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
| 882 const auto sequenced_task_runner_it = | 870 const auto sequenced_task_runner_it = |
| 883 sequenced_task_runner_map_.find(sequence_token.id_); | 871 sequenced_task_runner_map_.find(sequence_token.id_); |
| 884 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | 872 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
| 885 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | 873 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); |
| 886 } else { | 874 } else { |
| 887 ThreadMap::const_iterator found = | 875 ThreadMap::const_iterator found = |
| 888 threads_.find(PlatformThread::CurrentId()); | 876 threads_.find(PlatformThread::CurrentId()); |
| 889 return found != threads_.end() && found->second->is_processing_task() && | 877 return found != threads_.end() && found->second->is_processing_task() && |
| 890 sequence_token.Equals(found->second->task_sequence_token()); | 878 sequence_token.Equals(found->second->task_sequence_token()); |
| 891 } | 879 } |
| 892 } | 880 } |
| 893 | 881 |
| 894 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 882 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 895 void SequencedWorkerPool::Inner::CleanupForTesting() { | 883 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 896 DCHECK_NE(subtle::NoBarrier_Load(&g_all_pools_state), | 884 DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); |
| 897 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); | |
| 898 AutoLock lock(lock_); | 885 AutoLock lock(lock_); |
| 899 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 886 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 900 if (shutdown_called_) | 887 if (shutdown_called_) |
| 901 return; | 888 return; |
| 902 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | 889 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
| 903 return; | 890 return; |
| 904 cleanup_state_ = CLEANUP_REQUESTED; | 891 cleanup_state_ = CLEANUP_REQUESTED; |
| 905 cleanup_idlers_ = 0; | 892 cleanup_idlers_ = 0; |
| 906 has_work_cv_.Signal(); | 893 has_work_cv_.Signal(); |
| 907 while (cleanup_state_ != CLEANUP_DONE) | 894 while (cleanup_state_ != CLEANUP_DONE) |
| (...skipping 10 matching lines...) Expand all Loading... |
| 918 { | 905 { |
| 919 AutoLock lock(lock_); | 906 AutoLock lock(lock_); |
| 920 // Cleanup and Shutdown should not be called concurrently. | 907 // Cleanup and Shutdown should not be called concurrently. |
| 921 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 908 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 922 if (shutdown_called_) | 909 if (shutdown_called_) |
| 923 return; | 910 return; |
| 924 shutdown_called_ = true; | 911 shutdown_called_ = true; |
| 925 | 912 |
| 926 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 913 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| 927 | 914 |
| 928 if (subtle::NoBarrier_Load(&g_all_pools_state) != | 915 if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) |
| 929 AllPoolsState::WORKER_CREATED) { | |
| 930 return; | 916 return; |
| 931 } | |
| 932 | 917 |
| 933 // Tickle the threads. This will wake up a waiting one so it will know that | 918 // Tickle the threads. This will wake up a waiting one so it will know that |
| 934 // it can exit, which in turn will wake up any other waiting ones. | 919 // it can exit, which in turn will wake up any other waiting ones. |
| 935 SignalHasWork(); | 920 SignalHasWork(); |
| 936 | 921 |
| 937 // There are no pending or running tasks blocking shutdown, we're done. | 922 // There are no pending or running tasks blocking shutdown, we're done. |
| 938 if (CanShutdown()) | 923 if (CanShutdown()) |
| 939 return; | 924 return; |
| 940 } | 925 } |
| 941 | 926 |
| (...skipping 18 matching lines...) Expand all Loading... |
| 960 TimeTicks::Now() - shutdown_wait_begin); | 945 TimeTicks::Now() - shutdown_wait_begin); |
| 961 #endif | 946 #endif |
| 962 } | 947 } |
| 963 | 948 |
| 964 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { | 949 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
| 965 AutoLock lock(lock_); | 950 AutoLock lock(lock_); |
| 966 return shutdown_called_; | 951 return shutdown_called_; |
| 967 } | 952 } |
| 968 | 953 |
| 969 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 954 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| 970 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 955 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 971 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 972 { | 956 { |
| 973 AutoLock lock(lock_); | 957 AutoLock lock(lock_); |
| 974 DCHECK(thread_being_created_); | 958 DCHECK(thread_being_created_); |
| 975 thread_being_created_ = false; | 959 thread_being_created_ = false; |
| 976 auto result = threads_.insert( | 960 auto result = threads_.insert( |
| 977 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); | 961 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); |
| 978 DCHECK(result.second); | 962 DCHECK(result.second); |
| 979 | 963 |
| 980 while (true) { | 964 while (true) { |
| 981 #if defined(OS_MACOSX) | 965 #if defined(OS_MACOSX) |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1093 | 1077 |
| 1094 // We noticed we should exit. Wake up the next worker so it knows it should | 1078 // We noticed we should exit. Wake up the next worker so it knows it should |
| 1095 // exit as well (because the Shutdown() code only signals once). | 1079 // exit as well (because the Shutdown() code only signals once). |
| 1096 SignalHasWork(); | 1080 SignalHasWork(); |
| 1097 | 1081 |
| 1098 // Possibly unblock shutdown. | 1082 // Possibly unblock shutdown. |
| 1099 can_shutdown_cv_.Signal(); | 1083 can_shutdown_cv_.Signal(); |
| 1100 } | 1084 } |
| 1101 | 1085 |
| 1102 void SequencedWorkerPool::Inner::HandleCleanup() { | 1086 void SequencedWorkerPool::Inner::HandleCleanup() { |
| 1103 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1087 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1104 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1105 | 1088 |
| 1106 lock_.AssertAcquired(); | 1089 lock_.AssertAcquired(); |
| 1107 if (cleanup_state_ == CLEANUP_DONE) | 1090 if (cleanup_state_ == CLEANUP_DONE) |
| 1108 return; | 1091 return; |
| 1109 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1092 if (cleanup_state_ == CLEANUP_REQUESTED) { |
| 1110 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1093 // We win, we get to do the cleanup as soon as the others wise up and idle. |
| 1111 cleanup_state_ = CLEANUP_STARTING; | 1094 cleanup_state_ = CLEANUP_STARTING; |
| 1112 while (thread_being_created_ || | 1095 while (thread_being_created_ || |
| 1113 cleanup_idlers_ != threads_.size() - 1) { | 1096 cleanup_idlers_ != threads_.size() - 1) { |
| 1114 has_work_cv_.Signal(); | 1097 has_work_cv_.Signal(); |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1161 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1144 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| 1162 lock_.AssertAcquired(); | 1145 lock_.AssertAcquired(); |
| 1163 // We assume that we never create enough tasks to wrap around. | 1146 // We assume that we never create enough tasks to wrap around. |
| 1164 return next_sequence_task_number_++; | 1147 return next_sequence_task_number_++; |
| 1165 } | 1148 } |
| 1166 | 1149 |
| 1167 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1150 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| 1168 SequencedTask* task, | 1151 SequencedTask* task, |
| 1169 TimeDelta* wait_time, | 1152 TimeDelta* wait_time, |
| 1170 std::vector<Closure>* delete_these_outside_lock) { | 1153 std::vector<Closure>* delete_these_outside_lock) { |
| 1171 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1154 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1172 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1173 | 1155 |
| 1174 lock_.AssertAcquired(); | 1156 lock_.AssertAcquired(); |
| 1175 | 1157 |
| 1176 // Find the next task with a sequence token that's not currently in use. | 1158 // Find the next task with a sequence token that's not currently in use. |
| 1177 // If the token is in use, that means another thread is running something | 1159 // If the token is in use, that means another thread is running something |
| 1178 // in that sequence, and we can't run it without going out-of-order. | 1160 // in that sequence, and we can't run it without going out-of-order. |
| 1179 // | 1161 // |
| 1180 // This algorithm is simple and fair, but inefficient in some cases. For | 1162 // This algorithm is simple and fair, but inefficient in some cases. For |
| 1181 // example, say somebody schedules 1000 slow tasks with the same sequence | 1163 // example, say somebody schedules 1000 slow tasks with the same sequence |
| 1182 // number. We'll have to go through all those tasks each time we feel like | 1164 // number. We'll have to go through all those tasks each time we feel like |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1250 } | 1232 } |
| 1251 | 1233 |
| 1252 status = GET_WORK_FOUND; | 1234 status = GET_WORK_FOUND; |
| 1253 break; | 1235 break; |
| 1254 } | 1236 } |
| 1255 | 1237 |
| 1256 return status; | 1238 return status; |
| 1257 } | 1239 } |
| 1258 | 1240 |
| 1259 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 1241 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
| 1260 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1242 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1261 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1262 | 1243 |
| 1263 lock_.AssertAcquired(); | 1244 lock_.AssertAcquired(); |
| 1264 | 1245 |
| 1265 // Mark the task's sequence number as in use. | 1246 // Mark the task's sequence number as in use. |
| 1266 if (task.sequence_token_id) | 1247 if (task.sequence_token_id) |
| 1267 current_sequences_.insert(task.sequence_token_id); | 1248 current_sequences_.insert(task.sequence_token_id); |
| 1268 | 1249 |
| 1269 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN | 1250 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN |
| 1270 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread | 1251 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread |
| 1271 // completes. | 1252 // completes. |
| (...skipping 12 matching lines...) Expand all Loading... |
| 1284 // if there is one waiting to pick up the next task. | 1265 // if there is one waiting to pick up the next task. |
| 1285 // | 1266 // |
| 1286 // Note that we really need to do this *before* running the task, not | 1267 // Note that we really need to do this *before* running the task, not |
| 1287 // after. Otherwise, if more than one task is posted, the creation of the | 1268 // after. Otherwise, if more than one task is posted, the creation of the |
| 1288 // second thread (since we only create one at a time) will be blocked by | 1269 // second thread (since we only create one at a time) will be blocked by |
| 1289 // the execution of the first task, which could be arbitrarily long. | 1270 // the execution of the first task, which could be arbitrarily long. |
| 1290 return PrepareToStartAdditionalThreadIfHelpful(); | 1271 return PrepareToStartAdditionalThreadIfHelpful(); |
| 1291 } | 1272 } |
| 1292 | 1273 |
| 1293 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 1274 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
| 1294 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1275 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1295 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1296 | 1276 |
| 1297 lock_.AssertAcquired(); | 1277 lock_.AssertAcquired(); |
| 1298 | 1278 |
| 1299 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { | 1279 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
| 1300 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 1280 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
| 1301 blocking_shutdown_thread_count_--; | 1281 blocking_shutdown_thread_count_--; |
| 1302 } | 1282 } |
| 1303 | 1283 |
| 1304 if (task.sequence_token_id) | 1284 if (task.sequence_token_id) |
| 1305 current_sequences_.erase(task.sequence_token_id); | 1285 current_sequences_.erase(task.sequence_token_id); |
| 1306 } | 1286 } |
| 1307 | 1287 |
| 1308 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 1288 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
| 1309 int sequence_token_id) const { | 1289 int sequence_token_id) const { |
| 1310 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1290 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| 1311 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1312 | 1291 |
| 1313 lock_.AssertAcquired(); | 1292 lock_.AssertAcquired(); |
| 1314 return !sequence_token_id || | 1293 return !sequence_token_id || |
| 1315 current_sequences_.find(sequence_token_id) == | 1294 current_sequences_.find(sequence_token_id) == |
| 1316 current_sequences_.end(); | 1295 current_sequences_.end(); |
| 1317 } | 1296 } |
| 1318 | 1297 |
| 1319 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 1298 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
| 1320 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1299 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| 1321 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1322 | 1300 |
| 1323 lock_.AssertAcquired(); | 1301 lock_.AssertAcquired(); |
| 1324 // How thread creation works: | 1302 // How thread creation works: |
| 1325 // | 1303 // |
| 1326 // We'de like to avoid creating threads with the lock held. However, we | 1304 // We'de like to avoid creating threads with the lock held. However, we |
| 1327 // need to be sure that we have an accurate accounting of the threads for | 1305 // need to be sure that we have an accurate accounting of the threads for |
| 1328 // proper Joining and deltion on shutdown. | 1306 // proper Joining and deltion on shutdown. |
| 1329 // | 1307 // |
| 1330 // We need to figure out if we need another thread with the lock held, which | 1308 // We need to figure out if we need another thread with the lock held, which |
| 1331 // is what this function does. It then marks us as in the process of creating | 1309 // is what this function does. It then marks us as in the process of creating |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1363 thread_being_created_ = true; | 1341 thread_being_created_ = true; |
| 1364 return static_cast<int>(threads_.size() + 1); | 1342 return static_cast<int>(threads_.size() + 1); |
| 1365 } | 1343 } |
| 1366 } | 1344 } |
| 1367 } | 1345 } |
| 1368 return 0; | 1346 return 0; |
| 1369 } | 1347 } |
| 1370 | 1348 |
| 1371 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 1349 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
| 1372 int thread_number) { | 1350 int thread_number) { |
| 1373 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1351 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1374 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1375 | 1352 |
| 1376 // Called outside of the lock. | 1353 // Called outside of the lock. |
| 1377 DCHECK_GT(thread_number, 0); | 1354 DCHECK_GT(thread_number, 0); |
| 1378 | 1355 |
| 1379 if (subtle::NoBarrier_Load(&g_all_pools_state) != | |
| 1380 AllPoolsState::WORKER_CREATED) { | |
| 1381 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, | |
| 1382 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1383 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::WORKER_CREATED); | |
| 1384 } | |
| 1385 | |
| 1386 // The worker is assigned to the list when the thread actually starts, which | 1356 // The worker is assigned to the list when the thread actually starts, which |
| 1387 // will manage the memory of the pointer. | 1357 // will manage the memory of the pointer. |
| 1388 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 1358 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| 1389 } | 1359 } |
| 1390 | 1360 |
| 1391 void SequencedWorkerPool::Inner::SignalHasWork() { | 1361 void SequencedWorkerPool::Inner::SignalHasWork() { |
| 1392 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, | 1362 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
| 1393 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1394 | 1363 |
| 1395 has_work_cv_.Signal(); | 1364 has_work_cv_.Signal(); |
| 1396 if (testing_observer_) { | 1365 if (testing_observer_) { |
| 1397 testing_observer_->OnHasWork(); | 1366 testing_observer_->OnHasWork(); |
| 1398 } | 1367 } |
| 1399 } | 1368 } |
| 1400 | 1369 |
| 1401 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1370 bool SequencedWorkerPool::Inner::CanShutdown() const { |
| 1402 DCHECK_EQ(AllPoolsState::WORKER_CREATED, | 1371 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); |
| 1403 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1404 lock_.AssertAcquired(); | 1372 lock_.AssertAcquired(); |
| 1405 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1373 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
| 1406 return !thread_being_created_ && | 1374 return !thread_being_created_ && |
| 1407 blocking_shutdown_thread_count_ == 0 && | 1375 blocking_shutdown_thread_count_ == 0 && |
| 1408 blocking_shutdown_pending_task_count_ == 0; | 1376 blocking_shutdown_pending_task_count_ == 0; |
| 1409 } | 1377 } |
| 1410 | 1378 |
| 1411 base::StaticAtomicSequenceNumber | 1379 base::StaticAtomicSequenceNumber |
| 1412 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1380 SequencedWorkerPool::Inner::g_last_sequence_number_; |
| 1413 | 1381 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 1431 scoped_refptr<SequencedWorkerPool> | 1399 scoped_refptr<SequencedWorkerPool> |
| 1432 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1400 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
| 1433 Worker* worker = Worker::GetForCurrentThread(); | 1401 Worker* worker = Worker::GetForCurrentThread(); |
| 1434 if (!worker) | 1402 if (!worker) |
| 1435 return nullptr; | 1403 return nullptr; |
| 1436 | 1404 |
| 1437 return worker->worker_pool(); | 1405 return worker->worker_pool(); |
| 1438 } | 1406 } |
| 1439 | 1407 |
| 1440 // static | 1408 // static |
| 1441 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { | 1409 void SequencedWorkerPool::EnableForProcess() { |
| 1442 DCHECK(TaskScheduler::GetInstance()); | 1410 DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
| 1443 // Hitting this DCHECK indicates that a task was posted to a | 1411 g_all_pools_state = AllPoolsState::USE_WORKER_POOL; |
| 1444 // SequencedWorkerPool before the TaskScheduler was initialized and | |
| 1445 // redirected, posting task to SequencedWorkerPools needs to at least be | |
| 1446 // delayed until after that point. | |
| 1447 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, | |
| 1448 subtle::NoBarrier_Load(&g_all_pools_state)); | |
| 1449 subtle::NoBarrier_Store(&g_all_pools_state, | |
| 1450 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); | |
| 1451 } | 1412 } |
| 1452 | 1413 |
| 1453 // static | 1414 // static |
| 1454 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { | 1415 void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess() { |
| 1455 // This can be called when the current state is REDIRECTED_TO_TASK_SCHEDULER | 1416 DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); |
| 1456 // to stop redirecting tasks. It can also be called when the current state is | 1417 DCHECK(TaskScheduler::GetInstance()); |
| 1457 // WORKER_CREATED to allow RedirectToTaskSchedulerForProcess() to be called | 1418 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
| 1458 // (RedirectToTaskSchedulerForProcess() cannot be called after a worker has | 1419 } |
| 1459 // been created if this isn't called). | 1420 |
| 1460 subtle::NoBarrier_Store(&g_all_pools_state, AllPoolsState::NONE_ACTIVE); | 1421 // static |
| 1422 void SequencedWorkerPool::DisableForProcessForTesting() { |
| 1423 g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; |
| 1424 } |
| 1425 |
| 1426 // static |
| 1427 bool SequencedWorkerPool::IsEnabled() { |
| 1428 return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED; |
| 1461 } | 1429 } |
| 1462 | 1430 |
| 1463 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1431 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
| 1464 const std::string& thread_name_prefix, | 1432 const std::string& thread_name_prefix, |
| 1465 base::TaskPriority task_priority) | 1433 base::TaskPriority task_priority) |
| 1466 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1434 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
| 1467 inner_(new Inner(this, | 1435 inner_(new Inner(this, |
| 1468 max_threads, | 1436 max_threads, |
| 1469 thread_name_prefix, | 1437 thread_name_prefix, |
| 1470 task_priority, | 1438 task_priority, |
| (...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1589 return PostDelayedWorkerTask(from_here, task, delay); | 1557 return PostDelayedWorkerTask(from_here, task, delay); |
| 1590 } | 1558 } |
| 1591 | 1559 |
| 1592 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1560 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 1593 return inner_->RunsTasksOnCurrentThread(); | 1561 return inner_->RunsTasksOnCurrentThread(); |
| 1594 } | 1562 } |
| 1595 | 1563 |
| 1596 void SequencedWorkerPool::FlushForTesting() { | 1564 void SequencedWorkerPool::FlushForTesting() { |
| 1597 DCHECK(!RunsTasksOnCurrentThread()); | 1565 DCHECK(!RunsTasksOnCurrentThread()); |
| 1598 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 1566 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 1599 if (subtle::NoBarrier_Load(&g_all_pools_state) == | 1567 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
| 1600 AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
| 1601 // TODO(gab): Remove this if http://crbug.com/622400 fails. | 1568 // TODO(gab): Remove this if http://crbug.com/622400 fails. |
| 1602 TaskScheduler::GetInstance()->FlushForTesting(); | 1569 TaskScheduler::GetInstance()->FlushForTesting(); |
| 1603 } else { | 1570 } else { |
| 1604 inner_->CleanupForTesting(); | 1571 inner_->CleanupForTesting(); |
| 1605 } | 1572 } |
| 1606 } | 1573 } |
| 1607 | 1574 |
| 1608 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1575 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1609 inner_->SignalHasWorkForTesting(); | 1576 inner_->SignalHasWorkForTesting(); |
| 1610 } | 1577 } |
| 1611 | 1578 |
| 1612 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1579 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1613 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1580 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
| 1614 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1581 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1615 } | 1582 } |
| 1616 | 1583 |
| 1617 bool SequencedWorkerPool::IsShutdownInProgress() { | 1584 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1618 return inner_->IsShutdownInProgress(); | 1585 return inner_->IsShutdownInProgress(); |
| 1619 } | 1586 } |
| 1620 | 1587 |
| 1621 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1588 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1622 SequenceToken sequence_token) const { | 1589 SequenceToken sequence_token) const { |
| 1623 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1590 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1624 } | 1591 } |
| 1625 | 1592 |
| 1626 } // namespace base | 1593 } // namespace base |
| OLD | NEW |