OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
11 #include <memory> | 11 #include <memory> |
12 #include <set> | 12 #include <set> |
13 #include <unordered_map> | |
13 #include <utility> | 14 #include <utility> |
14 #include <vector> | 15 #include <vector> |
15 | 16 |
16 #include "base/atomic_sequence_num.h" | 17 #include "base/atomic_sequence_num.h" |
17 #include "base/callback.h" | 18 #include "base/callback.h" |
18 #include "base/compiler_specific.h" | 19 #include "base/compiler_specific.h" |
19 #include "base/critical_closure.h" | 20 #include "base/critical_closure.h" |
20 #include "base/lazy_instance.h" | 21 #include "base/lazy_instance.h" |
21 #include "base/logging.h" | 22 #include "base/logging.h" |
22 #include "base/macros.h" | 23 #include "base/macros.h" |
(...skipping 22 matching lines...) Expand all Loading... | |
45 #endif | 46 #endif |
46 | 47 |
47 #if !defined(OS_NACL) | 48 #if !defined(OS_NACL) |
48 #include "base/metrics/histogram.h" | 49 #include "base/metrics/histogram.h" |
49 #endif | 50 #endif |
50 | 51 |
51 namespace base { | 52 namespace base { |
52 | 53 |
53 namespace { | 54 namespace { |
54 | 55 |
55 // An enum representing the state of all pools. Any given process should only | 56 // An enum representing the state of all pools. Any given non-test process |
56 // ever transition from NONE_ACTIVE to the active states, transitions between | 57 // should only ever transition from NONE_ACTIVE to the active states, |
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition | 58 // transitions between actives states are unexpected. The |
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called | 59 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when |
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created | 60 // RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED |
60 // because the first task was posted and the state is still NONE_ACTIVE. | 61 // transition occurs when a Worker needs to be created because the first task |
62 // was posted and the state is still NONE_ACTIVE. In a test process, | |
63 // ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to | |
64 // the NONE_ACTIVE state. | |
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | 65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool |
62 // will be phased out completely otherwise). | 66 // will be phased out completely otherwise). |
63 enum class AllPoolsState { | 67 enum class AllPoolsState { |
64 NONE_ACTIVE, | 68 NONE_ACTIVE, |
65 WORKER_CREATED, | 69 WORKER_CREATED, |
66 REDIRECTED_TO_TASK_SCHEDULER, | 70 REDIRECTED_TO_TASK_SCHEDULER, |
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; | 71 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; |
68 | 72 |
69 struct SequencedTask : public TrackingInfo { | 73 struct SequencedTask : public TrackingInfo { |
70 SequencedTask() | 74 SequencedTask() |
(...skipping 292 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
363 }; | 367 }; |
364 | 368 |
365 enum CleanupState { | 369 enum CleanupState { |
366 CLEANUP_REQUESTED, | 370 CLEANUP_REQUESTED, |
367 CLEANUP_STARTING, | 371 CLEANUP_STARTING, |
368 CLEANUP_RUNNING, | 372 CLEANUP_RUNNING, |
369 CLEANUP_FINISHING, | 373 CLEANUP_FINISHING, |
370 CLEANUP_DONE, | 374 CLEANUP_DONE, |
371 }; | 375 }; |
372 | 376 |
377 struct TaskRunnerAndShutdownBehavior { | |
378 scoped_refptr<TaskRunner> task_runner; | |
379 TaskShutdownBehavior shutdown_behavior; | |
380 }; | |
381 | |
382 bool RunsTasksOnCurrentThreadAssertSynchronized() const; | |
383 | |
373 // Helper used by PostTask() to complete the work when redirection is on. | 384 // Helper used by PostTask() to complete the work when redirection is on. |
374 // Coalesce upon resolution of http://crbug.com/622400. | 385 // Coalesce upon resolution of http://crbug.com/622400. |
375 void PostTaskToTaskScheduler(const SequencedTask& sequenced); | 386 bool PostTaskToTaskScheduler(const SequencedTask& sequenced, |
387 const TimeDelta& delay); | |
376 | 388 |
377 // Called from within the lock, this converts the given token name into a | 389 // Called from within the lock, this converts the given token name into a |
378 // token ID, creating a new one if necessary. | 390 // token ID, creating a new one if necessary. |
379 int LockedGetNamedTokenID(const std::string& name); | 391 int LockedGetNamedTokenID(const std::string& name); |
380 | 392 |
381 // Called from within the lock, this returns the next sequence task number. | 393 // Called from within the lock, this returns the next sequence task number. |
382 int64_t LockedGetNextSequenceTaskNumber(); | 394 int64_t LockedGetNextSequenceTaskNumber(); |
383 | 395 |
384 // Gets new task. There are 3 cases depending on the return value: | 396 // Gets new task. There are 3 cases depending on the return value: |
385 // | 397 // |
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
523 TestingObserver* const testing_observer_; | 535 TestingObserver* const testing_observer_; |
524 | 536 |
525 // Members below are used for the experimental redirection to TaskScheduler. | 537 // Members below are used for the experimental redirection to TaskScheduler. |
526 // TODO(gab): Remove these if http://crbug.com/622400 fails | 538 // TODO(gab): Remove these if http://crbug.com/622400 fails |
527 // (SequencedWorkerPool will be phased out completely otherwise). | 539 // (SequencedWorkerPool will be phased out completely otherwise). |
528 | 540 |
529 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the | 541 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
530 // TaskScheduler as an experiment (unused otherwise). | 542 // TaskScheduler as an experiment (unused otherwise). |
531 const base::TaskPriority task_priority_; | 543 const base::TaskPriority task_priority_; |
532 | 544 |
545 // The single-threaded TaskRunner used to redirect tasks to the TaskScheduler. | |
546 // Always null when the pool isn't single-threaded. | |
547 TaskRunnerAndShutdownBehavior single_threaded_task_runner_; | |
548 | |
533 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect | 549 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect |
534 // SequencedWorkerPool usage to the TaskScheduler. | 550 // sequenced tasks to the TaskScheduler. |
535 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; | 551 std::unordered_map<int, TaskRunnerAndShutdownBehavior> |
552 sequenced_task_runner_map_; | |
553 | |
554 // TaskScheduler TaskRunners to redirect unsequenced tasks to the | |
555 // TaskScheduler. | |
556 std::unordered_map<TaskShutdownBehavior, scoped_refptr<TaskRunner>> | |
557 unsequenced_task_runners_; | |
536 | 558 |
537 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as | 559 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as |
538 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). | 560 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). |
539 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). | 561 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). |
540 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; | 562 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; |
541 | 563 |
542 DISALLOW_COPY_AND_ASSIGN(Inner); | 564 DISALLOW_COPY_AND_ASSIGN(Inner); |
543 }; | 565 }; |
544 | 566 |
545 // Worker definitions --------------------------------------------------------- | 567 // Worker definitions --------------------------------------------------------- |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
660 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); | 682 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
661 SequencedTask sequenced(from_here); | 683 SequencedTask sequenced(from_here); |
662 sequenced.sequence_token_id = sequence_token.id_; | 684 sequenced.sequence_token_id = sequence_token.id_; |
663 sequenced.shutdown_behavior = shutdown_behavior; | 685 sequenced.shutdown_behavior = shutdown_behavior; |
664 sequenced.posted_from = from_here; | 686 sequenced.posted_from = from_here; |
665 sequenced.task = | 687 sequenced.task = |
666 shutdown_behavior == BLOCK_SHUTDOWN ? | 688 shutdown_behavior == BLOCK_SHUTDOWN ? |
667 base::MakeCriticalClosure(task) : task; | 689 base::MakeCriticalClosure(task) : task; |
668 sequenced.time_to_run = TimeTicks::Now() + delay; | 690 sequenced.time_to_run = TimeTicks::Now() + delay; |
669 | 691 |
692 bool task_successfully_posted = true; | |
693 | |
670 int create_thread_id = 0; | 694 int create_thread_id = 0; |
671 { | 695 { |
672 AutoLock lock(lock_); | 696 AutoLock lock(lock_); |
673 | 697 |
674 if (shutdown_called_) { | 698 if (shutdown_called_ && |
699 g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
gab
2016/09/07 14:59:09
Good catch, trunk uses atomics for this now though
| |
675 // Don't allow a new task to be posted if it doesn't block shutdown. | 700 // Don't allow a new task to be posted if it doesn't block shutdown. |
676 if (shutdown_behavior != BLOCK_SHUTDOWN) | 701 if (shutdown_behavior != BLOCK_SHUTDOWN) |
677 return false; | 702 return false; |
678 | 703 |
679 // If the current thread is running a task, and that task doesn't block | 704 // If the current thread is running a task, and that task doesn't block |
680 // shutdown, then it shouldn't be allowed to post any more tasks. | 705 // shutdown, then it shouldn't be allowed to post any more tasks. |
681 ThreadMap::const_iterator found = | 706 ThreadMap::const_iterator found = |
682 threads_.find(PlatformThread::CurrentId()); | 707 threads_.find(PlatformThread::CurrentId()); |
683 if (found != threads_.end() && found->second->is_processing_task() && | 708 if (found != threads_.end() && found->second->is_processing_task() && |
684 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { | 709 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { |
(...skipping 15 matching lines...) Expand all Loading... | |
700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 725 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
701 TRACE_EVENT_FLAG_FLOW_OUT); | 726 TRACE_EVENT_FLAG_FLOW_OUT); |
702 | 727 |
703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 728 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
704 | 729 |
705 // Now that we have the lock, apply the named token rules. | 730 // Now that we have the lock, apply the named token rules. |
706 if (optional_token_name) | 731 if (optional_token_name) |
707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 732 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
708 | 733 |
709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 734 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
710 PostTaskToTaskScheduler(sequenced); | 735 task_successfully_posted = PostTaskToTaskScheduler(sequenced, delay); |
711 } else { | 736 } else { |
712 pending_tasks_.insert(sequenced); | 737 pending_tasks_.insert(sequenced); |
713 | 738 |
714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) | 739 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
715 blocking_shutdown_pending_task_count_++; | 740 blocking_shutdown_pending_task_count_++; |
716 | 741 |
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 742 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
718 } | 743 } |
719 } | 744 } |
720 | 745 |
(...skipping 13 matching lines...) Expand all Loading... | |
734 // intended for one of them at runtime, confirm exclusive usage here. | 759 // intended for one of them at runtime, confirm exclusive usage here. |
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 760 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
736 DCHECK(pending_tasks_.empty()); | 761 DCHECK(pending_tasks_.empty()); |
737 DCHECK_EQ(0, create_thread_id); | 762 DCHECK_EQ(0, create_thread_id); |
738 } else { | 763 } else { |
739 DCHECK(sequenced_task_runner_map_.empty()); | 764 DCHECK(sequenced_task_runner_map_.empty()); |
740 } | 765 } |
741 } | 766 } |
742 #endif // DCHECK_IS_ON() | 767 #endif // DCHECK_IS_ON() |
743 | 768 |
744 return true; | 769 return task_successfully_posted; |
745 } | 770 } |
746 | 771 |
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | 772 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized() |
748 const SequencedTask& sequenced) { | 773 const { |
774 lock_.AssertAcquired(); | |
775 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
gab
2016/09/07 14:59:09
atomic
| |
776 // TODO(fdoray): Add a special case for single-threaded pools. | |
777 if (!runs_tasks_on_verifier_) { | |
778 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | |
779 TaskTraits().WithFileIO().WithPriority(task_priority_), | |
780 ExecutionMode::PARALLEL); | |
781 } | |
782 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | |
783 } | |
784 return ContainsKey(threads_, PlatformThread::CurrentId()); | |
785 } | |
786 | |
787 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | |
788 const SequencedTask& sequenced, | |
789 const TimeDelta& delay) { | |
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | 790 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); |
750 | 791 |
751 lock_.AssertAcquired(); | 792 lock_.AssertAcquired(); |
752 | 793 |
753 // Confirm that the TaskScheduler's shutdown behaviors use the same | 794 // Confirm that the TaskScheduler's shutdown behaviors use the same |
754 // underlying values as SequencedWorkerPool. | 795 // underlying values as SequencedWorkerPool. |
755 static_assert( | 796 static_assert( |
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | 797 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == |
757 static_cast<int>(CONTINUE_ON_SHUTDOWN), | 798 static_cast<int>(CONTINUE_ON_SHUTDOWN), |
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 799 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
759 "CONTINUE_ON_SHUTDOWN."); | 800 "CONTINUE_ON_SHUTDOWN."); |
760 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == | 801 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == |
761 static_cast<int>(SKIP_ON_SHUTDOWN), | 802 static_cast<int>(SKIP_ON_SHUTDOWN), |
762 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 803 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
763 "SKIP_ON_SHUTDOWN."); | 804 "SKIP_ON_SHUTDOWN."); |
764 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == | 805 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == |
765 static_cast<int>(BLOCK_SHUTDOWN), | 806 static_cast<int>(BLOCK_SHUTDOWN), |
766 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | 807 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " |
767 "BLOCK_SHUTDOWN."); | 808 "BLOCK_SHUTDOWN."); |
768 | 809 |
769 const TaskShutdownBehavior task_shutdown_behavior = | 810 const TaskShutdownBehavior task_shutdown_behavior = |
770 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); | 811 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); |
771 const TaskTraits pool_traits = | 812 const TaskTraits pool_traits = |
772 TaskTraits() | 813 TaskTraits() |
773 .WithFileIO() | 814 .WithFileIO() |
774 .WithPriority(task_priority_) | 815 .WithPriority(task_priority_) |
775 .WithShutdownBehavior(task_shutdown_behavior); | 816 .WithShutdownBehavior(task_shutdown_behavior); |
776 | 817 |
777 // Find or create the TaskScheduler TaskRunner to redirect this task to if | 818 // Find or create the TaskScheduler TaskRunner to redirect this task to. |
778 // it is posted to a specific sequence. | 819 scoped_refptr<TaskRunner> task_runner; |
779 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; | 820 if (max_threads_ == 1) { |
gab
2016/09/07 14:59:08
I'm thinking we might just want to disallow pools
| |
780 if (sequenced.sequence_token_id) { | 821 if (!single_threaded_task_runner_.task_runner) { |
781 sequenced_task_runner = | 822 single_threaded_task_runner_.task_runner = CreateTaskRunnerWithTraits( |
782 &sequenced_task_runner_map_[sequenced.sequence_token_id]; | 823 pool_traits, ExecutionMode::SINGLE_THREADED); |
783 if (!*sequenced_task_runner) { | 824 single_threaded_task_runner_.shutdown_behavior = task_shutdown_behavior; |
784 const ExecutionMode execution_mode = | |
785 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED | |
786 : ExecutionMode::SEQUENCED; | |
787 *sequenced_task_runner = | |
788 CreateTaskRunnerWithTraits(pool_traits, execution_mode); | |
789 } | 825 } |
826 | |
827 // Single-threaded pools can legitimately assume thread affinity. Disallow | |
828 // posting tasks with different shutdown behaviors in such pools since the | |
829 // TaskScheduler can't force them to run on the same thread. | |
830 DCHECK_EQ(task_shutdown_behavior, | |
831 single_threaded_task_runner_.shutdown_behavior); | |
gab
2016/09/07 14:59:08
Different sequences on single-threaded pool could
| |
832 | |
833 task_runner = single_threaded_task_runner_.task_runner; | |
834 } else if (sequenced.sequence_token_id) { | |
835 TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior = | |
836 sequenced_task_runner_map_[sequenced.sequence_token_id]; | |
837 if (!task_runner_and_shutdown_behavior.task_runner) { | |
838 task_runner_and_shutdown_behavior.task_runner = | |
839 CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::SEQUENCED); | |
840 task_runner_and_shutdown_behavior.shutdown_behavior = | |
841 task_shutdown_behavior; | |
842 } | |
843 | |
844 // Posting tasks to the same sequence with different shutdown behaviors | |
845 // isn't supported by the TaskScheduler. | |
846 DCHECK_EQ(task_shutdown_behavior, | |
847 task_runner_and_shutdown_behavior.shutdown_behavior); | |
848 | |
849 task_runner = task_runner_and_shutdown_behavior.task_runner; | |
850 } else { | |
851 scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref = | |
852 unsequenced_task_runners_[task_shutdown_behavior]; | |
853 if (!task_runner_for_shutdown_behavior_ref) { | |
854 task_runner_for_shutdown_behavior_ref = | |
855 CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::PARALLEL); | |
856 } | |
857 | |
858 task_runner = task_runner_for_shutdown_behavior_ref; | |
790 } | 859 } |
791 | 860 |
792 if (sequenced_task_runner) { | 861 return task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task, |
793 (*sequenced_task_runner) | 862 delay); |
794 ->PostTask(sequenced.posted_from, sequenced.task); | |
795 } else { | |
796 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are | |
797 // however a few pools that use only one thread and therefore can currently | |
798 // legitimatelly assume thread affinity despite using SequencedWorkerPool. | |
799 // Such pools typically only give access to their TaskRunner which will be | |
800 // SINGLE_THREADED per nature of the pool having only one thread but this | |
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() | |
802 // directly. | |
803 DCHECK_GT(max_threads_, 1U); | |
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, | |
805 sequenced.task); | |
806 } | |
807 } | 863 } |
808 | 864 |
809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 865 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
810 AutoLock lock(lock_); | 866 AutoLock lock(lock_); |
811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 867 return RunsTasksOnCurrentThreadAssertSynchronized(); |
812 if (!runs_tasks_on_verifier_) { | |
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | |
814 TaskTraits().WithFileIO().WithPriority(task_priority_), | |
815 ExecutionMode::PARALLEL); | |
816 } | |
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | |
818 } else { | |
819 return ContainsKey(threads_, PlatformThread::CurrentId()); | |
820 } | |
821 } | 868 } |
822 | 869 |
823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 870 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
824 SequenceToken sequence_token) const { | 871 SequenceToken sequence_token) const { |
825 AutoLock lock(lock_); | 872 AutoLock lock(lock_); |
826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | 873 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
874 if (!sequence_token.IsValid()) | |
875 return RunsTasksOnCurrentThreadAssertSynchronized(); | |
danakj
2016/09/01 23:16:34
The non-task runner code would return false if the
| |
876 | |
827 // TODO(gab): This currently only verifies that the current thread is a | 877 // TODO(gab): This currently only verifies that the current thread is a |
danakj
2016/09/01 23:16:34
This TODO applies to the above also right
| |
828 // thread on which a task bound to |sequence_token| *could* run, but it | 878 // thread on which a task bound to |sequence_token| *could* run, but it |
829 // doesn't verify that the current is *currently running* a task bound to | 879 // doesn't verify that the current is *currently running* a task bound to |
830 // |sequence_token|. | 880 // |sequence_token|. |
831 const auto sequenced_task_runner_it = | 881 const auto sequenced_task_runner_it = |
832 sequenced_task_runner_map_.find(sequence_token.id_); | 882 sequenced_task_runner_map_.find(sequence_token.id_); |
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | 883 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && |
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | 884 sequenced_task_runner_it->second.task_runner |
885 ->RunsTasksOnCurrentThread(); | |
835 } else { | 886 } else { |
836 ThreadMap::const_iterator found = | 887 ThreadMap::const_iterator found = |
837 threads_.find(PlatformThread::CurrentId()); | 888 threads_.find(PlatformThread::CurrentId()); |
838 if (found == threads_.end()) | 889 if (found == threads_.end()) |
839 return false; | 890 return false; |
840 return found->second->is_processing_task() && | 891 return found->second->is_processing_task() && |
841 sequence_token.Equals(found->second->task_sequence_token()); | 892 sequence_token.Equals(found->second->task_sequence_token()); |
842 } | 893 } |
843 } | 894 } |
844 | 895 |
(...skipping 526 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1371 scoped_refptr<SequencedWorkerPool> | 1422 scoped_refptr<SequencedWorkerPool> |
1372 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1423 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
1373 Worker* worker = Worker::GetForCurrentThread(); | 1424 Worker* worker = Worker::GetForCurrentThread(); |
1374 if (!worker) | 1425 if (!worker) |
1375 return nullptr; | 1426 return nullptr; |
1376 | 1427 |
1377 return worker->worker_pool(); | 1428 return worker->worker_pool(); |
1378 } | 1429 } |
1379 | 1430 |
1380 // static | 1431 // static |
1381 void SequencedWorkerPool:: | 1432 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() { |
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { | |
1383 DCHECK(TaskScheduler::GetInstance()); | 1433 DCHECK(TaskScheduler::GetInstance()); |
1384 // Hitting this DCHECK indicates that a task was posted to a | 1434 // Hitting this DCHECK indicates that a task was posted to a |
1385 // SequencedWorkerPool before the TaskScheduler was initialized and | 1435 // SequencedWorkerPool before the TaskScheduler was initialized and |
1386 // redirected, posting task to SequencedWorkerPools needs to at least be | 1436 // redirected, posting task to SequencedWorkerPools needs to at least be |
1387 // delayed until after that point. | 1437 // delayed until after that point. |
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | 1438 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); |
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; | 1439 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; |
1390 } | 1440 } |
1391 | 1441 |
1442 // static | |
1443 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() { | |
1444 g_all_pools_state = AllPoolsState::NONE_ACTIVE; | |
danakj
2016/09/01 23:16:34
DCHECK that it's REDIRECTED_TO_TASK_SCHEDULER curr
gab
2016/09/07 14:59:08
And use atomics now.
| |
1445 } | |
1446 | |
1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1447 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1393 const std::string& thread_name_prefix, | 1448 const std::string& thread_name_prefix, |
1394 base::TaskPriority task_priority) | 1449 base::TaskPriority task_priority) |
1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1450 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1396 inner_(new Inner(this, | 1451 inner_(new Inner(this, |
1397 max_threads, | 1452 max_threads, |
1398 thread_name_prefix, | 1453 thread_name_prefix, |
1399 task_priority, | 1454 task_priority, |
1400 NULL)) {} | 1455 NULL)) {} |
1401 | 1456 |
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1521 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1576 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
1522 return inner_->RunsTasksOnCurrentThread(); | 1577 return inner_->RunsTasksOnCurrentThread(); |
1523 } | 1578 } |
1524 | 1579 |
1525 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1580 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1526 SequenceToken sequence_token) const { | 1581 SequenceToken sequence_token) const { |
1527 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1582 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1528 } | 1583 } |
1529 | 1584 |
1530 void SequencedWorkerPool::FlushForTesting() { | 1585 void SequencedWorkerPool::FlushForTesting() { |
1586 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
gab
2016/09/07 14:59:08
Atomics
| |
1531 inner_->CleanupForTesting(); | 1587 inner_->CleanupForTesting(); |
1532 } | 1588 } |
1533 | 1589 |
1534 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1590 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1535 inner_->SignalHasWorkForTesting(); | 1591 inner_->SignalHasWorkForTesting(); |
1536 } | 1592 } |
1537 | 1593 |
1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1594 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1595 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1596 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1541 } | 1597 } |
1542 | 1598 |
1543 bool SequencedWorkerPool::IsShutdownInProgress() { | 1599 bool SequencedWorkerPool::IsShutdownInProgress() { |
1544 return inner_->IsShutdownInProgress(); | 1600 return inner_->IsShutdownInProgress(); |
1545 } | 1601 } |
1546 | 1602 |
1547 } // namespace base | 1603 } // namespace base |
OLD | NEW |