Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9663075: Implementation of SequencedTaskRunner based on SequencedWorkerPool. (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h" 13 #include "base/atomicops.h"
14 #include "base/callback.h" 14 #include "base/callback.h"
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/logging.h" 16 #include "base/logging.h"
17 #include "base/memory/linked_ptr.h" 17 #include "base/memory/linked_ptr.h"
18 #include "base/message_loop_proxy.h" 18 #include "base/message_loop_proxy.h"
19 #include "base/metrics/histogram.h" 19 #include "base/metrics/histogram.h"
20 #include "base/stl_util.h" 20 #include "base/stl_util.h"
21 #include "base/stringprintf.h" 21 #include "base/stringprintf.h"
22 #include "base/synchronization/condition_variable.h" 22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h" 23 #include "base/synchronization/lock.h"
24 #include "base/threading/platform_thread.h" 24 #include "base/threading/platform_thread.h"
25 #include "base/threading/sequenced_task_runner_impl.h"
25 #include "base/threading/simple_thread.h" 26 #include "base/threading/simple_thread.h"
26 #include "base/time.h" 27 #include "base/time.h"
27 #include "base/tracked_objects.h" 28 #include "base/tracked_objects.h"
28 29
29 namespace base { 30 namespace base {
30 31
31 namespace { 32 namespace {
32 33
33 struct SequencedTask { 34 struct SequencedTask {
34 SequencedTask() 35 SequencedTask()
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 // by it). 74 // by it).
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 75 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
75 const std::string& thread_name_prefix); 76 const std::string& thread_name_prefix);
76 77
77 ~Inner(); 78 ~Inner();
78 79
79 SequenceToken GetSequenceToken(); 80 SequenceToken GetSequenceToken();
80 81
81 SequenceToken GetNamedSequenceToken(const std::string& name); 82 SequenceToken GetNamedSequenceToken(const std::string& name);
82 83
84 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner();
85
86 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
akalin 2012/03/15 19:17:35 Overloads are against style guide -- I think there
Francois 2012/03/18 16:28:29 Done.
87 SequenceToken sequence_token);
88
89 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
90 const std::string& token_name);
91
83 // This function accepts a name and an ID. If the name is null, the 92 // This function accepts a name and an ID. If the name is null, the
84 // token ID is used. This allows us to implement the optional name lookup 93 // token ID is used. This allows us to implement the optional name lookup
85 // from a single function without having to enter the lock a separate time. 94 // from a single function without having to enter the lock a separate time.
86 bool PostTask(const std::string* optional_token_name, 95 bool PostTask(const std::string* optional_token_name,
87 SequenceToken sequence_token, 96 SequenceToken sequence_token,
88 WorkerShutdown shutdown_behavior, 97 WorkerShutdown shutdown_behavior,
89 const tracked_objects::Location& from_here, 98 const tracked_objects::Location& from_here,
90 const Closure& task); 99 const Closure& task);
91 100
92 bool RunsTasksOnCurrentThread() const; 101 bool RunsTasksOnCurrentThread() const;
(...skipping 13 matching lines...) Expand all
106 115
107 private: 116 private:
108 // Returns whether there are no more pending tasks and all threads 117 // Returns whether there are no more pending tasks and all threads
109 // are idle. Must be called under lock. 118 // are idle. Must be called under lock.
110 bool IsIdle() const; 119 bool IsIdle() const;
111 120
112 // Called from within the lock, this converts the given token name into a 121 // Called from within the lock, this converts the given token name into a
113 // token ID, creating a new one if necessary. 122 // token ID, creating a new one if necessary.
114 int LockedGetNamedTokenID(const std::string& name); 123 int LockedGetNamedTokenID(const std::string& name);
115 124
125 // Called from within the lock, this returns a reference to a
126 // SequencedTaskRunner which wraps the SequencedWorkerPool object.
127 scoped_refptr<SequencedTaskRunner> LockedGetSequencedTaskRunner(
128 SequenceToken token);
129
116 // The calling code should clear the given delete_these_oustide_lock 130 // The calling code should clear the given delete_these_oustide_lock
117 // vector the next time the lock is released. See the implementation for 131 // vector the next time the lock is released. See the implementation for
118 // a more detailed description. 132 // a more detailed description.
119 bool GetWork(SequencedTask* task, 133 bool GetWork(SequencedTask* task,
120 std::vector<Closure>* delete_these_outside_lock); 134 std::vector<Closure>* delete_these_outside_lock);
121 135
122 // Peforms init and cleanup around running the given task. WillRun... 136 // Peforms init and cleanup around running the given task. WillRun...
123 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 137 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
124 // The calling code should call FinishStartingAdditionalThread once the 138 // The calling code should call FinishStartingAdditionalThread once the
125 // lock is released if the return values is nonzero. 139 // lock is released if the return values is nonzero.
(...skipping 26 matching lines...) Expand all
152 166
153 // Signal |has_work_| and increment |has_work_signal_count_|. 167 // Signal |has_work_| and increment |has_work_signal_count_|.
154 void SignalHasWork(); 168 void SignalHasWork();
155 169
156 // Checks whether there is work left that's blocking shutdown. Must be 170 // Checks whether there is work left that's blocking shutdown. Must be
157 // called inside the lock. 171 // called inside the lock.
158 bool CanShutdown() const; 172 bool CanShutdown() const;
159 173
160 SequencedWorkerPool* const worker_pool_; 174 SequencedWorkerPool* const worker_pool_;
161 175
176 WeakPtr<SequencedTaskRunner> sequenced_task_runner_;
akalin 2012/03/15 19:17:35 no need for this member variable. Creating a new
Francois 2012/03/18 16:28:29 Do you mean that it should do "return scoped_refpt
177
162 // The last sequence number used. Managed by GetSequenceToken, since this 178 // The last sequence number used. Managed by GetSequenceToken, since this
163 // only does threadsafe increment operations, you do not need to hold the 179 // only does threadsafe increment operations, you do not need to hold the
164 // lock. 180 // lock.
165 volatile subtle::Atomic32 last_sequence_number_; 181 volatile subtle::Atomic32 last_sequence_number_;
166 182
167 // This lock protects |everything in this class|. Do not read or modify 183 // This lock protects |everything in this class|. Do not read or modify
168 // anything without holding this lock. Do not block while holding this 184 // anything without holding this lock. Do not block while holding this
169 // lock. 185 // lock.
170 mutable Lock lock_; 186 mutable Lock lock_;
171 187
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
302 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 318 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
303 return SequenceToken(static_cast<int>(result)); 319 return SequenceToken(static_cast<int>(result));
304 } 320 }
305 321
306 SequencedWorkerPool::SequenceToken 322 SequencedWorkerPool::SequenceToken
307 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 323 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
308 AutoLock lock(lock_); 324 AutoLock lock(lock_);
309 return SequenceToken(LockedGetNamedTokenID(name)); 325 return SequenceToken(LockedGetNamedTokenID(name));
310 } 326 }
311 327
328 scoped_refptr<SequencedTaskRunner>
329 SequencedWorkerPool::Inner::GetSequencedTaskRunner() {
330 AutoLock lock(lock_);
331 return LockedGetSequencedTaskRunner(GetSequenceToken());
332 }
333
334 scoped_refptr<SequencedTaskRunner>
335 SequencedWorkerPool::Inner::GetSequencedTaskRunner(SequenceToken token) {
336 AutoLock lock(lock_);
akalin 2012/03/15 19:17:35 per the comments above, shouldn't need a lock for
337 return LockedGetSequencedTaskRunner(token);
338 }
339
340 scoped_refptr<SequencedTaskRunner>
341 SequencedWorkerPool::Inner::GetSequencedTaskRunner(
342 const std::string& token_name) {
343 AutoLock lock(lock_);
344 return LockedGetSequencedTaskRunner(
345 SequenceToken(LockedGetNamedTokenID(token_name)));
346 }
347
312 bool SequencedWorkerPool::Inner::PostTask( 348 bool SequencedWorkerPool::Inner::PostTask(
313 const std::string* optional_token_name, 349 const std::string* optional_token_name,
314 SequenceToken sequence_token, 350 SequenceToken sequence_token,
315 WorkerShutdown shutdown_behavior, 351 WorkerShutdown shutdown_behavior,
316 const tracked_objects::Location& from_here, 352 const tracked_objects::Location& from_here,
317 const Closure& task) { 353 const Closure& task) {
318 SequencedTask sequenced; 354 SequencedTask sequenced;
319 sequenced.sequence_token_id = sequence_token.id_; 355 sequenced.sequence_token_id = sequence_token.id_;
320 sequenced.shutdown_behavior = shutdown_behavior; 356 sequenced.shutdown_behavior = shutdown_behavior;
321 sequenced.location = from_here; 357 sequenced.location = from_here;
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after
489 named_sequence_tokens_.find(name); 525 named_sequence_tokens_.find(name);
490 if (found != named_sequence_tokens_.end()) 526 if (found != named_sequence_tokens_.end())
491 return found->second; // Got an existing one. 527 return found->second; // Got an existing one.
492 528
493 // Create a new one for this name. 529 // Create a new one for this name.
494 SequenceToken result = GetSequenceToken(); 530 SequenceToken result = GetSequenceToken();
495 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 531 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
496 return result.id_; 532 return result.id_;
497 } 533 }
498 534
535 scoped_refptr<SequencedTaskRunner>
536 SequencedWorkerPool::Inner::LockedGetSequencedTaskRunner(SequenceToken token) {
537 if (!sequenced_task_runner_) {
538 scoped_refptr<SequencedTaskRunner> ref = new SequencedTaskRunnerImpl(
539 scoped_refptr<SequencedWorkerPool>(worker_pool_), token);
540 sequenced_task_runner_ =
541 static_cast<SequencedTaskRunnerImpl*>(ref.get())->AsWeakPtr();
542 return ref;
543 }
544 return scoped_refptr<SequencedTaskRunner>(sequenced_task_runner_.get());
545 }
546
499 bool SequencedWorkerPool::Inner::GetWork( 547 bool SequencedWorkerPool::Inner::GetWork(
500 SequencedTask* task, 548 SequencedTask* task,
501 std::vector<Closure>* delete_these_outside_lock) { 549 std::vector<Closure>* delete_these_outside_lock) {
502 lock_.AssertAcquired(); 550 lock_.AssertAcquired();
503 551
504 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); 552 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
505 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 553 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
506 static_cast<int>(pending_task_count_)); 554 static_cast<int>(pending_task_count_));
507 555
508 // Find the next task with a sequence token that's not currently in use. 556 // Find the next task with a sequence token that's not currently in use.
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after
725 773
726 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 774 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
727 return inner_->GetSequenceToken(); 775 return inner_->GetSequenceToken();
728 } 776 }
729 777
730 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 778 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
731 const std::string& name) { 779 const std::string& name) {
732 return inner_->GetNamedSequenceToken(name); 780 return inner_->GetNamedSequenceToken(name);
733 } 781 }
734 782
783 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
784 ) {
785 return inner_->GetSequencedTaskRunner();
786 }
787
788 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
789 SequenceToken token) {
790 return inner_->GetSequencedTaskRunner(token);
791 }
792
793 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
794 const std::string& token_name) {
795 return inner_->GetSequencedTaskRunner(token_name);
796 }
797
735 bool SequencedWorkerPool::PostWorkerTask( 798 bool SequencedWorkerPool::PostWorkerTask(
736 const tracked_objects::Location& from_here, 799 const tracked_objects::Location& from_here,
737 const Closure& task) { 800 const Closure& task) {
738 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 801 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
739 from_here, task); 802 from_here, task);
740 } 803 }
741 804
742 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 805 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
743 const tracked_objects::Location& from_here, 806 const tracked_objects::Location& from_here,
744 const Closure& task, 807 const Closure& task,
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
810 void SequencedWorkerPool::Shutdown() { 873 void SequencedWorkerPool::Shutdown() {
811 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 874 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
812 inner_->Shutdown(); 875 inner_->Shutdown();
813 } 876 }
814 877
815 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { 878 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
816 inner_->SetTestingObserver(observer); 879 inner_->SetTestingObserver(observer);
817 } 880 }
818 881
819 } // namespace base 882 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698