OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <deque> | 7 #include "base/compiler_specific.h" |
8 #include <set> | 8 #include "base/logging.h" |
9 | |
10 #include "base/atomicops.h" | |
11 #include "base/bind.h" | |
12 #include "base/memory/scoped_ptr.h" | |
13 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
14 #include "base/stringprintf.h" | 10 #include "base/stringprintf.h" |
15 #include "base/synchronization/condition_variable.h" | |
16 #include "base/threading/simple_thread.h" | 11 #include "base/threading/simple_thread.h" |
17 #include "base/threading/thread.h" | 12 #include "base/time.h" |
18 | 13 |
19 namespace base { | 14 namespace base { |
20 | 15 |
21 namespace { | |
22 | |
23 struct SequencedTask { | |
24 int sequence_token_id; | |
25 SequencedWorkerPool::WorkerShutdown shutdown_behavior; | |
26 tracked_objects::Location location; | |
27 base::Closure task; | |
28 }; | |
29 | |
30 } // namespace | |
31 | |
32 // Worker --------------------------------------------------------------------- | 16 // Worker --------------------------------------------------------------------- |
33 | 17 |
34 class SequencedWorkerPool::Worker : public base::SimpleThread { | 18 class SequencedWorkerPool::Worker : public SimpleThread { |
35 public: | 19 public: |
36 Worker(SequencedWorkerPool::Inner* inner, | 20 // Hold a ref to |worker_pool|, since we want to keep it around even |
| 21 // if it doesn't join our thread. Note that this (deliberately) |
| 22 // leaks on shutdown. |
| 23 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
37 int thread_number, | 24 int thread_number, |
38 const std::string& thread_name_prefix); | 25 const std::string& thread_name_prefix); |
39 ~Worker(); | 26 virtual ~Worker(); |
40 | 27 |
41 // SimpleThread implementation. This actually runs the background thread. | 28 // SimpleThread implementation. This actually runs the background thread. |
42 virtual void Run(); | 29 virtual void Run() OVERRIDE; |
43 | 30 |
44 private: | 31 private: |
45 SequencedWorkerPool::Inner* inner_; | 32 const scoped_refptr<SequencedWorkerPool> worker_pool_; |
46 SequencedWorkerPool::WorkerShutdown current_shutdown_mode_; | |
47 | 33 |
48 DISALLOW_COPY_AND_ASSIGN(Worker); | 34 DISALLOW_COPY_AND_ASSIGN(Worker); |
49 }; | 35 }; |
50 | 36 |
51 | 37 SequencedWorkerPool::Worker::Worker( |
52 // Inner ---------------------------------------------------------------------- | 38 const scoped_refptr<SequencedWorkerPool>& worker_pool, |
53 | 39 int thread_number, |
54 class SequencedWorkerPool::Inner | 40 const std::string& prefix) |
55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { | 41 : SimpleThread( |
56 public: | |
57 Inner(size_t max_threads, const std::string& thread_name_prefix); | |
58 virtual ~Inner(); | |
59 | |
60 SequenceToken GetSequenceToken(); | |
61 | |
62 SequenceToken GetNamedSequenceToken(const std::string& name); | |
63 | |
64 // This function accepts a name and an ID. If the name is null, the | |
65 // token ID is used. This allows us to implement the optional name lookup | |
66 // from a single function without having to enter the lock a separate time. | |
67 bool PostTask(const std::string* optional_token_name, | |
68 int sequence_token_id, | |
69 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | |
70 const tracked_objects::Location& from_here, | |
71 const base::Closure& task); | |
72 | |
73 void Flush(); | |
74 | |
75 void Shutdown(); | |
76 | |
77 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); | |
78 | |
79 // Runs the worker loop on the background thread. | |
80 void ThreadLoop(Worker* this_worker); | |
81 | |
82 private: | |
83 // Called from within the lock, this converts the given token name into a | |
84 // token ID, creating a new one if necessary. | |
85 int LockedGetNamedTokenID(const std::string& name); | |
86 | |
87 // The calling code should clear the given delete_these_oustide_lock | |
88 // vector the next time the lock is released. See the implementation for | |
89 // a more detailed description. | |
90 bool GetWork(SequencedTask* task, | |
91 std::vector<base::Closure>* delete_these_outside_lock); | |
92 | |
93 // Peforms init and cleanup around running the given task. WillRun... | |
94 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | |
95 // The calling code should call FinishStartingAdditionalThread once the | |
96 // lock is released if the return values is nonzero. | |
97 int WillRunWorkerTask(const SequencedTask& task); | |
98 void DidRunWorkerTask(const SequencedTask& task); | |
99 | |
100 // Returns true if there are no threads currently running the given | |
101 // sequence token. | |
102 bool IsSequenceTokenRunnable(int sequence_token_id) const; | |
103 | |
104 // Checks if all threads are busy and the addition of one more could run an | |
105 // additional task waiting in the queue. This must be called from within | |
106 // the lock. | |
107 // | |
108 // If another thread is helpful, this will mark the thread as being in the | |
109 // process of starting and returns the index of the new thread which will be | |
110 // 0 or more. The caller should then call FinishStartingAdditionalThread to | |
111 // complete initialization once the lock is released. | |
112 // | |
113 // If another thread is not necessary, returne 0; | |
114 // | |
115 // See the implementedion for more. | |
116 int PrepareToStartAdditionalThreadIfHelpful(); | |
117 | |
118 // The second part of thread creation after | |
119 // PrepareToStartAdditionalThreadIfHelpful with the thread number it | |
120 // generated. This actually creates the thread and should be called outside | |
121 // the lock to avoid blocking important work starting a thread in the lock. | |
122 void FinishStartingAdditionalThread(int thread_number); | |
123 | |
124 // Checks whether there is work left that's blocking shutdown. Must be | |
125 // called inside the lock. | |
126 bool CanShutdown() const; | |
127 | |
128 // The last sequence number used. Managed by GetSequenceToken, since this | |
129 // only does threadsafe increment operations, you do not need to hold the | |
130 // lock. | |
131 volatile base::subtle::Atomic32 last_sequence_number_; | |
132 | |
133 // This lock protects |everything in this class|. Do not read or modify | |
134 // anything without holding this lock. Do not block while holding this | |
135 // lock. | |
136 base::Lock lock_; | |
137 | |
138 // Condition variable used to wake up worker threads when a task is runnable. | |
139 base::ConditionVariable cond_var_; | |
140 | |
141 // The maximum number of worker threads we'll create. | |
142 size_t max_threads_; | |
143 | |
144 std::string thread_name_prefix_; | |
145 | |
146 // Associates all known sequence token names with their IDs. | |
147 std::map<std::string, int> named_sequence_tokens_; | |
148 | |
149 // Owning pointers to all threads we've created so far. Since we lazily | |
150 // create threads, this may be less than max_threads_ and will be initially | |
151 // empty. | |
152 std::vector<linked_ptr<Worker> > threads_; | |
153 | |
154 // Set to true when we're in the process of creating another thread. | |
155 // See PrepareToStartAdditionalThreadIfHelpful for more. | |
156 bool thread_being_created_; | |
157 | |
158 // Number of threads currently waiting for work. | |
159 size_t waiting_thread_count_; | |
160 | |
161 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN | |
162 // flag set. | |
163 size_t blocking_shutdown_thread_count_; | |
164 | |
165 // In-order list of all pending tasks. These are tasks waiting for a thread | |
166 // to run on or that are blocked on a previous task in their sequence. | |
167 // | |
168 // We maintain the pending_task_count_ separately for metrics because | |
169 // list.size() can be linear time. | |
170 std::list<SequencedTask> pending_tasks_; | |
171 size_t pending_task_count_; | |
172 | |
173 // Number of tasks in the pending_tasks_ list that are marked as blocking | |
174 // shutdown. | |
175 size_t blocking_shutdown_pending_task_count_; | |
176 | |
177 // Lists all sequence tokens currently executing. | |
178 std::set<int> current_sequences_; | |
179 | |
180 // Set when the app is terminating and no further tasks should be allowed, | |
181 // though we may still be running existing tasks. | |
182 bool terminating_; | |
183 | |
184 // Set when Shutdown is called to do some assertions. | |
185 bool shutdown_called_; | |
186 | |
187 SequencedWorkerPool::TestingObserver* testing_observer_; | |
188 }; | |
189 | |
190 SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner, | |
191 int thread_number, | |
192 const std::string& prefix) | |
193 : base::SimpleThread( | |
194 prefix + StringPrintf("Worker%d", thread_number).c_str()), | 42 prefix + StringPrintf("Worker%d", thread_number).c_str()), |
195 inner_(inner), | 43 worker_pool_(worker_pool) { |
196 current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) { | |
197 Start(); | 44 Start(); |
198 } | 45 } |
199 | 46 |
200 SequencedWorkerPool::Worker::~Worker() { | 47 SequencedWorkerPool::Worker::~Worker() { |
201 } | 48 } |
202 | 49 |
203 void SequencedWorkerPool::Worker::Run() { | 50 void SequencedWorkerPool::Worker::Run() { |
204 // Just jump back to the Inner object to run the thread, since it has all the | 51 // Just jump back to the Inner object to run the thread, since it has all the |
205 // tracking information and queues. It might be more natural to implement | 52 // tracking information and queues. It might be more natural to implement |
206 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 53 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
207 // having these worker objects at all, but that method lacks the ability to | 54 // having these worker objects at all, but that method lacks the ability to |
208 // send thread-specific information easily to the thread loop. | 55 // send thread-specific information easily to the thread loop. |
209 inner_->ThreadLoop(this); | 56 worker_pool_->ThreadLoop(this); |
210 } | 57 } |
211 | 58 |
212 SequencedWorkerPool::Inner::Inner(size_t max_threads, | 59 // SequencedWorkerPool -------------------------------------------------------- |
213 const std::string& thread_name_prefix) | 60 |
| 61 SequencedWorkerPool::SequencedTask::SequencedTask() |
| 62 : sequence_token_id(0), |
| 63 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| 64 |
| 65 SequencedWorkerPool::SequencedTask::~SequencedTask() {} |
| 66 |
| 67 SequencedWorkerPool::SequencedWorkerPool( |
| 68 size_t max_threads, |
| 69 const std::string& thread_name_prefix) |
214 : last_sequence_number_(0), | 70 : last_sequence_number_(0), |
215 lock_(), | 71 lock_(), |
216 cond_var_(&lock_), | 72 cond_var_(&lock_), |
217 max_threads_(max_threads), | 73 max_threads_(max_threads), |
218 thread_name_prefix_(thread_name_prefix), | 74 thread_name_prefix_(thread_name_prefix), |
219 thread_being_created_(false), | 75 thread_being_created_(false), |
220 waiting_thread_count_(0), | 76 waiting_thread_count_(0), |
221 blocking_shutdown_thread_count_(0), | 77 blocking_shutdown_thread_count_(0), |
222 pending_task_count_(0), | 78 pending_task_count_(0), |
223 blocking_shutdown_pending_task_count_(0), | 79 blocking_shutdown_pending_task_count_(0), |
224 terminating_(false), | |
225 shutdown_called_(false), | 80 shutdown_called_(false), |
226 testing_observer_(NULL) { | 81 testing_observer_(NULL) { |
227 } | 82 } |
228 | 83 |
229 SequencedWorkerPool::Inner::~Inner() { | 84 SequencedWorkerPool::~SequencedWorkerPool() { |
230 // You must call Shutdown() before destroying the pool. | 85 // You must call Shutdown() before destroying the pool. |
231 DCHECK(shutdown_called_); | 86 DCHECK(shutdown_called_); |
232 | 87 |
233 // Need to explicitly join with the threads before they're destroyed or else | 88 // Need to explicitly join with the threads before they're destroyed or else |
234 // they will be running when our object is half torn down. | 89 // they will be running when our object is half torn down. |
235 for (size_t i = 0; i < threads_.size(); i++) | 90 for (size_t i = 0; i < threads_.size(); i++) |
236 threads_[i]->Join(); | 91 threads_[i]->Join(); |
237 threads_.clear(); | 92 threads_.clear(); |
238 } | 93 } |
239 | 94 |
240 SequencedWorkerPool::SequenceToken | 95 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
241 SequencedWorkerPool::Inner::GetSequenceToken() { | 96 subtle::Atomic32 result = |
242 base::subtle::Atomic32 result = | 97 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
243 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | |
244 return SequenceToken(static_cast<int>(result)); | 98 return SequenceToken(static_cast<int>(result)); |
245 } | 99 } |
246 | 100 |
247 SequencedWorkerPool::SequenceToken | 101 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
248 SequencedWorkerPool::Inner::GetNamedSequenceToken( | |
249 const std::string& name) { | 102 const std::string& name) { |
250 base::AutoLock lock(lock_); | 103 AutoLock lock(lock_); |
251 return SequenceToken(LockedGetNamedTokenID(name)); | 104 return SequenceToken(LockedGetNamedTokenID(name)); |
252 } | 105 } |
253 | 106 |
254 bool SequencedWorkerPool::Inner::PostTask( | 107 bool SequencedWorkerPool::PostWorkerTask( |
255 const std::string* optional_token_name, | |
256 int sequence_token_id, | |
257 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | |
258 const tracked_objects::Location& from_here, | 108 const tracked_objects::Location& from_here, |
259 const base::Closure& task) { | 109 const Closure& task) { |
260 SequencedTask sequenced; | 110 return PostTaskHelper(NULL, SequenceToken(), BLOCK_SHUTDOWN, |
261 sequenced.sequence_token_id = sequence_token_id; | 111 from_here, task); |
262 sequenced.shutdown_behavior = shutdown_behavior; | |
263 sequenced.location = from_here; | |
264 sequenced.task = task; | |
265 | |
266 int create_thread_id = 0; | |
267 { | |
268 base::AutoLock lock(lock_); | |
269 if (terminating_) | |
270 return false; | |
271 | |
272 // Now that we have the lock, apply the named token rules. | |
273 if (optional_token_name) | |
274 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); | |
275 | |
276 pending_tasks_.push_back(sequenced); | |
277 pending_task_count_++; | |
278 if (shutdown_behavior == BLOCK_SHUTDOWN) | |
279 blocking_shutdown_pending_task_count_++; | |
280 | |
281 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | |
282 } | |
283 | |
284 // Actually start the additional thread or signal an existing one now that | |
285 // we're outside the lock. | |
286 if (create_thread_id) | |
287 FinishStartingAdditionalThread(create_thread_id); | |
288 else | |
289 cond_var_.Signal(); | |
290 | |
291 return true; | |
292 } | 112 } |
293 | 113 |
294 void SequencedWorkerPool::Inner::Flush() { | 114 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( |
| 115 const tracked_objects::Location& from_here, |
| 116 const Closure& task, |
| 117 WorkerShutdown shutdown_behavior) { |
| 118 return PostTaskHelper(NULL, SequenceToken(), shutdown_behavior, |
| 119 from_here, task); |
| 120 } |
| 121 |
| 122 bool SequencedWorkerPool::PostSequencedWorkerTask( |
| 123 SequenceToken sequence_token, |
| 124 const tracked_objects::Location& from_here, |
| 125 const Closure& task) { |
| 126 return PostTaskHelper(NULL, sequence_token, BLOCK_SHUTDOWN, |
| 127 from_here, task); |
| 128 } |
| 129 |
| 130 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( |
| 131 const std::string& token_name, |
| 132 const tracked_objects::Location& from_here, |
| 133 const Closure& task) { |
| 134 DCHECK(!token_name.empty()); |
| 135 return PostTaskHelper(&token_name, SequenceToken(), BLOCK_SHUTDOWN, |
| 136 from_here, task); |
| 137 } |
| 138 |
| 139 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( |
| 140 SequenceToken sequence_token, |
| 141 const tracked_objects::Location& from_here, |
| 142 const Closure& task, |
| 143 WorkerShutdown shutdown_behavior) { |
| 144 return PostTaskHelper(NULL, sequence_token, shutdown_behavior, |
| 145 from_here, task); |
| 146 } |
| 147 |
| 148 void SequencedWorkerPool::FlushForTesting() { |
295 { | 149 { |
296 base::AutoLock lock(lock_); | 150 AutoLock lock(lock_); |
297 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) | 151 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
298 cond_var_.Wait(); | 152 cond_var_.Wait(); |
299 } | 153 } |
300 cond_var_.Signal(); | 154 cond_var_.Signal(); |
301 } | 155 } |
302 | 156 |
303 void SequencedWorkerPool::Inner::Shutdown() { | 157 void SequencedWorkerPool::Shutdown() { |
304 if (shutdown_called_) | |
305 return; | |
306 shutdown_called_ = true; | |
307 | |
308 // Mark us as terminated and go through and drop all tasks that aren't | 158 // Mark us as terminated and go through and drop all tasks that aren't |
309 // required to run on shutdown. Since no new tasks will get posted once the | 159 // required to run on shutdown. Since no new tasks will get posted once the |
310 // terminated flag is set, this ensures that all remaining tasks are required | 160 // terminated flag is set, this ensures that all remaining tasks are required |
311 // for shutdown whenever the termianted_ flag is set. | 161 // for shutdown whenever the termianted_ flag is set. |
312 { | 162 { |
313 base::AutoLock lock(lock_); | 163 AutoLock lock(lock_); |
314 DCHECK(!terminating_); | 164 |
315 terminating_ = true; | 165 if (shutdown_called_) |
| 166 return; |
| 167 shutdown_called_ = true; |
316 | 168 |
317 // Tickle the threads. This will wake up a waiting one so it will know that | 169 // Tickle the threads. This will wake up a waiting one so it will know that |
318 // it can exit, which in turn will wake up any other waiting ones. | 170 // it can exit, which in turn will wake up any other waiting ones. |
319 cond_var_.Signal(); | 171 cond_var_.Signal(); |
320 | 172 |
321 // There are no pending or running tasks blocking shutdown, we're done. | 173 // There are no pending or running tasks blocking shutdown, we're done. |
322 if (CanShutdown()) | 174 if (CanShutdown()) |
323 return; | 175 return; |
324 } | 176 } |
325 | 177 |
326 // If we get here, we know we're either waiting on a blocking task that's | 178 // If we get here, we know we're either waiting on a blocking task that's |
327 // currently running, waiting on a blocking task that hasn't been scheduled | 179 // currently running, waiting on a blocking task that hasn't been scheduled |
328 // yet, or both. Block on the "queue empty" event to know when all tasks are | 180 // yet, or both. Block on the "queue empty" event to know when all tasks are |
329 // complete. This must be done outside the lock. | 181 // complete. This must be done outside the lock. |
330 if (testing_observer_) | 182 if (testing_observer_) |
331 testing_observer_->WillWaitForShutdown(); | 183 testing_observer_->WillWaitForShutdown(); |
332 | 184 |
333 base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now(); | 185 TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
334 | 186 |
335 // Wait for no more tasks. | 187 // Wait for no more tasks. |
336 { | 188 { |
337 base::AutoLock lock(lock_); | 189 AutoLock lock(lock_); |
338 while (!CanShutdown()) | 190 while (!CanShutdown()) |
339 cond_var_.Wait(); | 191 cond_var_.Wait(); |
340 } | 192 } |
341 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", | 193 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
342 base::TimeTicks::Now() - shutdown_wait_begin); | 194 TimeTicks::Now() - shutdown_wait_begin); |
343 } | 195 } |
344 | 196 |
345 void SequencedWorkerPool::Inner::SetTestingObserver( | 197 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
346 SequencedWorkerPool::TestingObserver* observer) { | 198 AutoLock lock(lock_); |
347 base::AutoLock lock(lock_); | |
348 testing_observer_ = observer; | 199 testing_observer_ = observer; |
349 } | 200 } |
350 | 201 |
351 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 202 bool SequencedWorkerPool::PostTaskHelper( |
| 203 const std::string* optional_token_name, |
| 204 SequenceToken sequence_token, |
| 205 WorkerShutdown shutdown_behavior, |
| 206 const tracked_objects::Location& from_here, |
| 207 const Closure& task) { |
| 208 SequencedTask sequenced; |
| 209 sequenced.sequence_token_id = sequence_token.id_; |
| 210 sequenced.shutdown_behavior = shutdown_behavior; |
| 211 sequenced.location = from_here; |
| 212 sequenced.task = task; |
| 213 |
| 214 int create_thread_id = 0; |
352 { | 215 { |
353 base::AutoLock lock(lock_); | 216 AutoLock lock(lock_); |
| 217 if (shutdown_called_) |
| 218 return false; |
| 219 |
| 220 // Now that we have the lock, apply the named token rules. |
| 221 if (optional_token_name) |
| 222 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| 223 |
| 224 pending_tasks_.push_back(sequenced); |
| 225 pending_task_count_++; |
| 226 if (shutdown_behavior == BLOCK_SHUTDOWN) |
| 227 blocking_shutdown_pending_task_count_++; |
| 228 |
| 229 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| 230 } |
| 231 |
| 232 // Actually start the additional thread or signal an existing one now that |
| 233 // we're outside the lock. |
| 234 if (create_thread_id) |
| 235 FinishStartingAdditionalThread(create_thread_id); |
| 236 else |
| 237 cond_var_.Signal(); |
| 238 |
| 239 return true; |
| 240 } |
| 241 |
| 242 void SequencedWorkerPool::ThreadLoop(Worker* this_worker) { |
| 243 { |
| 244 AutoLock lock(lock_); |
354 DCHECK(thread_being_created_); | 245 DCHECK(thread_being_created_); |
355 thread_being_created_ = false; | 246 thread_being_created_ = false; |
356 threads_.push_back(linked_ptr<Worker>(this_worker)); | 247 threads_.push_back(linked_ptr<Worker>(this_worker)); |
357 | 248 |
358 while (true) { | 249 while (true) { |
359 // See GetWork for what delete_these_outside_lock is doing. | 250 // See GetWork for what delete_these_outside_lock is doing. |
360 SequencedTask task; | 251 SequencedTask task; |
361 std::vector<base::Closure> delete_these_outside_lock; | 252 std::vector<Closure> delete_these_outside_lock; |
362 if (GetWork(&task, &delete_these_outside_lock)) { | 253 if (GetWork(&task, &delete_these_outside_lock)) { |
363 int new_thread_id = WillRunWorkerTask(task); | 254 int new_thread_id = WillRunWorkerTask(task); |
364 { | 255 { |
365 base::AutoUnlock unlock(lock_); | 256 AutoUnlock unlock(lock_); |
366 cond_var_.Signal(); | 257 cond_var_.Signal(); |
367 delete_these_outside_lock.clear(); | 258 delete_these_outside_lock.clear(); |
368 | 259 |
369 // Complete thread creation outside the lock if necessary. | 260 // Complete thread creation outside the lock if necessary. |
370 if (new_thread_id) | 261 if (new_thread_id) |
371 FinishStartingAdditionalThread(new_thread_id); | 262 FinishStartingAdditionalThread(new_thread_id); |
372 | 263 |
373 task.task.Run(); | 264 task.task.Run(); |
374 | 265 |
375 // Make sure our task is erased outside the lock for the same reason | 266 // Make sure our task is erased outside the lock for the same reason |
376 // we do this with delete_these_oustide_lock. | 267 // we do this with delete_these_oustide_lock. |
377 task.task = base::Closure(); | 268 task.task = Closure(); |
378 } | 269 } |
379 DidRunWorkerTask(task); // Must be done inside the lock. | 270 DidRunWorkerTask(task); // Must be done inside the lock. |
380 } else { | 271 } else { |
381 // When we're terminating and there's no more work, we can shut down. | 272 // When we're terminating and there's no more work, we can |
382 // You can't get more tasks posted once terminating_ is set. There may | 273 // shut down. You can't get more tasks posted once |
383 // be some tasks stuck behind running ones with the same sequence | 274 // shutdown_called_ is set. There may be some tasks stuck |
384 // token, but additional threads won't help this case. | 275 // behind running ones with the same sequence token, but |
385 if (terminating_) | 276 // additional threads won't help this case. |
| 277 if (shutdown_called_) |
386 break; | 278 break; |
387 waiting_thread_count_++; | 279 waiting_thread_count_++; |
388 cond_var_.Signal(); // For Flush() that may be waiting on the | 280 cond_var_.Signal(); // For Flush() that may be waiting on the |
389 // waiting thread count to go up. | 281 // waiting thread count to go up. |
390 cond_var_.Wait(); | 282 cond_var_.Wait(); |
391 waiting_thread_count_--; | 283 waiting_thread_count_--; |
392 } | 284 } |
393 } | 285 } |
394 } | 286 } |
395 | 287 |
396 // We noticed we should exit. Wake up the next worker so it knows it should | 288 // We noticed we should exit. Wake up the next worker so it knows it should |
397 // exit as well (because the Shutdown() code only signals once). | 289 // exit as well (because the Shutdown() code only signals once). |
398 cond_var_.Signal(); | 290 cond_var_.Signal(); |
399 } | 291 } |
400 | 292 |
401 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 293 int SequencedWorkerPool::LockedGetNamedTokenID( |
402 const std::string& name) { | 294 const std::string& name) { |
403 lock_.AssertAcquired(); | 295 lock_.AssertAcquired(); |
404 DCHECK(!name.empty()); | 296 DCHECK(!name.empty()); |
405 | 297 |
406 std::map<std::string, int>::const_iterator found = | 298 std::map<std::string, int>::const_iterator found = |
407 named_sequence_tokens_.find(name); | 299 named_sequence_tokens_.find(name); |
408 if (found != named_sequence_tokens_.end()) | 300 if (found != named_sequence_tokens_.end()) |
409 return found->second; // Got an existing one. | 301 return found->second; // Got an existing one. |
410 | 302 |
411 // Create a new one for this name. | 303 // Create a new one for this name. |
412 SequenceToken result = GetSequenceToken(); | 304 SequenceToken result = GetSequenceToken(); |
413 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); | 305 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
414 return result.id_; | 306 return result.id_; |
415 } | 307 } |
416 | 308 |
417 bool SequencedWorkerPool::Inner::GetWork( | 309 bool SequencedWorkerPool::GetWork( |
418 SequencedTask* task, | 310 SequencedTask* task, |
419 std::vector<base::Closure>* delete_these_outside_lock) { | 311 std::vector<Closure>* delete_these_outside_lock) { |
420 lock_.AssertAcquired(); | 312 lock_.AssertAcquired(); |
421 | 313 |
422 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); | 314 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); |
423 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", | 315 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", |
424 static_cast<int>(pending_task_count_)); | 316 static_cast<int>(pending_task_count_)); |
425 | 317 |
426 // Find the next task with a sequence token that's not currently in use. | 318 // Find the next task with a sequence token that's not currently in use. |
427 // If the token is in use, that means another thread is running something | 319 // If the token is in use, that means another thread is running something |
428 // in that sequence, and we can't run it without going out-of-order. | 320 // in that sequence, and we can't run it without going out-of-order. |
429 // | 321 // |
(...skipping 18 matching lines...) Expand all Loading... |
448 bool found_task = false; | 340 bool found_task = false; |
449 int unrunnable_tasks = 0; | 341 int unrunnable_tasks = 0; |
450 std::list<SequencedTask>::iterator i = pending_tasks_.begin(); | 342 std::list<SequencedTask>::iterator i = pending_tasks_.begin(); |
451 while (i != pending_tasks_.end()) { | 343 while (i != pending_tasks_.end()) { |
452 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { | 344 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { |
453 unrunnable_tasks++; | 345 unrunnable_tasks++; |
454 ++i; | 346 ++i; |
455 continue; | 347 continue; |
456 } | 348 } |
457 | 349 |
458 if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { | 350 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
459 // We're shutting down and the task we just found isn't blocking | 351 // We're shutting down and the task we just found isn't blocking |
460 // shutdown. Delete it and get more work. | 352 // shutdown. Delete it and get more work. |
461 // | 353 // |
462 // Note that we do not want to delete unrunnable tasks. Deleting a task | 354 // Note that we do not want to delete unrunnable tasks. Deleting a task |
463 // can have side effects (like freeing some objects) and deleting a | 355 // can have side effects (like freeing some objects) and deleting a |
464 // task that's supposed to run after one that's currently running could | 356 // task that's supposed to run after one that's currently running could |
465 // cause an obscure crash. | 357 // cause an obscure crash. |
466 // | 358 // |
467 // We really want to delete these tasks outside the lock in case the | 359 // We really want to delete these tasks outside the lock in case the |
468 // closures are holding refs to objects that want to post work from | 360 // closures are holding refs to objects that want to post work from |
(...skipping 19 matching lines...) Expand all Loading... |
488 } | 380 } |
489 | 381 |
490 // Track the number of tasks we had to skip over to see if we should be | 382 // Track the number of tasks we had to skip over to see if we should be |
491 // making this more efficient. If this number ever becomes large or is | 383 // making this more efficient. If this number ever becomes large or is |
492 // frequently "some", we should consider the optimization above. | 384 // frequently "some", we should consider the optimization above. |
493 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", | 385 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", |
494 unrunnable_tasks); | 386 unrunnable_tasks); |
495 return found_task; | 387 return found_task; |
496 } | 388 } |
497 | 389 |
498 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { | 390 int SequencedWorkerPool::WillRunWorkerTask(const SequencedTask& task) { |
499 lock_.AssertAcquired(); | 391 lock_.AssertAcquired(); |
500 | 392 |
501 // Mark the task's sequence number as in use. | 393 // Mark the task's sequence number as in use. |
502 if (task.sequence_token_id) | 394 if (task.sequence_token_id) |
503 current_sequences_.insert(task.sequence_token_id); | 395 current_sequences_.insert(task.sequence_token_id); |
504 | 396 |
505 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) | 397 if (task.shutdown_behavior == BLOCK_SHUTDOWN) |
506 blocking_shutdown_thread_count_++; | 398 blocking_shutdown_thread_count_++; |
507 | 399 |
508 // We just picked up a task. Since StartAdditionalThreadIfHelpful only | 400 // We just picked up a task. Since StartAdditionalThreadIfHelpful only |
509 // creates a new thread if there is no free one, there is a race when posting | 401 // creates a new thread if there is no free one, there is a race when posting |
510 // tasks that many tasks could have been posted before a thread started | 402 // tasks that many tasks could have been posted before a thread started |
511 // running them, so only one thread would have been created. So we also check | 403 // running them, so only one thread would have been created. So we also check |
512 // whether we should create more threads after removing our task from the | 404 // whether we should create more threads after removing our task from the |
513 // queue, which also has the nice side effect of creating the workers from | 405 // queue, which also has the nice side effect of creating the workers from |
514 // background threads rather than the main thread of the app. | 406 // background threads rather than the main thread of the app. |
515 // | 407 // |
516 // If another thread wasn't created, we want to wake up an existing thread | 408 // If another thread wasn't created, we want to wake up an existing thread |
517 // if there is one waiting to pick up the next task. | 409 // if there is one waiting to pick up the next task. |
518 // | 410 // |
519 // Note that we really need to do this *before* running the task, not | 411 // Note that we really need to do this *before* running the task, not |
520 // after. Otherwise, if more than one task is posted, the creation of the | 412 // after. Otherwise, if more than one task is posted, the creation of the |
521 // second thread (since we only create one at a time) will be blocked by | 413 // second thread (since we only create one at a time) will be blocked by |
522 // the execution of the first task, which could be arbitrarily long. | 414 // the execution of the first task, which could be arbitrarily long. |
523 return PrepareToStartAdditionalThreadIfHelpful(); | 415 return PrepareToStartAdditionalThreadIfHelpful(); |
524 } | 416 } |
525 | 417 |
526 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 418 void SequencedWorkerPool::DidRunWorkerTask(const SequencedTask& task) { |
527 lock_.AssertAcquired(); | 419 lock_.AssertAcquired(); |
528 | 420 |
529 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) { | 421 if (task.shutdown_behavior == BLOCK_SHUTDOWN) { |
530 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 422 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
531 blocking_shutdown_thread_count_--; | 423 blocking_shutdown_thread_count_--; |
532 } | 424 } |
533 | 425 |
534 if (task.sequence_token_id) | 426 if (task.sequence_token_id) |
535 current_sequences_.erase(task.sequence_token_id); | 427 current_sequences_.erase(task.sequence_token_id); |
536 } | 428 } |
537 | 429 |
538 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 430 bool SequencedWorkerPool::IsSequenceTokenRunnable( |
539 int sequence_token_id) const { | 431 int sequence_token_id) const { |
540 lock_.AssertAcquired(); | 432 lock_.AssertAcquired(); |
541 return !sequence_token_id || | 433 return !sequence_token_id || |
542 current_sequences_.find(sequence_token_id) == | 434 current_sequences_.find(sequence_token_id) == |
543 current_sequences_.end(); | 435 current_sequences_.end(); |
544 } | 436 } |
545 | 437 |
546 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { | 438 int SequencedWorkerPool::PrepareToStartAdditionalThreadIfHelpful() { |
| 439 lock_.AssertAcquired(); |
547 // How thread creation works: | 440 // How thread creation works: |
548 // | 441 // |
549 // We'de like to avoid creating threads with the lock held. However, we | 442 // We'de like to avoid creating threads with the lock held. However, we |
550 // need to be sure that we have an accurate accounting of the threads for | 443 // need to be sure that we have an accurate accounting of the threads for |
551 // proper Joining and deltion on shutdown. | 444 // proper Joining and deltion on shutdown. |
552 // | 445 // |
553 // We need to figure out if we need another thread with the lock held, which | 446 // We need to figure out if we need another thread with the lock held, which |
554 // is what this function does. It then marks us as in the process of creating | 447 // is what this function does. It then marks us as in the process of creating |
555 // a thread. When we do shutdown, we wait until the thread_being_created_ | 448 // a thread. When we do shutdown, we wait until the thread_being_created_ |
556 // flag is cleared, which ensures that the new thread is properly added to | 449 // flag is cleared, which ensures that the new thread is properly added to |
557 // all the data structures and we can't leak it. Once shutdown starts, we'll | 450 // all the data structures and we can't leak it. Once shutdown starts, we'll |
558 // refuse to create more threads or they would be leaked. | 451 // refuse to create more threads or they would be leaked. |
559 // | 452 // |
560 // Note that this creates a mostly benign race condition on shutdown that | 453 // Note that this creates a mostly benign race condition on shutdown that |
561 // will cause fewer workers to be created than one would expect. It isn't | 454 // will cause fewer workers to be created than one would expect. It isn't |
562 // much of an issue in real life, but affects some tests. Since we only spawn | 455 // much of an issue in real life, but affects some tests. Since we only spawn |
563 // one worker at a time, the following sequence of events can happen: | 456 // one worker at a time, the following sequence of events can happen: |
564 // | 457 // |
565 // 1. Main thread posts a bunch of unrelated tasks that would normally be | 458 // 1. Main thread posts a bunch of unrelated tasks that would normally be |
566 // run on separate threads. | 459 // run on separate threads. |
567 // 2. The first task post causes us to start a worker. Other tasks do not | 460 // 2. The first task post causes us to start a worker. Other tasks do not |
568 // cause a worker to start since one is pending. | 461 // cause a worker to start since one is pending. |
569 // 3. Main thread initiates shutdown. | 462 // 3. Main thread initiates shutdown. |
570 // 4. No more threads are created since the terminating_ flag is set. | 463 // 4. No more threads are created since the shutdown_called_ flag is set. |
571 // | 464 // |
572 // The result is that one may expect that max_threads_ workers to be created | 465 // The result is that one may expect that max_threads_ workers to be created |
573 // given the workload, but in reality fewer may be created because the | 466 // given the workload, but in reality fewer may be created because the |
574 // sequence of thread creation on the background threads is racing with the | 467 // sequence of thread creation on the background threads is racing with the |
575 // shutdown call. | 468 // shutdown call. |
576 if (!terminating_ && | 469 if (!shutdown_called_ && |
577 !thread_being_created_ && | 470 !thread_being_created_ && |
578 threads_.size() < max_threads_ && | 471 threads_.size() < max_threads_ && |
579 waiting_thread_count_ == 0) { | 472 waiting_thread_count_ == 0) { |
580 // We could use an additional thread if there's work to be done. | 473 // We could use an additional thread if there's work to be done. |
581 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin(); | 474 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin(); |
582 i != pending_tasks_.end(); ++i) { | 475 i != pending_tasks_.end(); ++i) { |
583 if (IsSequenceTokenRunnable(i->sequence_token_id)) { | 476 if (IsSequenceTokenRunnable(i->sequence_token_id)) { |
584 // Found a runnable task, mark the thread as being started. | 477 // Found a runnable task, mark the thread as being started. |
585 thread_being_created_ = true; | 478 thread_being_created_ = true; |
586 return static_cast<int>(threads_.size() + 1); | 479 return static_cast<int>(threads_.size() + 1); |
587 } | 480 } |
588 } | 481 } |
589 } | 482 } |
590 return 0; | 483 return 0; |
591 } | 484 } |
592 | 485 |
593 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 486 void SequencedWorkerPool::FinishStartingAdditionalThread( |
594 int thread_number) { | 487 int thread_number) { |
595 // Called outside of the lock. | 488 // Called outside of the lock. |
596 DCHECK(thread_number > 0); | 489 DCHECK(thread_number > 0); |
597 | 490 |
598 // The worker is assigned to the list when the thread actually starts, which | 491 // The worker is assigned to the list when the thread actually starts, which |
599 // will manage the memory of the pointer. | 492 // will manage the memory of the pointer. |
600 new Worker(this, thread_number, thread_name_prefix_); | 493 new Worker(this, thread_number, thread_name_prefix_); |
601 } | 494 } |
602 | 495 |
603 bool SequencedWorkerPool::Inner::CanShutdown() const { | 496 bool SequencedWorkerPool::CanShutdown() const { |
604 lock_.AssertAcquired(); | 497 lock_.AssertAcquired(); |
605 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 498 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
606 return !thread_being_created_ && | 499 return !thread_being_created_ && |
607 blocking_shutdown_thread_count_ == 0 && | 500 blocking_shutdown_thread_count_ == 0 && |
608 blocking_shutdown_pending_task_count_ == 0; | 501 blocking_shutdown_pending_task_count_ == 0; |
609 } | 502 } |
610 | 503 |
611 // SequencedWorkerPool -------------------------------------------------------- | |
612 | |
613 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, | |
614 const std::string& thread_name_prefix) | |
615 : inner_(new Inner(max_threads, thread_name_prefix)) { | |
616 } | |
617 | |
618 SequencedWorkerPool::~SequencedWorkerPool() { | |
619 } | |
620 | |
621 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | |
622 return inner_->GetSequenceToken(); | |
623 } | |
624 | |
625 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | |
626 const std::string& name) { | |
627 return inner_->GetNamedSequenceToken(name); | |
628 } | |
629 | |
630 bool SequencedWorkerPool::PostWorkerTask( | |
631 const tracked_objects::Location& from_here, | |
632 const base::Closure& task) { | |
633 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); | |
634 } | |
635 | |
636 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( | |
637 const tracked_objects::Location& from_here, | |
638 const base::Closure& task, | |
639 WorkerShutdown shutdown_behavior) { | |
640 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); | |
641 } | |
642 | |
643 bool SequencedWorkerPool::PostSequencedWorkerTask( | |
644 SequenceToken sequence_token, | |
645 const tracked_objects::Location& from_here, | |
646 const base::Closure& task) { | |
647 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, | |
648 from_here, task); | |
649 } | |
650 | |
651 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( | |
652 const std::string& token_name, | |
653 const tracked_objects::Location& from_here, | |
654 const base::Closure& task) { | |
655 DCHECK(!token_name.empty()); | |
656 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); | |
657 } | |
658 | |
659 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( | |
660 SequenceToken sequence_token, | |
661 const tracked_objects::Location& from_here, | |
662 const base::Closure& task, | |
663 WorkerShutdown shutdown_behavior) { | |
664 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, | |
665 from_here, task); | |
666 } | |
667 | |
668 void SequencedWorkerPool::FlushForTesting() { | |
669 inner_->Flush(); | |
670 } | |
671 | |
672 void SequencedWorkerPool::Shutdown() { | |
673 inner_->Shutdown(); | |
674 } | |
675 | |
676 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | |
677 inner_->SetTestingObserver(observer); | |
678 } | |
679 | |
680 } // namespace base | 504 } // namespace base |
OLD | NEW |