Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(29)

Side by Side Diff: base/threading/sequenced_worker_pool.h

Issue 9347056: Fix up SequencedWorkerPool in preparation for making it a TaskRunner (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address dom_storage comments Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | base/threading/sequenced_worker_pool.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7 #pragma once 7 #pragma once
8 8
9 #include <cstddef>
10 #include <list>
11 #include <map>
12 #include <set>
9 #include <string> 13 #include <string>
14 #include <vector>
10 15
16 #include "base/atomicops.h"
17 #include "base/base_export.h"
18 #include "base/basictypes.h"
11 #include "base/callback.h" 19 #include "base/callback.h"
12 #include "base/memory/linked_ptr.h" 20 #include "base/memory/linked_ptr.h"
13 #include "base/memory/ref_counted.h" 21 #include "base/memory/ref_counted.h"
22 #include "base/synchronization/condition_variable.h"
brettw 2012/02/24 05:43:34 One of the reasons I liked the inner class was tha
23 #include "base/synchronization/lock.h"
14 #include "base/tracked_objects.h" 24 #include "base/tracked_objects.h"
15 #include "base/base_export.h"
16 25
17 namespace base { 26 namespace base {
18 27
19 // A worker thread pool that enforces ordering between sets of tasks. It also 28 // A worker thread pool that enforces ordering between sets of tasks. It also
20 // allows you to specify what should happen to your tasks on shutdown. 29 // allows you to specify what should happen to your tasks on shutdown.
21 // 30 //
22 // To enforce ordering, get a unique sequence token from the pool and post all 31 // To enforce ordering, get a unique sequence token from the pool and post all
23 // tasks you want to order with the token. All tasks with the same token are 32 // tasks you want to order with the token. All tasks with the same token are
24 // guaranteed to execute serially, though not necessarily on the same thread. 33 // guaranteed to execute serially, though not necessarily on the same thread.
25 // 34 //
(...skipping 13 matching lines...) Expand all
39 // 48 //
40 // This class is designed to be leaked on shutdown to allow the 49 // This class is designed to be leaked on shutdown to allow the
41 // CONTINUE_ON_SHUTDOWN behavior to be implemented. To enforce the 50 // CONTINUE_ON_SHUTDOWN behavior to be implemented. To enforce the
42 // BLOCK_SHUTDOWN behavior, you must call Shutdown() which will wait until 51 // BLOCK_SHUTDOWN behavior, you must call Shutdown() which will wait until
43 // the necessary tasks have completed. 52 // the necessary tasks have completed.
44 // 53 //
45 // Implementation note: This does not use a base::WorkerPool since that does 54 // Implementation note: This does not use a base::WorkerPool since that does
46 // not enforce shutdown semantics or allow us to specify how many worker 55 // not enforce shutdown semantics or allow us to specify how many worker
47 // threads to run. For the typical use case of random background work, we don't 56 // threads to run. For the typical use case of random background work, we don't
48 // necessarily want to be super aggressive about creating threads. 57 // necessarily want to be super aggressive about creating threads.
49 class BASE_EXPORT SequencedWorkerPool { 58 class BASE_EXPORT SequencedWorkerPool
59 : public RefCountedThreadSafe<SequencedWorkerPool> {
50 public: 60 public:
51 // Defines what should happen to a task posted to the worker pool on shutdown. 61 // Defines what should happen to a task posted to the worker pool on shutdown.
52 enum WorkerShutdown { 62 enum WorkerShutdown {
53 // Tasks posted with this mode which have not run at shutdown will be 63 // Tasks posted with this mode which have not run at shutdown will be
54 // deleted rather than run, and any tasks with this mode running at 64 // deleted rather than run, and any tasks with this mode running at
55 // shutdown will be ignored (the worker thread will not be joined). 65 // shutdown will be ignored (the worker thread will not be joined).
56 // 66 //
57 // This option provides a nice way to post stuff you don't want blocking 67 // This option provides a nice way to post stuff you don't want blocking
58 // shutdown. For example, you might be doing a slow DNS lookup and if it's 68 // shutdown. For example, you might be doing a slow DNS lookup and if it's
59 // blocked on the OS, you may not want to stop shutdown, since the result 69 // blocked on the OS, you may not want to stop shutdown, since the result
(...skipping 20 matching lines...) Expand all
80 // Generally, this should be used only for user data, for example, a task 90 // Generally, this should be used only for user data, for example, a task
81 // writing a preference file. 91 // writing a preference file.
82 // 92 //
83 // If a task is posted during shutdown, it will not get run since the 93 // If a task is posted during shutdown, it will not get run since the
84 // workers may already be stopped. In this case, the post operation will 94 // workers may already be stopped. In this case, the post operation will
85 // fail (return false) and the task will be deleted. 95 // fail (return false) and the task will be deleted.
86 BLOCK_SHUTDOWN, 96 BLOCK_SHUTDOWN,
87 }; 97 };
88 98
89 // Opaque identifier that defines sequencing of tasks posted to the worker 99 // Opaque identifier that defines sequencing of tasks posted to the worker
90 // pool. See NewSequenceToken(). 100 // pool.
91 class SequenceToken { 101 class SequenceToken {
92 public: 102 public:
93 explicit SequenceToken() : id_(0) {} 103 SequenceToken() : id_(0) {}
94 ~SequenceToken() {} 104 ~SequenceToken() {}
95 105
96 bool Equals(const SequenceToken& other) const { 106 bool Equals(const SequenceToken& other) const {
97 return id_ == other.id_; 107 return id_ == other.id_;
98 } 108 }
99 109
100 private: 110 private:
101 friend class SequencedWorkerPool; 111 friend class SequencedWorkerPool;
102 112
103 SequenceToken(int id) : id_(id) {} 113 explicit SequenceToken(int id) : id_(id) {}
104 114
105 int id_; 115 int id_;
106 }; 116 };
107 117
108 // Allows tests to perform certain actions. 118 // Allows tests to perform certain actions.
109 class TestingObserver { 119 class TestingObserver {
110 public: 120 public:
111 virtual ~TestingObserver() {} 121 virtual ~TestingObserver() {}
112 virtual void WillWaitForShutdown() = 0; 122 virtual void WillWaitForShutdown() = 0;
113 }; 123 };
114 124
115 // Pass the maximum number of threads (they will be lazily created as needed) 125 // Pass the maximum number of threads (they will be lazily created as needed)
116 // and a prefix for the thread name to ad in debugging. 126 // and a prefix for the thread name to ad in debugging.
117 SequencedWorkerPool(size_t max_threads, 127 SequencedWorkerPool(size_t max_threads,
118 const std::string& thread_name_prefix); 128 const std::string& thread_name_prefix);
119 ~SequencedWorkerPool();
120 129
121 // Returns a unique token that can be used to sequence tasks posted to 130 // Returns a unique token that can be used to sequence tasks posted to
122 // PostSequencedWorkerTask(). Valid tokens are alwys nonzero. 131 // PostSequencedWorkerTask(). Valid tokens are alwys nonzero.
123 SequenceToken GetSequenceToken(); 132 SequenceToken GetSequenceToken();
124 133
125 // Returns the sequence token associated with the given name. Calling this 134 // Returns the sequence token associated with the given name. Calling this
126 // function multiple times with the same string will always produce the 135 // function multiple times with the same string will always produce the
127 // same sequence token. If the name has not been used before, a new token 136 // same sequence token. If the name has not been used before, a new token
128 // will be created. 137 // will be created.
129 SequenceToken GetNamedSequenceToken(const std::string& name); 138 SequenceToken GetNamedSequenceToken(const std::string& name);
(...skipping 12 matching lines...) Expand all
142 // cause nondeterministic crashes because the task could be keeping some 151 // cause nondeterministic crashes because the task could be keeping some
143 // objects alive which do work in their destructor, which could voilate the 152 // objects alive which do work in their destructor, which could voilate the
144 // assumptions of the running task. 153 // assumptions of the running task.
145 // 154 //
146 // The task will be guaranteed to run to completion before shutdown 155 // The task will be guaranteed to run to completion before shutdown
147 // (BLOCK_SHUTDOWN semantics). 156 // (BLOCK_SHUTDOWN semantics).
148 // 157 //
149 // Returns true if the task was posted successfully. This may fail during 158 // Returns true if the task was posted successfully. This may fail during
150 // shutdown regardless of the specified ShutdownBehavior. 159 // shutdown regardless of the specified ShutdownBehavior.
151 bool PostWorkerTask(const tracked_objects::Location& from_here, 160 bool PostWorkerTask(const tracked_objects::Location& from_here,
152 const base::Closure& task); 161 const Closure& task);
153 162
154 // Same as PostWorkerTask but allows specification of the shutdown behavior. 163 // Same as PostWorkerTask but allows specification of the shutdown behavior.
155 bool PostWorkerTaskWithShutdownBehavior( 164 bool PostWorkerTaskWithShutdownBehavior(
156 const tracked_objects::Location& from_here, 165 const tracked_objects::Location& from_here,
157 const base::Closure& task, 166 const Closure& task,
158 WorkerShutdown shutdown_behavior); 167 WorkerShutdown shutdown_behavior);
159 168
160 // Like PostWorkerTask above, but provides sequencing semantics. This means 169 // Like PostWorkerTask above, but provides sequencing semantics. This means
161 // that tasks posted with the same sequence token (see GetSequenceToken()) 170 // that tasks posted with the same sequence token (see GetSequenceToken())
162 // are guaranteed to execute in order. This is useful in cases where you're 171 // are guaranteed to execute in order. This is useful in cases where you're
163 // doing operations that may depend on previous ones, like appending to a 172 // doing operations that may depend on previous ones, like appending to a
164 // file. 173 // file.
165 // 174 //
166 // The task will be guaranteed to run to completion before shutdown 175 // The task will be guaranteed to run to completion before shutdown
167 // (BLOCK_SHUTDOWN semantics). 176 // (BLOCK_SHUTDOWN semantics).
168 // 177 //
169 // Returns true if the task was posted successfully. This may fail during 178 // Returns true if the task was posted successfully. This may fail during
170 // shutdown regardless of the specified ShutdownBehavior. 179 // shutdown regardless of the specified ShutdownBehavior.
171 bool PostSequencedWorkerTask(SequenceToken sequence_token, 180 bool PostSequencedWorkerTask(SequenceToken sequence_token,
172 const tracked_objects::Location& from_here, 181 const tracked_objects::Location& from_here,
173 const base::Closure& task); 182 const Closure& task);
174 183
175 // Like PostSequencedWorkerTask above, but allows you to specify a named 184 // Like PostSequencedWorkerTask above, but allows you to specify a named
176 // token, which saves an extra call to GetNamedSequenceToken. 185 // token, which saves an extra call to GetNamedSequenceToken.
177 bool PostNamedSequencedWorkerTask(const std::string& token_name, 186 bool PostNamedSequencedWorkerTask(const std::string& token_name,
178 const tracked_objects::Location& from_here, 187 const tracked_objects::Location& from_here,
179 const base::Closure& task); 188 const Closure& task);
180 189
181 // Same as PostSequencedWorkerTask but allows specification of the shutdown 190 // Same as PostSequencedWorkerTask but allows specification of the shutdown
182 // behavior. 191 // behavior.
183 bool PostSequencedWorkerTaskWithShutdownBehavior( 192 bool PostSequencedWorkerTaskWithShutdownBehavior(
184 SequenceToken sequence_token, 193 SequenceToken sequence_token,
185 const tracked_objects::Location& from_here, 194 const tracked_objects::Location& from_here,
186 const base::Closure& task, 195 const Closure& task,
187 WorkerShutdown shutdown_behavior); 196 WorkerShutdown shutdown_behavior);
188 197
189 // Blocks until all pending tasks are complete. This should only be called in 198 // Blocks until all pending tasks are complete. This should only be called in
190 // unit tests when you want to validate something that should have happened. 199 // unit tests when you want to validate something that should have happened.
191 // 200 //
192 // Note that calling this will not prevent other threads from posting work to 201 // Note that calling this will not prevent other threads from posting work to
193 // the queue while the calling thread is waiting on Flush(). In this case, 202 // the queue while the calling thread is waiting on Flush(). In this case,
194 // Flush will return only when there's no more work in the queue. Normally, 203 // Flush will return only when there's no more work in the queue. Normally,
195 // this doesn't come up sine in a test, all the work is being posted from 204 // this doesn't come up sine in a test, all the work is being posted from
196 // the main thread. 205 // the main thread.
197 void FlushForTesting(); 206 void FlushForTesting();
198 207
199 // Implements the worker pool shutdown. This should be called during app 208 // Implements the worker pool shutdown. This should be called during app
200 // shutdown, and will discard/join with appropriate tasks before returning. 209 // shutdown, and will discard/join with appropriate tasks before returning.
201 // After this call, subsequent calls to post tasks will fail. 210 // After this call, subsequent calls to post tasks will fail.
202 void Shutdown(); 211 void Shutdown();
203 212
204 // Called by tests to set the testing observer. This is NULL by default 213 // Called by tests to set the testing observer. This is NULL by default
205 // and ownership of the pointer is kept with the caller. 214 // and ownership of the pointer is kept with the caller.
206 void SetTestingObserver(TestingObserver* observer); 215 void SetTestingObserver(TestingObserver* observer);
207 216
208 private: 217 private:
209 class Inner; 218 friend class RefCountedThreadSafe<SequencedWorkerPool>;
210 class Worker; 219 class Worker;
211 220
212 friend class Inner; 221 struct SequencedTask {
213 friend class Worker; 222 SequencedTask();
223 ~SequencedTask();
214 224
215 scoped_refptr<Inner> inner_; 225 int sequence_token_id;
226 WorkerShutdown shutdown_behavior;
227 tracked_objects::Location location;
228 Closure task;
229 };
230
231 ~SequencedWorkerPool();
232
233 // This function accepts a name and an ID. If the name is null, the
234 // token ID is used. This allows us to implement the optional name lookup
235 // from a single function without having to enter the lock a separate time.
236 bool PostTaskHelper(const std::string* optional_token_name,
237 SequenceToken sequence_token,
238 WorkerShutdown shutdown_behavior,
239 const tracked_objects::Location& from_here,
240 const Closure& task);
241
242 // Runs the worker loop on the background thread.
243 void ThreadLoop(Worker* this_worker);
244
245 // Called from within the lock, this converts the given token name into a
246 // token ID, creating a new one if necessary.
247 int LockedGetNamedTokenID(const std::string& name);
248
249 // The calling code should clear the given delete_these_oustide_lock
250 // vector the next time the lock is released. See the implementation for
251 // a more detailed description.
252 bool GetWork(SequencedTask* task,
253 std::vector<Closure>* delete_these_outside_lock);
254
255 // Peforms init and cleanup around running the given task. WillRun...
256 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
257 // The calling code should call FinishStartingAdditionalThread once the
258 // lock is released if the return values is nonzero.
259 int WillRunWorkerTask(const SequencedTask& task);
260 void DidRunWorkerTask(const SequencedTask& task);
261
262 // Returns true if there are no threads currently running the given
263 // sequence token.
264 bool IsSequenceTokenRunnable(int sequence_token_id) const;
265
266 // Checks if all threads are busy and the addition of one more could run an
267 // additional task waiting in the queue. This must be called from within
268 // the lock.
269 //
270 // If another thread is helpful, this will mark the thread as being in the
271 // process of starting and returns the index of the new thread which will be
272 // 0 or more. The caller should then call FinishStartingAdditionalThread to
273 // complete initialization once the lock is released.
274 //
275 // If another thread is not necessary, returne 0;
276 //
277 // See the implementedion for more.
278 int PrepareToStartAdditionalThreadIfHelpful();
279
280 // The second part of thread creation after
281 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
282 // generated. This actually creates the thread and should be called outside
283 // the lock to avoid blocking important work starting a thread in the lock.
284 void FinishStartingAdditionalThread(int thread_number);
285
286 // Checks whether there is work left that's blocking shutdown. Must be
287 // called inside the lock.
288 bool CanShutdown() const;
289
290 // The last sequence number used. Managed by GetSequenceToken, since this
291 // only does threadsafe increment operations, you do not need to hold the
292 // lock.
293 volatile subtle::Atomic32 last_sequence_number_;
294
295 // This lock protects |everything in this class|. Do not read or modify
296 // anything without holding this lock. Do not block while holding this
297 // lock.
298 Lock lock_;
299
300 // Condition variable used to wake up worker threads when a task is runnable.
301 ConditionVariable cond_var_;
302
303 // The maximum number of worker threads we'll create.
304 size_t max_threads_;
305
306 std::string thread_name_prefix_;
307
308 // Associates all known sequence token names with their IDs.
309 std::map<std::string, int> named_sequence_tokens_;
310
311 // Owning pointers to all threads we've created so far. Since we lazily
312 // create threads, this may be less than max_threads_ and will be initially
313 // empty.
314 std::vector<linked_ptr<Worker> > threads_;
315
316 // Set to true when we're in the process of creating another thread.
317 // See PrepareToStartAdditionalThreadIfHelpful for more.
318 bool thread_being_created_;
319
320 // Number of threads currently waiting for work.
321 size_t waiting_thread_count_;
322
323 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
324 // flag set.
325 size_t blocking_shutdown_thread_count_;
326
327 // In-order list of all pending tasks. These are tasks waiting for a thread
328 // to run on or that are blocked on a previous task in their sequence.
329 //
330 // We maintain the pending_task_count_ separately for metrics because
331 // list.size() can be linear time.
332 std::list<SequencedTask> pending_tasks_;
333 size_t pending_task_count_;
334
335 // Number of tasks in the pending_tasks_ list that are marked as blocking
336 // shutdown.
337 size_t blocking_shutdown_pending_task_count_;
338
339 // Lists all sequence tokens currently executing.
340 std::set<int> current_sequences_;
341
342 // Set when Shutdown is called and no further tasks should be
343 // allowed, though we may still be running existing tasks.
344 bool shutdown_called_;
345
346 TestingObserver* testing_observer_;
216 347
217 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); 348 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
218 }; 349 };
219 350
220 } // namespace base 351 } // namespace base
221 352
222 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 353 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
OLDNEW
« no previous file with comments | « no previous file | base/threading/sequenced_worker_pool.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698