Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(257)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9347056: Fix up SequencedWorkerPool in preparation for making it a TaskRunner (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <deque> 7 #include "base/compiler_specific.h"
8 #include <set> 8 #include "base/logging.h"
9
10 #include "base/atomicops.h"
11 #include "base/bind.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/metrics/histogram.h" 9 #include "base/metrics/histogram.h"
14 #include "base/stringprintf.h" 10 #include "base/stringprintf.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/threading/simple_thread.h" 11 #include "base/threading/simple_thread.h"
17 #include "base/threading/thread.h" 12 #include "base/time.h"
18 13
19 namespace base { 14 namespace base {
20 15
21 namespace {
22
23 struct SequencedTask {
24 int sequence_token_id;
25 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
26 tracked_objects::Location location;
27 base::Closure task;
28 };
29
30 } // namespace
31
32 // Worker --------------------------------------------------------------------- 16 // Worker ---------------------------------------------------------------------
33 17
34 class SequencedWorkerPool::Worker : public base::SimpleThread { 18 class SequencedWorkerPool::Worker : public SimpleThread {
35 public: 19 public:
36 Worker(SequencedWorkerPool::Inner* inner, 20 // Hold a ref to |worker_pool|, since we want to keep it around even
21 // if it doesn't join our thread. Note that this (deliberately)
22 // leaks on shutdown.
23 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
37 int thread_number, 24 int thread_number,
38 const std::string& thread_name_prefix); 25 const std::string& thread_name_prefix);
39 ~Worker(); 26 virtual ~Worker();
40 27
41 // SimpleThread implementation. This actually runs the background thread. 28 // SimpleThread implementation. This actually runs the background thread.
42 virtual void Run(); 29 virtual void Run() OVERRIDE;
43 30
44 private: 31 private:
45 SequencedWorkerPool::Inner* inner_; 32 const scoped_refptr<SequencedWorkerPool> worker_pool_;
46 SequencedWorkerPool::WorkerShutdown current_shutdown_mode_;
47 33
48 DISALLOW_COPY_AND_ASSIGN(Worker); 34 DISALLOW_COPY_AND_ASSIGN(Worker);
49 }; 35 };
50 36
51 37 SequencedWorkerPool::Worker::Worker(
52 // Inner ---------------------------------------------------------------------- 38 const scoped_refptr<SequencedWorkerPool>& worker_pool,
53 39 int thread_number,
54 class SequencedWorkerPool::Inner 40 const std::string& prefix)
55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { 41 : SimpleThread(
56 public:
57 Inner(size_t max_threads, const std::string& thread_name_prefix);
58 virtual ~Inner();
59
60 SequenceToken GetSequenceToken();
61
62 SequenceToken GetNamedSequenceToken(const std::string& name);
63
64 // This function accepts a name and an ID. If the name is null, the
65 // token ID is used. This allows us to implement the optional name lookup
66 // from a single function without having to enter the lock a separate time.
67 bool PostTask(const std::string* optional_token_name,
68 int sequence_token_id,
69 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
70 const tracked_objects::Location& from_here,
71 const base::Closure& task);
72
73 void Flush();
74
75 void Shutdown();
76
77 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
78
79 // Runs the worker loop on the background thread.
80 void ThreadLoop(Worker* this_worker);
81
82 private:
83 // Called from within the lock, this converts the given token name into a
84 // token ID, creating a new one if necessary.
85 int LockedGetNamedTokenID(const std::string& name);
86
87 // The calling code should clear the given delete_these_oustide_lock
88 // vector the next time the lock is released. See the implementation for
89 // a more detailed description.
90 bool GetWork(SequencedTask* task,
91 std::vector<base::Closure>* delete_these_outside_lock);
92
93 // Peforms init and cleanup around running the given task. WillRun...
94 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
95 // The calling code should call FinishStartingAdditionalThread once the
96 // lock is released if the return values is nonzero.
97 int WillRunWorkerTask(const SequencedTask& task);
98 void DidRunWorkerTask(const SequencedTask& task);
99
100 // Returns true if there are no threads currently running the given
101 // sequence token.
102 bool IsSequenceTokenRunnable(int sequence_token_id) const;
103
104 // Checks if all threads are busy and the addition of one more could run an
105 // additional task waiting in the queue. This must be called from within
106 // the lock.
107 //
108 // If another thread is helpful, this will mark the thread as being in the
109 // process of starting and returns the index of the new thread which will be
110 // 0 or more. The caller should then call FinishStartingAdditionalThread to
111 // complete initialization once the lock is released.
112 //
113 // If another thread is not necessary, returne 0;
114 //
115 // See the implementedion for more.
116 int PrepareToStartAdditionalThreadIfHelpful();
117
118 // The second part of thread creation after
119 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
120 // generated. This actually creates the thread and should be called outside
121 // the lock to avoid blocking important work starting a thread in the lock.
122 void FinishStartingAdditionalThread(int thread_number);
123
124 // Checks whether there is work left that's blocking shutdown. Must be
125 // called inside the lock.
126 bool CanShutdown() const;
127
128 // The last sequence number used. Managed by GetSequenceToken, since this
129 // only does threadsafe increment operations, you do not need to hold the
130 // lock.
131 volatile base::subtle::Atomic32 last_sequence_number_;
132
133 // This lock protects |everything in this class|. Do not read or modify
134 // anything without holding this lock. Do not block while holding this
135 // lock.
136 base::Lock lock_;
137
138 // Condition variable used to wake up worker threads when a task is runnable.
139 base::ConditionVariable cond_var_;
140
141 // The maximum number of worker threads we'll create.
142 size_t max_threads_;
143
144 std::string thread_name_prefix_;
145
146 // Associates all known sequence token names with their IDs.
147 std::map<std::string, int> named_sequence_tokens_;
148
149 // Owning pointers to all threads we've created so far. Since we lazily
150 // create threads, this may be less than max_threads_ and will be initially
151 // empty.
152 std::vector<linked_ptr<Worker> > threads_;
153
154 // Set to true when we're in the process of creating another thread.
155 // See PrepareToStartAdditionalThreadIfHelpful for more.
156 bool thread_being_created_;
157
158 // Number of threads currently waiting for work.
159 size_t waiting_thread_count_;
160
161 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
162 // flag set.
163 size_t blocking_shutdown_thread_count_;
164
165 // In-order list of all pending tasks. These are tasks waiting for a thread
166 // to run on or that are blocked on a previous task in their sequence.
167 //
168 // We maintain the pending_task_count_ separately for metrics because
169 // list.size() can be linear time.
170 std::list<SequencedTask> pending_tasks_;
171 size_t pending_task_count_;
172
173 // Number of tasks in the pending_tasks_ list that are marked as blocking
174 // shutdown.
175 size_t blocking_shutdown_pending_task_count_;
176
177 // Lists all sequence tokens currently executing.
178 std::set<int> current_sequences_;
179
180 // Set when the app is terminating and no further tasks should be allowed,
181 // though we may still be running existing tasks.
182 bool terminating_;
183
184 // Set when Shutdown is called to do some assertions.
185 bool shutdown_called_;
186
187 SequencedWorkerPool::TestingObserver* testing_observer_;
188 };
189
190 SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner,
191 int thread_number,
192 const std::string& prefix)
193 : base::SimpleThread(
194 prefix + StringPrintf("Worker%d", thread_number).c_str()), 42 prefix + StringPrintf("Worker%d", thread_number).c_str()),
195 inner_(inner), 43 worker_pool_(worker_pool) {
196 current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) {
197 Start(); 44 Start();
198 } 45 }
199 46
200 SequencedWorkerPool::Worker::~Worker() { 47 SequencedWorkerPool::Worker::~Worker() {
201 } 48 }
202 49
203 void SequencedWorkerPool::Worker::Run() { 50 void SequencedWorkerPool::Worker::Run() {
204 // Just jump back to the Inner object to run the thread, since it has all the 51 // Just jump back to the Inner object to run the thread, since it has all the
205 // tracking information and queues. It might be more natural to implement 52 // tracking information and queues. It might be more natural to implement
206 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 53 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
207 // having these worker objects at all, but that method lacks the ability to 54 // having these worker objects at all, but that method lacks the ability to
208 // send thread-specific information easily to the thread loop. 55 // send thread-specific information easily to the thread loop.
209 inner_->ThreadLoop(this); 56 worker_pool_->ThreadLoop(this);
210 } 57 }
211 58
212 SequencedWorkerPool::Inner::Inner(size_t max_threads, 59 // SequencedWorkerPool --------------------------------------------------------
213 const std::string& thread_name_prefix) 60
61 SequencedWorkerPool::SequencedTask::SequencedTask()
62 : sequence_token_id(0),
63 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
64
65 SequencedWorkerPool::SequencedTask::~SequencedTask() {}
66
67 SequencedWorkerPool::SequencedWorkerPool(
68 size_t max_threads,
69 const std::string& thread_name_prefix)
214 : last_sequence_number_(0), 70 : last_sequence_number_(0),
215 lock_(), 71 lock_(),
216 cond_var_(&lock_), 72 cond_var_(&lock_),
217 max_threads_(max_threads), 73 max_threads_(max_threads),
218 thread_name_prefix_(thread_name_prefix), 74 thread_name_prefix_(thread_name_prefix),
219 thread_being_created_(false), 75 thread_being_created_(false),
220 waiting_thread_count_(0), 76 waiting_thread_count_(0),
221 blocking_shutdown_thread_count_(0), 77 blocking_shutdown_thread_count_(0),
222 pending_task_count_(0), 78 pending_task_count_(0),
223 blocking_shutdown_pending_task_count_(0), 79 blocking_shutdown_pending_task_count_(0),
224 terminating_(false),
225 shutdown_called_(false), 80 shutdown_called_(false),
226 testing_observer_(NULL) { 81 testing_observer_(NULL) {
227 } 82 }
228 83
229 SequencedWorkerPool::Inner::~Inner() { 84 SequencedWorkerPool::~SequencedWorkerPool() {
230 // You must call Shutdown() before destroying the pool. 85 // You must call Shutdown() before destroying the pool.
231 DCHECK(shutdown_called_); 86 DCHECK(shutdown_called_);
232 87
233 // Need to explicitly join with the threads before they're destroyed or else 88 // Need to explicitly join with the threads before they're destroyed or else
234 // they will be running when our object is half torn down. 89 // they will be running when our object is half torn down.
235 for (size_t i = 0; i < threads_.size(); i++) 90 for (size_t i = 0; i < threads_.size(); i++)
236 threads_[i]->Join(); 91 threads_[i]->Join();
237 threads_.clear(); 92 threads_.clear();
238 } 93 }
239 94
240 SequencedWorkerPool::SequenceToken 95 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
241 SequencedWorkerPool::Inner::GetSequenceToken() { 96 subtle::Atomic32 result =
242 base::subtle::Atomic32 result = 97 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
243 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
244 return SequenceToken(static_cast<int>(result)); 98 return SequenceToken(static_cast<int>(result));
245 } 99 }
246 100
247 SequencedWorkerPool::SequenceToken 101 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
248 SequencedWorkerPool::Inner::GetNamedSequenceToken(
249 const std::string& name) { 102 const std::string& name) {
250 base::AutoLock lock(lock_); 103 AutoLock lock(lock_);
251 return SequenceToken(LockedGetNamedTokenID(name)); 104 return SequenceToken(LockedGetNamedTokenID(name));
252 } 105 }
253 106
254 bool SequencedWorkerPool::Inner::PostTask( 107 bool SequencedWorkerPool::PostWorkerTask(
255 const std::string* optional_token_name,
256 int sequence_token_id,
257 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
258 const tracked_objects::Location& from_here, 108 const tracked_objects::Location& from_here,
259 const base::Closure& task) { 109 const Closure& task) {
260 SequencedTask sequenced; 110 return PostTaskHelper(NULL, SequenceToken(), BLOCK_SHUTDOWN,
261 sequenced.sequence_token_id = sequence_token_id; 111 from_here, task);
262 sequenced.shutdown_behavior = shutdown_behavior;
263 sequenced.location = from_here;
264 sequenced.task = task;
265
266 int create_thread_id = 0;
267 {
268 base::AutoLock lock(lock_);
269 if (terminating_)
270 return false;
271
272 // Now that we have the lock, apply the named token rules.
273 if (optional_token_name)
274 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
275
276 pending_tasks_.push_back(sequenced);
277 pending_task_count_++;
278 if (shutdown_behavior == BLOCK_SHUTDOWN)
279 blocking_shutdown_pending_task_count_++;
280
281 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
282 }
283
284 // Actually start the additional thread or signal an existing one now that
285 // we're outside the lock.
286 if (create_thread_id)
287 FinishStartingAdditionalThread(create_thread_id);
288 else
289 cond_var_.Signal();
290
291 return true;
292 } 112 }
293 113
294 void SequencedWorkerPool::Inner::Flush() { 114 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
115 const tracked_objects::Location& from_here,
116 const Closure& task,
117 WorkerShutdown shutdown_behavior) {
118 return PostTaskHelper(NULL, SequenceToken(), shutdown_behavior,
119 from_here, task);
120 }
121
122 bool SequencedWorkerPool::PostSequencedWorkerTask(
123 SequenceToken sequence_token,
124 const tracked_objects::Location& from_here,
125 const Closure& task) {
126 return PostTaskHelper(NULL, sequence_token, BLOCK_SHUTDOWN,
127 from_here, task);
128 }
129
130 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
131 const std::string& token_name,
132 const tracked_objects::Location& from_here,
133 const Closure& task) {
134 DCHECK(!token_name.empty());
135 return PostTaskHelper(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
136 from_here, task);
137 }
138
139 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
140 SequenceToken sequence_token,
141 const tracked_objects::Location& from_here,
142 const Closure& task,
143 WorkerShutdown shutdown_behavior) {
144 return PostTaskHelper(NULL, sequence_token, shutdown_behavior,
145 from_here, task);
146 }
147
148 void SequencedWorkerPool::FlushForTesting() {
295 { 149 {
296 base::AutoLock lock(lock_); 150 AutoLock lock(lock_);
297 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) 151 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
298 cond_var_.Wait(); 152 cond_var_.Wait();
299 } 153 }
300 cond_var_.Signal(); 154 cond_var_.Signal();
301 } 155 }
302 156
303 void SequencedWorkerPool::Inner::Shutdown() { 157 void SequencedWorkerPool::Shutdown() {
304 if (shutdown_called_)
305 return;
306 shutdown_called_ = true;
307
308 // Mark us as terminated and go through and drop all tasks that aren't 158 // Mark us as terminated and go through and drop all tasks that aren't
309 // required to run on shutdown. Since no new tasks will get posted once the 159 // required to run on shutdown. Since no new tasks will get posted once the
310 // terminated flag is set, this ensures that all remaining tasks are required 160 // terminated flag is set, this ensures that all remaining tasks are required
311 // for shutdown whenever the termianted_ flag is set. 161 // for shutdown whenever the termianted_ flag is set.
312 { 162 {
313 base::AutoLock lock(lock_); 163 AutoLock lock(lock_);
314 DCHECK(!terminating_); 164
315 terminating_ = true; 165 if (shutdown_called_)
166 return;
167 shutdown_called_ = true;
316 168
317 // Tickle the threads. This will wake up a waiting one so it will know that 169 // Tickle the threads. This will wake up a waiting one so it will know that
318 // it can exit, which in turn will wake up any other waiting ones. 170 // it can exit, which in turn will wake up any other waiting ones.
319 cond_var_.Signal(); 171 cond_var_.Signal();
320 172
321 // There are no pending or running tasks blocking shutdown, we're done. 173 // There are no pending or running tasks blocking shutdown, we're done.
322 if (CanShutdown()) 174 if (CanShutdown())
323 return; 175 return;
324 } 176 }
325 177
326 // If we get here, we know we're either waiting on a blocking task that's 178 // If we get here, we know we're either waiting on a blocking task that's
327 // currently running, waiting on a blocking task that hasn't been scheduled 179 // currently running, waiting on a blocking task that hasn't been scheduled
328 // yet, or both. Block on the "queue empty" event to know when all tasks are 180 // yet, or both. Block on the "queue empty" event to know when all tasks are
329 // complete. This must be done outside the lock. 181 // complete. This must be done outside the lock.
330 if (testing_observer_) 182 if (testing_observer_)
331 testing_observer_->WillWaitForShutdown(); 183 testing_observer_->WillWaitForShutdown();
332 184
333 base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now(); 185 TimeTicks shutdown_wait_begin = TimeTicks::Now();
334 186
335 // Wait for no more tasks. 187 // Wait for no more tasks.
336 { 188 {
337 base::AutoLock lock(lock_); 189 AutoLock lock(lock_);
338 while (!CanShutdown()) 190 while (!CanShutdown())
339 cond_var_.Wait(); 191 cond_var_.Wait();
340 } 192 }
341 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 193 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
342 base::TimeTicks::Now() - shutdown_wait_begin); 194 TimeTicks::Now() - shutdown_wait_begin);
343 } 195 }
344 196
345 void SequencedWorkerPool::Inner::SetTestingObserver( 197 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
346 SequencedWorkerPool::TestingObserver* observer) { 198 AutoLock lock(lock_);
347 base::AutoLock lock(lock_);
348 testing_observer_ = observer; 199 testing_observer_ = observer;
349 } 200 }
350 201
351 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 202 bool SequencedWorkerPool::PostTaskHelper(
203 const std::string* optional_token_name,
204 SequenceToken sequence_token,
205 WorkerShutdown shutdown_behavior,
206 const tracked_objects::Location& from_here,
207 const Closure& task) {
208 SequencedTask sequenced;
209 sequenced.sequence_token_id = sequence_token.id_;
210 sequenced.shutdown_behavior = shutdown_behavior;
211 sequenced.location = from_here;
212 sequenced.task = task;
213
214 int create_thread_id = 0;
352 { 215 {
353 base::AutoLock lock(lock_); 216 AutoLock lock(lock_);
217 if (shutdown_called_)
218 return false;
219
220 // Now that we have the lock, apply the named token rules.
221 if (optional_token_name)
222 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
223
224 pending_tasks_.push_back(sequenced);
225 pending_task_count_++;
226 if (shutdown_behavior == BLOCK_SHUTDOWN)
227 blocking_shutdown_pending_task_count_++;
228
229 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
230 }
231
232 // Actually start the additional thread or signal an existing one now that
233 // we're outside the lock.
234 if (create_thread_id)
235 FinishStartingAdditionalThread(create_thread_id);
236 else
237 cond_var_.Signal();
238
239 return true;
240 }
241
242 void SequencedWorkerPool::ThreadLoop(Worker* this_worker) {
243 {
244 AutoLock lock(lock_);
354 DCHECK(thread_being_created_); 245 DCHECK(thread_being_created_);
355 thread_being_created_ = false; 246 thread_being_created_ = false;
356 threads_.push_back(linked_ptr<Worker>(this_worker)); 247 threads_.push_back(linked_ptr<Worker>(this_worker));
357 248
358 while (true) { 249 while (true) {
359 // See GetWork for what delete_these_outside_lock is doing. 250 // See GetWork for what delete_these_outside_lock is doing.
360 SequencedTask task; 251 SequencedTask task;
361 std::vector<base::Closure> delete_these_outside_lock; 252 std::vector<Closure> delete_these_outside_lock;
362 if (GetWork(&task, &delete_these_outside_lock)) { 253 if (GetWork(&task, &delete_these_outside_lock)) {
363 int new_thread_id = WillRunWorkerTask(task); 254 int new_thread_id = WillRunWorkerTask(task);
364 { 255 {
365 base::AutoUnlock unlock(lock_); 256 AutoUnlock unlock(lock_);
366 cond_var_.Signal(); 257 cond_var_.Signal();
367 delete_these_outside_lock.clear(); 258 delete_these_outside_lock.clear();
368 259
369 // Complete thread creation outside the lock if necessary. 260 // Complete thread creation outside the lock if necessary.
370 if (new_thread_id) 261 if (new_thread_id)
371 FinishStartingAdditionalThread(new_thread_id); 262 FinishStartingAdditionalThread(new_thread_id);
372 263
373 task.task.Run(); 264 task.task.Run();
374 265
375 // Make sure our task is erased outside the lock for the same reason 266 // Make sure our task is erased outside the lock for the same reason
376 // we do this with delete_these_oustide_lock. 267 // we do this with delete_these_oustide_lock.
377 task.task = base::Closure(); 268 task.task = Closure();
378 } 269 }
379 DidRunWorkerTask(task); // Must be done inside the lock. 270 DidRunWorkerTask(task); // Must be done inside the lock.
380 } else { 271 } else {
381 // When we're terminating and there's no more work, we can shut down. 272 // When we're terminating and there's no more work, we can
382 // You can't get more tasks posted once terminating_ is set. There may 273 // shut down. You can't get more tasks posted once
383 // be some tasks stuck behind running ones with the same sequence 274 // shutdown_called_ is set. There may be some tasks stuck
384 // token, but additional threads won't help this case. 275 // behind running ones with the same sequence token, but
385 if (terminating_) 276 // additional threads won't help this case.
277 if (shutdown_called_)
386 break; 278 break;
387 waiting_thread_count_++; 279 waiting_thread_count_++;
388 cond_var_.Signal(); // For Flush() that may be waiting on the 280 cond_var_.Signal(); // For Flush() that may be waiting on the
389 // waiting thread count to go up. 281 // waiting thread count to go up.
390 cond_var_.Wait(); 282 cond_var_.Wait();
391 waiting_thread_count_--; 283 waiting_thread_count_--;
392 } 284 }
393 } 285 }
394 } 286 }
395 287
396 // We noticed we should exit. Wake up the next worker so it knows it should 288 // We noticed we should exit. Wake up the next worker so it knows it should
397 // exit as well (because the Shutdown() code only signals once). 289 // exit as well (because the Shutdown() code only signals once).
398 cond_var_.Signal(); 290 cond_var_.Signal();
399 } 291 }
400 292
401 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 293 int SequencedWorkerPool::LockedGetNamedTokenID(
402 const std::string& name) { 294 const std::string& name) {
403 lock_.AssertAcquired(); 295 lock_.AssertAcquired();
404 DCHECK(!name.empty()); 296 DCHECK(!name.empty());
405 297
406 std::map<std::string, int>::const_iterator found = 298 std::map<std::string, int>::const_iterator found =
407 named_sequence_tokens_.find(name); 299 named_sequence_tokens_.find(name);
408 if (found != named_sequence_tokens_.end()) 300 if (found != named_sequence_tokens_.end())
409 return found->second; // Got an existing one. 301 return found->second; // Got an existing one.
410 302
411 // Create a new one for this name. 303 // Create a new one for this name.
412 SequenceToken result = GetSequenceToken(); 304 SequenceToken result = GetSequenceToken();
413 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 305 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
414 return result.id_; 306 return result.id_;
415 } 307 }
416 308
417 bool SequencedWorkerPool::Inner::GetWork( 309 bool SequencedWorkerPool::GetWork(
418 SequencedTask* task, 310 SequencedTask* task,
419 std::vector<base::Closure>* delete_these_outside_lock) { 311 std::vector<Closure>* delete_these_outside_lock) {
420 lock_.AssertAcquired(); 312 lock_.AssertAcquired();
421 313
422 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); 314 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
423 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 315 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
424 static_cast<int>(pending_task_count_)); 316 static_cast<int>(pending_task_count_));
425 317
426 // Find the next task with a sequence token that's not currently in use. 318 // Find the next task with a sequence token that's not currently in use.
427 // If the token is in use, that means another thread is running something 319 // If the token is in use, that means another thread is running something
428 // in that sequence, and we can't run it without going out-of-order. 320 // in that sequence, and we can't run it without going out-of-order.
429 // 321 //
(...skipping 18 matching lines...) Expand all
448 bool found_task = false; 340 bool found_task = false;
449 int unrunnable_tasks = 0; 341 int unrunnable_tasks = 0;
450 std::list<SequencedTask>::iterator i = pending_tasks_.begin(); 342 std::list<SequencedTask>::iterator i = pending_tasks_.begin();
451 while (i != pending_tasks_.end()) { 343 while (i != pending_tasks_.end()) {
452 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 344 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
453 unrunnable_tasks++; 345 unrunnable_tasks++;
454 ++i; 346 ++i;
455 continue; 347 continue;
456 } 348 }
457 349
458 if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 350 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
459 // We're shutting down and the task we just found isn't blocking 351 // We're shutting down and the task we just found isn't blocking
460 // shutdown. Delete it and get more work. 352 // shutdown. Delete it and get more work.
461 // 353 //
462 // Note that we do not want to delete unrunnable tasks. Deleting a task 354 // Note that we do not want to delete unrunnable tasks. Deleting a task
463 // can have side effects (like freeing some objects) and deleting a 355 // can have side effects (like freeing some objects) and deleting a
464 // task that's supposed to run after one that's currently running could 356 // task that's supposed to run after one that's currently running could
465 // cause an obscure crash. 357 // cause an obscure crash.
466 // 358 //
467 // We really want to delete these tasks outside the lock in case the 359 // We really want to delete these tasks outside the lock in case the
468 // closures are holding refs to objects that want to post work from 360 // closures are holding refs to objects that want to post work from
(...skipping 19 matching lines...) Expand all
488 } 380 }
489 381
490 // Track the number of tasks we had to skip over to see if we should be 382 // Track the number of tasks we had to skip over to see if we should be
491 // making this more efficient. If this number ever becomes large or is 383 // making this more efficient. If this number ever becomes large or is
492 // frequently "some", we should consider the optimization above. 384 // frequently "some", we should consider the optimization above.
493 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 385 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
494 unrunnable_tasks); 386 unrunnable_tasks);
495 return found_task; 387 return found_task;
496 } 388 }
497 389
498 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 390 int SequencedWorkerPool::WillRunWorkerTask(const SequencedTask& task) {
499 lock_.AssertAcquired(); 391 lock_.AssertAcquired();
500 392
501 // Mark the task's sequence number as in use. 393 // Mark the task's sequence number as in use.
502 if (task.sequence_token_id) 394 if (task.sequence_token_id)
503 current_sequences_.insert(task.sequence_token_id); 395 current_sequences_.insert(task.sequence_token_id);
504 396
505 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) 397 if (task.shutdown_behavior == BLOCK_SHUTDOWN)
506 blocking_shutdown_thread_count_++; 398 blocking_shutdown_thread_count_++;
507 399
508 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 400 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
509 // creates a new thread if there is no free one, there is a race when posting 401 // creates a new thread if there is no free one, there is a race when posting
510 // tasks that many tasks could have been posted before a thread started 402 // tasks that many tasks could have been posted before a thread started
511 // running them, so only one thread would have been created. So we also check 403 // running them, so only one thread would have been created. So we also check
512 // whether we should create more threads after removing our task from the 404 // whether we should create more threads after removing our task from the
513 // queue, which also has the nice side effect of creating the workers from 405 // queue, which also has the nice side effect of creating the workers from
514 // background threads rather than the main thread of the app. 406 // background threads rather than the main thread of the app.
515 // 407 //
516 // If another thread wasn't created, we want to wake up an existing thread 408 // If another thread wasn't created, we want to wake up an existing thread
517 // if there is one waiting to pick up the next task. 409 // if there is one waiting to pick up the next task.
518 // 410 //
519 // Note that we really need to do this *before* running the task, not 411 // Note that we really need to do this *before* running the task, not
520 // after. Otherwise, if more than one task is posted, the creation of the 412 // after. Otherwise, if more than one task is posted, the creation of the
521 // second thread (since we only create one at a time) will be blocked by 413 // second thread (since we only create one at a time) will be blocked by
522 // the execution of the first task, which could be arbitrarily long. 414 // the execution of the first task, which could be arbitrarily long.
523 return PrepareToStartAdditionalThreadIfHelpful(); 415 return PrepareToStartAdditionalThreadIfHelpful();
524 } 416 }
525 417
526 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 418 void SequencedWorkerPool::DidRunWorkerTask(const SequencedTask& task) {
527 lock_.AssertAcquired(); 419 lock_.AssertAcquired();
528 420
529 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) { 421 if (task.shutdown_behavior == BLOCK_SHUTDOWN) {
530 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 422 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
531 blocking_shutdown_thread_count_--; 423 blocking_shutdown_thread_count_--;
532 } 424 }
533 425
534 if (task.sequence_token_id) 426 if (task.sequence_token_id)
535 current_sequences_.erase(task.sequence_token_id); 427 current_sequences_.erase(task.sequence_token_id);
536 } 428 }
537 429
538 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 430 bool SequencedWorkerPool::IsSequenceTokenRunnable(
539 int sequence_token_id) const { 431 int sequence_token_id) const {
540 lock_.AssertAcquired(); 432 lock_.AssertAcquired();
541 return !sequence_token_id || 433 return !sequence_token_id ||
542 current_sequences_.find(sequence_token_id) == 434 current_sequences_.find(sequence_token_id) ==
543 current_sequences_.end(); 435 current_sequences_.end();
544 } 436 }
545 437
546 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 438 int SequencedWorkerPool::PrepareToStartAdditionalThreadIfHelpful() {
439 lock_.AssertAcquired();
547 // How thread creation works: 440 // How thread creation works:
548 // 441 //
549 // We'de like to avoid creating threads with the lock held. However, we 442 // We'de like to avoid creating threads with the lock held. However, we
550 // need to be sure that we have an accurate accounting of the threads for 443 // need to be sure that we have an accurate accounting of the threads for
551 // proper Joining and deltion on shutdown. 444 // proper Joining and deltion on shutdown.
552 // 445 //
553 // We need to figure out if we need another thread with the lock held, which 446 // We need to figure out if we need another thread with the lock held, which
554 // is what this function does. It then marks us as in the process of creating 447 // is what this function does. It then marks us as in the process of creating
555 // a thread. When we do shutdown, we wait until the thread_being_created_ 448 // a thread. When we do shutdown, we wait until the thread_being_created_
556 // flag is cleared, which ensures that the new thread is properly added to 449 // flag is cleared, which ensures that the new thread is properly added to
557 // all the data structures and we can't leak it. Once shutdown starts, we'll 450 // all the data structures and we can't leak it. Once shutdown starts, we'll
558 // refuse to create more threads or they would be leaked. 451 // refuse to create more threads or they would be leaked.
559 // 452 //
560 // Note that this creates a mostly benign race condition on shutdown that 453 // Note that this creates a mostly benign race condition on shutdown that
561 // will cause fewer workers to be created than one would expect. It isn't 454 // will cause fewer workers to be created than one would expect. It isn't
562 // much of an issue in real life, but affects some tests. Since we only spawn 455 // much of an issue in real life, but affects some tests. Since we only spawn
563 // one worker at a time, the following sequence of events can happen: 456 // one worker at a time, the following sequence of events can happen:
564 // 457 //
565 // 1. Main thread posts a bunch of unrelated tasks that would normally be 458 // 1. Main thread posts a bunch of unrelated tasks that would normally be
566 // run on separate threads. 459 // run on separate threads.
567 // 2. The first task post causes us to start a worker. Other tasks do not 460 // 2. The first task post causes us to start a worker. Other tasks do not
568 // cause a worker to start since one is pending. 461 // cause a worker to start since one is pending.
569 // 3. Main thread initiates shutdown. 462 // 3. Main thread initiates shutdown.
570 // 4. No more threads are created since the terminating_ flag is set. 463 // 4. No more threads are created since the shutdown_called_ flag is set.
571 // 464 //
572 // The result is that one may expect that max_threads_ workers to be created 465 // The result is that one may expect that max_threads_ workers to be created
573 // given the workload, but in reality fewer may be created because the 466 // given the workload, but in reality fewer may be created because the
574 // sequence of thread creation on the background threads is racing with the 467 // sequence of thread creation on the background threads is racing with the
575 // shutdown call. 468 // shutdown call.
576 if (!terminating_ && 469 if (!shutdown_called_ &&
577 !thread_being_created_ && 470 !thread_being_created_ &&
578 threads_.size() < max_threads_ && 471 threads_.size() < max_threads_ &&
579 waiting_thread_count_ == 0) { 472 waiting_thread_count_ == 0) {
580 // We could use an additional thread if there's work to be done. 473 // We could use an additional thread if there's work to be done.
581 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin(); 474 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin();
582 i != pending_tasks_.end(); ++i) { 475 i != pending_tasks_.end(); ++i) {
583 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 476 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
584 // Found a runnable task, mark the thread as being started. 477 // Found a runnable task, mark the thread as being started.
585 thread_being_created_ = true; 478 thread_being_created_ = true;
586 return static_cast<int>(threads_.size() + 1); 479 return static_cast<int>(threads_.size() + 1);
587 } 480 }
588 } 481 }
589 } 482 }
590 return 0; 483 return 0;
591 } 484 }
592 485
593 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 486 void SequencedWorkerPool::FinishStartingAdditionalThread(
594 int thread_number) { 487 int thread_number) {
595 // Called outside of the lock. 488 // Called outside of the lock.
596 DCHECK(thread_number > 0); 489 DCHECK(thread_number > 0);
597 490
598 // The worker is assigned to the list when the thread actually starts, which 491 // The worker is assigned to the list when the thread actually starts, which
599 // will manage the memory of the pointer. 492 // will manage the memory of the pointer.
600 new Worker(this, thread_number, thread_name_prefix_); 493 new Worker(this, thread_number, thread_name_prefix_);
601 } 494 }
602 495
603 bool SequencedWorkerPool::Inner::CanShutdown() const { 496 bool SequencedWorkerPool::CanShutdown() const {
604 lock_.AssertAcquired(); 497 lock_.AssertAcquired();
605 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 498 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
606 return !thread_being_created_ && 499 return !thread_being_created_ &&
607 blocking_shutdown_thread_count_ == 0 && 500 blocking_shutdown_thread_count_ == 0 &&
608 blocking_shutdown_pending_task_count_ == 0; 501 blocking_shutdown_pending_task_count_ == 0;
609 } 502 }
610 503
611 // SequencedWorkerPool --------------------------------------------------------
612
613 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
614 const std::string& thread_name_prefix)
615 : inner_(new Inner(max_threads, thread_name_prefix)) {
616 }
617
618 SequencedWorkerPool::~SequencedWorkerPool() {
619 }
620
621 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
622 return inner_->GetSequenceToken();
623 }
624
625 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
626 const std::string& name) {
627 return inner_->GetNamedSequenceToken(name);
628 }
629
630 bool SequencedWorkerPool::PostWorkerTask(
631 const tracked_objects::Location& from_here,
632 const base::Closure& task) {
633 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task);
634 }
635
636 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
637 const tracked_objects::Location& from_here,
638 const base::Closure& task,
639 WorkerShutdown shutdown_behavior) {
640 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task);
641 }
642
643 bool SequencedWorkerPool::PostSequencedWorkerTask(
644 SequenceToken sequence_token,
645 const tracked_objects::Location& from_here,
646 const base::Closure& task) {
647 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN,
648 from_here, task);
649 }
650
651 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
652 const std::string& token_name,
653 const tracked_objects::Location& from_here,
654 const base::Closure& task) {
655 DCHECK(!token_name.empty());
656 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task);
657 }
658
659 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
660 SequenceToken sequence_token,
661 const tracked_objects::Location& from_here,
662 const base::Closure& task,
663 WorkerShutdown shutdown_behavior) {
664 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior,
665 from_here, task);
666 }
667
668 void SequencedWorkerPool::FlushForTesting() {
669 inner_->Flush();
670 }
671
672 void SequencedWorkerPool::Shutdown() {
673 inner_->Shutdown();
674 }
675
676 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
677 inner_->SetTestingObserver(observer);
678 }
679
680 } // namespace base 504 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698