Index: base/threading/sequenced_worker_pool.cc |
=================================================================== |
--- base/threading/sequenced_worker_pool.cc (revision 186525) |
+++ base/threading/sequenced_worker_pool.cc (working copy) |
@@ -281,7 +281,7 @@ |
bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
- void FlushForTesting(); |
+ void CleanupForTesting(); |
void SignalHasWorkForTesting(); |
@@ -299,9 +299,13 @@ |
GET_WORK_WAIT, |
}; |
- // Returns whether there are no more pending tasks and all threads |
- // are idle. Must be called under lock. |
- bool IsIdle() const; |
+ enum CleanupState { |
+ CLEANUP_REQUESTED, |
+ CLEANUP_STARTING, |
+ CLEANUP_RUNNING, |
+ CLEANUP_FINISHING, |
+ CLEANUP_DONE, |
+ }; |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
@@ -334,6 +338,8 @@ |
TimeDelta* wait_time, |
std::vector<Closure>* delete_these_outside_lock); |
+ void HandleCleanup(); |
+ |
// Peforms init and cleanup around running the given task. WillRun... |
// returns the value from PrepareToStartAdditionalThreadIfNecessary. |
// The calling code should call FinishStartingAdditionalThread once the |
@@ -389,10 +395,6 @@ |
ConditionVariable has_work_cv_; |
// Condition variable that is waited on by non-worker threads (in |
- // FlushForTesting()) until IsIdle() goes to true. |
- ConditionVariable is_idle_cv_; |
- |
- // Condition variable that is waited on by non-worker threads (in |
// Shutdown()) until CanShutdown() goes to true. |
ConditionVariable can_shutdown_cv_; |
@@ -450,6 +452,11 @@ |
// has been called. |
int max_blocking_tasks_after_shutdown_; |
+ // State used to cleanup for testing, all guarded by lock_. |
+ CleanupState cleanup_state_; |
+ size_t cleanup_idlers_; |
+ ConditionVariable cleanup_cv_; |
+ |
TestingObserver* const testing_observer_; |
DISALLOW_COPY_AND_ASSIGN(Inner); |
@@ -493,7 +500,6 @@ |
last_sequence_number_(0), |
lock_(), |
has_work_cv_(&lock_), |
- is_idle_cv_(&lock_), |
can_shutdown_cv_(&lock_), |
max_threads_(max_threads), |
thread_name_prefix_(thread_name_prefix), |
@@ -505,6 +511,9 @@ |
trace_id_(0), |
shutdown_called_(false), |
max_blocking_tasks_after_shutdown_(0), |
+ cleanup_state_(CLEANUP_DONE), |
+ cleanup_idlers_(0), |
+ cleanup_cv_(&lock_), |
testing_observer_(observer) {} |
SequencedWorkerPool::Inner::~Inner() { |
@@ -609,10 +618,21 @@ |
return found->second->running_sequence().Equals(sequence_token); |
} |
-void SequencedWorkerPool::Inner::FlushForTesting() { |
+// See https://code.google.com/p/chromium/issues/detail?id=168415 |
+void SequencedWorkerPool::Inner::CleanupForTesting() { |
+ DCHECK(!RunsTasksOnCurrentThread()); |
+ base::ThreadRestrictions::ScopedAllowWait allow_wait; |
AutoLock lock(lock_); |
- while (!IsIdle()) |
- is_idle_cv_.Wait(); |
+ CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
+ if (shutdown_called_) |
+ return; |
+ if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) |
+ return; |
+ cleanup_state_ = CLEANUP_REQUESTED; |
+ cleanup_idlers_ = 0; |
+ has_work_cv_.Signal(); |
+ while (cleanup_state_ != CLEANUP_DONE) |
+ cleanup_cv_.Wait(); |
} |
void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
@@ -624,7 +644,8 @@ |
DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); |
{ |
AutoLock lock(lock_); |
- |
+ // Cleanup and Shutdown should not be called concurrently. |
+ CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
if (shutdown_called_) |
return; |
shutdown_called_ = true; |
@@ -672,6 +693,8 @@ |
base::mac::ScopedNSAutoreleasePool autorelease_pool; |
#endif |
+ HandleCleanup(); |
+ |
// See GetWork for what delete_these_outside_lock is doing. |
SequencedTask task; |
TimeDelta wait_time; |
@@ -720,6 +743,21 @@ |
SequenceToken(), CONTINUE_ON_SHUTDOWN); |
} |
DidRunWorkerTask(task); // Must be done inside the lock. |
+ } else if (cleanup_state_ == CLEANUP_RUNNING) { |
+ switch (status) { |
+ case GET_WORK_WAIT: { |
+ AutoUnlock unlock(lock_); |
+ delete_these_outside_lock.clear(); |
+ } |
+ break; |
+ case GET_WORK_NOT_FOUND: |
+ CHECK(delete_these_outside_lock.empty()); |
+ cleanup_state_ = CLEANUP_FINISHING; |
+ cleanup_cv_.Broadcast(); |
+ break; |
+ default: |
+ NOTREACHED(); |
+ } |
} else { |
// When we're terminating and there's no more work, we can |
// shut down, other workers can complete any pending or new tasks. |
@@ -733,9 +771,6 @@ |
blocking_shutdown_pending_task_count_ == 0) |
break; |
waiting_thread_count_++; |
- // This is the only time that IsIdle() can go to true. |
- if (IsIdle()) |
- is_idle_cv_.Signal(); |
switch (status) { |
case GET_WORK_NOT_FOUND: |
@@ -760,9 +795,44 @@ |
can_shutdown_cv_.Signal(); |
} |
-bool SequencedWorkerPool::Inner::IsIdle() const { |
+void SequencedWorkerPool::Inner::HandleCleanup() { |
lock_.AssertAcquired(); |
- return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); |
+ if (cleanup_state_ == CLEANUP_DONE) |
+ return; |
+ if (cleanup_state_ == CLEANUP_REQUESTED) { |
+ // We win, we get to do the cleanup as soon as the others wise up and idle. |
+ cleanup_state_ = CLEANUP_STARTING; |
+ while (thread_being_created_ || |
+ cleanup_idlers_ != threads_.size() - 1) { |
+ has_work_cv_.Signal(); |
+ cleanup_cv_.Wait(); |
+ } |
+ cleanup_state_ = CLEANUP_RUNNING; |
+ return; |
+ } |
+ if (cleanup_state_ == CLEANUP_STARTING) { |
+ // Another worker thread is cleaning up, we idle here until thats done. |
+ ++cleanup_idlers_; |
+ cleanup_cv_.Broadcast(); |
+ while (cleanup_state_ != CLEANUP_FINISHING) { |
+ cleanup_cv_.Wait(); |
+ } |
+ --cleanup_idlers_; |
+ cleanup_cv_.Broadcast(); |
+ return; |
+ } |
+ if (cleanup_state_ == CLEANUP_FINISHING) { |
+ // We wait for all idlers to wake up prior to being DONE. |
+ while (cleanup_idlers_ != 0) { |
+ cleanup_cv_.Broadcast(); |
+ cleanup_cv_.Wait(); |
+ } |
+ if (cleanup_state_ == CLEANUP_FINISHING) { |
+ cleanup_state_ = CLEANUP_DONE; |
+ cleanup_cv_.Signal(); |
+ } |
+ return; |
+ } |
} |
int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
@@ -866,6 +936,11 @@ |
// The time to run has not come yet. |
*wait_time = i->time_to_run - current_time; |
status = GET_WORK_WAIT; |
+ if (cleanup_state_ == CLEANUP_RUNNING) { |
+ // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. |
+ delete_these_outside_lock->push_back(i->task); |
+ pending_tasks_.erase(i); |
+ } |
break; |
} |
@@ -972,6 +1047,7 @@ |
// shutdown call. |
if (!shutdown_called_ && |
!thread_being_created_ && |
+ cleanup_state_ == CLEANUP_DONE && |
threads_.size() < max_threads_ && |
waiting_thread_count_ == 0) { |
// We could use an additional thread if there's work to be done. |
@@ -1150,7 +1226,7 @@ |
} |
void SequencedWorkerPool::FlushForTesting() { |
- inner_->FlushForTesting(); |
+ inner_->CleanupForTesting(); |
} |
void SequencedWorkerPool::SignalHasWorkForTesting() { |