Chromium Code Reviews| Index: base/threading/sequenced_worker_pool.cc |
| =================================================================== |
| --- base/threading/sequenced_worker_pool.cc (revision 183651) |
| +++ base/threading/sequenced_worker_pool.cc (working copy) |
| @@ -281,7 +281,7 @@ |
| bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| - void FlushForTesting(); |
| + void CleanupForTesting(); |
|
akalin
2013/02/26 00:12:23
any reason for this rename? perhaps make it consis
|
| void SignalHasWorkForTesting(); |
| @@ -299,9 +299,13 @@ |
| GET_WORK_WAIT, |
| }; |
| - // Returns whether there are no more pending tasks and all threads |
| - // are idle. Must be called under lock. |
| - bool IsIdle() const; |
| + enum CleanupState { |
| + CLEANUP_REQUESTED, |
| + CLEANUP_STARTING, |
| + CLEANUP_RUNNING, |
| + CLEANUP_FINISHING, |
| + CLEANUP_DONE, |
| + }; |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| @@ -334,6 +338,8 @@ |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock); |
| + void HandleCleanup(); |
| + |
| // Peforms init and cleanup around running the given task. WillRun... |
| // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| // The calling code should call FinishStartingAdditionalThread once the |
| @@ -389,10 +395,6 @@ |
| ConditionVariable has_work_cv_; |
| // Condition variable that is waited on by non-worker threads (in |
| - // FlushForTesting()) until IsIdle() goes to true. |
| - ConditionVariable is_idle_cv_; |
| - |
| - // Condition variable that is waited on by non-worker threads (in |
| // Shutdown()) until CanShutdown() goes to true. |
| ConditionVariable can_shutdown_cv_; |
| @@ -450,6 +452,11 @@ |
| // has been called. |
| int max_blocking_tasks_after_shutdown_; |
| + // State used to cleanup for testing, all guarded by lock_. |
| + CleanupState cleanup_state_; |
| + size_t cleanup_idlers_; |
| + ConditionVariable cleanup_cv_; |
| + |
| TestingObserver* const testing_observer_; |
| DISALLOW_COPY_AND_ASSIGN(Inner); |
| @@ -493,7 +500,6 @@ |
| last_sequence_number_(0), |
| lock_(), |
| has_work_cv_(&lock_), |
| - is_idle_cv_(&lock_), |
| can_shutdown_cv_(&lock_), |
| max_threads_(max_threads), |
| thread_name_prefix_(thread_name_prefix), |
| @@ -505,6 +511,9 @@ |
| trace_id_(0), |
| shutdown_called_(false), |
| max_blocking_tasks_after_shutdown_(0), |
| + cleanup_state_(CLEANUP_DONE), |
| + cleanup_idlers_(0), |
| + cleanup_cv_(&lock_), |
| testing_observer_(observer) {} |
| SequencedWorkerPool::Inner::~Inner() { |
| @@ -609,10 +618,21 @@ |
| return found->second->running_sequence().Equals(sequence_token); |
| } |
| -void SequencedWorkerPool::Inner::FlushForTesting() { |
| +// See https://code.google.com/p/chromium/issues/detail?id=168415 |
| +void SequencedWorkerPool::Inner::CleanupForTesting() { |
| + DCHECK(!RunsTasksOnCurrentThread()); |
| + base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| AutoLock lock(lock_); |
| - while (!IsIdle()) |
| - is_idle_cv_.Wait(); |
| + CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| + if (shutdown_called_) |
| + return; |
| + if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
| + return; |
| + cleanup_state_ = CLEANUP_REQUESTED; |
| + cleanup_idlers_ = 0; |
| + has_work_cv_.Signal(); |
| + while (cleanup_state_ != CLEANUP_DONE) |
| + cleanup_cv_.Wait(); |
| } |
| void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
| @@ -624,7 +644,8 @@ |
| DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
| { |
| AutoLock lock(lock_); |
| - |
| + // Cleanup and Shutdown should not be called concurrently. |
| + CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| if (shutdown_called_) |
| return; |
| shutdown_called_ = true; |
| @@ -672,6 +693,8 @@ |
| base::mac::ScopedNSAutoreleasePool autorelease_pool; |
| #endif |
| + HandleCleanup(); |
| + |
| // See GetWork for what delete_these_outside_lock is doing. |
| SequencedTask task; |
| TimeDelta wait_time; |
| @@ -720,6 +743,21 @@ |
| SequenceToken(), CONTINUE_ON_SHUTDOWN); |
| } |
| DidRunWorkerTask(task); // Must be done inside the lock. |
| + } else if (cleanup_state_ == CLEANUP_RUNNING) { |
| + switch (status) { |
| + case GET_WORK_WAIT: { |
| + AutoUnlock unlock(lock_); |
| + delete_these_outside_lock.clear(); |
| + } |
| + break; |
| + case GET_WORK_NOT_FOUND: |
| + CHECK(delete_these_outside_lock.empty()); |
| + cleanup_state_ = CLEANUP_FINISHING; |
| + cleanup_cv_.Broadcast(); |
| + break; |
| + default: |
| + NOTREACHED(); |
| + } |
| } else { |
| // When we're terminating and there's no more work, we can |
| // shut down, other workers can complete any pending or new tasks. |
| @@ -733,9 +771,6 @@ |
| blocking_shutdown_pending_task_count_ == 0) |
| break; |
| waiting_thread_count_++; |
| - // This is the only time that IsIdle() can go to true. |
| - if (IsIdle()) |
| - is_idle_cv_.Signal(); |
| switch (status) { |
| case GET_WORK_NOT_FOUND: |
| @@ -760,9 +795,44 @@ |
| can_shutdown_cv_.Signal(); |
| } |
| -bool SequencedWorkerPool::Inner::IsIdle() const { |
| +void SequencedWorkerPool::Inner::HandleCleanup() { |
| lock_.AssertAcquired(); |
| - return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); |
| + if (cleanup_state_ == CLEANUP_DONE) |
| + return; |
| + if (cleanup_state_ == CLEANUP_REQUESTED) { |
|
akalin
2013/02/26 00:12:23
sad that cleanup handling became this complex, but
michaeln
2013/02/26 00:33:00
I know... as mentioned a while ago... in some sens
|
| + // We win, we get to do the cleanup as soon as the others wise up and idle. |
| + cleanup_state_ = CLEANUP_STARTING; |
| + while (thread_being_created_ || |
| + cleanup_idlers_ != threads_.size() - 1) { |
| + has_work_cv_.Signal(); |
| + cleanup_cv_.Wait(); |
| + } |
| + cleanup_state_ = CLEANUP_RUNNING; |
| + return; |
| + } |
| + if (cleanup_state_ == CLEANUP_STARTING) { |
| + // Another worker thread is cleaning up, we idle here until thats done. |
| + ++cleanup_idlers_; |
| + cleanup_cv_.Broadcast(); |
| + while (cleanup_state_ != CLEANUP_FINISHING) { |
| + cleanup_cv_.Wait(); |
| + } |
| + --cleanup_idlers_; |
| + cleanup_cv_.Broadcast(); |
| + return; |
| + } |
| + if (cleanup_state_ == CLEANUP_FINISHING) { |
| + // We wait for all idlers to wake up prior to being DONE. |
| + while (cleanup_idlers_ != 0) { |
| + cleanup_cv_.Broadcast(); |
| + cleanup_cv_.Wait(); |
| + } |
| + if (cleanup_state_ == CLEANUP_FINISHING) { |
| + cleanup_state_ = CLEANUP_DONE; |
| + cleanup_cv_.Signal(); |
| + } |
| + return; |
| + } |
| } |
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| @@ -845,6 +915,11 @@ |
| // The time to run has not come yet. |
| *wait_time = i->time_to_run - current_time; |
| status = GET_WORK_WAIT; |
| + if (cleanup_state_ == CLEANUP_RUNNING) { |
| + // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
| + delete_these_outside_lock->push_back(i->task); |
| + pending_tasks_.erase(i); |
| + } |
| break; |
| } |
| @@ -971,6 +1046,7 @@ |
| // shutdown call. |
| if (!shutdown_called_ && |
| !thread_being_created_ && |
| + cleanup_state_ == CLEANUP_DONE && |
| threads_.size() < max_threads_ && |
| waiting_thread_count_ == 0) { |
| // We could use an additional thread if there's work to be done. |
| @@ -1149,7 +1225,7 @@ |
| } |
| void SequencedWorkerPool::FlushForTesting() { |
| - inner_->FlushForTesting(); |
| + inner_->CleanupForTesting(); |
| } |
| void SequencedWorkerPool::SignalHasWorkForTesting() { |