Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(26)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 2241703002: Add an experiment to redirect SequencedWorkerPool tasks to TaskScheduler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@b3_delay_metrics_blocking
Patch Set: self-review Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <list> 9 #include <list>
10 #include <map> 10 #include <map>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <utility> 13 #include <utility>
14 #include <vector> 14 #include <vector>
15 15
16 #include "base/atomic_sequence_num.h" 16 #include "base/atomic_sequence_num.h"
17 #include "base/callback.h" 17 #include "base/callback.h"
18 #include "base/compiler_specific.h" 18 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h" 19 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h" 20 #include "base/lazy_instance.h"
21 #include "base/logging.h" 21 #include "base/logging.h"
22 #include "base/macros.h" 22 #include "base/macros.h"
23 #include "base/memory/ptr_util.h" 23 #include "base/memory/ptr_util.h"
24 #include "base/stl_util.h" 24 #include "base/stl_util.h"
25 #include "base/strings/stringprintf.h" 25 #include "base/strings/stringprintf.h"
26 #include "base/synchronization/condition_variable.h" 26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h" 27 #include "base/synchronization/lock.h"
28 #include "base/task_scheduler/post_task.h"
29 #include "base/task_scheduler/task_scheduler.h"
28 #include "base/threading/platform_thread.h" 30 #include "base/threading/platform_thread.h"
29 #include "base/threading/simple_thread.h" 31 #include "base/threading/simple_thread.h"
30 #include "base/threading/thread_local.h" 32 #include "base/threading/thread_local.h"
31 #include "base/threading/thread_restrictions.h" 33 #include "base/threading/thread_restrictions.h"
32 #include "base/threading/thread_task_runner_handle.h" 34 #include "base/threading/thread_task_runner_handle.h"
33 #include "base/time/time.h" 35 #include "base/time/time.h"
34 #include "base/trace_event/heap_profiler.h" 36 #include "base/trace_event/heap_profiler.h"
35 #include "base/trace_event/trace_event.h" 37 #include "base/trace_event/trace_event.h"
36 #include "base/tracked_objects.h" 38 #include "base/tracked_objects.h"
37 #include "build/build_config.h" 39 #include "build/build_config.h"
38 40
39 #if defined(OS_MACOSX) 41 #if defined(OS_MACOSX)
40 #include "base/mac/scoped_nsautorelease_pool.h" 42 #include "base/mac/scoped_nsautorelease_pool.h"
41 #elif defined(OS_WIN) 43 #elif defined(OS_WIN)
42 #include "base/win/scoped_com_initializer.h" 44 #include "base/win/scoped_com_initializer.h"
43 #endif 45 #endif
44 46
45 #if !defined(OS_NACL) 47 #if !defined(OS_NACL)
46 #include "base/metrics/histogram.h" 48 #include "base/metrics/histogram.h"
47 #endif 49 #endif
48 50
49 namespace base { 51 namespace base {
50 52
51 namespace { 53 namespace {
52 54
55 // An enum representing the state of all pools. Any given process should only
56 // ever transition from NONE_ACTIVE to the active states, transitions between
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created
60 // because the first task was posted and the state is still NONE_ACTIVE.
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
62 // will be phased out completely otherwise).
63 enum class AllPoolsState {
64 NONE_ACTIVE,
65 WORKER_CREATED,
66 REDIRECTED_TO_TASK_SCHEDULER,
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE;
68
53 struct SequencedTask : public TrackingInfo { 69 struct SequencedTask : public TrackingInfo {
54 SequencedTask() 70 SequencedTask()
55 : sequence_token_id(0), 71 : sequence_token_id(0),
56 trace_id(0), 72 trace_id(0),
57 sequence_task_number(0), 73 sequence_task_number(0),
58 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
59 75
60 explicit SequencedTask(const tracked_objects::Location& from_here) 76 explicit SequencedTask(const tracked_objects::Location& from_here)
61 : base::TrackingInfo(from_here, TimeTicks()), 77 : base::TrackingInfo(from_here, TimeTicks()),
62 sequence_token_id(0), 78 sequence_token_id(0),
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 }; 363 };
348 364
349 enum CleanupState { 365 enum CleanupState {
350 CLEANUP_REQUESTED, 366 CLEANUP_REQUESTED,
351 CLEANUP_STARTING, 367 CLEANUP_STARTING,
352 CLEANUP_RUNNING, 368 CLEANUP_RUNNING,
353 CLEANUP_FINISHING, 369 CLEANUP_FINISHING,
354 CLEANUP_DONE, 370 CLEANUP_DONE,
355 }; 371 };
356 372
373 // Helper used by PostTask() to complete the work when redirection is on.
374 // Coalesce upon resolution of http://crbug.com/622400.
375 void PostTaskToTaskScheduler(const SequencedTask& sequenced);
376
357 // Called from within the lock, this converts the given token name into a 377 // Called from within the lock, this converts the given token name into a
358 // token ID, creating a new one if necessary. 378 // token ID, creating a new one if necessary.
359 int LockedGetNamedTokenID(const std::string& name); 379 int LockedGetNamedTokenID(const std::string& name);
360 380
361 // Called from within the lock, this returns the next sequence task number. 381 // Called from within the lock, this returns the next sequence task number.
362 int64_t LockedGetNextSequenceTaskNumber(); 382 int64_t LockedGetNextSequenceTaskNumber();
363 383
364 // Gets new task. There are 3 cases depending on the return value: 384 // Gets new task. There are 3 cases depending on the return value:
365 // 385 //
366 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 386 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
(...skipping 28 matching lines...) Expand all
395 415
396 // Checks if all threads are busy and the addition of one more could run an 416 // Checks if all threads are busy and the addition of one more could run an
397 // additional task waiting in the queue. This must be called from within 417 // additional task waiting in the queue. This must be called from within
398 // the lock. 418 // the lock.
399 // 419 //
400 // If another thread is helpful, this will mark the thread as being in the 420 // If another thread is helpful, this will mark the thread as being in the
401 // process of starting and returns the index of the new thread which will be 421 // process of starting and returns the index of the new thread which will be
402 // 0 or more. The caller should then call FinishStartingAdditionalThread to 422 // 0 or more. The caller should then call FinishStartingAdditionalThread to
403 // complete initialization once the lock is released. 423 // complete initialization once the lock is released.
404 // 424 //
405 // If another thread is not necessary, returne 0; 425 // If another thread is not necessary, return 0;
406 // 426 //
407 // See the implementedion for more. 427 // See the implementedion for more.
408 int PrepareToStartAdditionalThreadIfHelpful(); 428 int PrepareToStartAdditionalThreadIfHelpful();
409 429
410 // The second part of thread creation after 430 // The second part of thread creation after
411 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 431 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
412 // generated. This actually creates the thread and should be called outside 432 // generated. This actually creates the thread and should be called outside
413 // the lock to avoid blocking important work starting a thread in the lock. 433 // the lock to avoid blocking important work starting a thread in the lock.
414 void FinishStartingAdditionalThread(int thread_number); 434 void FinishStartingAdditionalThread(int thread_number);
415 435
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
495 // has been called. 515 // has been called.
496 int max_blocking_tasks_after_shutdown_; 516 int max_blocking_tasks_after_shutdown_;
497 517
498 // State used to cleanup for testing, all guarded by lock_. 518 // State used to cleanup for testing, all guarded by lock_.
499 CleanupState cleanup_state_; 519 CleanupState cleanup_state_;
500 size_t cleanup_idlers_; 520 size_t cleanup_idlers_;
501 ConditionVariable cleanup_cv_; 521 ConditionVariable cleanup_cv_;
502 522
503 TestingObserver* const testing_observer_; 523 TestingObserver* const testing_observer_;
504 524
525 // Members below are used for the experimental redirection to TaskScheduler.
526 // TODO(gab): Remove these if http://crbug.com/622400 fails
527 // (SequencedWorkerPool will be phased out completely otherwise).
528
505 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the 529 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
506 // TaskScheduler as an experiment (unused otherwise). 530 // TaskScheduler as an experiment (unused otherwise).
507 const base::TaskPriority task_priority_; 531 const base::TaskPriority task_priority_;
508 532
533 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
534 // SequencedWorkerPool usage to the TaskScheduler.
535 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
536
537 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
538 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
539 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
540 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
541
509 DISALLOW_COPY_AND_ASSIGN(Inner); 542 DISALLOW_COPY_AND_ASSIGN(Inner);
510 }; 543 };
511 544
512 // Worker definitions --------------------------------------------------------- 545 // Worker definitions ---------------------------------------------------------
513 546
514 SequencedWorkerPool::Worker::Worker( 547 SequencedWorkerPool::Worker::Worker(
515 scoped_refptr<SequencedWorkerPool> worker_pool, 548 scoped_refptr<SequencedWorkerPool> worker_pool,
516 int thread_number, 549 int thread_number,
517 const std::string& prefix) 550 const std::string& prefix)
518 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 551 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
519 worker_pool_(std::move(worker_pool)), 552 worker_pool_(std::move(worker_pool)),
520 task_shutdown_behavior_(BLOCK_SHUTDOWN), 553 task_shutdown_behavior_(BLOCK_SHUTDOWN),
521 is_processing_task_(false) { 554 is_processing_task_(false) {
555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
522 Start(); 556 Start();
523 } 557 }
524 558
525 SequencedWorkerPool::Worker::~Worker() { 559 SequencedWorkerPool::Worker::~Worker() {
526 } 560 }
527 561
528 void SequencedWorkerPool::Worker::Run() { 562 void SequencedWorkerPool::Worker::Run() {
563 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
564
529 #if defined(OS_WIN) 565 #if defined(OS_WIN)
530 win::ScopedCOMInitializer com_initializer; 566 win::ScopedCOMInitializer com_initializer;
531 #endif 567 #endif
532 568
533 // Store a pointer to this worker in thread local storage for static function 569 // Store a pointer to this worker in thread local storage for static function
534 // access. 570 // access.
535 DCHECK(!lazy_tls_ptr_.Get().Get()); 571 DCHECK(!lazy_tls_ptr_.Get().Get());
536 lazy_tls_ptr_.Get().Set(this); 572 lazy_tls_ptr_.Get().Set(this);
537 573
538 // Just jump back to the Inner object to run the thread, since it has all the 574 // Just jump back to the Inner object to run the thread, since it has all the
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
627 sequenced.shutdown_behavior = shutdown_behavior; 663 sequenced.shutdown_behavior = shutdown_behavior;
628 sequenced.posted_from = from_here; 664 sequenced.posted_from = from_here;
629 sequenced.task = 665 sequenced.task =
630 shutdown_behavior == BLOCK_SHUTDOWN ? 666 shutdown_behavior == BLOCK_SHUTDOWN ?
631 base::MakeCriticalClosure(task) : task; 667 base::MakeCriticalClosure(task) : task;
632 sequenced.time_to_run = TimeTicks::Now() + delay; 668 sequenced.time_to_run = TimeTicks::Now() + delay;
633 669
634 int create_thread_id = 0; 670 int create_thread_id = 0;
635 { 671 {
636 AutoLock lock(lock_); 672 AutoLock lock(lock_);
673
danakj 2016/08/15 22:05:49 (you added whitespace here as a result)
gab 2016/09/07 15:00:10 True, but I kind of think it's cleaner that way as
637 if (shutdown_called_) { 674 if (shutdown_called_) {
638 // Don't allow a new task to be posted if it doesn't block shutdown. 675 // Don't allow a new task to be posted if it doesn't block shutdown.
639 if (shutdown_behavior != BLOCK_SHUTDOWN) 676 if (shutdown_behavior != BLOCK_SHUTDOWN)
640 return false; 677 return false;
641 678
642 // If the current thread is running a task, and that task doesn't block 679 // If the current thread is running a task, and that task doesn't block
643 // shutdown, then it shouldn't be allowed to post any more tasks. 680 // shutdown, then it shouldn't be allowed to post any more tasks.
644 ThreadMap::const_iterator found = 681 ThreadMap::const_iterator found =
645 threads_.find(PlatformThread::CurrentId()); 682 threads_.find(PlatformThread::CurrentId());
646 if (found != threads_.end() && found->second->is_processing_task() && 683 if (found != threads_.end() && found->second->is_processing_task() &&
(...skipping 15 matching lines...) Expand all
662 "SequencedWorkerPool::Inner::PostTask", 699 "SequencedWorkerPool::Inner::PostTask",
663 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
664 TRACE_EVENT_FLAG_FLOW_OUT); 701 TRACE_EVENT_FLAG_FLOW_OUT);
665 702
666 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
667 704
668 // Now that we have the lock, apply the named token rules. 705 // Now that we have the lock, apply the named token rules.
669 if (optional_token_name) 706 if (optional_token_name)
670 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
671 708
672 pending_tasks_.insert(sequenced); 709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
673 if (shutdown_behavior == BLOCK_SHUTDOWN) 710 PostTaskToTaskScheduler(sequenced);
674 blocking_shutdown_pending_task_count_++; 711 } else {
712 pending_tasks_.insert(sequenced);
675 713
676 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
715 blocking_shutdown_pending_task_count_++;
716
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
718 }
677 } 719 }
678 720
679 // Actually start the additional thread or signal an existing one now that 721 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
680 // we're outside the lock. 722 // Actually start the additional thread or signal an existing one outside
681 if (create_thread_id) 723 // the lock.
682 FinishStartingAdditionalThread(create_thread_id); 724 if (create_thread_id)
683 else 725 FinishStartingAdditionalThread(create_thread_id);
684 SignalHasWork(); 726 else
727 SignalHasWork();
728 }
729
730 #if DCHECK_IS_ON()
731 {
732 AutoLock lock_for_dcheck(lock_);
733 // Some variables are exposed in both modes for convenience but only really
734 // intended for one of them at runtime, confirm exclusive usage here.
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
736 DCHECK(pending_tasks_.empty());
737 DCHECK_EQ(0, create_thread_id);
738 } else {
739 DCHECK(sequenced_task_runner_map_.empty());
740 }
741 }
742 #endif // DCHECK_IS_ON()
685 743
686 return true; 744 return true;
687 } 745 }
688 746
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
748 const SequencedTask& sequenced) {
fdoray 2016/08/25 15:14:01 This method doesn't honor delays :(
gab 2016/09/07 15:00:10 Oops, looks like you're addressing that in https:/
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
750
751 lock_.AssertAcquired();
752
753 // Confirm that the TaskScheduler's shutdown behaviors use the same
754 // underlying values as SequencedWorkerPool.
755 static_assert(
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
757 static_cast<int>(CONTINUE_ON_SHUTDOWN),
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
759 "CONTINUE_ON_SHUTDOWN.");
760 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
761 static_cast<int>(SKIP_ON_SHUTDOWN),
762 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
763 "SKIP_ON_SHUTDOWN.");
764 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
765 static_cast<int>(BLOCK_SHUTDOWN),
766 "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
767 "BLOCK_SHUTDOWN.");
768
769 const TaskShutdownBehavior task_shutdown_behavior =
770 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
771 const TaskTraits pool_traits =
772 TaskTraits()
773 .WithFileIO()
774 .WithPriority(task_priority_)
775 .WithShutdownBehavior(task_shutdown_behavior);
fdoray 2016/08/25 15:14:01 SequencedWorkerPool allows tasks with different sh
gab 2016/09/07 15:00:10 Yes, let's do that (seems like that's what you're
776
777 // Find or create the TaskScheduler TaskRunner to redirect this task to if
778 // it is posted to a specific sequence.
779 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr;
780 if (sequenced.sequence_token_id) {
781 sequenced_task_runner =
782 &sequenced_task_runner_map_[sequenced.sequence_token_id];
783 if (!*sequenced_task_runner) {
784 const ExecutionMode execution_mode =
785 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED
786 : ExecutionMode::SEQUENCED;
fdoray 2016/08/25 15:14:01 Should we DCHECK_LE(sequenced_task_runner_map_.siz
gab 2016/09/07 15:00:10 Good point, I think this is the case in practice s
787 *sequenced_task_runner =
788 CreateTaskRunnerWithTraits(pool_traits, execution_mode);
789 }
790 }
791
792 if (sequenced_task_runner) {
793 (*sequenced_task_runner)
794 ->PostTask(sequenced.posted_from, sequenced.task);
795 } else {
796 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are
797 // however a few pools that use only one thread and therefore can currently
798 // legitimatelly assume thread affinity despite using SequencedWorkerPool.
799 // Such pools typically only give access to their TaskRunner which will be
800 // SINGLE_THREADED per nature of the pool having only one thread but this
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask()
802 // directly.
803 DCHECK_GT(max_threads_, 1U);
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits,
805 sequenced.task);
806 }
807 }
808
689 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
690 AutoLock lock(lock_); 810 AutoLock lock(lock_);
691 return ContainsKey(threads_, PlatformThread::CurrentId()); 811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
812 if (!runs_tasks_on_verifier_) {
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
814 TaskTraits().WithFileIO().WithPriority(task_priority_),
815 ExecutionMode::PARALLEL);
fdoray 2016/08/25 15:14:01 Should we have a special case for single-threaded
gab 2016/09/07 15:00:10 Yes, looks like you got this too :-)
816 }
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
818 } else {
819 return ContainsKey(threads_, PlatformThread::CurrentId());
820 }
692 } 821 }
693 822
694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
695 SequenceToken sequence_token) const { 824 SequenceToken sequence_token) const {
696 AutoLock lock(lock_); 825 AutoLock lock(lock_);
697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
698 if (found == threads_.end()) 827 // TODO(gab): This currently only verifies that the current thread is a
699 return false; 828 // thread on which a task bound to |sequence_token| *could* run, but it
700 return found->second->is_processing_task() && 829 // doesn't verify that the current is *currently running* a task bound to
701 sequence_token.Equals(found->second->task_sequence_token()); 830 // |sequence_token|.
831 const auto sequenced_task_runner_it =
832 sequenced_task_runner_map_.find(sequence_token.id_);
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
835 } else {
836 ThreadMap::const_iterator found =
837 threads_.find(PlatformThread::CurrentId());
838 if (found == threads_.end())
839 return false;
840 return found->second->is_processing_task() &&
841 sequence_token.Equals(found->second->task_sequence_token());
842 }
702 } 843 }
703 844
704 // See https://code.google.com/p/chromium/issues/detail?id=168415 845 // See https://code.google.com/p/chromium/issues/detail?id=168415
705 void SequencedWorkerPool::Inner::CleanupForTesting() { 846 void SequencedWorkerPool::Inner::CleanupForTesting() {
706 DCHECK(!RunsTasksOnCurrentThread()); 847 DCHECK(!RunsTasksOnCurrentThread());
707 base::ThreadRestrictions::ScopedAllowWait allow_wait; 848 base::ThreadRestrictions::ScopedAllowWait allow_wait;
708 AutoLock lock(lock_); 849 AutoLock lock(lock_);
709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 850 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
710 if (shutdown_called_) 851 if (shutdown_called_)
711 return; 852 return;
(...skipping 13 matching lines...) Expand all
725 void SequencedWorkerPool::Inner::Shutdown( 866 void SequencedWorkerPool::Inner::Shutdown(
726 int max_new_blocking_tasks_after_shutdown) { 867 int max_new_blocking_tasks_after_shutdown) {
727 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 868 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
728 { 869 {
729 AutoLock lock(lock_); 870 AutoLock lock(lock_);
730 // Cleanup and Shutdown should not be called concurrently. 871 // Cleanup and Shutdown should not be called concurrently.
731 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 872 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
732 if (shutdown_called_) 873 if (shutdown_called_)
733 return; 874 return;
734 shutdown_called_ = true; 875 shutdown_called_ = true;
876
877 if (g_all_pools_state != AllPoolsState::WORKER_CREATED)
878 return;
879
735 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 880 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
736 881
737 // Tickle the threads. This will wake up a waiting one so it will know that 882 // Tickle the threads. This will wake up a waiting one so it will know that
738 // it can exit, which in turn will wake up any other waiting ones. 883 // it can exit, which in turn will wake up any other waiting ones.
739 SignalHasWork(); 884 SignalHasWork();
740 885
741 // There are no pending or running tasks blocking shutdown, we're done. 886 // There are no pending or running tasks blocking shutdown, we're done.
742 if (CanShutdown()) 887 if (CanShutdown())
743 return; 888 return;
744 } 889 }
(...skipping 19 matching lines...) Expand all
764 TimeTicks::Now() - shutdown_wait_begin); 909 TimeTicks::Now() - shutdown_wait_begin);
765 #endif 910 #endif
766 } 911 }
767 912
768 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 913 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
769 AutoLock lock(lock_); 914 AutoLock lock(lock_);
770 return shutdown_called_; 915 return shutdown_called_;
771 } 916 }
772 917
773 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 918 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
919 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
774 { 920 {
775 AutoLock lock(lock_); 921 AutoLock lock(lock_);
776 DCHECK(thread_being_created_); 922 DCHECK(thread_being_created_);
777 thread_being_created_ = false; 923 thread_being_created_ = false;
778 auto result = threads_.insert( 924 auto result = threads_.insert(
779 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 925 std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
780 DCHECK(result.second); 926 DCHECK(result.second);
781 927
782 while (true) { 928 while (true) {
783 #if defined(OS_MACOSX) 929 #if defined(OS_MACOSX)
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
898 1044
899 // We noticed we should exit. Wake up the next worker so it knows it should 1045 // We noticed we should exit. Wake up the next worker so it knows it should
900 // exit as well (because the Shutdown() code only signals once). 1046 // exit as well (because the Shutdown() code only signals once).
901 SignalHasWork(); 1047 SignalHasWork();
902 1048
903 // Possibly unblock shutdown. 1049 // Possibly unblock shutdown.
904 can_shutdown_cv_.Signal(); 1050 can_shutdown_cv_.Signal();
905 } 1051 }
906 1052
907 void SequencedWorkerPool::Inner::HandleCleanup() { 1053 void SequencedWorkerPool::Inner::HandleCleanup() {
1054 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1055
908 lock_.AssertAcquired(); 1056 lock_.AssertAcquired();
909 if (cleanup_state_ == CLEANUP_DONE) 1057 if (cleanup_state_ == CLEANUP_DONE)
910 return; 1058 return;
911 if (cleanup_state_ == CLEANUP_REQUESTED) { 1059 if (cleanup_state_ == CLEANUP_REQUESTED) {
912 // We win, we get to do the cleanup as soon as the others wise up and idle. 1060 // We win, we get to do the cleanup as soon as the others wise up and idle.
913 cleanup_state_ = CLEANUP_STARTING; 1061 cleanup_state_ = CLEANUP_STARTING;
914 while (thread_being_created_ || 1062 while (thread_being_created_ ||
915 cleanup_idlers_ != threads_.size() - 1) { 1063 cleanup_idlers_ != threads_.size() - 1) {
916 has_work_cv_.Signal(); 1064 has_work_cv_.Signal();
917 cleanup_cv_.Wait(); 1065 cleanup_cv_.Wait();
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
963 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1111 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
964 lock_.AssertAcquired(); 1112 lock_.AssertAcquired();
965 // We assume that we never create enough tasks to wrap around. 1113 // We assume that we never create enough tasks to wrap around.
966 return next_sequence_task_number_++; 1114 return next_sequence_task_number_++;
967 } 1115 }
968 1116
969 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1117 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
970 SequencedTask* task, 1118 SequencedTask* task,
971 TimeDelta* wait_time, 1119 TimeDelta* wait_time,
972 std::vector<Closure>* delete_these_outside_lock) { 1120 std::vector<Closure>* delete_these_outside_lock) {
1121 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1122
973 lock_.AssertAcquired(); 1123 lock_.AssertAcquired();
974 1124
975 // Find the next task with a sequence token that's not currently in use. 1125 // Find the next task with a sequence token that's not currently in use.
976 // If the token is in use, that means another thread is running something 1126 // If the token is in use, that means another thread is running something
977 // in that sequence, and we can't run it without going out-of-order. 1127 // in that sequence, and we can't run it without going out-of-order.
978 // 1128 //
979 // This algorithm is simple and fair, but inefficient in some cases. For 1129 // This algorithm is simple and fair, but inefficient in some cases. For
980 // example, say somebody schedules 1000 slow tasks with the same sequence 1130 // example, say somebody schedules 1000 slow tasks with the same sequence
981 // number. We'll have to go through all those tasks each time we feel like 1131 // number. We'll have to go through all those tasks each time we feel like
982 // there might be work to schedule. If this proves to be a problem, we 1132 // there might be work to schedule. If this proves to be a problem, we
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
1049 } 1199 }
1050 1200
1051 status = GET_WORK_FOUND; 1201 status = GET_WORK_FOUND;
1052 break; 1202 break;
1053 } 1203 }
1054 1204
1055 return status; 1205 return status;
1056 } 1206 }
1057 1207
1058 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1208 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1209 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1210
1059 lock_.AssertAcquired(); 1211 lock_.AssertAcquired();
1060 1212
1061 // Mark the task's sequence number as in use. 1213 // Mark the task's sequence number as in use.
1062 if (task.sequence_token_id) 1214 if (task.sequence_token_id)
1063 current_sequences_.insert(task.sequence_token_id); 1215 current_sequences_.insert(task.sequence_token_id);
1064 1216
1065 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1217 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1066 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1218 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1067 // completes. 1219 // completes.
1068 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1220 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
(...skipping 11 matching lines...) Expand all
1080 // if there is one waiting to pick up the next task. 1232 // if there is one waiting to pick up the next task.
1081 // 1233 //
1082 // Note that we really need to do this *before* running the task, not 1234 // Note that we really need to do this *before* running the task, not
1083 // after. Otherwise, if more than one task is posted, the creation of the 1235 // after. Otherwise, if more than one task is posted, the creation of the
1084 // second thread (since we only create one at a time) will be blocked by 1236 // second thread (since we only create one at a time) will be blocked by
1085 // the execution of the first task, which could be arbitrarily long. 1237 // the execution of the first task, which could be arbitrarily long.
1086 return PrepareToStartAdditionalThreadIfHelpful(); 1238 return PrepareToStartAdditionalThreadIfHelpful();
1087 } 1239 }
1088 1240
1089 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1241 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1242 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1243
1090 lock_.AssertAcquired(); 1244 lock_.AssertAcquired();
1091 1245
1092 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1246 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1093 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1247 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1094 blocking_shutdown_thread_count_--; 1248 blocking_shutdown_thread_count_--;
1095 } 1249 }
1096 1250
1097 if (task.sequence_token_id) 1251 if (task.sequence_token_id)
1098 current_sequences_.erase(task.sequence_token_id); 1252 current_sequences_.erase(task.sequence_token_id);
1099 } 1253 }
1100 1254
1101 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1255 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1102 int sequence_token_id) const { 1256 int sequence_token_id) const {
1257 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1258
1103 lock_.AssertAcquired(); 1259 lock_.AssertAcquired();
1104 return !sequence_token_id || 1260 return !sequence_token_id ||
1105 current_sequences_.find(sequence_token_id) == 1261 current_sequences_.find(sequence_token_id) ==
1106 current_sequences_.end(); 1262 current_sequences_.end();
1107 } 1263 }
1108 1264
1109 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1265 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1266 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1267
1110 lock_.AssertAcquired(); 1268 lock_.AssertAcquired();
1111 // How thread creation works: 1269 // How thread creation works:
1112 // 1270 //
1113 // We'de like to avoid creating threads with the lock held. However, we 1271 // We'de like to avoid creating threads with the lock held. However, we
1114 // need to be sure that we have an accurate accounting of the threads for 1272 // need to be sure that we have an accurate accounting of the threads for
1115 // proper Joining and deltion on shutdown. 1273 // proper Joining and deltion on shutdown.
1116 // 1274 //
1117 // We need to figure out if we need another thread with the lock held, which 1275 // We need to figure out if we need another thread with the lock held, which
1118 // is what this function does. It then marks us as in the process of creating 1276 // is what this function does. It then marks us as in the process of creating
1119 // a thread. When we do shutdown, we wait until the thread_being_created_ 1277 // a thread. When we do shutdown, we wait until the thread_being_created_
(...skipping 30 matching lines...) Expand all
1150 thread_being_created_ = true; 1308 thread_being_created_ = true;
1151 return static_cast<int>(threads_.size() + 1); 1309 return static_cast<int>(threads_.size() + 1);
1152 } 1310 }
1153 } 1311 }
1154 } 1312 }
1155 return 0; 1313 return 0;
1156 } 1314 }
1157 1315
1158 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1316 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1159 int thread_number) { 1317 int thread_number) {
1318 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1319
1160 // Called outside of the lock. 1320 // Called outside of the lock.
1161 DCHECK_GT(thread_number, 0); 1321 DCHECK_GT(thread_number, 0);
1162 1322
1323 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) {
1324 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1325 g_all_pools_state = AllPoolsState::WORKER_CREATED;
1326 }
1327
1163 // The worker is assigned to the list when the thread actually starts, which 1328 // The worker is assigned to the list when the thread actually starts, which
1164 // will manage the memory of the pointer. 1329 // will manage the memory of the pointer.
1165 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1330 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1166 } 1331 }
1167 1332
1168 void SequencedWorkerPool::Inner::SignalHasWork() { 1333 void SequencedWorkerPool::Inner::SignalHasWork() {
1334 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
1335
1169 has_work_cv_.Signal(); 1336 has_work_cv_.Signal();
1170 if (testing_observer_) { 1337 if (testing_observer_) {
1171 testing_observer_->OnHasWork(); 1338 testing_observer_->OnHasWork();
1172 } 1339 }
1173 } 1340 }
1174 1341
1175 bool SequencedWorkerPool::Inner::CanShutdown() const { 1342 bool SequencedWorkerPool::Inner::CanShutdown() const {
1343 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state);
1176 lock_.AssertAcquired(); 1344 lock_.AssertAcquired();
1177 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1345 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1178 return !thread_being_created_ && 1346 return !thread_being_created_ &&
1179 blocking_shutdown_thread_count_ == 0 && 1347 blocking_shutdown_thread_count_ == 0 &&
1180 blocking_shutdown_pending_task_count_ == 0; 1348 blocking_shutdown_pending_task_count_ == 0;
1181 } 1349 }
1182 1350
1183 base::StaticAtomicSequenceNumber 1351 base::StaticAtomicSequenceNumber
1184 SequencedWorkerPool::Inner::g_last_sequence_number_; 1352 SequencedWorkerPool::Inner::g_last_sequence_number_;
1185 1353
(...skipping 16 matching lines...) Expand all
1202 // static 1370 // static
1203 scoped_refptr<SequencedWorkerPool> 1371 scoped_refptr<SequencedWorkerPool>
1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1372 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1205 Worker* worker = Worker::GetForCurrentThread(); 1373 Worker* worker = Worker::GetForCurrentThread();
1206 if (!worker) 1374 if (!worker)
1207 return nullptr; 1375 return nullptr;
1208 1376
1209 return worker->worker_pool(); 1377 return worker->worker_pool();
1210 } 1378 }
1211 1379
1380 // static
1381 void SequencedWorkerPool::
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() {
1383 DCHECK(TaskScheduler::GetInstance());
1384 // Hitting this DCHECK indicates that a task was posted to a
1385 // SequencedWorkerPool before the TaskScheduler was initialized and
1386 // redirected, posting task to SequencedWorkerPools needs to at least be
1387 // delayed until after that point.
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state);
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
1390 }
1391
1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1213 const std::string& thread_name_prefix, 1393 const std::string& thread_name_prefix,
1214 base::TaskPriority task_priority) 1394 base::TaskPriority task_priority)
1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), 1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1216 inner_(new Inner(this, 1396 inner_(new Inner(this,
1217 max_threads, 1397 max_threads,
1218 thread_name_prefix, 1398 thread_name_prefix,
1219 task_priority, 1399 task_priority,
1220 NULL)) {} 1400 NULL)) {}
1221 1401
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
1358 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1359 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); 1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1360 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1361 } 1541 }
1362 1542
1363 bool SequencedWorkerPool::IsShutdownInProgress() { 1543 bool SequencedWorkerPool::IsShutdownInProgress() {
1364 return inner_->IsShutdownInProgress(); 1544 return inner_->IsShutdownInProgress();
1365 } 1545 }
1366 1546
1367 } // namespace base 1547 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/chrome_browser_main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698