OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
86 bool PostTask(const std::string* optional_token_name, | 86 bool PostTask(const std::string* optional_token_name, |
87 SequenceToken sequence_token, | 87 SequenceToken sequence_token, |
88 WorkerShutdown shutdown_behavior, | 88 WorkerShutdown shutdown_behavior, |
89 const tracked_objects::Location& from_here, | 89 const tracked_objects::Location& from_here, |
90 const Closure& task); | 90 const Closure& task); |
91 | 91 |
92 bool RunsTasksOnCurrentThread() const; | 92 bool RunsTasksOnCurrentThread() const; |
93 | 93 |
94 void FlushForTesting(); | 94 void FlushForTesting(); |
95 | 95 |
96 void TriggerSpuriousWorkSignalForTesting(); | |
97 | |
98 int GetWorkSignalCountForTesting() const; | |
99 | |
96 void Shutdown(); | 100 void Shutdown(); |
97 | 101 |
98 void SetTestingObserver(TestingObserver* observer); | 102 void SetTestingObserver(TestingObserver* observer); |
99 | 103 |
100 // Runs the worker loop on the background thread. | 104 // Runs the worker loop on the background thread. |
101 void ThreadLoop(Worker* this_worker); | 105 void ThreadLoop(Worker* this_worker); |
102 | 106 |
103 private: | 107 private: |
108 // Returns whether there are no more pending tasks and all threads | |
109 // are idle. Must be called under lock. | |
110 bool IsIdle() const; | |
111 | |
104 // Called from within the lock, this converts the given token name into a | 112 // Called from within the lock, this converts the given token name into a |
105 // token ID, creating a new one if necessary. | 113 // token ID, creating a new one if necessary. |
106 int LockedGetNamedTokenID(const std::string& name); | 114 int LockedGetNamedTokenID(const std::string& name); |
107 | 115 |
108 // The calling code should clear the given delete_these_oustide_lock | 116 // The calling code should clear the given delete_these_oustide_lock |
109 // vector the next time the lock is released. See the implementation for | 117 // vector the next time the lock is released. See the implementation for |
110 // a more detailed description. | 118 // a more detailed description. |
111 bool GetWork(SequencedTask* task, | 119 bool GetWork(SequencedTask* task, |
112 std::vector<Closure>* delete_these_outside_lock); | 120 std::vector<Closure>* delete_these_outside_lock); |
113 | 121 |
(...skipping 21 matching lines...) Expand all Loading... | |
135 // | 143 // |
136 // See the implementedion for more. | 144 // See the implementedion for more. |
137 int PrepareToStartAdditionalThreadIfHelpful(); | 145 int PrepareToStartAdditionalThreadIfHelpful(); |
138 | 146 |
139 // The second part of thread creation after | 147 // The second part of thread creation after |
140 // PrepareToStartAdditionalThreadIfHelpful with the thread number it | 148 // PrepareToStartAdditionalThreadIfHelpful with the thread number it |
141 // generated. This actually creates the thread and should be called outside | 149 // generated. This actually creates the thread and should be called outside |
142 // the lock to avoid blocking important work starting a thread in the lock. | 150 // the lock to avoid blocking important work starting a thread in the lock. |
143 void FinishStartingAdditionalThread(int thread_number); | 151 void FinishStartingAdditionalThread(int thread_number); |
144 | 152 |
153 // Signal |has_work_| and increment |has_work_signal_count_|. | |
154 void SignalHasWork(); | |
155 | |
145 // Checks whether there is work left that's blocking shutdown. Must be | 156 // Checks whether there is work left that's blocking shutdown. Must be |
146 // called inside the lock. | 157 // called inside the lock. |
147 bool CanShutdown() const; | 158 bool CanShutdown() const; |
148 | 159 |
149 SequencedWorkerPool* const worker_pool_; | 160 SequencedWorkerPool* const worker_pool_; |
150 | 161 |
151 // The last sequence number used. Managed by GetSequenceToken, since this | 162 // The last sequence number used. Managed by GetSequenceToken, since this |
152 // only does threadsafe increment operations, you do not need to hold the | 163 // only does threadsafe increment operations, you do not need to hold the |
153 // lock. | 164 // lock. |
154 volatile subtle::Atomic32 last_sequence_number_; | 165 volatile subtle::Atomic32 last_sequence_number_; |
155 | 166 |
156 // This lock protects |everything in this class|. Do not read or modify | 167 // This lock protects |everything in this class|. Do not read or modify |
157 // anything without holding this lock. Do not block while holding this | 168 // anything without holding this lock. Do not block while holding this |
158 // lock. | 169 // lock. |
159 mutable Lock lock_; | 170 mutable Lock lock_; |
160 | 171 |
161 // Condition variable used to wake up worker threads when a task is runnable. | 172 // Condition variable that is signalled whenever a new task is |
162 ConditionVariable cond_var_; | 173 // posted or when Shutdown() is called. |
174 ConditionVariable has_work_cv_; | |
175 | |
176 // Number of times |has_work_| has been signalled. Used for testing. | |
177 int has_work_signal_count_; | |
178 | |
179 // Condition variable that is signalled whenever IsIdle() goes to | |
180 // true. | |
181 ConditionVariable is_idle_cv_; | |
182 | |
183 // Condition variable that is signalled whwnever CanShutdown() goes | |
184 // to true. | |
185 ConditionVariable can_shutdown_cv_; | |
163 | 186 |
164 // The maximum number of worker threads we'll create. | 187 // The maximum number of worker threads we'll create. |
165 const size_t max_threads_; | 188 const size_t max_threads_; |
166 | 189 |
167 const std::string thread_name_prefix_; | 190 const std::string thread_name_prefix_; |
168 | 191 |
169 // Associates all known sequence token names with their IDs. | 192 // Associates all known sequence token names with their IDs. |
170 std::map<std::string, int> named_sequence_tokens_; | 193 std::map<std::string, int> named_sequence_tokens_; |
171 | 194 |
172 // Owning pointers to all threads we've created so far, indexed by | 195 // Owning pointers to all threads we've created so far, indexed by |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
238 | 261 |
239 // Inner definitions --------------------------------------------------------- | 262 // Inner definitions --------------------------------------------------------- |
240 | 263 |
241 SequencedWorkerPool::Inner::Inner( | 264 SequencedWorkerPool::Inner::Inner( |
242 SequencedWorkerPool* worker_pool, | 265 SequencedWorkerPool* worker_pool, |
243 size_t max_threads, | 266 size_t max_threads, |
244 const std::string& thread_name_prefix) | 267 const std::string& thread_name_prefix) |
245 : worker_pool_(worker_pool), | 268 : worker_pool_(worker_pool), |
246 last_sequence_number_(0), | 269 last_sequence_number_(0), |
247 lock_(), | 270 lock_(), |
248 cond_var_(&lock_), | 271 has_work_cv_(&lock_), |
272 has_work_signal_count_(0), | |
273 is_idle_cv_(&lock_), | |
274 can_shutdown_cv_(&lock_), | |
249 max_threads_(max_threads), | 275 max_threads_(max_threads), |
250 thread_name_prefix_(thread_name_prefix), | 276 thread_name_prefix_(thread_name_prefix), |
251 thread_being_created_(false), | 277 thread_being_created_(false), |
252 waiting_thread_count_(0), | 278 waiting_thread_count_(0), |
253 blocking_shutdown_thread_count_(0), | 279 blocking_shutdown_thread_count_(0), |
254 pending_task_count_(0), | 280 pending_task_count_(0), |
255 blocking_shutdown_pending_task_count_(0), | 281 blocking_shutdown_pending_task_count_(0), |
256 shutdown_called_(false), | 282 shutdown_called_(false), |
257 testing_observer_(NULL) {} | 283 testing_observer_(NULL) {} |
258 | 284 |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
311 blocking_shutdown_pending_task_count_++; | 337 blocking_shutdown_pending_task_count_++; |
312 | 338 |
313 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 339 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
314 } | 340 } |
315 | 341 |
316 // Actually start the additional thread or signal an existing one now that | 342 // Actually start the additional thread or signal an existing one now that |
317 // we're outside the lock. | 343 // we're outside the lock. |
318 if (create_thread_id) | 344 if (create_thread_id) |
319 FinishStartingAdditionalThread(create_thread_id); | 345 FinishStartingAdditionalThread(create_thread_id); |
320 else | 346 else |
321 cond_var_.Signal(); | 347 SignalHasWork(); |
322 | 348 |
323 return true; | 349 return true; |
324 } | 350 } |
325 | 351 |
326 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 352 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
327 AutoLock lock(lock_); | 353 AutoLock lock(lock_); |
328 return ContainsKey(threads_, PlatformThread::CurrentId()); | 354 return ContainsKey(threads_, PlatformThread::CurrentId()); |
329 } | 355 } |
330 | 356 |
331 void SequencedWorkerPool::Inner::FlushForTesting() { | 357 void SequencedWorkerPool::Inner::FlushForTesting() { |
332 { | 358 AutoLock lock(lock_); |
333 AutoLock lock(lock_); | 359 while (!IsIdle()) |
334 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) | 360 is_idle_cv_.Wait(); |
335 cond_var_.Wait(); | 361 } |
336 } | 362 |
337 cond_var_.Signal(); | 363 void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
364 SignalHasWork(); | |
365 } | |
366 | |
367 int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { | |
368 AutoLock lock(lock_); | |
369 return has_work_signal_count_; | |
338 } | 370 } |
339 | 371 |
340 void SequencedWorkerPool::Inner::Shutdown() { | 372 void SequencedWorkerPool::Inner::Shutdown() { |
jar (doing other things)
2012/03/12 17:39:13
IMO, it would be helpful if you commented, or asse
akalin
2012/03/12 18:34:34
Done. (Added comment, and assert is in SWP::Shutdo
| |
341 // Mark us as terminated and go through and drop all tasks that aren't | 373 // Mark us as terminated and go through and drop all tasks that aren't |
342 // required to run on shutdown. Since no new tasks will get posted once the | 374 // required to run on shutdown. Since no new tasks will get posted once the |
343 // terminated flag is set, this ensures that all remaining tasks are required | 375 // terminated flag is set, this ensures that all remaining tasks are required |
344 // for shutdown whenever the termianted_ flag is set. | 376 // for shutdown whenever the termianted_ flag is set. |
345 { | 377 { |
346 AutoLock lock(lock_); | 378 AutoLock lock(lock_); |
347 | 379 |
348 if (shutdown_called_) | 380 if (shutdown_called_) |
349 return; | 381 return; |
350 shutdown_called_ = true; | 382 shutdown_called_ = true; |
351 | 383 |
352 // Tickle the threads. This will wake up a waiting one so it will know that | 384 // Wake up all waiting threads. |
353 // it can exit, which in turn will wake up any other waiting ones. | 385 has_work_cv_.Broadcast(); |
jar (doing other things)
2012/03/12 17:39:13
I liked the original code better, relying on signa
akalin
2012/03/12 18:34:34
Okay, moved back to signal chain.
| |
354 cond_var_.Signal(); | |
355 | 386 |
356 // There are no pending or running tasks blocking shutdown, we're done. | 387 // There are no pending or running tasks blocking shutdown, we're done. |
357 if (CanShutdown()) | 388 if (CanShutdown()) |
358 return; | 389 return; |
359 } | 390 } |
360 | 391 |
361 // If we get here, we know we're either waiting on a blocking task that's | 392 // If we get here, we know we're either waiting on a blocking task that's |
362 // currently running, waiting on a blocking task that hasn't been scheduled | 393 // currently running, waiting on a blocking task that hasn't been scheduled |
363 // yet, or both. Block on the "queue empty" event to know when all tasks are | 394 // yet, or both. Block on the "queue empty" event to know when all tasks are |
jar (doing other things)
2012/03/12 17:39:13
This comment doesn't relate to any code IMO.
In ad
akalin
2012/03/12 18:34:34
Replaced comment (and the one below) with somethin
| |
364 // complete. This must be done outside the lock. | 395 // complete. This must be done outside the lock. |
365 if (testing_observer_) | 396 if (testing_observer_) |
366 testing_observer_->WillWaitForShutdown(); | 397 testing_observer_->WillWaitForShutdown(); |
367 | 398 |
368 TimeTicks shutdown_wait_begin = TimeTicks::Now(); | 399 TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
369 | 400 |
370 // Wait for no more tasks. | 401 // Wait for no more tasks. |
371 { | 402 { |
372 AutoLock lock(lock_); | 403 AutoLock lock(lock_); |
373 while (!CanShutdown()) | 404 while (!CanShutdown()) |
374 cond_var_.Wait(); | 405 can_shutdown_cv_.Wait(); |
375 } | 406 } |
376 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", | 407 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
377 TimeTicks::Now() - shutdown_wait_begin); | 408 TimeTicks::Now() - shutdown_wait_begin); |
378 } | 409 } |
379 | 410 |
380 void SequencedWorkerPool::Inner::SetTestingObserver( | 411 void SequencedWorkerPool::Inner::SetTestingObserver( |
381 TestingObserver* observer) { | 412 TestingObserver* observer) { |
382 AutoLock lock(lock_); | 413 AutoLock lock(lock_); |
383 testing_observer_ = observer; | 414 testing_observer_ = observer; |
384 } | 415 } |
385 | 416 |
386 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 417 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
387 { | 418 AutoLock lock(lock_); |
jar (doing other things)
2012/03/12 17:39:13
nit: You are dead right that this curly is not nee
akalin
2012/03/12 18:34:34
Done. (It ended up being needed anyway when addres
| |
388 AutoLock lock(lock_); | 419 DCHECK(thread_being_created_); |
389 DCHECK(thread_being_created_); | 420 // This can make CanShutdown() go to true. |
390 thread_being_created_ = false; | 421 thread_being_created_ = false; |
391 std::pair<ThreadMap::iterator, bool> result = | 422 if (CanShutdown()) |
jar (doing other things)
2012/03/12 17:39:13
I don't think anyone cares about CanShutdown() unl
akalin
2012/03/12 18:34:34
Done. Moved can_shutdown_cv_ signal to end of blo
| |
392 threads_.insert( | 423 can_shutdown_cv_.Signal(); |
393 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | 424 std::pair<ThreadMap::iterator, bool> result = |
394 DCHECK(result.second); | 425 threads_.insert( |
426 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); | |
427 DCHECK(result.second); | |
395 | 428 |
396 while (true) { | 429 while (true) { |
397 // See GetWork for what delete_these_outside_lock is doing. | 430 // See GetWork for what delete_these_outside_lock is doing. |
398 SequencedTask task; | 431 SequencedTask task; |
399 std::vector<Closure> delete_these_outside_lock; | 432 std::vector<Closure> delete_these_outside_lock; |
400 if (GetWork(&task, &delete_these_outside_lock)) { | 433 if (GetWork(&task, &delete_these_outside_lock)) { |
401 int new_thread_id = WillRunWorkerTask(task); | 434 int new_thread_id = WillRunWorkerTask(task); |
402 { | 435 { |
403 AutoUnlock unlock(lock_); | 436 AutoUnlock unlock(lock_); |
404 cond_var_.Signal(); | 437 delete_these_outside_lock.clear(); |
jar (doing other things)
2012/03/12 17:39:13
This signal is basically asking that anyone else t
akalin
2012/03/12 18:34:34
Hmm. If every task posted causes a signal, then w
jar (doing other things)
2012/03/12 19:42:36
I *think* you are correct. This is a slightly (IM
akalin
2012/03/12 20:42:15
I see. Okay, I just added a comment as to why it'
| |
405 delete_these_outside_lock.clear(); | |
406 | 438 |
407 // Complete thread creation outside the lock if necessary. | 439 // Complete thread creation outside the lock if necessary. |
408 if (new_thread_id) | 440 if (new_thread_id) |
409 FinishStartingAdditionalThread(new_thread_id); | 441 FinishStartingAdditionalThread(new_thread_id); |
410 | 442 |
411 task.task.Run(); | 443 task.task.Run(); |
412 | 444 |
413 // Make sure our task is erased outside the lock for the same reason | 445 // Make sure our task is erased outside the lock for the same reason |
414 // we do this with delete_these_oustide_lock. | 446 // we do this with delete_these_oustide_lock. |
415 task.task = Closure(); | 447 task.task = Closure(); |
416 } | |
417 DidRunWorkerTask(task); // Must be done inside the lock. | |
418 } else { | |
419 // When we're terminating and there's no more work, we can | |
420 // shut down. You can't get more tasks posted once | |
421 // shutdown_called_ is set. There may be some tasks stuck | |
422 // behind running ones with the same sequence token, but | |
423 // additional threads won't help this case. | |
424 if (shutdown_called_) | |
425 break; | |
426 waiting_thread_count_++; | |
427 cond_var_.Signal(); // For Flush() that may be waiting on the | |
428 // waiting thread count to go up. | |
429 cond_var_.Wait(); | |
430 waiting_thread_count_--; | |
431 } | 448 } |
449 DidRunWorkerTask(task); // Must be done inside the lock. | |
450 } else { | |
451 // When we're terminating and there's no more work, we can | |
452 // shut down. You can't get more tasks posted once | |
453 // shutdown_called_ is set. There may be some tasks stuck | |
454 // behind running ones with the same sequence token, but | |
455 // additional threads won't help this case. | |
456 if (shutdown_called_) | |
457 break; | |
458 waiting_thread_count_++; | |
459 // This is the only time that IsIdle() can go to true. | |
460 if (IsIdle()) | |
461 is_idle_cv_.Signal(); | |
462 has_work_cv_.Wait(); | |
463 waiting_thread_count_--; | |
432 } | 464 } |
433 } | 465 } |
466 } | |
434 | 467 |
435 // We noticed we should exit. Wake up the next worker so it knows it should | 468 bool SequencedWorkerPool::Inner::IsIdle() const { |
436 // exit as well (because the Shutdown() code only signals once). | 469 lock_.AssertAcquired(); |
437 cond_var_.Signal(); | 470 return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
jar (doing other things)
2012/03/12 17:39:13
This is another fairly critical Signal() call. Thi
akalin
2012/03/12 18:34:34
This was deleted because of the change to using Br
| |
438 } | 471 } |
439 | 472 |
440 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( | 473 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
441 const std::string& name) { | 474 const std::string& name) { |
442 lock_.AssertAcquired(); | 475 lock_.AssertAcquired(); |
443 DCHECK(!name.empty()); | 476 DCHECK(!name.empty()); |
444 | 477 |
445 std::map<std::string, int>::const_iterator found = | 478 std::map<std::string, int>::const_iterator found = |
446 named_sequence_tokens_.find(name); | 479 named_sequence_tokens_.find(name); |
447 if (found != named_sequence_tokens_.end()) | 480 if (found != named_sequence_tokens_.end()) |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
511 // vector they passed to us once the lock is exited to make this | 544 // vector they passed to us once the lock is exited to make this |
512 // happen. | 545 // happen. |
513 delete_these_outside_lock->push_back(i->task); | 546 delete_these_outside_lock->push_back(i->task); |
514 i = pending_tasks_.erase(i); | 547 i = pending_tasks_.erase(i); |
515 pending_task_count_--; | 548 pending_task_count_--; |
516 } else { | 549 } else { |
517 // Found a runnable task. | 550 // Found a runnable task. |
518 *task = *i; | 551 *task = *i; |
519 i = pending_tasks_.erase(i); | 552 i = pending_tasks_.erase(i); |
520 pending_task_count_--; | 553 pending_task_count_--; |
521 if (task->shutdown_behavior == BLOCK_SHUTDOWN) | 554 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
555 // This can make CanShutdown() go to true. | |
522 blocking_shutdown_pending_task_count_--; | 556 blocking_shutdown_pending_task_count_--; |
557 if (CanShutdown()) | |
558 can_shutdown_cv_.Signal(); | |
jar (doing other things)
2012/03/12 17:39:13
Again, I don't like seeing special case code inter
akalin
2012/03/12 18:34:34
Done (see above)
| |
559 } | |
523 | 560 |
524 found_task = true; | 561 found_task = true; |
525 break; | 562 break; |
526 } | 563 } |
527 } | 564 } |
528 | 565 |
529 // Track the number of tasks we had to skip over to see if we should be | 566 // Track the number of tasks we had to skip over to see if we should be |
530 // making this more efficient. If this number ever becomes large or is | 567 // making this more efficient. If this number ever becomes large or is |
531 // frequently "some", we should consider the optimization above. | 568 // frequently "some", we should consider the optimization above. |
532 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", | 569 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", |
(...skipping 27 matching lines...) Expand all Loading... | |
560 // second thread (since we only create one at a time) will be blocked by | 597 // second thread (since we only create one at a time) will be blocked by |
561 // the execution of the first task, which could be arbitrarily long. | 598 // the execution of the first task, which could be arbitrarily long. |
562 return PrepareToStartAdditionalThreadIfHelpful(); | 599 return PrepareToStartAdditionalThreadIfHelpful(); |
563 } | 600 } |
564 | 601 |
565 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { | 602 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
566 lock_.AssertAcquired(); | 603 lock_.AssertAcquired(); |
567 | 604 |
568 if (task.shutdown_behavior == BLOCK_SHUTDOWN) { | 605 if (task.shutdown_behavior == BLOCK_SHUTDOWN) { |
569 DCHECK_GT(blocking_shutdown_thread_count_, 0u); | 606 DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
607 // This can make CanShutdown() go to true. | |
570 blocking_shutdown_thread_count_--; | 608 blocking_shutdown_thread_count_--; |
609 if (CanShutdown()) | |
610 can_shutdown_cv_.Signal(); | |
jar (doing other things)
2012/03/12 17:39:13
Again, please don't add special case code for this
akalin
2012/03/12 18:34:34
Done (see above)
| |
571 } | 611 } |
572 | 612 |
573 if (task.sequence_token_id) | 613 if (task.sequence_token_id) |
574 current_sequences_.erase(task.sequence_token_id); | 614 current_sequences_.erase(task.sequence_token_id); |
575 } | 615 } |
576 | 616 |
577 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( | 617 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
578 int sequence_token_id) const { | 618 int sequence_token_id) const { |
579 lock_.AssertAcquired(); | 619 lock_.AssertAcquired(); |
580 return !sequence_token_id || | 620 return !sequence_token_id || |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
633 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( | 673 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
634 int thread_number) { | 674 int thread_number) { |
635 // Called outside of the lock. | 675 // Called outside of the lock. |
636 DCHECK(thread_number > 0); | 676 DCHECK(thread_number > 0); |
637 | 677 |
638 // The worker is assigned to the list when the thread actually starts, which | 678 // The worker is assigned to the list when the thread actually starts, which |
639 // will manage the memory of the pointer. | 679 // will manage the memory of the pointer. |
640 new Worker(worker_pool_, thread_number, thread_name_prefix_); | 680 new Worker(worker_pool_, thread_number, thread_name_prefix_); |
641 } | 681 } |
642 | 682 |
683 void SequencedWorkerPool::Inner::SignalHasWork() { | |
684 has_work_cv_.Signal(); | |
685 ++has_work_signal_count_; | |
jar (doing other things)
2012/03/12 17:39:13
You are not generally inside a lock, but are manip
akalin
2012/03/12 18:34:34
My mistake. This testing variable is kind of hack
| |
686 } | |
687 | |
643 bool SequencedWorkerPool::Inner::CanShutdown() const { | 688 bool SequencedWorkerPool::Inner::CanShutdown() const { |
644 lock_.AssertAcquired(); | 689 lock_.AssertAcquired(); |
645 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. | 690 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
646 return !thread_being_created_ && | 691 return !thread_being_created_ && |
647 blocking_shutdown_thread_count_ == 0 && | 692 blocking_shutdown_thread_count_ == 0 && |
648 blocking_shutdown_pending_task_count_ == 0; | 693 blocking_shutdown_pending_task_count_ == 0; |
649 } | 694 } |
650 | 695 |
651 // SequencedWorkerPool -------------------------------------------------------- | 696 // SequencedWorkerPool -------------------------------------------------------- |
652 | 697 |
653 SequencedWorkerPool::SequencedWorkerPool( | 698 SequencedWorkerPool::SequencedWorkerPool( |
654 size_t max_threads, | 699 size_t max_threads, |
655 const std::string& thread_name_prefix) | 700 const std::string& thread_name_prefix) |
656 : constructor_message_loop_(MessageLoopProxy::current()), | 701 : constructor_message_loop_(MessageLoopProxy::current()), |
657 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), | 702 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), |
658 max_threads, thread_name_prefix)) { | 703 max_threads, thread_name_prefix)) { |
659 DCHECK(constructor_message_loop_.get()); | 704 DCHECK(constructor_message_loop_.get()); |
660 } | 705 } |
661 | 706 |
662 SequencedWorkerPool::~SequencedWorkerPool() {} | 707 SequencedWorkerPool::~SequencedWorkerPool() {} |
663 | 708 |
664 void SequencedWorkerPool::OnDestruct() const { | 709 void SequencedWorkerPool::OnDestruct() const { |
665 // TODO(akalin): Once we can easily check if we're on a worker | 710 // TODO(akalin): Once we can easily check if we're on a worker |
666 // thread or not, use that instead of restricting destruction to | 711 // thread or not, use that instead of restricting destruction to |
667 // only the constructor message loop. | 712 // only the constructor message loop. |
668 if (constructor_message_loop_->BelongsToCurrentThread()) { | 713 if (constructor_message_loop_->BelongsToCurrentThread()) |
669 delete this; | 714 delete this; |
670 } else { | 715 else |
671 constructor_message_loop_->DeleteSoon(FROM_HERE, this); | 716 constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
672 } | |
673 } | 717 } |
674 | 718 |
675 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 719 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
676 return inner_->GetSequenceToken(); | 720 return inner_->GetSequenceToken(); |
677 } | 721 } |
678 | 722 |
679 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 723 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
680 const std::string& name) { | 724 const std::string& name) { |
681 return inner_->GetNamedSequenceToken(name); | 725 return inner_->GetNamedSequenceToken(name); |
682 } | 726 } |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
741 } | 785 } |
742 | 786 |
743 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
744 return inner_->RunsTasksOnCurrentThread(); | 788 return inner_->RunsTasksOnCurrentThread(); |
745 } | 789 } |
746 | 790 |
747 void SequencedWorkerPool::FlushForTesting() { | 791 void SequencedWorkerPool::FlushForTesting() { |
748 inner_->FlushForTesting(); | 792 inner_->FlushForTesting(); |
749 } | 793 } |
750 | 794 |
795 void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { | |
796 inner_->TriggerSpuriousWorkSignalForTesting(); | |
797 } | |
798 | |
799 int SequencedWorkerPool::GetWorkSignalCountForTesting() const { | |
800 return inner_->GetWorkSignalCountForTesting(); | |
801 } | |
802 | |
751 void SequencedWorkerPool::Shutdown() { | 803 void SequencedWorkerPool::Shutdown() { |
752 inner_->Shutdown(); | 804 inner_->Shutdown(); |
753 } | 805 } |
754 | 806 |
755 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | 807 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
756 inner_->SetTestingObserver(observer); | 808 inner_->SetTestingObserver(observer); |
757 } | 809 } |
758 | 810 |
759 } // namespace base | 811 } // namespace base |
OLD | NEW |