| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
| 6 | 6 |
| 7 #include <list> | 7 #include <list> |
| 8 #include <map> | 8 #include <map> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
| 233 // around as long as we are running. | 233 // around as long as we are running. |
| 234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| 235 int thread_number, | 235 int thread_number, |
| 236 const std::string& thread_name_prefix); | 236 const std::string& thread_name_prefix); |
| 237 ~Worker() override; | 237 ~Worker() override; |
| 238 | 238 |
| 239 // SimpleThread implementation. This actually runs the background thread. | 239 // SimpleThread implementation. This actually runs the background thread. |
| 240 void Run() override; | 240 void Run() override; |
| 241 | 241 |
| 242 // Indicates that a task is about to be run. The parameters provide |
| 243 // additional metainformation about the task being run. |
| 242 void set_running_task_info(SequenceToken token, | 244 void set_running_task_info(SequenceToken token, |
| 243 WorkerShutdown shutdown_behavior) { | 245 WorkerShutdown shutdown_behavior) { |
| 244 running_sequence_ = token; | 246 is_processing_task_ = true; |
| 245 running_shutdown_behavior_ = shutdown_behavior; | 247 task_sequence_token_ = token; |
| 248 task_shutdown_behavior_ = shutdown_behavior; |
| 246 } | 249 } |
| 247 | 250 |
| 248 SequenceToken running_sequence() const { | 251 // Indicates that the task has finished running. |
| 249 return running_sequence_; | 252 void reset_running_task_info() { is_processing_task_ = false; } |
| 253 |
| 254 // Whether the worker is processing a task. |
| 255 bool is_processing_task() { return is_processing_task_; } |
| 256 |
| 257 SequenceToken task_sequence_token() const { |
| 258 DCHECK(is_processing_task_); |
| 259 return task_sequence_token_; |
| 250 } | 260 } |
| 251 | 261 |
| 252 WorkerShutdown running_shutdown_behavior() const { | 262 WorkerShutdown task_shutdown_behavior() const { |
| 253 return running_shutdown_behavior_; | 263 DCHECK(is_processing_task_); |
| 264 return task_shutdown_behavior_; |
| 254 } | 265 } |
| 255 | 266 |
| 256 private: | 267 private: |
| 257 scoped_refptr<SequencedWorkerPool> worker_pool_; | 268 scoped_refptr<SequencedWorkerPool> worker_pool_; |
| 258 SequenceToken running_sequence_; | 269 // The sequence token of the task being processed. Only valid when |
| 259 WorkerShutdown running_shutdown_behavior_; | 270 // is_processing_task_ is true. |
| 271 SequenceToken task_sequence_token_; |
| 272 // The shutdown behavior of the task being processed. Only valid when |
| 273 // is_processing_task_ is true. |
| 274 WorkerShutdown task_shutdown_behavior_; |
| 275 // Whether the Worker is processing a task. |
| 276 bool is_processing_task_; |
| 260 | 277 |
| 261 DISALLOW_COPY_AND_ASSIGN(Worker); | 278 DISALLOW_COPY_AND_ASSIGN(Worker); |
| 262 }; | 279 }; |
| 263 | 280 |
| 264 // Inner ---------------------------------------------------------------------- | 281 // Inner ---------------------------------------------------------------------- |
| 265 | 282 |
| 266 class SequencedWorkerPool::Inner { | 283 class SequencedWorkerPool::Inner { |
| 267 public: | 284 public: |
| 268 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 285 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
| 269 // by it). | 286 // by it). |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 319 CLEANUP_DONE, | 336 CLEANUP_DONE, |
| 320 }; | 337 }; |
| 321 | 338 |
| 322 // Called from within the lock, this converts the given token name into a | 339 // Called from within the lock, this converts the given token name into a |
| 323 // token ID, creating a new one if necessary. | 340 // token ID, creating a new one if necessary. |
| 324 int LockedGetNamedTokenID(const std::string& name); | 341 int LockedGetNamedTokenID(const std::string& name); |
| 325 | 342 |
| 326 // Called from within the lock, this returns the next sequence task number. | 343 // Called from within the lock, this returns the next sequence task number. |
| 327 int64 LockedGetNextSequenceTaskNumber(); | 344 int64 LockedGetNextSequenceTaskNumber(); |
| 328 | 345 |
| 329 // Called from within the lock, returns the shutdown behavior of the task | |
| 330 // running on the currently executing worker thread. If invoked from a thread | |
| 331 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. | |
| 332 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; | |
| 333 | |
| 334 // Gets new task. There are 3 cases depending on the return value: | 346 // Gets new task. There are 3 cases depending on the return value: |
| 335 // | 347 // |
| 336 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should | 348 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should |
| 337 // be run immediately. | 349 // be run immediately. |
| 338 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, | 350 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, |
| 339 // and |task| is not filled in. In this case, the caller should wait until | 351 // and |task| is not filled in. In this case, the caller should wait until |
| 340 // a task is posted. | 352 // a task is posted. |
| 341 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 353 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
| 342 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 354 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
| 343 // filled in the time to wait until the next task to run. In this case, the | 355 // filled in the time to wait until the next task to run. In this case, the |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 476 }; | 488 }; |
| 477 | 489 |
| 478 // Worker definitions --------------------------------------------------------- | 490 // Worker definitions --------------------------------------------------------- |
| 479 | 491 |
| 480 SequencedWorkerPool::Worker::Worker( | 492 SequencedWorkerPool::Worker::Worker( |
| 481 const scoped_refptr<SequencedWorkerPool>& worker_pool, | 493 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| 482 int thread_number, | 494 int thread_number, |
| 483 const std::string& prefix) | 495 const std::string& prefix) |
| 484 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 496 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
| 485 worker_pool_(worker_pool), | 497 worker_pool_(worker_pool), |
| 486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { | 498 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| 499 is_processing_task_(false) { |
| 487 Start(); | 500 Start(); |
| 488 } | 501 } |
| 489 | 502 |
| 490 SequencedWorkerPool::Worker::~Worker() { | 503 SequencedWorkerPool::Worker::~Worker() { |
| 491 } | 504 } |
| 492 | 505 |
| 493 void SequencedWorkerPool::Worker::Run() { | 506 void SequencedWorkerPool::Worker::Run() { |
| 494 #if defined(OS_WIN) | 507 #if defined(OS_WIN) |
| 495 win::ScopedCOMInitializer com_initializer; | 508 win::ScopedCOMInitializer com_initializer; |
| 496 #endif | 509 #endif |
| 497 | 510 |
| 498 // Store a pointer to the running sequence in thread local storage for | 511 // Store a pointer to the running sequence in thread local storage for |
| 499 // static function access. | 512 // static function access. |
| 500 g_lazy_tls_ptr.Get().Set(&running_sequence_); | 513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_); |
| 501 | 514 |
| 502 // Just jump back to the Inner object to run the thread, since it has all the | 515 // Just jump back to the Inner object to run the thread, since it has all the |
| 503 // tracking information and queues. It might be more natural to implement | 516 // tracking information and queues. It might be more natural to implement |
| 504 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
| 505 // having these worker objects at all, but that method lacks the ability to | 518 // having these worker objects at all, but that method lacks the ability to |
| 506 // send thread-specific information easily to the thread loop. | 519 // send thread-specific information easily to the thread loop. |
| 507 worker_pool_->inner_->ThreadLoop(this); | 520 worker_pool_->inner_->ThreadLoop(this); |
| 508 // Release our cyclic reference once we're done. | 521 // Release our cyclic reference once we're done. |
| 509 worker_pool_ = NULL; | 522 worker_pool_ = NULL; |
| 510 } | 523 } |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 576 sequenced.posted_from = from_here; | 589 sequenced.posted_from = from_here; |
| 577 sequenced.task = | 590 sequenced.task = |
| 578 shutdown_behavior == BLOCK_SHUTDOWN ? | 591 shutdown_behavior == BLOCK_SHUTDOWN ? |
| 579 base::MakeCriticalClosure(task) : task; | 592 base::MakeCriticalClosure(task) : task; |
| 580 sequenced.time_to_run = TimeTicks::Now() + delay; | 593 sequenced.time_to_run = TimeTicks::Now() + delay; |
| 581 | 594 |
| 582 int create_thread_id = 0; | 595 int create_thread_id = 0; |
| 583 { | 596 { |
| 584 AutoLock lock(lock_); | 597 AutoLock lock(lock_); |
| 585 if (shutdown_called_) { | 598 if (shutdown_called_) { |
| 586 if (shutdown_behavior != BLOCK_SHUTDOWN || | 599 // Don't allow a new task to be posted if it doesn't block shutdown. |
| 587 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { | 600 if (shutdown_behavior != BLOCK_SHUTDOWN) |
| 601 return false; |
| 602 |
| 603 // If the current thread is running a task, and that task doesn't block |
| 604 // shutdown, then it shouldn't be allowed to post any more tasks. |
| 605 ThreadMap::const_iterator found = |
| 606 threads_.find(PlatformThread::CurrentId()); |
| 607 if (found != threads_.end() && found->second->is_processing_task() && |
| 608 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { |
| 588 return false; | 609 return false; |
| 589 } | 610 } |
| 611 |
| 590 if (max_blocking_tasks_after_shutdown_ <= 0) { | 612 if (max_blocking_tasks_after_shutdown_ <= 0) { |
| 591 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; | 613 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; |
| 592 return false; | 614 return false; |
| 593 } | 615 } |
| 594 max_blocking_tasks_after_shutdown_ -= 1; | 616 max_blocking_tasks_after_shutdown_ -= 1; |
| 595 } | 617 } |
| 596 | 618 |
| 597 // The trace_id is used for identifying the task in about:tracing. | 619 // The trace_id is used for identifying the task in about:tracing. |
| 598 sequenced.trace_id = trace_id_++; | 620 sequenced.trace_id = trace_id_++; |
| 599 | 621 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 628 AutoLock lock(lock_); | 650 AutoLock lock(lock_); |
| 629 return ContainsKey(threads_, PlatformThread::CurrentId()); | 651 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 630 } | 652 } |
| 631 | 653 |
| 632 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 654 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 633 SequenceToken sequence_token) const { | 655 SequenceToken sequence_token) const { |
| 634 AutoLock lock(lock_); | 656 AutoLock lock(lock_); |
| 635 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 657 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 636 if (found == threads_.end()) | 658 if (found == threads_.end()) |
| 637 return false; | 659 return false; |
| 638 return sequence_token.Equals(found->second->running_sequence()); | 660 return found->second->is_processing_task() && |
| 661 sequence_token.Equals(found->second->task_sequence_token()); |
| 639 } | 662 } |
| 640 | 663 |
| 641 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 664 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
| 642 void SequencedWorkerPool::Inner::CleanupForTesting() { | 665 void SequencedWorkerPool::Inner::CleanupForTesting() { |
| 643 DCHECK(!RunsTasksOnCurrentThread()); | 666 DCHECK(!RunsTasksOnCurrentThread()); |
| 644 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 667 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| 645 AutoLock lock(lock_); | 668 AutoLock lock(lock_); |
| 646 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 669 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
| 647 if (shutdown_called_) | 670 if (shutdown_called_) |
| 648 return; | 671 return; |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 758 tracked_objects::TaskStopwatch stopwatch; | 781 tracked_objects::TaskStopwatch stopwatch; |
| 759 stopwatch.Start(); | 782 stopwatch.Start(); |
| 760 task.task.Run(); | 783 task.task.Run(); |
| 761 stopwatch.Stop(); | 784 stopwatch.Stop(); |
| 762 | 785 |
| 763 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 786 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
| 764 task, stopwatch); | 787 task, stopwatch); |
| 765 | 788 |
| 766 // Make sure our task is erased outside the lock for the | 789 // Make sure our task is erased outside the lock for the |
| 767 // same reason we do this with delete_these_oustide_lock. | 790 // same reason we do this with delete_these_oustide_lock. |
| 768 // Also, do it before calling set_running_task_info() so | 791 // Also, do it before calling reset_running_task_info() so |
| 769 // that sequence-checking from within the task's destructor | 792 // that sequence-checking from within the task's destructor |
| 770 // still works. | 793 // still works. |
| 771 task.task = Closure(); | 794 task.task = Closure(); |
| 772 | 795 |
| 773 this_worker->set_running_task_info( | 796 this_worker->reset_running_task_info(); |
| 774 SequenceToken(), CONTINUE_ON_SHUTDOWN); | |
| 775 } | 797 } |
| 776 DidRunWorkerTask(task); // Must be done inside the lock. | 798 DidRunWorkerTask(task); // Must be done inside the lock. |
| 777 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 799 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
| 778 switch (status) { | 800 switch (status) { |
| 779 case GET_WORK_WAIT: { | 801 case GET_WORK_WAIT: { |
| 780 AutoUnlock unlock(lock_); | 802 AutoUnlock unlock(lock_); |
| 781 delete_these_outside_lock.clear(); | 803 delete_these_outside_lock.clear(); |
| 782 } | 804 } |
| 783 break; | 805 break; |
| 784 case GET_WORK_NOT_FOUND: | 806 case GET_WORK_NOT_FOUND: |
| (...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 897 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); | 919 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
| 898 return result.id_; | 920 return result.id_; |
| 899 } | 921 } |
| 900 | 922 |
| 901 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 923 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| 902 lock_.AssertAcquired(); | 924 lock_.AssertAcquired(); |
| 903 // We assume that we never create enough tasks to wrap around. | 925 // We assume that we never create enough tasks to wrap around. |
| 904 return next_sequence_task_number_++; | 926 return next_sequence_task_number_++; |
| 905 } | 927 } |
| 906 | 928 |
| 907 SequencedWorkerPool::WorkerShutdown | |
| 908 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { | |
| 909 lock_.AssertAcquired(); | |
| 910 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | |
| 911 if (found == threads_.end()) | |
| 912 return CONTINUE_ON_SHUTDOWN; | |
| 913 return found->second->running_shutdown_behavior(); | |
| 914 } | |
| 915 | |
| 916 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 929 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| 917 SequencedTask* task, | 930 SequencedTask* task, |
| 918 TimeDelta* wait_time, | 931 TimeDelta* wait_time, |
| 919 std::vector<Closure>* delete_these_outside_lock) { | 932 std::vector<Closure>* delete_these_outside_lock) { |
| 920 lock_.AssertAcquired(); | 933 lock_.AssertAcquired(); |
| 921 | 934 |
| 922 // Find the next task with a sequence token that's not currently in use. | 935 // Find the next task with a sequence token that's not currently in use. |
| 923 // If the token is in use, that means another thread is running something | 936 // If the token is in use, that means another thread is running something |
| 924 // in that sequence, and we can't run it without going out-of-order. | 937 // in that sequence, and we can't run it without going out-of-order. |
| 925 // | 938 // |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1289 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1302 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
| 1290 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1303 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
| 1291 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1304 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
| 1292 } | 1305 } |
| 1293 | 1306 |
| 1294 bool SequencedWorkerPool::IsShutdownInProgress() { | 1307 bool SequencedWorkerPool::IsShutdownInProgress() { |
| 1295 return inner_->IsShutdownInProgress(); | 1308 return inner_->IsShutdownInProgress(); |
| 1296 } | 1309 } |
| 1297 | 1310 |
| 1298 } // namespace base | 1311 } // namespace base |
| OLD | NEW |