OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <list> | 9 #include <list> |
10 #include <map> | 10 #include <map> |
11 #include <memory> | 11 #include <memory> |
12 #include <set> | 12 #include <set> |
13 #include <utility> | 13 #include <utility> |
14 #include <vector> | 14 #include <vector> |
15 | 15 |
16 #include "base/atomic_sequence_num.h" | 16 #include "base/atomic_sequence_num.h" |
17 #include "base/callback.h" | 17 #include "base/callback.h" |
18 #include "base/compiler_specific.h" | 18 #include "base/compiler_specific.h" |
19 #include "base/critical_closure.h" | 19 #include "base/critical_closure.h" |
20 #include "base/lazy_instance.h" | 20 #include "base/lazy_instance.h" |
21 #include "base/logging.h" | 21 #include "base/logging.h" |
22 #include "base/macros.h" | 22 #include "base/macros.h" |
23 #include "base/memory/ptr_util.h" | 23 #include "base/memory/ptr_util.h" |
24 #include "base/stl_util.h" | 24 #include "base/stl_util.h" |
25 #include "base/strings/stringprintf.h" | 25 #include "base/strings/stringprintf.h" |
26 #include "base/synchronization/condition_variable.h" | 26 #include "base/synchronization/condition_variable.h" |
27 #include "base/synchronization/lock.h" | 27 #include "base/synchronization/lock.h" |
28 #include "base/task_scheduler/post_task.h" | |
29 #include "base/task_scheduler/task_scheduler.h" | |
28 #include "base/threading/platform_thread.h" | 30 #include "base/threading/platform_thread.h" |
29 #include "base/threading/simple_thread.h" | 31 #include "base/threading/simple_thread.h" |
30 #include "base/threading/thread_local.h" | 32 #include "base/threading/thread_local.h" |
31 #include "base/threading/thread_restrictions.h" | 33 #include "base/threading/thread_restrictions.h" |
32 #include "base/threading/thread_task_runner_handle.h" | 34 #include "base/threading/thread_task_runner_handle.h" |
33 #include "base/time/time.h" | 35 #include "base/time/time.h" |
34 #include "base/trace_event/heap_profiler.h" | 36 #include "base/trace_event/heap_profiler.h" |
35 #include "base/trace_event/trace_event.h" | 37 #include "base/trace_event/trace_event.h" |
36 #include "base/tracked_objects.h" | 38 #include "base/tracked_objects.h" |
37 #include "build/build_config.h" | 39 #include "build/build_config.h" |
38 | 40 |
39 #if defined(OS_MACOSX) | 41 #if defined(OS_MACOSX) |
40 #include "base/mac/scoped_nsautorelease_pool.h" | 42 #include "base/mac/scoped_nsautorelease_pool.h" |
41 #elif defined(OS_WIN) | 43 #elif defined(OS_WIN) |
42 #include "base/win/scoped_com_initializer.h" | 44 #include "base/win/scoped_com_initializer.h" |
43 #endif | 45 #endif |
44 | 46 |
45 #if !defined(OS_NACL) | 47 #if !defined(OS_NACL) |
46 #include "base/metrics/histogram.h" | 48 #include "base/metrics/histogram.h" |
47 #endif | 49 #endif |
48 | 50 |
49 namespace base { | 51 namespace base { |
50 | 52 |
51 namespace { | 53 namespace { |
52 | 54 |
55 // An enum representing the state of all pools. Any given process should only | |
56 // ever transition from NONE_ACTIVE to the active states, transitions between | |
57 // actives states are unexpected. The REDIRECTED_TO_TASK_SCHEDULER transition | |
58 // occurs when RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() is called | |
59 // and the WORKER_CREATED transition occurs when a Worker needs to be created | |
60 // because the first task was posted and the state is still NONE_ACTIVE. | |
61 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool | |
62 // will be phased out completely otherwise). | |
63 enum class AllPoolsState { | |
64 NONE_ACTIVE, | |
65 WORKER_CREATED, | |
66 REDIRECTED_TO_TASK_SCHEDULER, | |
67 } g_all_pools_state = AllPoolsState::NONE_ACTIVE; | |
68 | |
53 struct SequencedTask : public TrackingInfo { | 69 struct SequencedTask : public TrackingInfo { |
54 SequencedTask() | 70 SequencedTask() |
55 : sequence_token_id(0), | 71 : sequence_token_id(0), |
56 trace_id(0), | 72 trace_id(0), |
57 sequence_task_number(0), | 73 sequence_task_number(0), |
58 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} | 74 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
59 | 75 |
60 explicit SequencedTask(const tracked_objects::Location& from_here) | 76 explicit SequencedTask(const tracked_objects::Location& from_here) |
61 : base::TrackingInfo(from_here, TimeTicks()), | 77 : base::TrackingInfo(from_here, TimeTicks()), |
62 sequence_token_id(0), | 78 sequence_token_id(0), |
(...skipping 284 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
347 }; | 363 }; |
348 | 364 |
349 enum CleanupState { | 365 enum CleanupState { |
350 CLEANUP_REQUESTED, | 366 CLEANUP_REQUESTED, |
351 CLEANUP_STARTING, | 367 CLEANUP_STARTING, |
352 CLEANUP_RUNNING, | 368 CLEANUP_RUNNING, |
353 CLEANUP_FINISHING, | 369 CLEANUP_FINISHING, |
354 CLEANUP_DONE, | 370 CLEANUP_DONE, |
355 }; | 371 }; |
356 | 372 |
373 // Helper used by PostTask() to complete the work when redirection is on. | |
374 // Coalesce upon resolution of http://crbug.com/622400. | |
375 void PostTaskToTaskScheduler(const SequencedTask& sequenced); | |
376 | |
357 // Called from within the lock, this converts the given token name into a | 377 // Called from within the lock, this converts the given token name into a |
358 // token ID, creating a new one if necessary. | 378 // token ID, creating a new one if necessary. |
359 int LockedGetNamedTokenID(const std::string& name); | 379 int LockedGetNamedTokenID(const std::string& name); |
360 | 380 |
361 // Called from within the lock, this returns the next sequence task number. | 381 // Called from within the lock, this returns the next sequence task number. |
362 int64_t LockedGetNextSequenceTaskNumber(); | 382 int64_t LockedGetNextSequenceTaskNumber(); |
363 | 383 |
364 // Gets new task. There are 3 cases depending on the return value: | 384 // Gets new task. There are 3 cases depending on the return value: |
365 // | 385 // |
366 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should | 386 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should |
(...skipping 28 matching lines...) Expand all Loading... | |
395 | 415 |
396 // Checks if all threads are busy and the addition of one more could run an | 416 // Checks if all threads are busy and the addition of one more could run an |
397 // additional task waiting in the queue. This must be called from within | 417 // additional task waiting in the queue. This must be called from within |
398 // the lock. | 418 // the lock. |
399 // | 419 // |
400 // If another thread is helpful, this will mark the thread as being in the | 420 // If another thread is helpful, this will mark the thread as being in the |
401 // process of starting and returns the index of the new thread which will be | 421 // process of starting and returns the index of the new thread which will be |
402 // 0 or more. The caller should then call FinishStartingAdditionalThread to | 422 // 0 or more. The caller should then call FinishStartingAdditionalThread to |
403 // complete initialization once the lock is released. | 423 // complete initialization once the lock is released. |
404 // | 424 // |
405 // If another thread is not necessary, returne 0; | 425 // If another thread is not necessary, return 0; |
406 // | 426 // |
407 // See the implementedion for more. | 427 // See the implementedion for more. |
408 int PrepareToStartAdditionalThreadIfHelpful(); | 428 int PrepareToStartAdditionalThreadIfHelpful(); |
409 | 429 |
410 // The second part of thread creation after | 430 // The second part of thread creation after |
411 // PrepareToStartAdditionalThreadIfHelpful with the thread number it | 431 // PrepareToStartAdditionalThreadIfHelpful with the thread number it |
412 // generated. This actually creates the thread and should be called outside | 432 // generated. This actually creates the thread and should be called outside |
413 // the lock to avoid blocking important work starting a thread in the lock. | 433 // the lock to avoid blocking important work starting a thread in the lock. |
414 void FinishStartingAdditionalThread(int thread_number); | 434 void FinishStartingAdditionalThread(int thread_number); |
415 | 435 |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
495 // has been called. | 515 // has been called. |
496 int max_blocking_tasks_after_shutdown_; | 516 int max_blocking_tasks_after_shutdown_; |
497 | 517 |
498 // State used to cleanup for testing, all guarded by lock_. | 518 // State used to cleanup for testing, all guarded by lock_. |
499 CleanupState cleanup_state_; | 519 CleanupState cleanup_state_; |
500 size_t cleanup_idlers_; | 520 size_t cleanup_idlers_; |
501 ConditionVariable cleanup_cv_; | 521 ConditionVariable cleanup_cv_; |
502 | 522 |
503 TestingObserver* const testing_observer_; | 523 TestingObserver* const testing_observer_; |
504 | 524 |
525 // Members below are used for the experimental redirection to TaskScheduler. | |
526 // TODO(gab): Remove these if http://crbug.com/622400 fails | |
527 // (SequencedWorkerPool will be phased out completely otherwise). | |
528 | |
505 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the | 529 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the |
506 // TaskScheduler as an experiment (unused otherwise). | 530 // TaskScheduler as an experiment (unused otherwise). |
507 const base::TaskPriority task_priority_; | 531 const base::TaskPriority task_priority_; |
508 | 532 |
533 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect | |
534 // SequencedWorkerPool usage to the TaskScheduler. | |
535 std::map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; | |
536 | |
537 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as | |
538 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). | |
539 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). | |
540 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; | |
541 | |
509 DISALLOW_COPY_AND_ASSIGN(Inner); | 542 DISALLOW_COPY_AND_ASSIGN(Inner); |
510 }; | 543 }; |
511 | 544 |
512 // Worker definitions --------------------------------------------------------- | 545 // Worker definitions --------------------------------------------------------- |
513 | 546 |
514 SequencedWorkerPool::Worker::Worker( | 547 SequencedWorkerPool::Worker::Worker( |
515 scoped_refptr<SequencedWorkerPool> worker_pool, | 548 scoped_refptr<SequencedWorkerPool> worker_pool, |
516 int thread_number, | 549 int thread_number, |
517 const std::string& prefix) | 550 const std::string& prefix) |
518 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 551 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
519 worker_pool_(std::move(worker_pool)), | 552 worker_pool_(std::move(worker_pool)), |
520 task_shutdown_behavior_(BLOCK_SHUTDOWN), | 553 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
521 is_processing_task_(false) { | 554 is_processing_task_(false) { |
555 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
522 Start(); | 556 Start(); |
523 } | 557 } |
524 | 558 |
525 SequencedWorkerPool::Worker::~Worker() { | 559 SequencedWorkerPool::Worker::~Worker() { |
526 } | 560 } |
527 | 561 |
528 void SequencedWorkerPool::Worker::Run() { | 562 void SequencedWorkerPool::Worker::Run() { |
563 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
564 | |
529 #if defined(OS_WIN) | 565 #if defined(OS_WIN) |
530 win::ScopedCOMInitializer com_initializer; | 566 win::ScopedCOMInitializer com_initializer; |
531 #endif | 567 #endif |
532 | 568 |
533 // Store a pointer to this worker in thread local storage for static function | 569 // Store a pointer to this worker in thread local storage for static function |
534 // access. | 570 // access. |
535 DCHECK(!lazy_tls_ptr_.Get().Get()); | 571 DCHECK(!lazy_tls_ptr_.Get().Get()); |
536 lazy_tls_ptr_.Get().Set(this); | 572 lazy_tls_ptr_.Get().Set(this); |
537 | 573 |
538 // Just jump back to the Inner object to run the thread, since it has all the | 574 // Just jump back to the Inner object to run the thread, since it has all the |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
627 sequenced.shutdown_behavior = shutdown_behavior; | 663 sequenced.shutdown_behavior = shutdown_behavior; |
628 sequenced.posted_from = from_here; | 664 sequenced.posted_from = from_here; |
629 sequenced.task = | 665 sequenced.task = |
630 shutdown_behavior == BLOCK_SHUTDOWN ? | 666 shutdown_behavior == BLOCK_SHUTDOWN ? |
631 base::MakeCriticalClosure(task) : task; | 667 base::MakeCriticalClosure(task) : task; |
632 sequenced.time_to_run = TimeTicks::Now() + delay; | 668 sequenced.time_to_run = TimeTicks::Now() + delay; |
633 | 669 |
634 int create_thread_id = 0; | 670 int create_thread_id = 0; |
635 { | 671 { |
636 AutoLock lock(lock_); | 672 AutoLock lock(lock_); |
673 | |
danakj
2016/08/15 22:05:49
(you added whitespace here as a result)
gab
2016/09/07 15:00:10
True, but I kind of think it's cleaner that way as
| |
637 if (shutdown_called_) { | 674 if (shutdown_called_) { |
638 // Don't allow a new task to be posted if it doesn't block shutdown. | 675 // Don't allow a new task to be posted if it doesn't block shutdown. |
639 if (shutdown_behavior != BLOCK_SHUTDOWN) | 676 if (shutdown_behavior != BLOCK_SHUTDOWN) |
640 return false; | 677 return false; |
641 | 678 |
642 // If the current thread is running a task, and that task doesn't block | 679 // If the current thread is running a task, and that task doesn't block |
643 // shutdown, then it shouldn't be allowed to post any more tasks. | 680 // shutdown, then it shouldn't be allowed to post any more tasks. |
644 ThreadMap::const_iterator found = | 681 ThreadMap::const_iterator found = |
645 threads_.find(PlatformThread::CurrentId()); | 682 threads_.find(PlatformThread::CurrentId()); |
646 if (found != threads_.end() && found->second->is_processing_task() && | 683 if (found != threads_.end() && found->second->is_processing_task() && |
(...skipping 15 matching lines...) Expand all Loading... | |
662 "SequencedWorkerPool::Inner::PostTask", | 699 "SequencedWorkerPool::Inner::PostTask", |
663 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), | 700 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), |
664 TRACE_EVENT_FLAG_FLOW_OUT); | 701 TRACE_EVENT_FLAG_FLOW_OUT); |
665 | 702 |
666 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); | 703 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
667 | 704 |
668 // Now that we have the lock, apply the named token rules. | 705 // Now that we have the lock, apply the named token rules. |
669 if (optional_token_name) | 706 if (optional_token_name) |
670 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | 707 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
671 | 708 |
672 pending_tasks_.insert(sequenced); | 709 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
673 if (shutdown_behavior == BLOCK_SHUTDOWN) | 710 PostTaskToTaskScheduler(sequenced); |
674 blocking_shutdown_pending_task_count_++; | 711 } else { |
712 pending_tasks_.insert(sequenced); | |
675 | 713 |
676 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 714 if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) |
715 blocking_shutdown_pending_task_count_++; | |
716 | |
717 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | |
718 } | |
677 } | 719 } |
678 | 720 |
679 // Actually start the additional thread or signal an existing one now that | 721 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
680 // we're outside the lock. | 722 // Actually start the additional thread or signal an existing one outside |
681 if (create_thread_id) | 723 // the lock. |
682 FinishStartingAdditionalThread(create_thread_id); | 724 if (create_thread_id) |
683 else | 725 FinishStartingAdditionalThread(create_thread_id); |
684 SignalHasWork(); | 726 else |
727 SignalHasWork(); | |
728 } | |
729 | |
730 #if DCHECK_IS_ON() | |
731 { | |
732 AutoLock lock_for_dcheck(lock_); | |
733 // Some variables are exposed in both modes for convenience but only really | |
734 // intended for one of them at runtime, confirm exclusive usage here. | |
735 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { | |
736 DCHECK(pending_tasks_.empty()); | |
737 DCHECK_EQ(0, create_thread_id); | |
738 } else { | |
739 DCHECK(sequenced_task_runner_map_.empty()); | |
740 } | |
741 } | |
742 #endif // DCHECK_IS_ON() | |
685 | 743 |
686 return true; | 744 return true; |
687 } | 745 } |
688 | 746 |
747 void SequencedWorkerPool::Inner::PostTaskToTaskScheduler( | |
748 const SequencedTask& sequenced) { | |
fdoray
2016/08/25 15:14:01
This method doesn't honor delays :(
gab
2016/09/07 15:00:10
Oops, looks like you're addressing that in https:/
| |
749 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
750 | |
751 lock_.AssertAcquired(); | |
752 | |
753 // Confirm that the TaskScheduler's shutdown behaviors use the same | |
754 // underlying values as SequencedWorkerPool. | |
755 static_assert( | |
756 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == | |
757 static_cast<int>(CONTINUE_ON_SHUTDOWN), | |
758 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | |
759 "CONTINUE_ON_SHUTDOWN."); | |
760 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == | |
761 static_cast<int>(SKIP_ON_SHUTDOWN), | |
762 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | |
763 "SKIP_ON_SHUTDOWN."); | |
764 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == | |
765 static_cast<int>(BLOCK_SHUTDOWN), | |
766 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " | |
767 "BLOCK_SHUTDOWN."); | |
768 | |
769 const TaskShutdownBehavior task_shutdown_behavior = | |
770 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); | |
771 const TaskTraits pool_traits = | |
772 TaskTraits() | |
773 .WithFileIO() | |
774 .WithPriority(task_priority_) | |
775 .WithShutdownBehavior(task_shutdown_behavior); | |
fdoray
2016/08/25 15:14:01
SequencedWorkerPool allows tasks with different sh
gab
2016/09/07 15:00:10
Yes, let's do that (seems like that's what you're
| |
776 | |
777 // Find or create the TaskScheduler TaskRunner to redirect this task to if | |
778 // it is posted to a specific sequence. | |
779 scoped_refptr<TaskRunner>* sequenced_task_runner = nullptr; | |
780 if (sequenced.sequence_token_id) { | |
781 sequenced_task_runner = | |
782 &sequenced_task_runner_map_[sequenced.sequence_token_id]; | |
783 if (!*sequenced_task_runner) { | |
784 const ExecutionMode execution_mode = | |
785 max_threads_ == 1U ? ExecutionMode::SINGLE_THREADED | |
786 : ExecutionMode::SEQUENCED; | |
fdoray
2016/08/25 15:14:01
Should we DCHECK_LE(sequenced_task_runner_map_.siz
gab
2016/09/07 15:00:10
Good point, I think this is the case in practice s
| |
787 *sequenced_task_runner = | |
788 CreateTaskRunnerWithTraits(pool_traits, execution_mode); | |
789 } | |
790 } | |
791 | |
792 if (sequenced_task_runner) { | |
793 (*sequenced_task_runner) | |
794 ->PostTask(sequenced.posted_from, sequenced.task); | |
795 } else { | |
796 // PostTaskWithTraits() posts a task with PARALLEL semantics. There are | |
797 // however a few pools that use only one thread and therefore can currently | |
798 // legitimatelly assume thread affinity despite using SequencedWorkerPool. | |
799 // Such pools typically only give access to their TaskRunner which will be | |
800 // SINGLE_THREADED per nature of the pool having only one thread but this | |
801 // DCHECK ensures no such pools use SequencedWorkerPool::PostTask() | |
802 // directly. | |
803 DCHECK_GT(max_threads_, 1U); | |
804 base::PostTaskWithTraits(sequenced.posted_from, pool_traits, | |
805 sequenced.task); | |
806 } | |
807 } | |
808 | |
689 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 809 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
690 AutoLock lock(lock_); | 810 AutoLock lock(lock_); |
691 return ContainsKey(threads_, PlatformThread::CurrentId()); | 811 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
812 if (!runs_tasks_on_verifier_) { | |
813 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( | |
814 TaskTraits().WithFileIO().WithPriority(task_priority_), | |
815 ExecutionMode::PARALLEL); | |
fdoray
2016/08/25 15:14:01
Should we have a special case for single-threaded
gab
2016/09/07 15:00:10
Yes, looks like you got this too :-)
| |
816 } | |
817 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); | |
818 } else { | |
819 return ContainsKey(threads_, PlatformThread::CurrentId()); | |
820 } | |
692 } | 821 } |
693 | 822 |
694 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 823 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
695 SequenceToken sequence_token) const { | 824 SequenceToken sequence_token) const { |
696 AutoLock lock(lock_); | 825 AutoLock lock(lock_); |
697 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 826 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { |
698 if (found == threads_.end()) | 827 // TODO(gab): This currently only verifies that the current thread is a |
699 return false; | 828 // thread on which a task bound to |sequence_token| *could* run, but it |
700 return found->second->is_processing_task() && | 829 // doesn't verify that the current is *currently running* a task bound to |
701 sequence_token.Equals(found->second->task_sequence_token()); | 830 // |sequence_token|. |
831 const auto sequenced_task_runner_it = | |
832 sequenced_task_runner_map_.find(sequence_token.id_); | |
833 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && | |
834 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); | |
835 } else { | |
836 ThreadMap::const_iterator found = | |
837 threads_.find(PlatformThread::CurrentId()); | |
838 if (found == threads_.end()) | |
839 return false; | |
840 return found->second->is_processing_task() && | |
841 sequence_token.Equals(found->second->task_sequence_token()); | |
842 } | |
702 } | 843 } |
703 | 844 |
704 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 845 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
705 void SequencedWorkerPool::Inner::CleanupForTesting() { | 846 void SequencedWorkerPool::Inner::CleanupForTesting() { |
706 DCHECK(!RunsTasksOnCurrentThread()); | 847 DCHECK(!RunsTasksOnCurrentThread()); |
707 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 848 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
708 AutoLock lock(lock_); | 849 AutoLock lock(lock_); |
709 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 850 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
710 if (shutdown_called_) | 851 if (shutdown_called_) |
711 return; | 852 return; |
(...skipping 13 matching lines...) Expand all Loading... | |
725 void SequencedWorkerPool::Inner::Shutdown( | 866 void SequencedWorkerPool::Inner::Shutdown( |
726 int max_new_blocking_tasks_after_shutdown) { | 867 int max_new_blocking_tasks_after_shutdown) { |
727 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 868 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
728 { | 869 { |
729 AutoLock lock(lock_); | 870 AutoLock lock(lock_); |
730 // Cleanup and Shutdown should not be called concurrently. | 871 // Cleanup and Shutdown should not be called concurrently. |
731 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 872 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
732 if (shutdown_called_) | 873 if (shutdown_called_) |
733 return; | 874 return; |
734 shutdown_called_ = true; | 875 shutdown_called_ = true; |
876 | |
877 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) | |
878 return; | |
879 | |
735 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 880 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
736 | 881 |
737 // Tickle the threads. This will wake up a waiting one so it will know that | 882 // Tickle the threads. This will wake up a waiting one so it will know that |
738 // it can exit, which in turn will wake up any other waiting ones. | 883 // it can exit, which in turn will wake up any other waiting ones. |
739 SignalHasWork(); | 884 SignalHasWork(); |
740 | 885 |
741 // There are no pending or running tasks blocking shutdown, we're done. | 886 // There are no pending or running tasks blocking shutdown, we're done. |
742 if (CanShutdown()) | 887 if (CanShutdown()) |
743 return; | 888 return; |
744 } | 889 } |
(...skipping 19 matching lines...) Expand all Loading... | |
764 TimeTicks::Now() - shutdown_wait_begin); | 909 TimeTicks::Now() - shutdown_wait_begin); |
765 #endif | 910 #endif |
766 } | 911 } |
767 | 912 |
768 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { | 913 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { |
769 AutoLock lock(lock_); | 914 AutoLock lock(lock_); |
770 return shutdown_called_; | 915 return shutdown_called_; |
771 } | 916 } |
772 | 917 |
773 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 918 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
919 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
774 { | 920 { |
775 AutoLock lock(lock_); | 921 AutoLock lock(lock_); |
776 DCHECK(thread_being_created_); | 922 DCHECK(thread_being_created_); |
777 thread_being_created_ = false; | 923 thread_being_created_ = false; |
778 auto result = threads_.insert( | 924 auto result = threads_.insert( |
779 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); | 925 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); |
780 DCHECK(result.second); | 926 DCHECK(result.second); |
781 | 927 |
782 while (true) { | 928 while (true) { |
783 #if defined(OS_MACOSX) | 929 #if defined(OS_MACOSX) |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
898 | 1044 |
899 // We noticed we should exit. Wake up the next worker so it knows it should | 1045 // We noticed we should exit. Wake up the next worker so it knows it should |
900 // exit as well (because the Shutdown() code only signals once). | 1046 // exit as well (because the Shutdown() code only signals once). |
901 SignalHasWork(); | 1047 SignalHasWork(); |
902 | 1048 |
903 // Possibly unblock shutdown. | 1049 // Possibly unblock shutdown. |
904 can_shutdown_cv_.Signal(); | 1050 can_shutdown_cv_.Signal(); |
905 } | 1051 } |
906 | 1052 |
907 void SequencedWorkerPool::Inner::HandleCleanup() { | 1053 void SequencedWorkerPool::Inner::HandleCleanup() { |
1054 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | |
1055 | |
908 lock_.AssertAcquired(); | 1056 lock_.AssertAcquired(); |
909 if (cleanup_state_ == CLEANUP_DONE) | 1057 if (cleanup_state_ == CLEANUP_DONE) |
910 return; | 1058 return; |
911 if (cleanup_state_ == CLEANUP_REQUESTED) { | 1059 if (cleanup_state_ == CLEANUP_REQUESTED) { |
912 // We win, we get to do the cleanup as soon as the others wise up and idle. | 1060 // We win, we get to do the cleanup as soon as the others wise up and idle. |
913 cleanup_state_ = CLEANUP_STARTING; | 1061 cleanup_state_ = CLEANUP_STARTING; |
914 while (thread_being_created_ || | 1062 while (thread_being_created_ || |
915 cleanup_idlers_ != threads_.size() - 1) { | 1063 cleanup_idlers_ != threads_.size() - 1) { |
916 has_work_cv_.Signal(); | 1064 has_work_cv_.Signal(); |
917 cleanup_cv_.Wait(); | 1065 cleanup_cv_.Wait(); |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
963 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 1111 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
964 lock_.AssertAcquired(); | 1112 lock_.AssertAcquired(); |
965 // We assume that we never create enough tasks to wrap around. | 1113 // We assume that we never create enough tasks to wrap around. |
966 return next_sequence_task_number_++; | 1114 return next_sequence_task_number_++; |
967 } | 1115 } |
968 | 1116 |
969 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 1117 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
970 SequencedTask* task, | 1118 SequencedTask* task, |
971 TimeDelta* wait_time, | 1119 TimeDelta* wait_time, |
972 std::vector<Closure>* delete_these_outside_lock) { | 1120 std::vector<Closure>* delete_these_outside_lock) { |
1121 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | |
1122 | |
973 lock_.AssertAcquired(); | 1123 lock_.AssertAcquired(); |
974 | 1124 |
975 // Find the next task with a sequence token that's not currently in use. | 1125 // Find the next task with a sequence token that's not currently in use. |
976 // If the token is in use, that means another thread is running something | 1126 // If the token is in use, that means another thread is running something |
977 // in that sequence, and we can't run it without going out-of-order. | 1127 // in that sequence, and we can't run it without going out-of-order. |
978 // | 1128 // |
979 // This algorithm is simple and fair, but inefficient in some cases. For | 1129 // This algorithm is simple and fair, but inefficient in some cases. For |
980 // example, say somebody schedules 1000 slow tasks with the same sequence | 1130 // example, say somebody schedules 1000 slow tasks with the same sequence |
981 // number. We'll have to go through all those tasks each time we feel like | 1131 // number. We'll have to go through all those tasks each time we feel like |
982 // there might be work to schedule. If this proves to be a problem, we | 1132 // there might be work to schedule. If this proves to be a problem, we |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1049 } | 1199 } |
1050 | 1200 |
1051 status = GET_WORK_FOUND; | 1201 status = GET_WORK_FOUND; |
1052 break; | 1202 break; |
1053 } | 1203 } |
1054 | 1204 |
1055 return status; | 1205 return status; |
1056 } | 1206 } |
1057 | 1207 |
1058 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 1208 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
1209 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | |
1210 | |
1059 lock_.AssertAcquired(); | 1211 lock_.AssertAcquired(); |
1060 | 1212 |
1061 // Mark the task's sequence number as in use. | 1213 // Mark the task's sequence number as in use. |
1062 if (task.sequence_token_id) | 1214 if (task.sequence_token_id) |
1063 current_sequences_.insert(task.sequence_token_id); | 1215 current_sequences_.insert(task.sequence_token_id); |
1064 | 1216 |
1065 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN | 1217 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN |
1066 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread | 1218 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread |
1067 // completes. | 1219 // completes. |
1068 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) | 1220 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) |
(...skipping 11 matching lines...) Expand all Loading... | |
1080 // if there is one waiting to pick up the next task. | 1232 // if there is one waiting to pick up the next task. |
1081 // | 1233 // |
1082 // Note that we really need to do this *before* running the task, not | 1234 // Note that we really need to do this *before* running the task, not |
1083 // after. Otherwise, if more than one task is posted, the creation of the | 1235 // after. Otherwise, if more than one task is posted, the creation of the |
1084 // second thread (since we only create one at a time) will be blocked by | 1236 // second thread (since we only create one at a time) will be blocked by |
1085 // the execution of the first task, which could be arbitrarily long. | 1237 // the execution of the first task, which could be arbitrarily long. |
1086 return PrepareToStartAdditionalThreadIfHelpful(); | 1238 return PrepareToStartAdditionalThreadIfHelpful(); |
1087 } | 1239 } |
1088 | 1240 |
1089 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 1241 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
1242 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | |
1243 | |
1090 lock_.AssertAcquired(); | 1244 lock_.AssertAcquired(); |
1091 | 1245 |
1092 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { | 1246 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { |
1093 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 1247 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
1094 blocking_shutdown_thread_count_--; | 1248 blocking_shutdown_thread_count_--; |
1095 } | 1249 } |
1096 | 1250 |
1097 if (task.sequence_token_id) | 1251 if (task.sequence_token_id) |
1098 current_sequences_.erase(task.sequence_token_id); | 1252 current_sequences_.erase(task.sequence_token_id); |
1099 } | 1253 } |
1100 | 1254 |
1101 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 1255 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
1102 int sequence_token_id) const { | 1256 int sequence_token_id) const { |
1257 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
1258 | |
1103 lock_.AssertAcquired(); | 1259 lock_.AssertAcquired(); |
1104 return !sequence_token_id || | 1260 return !sequence_token_id || |
1105 current_sequences_.find(sequence_token_id) == | 1261 current_sequences_.find(sequence_token_id) == |
1106 current_sequences_.end(); | 1262 current_sequences_.end(); |
1107 } | 1263 } |
1108 | 1264 |
1109 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 1265 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
1266 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
1267 | |
1110 lock_.AssertAcquired(); | 1268 lock_.AssertAcquired(); |
1111 // How thread creation works: | 1269 // How thread creation works: |
1112 // | 1270 // |
1113 // We'de like to avoid creating threads with the lock held. However, we | 1271 // We'de like to avoid creating threads with the lock held. However, we |
1114 // need to be sure that we have an accurate accounting of the threads for | 1272 // need to be sure that we have an accurate accounting of the threads for |
1115 // proper Joining and deltion on shutdown. | 1273 // proper Joining and deltion on shutdown. |
1116 // | 1274 // |
1117 // We need to figure out if we need another thread with the lock held, which | 1275 // We need to figure out if we need another thread with the lock held, which |
1118 // is what this function does. It then marks us as in the process of creating | 1276 // is what this function does. It then marks us as in the process of creating |
1119 // a thread. When we do shutdown, we wait until the thread_being_created_ | 1277 // a thread. When we do shutdown, we wait until the thread_being_created_ |
(...skipping 30 matching lines...) Expand all Loading... | |
1150 thread_being_created_ = true; | 1308 thread_being_created_ = true; |
1151 return static_cast<int>(threads_.size() + 1); | 1309 return static_cast<int>(threads_.size() + 1); |
1152 } | 1310 } |
1153 } | 1311 } |
1154 } | 1312 } |
1155 return 0; | 1313 return 0; |
1156 } | 1314 } |
1157 | 1315 |
1158 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 1316 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
1159 int thread_number) { | 1317 int thread_number) { |
1318 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
1319 | |
1160 // Called outside of the lock. | 1320 // Called outside of the lock. |
1161 DCHECK_GT(thread_number, 0); | 1321 DCHECK_GT(thread_number, 0); |
1162 | 1322 |
1323 if (g_all_pools_state != AllPoolsState::WORKER_CREATED) { | |
1324 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | |
1325 g_all_pools_state = AllPoolsState::WORKER_CREATED; | |
1326 } | |
1327 | |
1163 // The worker is assigned to the list when the thread actually starts, which | 1328 // The worker is assigned to the list when the thread actually starts, which |
1164 // will manage the memory of the pointer. | 1329 // will manage the memory of the pointer. |
1165 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 1330 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
1166 } | 1331 } |
1167 | 1332 |
1168 void SequencedWorkerPool::Inner::SignalHasWork() { | 1333 void SequencedWorkerPool::Inner::SignalHasWork() { |
1334 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); | |
1335 | |
1169 has_work_cv_.Signal(); | 1336 has_work_cv_.Signal(); |
1170 if (testing_observer_) { | 1337 if (testing_observer_) { |
1171 testing_observer_->OnHasWork(); | 1338 testing_observer_->OnHasWork(); |
1172 } | 1339 } |
1173 } | 1340 } |
1174 | 1341 |
1175 bool SequencedWorkerPool::Inner::CanShutdown() const { | 1342 bool SequencedWorkerPool::Inner::CanShutdown() const { |
1343 DCHECK_EQ(AllPoolsState::WORKER_CREATED, g_all_pools_state); | |
1176 lock_.AssertAcquired(); | 1344 lock_.AssertAcquired(); |
1177 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 1345 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
1178 return !thread_being_created_ && | 1346 return !thread_being_created_ && |
1179 blocking_shutdown_thread_count_ == 0 && | 1347 blocking_shutdown_thread_count_ == 0 && |
1180 blocking_shutdown_pending_task_count_ == 0; | 1348 blocking_shutdown_pending_task_count_ == 0; |
1181 } | 1349 } |
1182 | 1350 |
1183 base::StaticAtomicSequenceNumber | 1351 base::StaticAtomicSequenceNumber |
1184 SequencedWorkerPool::Inner::g_last_sequence_number_; | 1352 SequencedWorkerPool::Inner::g_last_sequence_number_; |
1185 | 1353 |
(...skipping 16 matching lines...) Expand all Loading... | |
1202 // static | 1370 // static |
1203 scoped_refptr<SequencedWorkerPool> | 1371 scoped_refptr<SequencedWorkerPool> |
1204 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { | 1372 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { |
1205 Worker* worker = Worker::GetForCurrentThread(); | 1373 Worker* worker = Worker::GetForCurrentThread(); |
1206 if (!worker) | 1374 if (!worker) |
1207 return nullptr; | 1375 return nullptr; |
1208 | 1376 |
1209 return worker->worker_pool(); | 1377 return worker->worker_pool(); |
1210 } | 1378 } |
1211 | 1379 |
1380 // static | |
1381 void SequencedWorkerPool:: | |
1382 RedirectSequencedWorkerPoolsToTaskSchedulerForProcess() { | |
1383 DCHECK(TaskScheduler::GetInstance()); | |
1384 // Hitting this DCHECK indicates that a task was posted to a | |
1385 // SequencedWorkerPool before the TaskScheduler was initialized and | |
1386 // redirected, posting task to SequencedWorkerPools needs to at least be | |
1387 // delayed until after that point. | |
1388 DCHECK_EQ(AllPoolsState::NONE_ACTIVE, g_all_pools_state); | |
1389 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; | |
1390 } | |
1391 | |
1212 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | 1392 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, |
1213 const std::string& thread_name_prefix, | 1393 const std::string& thread_name_prefix, |
1214 base::TaskPriority task_priority) | 1394 base::TaskPriority task_priority) |
1215 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), | 1395 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), |
1216 inner_(new Inner(this, | 1396 inner_(new Inner(this, |
1217 max_threads, | 1397 max_threads, |
1218 thread_name_prefix, | 1398 thread_name_prefix, |
1219 task_priority, | 1399 task_priority, |
1220 NULL)) {} | 1400 NULL)) {} |
1221 | 1401 |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1358 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1538 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1359 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); | 1539 DCHECK(constructor_task_runner_->BelongsToCurrentThread()); |
1360 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1540 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1361 } | 1541 } |
1362 | 1542 |
1363 bool SequencedWorkerPool::IsShutdownInProgress() { | 1543 bool SequencedWorkerPool::IsShutdownInProgress() { |
1364 return inner_->IsShutdownInProgress(); | 1544 return inner_->IsShutdownInProgress(); |
1365 } | 1545 } |
1366 | 1546 |
1367 } // namespace base | 1547 } // namespace base |
OLD | NEW |