Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2285633003: Test SequencedWorkerPool with redirection to the TaskScheduler. (Closed)
Patch Set: CR robliao #7 Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <unordered_map>
13 #include <utility> 14 #include <utility>
14 #include <vector> 15 #include <vector>
15 16
16 #include "base/atomic_sequence_num.h" 17 #include "base/atomic_sequence_num.h"
17 #include "base/callback.h" 18 #include "base/callback.h"
18 #include "base/compiler_specific.h" 19 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h" 20 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h" 21 #include "base/lazy_instance.h"
21 #include "base/logging.h" 22 #include "base/logging.h"
22 #include "base/macros.h" 23 #include "base/macros.h"
(...skipping 22 matching lines...) Expand all
45 #endif 46 #endif
46 47
47 #if !defined(OS_NACL) 48 #if !defined(OS_NACL)
48 #include "base/metrics/histogram.h" 49 #include "base/metrics/histogram.h"
49 #endif 50 #endif
50 51
51 namespace base { 52 namespace base {
52 53
53 namespace { 54 namespace {
54 55
55 // An enum representing the state of all pools. Any given process should only 56 // An enum representing the state of all pools. Any given non-test process
56 // ever transition from NONE_ACTIVE to the active states, transitions between 57 // should only ever transition from NONE_ACTIVE to the active states,
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition 58 // transitions between actives states are unexpected. The
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called 59 // REDIRECTED_TO_TASK_SCHEDULER transition occurs when
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created 60 // RedirectToTaskSchedulerForProcess() is called and the WORKER_CREATED
60 // because the first task was posted and the state is still NONE_ACTIVE. 61 // transition occurs when a Worker needs to be created because the first task
62 // was posted and the state is still NONE_ACTIVE. In a test process,
63 // ResetRedirectToTaskSchedulerForProcessForTesting() causes a transition to
64 // the NONE_ACTIVE state.
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool 65 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
62 // will be phased out completely otherwise). 66 // will be phased out completely otherwise).
63 enum class AllPoolsState { 67 enum class AllPoolsState {
64 NONE_ACTIVE, 68 NONE_ACTIVE,
65 WORKER_CREATED, 69 WORKER_CREATED,
66 REDIRECTED_TO_TASK_SCHEDULER, 70 REDIRECTED_TO_TASK_SCHEDULER,
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; 71 } g_all_pools_state = AllPoolsState::NONE_ACTIVE;
68 72
69 struct SequencedTask : public TrackingInfo { 73 struct SequencedTask : public TrackingInfo {
70 SequencedTask() 74 SequencedTask()
(...skipping 292 matching lines...) Expand 10 before | Expand all | Expand 10 after
363 }; 367 };
364 368
365 enum CleanupState { 369 enum CleanupState {
366 CLEANUP_REQUESTED, 370 CLEANUP_REQUESTED,
367 CLEANUP_STARTING, 371 CLEANUP_STARTING,
368 CLEANUP_RUNNING, 372 CLEANUP_RUNNING,
369 CLEANUP_FINISHING, 373 CLEANUP_FINISHING,
370 CLEANUP_DONE, 374 CLEANUP_DONE,
371 }; 375 };
372 376
377 struct TaskRunnerAndShutdownBehavior {
378 scoped_refptr<TaskRunner> task_runner;
379 TaskShutdownBehavior shutdown_behavior;
380 };
381
382 bool RunsTasksOnCurrentThreadAssertSynchronized() const;
383
373 // Helper used by PostTask() to complete the work when redirection is on. 384 // Helper used by PostTask() to complete the work when redirection is on.
374 // Coalesce upon resolution of http://crbug.com/622400. 385 // Coalesce upon resolution of http://crbug.com/622400.
375 void PostTaskToTaskScheduler(const SequencedTask& sequenced); 386 bool PostTaskToTaskScheduler(const SequencedTask& sequenced,
387 const TimeDelta& delay);
376 388
377 // Called from within the lock, this converts the given token name into a 389 // Called from within the lock, this converts the given token name into a
378 // token ID, creating a new one if necessary. 390 // token ID, creating a new one if necessary.
379 int LockedGetNamedTokenID(const std::string& name); 391 int LockedGetNamedTokenID(const std::string& name);
380 392
381 // Called from within the lock, this returns the next sequence task number. 393 // Called from within the lock, this returns the next sequence task number.
382 int64_t LockedGetNextSequenceTaskNumber(); 394 int64_t LockedGetNextSequenceTaskNumber();
383 395
384 // Gets new task. There are 3 cases depending on the return value: 396 // Gets new task. There are 3 cases depending on the return value:
385 // 397 //
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
523 TestingObserver* const testing_observer_; 535 TestingObserver* const testing_observer_;
524 536
525 // Members below are used for the experimental redirection to TaskScheduler. 537 // Members below are used for the experimental redirection to TaskScheduler.
526 // TODO(gab): Remove these if http://crbug.com/622400 fails 538 // TODO(gab): Remove these if http://crbug.com/622400 fails
527 // (SequencedWorkerPool will be phased out completely otherwise). 539 // (SequencedWorkerPool will be phased out completely otherwise).
528 540
529 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the 541 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
530 // TaskScheduler as an experiment (unused otherwise). 542 // TaskScheduler as an experiment (unused otherwise).
531 const base::TaskPriority task_priority_; 543 const base::TaskPriority task_priority_;
532 544
545 // The single-threaded TaskRunner used to redirect tasks to the TaskScheduler.
546 // Always null when the pool isn't single-threaded.
547 TaskRunnerAndShutdownBehavior single_threaded_task_runner_;
548
533 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect 549 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
534 // SequencedWorkerPool usage to the TaskScheduler. 550 // sequenced tasks to the TaskScheduler.
535 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; 551 std::unordered_map<int, TaskRunnerAndShutdownBehavior>
552 sequenced_task_runner_map_;
553
554 // TaskScheduler TaskRunners to redirect unsequenced tasks to the
555 // TaskScheduler.
556 std::unordered_map<TaskShutdownBehavior, scoped_refptr<TaskRunner>>
557 unsequenced_task_runners_;
536 558
537 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as 559 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
538 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). 560 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
539 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). 561 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
540 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; 562 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
541 563
542 DISALLOW_COPY_AND_ASSIGN(Inner); 564 DISALLOW_COPY_AND_ASSIGN(Inner);
543 }; 565 };
544 566
545 // Worker definitions --------------------------------------------------------- 567 // Worker definitions ---------------------------------------------------------
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
660 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 682 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
661 SequencedTask sequenced(from_here); 683 SequencedTask sequenced(from_here);
662 sequenced.sequence_token_id = sequence_token.id_; 684 sequenced.sequence_token_id = sequence_token.id_;
663 sequenced.shutdown_behavior = shutdown_behavior; 685 sequenced.shutdown_behavior = shutdown_behavior;
664 sequenced.posted_from = from_here; 686 sequenced.posted_from = from_here;
665 sequenced.task = 687 sequenced.task =
666 shutdown_behavior == BLOCK_SHUTDOWN ? 688 shutdown_behavior == BLOCK_SHUTDOWN ?
667 base::MakeCriticalClosure(task) : task; 689 base::MakeCriticalClosure(task) : task;
668 sequenced.time_to_run = TimeTicks::Now() + delay; 690 sequenced.time_to_run = TimeTicks::Now() + delay;
669 691
692 bool task_successfully_posted = true;
693
670 int create_thread_id = 0; 694 int create_thread_id = 0;
671 { 695 {
672 AutoLock lock(lock_); 696 AutoLock lock(lock_);
673 697
674 if (shutdown_called_) { 698 if (shutdown_called_ &&
699 g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
gab 2016/09/07 14:59:09 Good catch, trunk uses atomics for this now though
675 // Don't allow a new task to be posted if it doesn't block shutdown. 700 // Don't allow a new task to be posted if it doesn't block shutdown.
676 if (shutdown_behavior != BLOCK_SHUTDOWN) 701 if (shutdown_behavior != BLOCK_SHUTDOWN)
677 return false; 702 return false;
678 703
679 // If the current thread is running a task, and that task doesn't block 704 // If the current thread is running a task, and that task doesn't block
680 // shutdown, then it shouldn't be allowed to post any more tasks. 705 // shutdown, then it shouldn't be allowed to post any more tasks.
681 ThreadMap::const_iterator found = 706 ThreadMap::const_iterator found =
682 threads_.find(PlatformThread::CurrentId()); 707 threads_.find(PlatformThread::CurrentId());
683 if (found != threads_.end() && found->second->is_processing_task() && 708 if (found != threads_.end() && found->second->is_processing_task() &&
684 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { 709 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
(...skipping 15 matching lines...) Expand all
700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 725 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
701 TRACE_EVENT_FLAG_FLOW_OUT); 726 TRACE_EVENT_FLAG_FLOW_OUT);
702 727
703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 728 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
704 729
705 // Now that we have the lock, apply the named token rules. 730 // Now that we have the lock, apply the named token rules.
706 if (optional_token_name) 731 if (optional_token_name)
707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 732 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
708 733
709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 734 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
710 PostTaskToTaskScheduler(sequenced); 735 task_successfully_posted = PostTaskToTaskScheduler(sequenced, delay);
711 } else { 736 } else {
712 pending_tasks_.insert(sequenced); 737 pending_tasks_.insert(sequenced);
713 738
714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) 739 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
715 blocking_shutdown_pending_task_count_++; 740 blocking_shutdown_pending_task_count_++;
716 741
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 742 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
718 } 743 }
719 } 744 }
720 745
(...skipping 13 matching lines...) Expand all
734 // intended for one of them at runtime, confirm exclusive usage here. 759 // intended for one of them at runtime, confirm exclusive usage here.
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 760 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
736 DCHECK(pending_tasks_.empty()); 761 DCHECK(pending_tasks_.empty());
737 DCHECK_EQ(0, create_thread_id); 762 DCHECK_EQ(0, create_thread_id);
738 } else { 763 } else {
739 DCHECK(sequenced_task_runner_map_.empty()); 764 DCHECK(sequenced_task_runner_map_.empty());
740 } 765 }
741 } 766 }
742 #endif // DCHECK_IS_ON() 767 #endif // DCHECK_IS_ON()
743 768
744 return true; 769 return task_successfully_posted;
745 } 770 }
746 771
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 772 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThreadAssertSynchronized()
748 const SequencedTask& sequenced) { 773 const {
774 lock_.AssertAcquired();
775 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
gab 2016/09/07 14:59:09 atomic
776 // TODO(fdoray): Add a special case for single-threaded pools.
777 if (!runs_tasks_on_verifier_) {
778 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
779 TaskTraits().WithFileIO().WithPriority(task_priority_),
780 ExecutionMode::PARALLEL);
781 }
782 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
783 }
784 return ContainsKey(threads_, PlatformThread::CurrentId());
785 }
786
787 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
788 const SequencedTask& sequenced,
789 const TimeDelta& delay) {
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 790 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
750 791
751 lock_.AssertAcquired(); 792 lock_.AssertAcquired();
752 793
753 // Confirm that the TaskScheduler's shutdown behaviors use the same 794 // Confirm that the TaskScheduler's shutdown behaviors use the same
754 // underlying values as SequencedWorkerPool. 795 // underlying values as SequencedWorkerPool.
755 static_assert( 796 static_assert(
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 797 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
757 static_cast<int>(CONTINUE_ON_SHUTDOWN), 798 static_cast<int>(CONTINUE_ON_SHUTDOWN),
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 799 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
759 "CONTINUE_ON_SHUTDOWN."); 800 "CONTINUE_ON_SHUTDOWN.");
760 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 801 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
761 static_cast<int>(SKIP_ON_SHUTDOWN), 802 static_cast<int>(SKIP_ON_SHUTDOWN),
762 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 803 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
763 "SKIP_ON_SHUTDOWN."); 804 "SKIP_ON_SHUTDOWN.");
764 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 805 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
765 static_cast<int>(BLOCK_SHUTDOWN), 806 static_cast<int>(BLOCK_SHUTDOWN),
766 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 807 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
767 "BLOCK_SHUTDOWN."); 808 "BLOCK_SHUTDOWN.");
768 809
769 const TaskShutdownBehavior task_shutdown_behavior = 810 const TaskShutdownBehavior task_shutdown_behavior =
770 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); 811 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
771 const TaskTraits pool_traits = 812 const TaskTraits pool_traits =
772 TaskTraits() 813 TaskTraits()
773 .WithFileIO() 814 .WithFileIO()
774 .WithPriority(task_priority_) 815 .WithPriority(task_priority_)
775 .WithShutdownBehavior(task_shutdown_behavior); 816 .WithShutdownBehavior(task_shutdown_behavior);
776 817
777 // Find or create the TaskScheduler TaskRunner to redirect this task to if 818 // Find or create the TaskScheduler TaskRunner to redirect this task to.
778 // it is posted to a specific sequence. 819 scoped_refptr<TaskRunner> task_runner;
779 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; 820 if (max_threads_ == 1) {
gab 2016/09/07 14:59:08 I'm thinking we might just want to disallow pools
780 if (sequenced.sequence_token_id) { 821 if (!single_threaded_task_runner_.task_runner) {
781 sequenced_task_runner = 822 single_threaded_task_runner_.task_runner = CreateTaskRunnerWithTraits(
782 &sequenced_task_runner_map_[sequenced.sequence_token_id]; 823 pool_traits, ExecutionMode::SINGLE_THREADED);
783 if (!*sequenced_task_runner) { 824 single_threaded_task_runner_.shutdown_behavior = task_shutdown_behavior;
784 const ExecutionMode execution_mode =
785 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
786 : ExecutionMode::SEQUENCED;
787 *sequenced_task_runner =
788 CreateTaskRunnerWithTraits(pool_traits, execution_mode);
789 } 825 }
826
827 // Single-threaded pools can legitimately assume thread affinity. Disallow
828 // posting tasks with different shutdown behaviors in such pools since the
829 // TaskScheduler can't force them to run on the same thread.
830 DCHECK_EQ(task_shutdown_behavior,
831 single_threaded_task_runner_.shutdown_behavior);
gab 2016/09/07 14:59:08 Different sequences on single-threaded pool could
832
833 task_runner = single_threaded_task_runner_.task_runner;
834 } else if (sequenced.sequence_token_id) {
835 TaskRunnerAndShutdownBehavior& task_runner_and_shutdown_behavior =
836 sequenced_task_runner_map_[sequenced.sequence_token_id];
837 if (!task_runner_and_shutdown_behavior.task_runner) {
838 task_runner_and_shutdown_behavior.task_runner =
839 CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::SEQUENCED);
840 task_runner_and_shutdown_behavior.shutdown_behavior =
841 task_shutdown_behavior;
842 }
843
844 // Posting tasks to the same sequence with different shutdown behaviors
845 // isn't supported by the TaskScheduler.
846 DCHECK_EQ(task_shutdown_behavior,
847 task_runner_and_shutdown_behavior.shutdown_behavior);
848
849 task_runner = task_runner_and_shutdown_behavior.task_runner;
850 } else {
851 scoped_refptr<TaskRunner>& task_runner_for_shutdown_behavior_ref =
852 unsequenced_task_runners_[task_shutdown_behavior];
853 if (!task_runner_for_shutdown_behavior_ref) {
854 task_runner_for_shutdown_behavior_ref =
855 CreateTaskRunnerWithTraits(pool_traits, ExecutionMode::PARALLEL);
856 }
857
858 task_runner = task_runner_for_shutdown_behavior_ref;
790 } 859 }
791 860
792 if (sequenced_task_runner) { 861 return task_runner->PostDelayedTask(sequenced.posted_from, sequenced.task,
793 (*sequenced_task_runner) 862 delay);
794 ->PostTask(sequenced.posted_from, sequenced.task);
795 } else {
796 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
797 // however a few pools that use only one thread and therefore can currently
798 // legitimatelly assume thread affinity despite using SequencedWorkerPool.
799 // Such pools typically only give access to their TaskRunner which will be
800 // SINGLE_THREADED per nature of the pool having only one thread but this
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
802 // directly.
803 DCHECK_GT(max_threads_, 1U);
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
805 sequenced.task);
806 }
807 } 863 }
808 864
809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 865 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
810 AutoLock lock(lock_); 866 AutoLock lock(lock_);
811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 867 return RunsTasksOnCurrentThreadAssertSynchronized();
812 if (!runs_tasks_on_verifier_) {
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
814 TaskTraits().WithFileIO().WithPriority(task_priority_),
815 ExecutionMode::PARALLEL);
816 }
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
818 } else {
819 return ContainsKey(threads_, PlatformThread::CurrentId());
820 }
821 } 868 }
822 869
823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 870 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
824 SequenceToken sequence_token) const { 871 SequenceToken sequence_token) const {
825 AutoLock lock(lock_); 872 AutoLock lock(lock_);
826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 873 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
874 if (!sequence_token.IsValid())
875 return RunsTasksOnCurrentThreadAssertSynchronized();
danakj 2016/09/01 23:16:34 The non-task runner code would return false if the
876
827 // TODO(gab): This currently only verifies that the current thread is a 877 // TODO(gab): This currently only verifies that the current thread is a
danakj 2016/09/01 23:16:34 This TODO applies to the above also right
828 // thread on which a task bound to |sequence_token| *could* run, but it 878 // thread on which a task bound to |sequence_token| *could* run, but it
829 // doesn't verify that the current is *currently running* a task bound to 879 // doesn't verify that the current is *currently running* a task bound to
830 // |sequence_token|. 880 // |sequence_token|.
831 const auto sequenced_task_runner_it = 881 const auto sequenced_task_runner_it =
832 sequenced_task_runner_map_.find(sequence_token.id_); 882 sequenced_task_runner_map_.find(sequence_token.id_);
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && 883 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); 884 sequenced_task_runner_it->second.task_runner
885 ->RunsTasksOnCurrentThread();
835 } else { 886 } else {
836 ThreadMap::const_iterator found = 887 ThreadMap::const_iterator found =
837 threads_.find(PlatformThread::CurrentId()); 888 threads_.find(PlatformThread::CurrentId());
838 if (found == threads_.end()) 889 if (found == threads_.end())
839 return false; 890 return false;
840 return found->second->is_processing_task() && 891 return found->second->is_processing_task() &&
841 sequence_token.Equals(found->second->task_sequence_token()); 892 sequence_token.Equals(found->second->task_sequence_token());
842 } 893 }
843 } 894 }
844 895
(...skipping 526 matching lines...) Expand 10 before | Expand all | Expand 10 after
1371 scoped_refptr<SequencedWorkerPool> 1422 scoped_refptr<SequencedWorkerPool>
1372 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1423 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1373 Worker* worker = Worker::GetForCurrentThread(); 1424 Worker* worker = Worker::GetForCurrentThread();
1374 if (!worker) 1425 if (!worker)
1375 return nullptr; 1426 return nullptr;
1376 1427
1377 return worker->worker_pool(); 1428 return worker->worker_pool();
1378 } 1429 }
1379 1430
1380 // static 1431 // static
1381 void SequencedWorkerPool:: 1432 void SequencedWorkerPool::RedirectToTaskSchedulerForProcess() {
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
1383 DCHECK(TaskScheduler::GetInstance()); 1433 DCHECK(TaskScheduler::GetInstance());
1384 // Hitting this DCHECK indicates that a task was posted to a 1434 // Hitting this DCHECK indicates that a task was posted to a
1385 // SequencedWorkerPool before the TaskScheduler was initialized and 1435 // SequencedWorkerPool before the TaskScheduler was initialized and
1386 // redirected, posting task to SequencedWorkerPools needs to at least be 1436 // redirected, posting task to SequencedWorkerPools needs to at least be
1387 // delayed until after that point. 1437 // delayed until after that point.
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); 1438 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; 1439 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
1390 } 1440 }
1391 1441
1442 // static
1443 void SequencedWorkerPool::ResetRedirectToTaskSchedulerForProcessForTesting() {
1444 g_all_pools_state = AllPoolsState::NONE_ACTIVE;
danakj 2016/09/01 23:16:34 DCHECK that it's REDIRECTED_TO_TASK_SCHEDULER curr
gab 2016/09/07 14:59:08 And use atomics now.
1445 }
1446
1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1447 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1393 const std::string& thread_name_prefix, 1448 const std::string& thread_name_prefix,
1394 base::TaskPriority task_priority) 1449 base::TaskPriority task_priority)
1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1450 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1396 inner_(new Inner(this, 1451 inner_(new Inner(this,
1397 max_threads, 1452 max_threads,
1398 thread_name_prefix, 1453 thread_name_prefix,
1399 task_priority, 1454 task_priority,
1400 NULL)) {} 1455 NULL)) {}
1401 1456
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
1521 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1576 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1522 return inner_->RunsTasksOnCurrentThread(); 1577 return inner_->RunsTasksOnCurrentThread();
1523 } 1578 }
1524 1579
1525 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1580 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1526 SequenceToken sequence_token) const { 1581 SequenceToken sequence_token) const {
1527 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1582 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1528 } 1583 }
1529 1584
1530 void SequencedWorkerPool::FlushForTesting() { 1585 void SequencedWorkerPool::FlushForTesting() {
1586 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
gab 2016/09/07 14:59:08 Atomics
1531 inner_->CleanupForTesting(); 1587 inner_->CleanupForTesting();
1532 } 1588 }
1533 1589
1534 void SequencedWorkerPool::SignalHasWorkForTesting() { 1590 void SequencedWorkerPool::SignalHasWorkForTesting() {
1535 inner_->SignalHasWorkForTesting(); 1591 inner_->SignalHasWorkForTesting();
1536 } 1592 }
1537 1593
1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1594 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1595 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1596 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1541 } 1597 }
1542 1598
1543 bool SequencedWorkerPool::IsShutdownInProgress() { 1599 bool SequencedWorkerPool::IsShutdownInProgress() {
1544 return inner_->IsShutdownInProgress(); 1600 return inner_->IsShutdownInProgress();
1545 } 1601 }
1546 1602
1547 } // namespace base 1603 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698