Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 274 SequenceToken sequence_token, | 274 SequenceToken sequence_token, |
| 275 WorkerShutdown shutdown_behavior, | 275 WorkerShutdown shutdown_behavior, |
| 276 const tracked_objects::Location& from_here, | 276 const tracked_objects::Location& from_here, |
| 277 const Closure& task, | 277 const Closure& task, |
| 278 TimeDelta delay); | 278 TimeDelta delay); |
| 279 | 279 |
| 280 bool RunsTasksOnCurrentThread() const; | 280 bool RunsTasksOnCurrentThread() const; |
| 281 | 281 |
| 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 283 | 283 |
| 284 void FlushForTesting(); | 284 void CleanupForTesting(); |
|
akalin
2013/02/26 00:12:23
any reason for this rename? perhaps make it consis
| |
| 285 | 285 |
| 286 void SignalHasWorkForTesting(); | 286 void SignalHasWorkForTesting(); |
| 287 | 287 |
| 288 int GetWorkSignalCountForTesting() const; | 288 int GetWorkSignalCountForTesting() const; |
| 289 | 289 |
| 290 void Shutdown(int max_blocking_tasks_after_shutdown); | 290 void Shutdown(int max_blocking_tasks_after_shutdown); |
| 291 | 291 |
| 292 // Runs the worker loop on the background thread. | 292 // Runs the worker loop on the background thread. |
| 293 void ThreadLoop(Worker* this_worker); | 293 void ThreadLoop(Worker* this_worker); |
| 294 | 294 |
| 295 private: | 295 private: |
| 296 enum GetWorkStatus { | 296 enum GetWorkStatus { |
| 297 GET_WORK_FOUND, | 297 GET_WORK_FOUND, |
| 298 GET_WORK_NOT_FOUND, | 298 GET_WORK_NOT_FOUND, |
| 299 GET_WORK_WAIT, | 299 GET_WORK_WAIT, |
| 300 }; | 300 }; |
| 301 | 301 |
| 302 // Returns whether there are no more pending tasks and all threads | 302 enum CleanupState { |
| 303 // are idle. Must be called under lock. | 303 CLEANUP_REQUESTED, |
| 304 bool IsIdle() const; | 304 CLEANUP_STARTING, |
| 305 CLEANUP_RUNNING, | |
| 306 CLEANUP_FINISHING, | |
| 307 CLEANUP_DONE, | |
| 308 }; | |
| 305 | 309 |
| 306 // Called from within the lock, this converts the given token name into a | 310 // Called from within the lock, this converts the given token name into a |
| 307 // token ID, creating a new one if necessary. | 311 // token ID, creating a new one if necessary. |
| 308 int LockedGetNamedTokenID(const std::string& name); | 312 int LockedGetNamedTokenID(const std::string& name); |
| 309 | 313 |
| 310 // Called from within the lock, this returns the next sequence task number. | 314 // Called from within the lock, this returns the next sequence task number. |
| 311 int64 LockedGetNextSequenceTaskNumber(); | 315 int64 LockedGetNextSequenceTaskNumber(); |
| 312 | 316 |
| 313 // Called from within the lock, returns the shutdown behavior of the task | 317 // Called from within the lock, returns the shutdown behavior of the task |
| 314 // running on the currently executing worker thread. If invoked from a thread | 318 // running on the currently executing worker thread. If invoked from a thread |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 327 // filled in the time to wait until the next task to run. In this case, the | 331 // filled in the time to wait until the next task to run. In this case, the |
| 328 // caller should wait the time. | 332 // caller should wait the time. |
| 329 // | 333 // |
| 330 // In any case, the calling code should clear the given | 334 // In any case, the calling code should clear the given |
| 331 // delete_these_outside_lock vector the next time the lock is released. | 335 // delete_these_outside_lock vector the next time the lock is released. |
| 332 // See the implementation for a more detailed description. | 336 // See the implementation for a more detailed description. |
| 333 GetWorkStatus GetWork(SequencedTask* task, | 337 GetWorkStatus GetWork(SequencedTask* task, |
| 334 TimeDelta* wait_time, | 338 TimeDelta* wait_time, |
| 335 std::vector<Closure>* delete_these_outside_lock); | 339 std::vector<Closure>* delete_these_outside_lock); |
| 336 | 340 |
| 341 void HandleCleanup(); | |
| 342 | |
| 337 // Peforms init and cleanup around running the given task. WillRun... | 343 // Peforms init and cleanup around running the given task. WillRun... |
| 338 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 344 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| 339 // The calling code should call FinishStartingAdditionalThread once the | 345 // The calling code should call FinishStartingAdditionalThread once the |
| 340 // lock is released if the return values is nonzero. | 346 // lock is released if the return values is nonzero. |
| 341 int WillRunWorkerTask(const SequencedTask& task); | 347 int WillRunWorkerTask(const SequencedTask& task); |
| 342 void DidRunWorkerTask(const SequencedTask& task); | 348 void DidRunWorkerTask(const SequencedTask& task); |
| 343 | 349 |
| 344 // Returns true if there are no threads currently running the given | 350 // Returns true if there are no threads currently running the given |
| 345 // sequence token. | 351 // sequence token. |
| 346 bool IsSequenceTokenRunnable(int sequence_token_id) const; | 352 bool IsSequenceTokenRunnable(int sequence_token_id) const; |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 382 // This lock protects |everything in this class|. Do not read or modify | 388 // This lock protects |everything in this class|. Do not read or modify |
| 383 // anything without holding this lock. Do not block while holding this | 389 // anything without holding this lock. Do not block while holding this |
| 384 // lock. | 390 // lock. |
| 385 mutable Lock lock_; | 391 mutable Lock lock_; |
| 386 | 392 |
| 387 // Condition variable that is waited on by worker threads until new | 393 // Condition variable that is waited on by worker threads until new |
| 388 // tasks are posted or shutdown starts. | 394 // tasks are posted or shutdown starts. |
| 389 ConditionVariable has_work_cv_; | 395 ConditionVariable has_work_cv_; |
| 390 | 396 |
| 391 // Condition variable that is waited on by non-worker threads (in | 397 // Condition variable that is waited on by non-worker threads (in |
| 392 // FlushForTesting()) until IsIdle() goes to true. | |
| 393 ConditionVariable is_idle_cv_; | |
| 394 | |
| 395 // Condition variable that is waited on by non-worker threads (in | |
| 396 // Shutdown()) until CanShutdown() goes to true. | 398 // Shutdown()) until CanShutdown() goes to true. |
| 397 ConditionVariable can_shutdown_cv_; | 399 ConditionVariable can_shutdown_cv_; |
| 398 | 400 |
| 399 // The maximum number of worker threads we'll create. | 401 // The maximum number of worker threads we'll create. |
| 400 const size_t max_threads_; | 402 const size_t max_threads_; |
| 401 | 403 |
| 402 const std::string thread_name_prefix_; | 404 const std::string thread_name_prefix_; |
| 403 | 405 |
| 404 // Associates all known sequence token names with their IDs. | 406 // Associates all known sequence token names with their IDs. |
| 405 std::map<std::string, int> named_sequence_tokens_; | 407 std::map<std::string, int> named_sequence_tokens_; |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 443 int trace_id_; | 445 int trace_id_; |
| 444 | 446 |
| 445 // Set when Shutdown is called and no further tasks should be | 447 // Set when Shutdown is called and no further tasks should be |
| 446 // allowed, though we may still be running existing tasks. | 448 // allowed, though we may still be running existing tasks. |
| 447 bool shutdown_called_; | 449 bool shutdown_called_; |
| 448 | 450 |
| 449 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() | 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() |
| 450 // has been called. | 452 // has been called. |
| 451 int max_blocking_tasks_after_shutdown_; | 453 int max_blocking_tasks_after_shutdown_; |
| 452 | 454 |
| 455 // State used to cleanup for testing, all guarded by lock_. | |
| 456 CleanupState cleanup_state_; | |
| 457 size_t cleanup_idlers_; | |
| 458 ConditionVariable cleanup_cv_; | |
| 459 | |
| 453 TestingObserver* const testing_observer_; | 460 TestingObserver* const testing_observer_; |
| 454 | 461 |
| 455 DISALLOW_COPY_AND_ASSIGN(Inner); | 462 DISALLOW_COPY_AND_ASSIGN(Inner); |
| 456 }; | 463 }; |
| 457 | 464 |
| 458 // Worker definitions --------------------------------------------------------- | 465 // Worker definitions --------------------------------------------------------- |
| 459 | 466 |
| 460 SequencedWorkerPool::Worker::Worker( | 467 SequencedWorkerPool::Worker::Worker( |
| 461 const scoped_refptr<SequencedWorkerPool>& worker_pool, | 468 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| 462 int thread_number, | 469 int thread_number, |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 486 | 493 |
| 487 SequencedWorkerPool::Inner::Inner( | 494 SequencedWorkerPool::Inner::Inner( |
| 488 SequencedWorkerPool* worker_pool, | 495 SequencedWorkerPool* worker_pool, |
| 489 size_t max_threads, | 496 size_t max_threads, |
| 490 const std::string& thread_name_prefix, | 497 const std::string& thread_name_prefix, |
| 491 TestingObserver* observer) | 498 TestingObserver* observer) |
| 492 : worker_pool_(worker_pool), | 499 : worker_pool_(worker_pool), |
| 493 last_sequence_number_(0), | 500 last_sequence_number_(0), |
| 494 lock_(), | 501 lock_(), |
| 495 has_work_cv_(&lock_), | 502 has_work_cv_(&lock_), |
| 496 is_idle_cv_(&lock_), | |
| 497 can_shutdown_cv_(&lock_), | 503 can_shutdown_cv_(&lock_), |
| 498 max_threads_(max_threads), | 504 max_threads_(max_threads), |
| 499 thread_name_prefix_(thread_name_prefix), | 505 thread_name_prefix_(thread_name_prefix), |
| 500 thread_being_created_(false), | 506 thread_being_created_(false), |
| 501 waiting_thread_count_(0), | 507 waiting_thread_count_(0), |
| 502 blocking_shutdown_thread_count_(0), | 508 blocking_shutdown_thread_count_(0), |
| 503 next_sequence_task_number_(0), | 509 next_sequence_task_number_(0), |
| 504 blocking_shutdown_pending_task_count_(0), | 510 blocking_shutdown_pending_task_count_(0), |
| 505 trace_id_(0), | 511 trace_id_(0), |
| 506 shutdown_called_(false), | 512 shutdown_called_(false), |
| 507 max_blocking_tasks_after_shutdown_(0), | 513 max_blocking_tasks_after_shutdown_(0), |
| 514 cleanup_state_(CLEANUP_DONE), | |
| 515 cleanup_idlers_(0), | |
| 516 cleanup_cv_(&lock_), | |
| 508 testing_observer_(observer) {} | 517 testing_observer_(observer) {} |
| 509 | 518 |
| 510 SequencedWorkerPool::Inner::~Inner() { | 519 SequencedWorkerPool::Inner::~Inner() { |
| 511 // You must call Shutdown() before destroying the pool. | 520 // You must call Shutdown() before destroying the pool. |
| 512 DCHECK(shutdown_called_); | 521 DCHECK(shutdown_called_); |
| 513 | 522 |
| 514 // Need to explicitly join with the threads before they're destroyed or else | 523 // Need to explicitly join with the threads before they're destroyed or else |
| 515 // they will be running when our object is half torn down. | 524 // they will be running when our object is half torn down. |
| 516 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
| 517 it->second->Join(); | 526 it->second->Join(); |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 602 | 611 |
| 603 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 604 SequenceToken sequence_token) const { | 613 SequenceToken sequence_token) const { |
| 605 AutoLock lock(lock_); | 614 AutoLock lock(lock_); |
| 606 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 607 if (found == threads_.end()) | 616 if (found == threads_.end()) |
| 608 return false; | 617 return false; |
| 609 return found->second->running_sequence().Equals(sequence_token); | 618 return found->second->running_sequence().Equals(sequence_token); |
| 610 } | 619 } |
| 611 | 620 |
| 612 void SequencedWorkerPool::Inner::FlushForTesting() { | 621 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 622 void SequencedWorkerPool::Inner::CleanupForTesting() { | |
| 623 DCHECK(!RunsTasksOnCurrentThread()); | |
| 624 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
| 613 AutoLock lock(lock_); | 625 AutoLock lock(lock_); |
| 614 while (!IsIdle()) | 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 615 is_idle_cv_.Wait(); | 627 if (shutdown_called_) |
| 628 return; | |
| 629 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | |
| 630 return; | |
| 631 cleanup_state_ = CLEANUP_REQUESTED; | |
| 632 cleanup_idlers_ = 0; | |
| 633 has_work_cv_.Signal(); | |
| 634 while (cleanup_state_ != CLEANUP_DONE) | |
| 635 cleanup_cv_.Wait(); | |
| 616 } | 636 } |
| 617 | 637 |
| 618 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 638 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
| 619 SignalHasWork(); | 639 SignalHasWork(); |
| 620 } | 640 } |
| 621 | 641 |
| 622 void SequencedWorkerPool::Inner::Shutdown( | 642 void SequencedWorkerPool::Inner::Shutdown( |
| 623 int max_new_blocking_tasks_after_shutdown) { | 643 int max_new_blocking_tasks_after_shutdown) { |
| 624 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
| 625 { | 645 { |
| 626 AutoLock lock(lock_); | 646 AutoLock lock(lock_); |
| 627 | 647 // Cleanup and Shutdown should not be called concurrently. |
| 648 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | |
| 628 if (shutdown_called_) | 649 if (shutdown_called_) |
| 629 return; | 650 return; |
| 630 shutdown_called_ = true; | 651 shutdown_called_ = true; |
| 631 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 652 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
| 632 | 653 |
| 633 // Tickle the threads. This will wake up a waiting one so it will know that | 654 // Tickle the threads. This will wake up a waiting one so it will know that |
| 634 // it can exit, which in turn will wake up any other waiting ones. | 655 // it can exit, which in turn will wake up any other waiting ones. |
| 635 SignalHasWork(); | 656 SignalHasWork(); |
| 636 | 657 |
| 637 // There are no pending or running tasks blocking shutdown, we're done. | 658 // There are no pending or running tasks blocking shutdown, we're done. |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 665 std::pair<ThreadMap::iterator, bool> result = | 686 std::pair<ThreadMap::iterator, bool> result = |
| 666 threads_.insert( | 687 threads_.insert( |
| 667 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | 688 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
| 668 DCHECK(result.second); | 689 DCHECK(result.second); |
| 669 | 690 |
| 670 while (true) { | 691 while (true) { |
| 671 #if defined(OS_MACOSX) | 692 #if defined(OS_MACOSX) |
| 672 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 693 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
| 673 #endif | 694 #endif |
| 674 | 695 |
| 696 HandleCleanup(); | |
| 697 | |
| 675 // See GetWork for what delete_these_outside_lock is doing. | 698 // See GetWork for what delete_these_outside_lock is doing. |
| 676 SequencedTask task; | 699 SequencedTask task; |
| 677 TimeDelta wait_time; | 700 TimeDelta wait_time; |
| 678 std::vector<Closure> delete_these_outside_lock; | 701 std::vector<Closure> delete_these_outside_lock; |
| 679 GetWorkStatus status = | 702 GetWorkStatus status = |
| 680 GetWork(&task, &wait_time, &delete_these_outside_lock); | 703 GetWork(&task, &wait_time, &delete_these_outside_lock); |
| 681 if (status == GET_WORK_FOUND) { | 704 if (status == GET_WORK_FOUND) { |
| 682 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", | 705 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", |
| 683 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); | 706 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); |
| 684 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", | 707 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 713 // same reason we do this with delete_these_oustide_lock. | 736 // same reason we do this with delete_these_oustide_lock. |
| 714 // Also, do it before calling set_running_task_info() so | 737 // Also, do it before calling set_running_task_info() so |
| 715 // that sequence-checking from within the task's destructor | 738 // that sequence-checking from within the task's destructor |
| 716 // still works. | 739 // still works. |
| 717 task.task = Closure(); | 740 task.task = Closure(); |
| 718 | 741 |
| 719 this_worker->set_running_task_info( | 742 this_worker->set_running_task_info( |
| 720 SequenceToken(), CONTINUE_ON_SHUTDOWN); | 743 SequenceToken(), CONTINUE_ON_SHUTDOWN); |
| 721 } | 744 } |
| 722 DidRunWorkerTask(task); // Must be done inside the lock. | 745 DidRunWorkerTask(task); // Must be done inside the lock. |
| 746 } else if (cleanup_state_ == CLEANUP_RUNNING) { | |
| 747 switch (status) { | |
| 748 case GET_WORK_WAIT: { | |
| 749 AutoUnlock unlock(lock_); | |
| 750 delete_these_outside_lock.clear(); | |
| 751 } | |
| 752 break; | |
| 753 case GET_WORK_NOT_FOUND: | |
| 754 CHECK(delete_these_outside_lock.empty()); | |
| 755 cleanup_state_ = CLEANUP_FINISHING; | |
| 756 cleanup_cv_.Broadcast(); | |
| 757 break; | |
| 758 default: | |
| 759 NOTREACHED(); | |
| 760 } | |
| 723 } else { | 761 } else { |
| 724 // When we're terminating and there's no more work, we can | 762 // When we're terminating and there's no more work, we can |
| 725 // shut down, other workers can complete any pending or new tasks. | 763 // shut down, other workers can complete any pending or new tasks. |
| 726 // We can get additional tasks posted after shutdown_called_ is set | 764 // We can get additional tasks posted after shutdown_called_ is set |
| 727 // but only worker threads are allowed to post tasks at that time, and | 765 // but only worker threads are allowed to post tasks at that time, and |
| 728 // the workers responsible for posting those tasks will be available | 766 // the workers responsible for posting those tasks will be available |
| 729 // to run them. Also, there may be some tasks stuck behind running | 767 // to run them. Also, there may be some tasks stuck behind running |
| 730 // ones with the same sequence token, but additional threads won't | 768 // ones with the same sequence token, but additional threads won't |
| 731 // help this case. | 769 // help this case. |
| 732 if (shutdown_called_ && | 770 if (shutdown_called_ && |
| 733 blocking_shutdown_pending_task_count_ == 0) | 771 blocking_shutdown_pending_task_count_ == 0) |
| 734 break; | 772 break; |
| 735 waiting_thread_count_++; | 773 waiting_thread_count_++; |
| 736 // This is the only time that IsIdle() can go to true. | |
| 737 if (IsIdle()) | |
| 738 is_idle_cv_.Signal(); | |
| 739 | 774 |
| 740 switch (status) { | 775 switch (status) { |
| 741 case GET_WORK_NOT_FOUND: | 776 case GET_WORK_NOT_FOUND: |
| 742 has_work_cv_.Wait(); | 777 has_work_cv_.Wait(); |
| 743 break; | 778 break; |
| 744 case GET_WORK_WAIT: | 779 case GET_WORK_WAIT: |
| 745 has_work_cv_.TimedWait(wait_time); | 780 has_work_cv_.TimedWait(wait_time); |
| 746 break; | 781 break; |
| 747 default: | 782 default: |
| 748 NOTREACHED(); | 783 NOTREACHED(); |
| 749 } | 784 } |
| 750 waiting_thread_count_--; | 785 waiting_thread_count_--; |
| 751 } | 786 } |
| 752 } | 787 } |
| 753 } // Release lock_. | 788 } // Release lock_. |
| 754 | 789 |
| 755 // We noticed we should exit. Wake up the next worker so it knows it should | 790 // We noticed we should exit. Wake up the next worker so it knows it should |
| 756 // exit as well (because the Shutdown() code only signals once). | 791 // exit as well (because the Shutdown() code only signals once). |
| 757 SignalHasWork(); | 792 SignalHasWork(); |
| 758 | 793 |
| 759 // Possibly unblock shutdown. | 794 // Possibly unblock shutdown. |
| 760 can_shutdown_cv_.Signal(); | 795 can_shutdown_cv_.Signal(); |
| 761 } | 796 } |
| 762 | 797 |
| 763 bool SequencedWorkerPool::Inner::IsIdle() const { | 798 void SequencedWorkerPool::Inner::HandleCleanup() { |
| 764 lock_.AssertAcquired(); | 799 lock_.AssertAcquired(); |
| 765 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); | 800 if (cleanup_state_ == CLEANUP_DONE) |
| 801 return; | |
| 802 if (cleanup_state_ == CLEANUP_REQUESTED) { | |
|
akalin
2013/02/26 00:12:23
sad that cleanup handling became this complex, but
michaeln
2013/02/26 00:33:00
I know... as mentioned a while ago... in some sens
| |
| 803 // We win, we get to do the cleanup as soon as the others wise up and idle. | |
| 804 cleanup_state_ = CLEANUP_STARTING; | |
| 805 while (thread_being_created_ || | |
| 806 cleanup_idlers_ != threads_.size() - 1) { | |
| 807 has_work_cv_.Signal(); | |
| 808 cleanup_cv_.Wait(); | |
| 809 } | |
| 810 cleanup_state_ = CLEANUP_RUNNING; | |
| 811 return; | |
| 812 } | |
| 813 if (cleanup_state_ == CLEANUP_STARTING) { | |
| 814 // Another worker thread is cleaning up, we idle here until thats done. | |
| 815 ++cleanup_idlers_; | |
| 816 cleanup_cv_.Broadcast(); | |
| 817 while (cleanup_state_ != CLEANUP_FINISHING) { | |
| 818 cleanup_cv_.Wait(); | |
| 819 } | |
| 820 --cleanup_idlers_; | |
| 821 cleanup_cv_.Broadcast(); | |
| 822 return; | |
| 823 } | |
| 824 if (cleanup_state_ == CLEANUP_FINISHING) { | |
| 825 // We wait for all idlers to wake up prior to being DONE. | |
| 826 while (cleanup_idlers_ != 0) { | |
| 827 cleanup_cv_.Broadcast(); | |
| 828 cleanup_cv_.Wait(); | |
| 829 } | |
| 830 if (cleanup_state_ == CLEANUP_FINISHING) { | |
| 831 cleanup_state_ = CLEANUP_DONE; | |
| 832 cleanup_cv_.Signal(); | |
| 833 } | |
| 834 return; | |
| 835 } | |
| 766 } | 836 } |
| 767 | 837 |
| 768 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 838 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| 769 const std::string& name) { | 839 const std::string& name) { |
| 770 lock_.AssertAcquired(); | 840 lock_.AssertAcquired(); |
| 771 DCHECK(!name.empty()); | 841 DCHECK(!name.empty()); |
| 772 | 842 |
| 773 std::map<std::string, int>::const_iterator found = | 843 std::map<std::string, int>::const_iterator found = |
| 774 named_sequence_tokens_.find(name); | 844 named_sequence_tokens_.find(name); |
| 775 if (found != named_sequence_tokens_.end()) | 845 if (found != named_sequence_tokens_.end()) |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 838 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { | 908 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { |
| 839 unrunnable_tasks++; | 909 unrunnable_tasks++; |
| 840 ++i; | 910 ++i; |
| 841 continue; | 911 continue; |
| 842 } | 912 } |
| 843 | 913 |
| 844 if (i->time_to_run > current_time) { | 914 if (i->time_to_run > current_time) { |
| 845 // The time to run has not come yet. | 915 // The time to run has not come yet. |
| 846 *wait_time = i->time_to_run - current_time; | 916 *wait_time = i->time_to_run - current_time; |
| 847 status = GET_WORK_WAIT; | 917 status = GET_WORK_WAIT; |
| 918 if (cleanup_state_ == CLEANUP_RUNNING) { | |
| 919 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | |
| 920 delete_these_outside_lock->push_back(i->task); | |
| 921 pending_tasks_.erase(i); | |
| 922 } | |
| 848 break; | 923 break; |
| 849 } | 924 } |
| 850 | 925 |
| 851 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 926 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
| 852 // We're shutting down and the task we just found isn't blocking | 927 // We're shutting down and the task we just found isn't blocking |
| 853 // shutdown. Delete it and get more work. | 928 // shutdown. Delete it and get more work. |
| 854 // | 929 // |
| 855 // Note that we do not want to delete unrunnable tasks. Deleting a task | 930 // Note that we do not want to delete unrunnable tasks. Deleting a task |
| 856 // can have side effects (like freeing some objects) and deleting a | 931 // can have side effects (like freeing some objects) and deleting a |
| 857 // task that's supposed to run after one that's currently running could | 932 // task that's supposed to run after one that's currently running could |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 964 // cause a worker to start since one is pending. | 1039 // cause a worker to start since one is pending. |
| 965 // 3. Main thread initiates shutdown. | 1040 // 3. Main thread initiates shutdown. |
| 966 // 4. No more threads are created since the shutdown_called_ flag is set. | 1041 // 4. No more threads are created since the shutdown_called_ flag is set. |
| 967 // | 1042 // |
| 968 // The result is that one may expect that max_threads_ workers to be created | 1043 // The result is that one may expect that max_threads_ workers to be created |
| 969 // given the workload, but in reality fewer may be created because the | 1044 // given the workload, but in reality fewer may be created because the |
| 970 // sequence of thread creation on the background threads is racing with the | 1045 // sequence of thread creation on the background threads is racing with the |
| 971 // shutdown call. | 1046 // shutdown call. |
| 972 if (!shutdown_called_ && | 1047 if (!shutdown_called_ && |
| 973 !thread_being_created_ && | 1048 !thread_being_created_ && |
| 1049 cleanup_state_ == CLEANUP_DONE && | |
| 974 threads_.size() < max_threads_ && | 1050 threads_.size() < max_threads_ && |
| 975 waiting_thread_count_ == 0) { | 1051 waiting_thread_count_ == 0) { |
| 976 // We could use an additional thread if there's work to be done. | 1052 // We could use an additional thread if there's work to be done. |
| 977 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); | 1053 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); |
| 978 i != pending_tasks_.end(); ++i) { | 1054 i != pending_tasks_.end(); ++i) { |
| 979 if (IsSequenceTokenRunnable(i->sequence_token_id)) { | 1055 if (IsSequenceTokenRunnable(i->sequence_token_id)) { |
| 980 // Found a runnable task, mark the thread as being started. | 1056 // Found a runnable task, mark the thread as being started. |
| 981 thread_being_created_ = true; | 1057 thread_being_created_ = true; |
| 982 return static_cast<int>(threads_.size() + 1); | 1058 return static_cast<int>(threads_.size() + 1); |
| 983 } | 1059 } |
| (...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1142 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1218 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| 1143 return inner_->RunsTasksOnCurrentThread(); | 1219 return inner_->RunsTasksOnCurrentThread(); |
| 1144 } | 1220 } |
| 1145 | 1221 |
| 1146 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1222 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 1147 SequenceToken sequence_token) const { | 1223 SequenceToken sequence_token) const { |
| 1148 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1224 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 1149 } | 1225 } |
| 1150 | 1226 |
| 1151 void SequencedWorkerPool::FlushForTesting() { | 1227 void SequencedWorkerPool::FlushForTesting() { |
| 1152 inner_->FlushForTesting(); | 1228 inner_->CleanupForTesting(); |
| 1153 } | 1229 } |
| 1154 | 1230 |
| 1155 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1231 void SequencedWorkerPool::SignalHasWorkForTesting() { |
| 1156 inner_->SignalHasWorkForTesting(); | 1232 inner_->SignalHasWorkForTesting(); |
| 1157 } | 1233 } |
| 1158 | 1234 |
| 1159 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1235 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1160 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1236 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
| 1161 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1237 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1162 } | 1238 } |
| 1163 | 1239 |
| 1164 } // namespace base | 1240 } // namespace base |
| OLD | NEW |