Chromium Code Reviews

Side by Side Diff: content/common/sequenced_worker_pool.cc

Issue 8416019: Add a sequenced worker pool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: '' Created 9 years ago
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/common/sequenced_worker_pool.h"
6
7 #include <deque>
8 #include <set>
9
10 #include "base/atomicops.h"
11 #include "base/bind.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/metrics/histogram.h"
14 #include "base/threading/thread.h"
15 #include "base/stringprintf.h"
16 #include "base/synchronization/condition_variable.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/simple_thread.h"
19
20 namespace {
21
22 struct SequencedTask {
23 int sequence_token_id;
24 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
25 tracked_objects::Location location;
26 base::Closure task;
27 };
28
29 } // namespace
30
31 // Worker ---------------------------------------------------------------------
32
33 class SequencedWorkerPool::Worker : public base::SimpleThread {
34 public:
35 Worker(SequencedWorkerPool::Inner* inner, int thread_number);
36 ~Worker();
37
38 // SimpleThread implementation. This actually runs the background thread.
39 virtual void Run();
40
41 private:
42 SequencedWorkerPool::Inner* inner_;
43 SequencedWorkerPool::WorkerShutdown current_shutdown_mode_;
44
45 DISALLOW_COPY_AND_ASSIGN(Worker);
46 };
47
48
49 // Inner ----------------------------------------------------------------------
50
51 class SequencedWorkerPool::Inner
52 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> {
53 public:
54 Inner(size_t max_threads);
55 virtual ~Inner();
56
57 // Backends for SequencedWorkerPool.
58 SequenceToken GetSequenceToken();
59 SequenceToken GetNamedSequenceToken(const std::string& name);
60 bool PostTask(int sequence_token_id,
61 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
62 const tracked_objects::Location& from_here,
63 const base::Closure& task);
64 void Shutdown();
65 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
66
67 // Runs the worker loop on the background thread.
68 void ThreadLoop(Worker* this_worker);
69
70 private:
71 bool GetWork(SequencedTask* task);
72
73 void WillRunWorkerTask(Worker* worker, const SequencedTask& task);
74 void DidRunWorkerTask(Worker* worker, const SequencedTask& task);
75
76 // Returns true if there are no threads currently running the given
77 // sequence token.
78 bool IsSequenceTokenRunnable(int sequence_token_id) const;
79
80 // Creates an additional worker thread if all threads are busy and the
81 // addition of one more could run an additional task waiting in the queue.
82 // Returns true if a thread was created.
83 bool StartAdditionalThreadIfHelpful();
84
85 // The last sequence number used. Managed by GetSequenceToken; since it
86 // only does thread-safe increment operations, you do not need to hold the
87 // lock.
88 volatile base::subtle::Atomic32 last_sequence_number_;
89
90 // This lock protects |everything in this class|. Do not read or modify
91 // anything without holding this lock. Do not block while holding this
92 // lock.
93 base::Lock lock_;
94
95 // Condition variable used to wake up worker threads when a task is runnable.
96 base::ConditionVariable cond_var_;
97
98 // The maximum number of worker threads we'll create.
99 size_t max_threads_;
100
101 // Associates all known sequence token names with their IDs.
102 std::map<std::string, int> named_sequence_tokens_;
103
104 // Owning pointers to all threads we've created so far. Since we lazily
105 // create threads, this may be less than max_threads_ and will be initially
106 // empty.
107 std::vector<linked_ptr<Worker> > threads_;
jar (doing other things) 2011/11/29 18:44:15 You don't need to have a list of threads. All you
brettw 2011/12/01 02:44:57 This won't work. I need the actual SimpleThread* t
jar (doing other things) 2011/12/09 19:01:48 Hmm.... this is a confusing requirement. What is
108
109 // Number of threads currently running tasks.
110 size_t running_thread_count_;
111
112 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
113 // flag set.
114 size_t blocking_shutdown_thread_count_;
115
116 // In-order list of all pending tasks. These are tasks waiting for a thread
117 // to run on or that are blocked on a previous task in their sequence.
118 //
119 // We maintain the pending_task_count_ separately for metrics because
120 // list.size() can be linear time.
121 std::list<SequencedTask> pending_tasks_;
122 size_t pending_task_count_;
123
124 // Number of tasks in the pending_tasks_ list that are marked as blocking
125 // shutdown.
126 size_t blocking_shutdown_pending_task_count_;
127
128 // Lists all sequence tokens currently executing.
129 std::set<int> current_sequences_;
130
131 // Set when the app is terminating and no further tasks should be allowed,
132 // though we may still be running existing tasks.
133 bool terminating_;
134
135 // Set when the worker pool is being destroyed, and all worker threads should
136 // exit.
137 bool exit_workers_;
138
139 SequencedWorkerPool::TestingObserver* testing_observer_;
140
141 // Created lazily when terminating_ is set and there are pending tasks, this
142 // is signaled by DidRunWorkerTask when all blocking tasks have completed.
143 scoped_ptr<base::WaitableEvent> shutdown_complete_;
144 };
145
146 SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner,
147 int thread_number)
148 : base::SimpleThread(
149 StringPrintf("BrowserWorker%d", thread_number).c_str()),
150 inner_(inner),
151 current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) {
152 Start();
153 }
154
155 SequencedWorkerPool::Worker::~Worker() {
156 }
157
158 void SequencedWorkerPool::Worker::Run() {
159 // Just jump back to the Inner object to run the thread, since it has all the
160 // tracking information and queues. It might be more natural to implement
161 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
162 // having these worker objects at all, but that method lacks the ability to
163 // send thread-specific information easily to the thread loop.
164 inner_->ThreadLoop(this);
165 }
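
For contrast, here is a minimal sketch (not part of this patch) of the DelegateSimpleThread alternative the comment above mentions; InnerAsDelegate is a hypothetical name. Because Delegate::Run() takes no arguments, the loop would have no easy way to know which worker it is running on, which is the limitation the comment describes.

// Hypothetical sketch only: Inner (or a small helper like this) would
// implement base::DelegateSimpleThread::Delegate instead of using Worker
// objects derived from base::SimpleThread.
class InnerAsDelegate : public base::DelegateSimpleThread::Delegate {
 public:
  virtual void Run() {
    // The equivalent of ThreadLoop(), but with no |this_worker| parameter;
    // any per-thread state would have to come from TLS or a side table.
  }
};
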
166
167 SequencedWorkerPool::Inner::Inner(size_t max_threads)
168 : last_sequence_number_(0),
169 lock_(),
170 cond_var_(&lock_),
171 max_threads_(max_threads),
172 running_thread_count_(0),
173 blocking_shutdown_thread_count_(0),
174 pending_task_count_(0),
175 blocking_shutdown_pending_task_count_(0),
176 terminating_(false),
177 exit_workers_(false), testing_observer_(NULL) {
178 }
179
180 SequencedWorkerPool::Inner::~Inner() {
181 {
182 base::AutoLock lock(lock_);
183
184 // Tell all workers to exit. The Worker destructor will actually join with
185 // each corresponding thread when the object is torn down.
186 exit_workers_ = true;
187 cond_var_.Broadcast();
jar (doing other things) 2011/11/29 18:44:15 This works... but assuming each thread will, if it
brettw 2011/12/01 02:44:57 Okay, done.
188 }
189
190 // Need to explicitly join with the threads before they're destroyed or else
191 // they will be running when our object is half torn down.
192 for (size_t i = 0; i < threads_.size(); i++)
193 threads_[i]->Join();
194 threads_.clear();
195 }
196
197 SequencedWorkerPool::SequenceToken
198 SequencedWorkerPool::Inner::GetSequenceToken() {
199 base::subtle::Atomic32 result =
200 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
201 return SequenceToken(static_cast<int>(result));
202 }
203
204 SequencedWorkerPool::SequenceToken
205 SequencedWorkerPool::Inner::GetNamedSequenceToken(
206 const std::string& name) {
207 base::AutoLock lock(lock_);
208 std::map<std::string, int>::const_iterator found =
209 named_sequence_tokens_.find(name);
210 if (found != named_sequence_tokens_.end())
211 return SequenceToken(found->second); // Got an existing one.
212
213 // Create a new one for this name.
214 SequenceToken result = GetSequenceToken();
215 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
216 return result;
217 }
218
219 bool SequencedWorkerPool::Inner::PostTask(
220 int sequence_token_id,
221 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
222 const tracked_objects::Location& from_here,
223 const base::Closure& task) {
224 base::AutoLock lock(lock_);
225
226 if (terminating_)
227 return false;
228
229 SequencedTask sequenced;
230 sequenced.sequence_token_id = sequence_token_id;
231 sequenced.shutdown_behavior = shutdown_behavior;
232 sequenced.location = from_here;
233 sequenced.task = task;
234
235 pending_tasks_.push_back(sequenced);
236 pending_task_count_++;
237 if (shutdown_behavior == BLOCK_SHUTDOWN)
238 blocking_shutdown_pending_task_count_++;
239
240 if (!StartAdditionalThreadIfHelpful()) {
241 // Wake up a worker thread to run this.
242 cond_var_.Signal();
jar (doing other things) 2011/11/29 18:44:15 nit: Nicer to signal when after you release the lo
243 }
244
245 return true;
246 }
247
248 void SequencedWorkerPool::Inner::Shutdown() {
249 // Mark us as terminating. Queued tasks that aren't required to run at
250 // shutdown are dropped lazily by GetWork as workers encounter them. Since
251 // no new tasks get posted once the terminating_ flag is set, the only work
252 // left to wait for is the BLOCK_SHUTDOWN work counted below.
253 {
254 base::AutoLock lock(lock_);
255 DCHECK(!terminating_);
256 terminating_ = true;
257
258 if (blocking_shutdown_thread_count_ == 0 &&
259 blocking_shutdown_pending_task_count_ == 0) {
260 // There are no pending or running tasks blocking shutdown, we're done.
jar (doing other things) 2011/11/29 18:44:15 This is perfect. You've effectively checked the c
brettw 2011/12/01 02:44:57 I did this but couldn't write the loop here to kee
261 return;
262 }
263
264 // Need to wait for some tasks, create the event.
265 DCHECK(!shutdown_complete_.get());
266 shutdown_complete_.reset(new base::WaitableEvent(false, false));
267 }
268
269 // If we get here, we know we're either waiting on a blocking task that's
270 // currently running, waiting on a blocking task that hasn't been scheduled
271 // yet, or both. Block on the "queue empty" event to know when all tasks are
272 // complete. This must be done outside the lock.
273 if (testing_observer_)
274 testing_observer_->WillWaitForShutdown();
275
276 base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now();
277 shutdown_complete_->Wait();
278 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
279 base::TimeTicks::Now() - shutdown_wait_begin);
280 }
281
282 void SequencedWorkerPool::Inner::SetTestingObserver(
283 SequencedWorkerPool::TestingObserver* observer) {
284 base::AutoLock lock(lock_);
285 testing_observer_ = observer;
286 }
287
288 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
289 base::AutoLock lock(lock_);
290 while (!exit_workers_) {
291 SequencedTask task;
292 if (GetWork(&task)) {
293 WillRunWorkerTask(this_worker, task); // Must be done inside the lock.
294 {
295 base::AutoUnlock unlock(lock_);
296 task.task.Run();
297 }
298 DidRunWorkerTask(this_worker, task); // Must be done inside the lock.
299 } else {
300 cond_var_.Wait();
301 }
302 }
303 }
304
305 bool SequencedWorkerPool::Inner::GetWork(SequencedTask* task) {
306 lock_.AssertAcquired();
307
308 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
309 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
310 pending_task_count_);
311
312 // Find the next task with a sequence token that's not currently in use.
313 // If the token is in use, that means another thread is running something
314 // in that sequence, and we can't run it without going out-of-order.
315 //
316 // This algorithm is simple and fair, but inefficient in some cases. For
317 // example, say somebody schedules 1000 slow tasks with the same sequence
318 // number. We'll have to go through all those tasks each time we feel like
319 // there might be work to schedule. If this proves to be a problem, we
320 // should make this more efficient.
321 //
322 // One possible enhancement would be to keep a map from sequence ID to a
323 // list of pending but currently blocked SequencedTasks for that ID.
324 // When a worker finishes a task of one sequence token, it can pick up the
325 // next one from that token right away.
326 //
327 // This may lead to starvation if there are sufficient numbers of sequences
328 // in use. To alleviate this, we could add an incrementing priority counter
329 // to each SequencedTask. Then maintain a priority_queue of all runnable
330 // tasks, sorted by priority counter. When a sequenced task is completed,
331 // we would pop the head element off of that task's pending list and add it
332 // to the priority queue. Then we would run the first item in the priority
333 // queue.
334 bool found_task = false;
335 int unrunnable_tasks = 0;
336 std::list<SequencedTask>::iterator i = pending_tasks_.begin();
337 while (i != pending_tasks_.end()) {
338 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
339 if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
340 // We're shutting down and the task we just found isn't blocking
341 // shutdown. Delete it and get more work.
342 //
343 // Note that we do not want to delete unrunnable tasks. Deleting a task
344 // can have side effects (like freeing some objects) and deleting a
345 // task that's supposed to run after one that's currently running could
346 // cause an obscure crash.
347 i = pending_tasks_.erase(i);
jar (doing other things) 2011/11/29 18:44:15 So you decided to leak the less important tasks?
brettw 2011/12/01 02:44:57 This is the worker thread, but I forgot to do it o
348 pending_task_count_--;
jar (doing other things) 2011/11/29 18:44:15 if you add a "continue;" here, you won't need ane
349 } else {
350 // Found a runnable task.
351 *task = *i;
352 i = pending_tasks_.erase(i);
353 pending_task_count_--;
354 if (task->shutdown_behavior == BLOCK_SHUTDOWN)
355 blocking_shutdown_pending_task_count_--;
jar (doing other things) 2011/11/29 18:44:15 Lines 353-355 are critical accounting lines, and k
brettw 2011/12/01 02:44:57 It does not seem clear to me that there's an obvio
356
357 found_task = true;
358 break;
359 }
360 } else {
361 unrunnable_tasks++;
362 ++i;
jar (doing other things) 2011/11/29 18:44:15 This was harder to read than needed, because this
363 }
364 }
365
366 // Track the number of tasks we had to skip over to see if we should be
367 // making this more efficient. If this number ever becomes large or is
368 // frequently "some", we should consider the optimization above.
369 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
370 unrunnable_tasks);
371 return found_task;
372 }
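
As a concrete illustration of the enhancement described in the comment above (code lines 322-333), here is a minimal sketch; it is not part of this patch, every name in it is hypothetical, and it would additionally require <map>, <list>, and <queue>.

// Hypothetical members sketching the per-sequence queues plus priority
// counter proposed above; none of these exist in this patch.
struct PrioritizedTask {
  size_t priority;   // Incrementing counter assigned at PostTask time.
  SequencedTask task;
};
struct OldestFirst {
  bool operator()(const PrioritizedTask& a, const PrioritizedTask& b) const {
    return a.priority > b.priority;  // Smaller counter = older = runs first.
  }
};
// Blocked tasks keyed by sequence token; the front of each list is the next
// task eligible to run once that sequence finishes its current task.
std::map<int, std::list<PrioritizedTask> > sequence_queues_;
// Tasks whose sequence token is not currently running, oldest first, so
// GetWork becomes a pop instead of a scan over pending_tasks_.
std::priority_queue<PrioritizedTask,
                    std::vector<PrioritizedTask>,
                    OldestFirst> runnable_queue_;
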
373
374 void SequencedWorkerPool::Inner::WillRunWorkerTask(Worker* worker,
375 const SequencedTask& task) {
376 lock_.AssertAcquired();
377
378 // Mark the task's sequence number as in use.
379 if (task.sequence_token_id)
380 current_sequences_.insert(task.sequence_token_id);
381
382 running_thread_count_++;
383
384 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN)
385 blocking_shutdown_thread_count_++;
386
387 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
388 // creates a new thread if there is no free one, there is a race when posting
389 // tasks: many tasks could have been posted before a thread started
390 // running them, so only one thread would have been created. So we also check
391 // for more threads after removing our task from the queue, which also has
392 // the nice side effect of creating the workers from background threads
393 // rather than the main thread of the app.
394 //
395 // If another thread wasn't created, we want to wake up an existing thread
396 // if there is one waiting to pick up the next task.
397 if (!StartAdditionalThreadIfHelpful() && !pending_tasks_.empty() &&
398 running_thread_count_ < threads_.size())
399 cond_var_.Signal();
400 }
401
402 void SequencedWorkerPool::Inner::DidRunWorkerTask(Worker* worker,
403 const SequencedTask& task) {
404 lock_.AssertAcquired();
405
406 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) {
407 DCHECK(blocking_shutdown_thread_count_ > 0);
408 blocking_shutdown_thread_count_--;
409 }
410
411 if (task.sequence_token_id)
412 current_sequences_.erase(task.sequence_token_id);
413
414 running_thread_count_--;
415
416 if (terminating_ && shutdown_complete_.get() &&
417 blocking_shutdown_pending_task_count_ == 0 &&
418 blocking_shutdown_thread_count_ == 0) {
419 // No more blocking shutdown work.
420 shutdown_complete_->Signal();
421 }
422 }
423
424 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
425 int sequence_token_id) const {
426 lock_.AssertAcquired();
427 return !sequence_token_id ||
428 current_sequences_.find(sequence_token_id) ==
429 current_sequences_.end();
430 }
431
432 bool SequencedWorkerPool::Inner::StartAdditionalThreadIfHelpful() {
433 if (threads_.size() < max_threads_ &&
434 running_thread_count_ == threads_.size()) {
435 // We could use an additional thread if there's work to be done.
436 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin();
jar (doing other things) 2011/11/29 18:44:15 I was hoping/expecting that you wouldn't need to s
brettw 2011/12/01 02:44:57 I don't want to re-use GetWork. It's already compl
437 i != pending_tasks_.end(); ++i) {
438 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
439 // Found a runnable task, start a thread.
440 threads_.push_back(linked_ptr<Worker>(
441 new Worker(this, threads_.size())));
442 return true;
443 }
444 }
445 }
446 return false;
447 }
448
449 // SequencedWorkerPool --------------------------------------------------------
450
451 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads)
452 : inner_(new Inner(max_threads)) {
453 }
454
455 SequencedWorkerPool::~SequencedWorkerPool() {
456 }
457
458 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
459 return inner_->GetSequenceToken();
460 }
461
462 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
463 const std::string& name) {
464 return inner_->GetNamedSequenceToken(name);
465 }
466
467 bool SequencedWorkerPool::PostWorkerTask(
468 WorkerShutdown shutdown_behavior,
469 const tracked_objects::Location& from_here,
470 const base::Closure& task) {
471 return inner_->PostTask(0, shutdown_behavior, from_here, task);
472 }
473
474 bool SequencedWorkerPool::PostSequencedWorkerTask(
475 SequenceToken sequence_token,
476 WorkerShutdown shutdown_behavior,
477 const tracked_objects::Location& from_here,
478 const base::Closure& task) {
479 return inner_->PostTask(sequence_token.id_, shutdown_behavior,
480 from_here, task);
481 }
482
483 void SequencedWorkerPool::Shutdown() {
484 inner_->Shutdown();
485 }
486
487 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
488 inner_->SetTestingObserver(observer);
489 }
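
For reference, a hedged usage sketch (not part of this patch) of the API defined above, assuming the declarations in content/common/sequenced_worker_pool.h match these definitions; WriteToDisk, CloseFile, and DoBackgroundCleanup are hypothetical functions.

// Hypothetical caller code: tasks posted with the same token run one at a
// time, in posting order, on whichever worker thread is free.
void ExampleUsage(SequencedWorkerPool* pool) {
  SequencedWorkerPool::SequenceToken token =
      pool->GetNamedSequenceToken("example_sequence");
  pool->PostSequencedWorkerTask(token, SequencedWorkerPool::BLOCK_SHUTDOWN,
                                FROM_HERE, base::Bind(&WriteToDisk));
  pool->PostSequencedWorkerTask(token, SequencedWorkerPool::BLOCK_SHUTDOWN,
                                FROM_HERE, base::Bind(&CloseFile));
  // Unsequenced; tasks that don't block shutdown are dropped from the queue
  // once Shutdown() has been called.
  pool->PostWorkerTask(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN,
                       FROM_HERE, base::Bind(&DoBackgroundCleanup));
}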