Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(372)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 1124763003: Update from https://crrev.com/327068 (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: update nacl, buildtools, fix display_change_notifier_unittest Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
233 // around as long as we are running. 233 // around as long as we are running.
234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
235 int thread_number, 235 int thread_number,
236 const std::string& thread_name_prefix); 236 const std::string& thread_name_prefix);
237 ~Worker() override; 237 ~Worker() override;
238 238
239 // SimpleThread implementation. This actually runs the background thread. 239 // SimpleThread implementation. This actually runs the background thread.
240 void Run() override; 240 void Run() override;
241 241
242 // Indicates that a task is about to be run. The parameters provide
243 // additional metainformation about the task being run.
242 void set_running_task_info(SequenceToken token, 244 void set_running_task_info(SequenceToken token,
243 WorkerShutdown shutdown_behavior) { 245 WorkerShutdown shutdown_behavior) {
244 running_sequence_ = token; 246 is_processing_task_ = true;
245 running_shutdown_behavior_ = shutdown_behavior; 247 task_sequence_token_ = token;
248 task_shutdown_behavior_ = shutdown_behavior;
246 } 249 }
247 250
248 SequenceToken running_sequence() const { 251 // Indicates that the task has finished running.
249 return running_sequence_; 252 void reset_running_task_info() { is_processing_task_ = false; }
253
254 // Whether the worker is processing a task.
255 bool is_processing_task() { return is_processing_task_; }
256
257 SequenceToken task_sequence_token() const {
258 DCHECK(is_processing_task_);
259 return task_sequence_token_;
250 } 260 }
251 261
252 WorkerShutdown running_shutdown_behavior() const { 262 WorkerShutdown task_shutdown_behavior() const {
253 return running_shutdown_behavior_; 263 DCHECK(is_processing_task_);
264 return task_shutdown_behavior_;
254 } 265 }
255 266
256 private: 267 private:
257 scoped_refptr<SequencedWorkerPool> worker_pool_; 268 scoped_refptr<SequencedWorkerPool> worker_pool_;
258 SequenceToken running_sequence_; 269 // The sequence token of the task being processed. Only valid when
259 WorkerShutdown running_shutdown_behavior_; 270 // is_processing_task_ is true.
271 SequenceToken task_sequence_token_;
272 // The shutdown behavior of the task being processed. Only valid when
273 // is_processing_task_ is true.
274 WorkerShutdown task_shutdown_behavior_;
275 // Whether the Worker is processing a task.
276 bool is_processing_task_;
260 277
261 DISALLOW_COPY_AND_ASSIGN(Worker); 278 DISALLOW_COPY_AND_ASSIGN(Worker);
262 }; 279 };
263 280
264 // Inner ---------------------------------------------------------------------- 281 // Inner ----------------------------------------------------------------------
265 282
266 class SequencedWorkerPool::Inner { 283 class SequencedWorkerPool::Inner {
267 public: 284 public:
268 // Take a raw pointer to |worker| to avoid cycles (since we're owned 285 // Take a raw pointer to |worker| to avoid cycles (since we're owned
269 // by it). 286 // by it).
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
319 CLEANUP_DONE, 336 CLEANUP_DONE,
320 }; 337 };
321 338
322 // Called from within the lock, this converts the given token name into a 339 // Called from within the lock, this converts the given token name into a
323 // token ID, creating a new one if necessary. 340 // token ID, creating a new one if necessary.
324 int LockedGetNamedTokenID(const std::string& name); 341 int LockedGetNamedTokenID(const std::string& name);
325 342
326 // Called from within the lock, this returns the next sequence task number. 343 // Called from within the lock, this returns the next sequence task number.
327 int64 LockedGetNextSequenceTaskNumber(); 344 int64 LockedGetNextSequenceTaskNumber();
328 345
329 // Called from within the lock, returns the shutdown behavior of the task
330 // running on the currently executing worker thread. If invoked from a thread
331 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
332 WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
333
334 // Gets new task. There are 3 cases depending on the return value: 346 // Gets new task. There are 3 cases depending on the return value:
335 // 347 //
336 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 348 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
337 // be run immediately. 349 // be run immediately.
338 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 350 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
339 // and |task| is not filled in. In this case, the caller should wait until 351 // and |task| is not filled in. In this case, the caller should wait until
340 // a task is posted. 352 // a task is posted.
341 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 353 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
342 // immediately, and |task| is not filled in. Likewise, |wait_time| is 354 // immediately, and |task| is not filled in. Likewise, |wait_time| is
343 // filled in the time to wait until the next task to run. In this case, the 355 // filled in the time to wait until the next task to run. In this case, the
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 }; 488 };
477 489
478 // Worker definitions --------------------------------------------------------- 490 // Worker definitions ---------------------------------------------------------
479 491
480 SequencedWorkerPool::Worker::Worker( 492 SequencedWorkerPool::Worker::Worker(
481 const scoped_refptr<SequencedWorkerPool>& worker_pool, 493 const scoped_refptr<SequencedWorkerPool>& worker_pool,
482 int thread_number, 494 int thread_number,
483 const std::string& prefix) 495 const std::string& prefix)
484 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 496 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
485 worker_pool_(worker_pool), 497 worker_pool_(worker_pool),
486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 498 task_shutdown_behavior_(BLOCK_SHUTDOWN),
499 is_processing_task_(false) {
487 Start(); 500 Start();
488 } 501 }
489 502
490 SequencedWorkerPool::Worker::~Worker() { 503 SequencedWorkerPool::Worker::~Worker() {
491 } 504 }
492 505
493 void SequencedWorkerPool::Worker::Run() { 506 void SequencedWorkerPool::Worker::Run() {
494 #if defined(OS_WIN) 507 #if defined(OS_WIN)
495 win::ScopedCOMInitializer com_initializer; 508 win::ScopedCOMInitializer com_initializer;
496 #endif 509 #endif
497 510
498 // Store a pointer to the running sequence in thread local storage for 511 // Store a pointer to the running sequence in thread local storage for
499 // static function access. 512 // static function access.
500 g_lazy_tls_ptr.Get().Set(&running_sequence_); 513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_);
501 514
502 // Just jump back to the Inner object to run the thread, since it has all the 515 // Just jump back to the Inner object to run the thread, since it has all the
503 // tracking information and queues. It might be more natural to implement 516 // tracking information and queues. It might be more natural to implement
504 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
505 // having these worker objects at all, but that method lacks the ability to 518 // having these worker objects at all, but that method lacks the ability to
506 // send thread-specific information easily to the thread loop. 519 // send thread-specific information easily to the thread loop.
507 worker_pool_->inner_->ThreadLoop(this); 520 worker_pool_->inner_->ThreadLoop(this);
508 // Release our cyclic reference once we're done. 521 // Release our cyclic reference once we're done.
509 worker_pool_ = NULL; 522 worker_pool_ = NULL;
510 } 523 }
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
576 sequenced.posted_from = from_here; 589 sequenced.posted_from = from_here;
577 sequenced.task = 590 sequenced.task =
578 shutdown_behavior == BLOCK_SHUTDOWN ? 591 shutdown_behavior == BLOCK_SHUTDOWN ?
579 base::MakeCriticalClosure(task) : task; 592 base::MakeCriticalClosure(task) : task;
580 sequenced.time_to_run = TimeTicks::Now() + delay; 593 sequenced.time_to_run = TimeTicks::Now() + delay;
581 594
582 int create_thread_id = 0; 595 int create_thread_id = 0;
583 { 596 {
584 AutoLock lock(lock_); 597 AutoLock lock(lock_);
585 if (shutdown_called_) { 598 if (shutdown_called_) {
586 if (shutdown_behavior != BLOCK_SHUTDOWN || 599 // Don't allow a new task to be posted if it doesn't block shutdown.
587 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 600 if (shutdown_behavior != BLOCK_SHUTDOWN)
601 return false;
602
603 // If the current thread is running a task, and that task doesn't block
604 // shutdown, then it shouldn't be allowed to post any more tasks.
605 ThreadMap::const_iterator found =
606 threads_.find(PlatformThread::CurrentId());
607 if (found != threads_.end() && found->second->is_processing_task() &&
608 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
588 return false; 609 return false;
589 } 610 }
611
590 if (max_blocking_tasks_after_shutdown_ <= 0) { 612 if (max_blocking_tasks_after_shutdown_ <= 0) {
591 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 613 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
592 return false; 614 return false;
593 } 615 }
594 max_blocking_tasks_after_shutdown_ -= 1; 616 max_blocking_tasks_after_shutdown_ -= 1;
595 } 617 }
596 618
597 // The trace_id is used for identifying the task in about:tracing. 619 // The trace_id is used for identifying the task in about:tracing.
598 sequenced.trace_id = trace_id_++; 620 sequenced.trace_id = trace_id_++;
599 621
(...skipping 28 matching lines...) Expand all
628 AutoLock lock(lock_); 650 AutoLock lock(lock_);
629 return ContainsKey(threads_, PlatformThread::CurrentId()); 651 return ContainsKey(threads_, PlatformThread::CurrentId());
630 } 652 }
631 653
632 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 654 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
633 SequenceToken sequence_token) const { 655 SequenceToken sequence_token) const {
634 AutoLock lock(lock_); 656 AutoLock lock(lock_);
635 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 657 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
636 if (found == threads_.end()) 658 if (found == threads_.end())
637 return false; 659 return false;
638 return sequence_token.Equals(found->second->running_sequence()); 660 return found->second->is_processing_task() &&
661 sequence_token.Equals(found->second->task_sequence_token());
639 } 662 }
640 663
641 // See https://code.google.com/p/chromium/issues/detail?id=168415 664 // See https://code.google.com/p/chromium/issues/detail?id=168415
642 void SequencedWorkerPool::Inner::CleanupForTesting() { 665 void SequencedWorkerPool::Inner::CleanupForTesting() {
643 DCHECK(!RunsTasksOnCurrentThread()); 666 DCHECK(!RunsTasksOnCurrentThread());
644 base::ThreadRestrictions::ScopedAllowWait allow_wait; 667 base::ThreadRestrictions::ScopedAllowWait allow_wait;
645 AutoLock lock(lock_); 668 AutoLock lock(lock_);
646 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 669 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
647 if (shutdown_called_) 670 if (shutdown_called_)
648 return; 671 return;
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 tracked_objects::TaskStopwatch stopwatch; 781 tracked_objects::TaskStopwatch stopwatch;
759 stopwatch.Start(); 782 stopwatch.Start();
760 task.task.Run(); 783 task.task.Run();
761 stopwatch.Stop(); 784 stopwatch.Stop();
762 785
763 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 786 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
764 task, stopwatch); 787 task, stopwatch);
765 788
766 // Make sure our task is erased outside the lock for the 789 // Make sure our task is erased outside the lock for the
767 // same reason we do this with delete_these_oustide_lock. 790 // same reason we do this with delete_these_oustide_lock.
768 // Also, do it before calling set_running_task_info() so 791 // Also, do it before calling reset_running_task_info() so
769 // that sequence-checking from within the task's destructor 792 // that sequence-checking from within the task's destructor
770 // still works. 793 // still works.
771 task.task = Closure(); 794 task.task = Closure();
772 795
773 this_worker->set_running_task_info( 796 this_worker->reset_running_task_info();
774 SequenceToken(), CONTINUE_ON_SHUTDOWN);
775 } 797 }
776 DidRunWorkerTask(task); // Must be done inside the lock. 798 DidRunWorkerTask(task); // Must be done inside the lock.
777 } else if (cleanup_state_ == CLEANUP_RUNNING) { 799 } else if (cleanup_state_ == CLEANUP_RUNNING) {
778 switch (status) { 800 switch (status) {
779 case GET_WORK_WAIT: { 801 case GET_WORK_WAIT: {
780 AutoUnlock unlock(lock_); 802 AutoUnlock unlock(lock_);
781 delete_these_outside_lock.clear(); 803 delete_these_outside_lock.clear();
782 } 804 }
783 break; 805 break;
784 case GET_WORK_NOT_FOUND: 806 case GET_WORK_NOT_FOUND:
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
897 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 919 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
898 return result.id_; 920 return result.id_;
899 } 921 }
900 922
901 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 923 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
902 lock_.AssertAcquired(); 924 lock_.AssertAcquired();
903 // We assume that we never create enough tasks to wrap around. 925 // We assume that we never create enough tasks to wrap around.
904 return next_sequence_task_number_++; 926 return next_sequence_task_number_++;
905 } 927 }
906 928
907 SequencedWorkerPool::WorkerShutdown
908 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
909 lock_.AssertAcquired();
910 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
911 if (found == threads_.end())
912 return CONTINUE_ON_SHUTDOWN;
913 return found->second->running_shutdown_behavior();
914 }
915
916 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 929 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
917 SequencedTask* task, 930 SequencedTask* task,
918 TimeDelta* wait_time, 931 TimeDelta* wait_time,
919 std::vector<Closure>* delete_these_outside_lock) { 932 std::vector<Closure>* delete_these_outside_lock) {
920 lock_.AssertAcquired(); 933 lock_.AssertAcquired();
921 934
922 // Find the next task with a sequence token that's not currently in use. 935 // Find the next task with a sequence token that's not currently in use.
923 // If the token is in use, that means another thread is running something 936 // If the token is in use, that means another thread is running something
924 // in that sequence, and we can't run it without going out-of-order. 937 // in that sequence, and we can't run it without going out-of-order.
925 // 938 //
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
1289 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1302 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1290 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1303 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1291 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1304 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1292 } 1305 }
1293 1306
1294 bool SequencedWorkerPool::IsShutdownInProgress() { 1307 bool SequencedWorkerPool::IsShutdownInProgress() {
1295 return inner_->IsShutdownInProgress(); 1308 return inner_->IsShutdownInProgress();
1296 } 1309 }
1297 1310
1298 } // namespace base 1311 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698