OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
274 SequenceToken sequence_token, | 274 SequenceToken sequence_token, |
275 WorkerShutdown shutdown_behavior, | 275 WorkerShutdown shutdown_behavior, |
276 const tracked_objects::Location& from_here, | 276 const tracked_objects::Location& from_here, |
277 const Closure& task, | 277 const Closure& task, |
278 TimeDelta delay); | 278 TimeDelta delay); |
279 | 279 |
280 bool RunsTasksOnCurrentThread() const; | 280 bool RunsTasksOnCurrentThread() const; |
281 | 281 |
282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
283 | 283 |
284 void FlushForTesting(); | 284 void CleanupForTesting(); |
akalin
2013/02/26 00:12:23
any reason for this rename? perhaps make it consis
| |
285 | 285 |
286 void SignalHasWorkForTesting(); | 286 void SignalHasWorkForTesting(); |
287 | 287 |
288 int GetWorkSignalCountForTesting() const; | 288 int GetWorkSignalCountForTesting() const; |
289 | 289 |
290 void Shutdown(int max_blocking_tasks_after_shutdown); | 290 void Shutdown(int max_blocking_tasks_after_shutdown); |
291 | 291 |
292 // Runs the worker loop on the background thread. | 292 // Runs the worker loop on the background thread. |
293 void ThreadLoop(Worker* this_worker); | 293 void ThreadLoop(Worker* this_worker); |
294 | 294 |
295 private: | 295 private: |
296 enum GetWorkStatus { | 296 enum GetWorkStatus { |
297 GET_WORK_FOUND, | 297 GET_WORK_FOUND, |
298 GET_WORK_NOT_FOUND, | 298 GET_WORK_NOT_FOUND, |
299 GET_WORK_WAIT, | 299 GET_WORK_WAIT, |
300 }; | 300 }; |
301 | 301 |
302 // Returns whether there are no more pending tasks and all threads | 302 enum CleanupState { |
303 // are idle. Must be called under lock. | 303 CLEANUP_REQUESTED, |
304 bool IsIdle() const; | 304 CLEANUP_STARTING, |
305 CLEANUP_RUNNING, | |
306 CLEANUP_FINISHING, | |
307 CLEANUP_DONE, | |
308 }; | |
305 | 309 |
306 // Called from within the lock, this converts the given token name into a | 310 // Called from within the lock, this converts the given token name into a |
307 // token ID, creating a new one if necessary. | 311 // token ID, creating a new one if necessary. |
308 int LockedGetNamedTokenID(const std::string& name); | 312 int LockedGetNamedTokenID(const std::string& name); |
309 | 313 |
310 // Called from within the lock, this returns the next sequence task number. | 314 // Called from within the lock, this returns the next sequence task number. |
311 int64 LockedGetNextSequenceTaskNumber(); | 315 int64 LockedGetNextSequenceTaskNumber(); |
312 | 316 |
313 // Called from within the lock, returns the shutdown behavior of the task | 317 // Called from within the lock, returns the shutdown behavior of the task |
314 // running on the currently executing worker thread. If invoked from a thread | 318 // running on the currently executing worker thread. If invoked from a thread |
(...skipping 12 matching lines...) Expand all Loading... | |
327 // filled in the time to wait until the next task to run. In this case, the | 331 // filled in the time to wait until the next task to run. In this case, the |
328 // caller should wait the time. | 332 // caller should wait the time. |
329 // | 333 // |
330 // In any case, the calling code should clear the given | 334 // In any case, the calling code should clear the given |
331 // delete_these_outside_lock vector the next time the lock is released. | 335 // delete_these_outside_lock vector the next time the lock is released. |
332 // See the implementation for a more detailed description. | 336 // See the implementation for a more detailed description. |
333 GetWorkStatus GetWork(SequencedTask* task, | 337 GetWorkStatus GetWork(SequencedTask* task, |
334 TimeDelta* wait_time, | 338 TimeDelta* wait_time, |
335 std::vector<Closure>* delete_these_outside_lock); | 339 std::vector<Closure>* delete_these_outside_lock); |
336 | 340 |
341 void HandleCleanup(); | |
342 | |
337 // Peforms init and cleanup around running the given task. WillRun... | 343 // Peforms init and cleanup around running the given task. WillRun... |
338 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 344 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
339 // The calling code should call FinishStartingAdditionalThread once the | 345 // The calling code should call FinishStartingAdditionalThread once the |
340 // lock is released if the return values is nonzero. | 346 // lock is released if the return values is nonzero. |
341 int WillRunWorkerTask(const SequencedTask& task); | 347 int WillRunWorkerTask(const SequencedTask& task); |
342 void DidRunWorkerTask(const SequencedTask& task); | 348 void DidRunWorkerTask(const SequencedTask& task); |
343 | 349 |
344 // Returns true if there are no threads currently running the given | 350 // Returns true if there are no threads currently running the given |
345 // sequence token. | 351 // sequence token. |
346 bool IsSequenceTokenRunnable(int sequence_token_id) const; | 352 bool IsSequenceTokenRunnable(int sequence_token_id) const; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
382 // This lock protects |everything in this class|. Do not read or modify | 388 // This lock protects |everything in this class|. Do not read or modify |
383 // anything without holding this lock. Do not block while holding this | 389 // anything without holding this lock. Do not block while holding this |
384 // lock. | 390 // lock. |
385 mutable Lock lock_; | 391 mutable Lock lock_; |
386 | 392 |
387 // Condition variable that is waited on by worker threads until new | 393 // Condition variable that is waited on by worker threads until new |
388 // tasks are posted or shutdown starts. | 394 // tasks are posted or shutdown starts. |
389 ConditionVariable has_work_cv_; | 395 ConditionVariable has_work_cv_; |
390 | 396 |
391 // Condition variable that is waited on by non-worker threads (in | 397 // Condition variable that is waited on by non-worker threads (in |
392 // FlushForTesting()) until IsIdle() goes to true. | |
393 ConditionVariable is_idle_cv_; | |
394 | |
395 // Condition variable that is waited on by non-worker threads (in | |
396 // Shutdown()) until CanShutdown() goes to true. | 398 // Shutdown()) until CanShutdown() goes to true. |
397 ConditionVariable can_shutdown_cv_; | 399 ConditionVariable can_shutdown_cv_; |
398 | 400 |
399 // The maximum number of worker threads we'll create. | 401 // The maximum number of worker threads we'll create. |
400 const size_t max_threads_; | 402 const size_t max_threads_; |
401 | 403 |
402 const std::string thread_name_prefix_; | 404 const std::string thread_name_prefix_; |
403 | 405 |
404 // Associates all known sequence token names with their IDs. | 406 // Associates all known sequence token names with their IDs. |
405 std::map<std::string, int> named_sequence_tokens_; | 407 std::map<std::string, int> named_sequence_tokens_; |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
443 int trace_id_; | 445 int trace_id_; |
444 | 446 |
445 // Set when Shutdown is called and no further tasks should be | 447 // Set when Shutdown is called and no further tasks should be |
446 // allowed, though we may still be running existing tasks. | 448 // allowed, though we may still be running existing tasks. |
447 bool shutdown_called_; | 449 bool shutdown_called_; |
448 | 450 |
449 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() | 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() |
450 // has been called. | 452 // has been called. |
451 int max_blocking_tasks_after_shutdown_; | 453 int max_blocking_tasks_after_shutdown_; |
452 | 454 |
455 // State used to cleanup for testing, all guarded by lock_. | |
456 CleanupState cleanup_state_; | |
457 size_t cleanup_idlers_; | |
458 ConditionVariable cleanup_cv_; | |
459 | |
453 TestingObserver* const testing_observer_; | 460 TestingObserver* const testing_observer_; |
454 | 461 |
455 DISALLOW_COPY_AND_ASSIGN(Inner); | 462 DISALLOW_COPY_AND_ASSIGN(Inner); |
456 }; | 463 }; |
457 | 464 |
458 // Worker definitions --------------------------------------------------------- | 465 // Worker definitions --------------------------------------------------------- |
459 | 466 |
460 SequencedWorkerPool::Worker::Worker( | 467 SequencedWorkerPool::Worker::Worker( |
461 const scoped_refptr<SequencedWorkerPool>& worker_pool, | 468 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
462 int thread_number, | 469 int thread_number, |
(...skipping 23 matching lines...) Expand all Loading... | |
486 | 493 |
487 SequencedWorkerPool::Inner::Inner( | 494 SequencedWorkerPool::Inner::Inner( |
488 SequencedWorkerPool* worker_pool, | 495 SequencedWorkerPool* worker_pool, |
489 size_t max_threads, | 496 size_t max_threads, |
490 const std::string& thread_name_prefix, | 497 const std::string& thread_name_prefix, |
491 TestingObserver* observer) | 498 TestingObserver* observer) |
492 : worker_pool_(worker_pool), | 499 : worker_pool_(worker_pool), |
493 last_sequence_number_(0), | 500 last_sequence_number_(0), |
494 lock_(), | 501 lock_(), |
495 has_work_cv_(&lock_), | 502 has_work_cv_(&lock_), |
496 is_idle_cv_(&lock_), | |
497 can_shutdown_cv_(&lock_), | 503 can_shutdown_cv_(&lock_), |
498 max_threads_(max_threads), | 504 max_threads_(max_threads), |
499 thread_name_prefix_(thread_name_prefix), | 505 thread_name_prefix_(thread_name_prefix), |
500 thread_being_created_(false), | 506 thread_being_created_(false), |
501 waiting_thread_count_(0), | 507 waiting_thread_count_(0), |
502 blocking_shutdown_thread_count_(0), | 508 blocking_shutdown_thread_count_(0), |
503 next_sequence_task_number_(0), | 509 next_sequence_task_number_(0), |
504 blocking_shutdown_pending_task_count_(0), | 510 blocking_shutdown_pending_task_count_(0), |
505 trace_id_(0), | 511 trace_id_(0), |
506 shutdown_called_(false), | 512 shutdown_called_(false), |
507 max_blocking_tasks_after_shutdown_(0), | 513 max_blocking_tasks_after_shutdown_(0), |
514 cleanup_state_(CLEANUP_DONE), | |
515 cleanup_idlers_(0), | |
516 cleanup_cv_(&lock_), | |
508 testing_observer_(observer) {} | 517 testing_observer_(observer) {} |
509 | 518 |
510 SequencedWorkerPool::Inner::~Inner() { | 519 SequencedWorkerPool::Inner::~Inner() { |
511 // You must call Shutdown() before destroying the pool. | 520 // You must call Shutdown() before destroying the pool. |
512 DCHECK(shutdown_called_); | 521 DCHECK(shutdown_called_); |
513 | 522 |
514 // Need to explicitly join with the threads before they're destroyed or else | 523 // Need to explicitly join with the threads before they're destroyed or else |
515 // they will be running when our object is half torn down. | 524 // they will be running when our object is half torn down. |
516 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) | 525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
517 it->second->Join(); | 526 it->second->Join(); |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
602 | 611 |
603 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 612 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
604 SequenceToken sequence_token) const { | 613 SequenceToken sequence_token) const { |
605 AutoLock lock(lock_); | 614 AutoLock lock(lock_); |
606 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
607 if (found == threads_.end()) | 616 if (found == threads_.end()) |
608 return false; | 617 return false; |
609 return found->second->running_sequence().Equals(sequence_token); | 618 return found->second->running_sequence().Equals(sequence_token); |
610 } | 619 } |
611 | 620 |
612 void SequencedWorkerPool::Inner::FlushForTesting() { | 621 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
622 void SequencedWorkerPool::Inner::CleanupForTesting() { | |
623 DCHECK(!RunsTasksOnCurrentThread()); | |
624 base::ThreadRestrictions::ScopedAllowWait allow_wait; | |
613 AutoLock lock(lock_); | 625 AutoLock lock(lock_); |
614 while (!IsIdle()) | 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
615 is_idle_cv_.Wait(); | 627 if (shutdown_called_) |
628 return; | |
629 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) | |
630 return; | |
631 cleanup_state_ = CLEANUP_REQUESTED; | |
632 cleanup_idlers_ = 0; | |
633 has_work_cv_.Signal(); | |
634 while (cleanup_state_ != CLEANUP_DONE) | |
635 cleanup_cv_.Wait(); | |
616 } | 636 } |
617 | 637 |
618 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 638 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
619 SignalHasWork(); | 639 SignalHasWork(); |
620 } | 640 } |
621 | 641 |
622 void SequencedWorkerPool::Inner::Shutdown( | 642 void SequencedWorkerPool::Inner::Shutdown( |
623 int max_new_blocking_tasks_after_shutdown) { | 643 int max_new_blocking_tasks_after_shutdown) { |
624 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); | 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
625 { | 645 { |
626 AutoLock lock(lock_); | 646 AutoLock lock(lock_); |
627 | 647 // Cleanup and Shutdown should not be called concurrently. |
648 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | |
628 if (shutdown_called_) | 649 if (shutdown_called_) |
629 return; | 650 return; |
630 shutdown_called_ = true; | 651 shutdown_called_ = true; |
631 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; | 652 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; |
632 | 653 |
633 // Tickle the threads. This will wake up a waiting one so it will know that | 654 // Tickle the threads. This will wake up a waiting one so it will know that |
634 // it can exit, which in turn will wake up any other waiting ones. | 655 // it can exit, which in turn will wake up any other waiting ones. |
635 SignalHasWork(); | 656 SignalHasWork(); |
636 | 657 |
637 // There are no pending or running tasks blocking shutdown, we're done. | 658 // There are no pending or running tasks blocking shutdown, we're done. |
(...skipping 27 matching lines...) Expand all Loading... | |
665 std::pair<ThreadMap::iterator, bool> result = | 686 std::pair<ThreadMap::iterator, bool> result = |
666 threads_.insert( | 687 threads_.insert( |
667 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | 688 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
668 DCHECK(result.second); | 689 DCHECK(result.second); |
669 | 690 |
670 while (true) { | 691 while (true) { |
671 #if defined(OS_MACOSX) | 692 #if defined(OS_MACOSX) |
672 base::mac::ScopedNSAutoreleasePool autorelease_pool; | 693 base::mac::ScopedNSAutoreleasePool autorelease_pool; |
673 #endif | 694 #endif |
674 | 695 |
696 HandleCleanup(); | |
697 | |
675 // See GetWork for what delete_these_outside_lock is doing. | 698 // See GetWork for what delete_these_outside_lock is doing. |
676 SequencedTask task; | 699 SequencedTask task; |
677 TimeDelta wait_time; | 700 TimeDelta wait_time; |
678 std::vector<Closure> delete_these_outside_lock; | 701 std::vector<Closure> delete_these_outside_lock; |
679 GetWorkStatus status = | 702 GetWorkStatus status = |
680 GetWork(&task, &wait_time, &delete_these_outside_lock); | 703 GetWork(&task, &wait_time, &delete_these_outside_lock); |
681 if (status == GET_WORK_FOUND) { | 704 if (status == GET_WORK_FOUND) { |
682 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", | 705 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", |
683 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); | 706 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); |
684 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", | 707 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", |
(...skipping 28 matching lines...) Expand all Loading... | |
713 // same reason we do this with delete_these_oustide_lock. | 736 // same reason we do this with delete_these_oustide_lock. |
714 // Also, do it before calling set_running_task_info() so | 737 // Also, do it before calling set_running_task_info() so |
715 // that sequence-checking from within the task's destructor | 738 // that sequence-checking from within the task's destructor |
716 // still works. | 739 // still works. |
717 task.task = Closure(); | 740 task.task = Closure(); |
718 | 741 |
719 this_worker->set_running_task_info( | 742 this_worker->set_running_task_info( |
720 SequenceToken(), CONTINUE_ON_SHUTDOWN); | 743 SequenceToken(), CONTINUE_ON_SHUTDOWN); |
721 } | 744 } |
722 DidRunWorkerTask(task); // Must be done inside the lock. | 745 DidRunWorkerTask(task); // Must be done inside the lock. |
746 } else if (cleanup_state_ == CLEANUP_RUNNING) { | |
747 switch (status) { | |
748 case GET_WORK_WAIT: { | |
749 AutoUnlock unlock(lock_); | |
750 delete_these_outside_lock.clear(); | |
751 } | |
752 break; | |
753 case GET_WORK_NOT_FOUND: | |
754 CHECK(delete_these_outside_lock.empty()); | |
755 cleanup_state_ = CLEANUP_FINISHING; | |
756 cleanup_cv_.Broadcast(); | |
757 break; | |
758 default: | |
759 NOTREACHED(); | |
760 } | |
723 } else { | 761 } else { |
724 // When we're terminating and there's no more work, we can | 762 // When we're terminating and there's no more work, we can |
725 // shut down, other workers can complete any pending or new tasks. | 763 // shut down, other workers can complete any pending or new tasks. |
726 // We can get additional tasks posted after shutdown_called_ is set | 764 // We can get additional tasks posted after shutdown_called_ is set |
727 // but only worker threads are allowed to post tasks at that time, and | 765 // but only worker threads are allowed to post tasks at that time, and |
728 // the workers responsible for posting those tasks will be available | 766 // the workers responsible for posting those tasks will be available |
729 // to run them. Also, there may be some tasks stuck behind running | 767 // to run them. Also, there may be some tasks stuck behind running |
730 // ones with the same sequence token, but additional threads won't | 768 // ones with the same sequence token, but additional threads won't |
731 // help this case. | 769 // help this case. |
732 if (shutdown_called_ && | 770 if (shutdown_called_ && |
733 blocking_shutdown_pending_task_count_ == 0) | 771 blocking_shutdown_pending_task_count_ == 0) |
734 break; | 772 break; |
735 waiting_thread_count_++; | 773 waiting_thread_count_++; |
736 // This is the only time that IsIdle() can go to true. | |
737 if (IsIdle()) | |
738 is_idle_cv_.Signal(); | |
739 | 774 |
740 switch (status) { | 775 switch (status) { |
741 case GET_WORK_NOT_FOUND: | 776 case GET_WORK_NOT_FOUND: |
742 has_work_cv_.Wait(); | 777 has_work_cv_.Wait(); |
743 break; | 778 break; |
744 case GET_WORK_WAIT: | 779 case GET_WORK_WAIT: |
745 has_work_cv_.TimedWait(wait_time); | 780 has_work_cv_.TimedWait(wait_time); |
746 break; | 781 break; |
747 default: | 782 default: |
748 NOTREACHED(); | 783 NOTREACHED(); |
749 } | 784 } |
750 waiting_thread_count_--; | 785 waiting_thread_count_--; |
751 } | 786 } |
752 } | 787 } |
753 } // Release lock_. | 788 } // Release lock_. |
754 | 789 |
755 // We noticed we should exit. Wake up the next worker so it knows it should | 790 // We noticed we should exit. Wake up the next worker so it knows it should |
756 // exit as well (because the Shutdown() code only signals once). | 791 // exit as well (because the Shutdown() code only signals once). |
757 SignalHasWork(); | 792 SignalHasWork(); |
758 | 793 |
759 // Possibly unblock shutdown. | 794 // Possibly unblock shutdown. |
760 can_shutdown_cv_.Signal(); | 795 can_shutdown_cv_.Signal(); |
761 } | 796 } |
762 | 797 |
763 bool SequencedWorkerPool::Inner::IsIdle() const { | 798 void SequencedWorkerPool::Inner::HandleCleanup() { |
764 lock_.AssertAcquired(); | 799 lock_.AssertAcquired(); |
765 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); | 800 if (cleanup_state_ == CLEANUP_DONE) |
801 return; | |
802 if (cleanup_state_ == CLEANUP_REQUESTED) { | |
akalin
2013/02/26 00:12:23
sad that cleanup handling became this complex, but
michaeln
2013/02/26 00:33:00
I know... as mentioned a while ago... in some sens
| |
803 // We win, we get to do the cleanup as soon as the others wise up and idle. | |
804 cleanup_state_ = CLEANUP_STARTING; | |
805 while (thread_being_created_ || | |
806 cleanup_idlers_ != threads_.size() - 1) { | |
807 has_work_cv_.Signal(); | |
808 cleanup_cv_.Wait(); | |
809 } | |
810 cleanup_state_ = CLEANUP_RUNNING; | |
811 return; | |
812 } | |
813 if (cleanup_state_ == CLEANUP_STARTING) { | |
814 // Another worker thread is cleaning up, we idle here until thats done. | |
815 ++cleanup_idlers_; | |
816 cleanup_cv_.Broadcast(); | |
817 while (cleanup_state_ != CLEANUP_FINISHING) { | |
818 cleanup_cv_.Wait(); | |
819 } | |
820 --cleanup_idlers_; | |
821 cleanup_cv_.Broadcast(); | |
822 return; | |
823 } | |
824 if (cleanup_state_ == CLEANUP_FINISHING) { | |
825 // We wait for all idlers to wake up prior to being DONE. | |
826 while (cleanup_idlers_ != 0) { | |
827 cleanup_cv_.Broadcast(); | |
828 cleanup_cv_.Wait(); | |
829 } | |
830 if (cleanup_state_ == CLEANUP_FINISHING) { | |
831 cleanup_state_ = CLEANUP_DONE; | |
832 cleanup_cv_.Signal(); | |
833 } | |
834 return; | |
835 } | |
766 } | 836 } |
767 | 837 |
768 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 838 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
769 const std::string& name) { | 839 const std::string& name) { |
770 lock_.AssertAcquired(); | 840 lock_.AssertAcquired(); |
771 DCHECK(!name.empty()); | 841 DCHECK(!name.empty()); |
772 | 842 |
773 std::map<std::string, int>::const_iterator found = | 843 std::map<std::string, int>::const_iterator found = |
774 named_sequence_tokens_.find(name); | 844 named_sequence_tokens_.find(name); |
775 if (found != named_sequence_tokens_.end()) | 845 if (found != named_sequence_tokens_.end()) |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
838 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { | 908 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { |
839 unrunnable_tasks++; | 909 unrunnable_tasks++; |
840 ++i; | 910 ++i; |
841 continue; | 911 continue; |
842 } | 912 } |
843 | 913 |
844 if (i->time_to_run > current_time) { | 914 if (i->time_to_run > current_time) { |
845 // The time to run has not come yet. | 915 // The time to run has not come yet. |
846 *wait_time = i->time_to_run - current_time; | 916 *wait_time = i->time_to_run - current_time; |
847 status = GET_WORK_WAIT; | 917 status = GET_WORK_WAIT; |
918 if (cleanup_state_ == CLEANUP_RUNNING) { | |
919 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. | |
920 delete_these_outside_lock->push_back(i->task); | |
921 pending_tasks_.erase(i); | |
922 } | |
848 break; | 923 break; |
849 } | 924 } |
850 | 925 |
851 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 926 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
852 // We're shutting down and the task we just found isn't blocking | 927 // We're shutting down and the task we just found isn't blocking |
853 // shutdown. Delete it and get more work. | 928 // shutdown. Delete it and get more work. |
854 // | 929 // |
855 // Note that we do not want to delete unrunnable tasks. Deleting a task | 930 // Note that we do not want to delete unrunnable tasks. Deleting a task |
856 // can have side effects (like freeing some objects) and deleting a | 931 // can have side effects (like freeing some objects) and deleting a |
857 // task that's supposed to run after one that's currently running could | 932 // task that's supposed to run after one that's currently running could |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
964 // cause a worker to start since one is pending. | 1039 // cause a worker to start since one is pending. |
965 // 3. Main thread initiates shutdown. | 1040 // 3. Main thread initiates shutdown. |
966 // 4. No more threads are created since the shutdown_called_ flag is set. | 1041 // 4. No more threads are created since the shutdown_called_ flag is set. |
967 // | 1042 // |
968 // The result is that one may expect that max_threads_ workers to be created | 1043 // The result is that one may expect that max_threads_ workers to be created |
969 // given the workload, but in reality fewer may be created because the | 1044 // given the workload, but in reality fewer may be created because the |
970 // sequence of thread creation on the background threads is racing with the | 1045 // sequence of thread creation on the background threads is racing with the |
971 // shutdown call. | 1046 // shutdown call. |
972 if (!shutdown_called_ && | 1047 if (!shutdown_called_ && |
973 !thread_being_created_ && | 1048 !thread_being_created_ && |
1049 cleanup_state_ == CLEANUP_DONE && | |
974 threads_.size() < max_threads_ && | 1050 threads_.size() < max_threads_ && |
975 waiting_thread_count_ == 0) { | 1051 waiting_thread_count_ == 0) { |
976 // We could use an additional thread if there's work to be done. | 1052 // We could use an additional thread if there's work to be done. |
977 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); | 1053 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); |
978 i != pending_tasks_.end(); ++i) { | 1054 i != pending_tasks_.end(); ++i) { |
979 if (IsSequenceTokenRunnable(i->sequence_token_id)) { | 1055 if (IsSequenceTokenRunnable(i->sequence_token_id)) { |
980 // Found a runnable task, mark the thread as being started. | 1056 // Found a runnable task, mark the thread as being started. |
981 thread_being_created_ = true; | 1057 thread_being_created_ = true; |
982 return static_cast<int>(threads_.size() + 1); | 1058 return static_cast<int>(threads_.size() + 1); |
983 } | 1059 } |
(...skipping 158 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1142 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 1218 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
1143 return inner_->RunsTasksOnCurrentThread(); | 1219 return inner_->RunsTasksOnCurrentThread(); |
1144 } | 1220 } |
1145 | 1221 |
1146 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | 1222 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
1147 SequenceToken sequence_token) const { | 1223 SequenceToken sequence_token) const { |
1148 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | 1224 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
1149 } | 1225 } |
1150 | 1226 |
1151 void SequencedWorkerPool::FlushForTesting() { | 1227 void SequencedWorkerPool::FlushForTesting() { |
1152 inner_->FlushForTesting(); | 1228 inner_->CleanupForTesting(); |
1153 } | 1229 } |
1154 | 1230 |
1155 void SequencedWorkerPool::SignalHasWorkForTesting() { | 1231 void SequencedWorkerPool::SignalHasWorkForTesting() { |
1156 inner_->SignalHasWorkForTesting(); | 1232 inner_->SignalHasWorkForTesting(); |
1157 } | 1233 } |
1158 | 1234 |
1159 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1235 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1160 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1236 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
1161 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1237 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1162 } | 1238 } |
1163 | 1239 |
1164 } // namespace base | 1240 } // namespace base |
OLD | NEW |