| OLD | NEW |
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. | 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" | 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <memory> | 8 #include <memory> |
| 9 #include <string> | 9 #include <string> |
| 10 #include <utility> |
| 10 | 11 |
| 11 #include "base/bind.h" | 12 #include "base/bind.h" |
| 12 #include "base/callback.h" | 13 #include "base/callback.h" |
| 13 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
| 14 #include "base/single_thread_task_runner.h" | 15 #include "base/single_thread_task_runner.h" |
| 15 #include "base/strings/stringprintf.h" | 16 #include "base/strings/stringprintf.h" |
| 16 #include "base/synchronization/atomic_flag.h" | 17 #include "base/synchronization/atomic_flag.h" |
| 17 #include "base/task_scheduler/delayed_task_manager.h" | 18 #include "base/task_scheduler/delayed_task_manager.h" |
| 18 #include "base/task_scheduler/scheduler_worker.h" | 19 #include "base/task_scheduler/scheduler_worker.h" |
| 19 #include "base/task_scheduler/sequence.h" | 20 #include "base/task_scheduler/sequence.h" |
| 20 #include "base/task_scheduler/task.h" | 21 #include "base/task_scheduler/task.h" |
| 21 #include "base/task_scheduler/task_tracker.h" | 22 #include "base/task_scheduler/task_tracker.h" |
| 22 #include "base/task_scheduler/task_traits.h" | 23 #include "base/task_scheduler/task_traits.h" |
| 23 #include "base/threading/platform_thread.h" | 24 #include "base/threading/platform_thread.h" |
| 24 #include "base/time/time.h" | 25 #include "base/time/time.h" |
| 25 | 26 |
| 27 #if defined(OS_WIN) |
| 28 #include <windows.h> |
| 29 |
| 30 #include "base/win/scoped_com_initializer.h" |
| 31 #endif // defined(OS_WIN) |
| 32 |
| 26 namespace base { | 33 namespace base { |
| 27 namespace internal { | 34 namespace internal { |
| 28 | 35 |
| 29 namespace { | 36 namespace { |
| 30 | 37 |
| 31 // Allows for checking the PlatformThread::CurrentRef() against a set | 38 // Allows for checking the PlatformThread::CurrentRef() against a set |
| 32 // PlatformThreadRef atomically without using locks. | 39 // PlatformThreadRef atomically without using locks. |
| 33 class AtomicThreadRefChecker { | 40 class AtomicThreadRefChecker { |
| 34 public: | 41 public: |
| 35 AtomicThreadRefChecker() = default; | 42 AtomicThreadRefChecker() = default; |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 122 // Synchronizes access to |sequence_| and |has_work_|. | 129 // Synchronizes access to |sequence_| and |has_work_|. |
| 123 SchedulerLock sequence_lock_; | 130 SchedulerLock sequence_lock_; |
| 124 scoped_refptr<Sequence> sequence_ = new Sequence; | 131 scoped_refptr<Sequence> sequence_ = new Sequence; |
| 125 bool has_work_ = false; | 132 bool has_work_ = false; |
| 126 | 133 |
| 127 AtomicThreadRefChecker thread_ref_checker_; | 134 AtomicThreadRefChecker thread_ref_checker_; |
| 128 | 135 |
| 129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); | 136 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| 130 }; | 137 }; |
| 131 | 138 |
| 139 #if defined(OS_WIN) |
| 140 |
| 141 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate { |
| 142 public: |
| 143 SchedulerWorkerCOMDelegate(const std::string& thread_name, |
| 144 TaskTracker* task_tracker) |
| 145 : SchedulerWorkerDelegate(thread_name), task_tracker_(task_tracker) {} |
| 146 |
| 147 ~SchedulerWorkerCOMDelegate() override { DCHECK(!scoped_com_initializer_); } |
| 148 |
| 149 // SchedulerWorker::Delegate: |
| 150 void OnMainEntry(SchedulerWorker* worker) override { |
| 151 SchedulerWorkerDelegate::OnMainEntry(worker); |
| 152 |
| 153 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>(); |
| 154 } |
| 155 |
| 156 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { |
| 157 // This scheme below allows us to cover the following scenarios: |
| 158 // * Only SchedulerWorkerDelegate::GetWork() has work: |
| 159 // Always return the sequence from GetWork(). |
| 160 // * Only the Windows Message Queue has work: |
| 161 // Always return the sequence from GetWorkFromWindowsMessageQueue(); |
| 162 // * Both SchedulerWorkerDelegate::GetWork() and the Windows Message Queue |
| 163 // have work: |
| 164 // Process sequences from each source round-robin style. |
| 165 scoped_refptr<Sequence> sequence; |
| 166 if (get_work_first_) { |
| 167 sequence = SchedulerWorkerDelegate::GetWork(worker); |
| 168 if (sequence) |
| 169 get_work_first_ = false; |
| 170 } |
| 171 |
| 172 if (!sequence) { |
| 173 sequence = GetWorkFromWindowsMessageQueue(); |
| 174 if (sequence) |
| 175 get_work_first_ = true; |
| 176 } |
| 177 |
| 178 if (!sequence && !get_work_first_) { |
| 179 // This case is important if we checked the Windows Message Queue first |
| 180 // and found there was no work. We don't want to return null immediately |
| 181 // as that could cause the thread to go to sleep while work is waiting via |
| 182 // SchedulerWorkerDelegate::GetWork(). |
| 183 sequence = SchedulerWorkerDelegate::GetWork(worker); |
| 184 } |
| 185 return sequence; |
| 186 } |
| 187 |
| 188 void OnMainExit() override { scoped_com_initializer_.reset(); } |
| 189 |
| 190 void WaitForWork(WaitableEvent* wake_up_event) override { |
| 191 DCHECK(wake_up_event); |
| 192 const TimeDelta sleep_time = GetSleepTimeout(); |
| 193 const DWORD milliseconds_wait = |
| 194 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds(); |
| 195 HANDLE wake_up_event_handle = wake_up_event->handle(); |
| 196 DWORD result = MsgWaitForMultipleObjectsEx( |
| 197 1, &wake_up_event_handle, milliseconds_wait, QS_ALLINPUT, 0); |
| 198 if (result == WAIT_OBJECT_0) { |
| 199 // Reset the event since we woke up due to it. |
| 200 wake_up_event->Reset(); |
| 201 } |
| 202 } |
| 203 |
| 204 private: |
| 205 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() { |
| 206 MSG msg; |
| 207 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) { |
| 208 auto pump_message_task = |
| 209 MakeUnique<Task>(FROM_HERE, |
| 210 Bind( |
| 211 [](MSG msg) { |
| 212 TranslateMessage(&msg); |
| 213 DispatchMessage(&msg); |
| 214 }, |
| 215 std::move(msg)), |
| 216 TaskTraits().MayBlock(), TimeDelta()); |
| 217 if (task_tracker_->WillPostTask(pump_message_task.get())) { |
| 218 bool was_empty = |
| 219 message_pump_sequence_->PushTask(std::move(pump_message_task)); |
| 220 DCHECK(was_empty) << "GetWorkFromWindowsMessageQueue() does not expect " |
| 221 "queueing of pump tasks."; |
| 222 return message_pump_sequence_; |
| 223 } |
| 224 } |
| 225 return nullptr; |
| 226 } |
| 227 |
| 228 bool get_work_first_ = true; |
| 229 const scoped_refptr<Sequence> message_pump_sequence_ = new Sequence; |
| 230 TaskTracker* const task_tracker_; |
| 231 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_; |
| 232 |
| 233 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate); |
| 234 }; |
| 235 |
| 236 #endif // defined(OS_WIN) |
| 237 |
| 132 } // namespace | 238 } // namespace |
| 133 | 239 |
| 134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner | 240 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| 135 : public SingleThreadTaskRunner { | 241 : public SingleThreadTaskRunner { |
| 136 public: | 242 public: |
| 137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the | 243 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| 138 // lifetime of a dedicated |worker| for |traits|. | 244 // lifetime of a dedicated |worker| for |traits|. |
| 139 SchedulerSingleThreadTaskRunner( | 245 SchedulerSingleThreadTaskRunner( |
| 140 SchedulerSingleThreadTaskRunnerManager* const outer, | 246 SchedulerSingleThreadTaskRunnerManager* const outer, |
| 141 const TaskTraits& traits, | 247 const TaskTraits& traits, |
| (...skipping 18 matching lines...) Expand all Loading... |
| 160 } else { | 266 } else { |
| 161 outer_->delayed_task_manager_->AddDelayedTask( | 267 outer_->delayed_task_manager_->AddDelayedTask( |
| 162 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, | 268 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, |
| 163 Unretained(this))); | 269 Unretained(this))); |
| 164 } | 270 } |
| 165 return true; | 271 return true; |
| 166 } | 272 } |
| 167 | 273 |
| 168 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 274 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| 169 const Closure& closure, | 275 const Closure& closure, |
| 170 base::TimeDelta delay) override { | 276 TimeDelta delay) override { |
| 171 // Tasks are never nested within the task scheduler. | 277 // Tasks are never nested within the task scheduler. |
| 172 return PostDelayedTask(from_here, closure, delay); | 278 return PostDelayedTask(from_here, closure, delay); |
| 173 } | 279 } |
| 174 | 280 |
| 175 bool RunsTasksOnCurrentThread() const override { | 281 bool RunsTasksOnCurrentThread() const override { |
| 176 return GetDelegate()->RunsTasksOnCurrentThread(); | 282 return GetDelegate()->RunsTasksOnCurrentThread(); |
| 177 } | 283 } |
| 178 | 284 |
| 179 private: | 285 private: |
| 180 ~SchedulerSingleThreadTaskRunner() override { | 286 ~SchedulerSingleThreadTaskRunner() override { |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 230 subtle::NoBarrier_Load(&workers_unregistered_during_join_); | 336 subtle::NoBarrier_Load(&workers_unregistered_during_join_); |
| 231 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) | 337 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) |
| 232 << "There cannot be outstanding SingleThreadTaskRunners upon destruction " | 338 << "There cannot be outstanding SingleThreadTaskRunners upon destruction " |
| 233 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; | 339 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; |
| 234 #endif | 340 #endif |
| 235 } | 341 } |
| 236 | 342 |
| 237 scoped_refptr<SingleThreadTaskRunner> | 343 scoped_refptr<SingleThreadTaskRunner> |
| 238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( | 344 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| 239 const TaskTraits& traits) { | 345 const TaskTraits& traits) { |
| 240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); | 346 return CreateSingleThreadTaskRunnerWithDelegate<SchedulerWorkerDelegate>( |
| 241 DCHECK_LT(index, worker_pool_params_vector_.size()); | 347 traits); |
| 242 return new SchedulerSingleThreadTaskRunner( | |
| 243 this, traits, | |
| 244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); | |
| 245 } | 348 } |
| 246 | 349 |
| 350 #if defined(OS_WIN) |
| 351 scoped_refptr<SingleThreadTaskRunner> |
| 352 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits( |
| 353 const TaskTraits& traits) { |
| 354 return CreateSingleThreadTaskRunnerWithDelegate<SchedulerWorkerCOMDelegate>( |
| 355 traits); |
| 356 } |
| 357 #endif // defined(OS_WIN) |
| 358 |
| 247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { | 359 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { |
| 248 decltype(workers_) local_workers; | 360 decltype(workers_) local_workers; |
| 249 { | 361 { |
| 250 AutoSchedulerLock auto_lock(workers_lock_); | 362 AutoSchedulerLock auto_lock(workers_lock_); |
| 251 local_workers = std::move(workers_); | 363 local_workers = std::move(workers_); |
| 252 } | 364 } |
| 253 | 365 |
| 254 for (const auto& worker : local_workers) | 366 for (const auto& worker : local_workers) |
| 255 worker->JoinForTesting(); | 367 worker->JoinForTesting(); |
| 256 | 368 |
| 257 { | 369 { |
| 258 AutoSchedulerLock auto_lock(workers_lock_); | 370 AutoSchedulerLock auto_lock(workers_lock_); |
| 259 DCHECK(workers_.empty()) | 371 DCHECK(workers_.empty()) |
| 260 << "New worker(s) unexpectedly registered during join."; | 372 << "New worker(s) unexpectedly registered during join."; |
| 261 workers_ = std::move(local_workers); | 373 workers_ = std::move(local_workers); |
| 262 } | 374 } |
| 263 } | 375 } |
| 264 | 376 |
| 377 template <typename DelegateType> |
| 378 scoped_refptr<SingleThreadTaskRunner> SchedulerSingleThreadTaskRunnerManager:: |
| 379 CreateSingleThreadTaskRunnerWithDelegate(const TaskTraits& traits) { |
| 380 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| 381 DCHECK_LT(index, worker_pool_params_vector_.size()); |
| 382 return new SchedulerSingleThreadTaskRunner( |
| 383 this, traits, |
| 384 CreateAndRegisterSchedulerWorker<DelegateType>( |
| 385 worker_pool_params_vector_[index])); |
| 386 } |
| 387 |
| 388 template <> |
| 389 std::unique_ptr<SchedulerWorkerDelegate> |
| 390 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate< |
| 391 SchedulerWorkerDelegate>(const SchedulerWorkerPoolParams& params, int id) { |
| 392 return MakeUnique<SchedulerWorkerDelegate>(StringPrintf( |
| 393 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); |
| 394 } |
| 395 |
| 396 #if defined(OS_WIN) |
| 397 template <> |
| 398 std::unique_ptr<SchedulerWorkerDelegate> |
| 399 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate< |
| 400 SchedulerWorkerCOMDelegate>(const SchedulerWorkerPoolParams& params, |
| 401 int id) { |
| 402 return MakeUnique<SchedulerWorkerCOMDelegate>( |
| 403 StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA", id, |
| 404 params.name().c_str()), |
| 405 task_tracker_); |
| 406 } |
| 407 #endif // defined(OS_WIN) |
| 408 |
| 409 template <typename DelegateType> |
| 265 SchedulerWorker* | 410 SchedulerWorker* |
| 266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( | 411 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( |
| 267 const SchedulerWorkerPoolParams& params) { | 412 const SchedulerWorkerPoolParams& params) { |
| 268 AutoSchedulerLock auto_lock(workers_lock_); | 413 AutoSchedulerLock auto_lock(workers_lock_); |
| 269 int id = next_worker_id_++; | 414 int id = next_worker_id_++; |
| 270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( | 415 |
| 271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); | |
| 272 workers_.emplace_back(SchedulerWorker::Create( | 416 workers_.emplace_back(SchedulerWorker::Create( |
| 273 params.priority_hint(), std::move(delegate), task_tracker_, | 417 params.priority_hint(), |
| 418 CreateSchedulerWorkerDelegate<DelegateType>(params, id), task_tracker_, |
| 274 SchedulerWorker::InitialState::DETACHED)); | 419 SchedulerWorker::InitialState::DETACHED)); |
| 275 return workers_.back().get(); | 420 return workers_.back().get(); |
| 276 } | 421 } |
| 277 | 422 |
| 278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( | 423 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| 279 SchedulerWorker* worker) { | 424 SchedulerWorker* worker) { |
| 280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing | 425 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| 281 // |workers_lock_|. | 426 // |workers_lock_|. |
| 282 scoped_refptr<SchedulerWorker> worker_to_destroy; | 427 scoped_refptr<SchedulerWorker> worker_to_destroy; |
| 283 { | 428 { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 299 }); | 444 }); |
| 300 DCHECK(worker_iter != workers_.end()); | 445 DCHECK(worker_iter != workers_.end()); |
| 301 worker_to_destroy = std::move(*worker_iter); | 446 worker_to_destroy = std::move(*worker_iter); |
| 302 workers_.erase(worker_iter); | 447 workers_.erase(worker_iter); |
| 303 } | 448 } |
| 304 worker_to_destroy->Cleanup(); | 449 worker_to_destroy->Cleanup(); |
| 305 } | 450 } |
| 306 | 451 |
| 307 } // namespace internal | 452 } // namespace internal |
| 308 } // namespace base | 453 } // namespace base |
| OLD | NEW |