| Index: base/threading/sequenced_worker_pool.cc
|
| ===================================================================
|
| --- base/threading/sequenced_worker_pool.cc (revision 186525)
|
| +++ base/threading/sequenced_worker_pool.cc (working copy)
|
| @@ -281,7 +281,7 @@
|
|
|
| bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
|
|
|
| - void FlushForTesting();
|
| + void CleanupForTesting();
|
|
|
| void SignalHasWorkForTesting();
|
|
|
| @@ -299,9 +299,13 @@
|
| GET_WORK_WAIT,
|
| };
|
|
|
| - // Returns whether there are no more pending tasks and all threads
|
| - // are idle. Must be called under lock.
|
| - bool IsIdle() const;
|
| + enum CleanupState {
|
| + CLEANUP_REQUESTED,
|
| + CLEANUP_STARTING,
|
| + CLEANUP_RUNNING,
|
| + CLEANUP_FINISHING,
|
| + CLEANUP_DONE,
|
| + };
|
|
|
| // Called from within the lock, this converts the given token name into a
|
| // token ID, creating a new one if necessary.
|
| @@ -334,6 +338,8 @@
|
| TimeDelta* wait_time,
|
| std::vector<Closure>* delete_these_outside_lock);
|
|
|
| + void HandleCleanup();
|
| +
|
| // Peforms init and cleanup around running the given task. WillRun...
|
| // returns the value from PrepareToStartAdditionalThreadIfNecessary.
|
| // The calling code should call FinishStartingAdditionalThread once the
|
| @@ -389,10 +395,6 @@
|
| ConditionVariable has_work_cv_;
|
|
|
| // Condition variable that is waited on by non-worker threads (in
|
| - // FlushForTesting()) until IsIdle() goes to true.
|
| - ConditionVariable is_idle_cv_;
|
| -
|
| - // Condition variable that is waited on by non-worker threads (in
|
| // Shutdown()) until CanShutdown() goes to true.
|
| ConditionVariable can_shutdown_cv_;
|
|
|
| @@ -450,6 +452,11 @@
|
| // has been called.
|
| int max_blocking_tasks_after_shutdown_;
|
|
|
| + // State used to cleanup for testing, all guarded by lock_.
|
| + CleanupState cleanup_state_;
|
| + size_t cleanup_idlers_;
|
| + ConditionVariable cleanup_cv_;
|
| +
|
| TestingObserver* const testing_observer_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Inner);
|
| @@ -493,7 +500,6 @@
|
| last_sequence_number_(0),
|
| lock_(),
|
| has_work_cv_(&lock_),
|
| - is_idle_cv_(&lock_),
|
| can_shutdown_cv_(&lock_),
|
| max_threads_(max_threads),
|
| thread_name_prefix_(thread_name_prefix),
|
| @@ -505,6 +511,9 @@
|
| trace_id_(0),
|
| shutdown_called_(false),
|
| max_blocking_tasks_after_shutdown_(0),
|
| + cleanup_state_(CLEANUP_DONE),
|
| + cleanup_idlers_(0),
|
| + cleanup_cv_(&lock_),
|
| testing_observer_(observer) {}
|
|
|
| SequencedWorkerPool::Inner::~Inner() {
|
| @@ -609,10 +618,21 @@
|
| return found->second->running_sequence().Equals(sequence_token);
|
| }
|
|
|
| -void SequencedWorkerPool::Inner::FlushForTesting() {
|
| +// See https://code.google.com/p/chromium/issues/detail?id=168415
|
| +void SequencedWorkerPool::Inner::CleanupForTesting() {
|
| + DCHECK(!RunsTasksOnCurrentThread());
|
| + base::ThreadRestrictions::ScopedAllowWait allow_wait;
|
| AutoLock lock(lock_);
|
| - while (!IsIdle())
|
| - is_idle_cv_.Wait();
|
| + CHECK_EQ(CLEANUP_DONE, cleanup_state_);
|
| + if (shutdown_called_)
|
| + return;
|
| + if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
|
| + return;
|
| + cleanup_state_ = CLEANUP_REQUESTED;
|
| + cleanup_idlers_ = 0;
|
| + has_work_cv_.Signal();
|
| + while (cleanup_state_ != CLEANUP_DONE)
|
| + cleanup_cv_.Wait();
|
| }
|
|
|
| void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
|
| @@ -624,7 +644,8 @@
|
| DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
|
| {
|
| AutoLock lock(lock_);
|
| -
|
| + // Cleanup and Shutdown should not be called concurrently.
|
| + CHECK_EQ(CLEANUP_DONE, cleanup_state_);
|
| if (shutdown_called_)
|
| return;
|
| shutdown_called_ = true;
|
| @@ -672,6 +693,8 @@
|
| base::mac::ScopedNSAutoreleasePool autorelease_pool;
|
| #endif
|
|
|
| + HandleCleanup();
|
| +
|
| // See GetWork for what delete_these_outside_lock is doing.
|
| SequencedTask task;
|
| TimeDelta wait_time;
|
| @@ -720,6 +743,21 @@
|
| SequenceToken(), CONTINUE_ON_SHUTDOWN);
|
| }
|
| DidRunWorkerTask(task); // Must be done inside the lock.
|
| + } else if (cleanup_state_ == CLEANUP_RUNNING) {
|
| + switch (status) {
|
| + case GET_WORK_WAIT: {
|
| + AutoUnlock unlock(lock_);
|
| + delete_these_outside_lock.clear();
|
| + }
|
| + break;
|
| + case GET_WORK_NOT_FOUND:
|
| + CHECK(delete_these_outside_lock.empty());
|
| + cleanup_state_ = CLEANUP_FINISHING;
|
| + cleanup_cv_.Broadcast();
|
| + break;
|
| + default:
|
| + NOTREACHED();
|
| + }
|
| } else {
|
| // When we're terminating and there's no more work, we can
|
| // shut down, other workers can complete any pending or new tasks.
|
| @@ -733,9 +771,6 @@
|
| blocking_shutdown_pending_task_count_ == 0)
|
| break;
|
| waiting_thread_count_++;
|
| - // This is the only time that IsIdle() can go to true.
|
| - if (IsIdle())
|
| - is_idle_cv_.Signal();
|
|
|
| switch (status) {
|
| case GET_WORK_NOT_FOUND:
|
| @@ -760,9 +795,44 @@
|
| can_shutdown_cv_.Signal();
|
| }
|
|
|
| -bool SequencedWorkerPool::Inner::IsIdle() const {
|
| +void SequencedWorkerPool::Inner::HandleCleanup() {
|
| lock_.AssertAcquired();
|
| - return pending_tasks_.empty() && waiting_thread_count_ == threads_.size();
|
| + if (cleanup_state_ == CLEANUP_DONE)
|
| + return;
|
| + if (cleanup_state_ == CLEANUP_REQUESTED) {
|
| + // We win, we get to do the cleanup as soon as the others wise up and idle.
|
| + cleanup_state_ = CLEANUP_STARTING;
|
| + while (thread_being_created_ ||
|
| + cleanup_idlers_ != threads_.size() - 1) {
|
| + has_work_cv_.Signal();
|
| + cleanup_cv_.Wait();
|
| + }
|
| + cleanup_state_ = CLEANUP_RUNNING;
|
| + return;
|
| + }
|
| + if (cleanup_state_ == CLEANUP_STARTING) {
|
| + // Another worker thread is cleaning up, we idle here until thats done.
|
| + ++cleanup_idlers_;
|
| + cleanup_cv_.Broadcast();
|
| + while (cleanup_state_ != CLEANUP_FINISHING) {
|
| + cleanup_cv_.Wait();
|
| + }
|
| + --cleanup_idlers_;
|
| + cleanup_cv_.Broadcast();
|
| + return;
|
| + }
|
| + if (cleanup_state_ == CLEANUP_FINISHING) {
|
| + // We wait for all idlers to wake up prior to being DONE.
|
| + while (cleanup_idlers_ != 0) {
|
| + cleanup_cv_.Broadcast();
|
| + cleanup_cv_.Wait();
|
| + }
|
| + if (cleanup_state_ == CLEANUP_FINISHING) {
|
| + cleanup_state_ = CLEANUP_DONE;
|
| + cleanup_cv_.Signal();
|
| + }
|
| + return;
|
| + }
|
| }
|
|
|
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
|
| @@ -866,6 +936,11 @@
|
| // The time to run has not come yet.
|
| *wait_time = i->time_to_run - current_time;
|
| status = GET_WORK_WAIT;
|
| + if (cleanup_state_ == CLEANUP_RUNNING) {
|
| + // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
|
| + delete_these_outside_lock->push_back(i->task);
|
| + pending_tasks_.erase(i);
|
| + }
|
| break;
|
| }
|
|
|
| @@ -972,6 +1047,7 @@
|
| // shutdown call.
|
| if (!shutdown_called_ &&
|
| !thread_being_created_ &&
|
| + cleanup_state_ == CLEANUP_DONE &&
|
| threads_.size() < max_threads_ &&
|
| waiting_thread_count_ == 0) {
|
| // We could use an additional thread if there's work to be done.
|
| @@ -1150,7 +1226,7 @@
|
| }
|
|
|
| void SequencedWorkerPool::FlushForTesting() {
|
| - inner_->FlushForTesting();
|
| + inner_->CleanupForTesting();
|
| }
|
|
|
| void SequencedWorkerPool::SignalHasWorkForTesting() {
|
|
|