Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2241703002: Add an experiment to redirect SequencedWorkerPool tasks to TaskScheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b3_delay_metrics_blocking
Patch Set: Split PostTask into helpers and add bug comments. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <utility> 13 #include <utility>
14 #include <vector> 14 #include <vector>
15 15
16 #include "base/atomic_sequence_num.h" 16 #include "base/atomic_sequence_num.h"
17 #include "base/callback.h" 17 #include "base/callback.h"
18 #include "base/compiler_specific.h" 18 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h" 19 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h" 20 #include "base/lazy_instance.h"
21 #include "base/logging.h" 21 #include "base/logging.h"
22 #include "base/macros.h" 22 #include "base/macros.h"
23 #include "base/memory/ptr_util.h" 23 #include "base/memory/ptr_util.h"
24 #include "base/stl_util.h" 24 #include "base/stl_util.h"
25 #include "base/strings/stringprintf.h" 25 #include "base/strings/stringprintf.h"
26 #include "base/synchronization/condition_variable.h" 26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h" 27 #include "base/synchronization/lock.h"
28 #include "base/task_scheduler/post_task.h"
29 #include "base/task_scheduler/task_scheduler.h"
28 #include "base/threading/platform_thread.h" 30 #include "base/threading/platform_thread.h"
29 #include "base/threading/simple_thread.h" 31 #include "base/threading/simple_thread.h"
30 #include "base/threading/thread_local.h" 32 #include "base/threading/thread_local.h"
31 #include "base/threading/thread_restrictions.h" 33 #include "base/threading/thread_restrictions.h"
32 #include "base/threading/thread_task_runner_handle.h" 34 #include "base/threading/thread_task_runner_handle.h"
33 #include "base/time/time.h" 35 #include "base/time/time.h"
34 #include "base/trace_event/heap_profiler.h" 36 #include "base/trace_event/heap_profiler.h"
35 #include "base/trace_event/trace_event.h" 37 #include "base/trace_event/trace_event.h"
36 #include "base/tracked_objects.h" 38 #include "base/tracked_objects.h"
37 #include "build/build_config.h" 39 #include "build/build_config.h"
38 40
39 #if defined(OS_MACOSX) 41 #if defined(OS_MACOSX)
40 #include "base/mac/scoped_nsautorelease_pool.h" 42 #include "base/mac/scoped_nsautorelease_pool.h"
41 #elif defined(OS_WIN) 43 #elif defined(OS_WIN)
42 #include "base/win/scoped_com_initializer.h" 44 #include "base/win/scoped_com_initializer.h"
43 #endif 45 #endif
44 46
45 #if !defined(OS_NACL) 47 #if !defined(OS_NACL)
46 #include "base/metrics/histogram.h" 48 #include "base/metrics/histogram.h"
47 #endif 49 #endif
48 50
49 namespace base { 51 namespace base {
50 52
51 namespace { 53 namespace {
52 54
55 // An enum representing the state of all pools. Any given process should only
56 // ever transition from NONE_ACTIVE to the active states, transitions between
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created
60 // because the first task was posted and the state is still NONE_ACTIVE.
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
62 // will be phased out completely otherwise).
63 enum class AllPoolsState {
64 NONE_ACTIVE,
65 WORKER_CREATED,
66 REDIRECTED_TO_TASK_SCHEDULER,
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE;
68
53 struct SequencedTask : public TrackingInfo { 69 struct SequencedTask : public TrackingInfo {
54 SequencedTask() 70 SequencedTask()
55 : sequence_token_id(0), 71 : sequence_token_id(0),
56 trace_id(0), 72 trace_id(0),
57 sequence_task_number(0), 73 sequence_task_number(0),
58 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
59 75
60 explicit SequencedTask(const tracked_objects::Location& from_here) 76 explicit SequencedTask(const tracked_objects::Location& from_here)
61 : base::TrackingInfo(from_here, TimeTicks()), 77 : base::TrackingInfo(from_here, TimeTicks()),
62 sequence_token_id(0), 78 sequence_token_id(0),
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 }; 363 };
348 364
349 enum CleanupState { 365 enum CleanupState {
350 CLEANUP_REQUESTED, 366 CLEANUP_REQUESTED,
351 CLEANUP_STARTING, 367 CLEANUP_STARTING,
352 CLEANUP_RUNNING, 368 CLEANUP_RUNNING,
353 CLEANUP_FINISHING, 369 CLEANUP_FINISHING,
354 CLEANUP_DONE, 370 CLEANUP_DONE,
355 }; 371 };
356 372
373 // Helpers used by PostTask() to complete the work depending on whether
374 // redirection is on. Coalesce upon resolution of http://crbug.com/622400.
375 void PostTaskToTaskScheduler(const SequencedTask& sequenced);
376 void PostTaskToPool(const SequencedTask& sequenced);
377
357 // Called from within the lock, this converts the given token name into a 378 // Called from within the lock, this converts the given token name into a
358 // token ID, creating a new one if necessary. 379 // token ID, creating a new one if necessary.
359 int LockedGetNamedTokenID(const std::string& name); 380 int LockedGetNamedTokenID(const std::string& name);
360 381
361 // Called from within the lock, this returns the next sequence task number. 382 // Called from within the lock, this returns the next sequence task number.
362 int64_t LockedGetNextSequenceTaskNumber(); 383 int64_t LockedGetNextSequenceTaskNumber();
363 384
364 // Gets new task. There are 3 cases depending on the return value: 385 // Gets new task. There are 3 cases depending on the return value:
365 // 386 //
366 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 387 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
495 // has been called. 516 // has been called.
496 int max_blocking_tasks_after_shutdown_; 517 int max_blocking_tasks_after_shutdown_;
497 518
498 // State used to cleanup for testing, all guarded by lock_. 519 // State used to cleanup for testing, all guarded by lock_.
499 CleanupState cleanup_state_; 520 CleanupState cleanup_state_;
500 size_t cleanup_idlers_; 521 size_t cleanup_idlers_;
501 ConditionVariable cleanup_cv_; 522 ConditionVariable cleanup_cv_;
502 523
503 TestingObserver* const testing_observer_; 524 TestingObserver* const testing_observer_;
504 525
526 // Members below are used for the experimental redirection to TaskScheduler.
527 // TODO(gab): Remove these if http://crbug.com/622400 fails
528 // (SequencedWorkerPool will be phased out completely otherwise).
529
505 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the 530 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
506 // TaskScheduler as an experiment (unused otherwise). 531 // TaskScheduler as an experiment (unused otherwise).
507 const base::TaskPriority task_priority_; 532 const base::TaskPriority task_priority_;
508 533
534 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
535 // SequencedWorkerPool usage to the TaskScheduler.
536 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
537
538 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
539 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
540 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
541 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
542
509 DISALLOW_COPY_AND_ASSIGN(Inner); 543 DISALLOW_COPY_AND_ASSIGN(Inner);
510 }; 544 };
511 545
512 // Worker definitions --------------------------------------------------------- 546 // Worker definitions ---------------------------------------------------------
513 547
514 SequencedWorkerPool::Worker::Worker( 548 SequencedWorkerPool::Worker::Worker(
515 scoped_refptr<SequencedWorkerPool> worker_pool, 549 scoped_refptr<SequencedWorkerPool> worker_pool,
516 int thread_number, 550 int thread_number,
517 const std::string& prefix) 551 const std::string& prefix)
518 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 552 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
519 worker_pool_(std::move(worker_pool)), 553 worker_pool_(std::move(worker_pool)),
520 task_shutdown_behavior_(BLOCK_SHUTDOWN), 554 task_shutdown_behavior_(BLOCK_SHUTDOWN),
521 is_processing_task_(false) { 555 is_processing_task_(false) {
556 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
522 Start(); 557 Start();
523 } 558 }
524 559
525 SequencedWorkerPool::Worker::~Worker() { 560 SequencedWorkerPool::Worker::~Worker() {
526 } 561 }
527 562
528 void SequencedWorkerPool::Worker::Run() { 563 void SequencedWorkerPool::Worker::Run() {
564 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
danakj 2016/08/15 21:02:52 Run is happening before RedirectSequencedWorkerPoo
gab 2016/08/15 21:32:44 No, this documents that Run() should never happen
565
529 #if defined(OS_WIN) 566 #if defined(OS_WIN)
530 win::ScopedCOMInitializer com_initializer; 567 win::ScopedCOMInitializer com_initializer;
531 #endif 568 #endif
532 569
533 // Store a pointer to this worker in thread local storage for static function 570 // Store a pointer to this worker in thread local storage for static function
534 // access. 571 // access.
535 DCHECK(!lazy_tls_ptr_.Get().Get()); 572 DCHECK(!lazy_tls_ptr_.Get().Get());
536 lazy_tls_ptr_.Get().Set(this); 573 lazy_tls_ptr_.Get().Set(this);
537 574
538 // Just jump back to the Inner object to run the thread, since it has all the 575 // Just jump back to the Inner object to run the thread, since it has all the
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
624 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 661 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
625 SequencedTask sequenced(from_here); 662 SequencedTask sequenced(from_here);
626 sequenced.sequence_token_id = sequence_token.id_; 663 sequenced.sequence_token_id = sequence_token.id_;
627 sequenced.shutdown_behavior = shutdown_behavior; 664 sequenced.shutdown_behavior = shutdown_behavior;
628 sequenced.posted_from = from_here; 665 sequenced.posted_from = from_here;
629 sequenced.task = 666 sequenced.task =
630 shutdown_behavior == BLOCK_SHUTDOWN ? 667 shutdown_behavior == BLOCK_SHUTDOWN ?
631 base::MakeCriticalClosure(task) : task; 668 base::MakeCriticalClosure(task) : task;
632 sequenced.time_to_run = TimeTicks::Now() + delay; 669 sequenced.time_to_run = TimeTicks::Now() + delay;
633 670
634 int create_thread_id = 0; 671 AutoLock lock(lock_);
635 {
636 AutoLock lock(lock_);
637 if (shutdown_called_) {
638 // Don't allow a new task to be posted if it doesn't block shutdown.
639 if (shutdown_behavior != BLOCK_SHUTDOWN)
640 return false;
641 672
642 // If the current thread is running a task, and that task doesn't block 673 if (shutdown_called_) {
643 // shutdown, then it shouldn't be allowed to post any more tasks. 674 // Don't allow a new task to be posted if it doesn't block shutdown.
644 ThreadMap::const_iterator found = 675 if (shutdown_behavior != BLOCK_SHUTDOWN)
645 threads_.find(PlatformThread::CurrentId()); 676 return false;
646 if (found != threads_.end() && found->second->is_processing_task() &&
647 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
648 return false;
649 }
650 677
651 if (max_blocking_tasks_after_shutdown_ <= 0) { 678 // If the current thread is running a task, and that task doesn't block
652 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 679 // shutdown, then it shouldn't be allowed to post any more tasks.
653 return false; 680 ThreadMap::const_iterator found =
654 } 681 threads_.find(PlatformThread::CurrentId());
655 max_blocking_tasks_after_shutdown_ -= 1; 682 if (found != threads_.end() && found->second->is_processing_task() &&
683 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
684 return false;
656 } 685 }
657 686
658 // The trace_id is used for identifying the task in about:tracing. 687 if (max_blocking_tasks_after_shutdown_ <= 0) {
659 sequenced.trace_id = trace_id_++; 688 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
660 689 return false;
661 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 690 }
662 "SequencedWorkerPool::Inner::PostTask", 691 max_blocking_tasks_after_shutdown_ -= 1;
663 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
664 TRACE_EVENT_FLAG_FLOW_OUT);
665
666 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
667
668 // Now that we have the lock, apply the named token rules.
669 if (optional_token_name)
670 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
671
672 pending_tasks_.insert(sequenced);
673 if (shutdown_behavior == BLOCK_SHUTDOWN)
674 blocking_shutdown_pending_task_count_++;
675
676 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
677 } 692 }
678 693
679 // Actually start the additional thread or signal an existing one now that 694 // The trace_id is used for identifying the task in about:tracing.
680 // we're outside the lock. 695 sequenced.trace_id = trace_id_++;
681 if (create_thread_id) 696
682 FinishStartingAdditionalThread(create_thread_id); 697 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
698 "SequencedWorkerPool::Inner::PostTask",
699 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
700 TRACE_EVENT_FLAG_FLOW_OUT);
701
702 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
703
704 // Now that we have the lock, apply the named token rules.
705 if (optional_token_name)
706 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
707
708 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
709 PostTaskToTaskScheduler(sequenced);
710 } else {
711 PostTaskToPool(sequenced);
712 }
713
714 // Some variables are exposed in both modes for convenience but only really
715 // intended for one of them at runtime, confirm exclusive usage here.
716 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER)
717 DCHECK(pending_tasks_.empty());
683 else 718 else
684 SignalHasWork(); 719 DCHECK(sequenced_task_runner_map_.empty());
685 720
686 return true; 721 return true;
687 } 722 }
688 723
724 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
725 const SequencedTask& sequenced) {
726 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
727
728 lock_.AssertAcquired();
729
730 // Confirm that the TaskScheduler's shutdown behaviors use the same
731 // underlying values as SequencedWorkerPool.
732 static_assert(
733 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
734 static_cast<int>(CONTINUE_ON_SHUTDOWN),
735 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
736 "CONTINUE_ON_SHUTDOWN.");
737 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
738 static_cast<int>(SKIP_ON_SHUTDOWN),
739 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
740 "SKIP_ON_SHUTDOWN.");
741 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
742 static_cast<int>(BLOCK_SHUTDOWN),
743 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
744 "BLOCK_SHUTDOWN.");
745
746 const TaskShutdownBehavior task_shutdown_behavior =
747 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
748 const TaskTraits pool_traits =
749 TaskTraits()
750 .WithFileIO()
751 .WithPriority(task_priority_)
752 .WithShutdownBehavior(task_shutdown_behavior);
753
754 // Find or create the TaskScheduler TaskRunner to redirect this task to if
755 // it is posted to a specific sequence.
756 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
757 if (sequenced.sequence_token_id) {
758 sequenced_task_runner =
759 &sequenced_task_runner_map_[sequenced.sequence_token_id];
760 if (!*sequenced_task_runner) {
761 const ExecutionMode execution_mode =
762 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
763 : ExecutionMode::SEQUENCED;
764 *sequenced_task_runner =
765 CreateTaskRunnerWithTraits(pool_traits, execution_mode);
766 }
767 }
768
769 if (sequenced_task_runner) {
770 (*sequenced_task_runner)
771 ->PostTask(sequenced.posted_from, sequenced.task);
772 } else {
773 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
774 // however a few pools that use only one thread and therefore can
danakj 2016/08/15 21:02:52 nit: "few pools" extra space
gab 2016/08/15 21:32:44 Done.
775 // currently legitimatelly assuming thread affinity despite using
danakj 2016/08/15 21:02:52 "legitimately assume"
gab 2016/08/15 21:32:44 Done.
776 // SequencedWorkerPool. Such pools typically only give access to their
777 // TaskRunner which will be SINGLE_THREADED per nature of the pool
778 // having only one thread but this DCHECK ensures no such pools use
779 // SequencedWorkerPool::PostTask() directly.
780 DCHECK_GT(max_threads_, 1U);
781 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
782 sequenced.task);
783 }
784 }
785
786 void SequencedWorkerPool::Inner::PostTaskToPool(
787 const SequencedTask& sequenced) {
788 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
789
790 lock_.AssertAcquired();
791
792 pending_tasks_.insert(sequenced);
793
794 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
795 blocking_shutdown_pending_task_count_++;
796
797 int create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
798
799 {
800 AutoUnlock unlock(lock_);
brettw 2016/08/15 21:02:21 Sorry to add another pass here. I'm really relucta
danakj 2016/08/15 21:02:52 hm, this will relock the lock when you leave the b
gab 2016/08/15 21:32:44 Re-inlined PostTaskToPool() -- as without this sec
gab 2016/08/15 21:32:44 N/A after re-inlining PostTaskToPool per Brett's c
801
802 // Actually start the additional thread or signal an existing one outside
803 // the lock.
804 if (create_thread_id)
805 FinishStartingAdditionalThread(create_thread_id);
806 else
807 SignalHasWork();
808 }
809 }
810
689 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 811 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
690 AutoLock lock(lock_); 812 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
691 return ContainsKey(threads_, PlatformThread::CurrentId()); 813 if (!runs_tasks_on_verifier_) {
814 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
815 TaskTraits().WithFileIO().WithPriority(task_priority_),
816 ExecutionMode::PARALLEL);
817 }
818 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
819 } else {
820 AutoLock lock(lock_);
821 return ContainsKey(threads_, PlatformThread::CurrentId());
822 }
692 } 823 }
693 824
694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 825 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
695 SequenceToken sequence_token) const { 826 SequenceToken sequence_token) const {
696 AutoLock lock(lock_); 827 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 828 // TODO(gab): This currently only verifies that the current thread is a
698 if (found == threads_.end()) 829 // thread on which a task bound to |sequence_token| *could* run, but it
699 return false; 830 // doesn't verify that the current is *currently running* a task bound to
700 return found->second->is_processing_task() && 831 // |sequence_token|.
701 sequence_token.Equals(found->second->task_sequence_token()); 832 const auto sequenced_task_runner_it =
833 sequenced_task_runner_map_.find(sequence_token.id_);
834 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
835 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
836 } else {
837 AutoLock lock(lock_);
838 ThreadMap::const_iterator found =
839 threads_.find(PlatformThread::CurrentId());
840 if (found == threads_.end())
841 return false;
842 return found->second->is_processing_task() &&
843 sequence_token.Equals(found->second->task_sequence_token());
844 }
702 } 845 }
703 846
704 // See https://code.google.com/p/chromium/issues/detail?id=168415 847 // See https://code.google.com/p/chromium/issues/detail?id=168415
705 void SequencedWorkerPool::Inner::CleanupForTesting() { 848 void SequencedWorkerPool::Inner::CleanupForTesting() {
706 DCHECK(!RunsTasksOnCurrentThread()); 849 DCHECK(!RunsTasksOnCurrentThread());
707 base::ThreadRestrictions::ScopedAllowWait allow_wait; 850 base::ThreadRestrictions::ScopedAllowWait allow_wait;
708 AutoLock lock(lock_); 851 AutoLock lock(lock_);
709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 852 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
710 if (shutdown_called_) 853 if (shutdown_called_)
711 return; 854 return;
(...skipping 13 matching lines...) Expand all
725 void SequencedWorkerPool::Inner::Shutdown( 868 void SequencedWorkerPool::Inner::Shutdown(
726 int max_new_blocking_tasks_after_shutdown) { 869 int max_new_blocking_tasks_after_shutdown) {
727 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 870 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
728 { 871 {
729 AutoLock lock(lock_); 872 AutoLock lock(lock_);
730 // Cleanup and Shutdown should not be called concurrently. 873 // Cleanup and Shutdown should not be called concurrently.
731 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 874 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
732 if (shutdown_called_) 875 if (shutdown_called_)
733 return; 876 return;
734 shutdown_called_ = true; 877 shutdown_called_ = true;
878
879 if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
880 return;
881
735 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 882 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
736 883
737 // Tickle the threads. This will wake up a waiting one so it will know that 884 // Tickle the threads. This will wake up a waiting one so it will know that
738 // it can exit, which in turn will wake up any other waiting ones. 885 // it can exit, which in turn will wake up any other waiting ones.
739 SignalHasWork(); 886 SignalHasWork();
740 887
741 // There are no pending or running tasks blocking shutdown, we're done. 888 // There are no pending or running tasks blocking shutdown, we're done.
742 if (CanShutdown()) 889 if (CanShutdown())
743 return; 890 return;
744 } 891 }
(...skipping 19 matching lines...) Expand all
764 TimeTicks::Now() - shutdown_wait_begin); 911 TimeTicks::Now() - shutdown_wait_begin);
765 #endif 912 #endif
766 } 913 }
767 914
768 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 915 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
769 AutoLock lock(lock_); 916 AutoLock lock(lock_);
770 return shutdown_called_; 917 return shutdown_called_;
771 } 918 }
772 919
773 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 920 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
921 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
danakj 2016/08/15 21:02:52 Why are some of these != REDIRECTED instead of ==
gab 2016/08/15 21:32:44 We never explicitly set POOL (a.k.a. WORKER_CREATE
danakj 2016/08/15 22:05:49 Hm ok. I would personally prefer == checks wheneve
gab 2016/08/20 15:21:31 Unless the first Worker is created while only the
774 { 922 {
775 AutoLock lock(lock_); 923 AutoLock lock(lock_);
776 DCHECK(thread_being_created_); 924 DCHECK(thread_being_created_);
777 thread_being_created_ = false; 925 thread_being_created_ = false;
778 auto result = threads_.insert( 926 auto result = threads_.insert(
779 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 927 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
780 DCHECK(result.second); 928 DCHECK(result.second);
781 929
782 while (true) { 930 while (true) {
783 #if defined(OS_MACOSX) 931 #if defined(OS_MACOSX)
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
898 1046
899 // We noticed we should exit. Wake up the next worker so it knows it should 1047 // We noticed we should exit. Wake up the next worker so it knows it should
900 // exit as well (because the Shutdown() code only signals once). 1048 // exit as well (because the Shutdown() code only signals once).
901 SignalHasWork(); 1049 SignalHasWork();
902 1050
903 // Possibly unblock shutdown. 1051 // Possibly unblock shutdown.
904 can_shutdown_cv_.Signal(); 1052 can_shutdown_cv_.Signal();
905 } 1053 }
906 1054
907 void SequencedWorkerPool::Inner::HandleCleanup() { 1055 void SequencedWorkerPool::Inner::HandleCleanup() {
1056 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1057
908 lock_.AssertAcquired(); 1058 lock_.AssertAcquired();
909 if (cleanup_state_ == CLEANUP_DONE) 1059 if (cleanup_state_ == CLEANUP_DONE)
910 return; 1060 return;
911 if (cleanup_state_ == CLEANUP_REQUESTED) { 1061 if (cleanup_state_ == CLEANUP_REQUESTED) {
912 // We win, we get to do the cleanup as soon as the others wise up and idle. 1062 // We win, we get to do the cleanup as soon as the others wise up and idle.
913 cleanup_state_ = CLEANUP_STARTING; 1063 cleanup_state_ = CLEANUP_STARTING;
914 while (thread_being_created_ || 1064 while (thread_being_created_ ||
915 cleanup_idlers_ != threads_.size() - 1) { 1065 cleanup_idlers_ != threads_.size() - 1) {
916 has_work_cv_.Signal(); 1066 has_work_cv_.Signal();
917 cleanup_cv_.Wait(); 1067 cleanup_cv_.Wait();
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
963 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1113 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
964 lock_.AssertAcquired(); 1114 lock_.AssertAcquired();
965 // We assume that we never create enough tasks to wrap around. 1115 // We assume that we never create enough tasks to wrap around.
966 return next_sequence_task_number_++; 1116 return next_sequence_task_number_++;
967 } 1117 }
968 1118
969 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1119 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
970 SequencedTask* task, 1120 SequencedTask* task,
971 TimeDelta* wait_time, 1121 TimeDelta* wait_time,
972 std::vector<Closure>* delete_these_outside_lock) { 1122 std::vector<Closure>* delete_these_outside_lock) {
1123 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1124
973 lock_.AssertAcquired(); 1125 lock_.AssertAcquired();
974 1126
975 // Find the next task with a sequence token that's not currently in use. 1127 // Find the next task with a sequence token that's not currently in use.
976 // If the token is in use, that means another thread is running something 1128 // If the token is in use, that means another thread is running something
977 // in that sequence, and we can't run it without going out-of-order. 1129 // in that sequence, and we can't run it without going out-of-order.
978 // 1130 //
979 // This algorithm is simple and fair, but inefficient in some cases. For 1131 // This algorithm is simple and fair, but inefficient in some cases. For
980 // example, say somebody schedules 1000 slow tasks with the same sequence 1132 // example, say somebody schedules 1000 slow tasks with the same sequence
981 // number. We'll have to go through all those tasks each time we feel like 1133 // number. We'll have to go through all those tasks each time we feel like
982 // there might be work to schedule. If this proves to be a problem, we 1134 // there might be work to schedule. If this proves to be a problem, we
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
1049 } 1201 }
1050 1202
1051 status = GET_WORK_FOUND; 1203 status = GET_WORK_FOUND;
1052 break; 1204 break;
1053 } 1205 }
1054 1206
1055 return status; 1207 return status;
1056 } 1208 }
1057 1209
1058 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1210 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1211 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1212
1059 lock_.AssertAcquired(); 1213 lock_.AssertAcquired();
1060 1214
1061 // Mark the task's sequence number as in use. 1215 // Mark the task's sequence number as in use.
1062 if (task.sequence_token_id) 1216 if (task.sequence_token_id)
1063 current_sequences_.insert(task.sequence_token_id); 1217 current_sequences_.insert(task.sequence_token_id);
1064 1218
1065 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1219 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1066 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1220 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1067 // completes. 1221 // completes.
1068 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1222 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
(...skipping 11 matching lines...) Expand all
1080 // if there is one waiting to pick up the next task. 1234 // if there is one waiting to pick up the next task.
1081 // 1235 //
1082 // Note that we really need to do this *before* running the task, not 1236 // Note that we really need to do this *before* running the task, not
1083 // after. Otherwise, if more than one task is posted, the creation of the 1237 // after. Otherwise, if more than one task is posted, the creation of the
1084 // second thread (since we only create one at a time) will be blocked by 1238 // second thread (since we only create one at a time) will be blocked by
1085 // the execution of the first task, which could be arbitrarily long. 1239 // the execution of the first task, which could be arbitrarily long.
1086 return PrepareToStartAdditionalThreadIfHelpful(); 1240 return PrepareToStartAdditionalThreadIfHelpful();
1087 } 1241 }
1088 1242
1089 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1243 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1244 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1245
1090 lock_.AssertAcquired(); 1246 lock_.AssertAcquired();
1091 1247
1092 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1248 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1093 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1249 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1094 blocking_shutdown_thread_count_--; 1250 blocking_shutdown_thread_count_--;
1095 } 1251 }
1096 1252
1097 if (task.sequence_token_id) 1253 if (task.sequence_token_id)
1098 current_sequences_.erase(task.sequence_token_id); 1254 current_sequences_.erase(task.sequence_token_id);
1099 } 1255 }
1100 1256
1101 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1257 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1102 int sequence_token_id) const { 1258 int sequence_token_id) const {
1259 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1260
1103 lock_.AssertAcquired(); 1261 lock_.AssertAcquired();
1104 return !sequence_token_id || 1262 return !sequence_token_id ||
1105 current_sequences_.find(sequence_token_id) == 1263 current_sequences_.find(sequence_token_id) ==
1106 current_sequences_.end(); 1264 current_sequences_.end();
1107 } 1265 }
1108 1266
1109 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1267 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1268 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1269
1110 lock_.AssertAcquired(); 1270 lock_.AssertAcquired();
1111 // How thread creation works: 1271 // How thread creation works:
1112 // 1272 //
1113 // We'de like to avoid creating threads with the lock held. However, we 1273 // We'de like to avoid creating threads with the lock held. However, we
1114 // need to be sure that we have an accurate accounting of the threads for 1274 // need to be sure that we have an accurate accounting of the threads for
1115 // proper Joining and deltion on shutdown. 1275 // proper Joining and deltion on shutdown.
1116 // 1276 //
1117 // We need to figure out if we need another thread with the lock held, which 1277 // We need to figure out if we need another thread with the lock held, which
1118 // is what this function does. It then marks us as in the process of creating 1278 // is what this function does. It then marks us as in the process of creating
1119 // a thread. When we do shutdown, we wait until the thread_being_created_ 1279 // a thread. When we do shutdown, we wait until the thread_being_created_
(...skipping 30 matching lines...) Expand all
1150 thread_being_created_ = true; 1310 thread_being_created_ = true;
1151 return static_cast<int>(threads_.size() + 1); 1311 return static_cast<int>(threads_.size() + 1);
1152 } 1312 }
1153 } 1313 }
1154 } 1314 }
1155 return 0; 1315 return 0;
1156 } 1316 }
1157 1317
1158 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1318 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1159 int thread_number) { 1319 int thread_number) {
1320 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1321
1160 // Called outside of the lock. 1322 // Called outside of the lock.
1161 DCHECK_GT(thread_number, 0); 1323 DCHECK_GT(thread_number, 0);
1162 1324
1325 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
1326 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1327 g_all_pools_state = AllPoolsState::WORKER_CREATED;
1328 }
1329
1163 // The worker is assigned to the list when the thread actually starts, which 1330 // The worker is assigned to the list when the thread actually starts, which
1164 // will manage the memory of the pointer. 1331 // will manage the memory of the pointer.
1165 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1332 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1166 } 1333 }
1167 1334
1168 void SequencedWorkerPool::Inner::SignalHasWork() { 1335 void SequencedWorkerPool::Inner::SignalHasWork() {
1336 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1337
1169 has_work_cv_.Signal(); 1338 has_work_cv_.Signal();
1170 if (testing_observer_) { 1339 if (testing_observer_) {
1171 testing_observer_->OnHasWork(); 1340 testing_observer_->OnHasWork();
1172 } 1341 }
1173 } 1342 }
1174 1343
1175 bool SequencedWorkerPool::Inner::CanShutdown() const { 1344 bool SequencedWorkerPool::Inner::CanShutdown() const {
1345 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1176 lock_.AssertAcquired(); 1346 lock_.AssertAcquired();
1177 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1347 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1178 return !thread_being_created_ && 1348 return !thread_being_created_ &&
1179 blocking_shutdown_thread_count_ == 0 && 1349 blocking_shutdown_thread_count_ == 0 &&
1180 blocking_shutdown_pending_task_count_ == 0; 1350 blocking_shutdown_pending_task_count_ == 0;
1181 } 1351 }
1182 1352
1183 base::StaticAtomicSequenceNumber 1353 base::StaticAtomicSequenceNumber
1184 SequencedWorkerPool::Inner::g_last_sequence_number_; 1354 SequencedWorkerPool::Inner::g_last_sequence_number_;
1185 1355
(...skipping 16 matching lines...) Expand all
1202 // static 1372 // static
1203 scoped_refptr<SequencedWorkerPool> 1373 scoped_refptr<SequencedWorkerPool>
1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1374 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1205 Worker* worker = Worker::GetForCurrentThread(); 1375 Worker* worker = Worker::GetForCurrentThread();
1206 if (!worker) 1376 if (!worker)
1207 return nullptr; 1377 return nullptr;
1208 1378
1209 return worker->worker_pool(); 1379 return worker->worker_pool();
1210 } 1380 }
1211 1381
1382 // static
1383 void SequencedWorkerPool::
1384 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
1385 DCHECK(TaskScheduler::GetInstance());
1386 // Hitting this DCHECK indicates that a task was posted to a
1387 // SequencedWorkerPool before the TaskScheduler was initialized and
1388 // redirected, posting task to SequencedWorkerPools needs to at least be
1389 // delayed until after that point.
1390 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1391 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
1392 }
1393
1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1394 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1213 const std::string& thread_name_prefix, 1395 const std::string& thread_name_prefix,
1214 base::TaskPriority task_priority) 1396 base::TaskPriority task_priority)
1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1397 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1216 inner_(new Inner(this, 1398 inner_(new Inner(this,
1217 max_threads, 1399 max_threads,
1218 thread_name_prefix, 1400 thread_name_prefix,
1219 task_priority, 1401 task_priority,
1220 NULL)) {} 1402 NULL)) {}
1221 1403
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
1364 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1546 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1365 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1547 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1366 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1548 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1367 } 1549 }
1368 1550
1369 bool SequencedWorkerPool::IsShutdownInProgress() { 1551 bool SequencedWorkerPool::IsShutdownInProgress() {
1370 return inner_->IsShutdownInProgress(); 1552 return inner_->IsShutdownInProgress();
1371 } 1553 }
1372 1554
1373 } // namespace base 1555 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698