| OLD | NEW |
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/threading/worker_pool_posix.h" | 5 #include "base/threading/worker_pool_posix.h" |
| 6 | 6 |
| 7 #include "base/bind.h" |
| 7 #include "base/lazy_instance.h" | 8 #include "base/lazy_instance.h" |
| 8 #include "base/logging.h" | 9 #include "base/logging.h" |
| 9 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| 10 #include "base/stringprintf.h" | 11 #include "base/stringprintf.h" |
| 11 #include "base/task.h" | 12 #include "base/task.h" |
| 12 #include "base/threading/platform_thread.h" | 13 #include "base/threading/platform_thread.h" |
| 13 #include "base/threading/worker_pool.h" | 14 #include "base/threading/worker_pool.h" |
| 15 #include "base/tracked_objects.h" |
| 14 | 16 |
| 15 namespace base { | 17 namespace base { |
| 16 | 18 |
| 17 namespace { | 19 namespace { |
| 18 | 20 |
| 19 const int kIdleSecondsBeforeExit = 10 * 60; | 21 const int kIdleSecondsBeforeExit = 10 * 60; |
| 20 // A stack size of 64 KB is too small for the CERT_PKIXVerifyCert | 22 // A stack size of 64 KB is too small for the CERT_PKIXVerifyCert |
| 21 // function of NSS because of NSS bug 439169. | 23 // function of NSS because of NSS bug 439169. |
| 22 const int kWorkerThreadStackSize = 128 * 1024; | 24 const int kWorkerThreadStackSize = 128 * 1024; |
| 23 | 25 |
| 24 class WorkerPoolImpl { | 26 class WorkerPoolImpl { |
| 25 public: | 27 public: |
| 26 WorkerPoolImpl(); | 28 WorkerPoolImpl(); |
| 27 ~WorkerPoolImpl(); | 29 ~WorkerPoolImpl(); |
| 28 | 30 |
| 29 void PostTask(const tracked_objects::Location& from_here, Task* task, | 31 void PostTask(const tracked_objects::Location& from_here, Task* task, |
| 30 bool task_is_slow); | 32 bool task_is_slow); |
| 33 void PostTask(const tracked_objects::Location& from_here, |
| 34 const base::Closure& task, bool task_is_slow); |
| 31 | 35 |
| 32 private: | 36 private: |
| 33 scoped_refptr<base::PosixDynamicThreadPool> pool_; | 37 scoped_refptr<base::PosixDynamicThreadPool> pool_; |
| 34 }; | 38 }; |
| 35 | 39 |
| 36 WorkerPoolImpl::WorkerPoolImpl() | 40 WorkerPoolImpl::WorkerPoolImpl() |
| 37 : pool_(new base::PosixDynamicThreadPool("WorkerPool", | 41 : pool_(new base::PosixDynamicThreadPool("WorkerPool", |
| 38 kIdleSecondsBeforeExit)) { | 42 kIdleSecondsBeforeExit)) { |
| 39 } | 43 } |
| 40 | 44 |
| 41 WorkerPoolImpl::~WorkerPoolImpl() { | 45 WorkerPoolImpl::~WorkerPoolImpl() { |
| 42 pool_->Terminate(); | 46 pool_->Terminate(); |
| 43 } | 47 } |
| 44 | 48 |
| 45 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, | 49 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, |
| 46 Task* task, bool task_is_slow) { | 50 Task* task, bool task_is_slow) { |
| 47 task->SetBirthPlace(from_here); | 51 pool_->PostTask(from_here, task); |
| 48 pool_->PostTask(task); | 52 } |
| 53 |
| 54 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, |
| 55 const base::Closure& task, bool task_is_slow) { |
| 56 pool_->PostTask(from_here, task); |
| 49 } | 57 } |
| 50 | 58 |
| 51 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); | 59 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); |
| 52 | 60 |
| 53 class WorkerThread : public PlatformThread::Delegate { | 61 class WorkerThread : public PlatformThread::Delegate { |
| 54 public: | 62 public: |
| 55 WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, | 63 WorkerThread(const std::string& name_prefix, |
| 56 base::PosixDynamicThreadPool* pool) | 64 base::PosixDynamicThreadPool* pool) |
| 57 : name_prefix_(name_prefix), | 65 : name_prefix_(name_prefix), |
| 58 idle_seconds_before_exit_(idle_seconds_before_exit), | |
| 59 pool_(pool) {} | 66 pool_(pool) {} |
| 60 | 67 |
| 61 virtual void ThreadMain(); | 68 virtual void ThreadMain(); |
| 62 | 69 |
| 63 private: | 70 private: |
| 64 const std::string name_prefix_; | 71 const std::string name_prefix_; |
| 65 const int idle_seconds_before_exit_; | |
| 66 scoped_refptr<base::PosixDynamicThreadPool> pool_; | 72 scoped_refptr<base::PosixDynamicThreadPool> pool_; |
| 67 | 73 |
| 68 DISALLOW_COPY_AND_ASSIGN(WorkerThread); | 74 DISALLOW_COPY_AND_ASSIGN(WorkerThread); |
| 69 }; | 75 }; |
| 70 | 76 |
| 71 void WorkerThread::ThreadMain() { | 77 void WorkerThread::ThreadMain() { |
| 72 const std::string name = base::StringPrintf( | 78 const std::string name = base::StringPrintf( |
| 73 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); | 79 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); |
| 74 PlatformThread::SetName(name.c_str()); | 80 PlatformThread::SetName(name.c_str()); |
| 75 | 81 |
| 76 for (;;) { | 82 for (;;) { |
| 77 Task* task = pool_->WaitForTask(); | 83 PosixDynamicThreadPool::PendingTask pending_task = pool_->WaitForTask(); |
| 78 if (!task) | 84 if (pending_task.task.is_null()) |
| 79 break; | 85 break; |
| 80 task->Run(); | 86 pending_task.task.Run(); |
| 81 delete task; | |
| 82 } | 87 } |
| 83 | 88 |
| 84 // The WorkerThread is non-joinable, so it deletes itself. | 89 // The WorkerThread is non-joinable, so it deletes itself. |
| 85 delete this; | 90 delete this; |
| 86 } | 91 } |
| 87 | 92 |
| 88 } // namespace | 93 } // namespace |
| 89 | 94 |
| 90 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, | 95 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, |
| 91 Task* task, bool task_is_slow) { | 96 Task* task, bool task_is_slow) { |
| 92 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); | 97 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); |
| 93 return true; | 98 return true; |
| 94 } | 99 } |
| 95 | 100 |
| 101 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, |
| 102 const base::Closure& task, bool task_is_slow) { |
| 103 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); |
| 104 return true; |
| 105 } |
| 106 |
| 107 PosixDynamicThreadPool::PendingTask::PendingTask( |
| 108 const tracked_objects::Location& posted_from, |
| 109 const base::Closure& task) |
| 110 : task(task) { |
| 111 } |
| 112 |
| 113 PosixDynamicThreadPool::PendingTask::~PendingTask() { |
| 114 } |
| 115 |
| 96 PosixDynamicThreadPool::PosixDynamicThreadPool( | 116 PosixDynamicThreadPool::PosixDynamicThreadPool( |
| 97 const std::string& name_prefix, | 117 const std::string& name_prefix, |
| 98 int idle_seconds_before_exit) | 118 int idle_seconds_before_exit) |
| 99 : name_prefix_(name_prefix), | 119 : name_prefix_(name_prefix), |
| 100 idle_seconds_before_exit_(idle_seconds_before_exit), | 120 idle_seconds_before_exit_(idle_seconds_before_exit), |
| 101 tasks_available_cv_(&lock_), | 121 pending_tasks_available_cv_(&lock_), |
| 102 num_idle_threads_(0), | 122 num_idle_threads_(0), |
| 103 terminated_(false), | 123 terminated_(false), |
| 104 num_idle_threads_cv_(NULL) {} | 124 num_idle_threads_cv_(NULL) {} |
| 105 | 125 |
| 106 PosixDynamicThreadPool::~PosixDynamicThreadPool() { | 126 PosixDynamicThreadPool::~PosixDynamicThreadPool() { |
| 107 while (!tasks_.empty()) { | 127 while (!pending_tasks_.empty()) { |
| 108 Task* task = tasks_.front(); | 128 PendingTask pending_task = pending_tasks_.front(); |
| 109 tasks_.pop(); | 129 pending_tasks_.pop(); |
| 110 delete task; | |
| 111 } | 130 } |
| 112 } | 131 } |
| 113 | 132 |
| 114 void PosixDynamicThreadPool::Terminate() { | 133 void PosixDynamicThreadPool::Terminate() { |
| 115 { | 134 { |
| 116 AutoLock locked(lock_); | 135 AutoLock locked(lock_); |
| 117 DCHECK(!terminated_) << "Thread pool is already terminated."; | 136 DCHECK(!terminated_) << "Thread pool is already terminated."; |
| 118 terminated_ = true; | 137 terminated_ = true; |
| 119 } | 138 } |
| 120 tasks_available_cv_.Broadcast(); | 139 pending_tasks_available_cv_.Broadcast(); |
| 121 } | 140 } |
| 122 | 141 |
| 123 void PosixDynamicThreadPool::PostTask(Task* task) { | 142 void PosixDynamicThreadPool::PostTask( |
| 143 const tracked_objects::Location& from_here, |
| 144 Task* task) { |
| 145 PendingTask pending_task(from_here, |
| 146 base::Bind(&subtle::TaskClosureAdapter::Run, |
| 147 new subtle::TaskClosureAdapter(task))); |
| 148 // |pending_task| and AddTask() work in conjunction here to ensure that after |
| 149 // a successful AddTask(), the TaskClosureAdapter object is deleted on the |
| 150 // worker thread. In AddTask(), the reference |pending_task.task| is handed |
| 151 // off in a destructive manner to ensure that the local copy of |
| 152 // |pending_task| doesn't keep a ref on the Closure causing the |
| 153 // TaskClosureAdapter to be deleted on the wrong thread. |
| 154 AddTask(&pending_task); |
| 155 } |
| 156 |
| 157 void PosixDynamicThreadPool::PostTask( |
| 158 const tracked_objects::Location& from_here, |
| 159 const base::Closure& task) { |
| 160 PendingTask pending_task(from_here, task); |
| 161 AddTask(&pending_task); |
| 162 } |
| 163 |
| 164 void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { |
| 124 AutoLock locked(lock_); | 165 AutoLock locked(lock_); |
| 125 DCHECK(!terminated_) << | 166 DCHECK(!terminated_) << |
| 126 "This thread pool is already terminated. Do not post new tasks."; | 167 "This thread pool is already terminated. Do not post new tasks."; |
| 127 | 168 |
| 128 tasks_.push(task); | 169 pending_tasks_.push(*pending_task); |
| 170 pending_task->task.Reset(); |
| 129 | 171 |
| 130 // We have enough worker threads. | 172 // We have enough worker threads. |
| 131 if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { | 173 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { |
| 132 tasks_available_cv_.Signal(); | 174 pending_tasks_available_cv_.Signal(); |
| 133 } else { | 175 } else { |
| 134 // The new PlatformThread will take ownership of the WorkerThread object, | 176 // The new PlatformThread will take ownership of the WorkerThread object, |
| 135 // which will delete itself on exit. | 177 // which will delete itself on exit. |
| 136 WorkerThread* worker = | 178 WorkerThread* worker = |
| 137 new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); | 179 new WorkerThread(name_prefix_, this); |
| 138 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); | 180 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); |
| 139 } | 181 } |
| 140 } | 182 } |
| 141 | 183 |
| 142 Task* PosixDynamicThreadPool::WaitForTask() { | 184 PosixDynamicThreadPool::PendingTask PosixDynamicThreadPool::WaitForTask() { |
| 143 AutoLock locked(lock_); | 185 AutoLock locked(lock_); |
| 144 | 186 |
| 145 if (terminated_) | 187 if (terminated_) |
| 146 return NULL; | 188 return PendingTask(FROM_HERE, base::Closure()); |
| 147 | 189 |
| 148 if (tasks_.empty()) { // No work available, wait for work. | 190 if (pending_tasks_.empty()) { // No work available, wait for work. |
| 149 num_idle_threads_++; | 191 num_idle_threads_++; |
| 150 if (num_idle_threads_cv_.get()) | 192 if (num_idle_threads_cv_.get()) |
| 151 num_idle_threads_cv_->Signal(); | 193 num_idle_threads_cv_->Signal(); |
| 152 tasks_available_cv_.TimedWait( | 194 pending_tasks_available_cv_.TimedWait( |
| 153 TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); | 195 TimeDelta::FromSeconds(idle_seconds_before_exit_)); |
| 154 num_idle_threads_--; | 196 num_idle_threads_--; |
| 155 if (num_idle_threads_cv_.get()) | 197 if (num_idle_threads_cv_.get()) |
| 156 num_idle_threads_cv_->Signal(); | 198 num_idle_threads_cv_->Signal(); |
| 157 if (tasks_.empty()) { | 199 if (pending_tasks_.empty()) { |
| 158 // We waited for work, but there's still no work. Return NULL to signal | 200 // We waited for work, but there's still no work. Return NULL to signal |
| 159 // the thread to terminate. | 201 // the thread to terminate. |
| 160 return NULL; | 202 return PendingTask(FROM_HERE, base::Closure()); |
| 161 } | 203 } |
| 162 } | 204 } |
| 163 | 205 |
| 164 Task* task = tasks_.front(); | 206 PendingTask pending_task = pending_tasks_.front(); |
| 165 tasks_.pop(); | 207 pending_tasks_.pop(); |
| 166 return task; | 208 return pending_task; |
| 167 } | 209 } |
| 168 | 210 |
| 169 } // namespace base | 211 } // namespace base |
| OLD | NEW |