OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
274 SequenceToken sequence_token, | 274 SequenceToken sequence_token, |
275 WorkerShutdown shutdown_behavior, | 275 WorkerShutdown shutdown_behavior, |
276 const tracked_objects::Location& from_here, | 276 const tracked_objects::Location& from_here, |
277 const Closure& task, | 277 const Closure& task, |
278 TimeDelta delay); | 278 TimeDelta delay); |
279 | 279 |
280 bool RunsTasksOnCurrentThread() const; | 280 bool RunsTasksOnCurrentThread() const; |
281 | 281 |
282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
283 | 283 |
284 void FlushForTesting(); | 284 void CleanupForTesting(); |
285 | 285 |
286 void SignalHasWorkForTesting(); | 286 void SignalHasWorkForTesting(); |
287 | 287 |
288 int GetWorkSignalCountForTesting() const; | 288 int GetWorkSignalCountForTesting() const; |
289 | 289 |
290 void Shutdown(int max_blocking_tasks_after_shutdown); | 290 void Shutdown(int max_blocking_tasks_after_shutdown); |
291 | 291 |
292 // Runs the worker loop on the background thread. | 292 // Runs the worker loop on the background thread. |
293 void ThreadLoop(Worker* this_worker); | 293 void ThreadLoop(Worker* this_worker); |
294 | 294 |
295 private: | 295 private: |
296 enum GetWorkStatus { | 296 enum GetWorkStatus { |
297 GET_WORK_FOUND, | 297 GET_WORK_FOUND, |
298 GET_WORK_NOT_FOUND, | 298 GET_WORK_NOT_FOUND, |
299 GET_WORK_WAIT, | 299 GET_WORK_WAIT, |
300 }; | 300 }; |
301 | 301 |
302 // Returns whether there are no more pending tasks and all threads | 302 enum CleanupState { |
303 // are idle. Must be called under lock. | 303 CLEANUP_REQUESTED, |
304 bool IsIdle() const; | 304 CLEANUP_STARTING, |
305 CLEANUP_RUNNING, | |
306 CLEANUP_FINISHING, | |
307 CLEANUP_DONE, | |
308 }; | |
305 | 309 |
306 // Called from within the lock, this converts the given token name into a | 310 // Called from within the lock, this converts the given token name into a |
307 // token ID, creating a new one if necessary. | 311 // token ID, creating a new one if necessary. |
308 int LockedGetNamedTokenID(const std::string& name); | 312 int LockedGetNamedTokenID(const std::string& name); |
309 | 313 |
310 // Called from within the lock, this returns the next sequence task number. | 314 // Called from within the lock, this returns the next sequence task number. |
311 int64 LockedGetNextSequenceTaskNumber(); | 315 int64 LockedGetNextSequenceTaskNumber(); |
312 | 316 |
313 // Called from within the lock, returns the shutdown behavior of the task | 317 // Called from within the lock, returns the shutdown behavior of the task |
314 // running on the currently executing worker thread. If invoked from a thread | 318 // running on the currently executing worker thread. If invoked from a thread |
(...skipping 12 matching lines...) Expand all Loading... | |
327 // filled in the time to wait until the next task to run. In this case, the | 331 // filled in the time to wait until the next task to run. In this case, the |
328 // caller should wait the time. | 332 // caller should wait the time. |
329 // | 333 // |
330 // In any case, the calling code should clear the given | 334 // In any case, the calling code should clear the given |
331 // delete_these_outside_lock vector the next time the lock is released. | 335 // delete_these_outside_lock vector the next time the lock is released. |
332 // See the implementation for a more detailed description. | 336 // See the implementation for a more detailed description. |
333 GetWorkStatus GetWork(SequencedTask* task, | 337 GetWorkStatus GetWork(SequencedTask* task, |
334 TimeDelta* wait_time, | 338 TimeDelta* wait_time, |
335 std::vector<Closure>* delete_these_outside_lock); | 339 std::vector<Closure>* delete_these_outside_lock); |
336 | 340 |
341 void HandleCleanup(); | |
342 | |
337 // Peforms init and cleanup around running the given task. WillRun... | 343 // Peforms init and cleanup around running the given task. WillRun... |
338 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 344 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
339 // The calling code should call FinishStartingAdditionalThread once the | 345 // The calling code should call FinishStartingAdditionalThread once the |
340 // lock is released if the return values is nonzero. | 346 // lock is released if the return values is nonzero. |
341 int WillRunWorkerTask(const SequencedTask& task); | 347 int WillRunWorkerTask(const SequencedTask& task); |
342 void DidRunWorkerTask(const SequencedTask& task); | 348 void DidRunWorkerTask(const SequencedTask& task); |
343 | 349 |
344 // Returns true if there are no threads currently running the given | 350 // Returns true if there are no threads currently running the given |
345 // sequence token. | 351 // sequence token. |
346 bool IsSequenceTokenRunnable(int sequence_token_id) const; | 352 bool IsSequenceTokenRunnable(int sequence_token_id) const; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
382 // This lock protects |everything in this class|. Do not read or modify | 388 // This lock protects |everything in this class|. Do not read or modify |
383 // anything without holding this lock. Do not block while holding this | 389 // anything without holding this lock. Do not block while holding this |
384 // lock. | 390 // lock. |
385 mutable Lock lock_; | 391 mutable Lock lock_; |
386 | 392 |
387 // Condition variable that is waited on by worker threads until new | 393 // Condition variable that is waited on by worker threads until new |
388 // tasks are posted or shutdown starts. | 394 // tasks are posted or shutdown starts. |
389 ConditionVariable has_work_cv_; | 395 ConditionVariable has_work_cv_; |
390 | 396 |
391 // Condition variable that is waited on by non-worker threads (in | 397 // Condition variable that is waited on by non-worker threads (in |
392 // FlushForTesting()) until IsIdle() goes to true. | |
393 ConditionVariable is_idle_cv_; | |
394 | |
395 // Condition variable that is waited on by non-worker threads (in | |
396 // Shutdown()) until CanShutdown() goes to true. | 398 // Shutdown()) until CanShutdown() goes to true. |
397 ConditionVariable can_shutdown_cv_; | 399 ConditionVariable can_shutdown_cv_; |
398 | 400 |
399 // The maximum number of worker threads we'll create. | 401 // The maximum number of worker threads we'll create. |
400 const size_t max_threads_; | 402 const size_t max_threads_; |
401 | 403 |
402 const std::string thread_name_prefix_; | 404 const std::string thread_name_prefix_; |
403 | 405 |
404 // Associates all known sequence token names with their IDs. | 406 // Associates all known sequence token names with their IDs. |
405 std::map<std::string, int> named_sequence_tokens_; | 407 std::map<std::string, int> named_sequence_tokens_; |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
443 int trace_id_; | 445 int trace_id_; |
444 | 446 |
445 // Set when Shutdown is called and no further tasks should be | 447 // Set when Shutdown is called and no further tasks should be |
446 // allowed, though we may still be running existing tasks. | 448 // allowed, though we may still be running existing tasks. |
447 bool shutdown_called_; | 449 bool shutdown_called_; |
448 | 450 |
449 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() | 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() |
450 // has been called. | 452 // has been called. |
451 int max_blocking_tasks_after_shutdown_; | 453 int max_blocking_tasks_after_shutdown_; |
452 | 454 |
455 // State used to cleanup for testing, all guarded by lock_. | |
456 CleanupState cleanup_state_; | |
457 size_t cleanup_idlers_; | |
458 ConditionVariable cleanup_cv_; | |
459 | |
453 TestingObserver* const testing_observer_; | 460 TestingObserver* const testing_observer_; |
454 | 461 |
455 DISALLOW_COPY_AND_ASSIGN(Inner); | 462 DISALLOW_COPY_AND_ASSIGN(Inner); |
456 }; | 463 }; |
457 | 464 |
458 // Worker definitions --------------------------------------------------------- | 465 // Worker definitions --------------------------------------------------------- |
459 | 466 |
460 SequencedWorkerPool::Worker::Worker( | 467 SequencedWorkerPool::Worker::Worker( |
461 const scoped_refptr<SequencedWorkerPool>& worker_pool, | 468 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
462 int thread_number, | 469 int thread_number, |
(...skipping 23 matching lines...) Expand all Loading... | |
486 | 493 |
487 SequencedWorkerPool::Inner::Inner( | 494 SequencedWorkerPool::Inner::Inner( |
488 SequencedWorkerPool* worker_pool, | 495 SequencedWorkerPool* worker_pool, |
489 size_t max_threads, | 496 size_t max_threads, |
490 const std::string& thread_name_prefix, | 497 const std::string& thread_name_prefix, |
491 TestingObserver* observer) | 498 TestingObserver* observer) |
492 : worker_pool_(worker_pool), | 499 : worker_pool_(worker_pool), |
493 last_sequence_number_(0), | 500 last_sequence_number_(0), |
494 lock_(), | 501 lock_(), |
495 has_work_cv_(&lock_), | 502 has_work_cv_(&lock_), |
496 is_idle_cv_(&lock_), | |
497 can_shutdown_cv_(&lock_), | 503 can_shutdown_cv_(&lock_), |
498 max_threads_(max_threads), | 504 max_threads_(max_threads), |
499 thread_name_prefix_(thread_name_prefix), | 505 thread_name_prefix_(thread_name_prefix), |
500 thread_being_created_(false), | 506 thread_being_created_(false), |
501 waiting_thread_count_(0), | 507 waiting_thread_count_(0), |
502 blocking_shutdown_thread_count_(0), | 508 blocking_shutdown_thread_count_(0), |
503 next_sequence_task_number_(0), | 509 next_sequence_task_number_(0), |
504 blocking_shutdown_pending_task_count_(0), | 510 blocking_shutdown_pending_task_count_(0), |
505 trace_id_(0), | 511 trace_id_(0), |
506 shutdown_called_(false), | 512 shutdown_called_(false), |
507 max_blocking_tasks_after_shutdown_(0), | 513 max_blocking_tasks_after_shutdown_(0), |
514 cleanup_state_(CLEANUP_DONE), | |
515 cleanup_idlers_(0), | |
516 cleanup_cv_(&lock_), | |
508 testing_observer_(observer) {} | 517 testing_observer_(observer) {} |
509 | 518 |
510 SequencedWorkerPool::Inner::~Inner() { | 519 SequencedWorkerPool::Inner::~Inner() { |
511 // You must call Shutdown() before destroying the pool. | 520 // You must call Shutdown() before destroying the pool. |
512 DCHECK(shutdown_called_); | 521 DCHECK(shutdown_called_); |
513 | 522 |
514 // Need to explicitly join with the threads before they're destroyed or else | 523 // Need to explicitly join with the threads before they're destroyed or else |
515 // they will be running when our object is half torn down. | 524 // they will be running when our object is half torn down. |
516 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
517 it->second->Join(); | 526 it->second->Join(); |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
602 | 611 |
603 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
604 SequenceToken sequence_token) const { | 613 SequenceToken sequence_token) const { |
605 AutoLock lock(lock_); | 614 AutoLock lock(lock_); |
606 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
607 if (found == threads_.end()) | 616 if (found == threads_.end()) |
608 return false; | 617 return false; |
609 return found->second->running_sequence().Equals(sequence_token); | 618 return found->second->running_sequence().Equals(sequence_token); |
610 } | 619 } |
611 | 620 |
612 void SequencedWorkerPool::Inner::FlushForTesting() { | 621 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
622 void SequencedWorkerPool::Inner::CleanupForTesting() { | |
623 DCHECK(!RunsTasksOnCurrentThread()); | |
624 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
613 AutoLock lock(lock_); | 625 AutoLock lock(lock_); |
614 while (!IsIdle()) | 626 if (shutdown_called_) |
615 is_idle_cv_.Wait(); | 627 return; |
628 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | |
629 if (!thread_being_created_ && threads_.empty()) | |
630 return; | |
631 cleanup_state_ = CLEANUP_REQUESTED; | |
632 cleanup_idlers_ = 0; | |
633 has_work_cv_.Signal(); | |
634 while (cleanup_state_ != CLEANUP_DONE) | |
635 cleanup_cv_.Wait(); | |
616 } | 636 } |
617 | 637 |
618 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 638 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
619 SignalHasWork(); | 639 SignalHasWork(); |
620 } | 640 } |
621 | 641 |
622 void SequencedWorkerPool::Inner::Shutdown( | 642 void SequencedWorkerPool::Inner::Shutdown( |
623 int max_new_blocking_tasks_after_shutdown) { | 643 int max_new_blocking_tasks_after_shutdown) { |
624 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
625 { | 645 { |
626 AutoLock lock(lock_); | 646 AutoLock lock(lock_); |
627 | |
628 if (shutdown_called_) | 647 if (shutdown_called_) |
629 return; | 648 return; |
649 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | |
630 shutdown_called_ = true; | 650 shutdown_called_ = true; |
631 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 651 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
632 | 652 |
633 // Tickle the threads. This will wake up a waiting one so it will know that | 653 // Tickle the threads. This will wake up a waiting one so it will know that |
634 // it can exit, which in turn will wake up any other waiting ones. | 654 // it can exit, which in turn will wake up any other waiting ones. |
635 SignalHasWork(); | 655 SignalHasWork(); |
636 | 656 |
637 // There are no pending or running tasks blocking shutdown, we're done. | 657 // There are no pending or running tasks blocking shutdown, we're done. |
638 if (CanShutdown()) | 658 if (CanShutdown()) |
639 return; | 659 return; |
(...skipping 25 matching lines...) Expand all Loading... | |
665 std::pair<ThreadMap::iterator, bool> result = | 685 std::pair<ThreadMap::iterator, bool> result = |
666 threads_.insert( | 686 threads_.insert( |
667 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | 687 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
668 DCHECK(result.second); | 688 DCHECK(result.second); |
669 | 689 |
670 while (true) { | 690 while (true) { |
671 #if defined(OS_MACOSX) | 691 #if defined(OS_MACOSX) |
672 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 692 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
673 #endif | 693 #endif |
674 | 694 |
695 HandleCleanup(); | |
696 | |
675 // See GetWork for what delete_these_outside_lock is doing. | 697 // See GetWork for what delete_these_outside_lock is doing. |
676 SequencedTask task; | 698 SequencedTask task; |
677 TimeDelta wait_time; | 699 TimeDelta wait_time; |
678 std::vector<Closure> delete_these_outside_lock; | 700 std::vector<Closure> delete_these_outside_lock; |
679 GetWorkStatus status = | 701 GetWorkStatus status = |
680 GetWork(&task, &wait_time, &delete_these_outside_lock); | 702 GetWork(&task, &wait_time, &delete_these_outside_lock); |
681 if (status == GET_WORK_FOUND) { | 703 if (status == GET_WORK_FOUND) { |
682 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", | 704 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", |
683 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); | 705 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); |
684 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", | 706 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", |
(...skipping 25 matching lines...) Expand all Loading... | |
710 start_time, tracked_objects::ThreadData::NowForEndOfRun()); | 732 start_time, tracked_objects::ThreadData::NowForEndOfRun()); |
711 | 733 |
712 this_worker->set_running_task_info( | 734 this_worker->set_running_task_info( |
713 SequenceToken(), CONTINUE_ON_SHUTDOWN); | 735 SequenceToken(), CONTINUE_ON_SHUTDOWN); |
714 | 736 |
715 // Make sure our task is erased outside the lock for the same reason | 737 // Make sure our task is erased outside the lock for the same reason |
716 // we do this with delete_these_oustide_lock. | 738 // we do this with delete_these_oustide_lock. |
717 task.task = Closure(); | 739 task.task = Closure(); |
718 } | 740 } |
719 DidRunWorkerTask(task); // Must be done inside the lock. | 741 DidRunWorkerTask(task); // Must be done inside the lock. |
742 } else if (cleanup_state_ == CLEANUP_RUNNING) { | |
743 switch (status) { | |
744 case GET_WORK_WAIT: { | |
745 AutoUnlock unlock(lock_); | |
746 delete_these_outside_lock.clear(); | |
747 } | |
748 break; | |
749 case GET_WORK_NOT_FOUND: | |
750 CHECK(delete_these_outside_lock.empty()); | |
751 cleanup_state_ = CLEANUP_FINISHING; | |
752 cleanup_cv_.Broadcast(); | |
753 break; | |
754 default: | |
755 NOTREACHED(); | |
756 } | |
720 } else { | 757 } else { |
721 // When we're terminating and there's no more work, we can | 758 // When we're terminating and there's no more work, we can |
722 // shut down, other workers can complete any pending or new tasks. | 759 // shut down, other workers can complete any pending or new tasks. |
723 // We can get additional tasks posted after shutdown_called_ is set | 760 // We can get additional tasks posted after shutdown_called_ is set |
724 // but only worker threads are allowed to post tasks at that time, and | 761 // but only worker threads are allowed to post tasks at that time, and |
725 // the workers responsible for posting those tasks will be available | 762 // the workers responsible for posting those tasks will be available |
726 // to run them. Also, there may be some tasks stuck behind running | 763 // to run them. Also, there may be some tasks stuck behind running |
727 // ones with the same sequence token, but additional threads won't | 764 // ones with the same sequence token, but additional threads won't |
728 // help this case. | 765 // help this case. |
729 if (shutdown_called_ && | 766 if (shutdown_called_ && |
730 blocking_shutdown_pending_task_count_ == 0) | 767 blocking_shutdown_pending_task_count_ == 0) |
731 break; | 768 break; |
732 waiting_thread_count_++; | 769 waiting_thread_count_++; |
733 // This is the only time that IsIdle() can go to true. | |
734 if (IsIdle()) | |
735 is_idle_cv_.Signal(); | |
736 | 770 |
737 switch (status) { | 771 switch (status) { |
738 case GET_WORK_NOT_FOUND: | 772 case GET_WORK_NOT_FOUND: |
739 has_work_cv_.Wait(); | 773 has_work_cv_.Wait(); |
740 break; | 774 break; |
741 case GET_WORK_WAIT: | 775 case GET_WORK_WAIT: |
742 has_work_cv_.TimedWait(wait_time); | 776 has_work_cv_.TimedWait(wait_time); |
743 break; | 777 break; |
744 default: | 778 default: |
745 NOTREACHED(); | 779 NOTREACHED(); |
746 } | 780 } |
747 waiting_thread_count_--; | 781 waiting_thread_count_--; |
748 } | 782 } |
749 } | 783 } |
750 } // Release lock_. | 784 } // Release lock_. |
751 | 785 |
752 // We noticed we should exit. Wake up the next worker so it knows it should | 786 // We noticed we should exit. Wake up the next worker so it knows it should |
753 // exit as well (because the Shutdown() code only signals once). | 787 // exit as well (because the Shutdown() code only signals once). |
754 SignalHasWork(); | 788 SignalHasWork(); |
755 | 789 |
756 // Possibly unblock shutdown. | 790 // Possibly unblock shutdown. |
757 can_shutdown_cv_.Signal(); | 791 can_shutdown_cv_.Signal(); |
758 } | 792 } |
759 | 793 |
760 bool SequencedWorkerPool::Inner::IsIdle() const { | 794 void SequencedWorkerPool::Inner::HandleCleanup() { |
michaeln
2013/01/23 00:16:08
The synchronization logic is mostly in here. This
| |
761 lock_.AssertAcquired(); | 795 lock_.AssertAcquired(); |
762 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); | 796 if (cleanup_state_ == CLEANUP_DONE) |
797 return; | |
798 if (cleanup_state_ == CLEANUP_REQUESTED) { | |
799 // We win, we get to do the cleanup as soon as the others wise up and idle. | |
800 cleanup_state_ = CLEANUP_STARTING; | |
801 while (thread_being_created_ || | |
802 cleanup_idlers_ != threads_.size() - 1) { | |
803 has_work_cv_.Signal(); | |
804 cleanup_cv_.Wait(); | |
805 } | |
806 cleanup_state_ = CLEANUP_RUNNING; | |
807 return; | |
808 } | |
809 if (cleanup_state_ == CLEANUP_STARTING) { | |
810 // Another worker thread is cleaning up, we idle here until thats done. | |
811 ++cleanup_idlers_; | |
812 cleanup_cv_.Broadcast(); | |
813 while (cleanup_state_ != CLEANUP_FINISHING) { | |
814 cleanup_cv_.Wait(); | |
815 } | |
816 --cleanup_idlers_; | |
817 cleanup_cv_.Broadcast(); | |
818 return; | |
819 } | |
820 if (cleanup_state_ == CLEANUP_FINISHING) { | |
821 // We wait for all idlers to wake up prior to being DONE. | |
822 while (cleanup_idlers_ != 0) { | |
823 cleanup_cv_.Broadcast(); | |
824 cleanup_cv_.Wait(); | |
825 } | |
826 if (cleanup_state_ == CLEANUP_FINISHING) { | |
827 cleanup_state_ = CLEANUP_DONE; | |
828 cleanup_cv_.Signal(); | |
829 } | |
830 return; | |
831 } | |
763 } | 832 } |
764 | 833 |
765 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 834 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
766 const std::string& name) { | 835 const std::string& name) { |
767 lock_.AssertAcquired(); | 836 lock_.AssertAcquired(); |
768 DCHECK(!name.empty()); | 837 DCHECK(!name.empty()); |
769 | 838 |
770 std::map<std::string, int>::const_iterator found = | 839 std::map<std::string, int>::const_iterator found = |
771 named_sequence_tokens_.find(name); | 840 named_sequence_tokens_.find(name); |
772 if (found != named_sequence_tokens_.end()) | 841 if (found != named_sequence_tokens_.end()) |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
835 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { | 904 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { |
836 unrunnable_tasks++; | 905 unrunnable_tasks++; |
837 ++i; | 906 ++i; |
838 continue; | 907 continue; |
839 } | 908 } |
840 | 909 |
841 if (i->time_to_run > current_time) { | 910 if (i->time_to_run > current_time) { |
842 // The time to run has not come yet. | 911 // The time to run has not come yet. |
843 *wait_time = i->time_to_run - current_time; | 912 *wait_time = i->time_to_run - current_time; |
844 status = GET_WORK_WAIT; | 913 status = GET_WORK_WAIT; |
914 if (cleanup_state_ == CLEANUP_RUNNING) { | |
915 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | |
916 delete_these_outside_lock->push_back(i->task); | |
917 pending_tasks_.erase(i); | |
918 } | |
845 break; | 919 break; |
846 } | 920 } |
847 | 921 |
848 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 922 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
849 // We're shutting down and the task we just found isn't blocking | 923 // We're shutting down and the task we just found isn't blocking |
850 // shutdown. Delete it and get more work. | 924 // shutdown. Delete it and get more work. |
851 // | 925 // |
852 // Note that we do not want to delete unrunnable tasks. Deleting a task | 926 // Note that we do not want to delete unrunnable tasks. Deleting a task |
853 // can have side effects (like freeing some objects) and deleting a | 927 // can have side effects (like freeing some objects) and deleting a |
854 // task that's supposed to run after one that's currently running could | 928 // task that's supposed to run after one that's currently running could |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
961 // cause a worker to start since one is pending. | 1035 // cause a worker to start since one is pending. |
962 // 3. Main thread initiates shutdown. | 1036 // 3. Main thread initiates shutdown. |
963 // 4. No more threads are created since the shutdown_called_ flag is set. | 1037 // 4. No more threads are created since the shutdown_called_ flag is set. |
964 // | 1038 // |
965 // The result is that one may expect that max_threads_ workers to be created | 1039 // The result is that one may expect that max_threads_ workers to be created |
966 // given the workload, but in reality fewer may be created because the | 1040 // given the workload, but in reality fewer may be created because the |
967 // sequence of thread creation on the background threads is racing with the | 1041 // sequence of thread creation on the background threads is racing with the |
968 // shutdown call. | 1042 // shutdown call. |
969 if (!shutdown_called_ && | 1043 if (!shutdown_called_ && |
970 !thread_being_created_ && | 1044 !thread_being_created_ && |
1045 cleanup_state_ == CLEANUP_DONE && | |
971 threads_.size() < max_threads_ && | 1046 threads_.size() < max_threads_ && |
972 waiting_thread_count_ == 0) { | 1047 waiting_thread_count_ == 0) { |
973 // We could use an additional thread if there's work to be done. | 1048 // We could use an additional thread if there's work to be done. |
974 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); | 1049 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); |
975 i != pending_tasks_.end(); ++i) { | 1050 i != pending_tasks_.end(); ++i) { |
976 if (IsSequenceTokenRunnable(i->sequence_token_id)) { | 1051 if (IsSequenceTokenRunnable(i->sequence_token_id)) { |
977 // Found a runnable task, mark the thread as being started. | 1052 // Found a runnable task, mark the thread as being started. |
978 thread_being_created_ = true; | 1053 thread_being_created_ = true; |
979 return static_cast<int>(threads_.size() + 1); | 1054 return static_cast<int>(threads_.size() + 1); |
980 } | 1055 } |
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1139 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1214 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
1140 return inner_->RunsTasksOnCurrentThread(); | 1215 return inner_->RunsTasksOnCurrentThread(); |
1141 } | 1216 } |
1142 | 1217 |
1143 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1218 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1144 SequenceToken sequence_token) const { | 1219 SequenceToken sequence_token) const { |
1145 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1220 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1146 } | 1221 } |
1147 | 1222 |
1148 void SequencedWorkerPool::FlushForTesting() { | 1223 void SequencedWorkerPool::FlushForTesting() { |
1149 inner_->FlushForTesting(); | 1224 inner_->CleanupForTesting(); |
1150 } | 1225 } |
1151 | 1226 |
1152 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1227 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1153 inner_->SignalHasWorkForTesting(); | 1228 inner_->SignalHasWorkForTesting(); |
1154 } | 1229 } |
1155 | 1230 |
1156 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1231 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1157 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1232 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
1158 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1233 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1159 } | 1234 } |
1160 | 1235 |
1161 } // namespace base | 1236 } // namespace base |
OLD | NEW |