| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <deque> | 7 #include <deque> |
| 8 #include <set> | 8 #include <set> |
| 9 | 9 |
| 10 #include "base/atomicops.h" | 10 #include "base/atomicops.h" |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 50 | 50 |
| 51 | 51 |
| 52 // Inner ---------------------------------------------------------------------- | 52 // Inner ---------------------------------------------------------------------- |
| 53 | 53 |
| 54 class SequencedWorkerPool::Inner | 54 class SequencedWorkerPool::Inner |
| 55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { | 55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { |
| 56 public: | 56 public: |
| 57 Inner(size_t max_threads, const std::string& thread_name_prefix); | 57 Inner(size_t max_threads, const std::string& thread_name_prefix); |
| 58 virtual ~Inner(); | 58 virtual ~Inner(); |
| 59 | 59 |
| 60 // Backends for SequenceWorkerPool. | |
| 61 SequenceToken GetSequenceToken(); | 60 SequenceToken GetSequenceToken(); |
| 61 |
| 62 SequenceToken GetNamedSequenceToken(const std::string& name); | 62 SequenceToken GetNamedSequenceToken(const std::string& name); |
| 63 bool PostTask(int sequence_token_id, | 63 |
| 64 // This function accepts a name and an ID. If the name is null, the |
| 65 // token ID is used. This allows us to implement the optional name lookup |
| 66 // from a single function without having to enter the lock a separate time. |
| 67 bool PostTask(const std::string* optional_token_name, |
| 68 int sequence_token_id, |
| 64 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | 69 SequencedWorkerPool::WorkerShutdown shutdown_behavior, |
| 65 const tracked_objects::Location& from_here, | 70 const tracked_objects::Location& from_here, |
| 66 const base::Closure& task); | 71 const base::Closure& task); |
| 72 |
| 67 void Shutdown(); | 73 void Shutdown(); |
| 74 |
| 68 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); | 75 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); |
| 69 | 76 |
| 70 // Runs the worker loop on the background thread. | 77 // Runs the worker loop on the background thread. |
| 71 void ThreadLoop(Worker* this_worker); | 78 void ThreadLoop(Worker* this_worker); |
| 72 | 79 |
| 73 private: | 80 private: |
| 81 // Called from within the lock, this converts the given token name into a |
| 82 // token ID, creating a new one if necessary. |
| 83 int LockedGetNamedTokenID(const std::string& name); |
| 84 |
| 74 // The calling code should clear the given delete_these_oustide_lock | 85 // The calling code should clear the given delete_these_oustide_lock |
| 75 // vector the next time the lock is released. See the implementation for | 86 // vector the next time the lock is released. See the implementation for |
| 76 // a more detailed description. | 87 // a more detailed description. |
| 77 bool GetWork(SequencedTask* task, | 88 bool GetWork(SequencedTask* task, |
| 78 std::vector<base::Closure>* delete_these_outside_lock); | 89 std::vector<base::Closure>* delete_these_outside_lock); |
| 79 | 90 |
| 80 // Peforms init and cleanup around running the given task. WillRun... | 91 // Peforms init and cleanup around running the given task. WillRun... |
| 81 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 92 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| 82 // The calling code should call FinishStartingAdditionalThread once the | 93 // The calling code should call FinishStartingAdditionalThread once the |
| 83 // lock is released if the return values is nonzero. | 94 // lock is released if the return values is nonzero. |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 228 SequencedWorkerPool::Inner::GetSequenceToken() { | 239 SequencedWorkerPool::Inner::GetSequenceToken() { |
| 229 base::subtle::Atomic32 result = | 240 base::subtle::Atomic32 result = |
| 230 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | 241 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
| 231 return SequenceToken(static_cast<int>(result)); | 242 return SequenceToken(static_cast<int>(result)); |
| 232 } | 243 } |
| 233 | 244 |
| 234 SequencedWorkerPool::SequenceToken | 245 SequencedWorkerPool::SequenceToken |
| 235 SequencedWorkerPool::Inner::GetNamedSequenceToken( | 246 SequencedWorkerPool::Inner::GetNamedSequenceToken( |
| 236 const std::string& name) { | 247 const std::string& name) { |
| 237 base::AutoLock lock(lock_); | 248 base::AutoLock lock(lock_); |
| 238 std::map<std::string, int>::const_iterator found = | 249 return SequenceToken(LockedGetNamedTokenID(name)); |
| 239 named_sequence_tokens_.find(name); | |
| 240 if (found != named_sequence_tokens_.end()) | |
| 241 return SequenceToken(found->second); // Got an existing one. | |
| 242 | |
| 243 // Create a new one for this name. | |
| 244 SequenceToken result = GetSequenceToken(); | |
| 245 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); | |
| 246 return result; | |
| 247 } | 250 } |
| 248 | 251 |
| 249 bool SequencedWorkerPool::Inner::PostTask( | 252 bool SequencedWorkerPool::Inner::PostTask( |
| 253 const std::string* optional_token_name, |
| 250 int sequence_token_id, | 254 int sequence_token_id, |
| 251 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | 255 SequencedWorkerPool::WorkerShutdown shutdown_behavior, |
| 252 const tracked_objects::Location& from_here, | 256 const tracked_objects::Location& from_here, |
| 253 const base::Closure& task) { | 257 const base::Closure& task) { |
| 254 SequencedTask sequenced; | 258 SequencedTask sequenced; |
| 255 sequenced.sequence_token_id = sequence_token_id; | 259 sequenced.sequence_token_id = sequence_token_id; |
| 256 sequenced.shutdown_behavior = shutdown_behavior; | 260 sequenced.shutdown_behavior = shutdown_behavior; |
| 257 sequenced.location = from_here; | 261 sequenced.location = from_here; |
| 258 sequenced.task = task; | 262 sequenced.task = task; |
| 259 | 263 |
| 260 int create_thread_id = 0; | 264 int create_thread_id = 0; |
| 261 { | 265 { |
| 262 base::AutoLock lock(lock_); | 266 base::AutoLock lock(lock_); |
| 263 if (terminating_) | 267 if (terminating_) |
| 264 return false; | 268 return false; |
| 265 | 269 |
| 270 // Now that we have the lock, apply the named token rules. |
| 271 if (optional_token_name) |
| 272 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| 273 |
| 266 pending_tasks_.push_back(sequenced); | 274 pending_tasks_.push_back(sequenced); |
| 267 pending_task_count_++; | 275 pending_task_count_++; |
| 268 if (shutdown_behavior == BLOCK_SHUTDOWN) | 276 if (shutdown_behavior == BLOCK_SHUTDOWN) |
| 269 blocking_shutdown_pending_task_count_++; | 277 blocking_shutdown_pending_task_count_++; |
| 270 | 278 |
| 271 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 279 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| 272 } | 280 } |
| 273 | 281 |
| 274 // Actually start the additional thread or signal an existing one now that | 282 // Actually start the additional thread or signal an existing one now that |
| 275 // we're outside the lock. | 283 // we're outside the lock. |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 370 waiting_thread_count_--; | 378 waiting_thread_count_--; |
| 371 } | 379 } |
| 372 } | 380 } |
| 373 } | 381 } |
| 374 | 382 |
| 375 // We noticed we should exit. Wake up the next worker so it knows it should | 383 // We noticed we should exit. Wake up the next worker so it knows it should |
| 376 // exit as well (because the Shutdown() code only signals once). | 384 // exit as well (because the Shutdown() code only signals once). |
| 377 cond_var_.Signal(); | 385 cond_var_.Signal(); |
| 378 } | 386 } |
| 379 | 387 |
| 388 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| 389 const std::string& name) { |
| 390 lock_.AssertAcquired(); |
| 391 DCHECK(!name.empty()); |
| 392 |
| 393 std::map<std::string, int>::const_iterator found = |
| 394 named_sequence_tokens_.find(name); |
| 395 if (found != named_sequence_tokens_.end()) |
| 396 return found->second; // Got an existing one. |
| 397 |
| 398 // Create a new one for this name. |
| 399 SequenceToken result = GetSequenceToken(); |
| 400 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
| 401 return result.id_; |
| 402 } |
| 403 |
| 380 bool SequencedWorkerPool::Inner::GetWork( | 404 bool SequencedWorkerPool::Inner::GetWork( |
| 381 SequencedTask* task, | 405 SequencedTask* task, |
| 382 std::vector<base::Closure>* delete_these_outside_lock) { | 406 std::vector<base::Closure>* delete_these_outside_lock) { |
| 383 lock_.AssertAcquired(); | 407 lock_.AssertAcquired(); |
| 384 | 408 |
| 385 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); | 409 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); |
| 386 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", | 410 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", |
| 387 static_cast<int>(pending_task_count_)); | 411 static_cast<int>(pending_task_count_)); |
| 388 | 412 |
| 389 // Find the next task with a sequence token that's not currently in use. | 413 // Find the next task with a sequence token that's not currently in use. |
| (...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 586 } | 610 } |
| 587 | 611 |
| 588 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 612 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| 589 const std::string& name) { | 613 const std::string& name) { |
| 590 return inner_->GetNamedSequenceToken(name); | 614 return inner_->GetNamedSequenceToken(name); |
| 591 } | 615 } |
| 592 | 616 |
| 593 bool SequencedWorkerPool::PostWorkerTask( | 617 bool SequencedWorkerPool::PostWorkerTask( |
| 594 const tracked_objects::Location& from_here, | 618 const tracked_objects::Location& from_here, |
| 595 const base::Closure& task) { | 619 const base::Closure& task) { |
| 596 return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); | 620 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); |
| 597 } | 621 } |
| 598 | 622 |
| 599 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( | 623 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( |
| 600 const tracked_objects::Location& from_here, | 624 const tracked_objects::Location& from_here, |
| 601 const base::Closure& task, | 625 const base::Closure& task, |
| 602 WorkerShutdown shutdown_behavior) { | 626 WorkerShutdown shutdown_behavior) { |
| 603 return inner_->PostTask(0, shutdown_behavior, from_here, task); | 627 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); |
| 604 } | 628 } |
| 605 | 629 |
| 606 bool SequencedWorkerPool::PostSequencedWorkerTask( | 630 bool SequencedWorkerPool::PostSequencedWorkerTask( |
| 607 SequenceToken sequence_token, | 631 SequenceToken sequence_token, |
| 608 const tracked_objects::Location& from_here, | 632 const tracked_objects::Location& from_here, |
| 609 const base::Closure& task) { | 633 const base::Closure& task) { |
| 610 return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, | 634 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, |
| 611 from_here, task); | 635 from_here, task); |
| 612 } | 636 } |
| 613 | 637 |
| 638 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( |
| 639 const std::string& token_name, |
| 640 const tracked_objects::Location& from_here, |
| 641 const base::Closure& task) { |
| 642 DCHECK(!token_name.empty()); |
| 643 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); |
| 644 } |
| 645 |
| 614 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( | 646 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( |
| 615 SequenceToken sequence_token, | 647 SequenceToken sequence_token, |
| 616 const tracked_objects::Location& from_here, | 648 const tracked_objects::Location& from_here, |
| 617 const base::Closure& task, | 649 const base::Closure& task, |
| 618 WorkerShutdown shutdown_behavior) { | 650 WorkerShutdown shutdown_behavior) { |
| 619 return inner_->PostTask(sequence_token.id_, shutdown_behavior, | 651 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, |
| 620 from_here, task); | 652 from_here, task); |
| 621 } | 653 } |
| 622 | 654 |
| 623 void SequencedWorkerPool::Shutdown() { | 655 void SequencedWorkerPool::Shutdown() { |
| 624 inner_->Shutdown(); | 656 inner_->Shutdown(); |
| 625 } | 657 } |
| 626 | 658 |
| 627 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | 659 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
| 628 inner_->SetTestingObserver(observer); | 660 inner_->SetTestingObserver(observer); |
| 629 } | 661 } |
| 630 | 662 |
| 631 } // namespace base | 663 } // namespace base |
| OLD | NEW |