OLD | NEW |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ | 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ |
6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ | 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ |
7 #pragma once | 7 #pragma once |
8 | 8 |
| 9 #include <cstddef> |
| 10 #include <list> |
| 11 #include <map> |
| 12 #include <set> |
9 #include <string> | 13 #include <string> |
| 14 #include <vector> |
10 | 15 |
| 16 #include "base/atomicops.h" |
| 17 #include "base/base_export.h" |
| 18 #include "base/basictypes.h" |
11 #include "base/callback.h" | 19 #include "base/callback.h" |
12 #include "base/memory/linked_ptr.h" | 20 #include "base/memory/linked_ptr.h" |
13 #include "base/memory/ref_counted.h" | 21 #include "base/memory/ref_counted.h" |
| 22 #include "base/synchronization/condition_variable.h" |
| 23 #include "base/synchronization/lock.h" |
14 #include "base/tracked_objects.h" | 24 #include "base/tracked_objects.h" |
15 #include "base/base_export.h" | |
16 | 25 |
17 namespace base { | 26 namespace base { |
18 | 27 |
19 // A worker thread pool that enforces ordering between sets of tasks. It also | 28 // A worker thread pool that enforces ordering between sets of tasks. It also |
20 // allows you to specify what should happen to your tasks on shutdown. | 29 // allows you to specify what should happen to your tasks on shutdown. |
21 // | 30 // |
22 // To enforce ordering, get a unique sequence token from the pool and post all | 31 // To enforce ordering, get a unique sequence token from the pool and post all |
23 // tasks you want to order with the token. All tasks with the same token are | 32 // tasks you want to order with the token. All tasks with the same token are |
24 // guaranteed to execute serially, though not necessarily on the same thread. | 33 // guaranteed to execute serially, though not necessarily on the same thread. |
25 // | 34 // |
(...skipping 13 matching lines...) Expand all Loading... |
39 // | 48 // |
40 // This class is designed to be leaked on shutdown to allow the | 49 // This class is designed to be leaked on shutdown to allow the |
41 // CONTINUE_ON_SHUTDOWN behavior to be implemented. To enforce the | 50 // CONTINUE_ON_SHUTDOWN behavior to be implemented. To enforce the |
42 // BLOCK_SHUTDOWN behavior, you must call Shutdown() which will wait until | 51 // BLOCK_SHUTDOWN behavior, you must call Shutdown() which will wait until |
43 // the necessary tasks have completed. | 52 // the necessary tasks have completed. |
44 // | 53 // |
45 // Implementation note: This does not use a base::WorkerPool since that does | 54 // Implementation note: This does not use a base::WorkerPool since that does |
46 // not enforce shutdown semantics or allow us to specify how many worker | 55 // not enforce shutdown semantics or allow us to specify how many worker |
47 // threads to run. For the typical use case of random background work, we don't | 56 // threads to run. For the typical use case of random background work, we don't |
48 // necessarily want to be super aggressive about creating threads. | 57 // necessarily want to be super aggressive about creating threads. |
49 class BASE_EXPORT SequencedWorkerPool { | 58 class BASE_EXPORT SequencedWorkerPool |
| 59 : public RefCountedThreadSafe<SequencedWorkerPool> { |
50 public: | 60 public: |
51 // Defines what should happen to a task posted to the worker pool on shutdown. | 61 // Defines what should happen to a task posted to the worker pool on shutdown. |
52 enum WorkerShutdown { | 62 enum WorkerShutdown { |
53 // Tasks posted with this mode which have not run at shutdown will be | 63 // Tasks posted with this mode which have not run at shutdown will be |
54 // deleted rather than run, and any tasks with this mode running at | 64 // deleted rather than run, and any tasks with this mode running at |
55 // shutdown will be ignored (the worker thread will not be joined). | 65 // shutdown will be ignored (the worker thread will not be joined). |
56 // | 66 // |
57 // This option provides a nice way to post stuff you don't want blocking | 67 // This option provides a nice way to post stuff you don't want blocking |
58 // shutdown. For example, you might be doing a slow DNS lookup and if it's | 68 // shutdown. For example, you might be doing a slow DNS lookup and if it's |
59 // blocked on the OS, you may not want to stop shutdown, since the result | 69 // blocked on the OS, you may not want to stop shutdown, since the result |
(...skipping 20 matching lines...) Expand all Loading... |
80 // Generally, this should be used only for user data, for example, a task | 90 // Generally, this should be used only for user data, for example, a task |
81 // writing a preference file. | 91 // writing a preference file. |
82 // | 92 // |
83 // If a task is posted during shutdown, it will not get run since the | 93 // If a task is posted during shutdown, it will not get run since the |
84 // workers may already be stopped. In this case, the post operation will | 94 // workers may already be stopped. In this case, the post operation will |
85 // fail (return false) and the task will be deleted. | 95 // fail (return false) and the task will be deleted. |
86 BLOCK_SHUTDOWN, | 96 BLOCK_SHUTDOWN, |
87 }; | 97 }; |
88 | 98 |
89 // Opaque identifier that defines sequencing of tasks posted to the worker | 99 // Opaque identifier that defines sequencing of tasks posted to the worker |
90 // pool. See NewSequenceToken(). | 100 // pool. |
91 class SequenceToken { | 101 class SequenceToken { |
92 public: | 102 public: |
93 explicit SequenceToken() : id_(0) {} | 103 SequenceToken() : id_(0) {} |
94 ~SequenceToken() {} | 104 ~SequenceToken() {} |
95 | 105 |
96 bool Equals(const SequenceToken& other) const { | 106 bool Equals(const SequenceToken& other) const { |
97 return id_ == other.id_; | 107 return id_ == other.id_; |
98 } | 108 } |
99 | 109 |
100 private: | 110 private: |
101 friend class SequencedWorkerPool; | 111 friend class SequencedWorkerPool; |
102 | 112 |
103 SequenceToken(int id) : id_(id) {} | 113 explicit SequenceToken(int id) : id_(id) {} |
104 | 114 |
105 int id_; | 115 int id_; |
106 }; | 116 }; |
107 | 117 |
108 // Allows tests to perform certain actions. | 118 // Allows tests to perform certain actions. |
109 class TestingObserver { | 119 class TestingObserver { |
110 public: | 120 public: |
111 virtual ~TestingObserver() {} | 121 virtual ~TestingObserver() {} |
112 virtual void WillWaitForShutdown() = 0; | 122 virtual void WillWaitForShutdown() = 0; |
113 }; | 123 }; |
114 | 124 |
115 // Pass the maximum number of threads (they will be lazily created as needed) | 125 // Pass the maximum number of threads (they will be lazily created as needed) |
116 // and a prefix for the thread name to ad in debugging. | 126 // and a prefix for the thread name to ad in debugging. |
117 SequencedWorkerPool(size_t max_threads, | 127 SequencedWorkerPool(size_t max_threads, |
118 const std::string& thread_name_prefix); | 128 const std::string& thread_name_prefix); |
119 ~SequencedWorkerPool(); | |
120 | 129 |
121 // Returns a unique token that can be used to sequence tasks posted to | 130 // Returns a unique token that can be used to sequence tasks posted to |
122 // PostSequencedWorkerTask(). Valid tokens are alwys nonzero. | 131 // PostSequencedWorkerTask(). Valid tokens are alwys nonzero. |
123 SequenceToken GetSequenceToken(); | 132 SequenceToken GetSequenceToken(); |
124 | 133 |
125 // Returns the sequence token associated with the given name. Calling this | 134 // Returns the sequence token associated with the given name. Calling this |
126 // function multiple times with the same string will always produce the | 135 // function multiple times with the same string will always produce the |
127 // same sequence token. If the name has not been used before, a new token | 136 // same sequence token. If the name has not been used before, a new token |
128 // will be created. | 137 // will be created. |
129 SequenceToken GetNamedSequenceToken(const std::string& name); | 138 SequenceToken GetNamedSequenceToken(const std::string& name); |
(...skipping 12 matching lines...) Expand all Loading... |
142 // cause nondeterministic crashes because the task could be keeping some | 151 // cause nondeterministic crashes because the task could be keeping some |
143 // objects alive which do work in their destructor, which could voilate the | 152 // objects alive which do work in their destructor, which could voilate the |
144 // assumptions of the running task. | 153 // assumptions of the running task. |
145 // | 154 // |
146 // The task will be guaranteed to run to completion before shutdown | 155 // The task will be guaranteed to run to completion before shutdown |
147 // (BLOCK_SHUTDOWN semantics). | 156 // (BLOCK_SHUTDOWN semantics). |
148 // | 157 // |
149 // Returns true if the task was posted successfully. This may fail during | 158 // Returns true if the task was posted successfully. This may fail during |
150 // shutdown regardless of the specified ShutdownBehavior. | 159 // shutdown regardless of the specified ShutdownBehavior. |
151 bool PostWorkerTask(const tracked_objects::Location& from_here, | 160 bool PostWorkerTask(const tracked_objects::Location& from_here, |
152 const base::Closure& task); | 161 const Closure& task); |
153 | 162 |
154 // Same as PostWorkerTask but allows specification of the shutdown behavior. | 163 // Same as PostWorkerTask but allows specification of the shutdown behavior. |
155 bool PostWorkerTaskWithShutdownBehavior( | 164 bool PostWorkerTaskWithShutdownBehavior( |
156 const tracked_objects::Location& from_here, | 165 const tracked_objects::Location& from_here, |
157 const base::Closure& task, | 166 const Closure& task, |
158 WorkerShutdown shutdown_behavior); | 167 WorkerShutdown shutdown_behavior); |
159 | 168 |
160 // Like PostWorkerTask above, but provides sequencing semantics. This means | 169 // Like PostWorkerTask above, but provides sequencing semantics. This means |
161 // that tasks posted with the same sequence token (see GetSequenceToken()) | 170 // that tasks posted with the same sequence token (see GetSequenceToken()) |
162 // are guaranteed to execute in order. This is useful in cases where you're | 171 // are guaranteed to execute in order. This is useful in cases where you're |
163 // doing operations that may depend on previous ones, like appending to a | 172 // doing operations that may depend on previous ones, like appending to a |
164 // file. | 173 // file. |
165 // | 174 // |
166 // The task will be guaranteed to run to completion before shutdown | 175 // The task will be guaranteed to run to completion before shutdown |
167 // (BLOCK_SHUTDOWN semantics). | 176 // (BLOCK_SHUTDOWN semantics). |
168 // | 177 // |
169 // Returns true if the task was posted successfully. This may fail during | 178 // Returns true if the task was posted successfully. This may fail during |
170 // shutdown regardless of the specified ShutdownBehavior. | 179 // shutdown regardless of the specified ShutdownBehavior. |
171 bool PostSequencedWorkerTask(SequenceToken sequence_token, | 180 bool PostSequencedWorkerTask(SequenceToken sequence_token, |
172 const tracked_objects::Location& from_here, | 181 const tracked_objects::Location& from_here, |
173 const base::Closure& task); | 182 const Closure& task); |
174 | 183 |
175 // Like PostSequencedWorkerTask above, but allows you to specify a named | 184 // Like PostSequencedWorkerTask above, but allows you to specify a named |
176 // token, which saves an extra call to GetNamedSequenceToken. | 185 // token, which saves an extra call to GetNamedSequenceToken. |
177 bool PostNamedSequencedWorkerTask(const std::string& token_name, | 186 bool PostNamedSequencedWorkerTask(const std::string& token_name, |
178 const tracked_objects::Location& from_here, | 187 const tracked_objects::Location& from_here, |
179 const base::Closure& task); | 188 const Closure& task); |
180 | 189 |
181 // Same as PostSequencedWorkerTask but allows specification of the shutdown | 190 // Same as PostSequencedWorkerTask but allows specification of the shutdown |
182 // behavior. | 191 // behavior. |
183 bool PostSequencedWorkerTaskWithShutdownBehavior( | 192 bool PostSequencedWorkerTaskWithShutdownBehavior( |
184 SequenceToken sequence_token, | 193 SequenceToken sequence_token, |
185 const tracked_objects::Location& from_here, | 194 const tracked_objects::Location& from_here, |
186 const base::Closure& task, | 195 const Closure& task, |
187 WorkerShutdown shutdown_behavior); | 196 WorkerShutdown shutdown_behavior); |
188 | 197 |
189 // Blocks until all pending tasks are complete. This should only be called in | 198 // Blocks until all pending tasks are complete. This should only be called in |
190 // unit tests when you want to validate something that should have happened. | 199 // unit tests when you want to validate something that should have happened. |
191 // | 200 // |
192 // Note that calling this will not prevent other threads from posting work to | 201 // Note that calling this will not prevent other threads from posting work to |
193 // the queue while the calling thread is waiting on Flush(). In this case, | 202 // the queue while the calling thread is waiting on Flush(). In this case, |
194 // Flush will return only when there's no more work in the queue. Normally, | 203 // Flush will return only when there's no more work in the queue. Normally, |
195 // this doesn't come up sine in a test, all the work is being posted from | 204 // this doesn't come up sine in a test, all the work is being posted from |
196 // the main thread. | 205 // the main thread. |
197 void FlushForTesting(); | 206 void FlushForTesting(); |
198 | 207 |
199 // Implements the worker pool shutdown. This should be called during app | 208 // Implements the worker pool shutdown. This should be called during app |
200 // shutdown, and will discard/join with appropriate tasks before returning. | 209 // shutdown, and will discard/join with appropriate tasks before returning. |
201 // After this call, subsequent calls to post tasks will fail. | 210 // After this call, subsequent calls to post tasks will fail. |
202 void Shutdown(); | 211 void Shutdown(); |
203 | 212 |
204 // Called by tests to set the testing observer. This is NULL by default | 213 // Called by tests to set the testing observer. This is NULL by default |
205 // and ownership of the pointer is kept with the caller. | 214 // and ownership of the pointer is kept with the caller. |
206 void SetTestingObserver(TestingObserver* observer); | 215 void SetTestingObserver(TestingObserver* observer); |
207 | 216 |
208 private: | 217 private: |
209 class Inner; | 218 friend class RefCountedThreadSafe<SequencedWorkerPool>; |
210 class Worker; | 219 class Worker; |
211 | 220 |
212 friend class Inner; | 221 struct SequencedTask { |
213 friend class Worker; | 222 SequencedTask(); |
| 223 ~SequencedTask(); |
214 | 224 |
215 scoped_refptr<Inner> inner_; | 225 int sequence_token_id; |
| 226 WorkerShutdown shutdown_behavior; |
| 227 tracked_objects::Location location; |
| 228 Closure task; |
| 229 }; |
| 230 |
| 231 ~SequencedWorkerPool(); |
| 232 |
| 233 // This function accepts a name and an ID. If the name is null, the |
| 234 // token ID is used. This allows us to implement the optional name lookup |
| 235 // from a single function without having to enter the lock a separate time. |
| 236 bool PostTaskHelper(const std::string* optional_token_name, |
| 237 SequenceToken sequence_token, |
| 238 WorkerShutdown shutdown_behavior, |
| 239 const tracked_objects::Location& from_here, |
| 240 const Closure& task); |
| 241 |
| 242 // Runs the worker loop on the background thread. |
| 243 void ThreadLoop(Worker* this_worker); |
| 244 |
| 245 // Called from within the lock, this converts the given token name into a |
| 246 // token ID, creating a new one if necessary. |
| 247 int LockedGetNamedTokenID(const std::string& name); |
| 248 |
| 249 // The calling code should clear the given delete_these_oustide_lock |
| 250 // vector the next time the lock is released. See the implementation for |
| 251 // a more detailed description. |
| 252 bool GetWork(SequencedTask* task, |
| 253 std::vector<Closure>* delete_these_outside_lock); |
| 254 |
| 255 // Peforms init and cleanup around running the given task. WillRun... |
| 256 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| 257 // The calling code should call FinishStartingAdditionalThread once the |
| 258 // lock is released if the return values is nonzero. |
| 259 int WillRunWorkerTask(const SequencedTask& task); |
| 260 void DidRunWorkerTask(const SequencedTask& task); |
| 261 |
| 262 // Returns true if there are no threads currently running the given |
| 263 // sequence token. |
| 264 bool IsSequenceTokenRunnable(int sequence_token_id) const; |
| 265 |
| 266 // Checks if all threads are busy and the addition of one more could run an |
| 267 // additional task waiting in the queue. This must be called from within |
| 268 // the lock. |
| 269 // |
| 270 // If another thread is helpful, this will mark the thread as being in the |
| 271 // process of starting and returns the index of the new thread which will be |
| 272 // 0 or more. The caller should then call FinishStartingAdditionalThread to |
| 273 // complete initialization once the lock is released. |
| 274 // |
| 275 // If another thread is not necessary, returne 0; |
| 276 // |
| 277 // See the implementedion for more. |
| 278 int PrepareToStartAdditionalThreadIfHelpful(); |
| 279 |
| 280 // The second part of thread creation after |
| 281 // PrepareToStartAdditionalThreadIfHelpful with the thread number it |
| 282 // generated. This actually creates the thread and should be called outside |
| 283 // the lock to avoid blocking important work starting a thread in the lock. |
| 284 void FinishStartingAdditionalThread(int thread_number); |
| 285 |
| 286 // Checks whether there is work left that's blocking shutdown. Must be |
| 287 // called inside the lock. |
| 288 bool CanShutdown() const; |
| 289 |
| 290 // The last sequence number used. Managed by GetSequenceToken, since this |
| 291 // only does threadsafe increment operations, you do not need to hold the |
| 292 // lock. |
| 293 volatile subtle::Atomic32 last_sequence_number_; |
| 294 |
| 295 // This lock protects |everything in this class|. Do not read or modify |
| 296 // anything without holding this lock. Do not block while holding this |
| 297 // lock. |
| 298 Lock lock_; |
| 299 |
| 300 // Condition variable used to wake up worker threads when a task is runnable. |
| 301 ConditionVariable cond_var_; |
| 302 |
| 303 // The maximum number of worker threads we'll create. |
| 304 size_t max_threads_; |
| 305 |
| 306 std::string thread_name_prefix_; |
| 307 |
| 308 // Associates all known sequence token names with their IDs. |
| 309 std::map<std::string, int> named_sequence_tokens_; |
| 310 |
| 311 // Owning pointers to all threads we've created so far. Since we lazily |
| 312 // create threads, this may be less than max_threads_ and will be initially |
| 313 // empty. |
| 314 std::vector<linked_ptr<Worker> > threads_; |
| 315 |
| 316 // Set to true when we're in the process of creating another thread. |
| 317 // See PrepareToStartAdditionalThreadIfHelpful for more. |
| 318 bool thread_being_created_; |
| 319 |
| 320 // Number of threads currently waiting for work. |
| 321 size_t waiting_thread_count_; |
| 322 |
| 323 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN |
| 324 // flag set. |
| 325 size_t blocking_shutdown_thread_count_; |
| 326 |
| 327 // In-order list of all pending tasks. These are tasks waiting for a thread |
| 328 // to run on or that are blocked on a previous task in their sequence. |
| 329 // |
| 330 // We maintain the pending_task_count_ separately for metrics because |
| 331 // list.size() can be linear time. |
| 332 std::list<SequencedTask> pending_tasks_; |
| 333 size_t pending_task_count_; |
| 334 |
| 335 // Number of tasks in the pending_tasks_ list that are marked as blocking |
| 336 // shutdown. |
| 337 size_t blocking_shutdown_pending_task_count_; |
| 338 |
| 339 // Lists all sequence tokens currently executing. |
| 340 std::set<int> current_sequences_; |
| 341 |
| 342 // Set when Shutdown is called and no further tasks should be |
| 343 // allowed, though we may still be running existing tasks. |
| 344 bool shutdown_called_; |
| 345 |
| 346 TestingObserver* testing_observer_; |
216 | 347 |
217 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); | 348 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); |
218 }; | 349 }; |
219 | 350 |
220 } // namespace base | 351 } // namespace base |
221 | 352 |
222 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ | 353 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ |
OLD | NEW |