OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
233 // around as long as we are running. | 233 // around as long as we are running. |
234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
235 int thread_number, | 235 int thread_number, |
236 const std::string& thread_name_prefix); | 236 const std::string& thread_name_prefix); |
237 ~Worker() override; | 237 ~Worker() override; |
238 | 238 |
239 // SimpleThread implementation. This actually runs the background thread. | 239 // SimpleThread implementation. This actually runs the background thread. |
240 void Run() override; | 240 void Run() override; |
241 | 241 |
| 242 // Indicates that a task is about to be run. The parameters provide |
| 243 // additional metainformation about the task being run. |
242 void set_running_task_info(SequenceToken token, | 244 void set_running_task_info(SequenceToken token, |
243 WorkerShutdown shutdown_behavior) { | 245 WorkerShutdown shutdown_behavior) { |
244 running_sequence_ = token; | 246 is_processing_task_ = true; |
245 running_shutdown_behavior_ = shutdown_behavior; | 247 task_sequence_token_ = token; |
| 248 task_shutdown_behavior_ = shutdown_behavior; |
246 } | 249 } |
247 | 250 |
248 SequenceToken running_sequence() const { | 251 // Indicates that the task has finished running. |
249 return running_sequence_; | 252 void reset_running_task_info() { is_processing_task_ = false; } |
| 253 |
| 254 // Whether the worker is processing a task. |
| 255 bool is_processing_task() { return is_processing_task_; } |
| 256 |
| 257 SequenceToken task_sequence_token() const { |
| 258 DCHECK(is_processing_task_); |
| 259 return task_sequence_token_; |
250 } | 260 } |
251 | 261 |
252 WorkerShutdown running_shutdown_behavior() const { | 262 WorkerShutdown task_shutdown_behavior() const { |
253 return running_shutdown_behavior_; | 263 DCHECK(is_processing_task_); |
| 264 return task_shutdown_behavior_; |
254 } | 265 } |
255 | 266 |
256 private: | 267 private: |
257 scoped_refptr<SequencedWorkerPool> worker_pool_; | 268 scoped_refptr<SequencedWorkerPool> worker_pool_; |
258 SequenceToken running_sequence_; | 269 // The sequence token of the task being processed. Only valid when |
259 WorkerShutdown running_shutdown_behavior_; | 270 // is_processing_task_ is true. |
| 271 SequenceToken task_sequence_token_; |
| 272 // The shutdown behavior of the task being processed. Only valid when |
| 273 // is_processing_task_ is true. |
| 274 WorkerShutdown task_shutdown_behavior_; |
| 275 // Whether the Worker is processing a task. |
| 276 bool is_processing_task_; |
260 | 277 |
261 DISALLOW_COPY_AND_ASSIGN(Worker); | 278 DISALLOW_COPY_AND_ASSIGN(Worker); |
262 }; | 279 }; |
263 | 280 |
264 // Inner ---------------------------------------------------------------------- | 281 // Inner ---------------------------------------------------------------------- |
265 | 282 |
266 class SequencedWorkerPool::Inner { | 283 class SequencedWorkerPool::Inner { |
267 public: | 284 public: |
268 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 285 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
269 // by it). | 286 // by it). |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
319 CLEANUP_DONE, | 336 CLEANUP_DONE, |
320 }; | 337 }; |
321 | 338 |
322 // Called from within the lock, this converts the given token name into a | 339 // Called from within the lock, this converts the given token name into a |
323 // token ID, creating a new one if necessary. | 340 // token ID, creating a new one if necessary. |
324 int LockedGetNamedTokenID(const std::string& name); | 341 int LockedGetNamedTokenID(const std::string& name); |
325 | 342 |
326 // Called from within the lock, this returns the next sequence task number. | 343 // Called from within the lock, this returns the next sequence task number. |
327 int64 LockedGetNextSequenceTaskNumber(); | 344 int64 LockedGetNextSequenceTaskNumber(); |
328 | 345 |
329 // Called from within the lock, returns the shutdown behavior of the task | |
330 // running on the currently executing worker thread. If invoked from a thread | |
331 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. | |
332 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; | |
333 | |
334 // Gets new task. There are 3 cases depending on the return value: | 346 // Gets new task. There are 3 cases depending on the return value: |
335 // | 347 // |
336 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should | 348 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should |
337 // be run immediately. | 349 // be run immediately. |
338 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, | 350 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, |
339 // and |task| is not filled in. In this case, the caller should wait until | 351 // and |task| is not filled in. In this case, the caller should wait until |
340 // a task is posted. | 352 // a task is posted. |
341 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run | 353 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
342 // immediately, and |task| is not filled in. Likewise, |wait_time| is | 354 // immediately, and |task| is not filled in. Likewise, |wait_time| is |
343 // filled in the time to wait until the next task to run. In this case, the | 355 // filled in the time to wait until the next task to run. In this case, the |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
476 }; | 488 }; |
477 | 489 |
478 // Worker definitions --------------------------------------------------------- | 490 // Worker definitions --------------------------------------------------------- |
479 | 491 |
480 SequencedWorkerPool::Worker::Worker( | 492 SequencedWorkerPool::Worker::Worker( |
481 const scoped_refptr<SequencedWorkerPool>& worker_pool, | 493 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
482 int thread_number, | 494 int thread_number, |
483 const std::string& prefix) | 495 const std::string& prefix) |
484 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), | 496 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), |
485 worker_pool_(worker_pool), | 497 worker_pool_(worker_pool), |
486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { | 498 task_shutdown_behavior_(BLOCK_SHUTDOWN), |
| 499 is_processing_task_(false) { |
487 Start(); | 500 Start(); |
488 } | 501 } |
489 | 502 |
490 SequencedWorkerPool::Worker::~Worker() { | 503 SequencedWorkerPool::Worker::~Worker() { |
491 } | 504 } |
492 | 505 |
493 void SequencedWorkerPool::Worker::Run() { | 506 void SequencedWorkerPool::Worker::Run() { |
494 #if defined(OS_WIN) | 507 #if defined(OS_WIN) |
495 win::ScopedCOMInitializer com_initializer; | 508 win::ScopedCOMInitializer com_initializer; |
496 #endif | 509 #endif |
497 | 510 |
498 // Store a pointer to the running sequence in thread local storage for | 511 // Store a pointer to the running sequence in thread local storage for |
499 // static function access. | 512 // static function access. |
500 g_lazy_tls_ptr.Get().Set(&running_sequence_); | 513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_); |
501 | 514 |
502 // Just jump back to the Inner object to run the thread, since it has all the | 515 // Just jump back to the Inner object to run the thread, since it has all the |
503 // tracking information and queues. It might be more natural to implement | 516 // tracking information and queues. It might be more natural to implement |
504 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
505 // having these worker objects at all, but that method lacks the ability to | 518 // having these worker objects at all, but that method lacks the ability to |
506 // send thread-specific information easily to the thread loop. | 519 // send thread-specific information easily to the thread loop. |
507 worker_pool_->inner_->ThreadLoop(this); | 520 worker_pool_->inner_->ThreadLoop(this); |
508 // Release our cyclic reference once we're done. | 521 // Release our cyclic reference once we're done. |
509 worker_pool_ = NULL; | 522 worker_pool_ = NULL; |
510 } | 523 } |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
576 sequenced.posted_from = from_here; | 589 sequenced.posted_from = from_here; |
577 sequenced.task = | 590 sequenced.task = |
578 shutdown_behavior == BLOCK_SHUTDOWN ? | 591 shutdown_behavior == BLOCK_SHUTDOWN ? |
579 base::MakeCriticalClosure(task) : task; | 592 base::MakeCriticalClosure(task) : task; |
580 sequenced.time_to_run = TimeTicks::Now() + delay; | 593 sequenced.time_to_run = TimeTicks::Now() + delay; |
581 | 594 |
582 int create_thread_id = 0; | 595 int create_thread_id = 0; |
583 { | 596 { |
584 AutoLock lock(lock_); | 597 AutoLock lock(lock_); |
585 if (shutdown_called_) { | 598 if (shutdown_called_) { |
586 if (shutdown_behavior != BLOCK_SHUTDOWN || | 599 // Don't allow a new task to be posted if it doesn't block shutdown. |
587 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { | 600 if (shutdown_behavior != BLOCK_SHUTDOWN) |
| 601 return false; |
| 602 |
| 603 // If the current thread is running a task, and that task doesn't block |
| 604 // shutdown, then it shouldn't be allowed to post any more tasks. |
| 605 ThreadMap::const_iterator found = |
| 606 threads_.find(PlatformThread::CurrentId()); |
| 607 if (found != threads_.end() && found->second->is_processing_task() && |
| 608 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { |
588 return false; | 609 return false; |
589 } | 610 } |
| 611 |
590 if (max_blocking_tasks_after_shutdown_ <= 0) { | 612 if (max_blocking_tasks_after_shutdown_ <= 0) { |
591 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; | 613 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; |
592 return false; | 614 return false; |
593 } | 615 } |
594 max_blocking_tasks_after_shutdown_ -= 1; | 616 max_blocking_tasks_after_shutdown_ -= 1; |
595 } | 617 } |
596 | 618 |
597 // The trace_id is used for identifying the task in about:tracing. | 619 // The trace_id is used for identifying the task in about:tracing. |
598 sequenced.trace_id = trace_id_++; | 620 sequenced.trace_id = trace_id_++; |
599 | 621 |
(...skipping 28 matching lines...) Expand all Loading... |
628 AutoLock lock(lock_); | 650 AutoLock lock(lock_); |
629 return ContainsKey(threads_, PlatformThread::CurrentId()); | 651 return ContainsKey(threads_, PlatformThread::CurrentId()); |
630 } | 652 } |
631 | 653 |
632 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | 654 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
633 SequenceToken sequence_token) const { | 655 SequenceToken sequence_token) const { |
634 AutoLock lock(lock_); | 656 AutoLock lock(lock_); |
635 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | 657 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
636 if (found == threads_.end()) | 658 if (found == threads_.end()) |
637 return false; | 659 return false; |
638 return sequence_token.Equals(found->second->running_sequence()); | 660 return found->second->is_processing_task() && |
| 661 sequence_token.Equals(found->second->task_sequence_token()); |
639 } | 662 } |
640 | 663 |
641 // See https://code.google.com/p/chromium/issues/detail?id=168415 | 664 // See https://code.google.com/p/chromium/issues/detail?id=168415 |
642 void SequencedWorkerPool::Inner::CleanupForTesting() { | 665 void SequencedWorkerPool::Inner::CleanupForTesting() { |
643 DCHECK(!RunsTasksOnCurrentThread()); | 666 DCHECK(!RunsTasksOnCurrentThread()); |
644 base::ThreadRestrictions::ScopedAllowWait allow_wait; | 667 base::ThreadRestrictions::ScopedAllowWait allow_wait; |
645 AutoLock lock(lock_); | 668 AutoLock lock(lock_); |
646 CHECK_EQ(CLEANUP_DONE, cleanup_state_); | 669 CHECK_EQ(CLEANUP_DONE, cleanup_state_); |
647 if (shutdown_called_) | 670 if (shutdown_called_) |
648 return; | 671 return; |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
758 tracked_objects::TaskStopwatch stopwatch; | 781 tracked_objects::TaskStopwatch stopwatch; |
759 stopwatch.Start(); | 782 stopwatch.Start(); |
760 task.task.Run(); | 783 task.task.Run(); |
761 stopwatch.Stop(); | 784 stopwatch.Stop(); |
762 | 785 |
763 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( | 786 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( |
764 task, stopwatch); | 787 task, stopwatch); |
765 | 788 |
766 // Make sure our task is erased outside the lock for the | 789 // Make sure our task is erased outside the lock for the |
767 // same reason we do this with delete_these_oustide_lock. | 790 // same reason we do this with delete_these_oustide_lock. |
768 // Also, do it before calling set_running_task_info() so | 791 // Also, do it before calling reset_running_task_info() so |
769 // that sequence-checking from within the task's destructor | 792 // that sequence-checking from within the task's destructor |
770 // still works. | 793 // still works. |
771 task.task = Closure(); | 794 task.task = Closure(); |
772 | 795 |
773 this_worker->set_running_task_info( | 796 this_worker->reset_running_task_info(); |
774 SequenceToken(), CONTINUE_ON_SHUTDOWN); | |
775 } | 797 } |
776 DidRunWorkerTask(task); // Must be done inside the lock. | 798 DidRunWorkerTask(task); // Must be done inside the lock. |
777 } else if (cleanup_state_ == CLEANUP_RUNNING) { | 799 } else if (cleanup_state_ == CLEANUP_RUNNING) { |
778 switch (status) { | 800 switch (status) { |
779 case GET_WORK_WAIT: { | 801 case GET_WORK_WAIT: { |
780 AutoUnlock unlock(lock_); | 802 AutoUnlock unlock(lock_); |
781 delete_these_outside_lock.clear(); | 803 delete_these_outside_lock.clear(); |
782 } | 804 } |
783 break; | 805 break; |
784 case GET_WORK_NOT_FOUND: | 806 case GET_WORK_NOT_FOUND: |
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
897 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); | 919 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
898 return result.id_; | 920 return result.id_; |
899 } | 921 } |
900 | 922 |
901 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { | 923 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
902 lock_.AssertAcquired(); | 924 lock_.AssertAcquired(); |
903 // We assume that we never create enough tasks to wrap around. | 925 // We assume that we never create enough tasks to wrap around. |
904 return next_sequence_task_number_++; | 926 return next_sequence_task_number_++; |
905 } | 927 } |
906 | 928 |
907 SequencedWorkerPool::WorkerShutdown | |
908 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { | |
909 lock_.AssertAcquired(); | |
910 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); | |
911 if (found == threads_.end()) | |
912 return CONTINUE_ON_SHUTDOWN; | |
913 return found->second->running_shutdown_behavior(); | |
914 } | |
915 | |
916 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( | 929 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
917 SequencedTask* task, | 930 SequencedTask* task, |
918 TimeDelta* wait_time, | 931 TimeDelta* wait_time, |
919 std::vector<Closure>* delete_these_outside_lock) { | 932 std::vector<Closure>* delete_these_outside_lock) { |
920 lock_.AssertAcquired(); | 933 lock_.AssertAcquired(); |
921 | 934 |
922 // Find the next task with a sequence token that's not currently in use. | 935 // Find the next task with a sequence token that's not currently in use. |
923 // If the token is in use, that means another thread is running something | 936 // If the token is in use, that means another thread is running something |
924 // in that sequence, and we can't run it without going out-of-order. | 937 // in that sequence, and we can't run it without going out-of-order. |
925 // | 938 // |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1289 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { | 1302 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { |
1290 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 1303 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
1291 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); | 1304 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); |
1292 } | 1305 } |
1293 | 1306 |
1294 bool SequencedWorkerPool::IsShutdownInProgress() { | 1307 bool SequencedWorkerPool::IsShutdownInProgress() { |
1295 return inner_->IsShutdownInProgress(); | 1308 return inner_->IsShutdownInProgress(); |
1296 } | 1309 } |
1297 | 1310 |
1298 } // namespace base | 1311 } // namespace base |
OLD | NEW |