Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9663075: Implementation of SequencedTaskRunner based on SequencedWorkerPool. (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h" 13 #include "base/atomicops.h"
14 #include "base/callback.h" 14 #include "base/callback.h"
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/logging.h" 16 #include "base/logging.h"
17 #include "base/memory/linked_ptr.h" 17 #include "base/memory/linked_ptr.h"
18 #include "base/message_loop_proxy.h" 18 #include "base/message_loop_proxy.h"
19 #include "base/metrics/histogram.h" 19 #include "base/metrics/histogram.h"
20 #include "base/stl_util.h" 20 #include "base/stl_util.h"
21 #include "base/stringprintf.h" 21 #include "base/stringprintf.h"
22 #include "base/synchronization/condition_variable.h" 22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h" 23 #include "base/synchronization/lock.h"
24 #include "base/threading/platform_thread.h" 24 #include "base/threading/platform_thread.h"
25 #include "base/threading/sequenced_task_runner_impl.h"
25 #include "base/threading/simple_thread.h" 26 #include "base/threading/simple_thread.h"
26 #include "base/time.h" 27 #include "base/time.h"
27 #include "base/tracked_objects.h" 28 #include "base/tracked_objects.h"
28 29
29 namespace base { 30 namespace base {
30 31
31 namespace { 32 namespace {
32 33
33 struct SequencedTask { 34 struct SequencedTask {
34 SequencedTask() 35 SequencedTask()
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 75 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
75 const std::string& thread_name_prefix, 76 const std::string& thread_name_prefix,
76 TestingObserver* observer); 77 TestingObserver* observer);
77 78
78 ~Inner(); 79 ~Inner();
79 80
80 SequenceToken GetSequenceToken(); 81 SequenceToken GetSequenceToken();
81 82
82 SequenceToken GetNamedSequenceToken(const std::string& name); 83 SequenceToken GetNamedSequenceToken(const std::string& name);
83 84
85 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
86 SequenceToken sequence_token);
87
84 // This function accepts a name and an ID. If the name is null, the 88 // This function accepts a name and an ID. If the name is null, the
85 // token ID is used. This allows us to implement the optional name lookup 89 // token ID is used. This allows us to implement the optional name lookup
86 // from a single function without having to enter the lock a separate time. 90 // from a single function without having to enter the lock a separate time.
87 bool PostTask(const std::string* optional_token_name, 91 bool PostTask(const std::string* optional_token_name,
88 SequenceToken sequence_token, 92 SequenceToken sequence_token,
89 WorkerShutdown shutdown_behavior, 93 WorkerShutdown shutdown_behavior,
90 const tracked_objects::Location& from_here, 94 const tracked_objects::Location& from_here,
91 const Closure& task); 95 const Closure& task);
92 96
93 bool RunsTasksOnCurrentThread() const; 97 bool RunsTasksOnCurrentThread() const;
(...skipping 11 matching lines...) Expand all
105 109
106 private: 110 private:
107 // Returns whether there are no more pending tasks and all threads 111 // Returns whether there are no more pending tasks and all threads
108 // are idle. Must be called under lock. 112 // are idle. Must be called under lock.
109 bool IsIdle() const; 113 bool IsIdle() const;
110 114
111 // Called from within the lock, this converts the given token name into a 115 // Called from within the lock, this converts the given token name into a
112 // token ID, creating a new one if necessary. 116 // token ID, creating a new one if necessary.
113 int LockedGetNamedTokenID(const std::string& name); 117 int LockedGetNamedTokenID(const std::string& name);
114 118
119 // Called from within the lock, this returns a reference to a
120 // SequencedTaskRunner which wraps the SequencedWorkerPool object.
121 scoped_refptr<SequencedTaskRunner> LockedGetSequencedTaskRunner(
122 SequenceToken token);
123
115 // The calling code should clear the given delete_these_oustide_lock 124 // The calling code should clear the given delete_these_oustide_lock
116 // vector the next time the lock is released. See the implementation for 125 // vector the next time the lock is released. See the implementation for
117 // a more detailed description. 126 // a more detailed description.
118 bool GetWork(SequencedTask* task, 127 bool GetWork(SequencedTask* task,
119 std::vector<Closure>* delete_these_outside_lock); 128 std::vector<Closure>* delete_these_outside_lock);
120 129
121 // Peforms init and cleanup around running the given task. WillRun... 130 // Peforms init and cleanup around running the given task. WillRun...
122 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 131 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
123 // The calling code should call FinishStartingAdditionalThread once the 132 // The calling code should call FinishStartingAdditionalThread once the
124 // lock is released if the return values is nonzero. 133 // lock is released if the return values is nonzero.
(...skipping 26 matching lines...) Expand all
151 160
152 // Signal |has_work_| and increment |has_work_signal_count_|. 161 // Signal |has_work_| and increment |has_work_signal_count_|.
153 void SignalHasWork(); 162 void SignalHasWork();
154 163
155 // Checks whether there is work left that's blocking shutdown. Must be 164 // Checks whether there is work left that's blocking shutdown. Must be
156 // called inside the lock. 165 // called inside the lock.
157 bool CanShutdown() const; 166 bool CanShutdown() const;
158 167
159 SequencedWorkerPool* const worker_pool_; 168 SequencedWorkerPool* const worker_pool_;
160 169
170 WeakPtr<SequencedTaskRunner> sequenced_task_runner_;
171
161 // The last sequence number used. Managed by GetSequenceToken, since this 172 // The last sequence number used. Managed by GetSequenceToken, since this
162 // only does threadsafe increment operations, you do not need to hold the 173 // only does threadsafe increment operations, you do not need to hold the
163 // lock. 174 // lock.
164 volatile subtle::Atomic32 last_sequence_number_; 175 volatile subtle::Atomic32 last_sequence_number_;
165 176
166 // This lock protects |everything in this class|. Do not read or modify 177 // This lock protects |everything in this class|. Do not read or modify
167 // anything without holding this lock. Do not block while holding this 178 // anything without holding this lock. Do not block while holding this
168 // lock. 179 // lock.
169 mutable Lock lock_; 180 mutable Lock lock_;
170 181
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 309 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
299 return SequenceToken(static_cast<int>(result)); 310 return SequenceToken(static_cast<int>(result));
300 } 311 }
301 312
302 SequencedWorkerPool::SequenceToken 313 SequencedWorkerPool::SequenceToken
303 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 314 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
304 AutoLock lock(lock_); 315 AutoLock lock(lock_);
305 return SequenceToken(LockedGetNamedTokenID(name)); 316 return SequenceToken(LockedGetNamedTokenID(name));
306 } 317 }
307 318
319 scoped_refptr<SequencedTaskRunner>
320 SequencedWorkerPool::Inner::GetSequencedTaskRunner(SequenceToken token) {
321 AutoLock lock(lock_);
322 return LockedGetSequencedTaskRunner(token);
323 }
324
308 bool SequencedWorkerPool::Inner::PostTask( 325 bool SequencedWorkerPool::Inner::PostTask(
309 const std::string* optional_token_name, 326 const std::string* optional_token_name,
310 SequenceToken sequence_token, 327 SequenceToken sequence_token,
311 WorkerShutdown shutdown_behavior, 328 WorkerShutdown shutdown_behavior,
312 const tracked_objects::Location& from_here, 329 const tracked_objects::Location& from_here,
313 const Closure& task) { 330 const Closure& task) {
314 SequencedTask sequenced; 331 SequencedTask sequenced;
315 sequenced.sequence_token_id = sequence_token.id_; 332 sequenced.sequence_token_id = sequence_token.id_;
316 sequenced.shutdown_behavior = shutdown_behavior; 333 sequenced.shutdown_behavior = shutdown_behavior;
317 sequenced.location = from_here; 334 sequenced.location = from_here;
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 named_sequence_tokens_.find(name); 491 named_sequence_tokens_.find(name);
475 if (found != named_sequence_tokens_.end()) 492 if (found != named_sequence_tokens_.end())
476 return found->second; // Got an existing one. 493 return found->second; // Got an existing one.
477 494
478 // Create a new one for this name. 495 // Create a new one for this name.
479 SequenceToken result = GetSequenceToken(); 496 SequenceToken result = GetSequenceToken();
480 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 497 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
481 return result.id_; 498 return result.id_;
482 } 499 }
483 500
501 scoped_refptr<SequencedTaskRunner>
502 SequencedWorkerPool::Inner::LockedGetSequencedTaskRunner(SequenceToken token) {
503 if (!sequenced_task_runner_) {
504 scoped_refptr<SequencedTaskRunner> ref = new SequencedTaskRunnerImpl(
505 scoped_refptr<SequencedWorkerPool>(worker_pool_), token);
506 sequenced_task_runner_ =
507 static_cast<SequencedTaskRunnerImpl*>(ref.get())->AsWeakPtr();
508 return ref;
509 }
510 return scoped_refptr<SequencedTaskRunner>(sequenced_task_runner_.get());
511 }
512
484 bool SequencedWorkerPool::Inner::GetWork( 513 bool SequencedWorkerPool::Inner::GetWork(
485 SequencedTask* task, 514 SequencedTask* task,
486 std::vector<Closure>* delete_these_outside_lock) { 515 std::vector<Closure>* delete_these_outside_lock) {
487 lock_.AssertAcquired(); 516 lock_.AssertAcquired();
488 517
489 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); 518 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
490 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 519 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
491 static_cast<int>(pending_task_count_)); 520 static_cast<int>(pending_task_count_));
492 521
493 // Find the next task with a sequence token that's not currently in use. 522 // Find the next task with a sequence token that's not currently in use.
(...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after
718 747
719 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 748 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
720 return inner_->GetSequenceToken(); 749 return inner_->GetSequenceToken();
721 } 750 }
722 751
723 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 752 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
724 const std::string& name) { 753 const std::string& name) {
725 return inner_->GetNamedSequenceToken(name); 754 return inner_->GetNamedSequenceToken(name);
726 } 755 }
727 756
757 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
758 SequenceToken token) {
759 return inner_->GetSequencedTaskRunner(token);
760 }
761
728 bool SequencedWorkerPool::PostWorkerTask( 762 bool SequencedWorkerPool::PostWorkerTask(
729 const tracked_objects::Location& from_here, 763 const tracked_objects::Location& from_here,
730 const Closure& task) { 764 const Closure& task) {
731 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 765 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
732 from_here, task); 766 from_here, task);
733 } 767 }
734 768
735 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 769 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
736 const tracked_objects::Location& from_here, 770 const tracked_objects::Location& from_here,
737 const Closure& task, 771 const Closure& task,
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
795 void SequencedWorkerPool::SignalHasWorkForTesting() { 829 void SequencedWorkerPool::SignalHasWorkForTesting() {
796 inner_->SignalHasWorkForTesting(); 830 inner_->SignalHasWorkForTesting();
797 } 831 }
798 832
799 void SequencedWorkerPool::Shutdown() { 833 void SequencedWorkerPool::Shutdown() {
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 834 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
801 inner_->Shutdown(); 835 inner_->Shutdown();
802 } 836 }
803 837
804 } // namespace base 838 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698