Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(211)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2241703002: Add an experiment to redirect SequencedWorkerPool tasks to TaskScheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b3_delay_metrics_blocking
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <utility> 13 #include <utility>
14 #include <vector> 14 #include <vector>
15 15
16 #include "base/atomic_sequence_num.h" 16 #include "base/atomic_sequence_num.h"
17 #include "base/callback.h" 17 #include "base/callback.h"
18 #include "base/compiler_specific.h" 18 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h" 19 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h" 20 #include "base/lazy_instance.h"
21 #include "base/logging.h" 21 #include "base/logging.h"
22 #include "base/macros.h" 22 #include "base/macros.h"
23 #include "base/memory/ptr_util.h" 23 #include "base/memory/ptr_util.h"
24 #include "base/stl_util.h" 24 #include "base/stl_util.h"
25 #include "base/strings/stringprintf.h" 25 #include "base/strings/stringprintf.h"
26 #include "base/synchronization/condition_variable.h" 26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h" 27 #include "base/synchronization/lock.h"
28 #include "base/task_scheduler/post_task.h"
29 #include "base/task_scheduler/task_scheduler.h"
28 #include "base/threading/platform_thread.h" 30 #include "base/threading/platform_thread.h"
29 #include "base/threading/simple_thread.h" 31 #include "base/threading/simple_thread.h"
30 #include "base/threading/thread_local.h" 32 #include "base/threading/thread_local.h"
31 #include "base/threading/thread_restrictions.h" 33 #include "base/threading/thread_restrictions.h"
32 #include "base/threading/thread_task_runner_handle.h" 34 #include "base/threading/thread_task_runner_handle.h"
33 #include "base/time/time.h" 35 #include "base/time/time.h"
34 #include "base/trace_event/heap_profiler.h" 36 #include "base/trace_event/heap_profiler.h"
35 #include "base/trace_event/trace_event.h" 37 #include "base/trace_event/trace_event.h"
36 #include "base/tracked_objects.h" 38 #include "base/tracked_objects.h"
37 #include "build/build_config.h" 39 #include "build/build_config.h"
38 40
39 #if defined(OS_MACOSX) 41 #if defined(OS_MACOSX)
40 #include "base/mac/scoped_nsautorelease_pool.h" 42 #include "base/mac/scoped_nsautorelease_pool.h"
41 #elif defined(OS_WIN) 43 #elif defined(OS_WIN)
42 #include "base/win/scoped_com_initializer.h" 44 #include "base/win/scoped_com_initializer.h"
43 #endif 45 #endif
44 46
45 #if !defined(OS_NACL) 47 #if !defined(OS_NACL)
46 #include "base/metrics/histogram.h" 48 #include "base/metrics/histogram.h"
47 #endif 49 #endif
48 50
49 namespace base { 51 namespace base {
50 52
51 namespace { 53 namespace {
52 54
55 // An enum representing the state of all pools. Any given process should only
56 // ever transition from NONE_ACTIVE to the active states, transitions between
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created
60 // because the first task was posted and the state is still NONE_ACTIVE.
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:45 Done.
61 enum class AllPoolsState {
62 NONE_ACTIVE,
63 WORKER_CREATED,
64 REDIRECTED_TO_TASK_SCHEDULER,
65 } g_all_pools_state = AllPoolsState::NONE_ACTIVE;
66
53 struct SequencedTask : public TrackingInfo { 67 struct SequencedTask : public TrackingInfo {
54 SequencedTask() 68 SequencedTask()
55 : sequence_token_id(0), 69 : sequence_token_id(0),
56 trace_id(0), 70 trace_id(0),
57 sequence_task_number(0), 71 sequence_task_number(0),
58 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 72 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
59 73
60 explicit SequencedTask(const tracked_objects::Location& from_here) 74 explicit SequencedTask(const tracked_objects::Location& from_here)
61 : base::TrackingInfo(from_here, TimeTicks()), 75 : base::TrackingInfo(from_here, TimeTicks()),
62 sequence_token_id(0), 76 sequence_token_id(0),
(...skipping 432 matching lines...) Expand 10 before | Expand all | Expand 10 after
495 // has been called. 509 // has been called.
496 int max_blocking_tasks_after_shutdown_; 510 int max_blocking_tasks_after_shutdown_;
497 511
498 // State used to cleanup for testing, all guarded by lock_. 512 // State used to cleanup for testing, all guarded by lock_.
499 CleanupState cleanup_state_; 513 CleanupState cleanup_state_;
500 size_t cleanup_idlers_; 514 size_t cleanup_idlers_;
501 ConditionVariable cleanup_cv_; 515 ConditionVariable cleanup_cv_;
502 516
503 TestingObserver* const testing_observer_; 517 TestingObserver* const testing_observer_;
504 518
519 /* Members used for the experimental redirection to TaskScheduler. */
brettw 2016/08/12 17:44:39 Can this be a normal // comment?
gab 2016/08/12 23:27:46 Sure, a /* */ comment felt more like a split from
520
505 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the 521 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
506 // TaskScheduler as an experiment (unused otherwise). 522 // TaskScheduler as an experiment (unused otherwise).
507 const base::TaskPriority task_priority_; 523 const base::TaskPriority task_priority_;
508 524
525 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46 Done.
526 // SequencedWorkerPool usage to the TaskScheduler.
527 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
528
529 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46 Done.
530 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
531 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
532 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
533
509 DISALLOW_COPY_AND_ASSIGN(Inner); 534 DISALLOW_COPY_AND_ASSIGN(Inner);
510 }; 535 };
511 536
512 // Worker definitions --------------------------------------------------------- 537 // Worker definitions ---------------------------------------------------------
513 538
514 SequencedWorkerPool::Worker::Worker( 539 SequencedWorkerPool::Worker::Worker(
515 scoped_refptr<SequencedWorkerPool> worker_pool, 540 scoped_refptr<SequencedWorkerPool> worker_pool,
516 int thread_number, 541 int thread_number,
517 const std::string& prefix) 542 const std::string& prefix)
518 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 543 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
519 worker_pool_(std::move(worker_pool)), 544 worker_pool_(std::move(worker_pool)),
520 task_shutdown_behavior_(BLOCK_SHUTDOWN), 545 task_shutdown_behavior_(BLOCK_SHUTDOWN),
521 is_processing_task_(false) { 546 is_processing_task_(false) {
547 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
522 Start(); 548 Start();
523 } 549 }
524 550
525 SequencedWorkerPool::Worker::~Worker() { 551 SequencedWorkerPool::Worker::~Worker() {
526 } 552 }
527 553
528 void SequencedWorkerPool::Worker::Run() { 554 void SequencedWorkerPool::Worker::Run() {
555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
556
529 #if defined(OS_WIN) 557 #if defined(OS_WIN)
530 win::ScopedCOMInitializer com_initializer; 558 win::ScopedCOMInitializer com_initializer;
531 #endif 559 #endif
532 560
533 // Store a pointer to this worker in thread local storage for static function 561 // Store a pointer to this worker in thread local storage for static function
534 // access. 562 // access.
535 DCHECK(!lazy_tls_ptr_.Get().Get()); 563 DCHECK(!lazy_tls_ptr_.Get().Get());
536 lazy_tls_ptr_.Get().Set(this); 564 lazy_tls_ptr_.Get().Set(this);
537 565
538 // Just jump back to the Inner object to run the thread, since it has all the 566 // Just jump back to the Inner object to run the thread, since it has all the
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
662 "SequencedWorkerPool::Inner::PostTask", 690 "SequencedWorkerPool::Inner::PostTask",
663 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 691 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
664 TRACE_EVENT_FLAG_FLOW_OUT); 692 TRACE_EVENT_FLAG_FLOW_OUT);
665 693
666 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 694 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
667 695
668 // Now that we have the lock, apply the named token rules. 696 // Now that we have the lock, apply the named token rules.
669 if (optional_token_name) 697 if (optional_token_name)
670 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 698 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
671 699
672 pending_tasks_.insert(sequenced); 700 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
673 if (shutdown_behavior == BLOCK_SHUTDOWN) 701 // Confirm that the TaskScheduler's shutdown behaviors use the same
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46 Doesn't |g_all_pools_state| cover this?
674 blocking_shutdown_pending_task_count_++; 702 // underlying values as SequencedWorkerPool.
703 static_assert(
704 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
705 static_cast<int>(CONTINUE_ON_SHUTDOWN),
706 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
707 "CONTINUE_ON_SHUTDOWN.");
708 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
709 static_cast<int>(SKIP_ON_SHUTDOWN),
710 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
711 "SKIP_ON_SHUTDOWN.");
712 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
713 static_cast<int>(BLOCK_SHUTDOWN),
714 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
715 "BLOCK_SHUTDOWN.");
675 716
676 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 717 const TaskShutdownBehavior task_shutdown_behavior =
718 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
719 const TaskTraits pool_traits =
720 TaskTraits()
721 .WithFileIO()
722 .WithPriority(task_priority_)
723 .WithShutdownBehavior(task_shutdown_behavior);
724
725 // Find or create the TaskScheduler TaskRunner to redirect this task to if
726 // it is posted to a specific sequence.
727 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
728 if (sequenced.sequence_token_id) {
729 sequenced_task_runner =
730 &sequenced_task_runner_map_[sequenced.sequence_token_id];
731 if (!*sequenced_task_runner) {
732 const ExecutionMode execution_mode =
733 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
734 : ExecutionMode::SEQUENCED;
735 *sequenced_task_runner =
736 CreateTaskRunnerWithTraits(pool_traits, execution_mode);
737 }
738 }
739
740 if (sequenced_task_runner) {
741 (*sequenced_task_runner)
742 ->PostTask(sequenced.posted_from, sequenced.task);
743 } else {
744 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
745 // however a few pools that use only one thread and therefore can
746 // currently legitimatelly assuming thread affinity despite using
747 // SequencedWorkerPool. Such pools typically only give access to their
748 // TaskRunner which will be SINGLE_THREADED per nature of the pool
749 // having only one thread but this DCHECK ensures no such pools use
750 // SequencedWorkerPool::PostTask() directly.
751 DCHECK_GT(max_threads_, 1U);
752 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
753 sequenced.task);
754 }
755 } else { // !REDIRECTED_TO_TASK_SCHEDULER
756 pending_tasks_.insert(sequenced);
757
758 if (shutdown_behavior == BLOCK_SHUTDOWN)
759 blocking_shutdown_pending_task_count_++;
760
761 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
762 }
677 } 763 }
678 764
679 // Actually start the additional thread or signal an existing one now that 765 // Some variables are exposed in both modes for convenience but only really
680 // we're outside the lock. 766 // intended for one of them at runtime, confirm exclusive usage here.
681 if (create_thread_id) 767 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
682 FinishStartingAdditionalThread(create_thread_id); 768 DCHECK(pending_tasks_.empty());
683 else 769 DCHECK(!create_thread_id);
684 SignalHasWork(); 770 } else {
771 DCHECK(sequenced_task_runner_map_.empty());
772 }
773
774 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
775 // Actually start the additional thread or signal an existing one now that
776 // we're outside the lock.
777 if (create_thread_id)
778 FinishStartingAdditionalThread(create_thread_id);
779 else
780 SignalHasWork();
781 }
685 782
686 return true; 783 return true;
687 } 784 }
688 785
689 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 786 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
690 AutoLock lock(lock_); 787 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
691 return ContainsKey(threads_, PlatformThread::CurrentId()); 788 if (!runs_tasks_on_verifier_) {
789 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
790 TaskTraits().WithFileIO().WithPriority(task_priority_),
791 ExecutionMode::PARALLEL);
792 }
793 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
794 } else {
795 AutoLock lock(lock_);
796 return ContainsKey(threads_, PlatformThread::CurrentId());
797 }
692 } 798 }
693 799
694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 800 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
695 SequenceToken sequence_token) const { 801 SequenceToken sequence_token) const {
696 AutoLock lock(lock_); 802 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 803 // TODO(gab): This currently only verifies that the current thread is a
698 if (found == threads_.end()) 804 // thread on which a task bound to |sequence_token| *could* run, but it
699 return false; 805 // doesn't verify that the current is *currently running* a task bound to
700 return found->second->is_processing_task() && 806 // |sequence_token|.
701 sequence_token.Equals(found->second->task_sequence_token()); 807 const auto sequenced_task_runner_it =
808 sequenced_task_runner_map_.find(sequence_token.id_);
809 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
810 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
811 } else {
812 AutoLock lock(lock_);
813 ThreadMap::const_iterator found =
814 threads_.find(PlatformThread::CurrentId());
815 if (found == threads_.end())
816 return false;
817 return found->second->is_processing_task() &&
818 sequence_token.Equals(found->second->task_sequence_token());
819 }
702 } 820 }
703 821
704 // See https://code.google.com/p/chromium/issues/detail?id=168415 822 // See https://code.google.com/p/chromium/issues/detail?id=168415
705 void SequencedWorkerPool::Inner::CleanupForTesting() { 823 void SequencedWorkerPool::Inner::CleanupForTesting() {
706 DCHECK(!RunsTasksOnCurrentThread()); 824 DCHECK(!RunsTasksOnCurrentThread());
707 base::ThreadRestrictions::ScopedAllowWait allow_wait; 825 base::ThreadRestrictions::ScopedAllowWait allow_wait;
708 AutoLock lock(lock_); 826 AutoLock lock(lock_);
709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 827 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
710 if (shutdown_called_) 828 if (shutdown_called_)
711 return; 829 return;
(...skipping 13 matching lines...) Expand all
725 void SequencedWorkerPool::Inner::Shutdown( 843 void SequencedWorkerPool::Inner::Shutdown(
726 int max_new_blocking_tasks_after_shutdown) { 844 int max_new_blocking_tasks_after_shutdown) {
727 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 845 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
728 { 846 {
729 AutoLock lock(lock_); 847 AutoLock lock(lock_);
730 // Cleanup and Shutdown should not be called concurrently. 848 // Cleanup and Shutdown should not be called concurrently.
731 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 849 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
732 if (shutdown_called_) 850 if (shutdown_called_)
733 return; 851 return;
734 shutdown_called_ = true; 852 shutdown_called_ = true;
853
854 if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46 Doesn't |g_all_pools_state| cover this?
855 return;
856
735 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 857 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
736 858
737 // Tickle the threads. This will wake up a waiting one so it will know that 859 // Tickle the threads. This will wake up a waiting one so it will know that
738 // it can exit, which in turn will wake up any other waiting ones. 860 // it can exit, which in turn will wake up any other waiting ones.
739 SignalHasWork(); 861 SignalHasWork();
740 862
741 // There are no pending or running tasks blocking shutdown, we're done. 863 // There are no pending or running tasks blocking shutdown, we're done.
742 if (CanShutdown()) 864 if (CanShutdown())
743 return; 865 return;
744 } 866 }
(...skipping 19 matching lines...) Expand all
764 TimeTicks::Now() - shutdown_wait_begin); 886 TimeTicks::Now() - shutdown_wait_begin);
765 #endif 887 #endif
766 } 888 }
767 889
768 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 890 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
769 AutoLock lock(lock_); 891 AutoLock lock(lock_);
770 return shutdown_called_; 892 return shutdown_called_;
771 } 893 }
772 894
773 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 895 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
896 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
774 { 897 {
775 AutoLock lock(lock_); 898 AutoLock lock(lock_);
776 DCHECK(thread_being_created_); 899 DCHECK(thread_being_created_);
777 thread_being_created_ = false; 900 thread_being_created_ = false;
778 auto result = threads_.insert( 901 auto result = threads_.insert(
779 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 902 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
780 DCHECK(result.second); 903 DCHECK(result.second);
781 904
782 while (true) { 905 while (true) {
783 #if defined(OS_MACOSX) 906 #if defined(OS_MACOSX)
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
898 1021
899 // We noticed we should exit. Wake up the next worker so it knows it should 1022 // We noticed we should exit. Wake up the next worker so it knows it should
900 // exit as well (because the Shutdown() code only signals once). 1023 // exit as well (because the Shutdown() code only signals once).
901 SignalHasWork(); 1024 SignalHasWork();
902 1025
903 // Possibly unblock shutdown. 1026 // Possibly unblock shutdown.
904 can_shutdown_cv_.Signal(); 1027 can_shutdown_cv_.Signal();
905 } 1028 }
906 1029
907 void SequencedWorkerPool::Inner::HandleCleanup() { 1030 void SequencedWorkerPool::Inner::HandleCleanup() {
1031 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1032
908 lock_.AssertAcquired(); 1033 lock_.AssertAcquired();
909 if (cleanup_state_ == CLEANUP_DONE) 1034 if (cleanup_state_ == CLEANUP_DONE)
910 return; 1035 return;
911 if (cleanup_state_ == CLEANUP_REQUESTED) { 1036 if (cleanup_state_ == CLEANUP_REQUESTED) {
912 // We win, we get to do the cleanup as soon as the others wise up and idle. 1037 // We win, we get to do the cleanup as soon as the others wise up and idle.
913 cleanup_state_ = CLEANUP_STARTING; 1038 cleanup_state_ = CLEANUP_STARTING;
914 while (thread_being_created_ || 1039 while (thread_being_created_ ||
915 cleanup_idlers_ != threads_.size() - 1) { 1040 cleanup_idlers_ != threads_.size() - 1) {
916 has_work_cv_.Signal(); 1041 has_work_cv_.Signal();
917 cleanup_cv_.Wait(); 1042 cleanup_cv_.Wait();
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
963 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1088 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
964 lock_.AssertAcquired(); 1089 lock_.AssertAcquired();
965 // We assume that we never create enough tasks to wrap around. 1090 // We assume that we never create enough tasks to wrap around.
966 return next_sequence_task_number_++; 1091 return next_sequence_task_number_++;
967 } 1092 }
968 1093
969 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1094 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
970 SequencedTask* task, 1095 SequencedTask* task,
971 TimeDelta* wait_time, 1096 TimeDelta* wait_time,
972 std::vector<Closure>* delete_these_outside_lock) { 1097 std::vector<Closure>* delete_these_outside_lock) {
1098 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1099
973 lock_.AssertAcquired(); 1100 lock_.AssertAcquired();
974 1101
975 // Find the next task with a sequence token that's not currently in use. 1102 // Find the next task with a sequence token that's not currently in use.
976 // If the token is in use, that means another thread is running something 1103 // If the token is in use, that means another thread is running something
977 // in that sequence, and we can't run it without going out-of-order. 1104 // in that sequence, and we can't run it without going out-of-order.
978 // 1105 //
979 // This algorithm is simple and fair, but inefficient in some cases. For 1106 // This algorithm is simple and fair, but inefficient in some cases. For
980 // example, say somebody schedules 1000 slow tasks with the same sequence 1107 // example, say somebody schedules 1000 slow tasks with the same sequence
981 // number. We'll have to go through all those tasks each time we feel like 1108 // number. We'll have to go through all those tasks each time we feel like
982 // there might be work to schedule. If this proves to be a problem, we 1109 // there might be work to schedule. If this proves to be a problem, we
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
1049 } 1176 }
1050 1177
1051 status = GET_WORK_FOUND; 1178 status = GET_WORK_FOUND;
1052 break; 1179 break;
1053 } 1180 }
1054 1181
1055 return status; 1182 return status;
1056 } 1183 }
1057 1184
1058 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1185 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1186 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1187
1059 lock_.AssertAcquired(); 1188 lock_.AssertAcquired();
1060 1189
1061 // Mark the task's sequence number as in use. 1190 // Mark the task's sequence number as in use.
1062 if (task.sequence_token_id) 1191 if (task.sequence_token_id)
1063 current_sequences_.insert(task.sequence_token_id); 1192 current_sequences_.insert(task.sequence_token_id);
1064 1193
1065 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1194 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1066 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1195 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1067 // completes. 1196 // completes.
1068 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1197 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
(...skipping 11 matching lines...) Expand all
1080 // if there is one waiting to pick up the next task. 1209 // if there is one waiting to pick up the next task.
1081 // 1210 //
1082 // Note that we really need to do this *before* running the task, not 1211 // Note that we really need to do this *before* running the task, not
1083 // after. Otherwise, if more than one task is posted, the creation of the 1212 // after. Otherwise, if more than one task is posted, the creation of the
1084 // second thread (since we only create one at a time) will be blocked by 1213 // second thread (since we only create one at a time) will be blocked by
1085 // the execution of the first task, which could be arbitrarily long. 1214 // the execution of the first task, which could be arbitrarily long.
1086 return PrepareToStartAdditionalThreadIfHelpful(); 1215 return PrepareToStartAdditionalThreadIfHelpful();
1087 } 1216 }
1088 1217
1089 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1218 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1219 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1220
1090 lock_.AssertAcquired(); 1221 lock_.AssertAcquired();
1091 1222
1092 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1223 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1093 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1224 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1094 blocking_shutdown_thread_count_--; 1225 blocking_shutdown_thread_count_--;
1095 } 1226 }
1096 1227
1097 if (task.sequence_token_id) 1228 if (task.sequence_token_id)
1098 current_sequences_.erase(task.sequence_token_id); 1229 current_sequences_.erase(task.sequence_token_id);
1099 } 1230 }
1100 1231
1101 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1232 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1102 int sequence_token_id) const { 1233 int sequence_token_id) const {
1234 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1235
1103 lock_.AssertAcquired(); 1236 lock_.AssertAcquired();
1104 return !sequence_token_id || 1237 return !sequence_token_id ||
1105 current_sequences_.find(sequence_token_id) == 1238 current_sequences_.find(sequence_token_id) ==
1106 current_sequences_.end(); 1239 current_sequences_.end();
1107 } 1240 }
1108 1241
1109 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1242 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1243 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1244
1110 lock_.AssertAcquired(); 1245 lock_.AssertAcquired();
1111 // How thread creation works: 1246 // How thread creation works:
1112 // 1247 //
1113 // We'de like to avoid creating threads with the lock held. However, we 1248 // We'de like to avoid creating threads with the lock held. However, we
1114 // need to be sure that we have an accurate accounting of the threads for 1249 // need to be sure that we have an accurate accounting of the threads for
1115 // proper Joining and deltion on shutdown. 1250 // proper Joining and deltion on shutdown.
1116 // 1251 //
1117 // We need to figure out if we need another thread with the lock held, which 1252 // We need to figure out if we need another thread with the lock held, which
1118 // is what this function does. It then marks us as in the process of creating 1253 // is what this function does. It then marks us as in the process of creating
1119 // a thread. When we do shutdown, we wait until the thread_being_created_ 1254 // a thread. When we do shutdown, we wait until the thread_being_created_
(...skipping 30 matching lines...) Expand all
1150 thread_being_created_ = true; 1285 thread_being_created_ = true;
1151 return static_cast<int>(threads_.size() + 1); 1286 return static_cast<int>(threads_.size() + 1);
1152 } 1287 }
1153 } 1288 }
1154 } 1289 }
1155 return 0; 1290 return 0;
1156 } 1291 }
1157 1292
1158 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1293 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1159 int thread_number) { 1294 int thread_number) {
1295 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1296
1160 // Called outside of the lock. 1297 // Called outside of the lock.
1161 DCHECK_GT(thread_number, 0); 1298 DCHECK_GT(thread_number, 0);
1162 1299
1300 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
brettw 2016/08/12 19:30:00 This should have a reference to a bug to remove it
gab 2016/08/12 23:27:46 Doesn't |g_all_pools_state| cover this?
1301 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1302 g_all_pools_state = AllPoolsState::WORKER_CREATED;
1303 }
1304
1163 // The worker is assigned to the list when the thread actually starts, which 1305 // The worker is assigned to the list when the thread actually starts, which
1164 // will manage the memory of the pointer. 1306 // will manage the memory of the pointer.
1165 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1307 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1166 } 1308 }
1167 1309
1168 void SequencedWorkerPool::Inner::SignalHasWork() { 1310 void SequencedWorkerPool::Inner::SignalHasWork() {
1311 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1312
1169 has_work_cv_.Signal(); 1313 has_work_cv_.Signal();
1170 if (testing_observer_) { 1314 if (testing_observer_) {
1171 testing_observer_->OnHasWork(); 1315 testing_observer_->OnHasWork();
1172 } 1316 }
1173 } 1317 }
1174 1318
1175 bool SequencedWorkerPool::Inner::CanShutdown() const { 1319 bool SequencedWorkerPool::Inner::CanShutdown() const {
1320 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1176 lock_.AssertAcquired(); 1321 lock_.AssertAcquired();
1177 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1322 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1178 return !thread_being_created_ && 1323 return !thread_being_created_ &&
1179 blocking_shutdown_thread_count_ == 0 && 1324 blocking_shutdown_thread_count_ == 0 &&
1180 blocking_shutdown_pending_task_count_ == 0; 1325 blocking_shutdown_pending_task_count_ == 0;
1181 } 1326 }
1182 1327
1183 base::StaticAtomicSequenceNumber 1328 base::StaticAtomicSequenceNumber
1184 SequencedWorkerPool::Inner::g_last_sequence_number_; 1329 SequencedWorkerPool::Inner::g_last_sequence_number_;
1185 1330
(...skipping 16 matching lines...) Expand all
1202 // static 1347 // static
1203 scoped_refptr<SequencedWorkerPool> 1348 scoped_refptr<SequencedWorkerPool>
1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1349 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1205 Worker* worker = Worker::GetForCurrentThread(); 1350 Worker* worker = Worker::GetForCurrentThread();
1206 if (!worker) 1351 if (!worker)
1207 return nullptr; 1352 return nullptr;
1208 1353
1209 return worker->worker_pool(); 1354 return worker->worker_pool();
1210 } 1355 }
1211 1356
1357 // static
1358 void SequencedWorkerPool::
1359 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
1360 DCHECK(TaskScheduler::GetInstance());
1361 // Hitting this DCHECK indicates that a task was posted to a
1362 // SequencedWorkerPool before the TaskScheduler was initialized and
1363 // redirected, posting task to SequencedWorkerPools needs to at least be
1364 // delayed until after that point.
1365 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1366 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
1367 }
1368
1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1369 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1213 const std::string& thread_name_prefix, 1370 const std::string& thread_name_prefix,
1214 base::TaskPriority task_priority) 1371 base::TaskPriority task_priority)
1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1372 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1216 inner_(new Inner(this, 1373 inner_(new Inner(this,
1217 max_threads, 1374 max_threads,
1218 thread_name_prefix, 1375 thread_name_prefix,
1219 task_priority, 1376 task_priority,
1220 NULL)) {} 1377 NULL)) {}
1221 1378
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
1364 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1521 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1365 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1522 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1366 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1523 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1367 } 1524 }
1368 1525
1369 bool SequencedWorkerPool::IsShutdownInProgress() { 1526 bool SequencedWorkerPool::IsShutdownInProgress() {
1370 return inner_->IsShutdownInProgress(); 1527 return inner_->IsShutdownInProgress();
1371 } 1528 }
1372 1529
1373 } // namespace base 1530 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698