Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The Chromium Authors. All rights reserved. | 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" | 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <memory> | 8 #include <memory> |
| 9 #include <string> | 9 #include <string> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/callback.h" | 12 #include "base/callback.h" |
| 13 #include "base/memory/ptr_util.h" | 13 #include "base/memory/ptr_util.h" |
| 14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
| 15 #include "base/strings/stringprintf.h" | 15 #include "base/strings/stringprintf.h" |
| 16 #include "base/synchronization/atomic_flag.h" | 16 #include "base/synchronization/atomic_flag.h" |
| 17 #include "base/task_scheduler/delayed_task_manager.h" | 17 #include "base/task_scheduler/delayed_task_manager.h" |
| 18 #include "base/task_scheduler/scheduler_worker.h" | 18 #include "base/task_scheduler/scheduler_worker.h" |
| 19 #include "base/task_scheduler/sequence.h" | 19 #include "base/task_scheduler/sequence.h" |
| 20 #include "base/task_scheduler/task.h" | 20 #include "base/task_scheduler/task.h" |
| 21 #include "base/task_scheduler/task_tracker.h" | 21 #include "base/task_scheduler/task_tracker.h" |
| 22 #include "base/task_scheduler/task_traits.h" | 22 #include "base/task_scheduler/task_traits.h" |
| 23 #include "base/threading/platform_thread.h" | 23 #include "base/threading/platform_thread.h" |
| 24 #include "base/time/time.h" | 24 #include "base/time/time.h" |
| 25 #include "build/build_config.h" | |
|
fdoray
2017/03/21 15:23:12
Not needed if you include it in the .h.
robliao
2017/03/21 20:00:19
Done.
| |
| 26 | |
| 27 #if defined(OS_WIN) | |
| 28 #include <windows.h> | |
| 29 | |
| 30 #include "base/win/scoped_com_initializer.h" | |
| 31 #endif | |
|
gab
2017/03/21 21:09:34
#endif // defined(OS_WIN)
robliao
2017/03/21 22:25:32
Ah, I thought we didn't want these for certain sho
gab
2017/03/22 16:16:04
Yeah, it's kind of an hand-wavy rule, for tight sc
| |
| 25 | 32 |
| 26 namespace base { | 33 namespace base { |
| 27 namespace internal { | 34 namespace internal { |
| 28 | 35 |
| 29 namespace { | 36 namespace { |
| 30 | 37 |
| 31 // Allows for checking the PlatformThread::CurrentRef() against a set | 38 // Allows for checking the PlatformThread::CurrentRef() against a set |
| 32 // PlatformThreadRef atomically without using locks. | 39 // PlatformThreadRef atomically without using locks. |
| 33 class AtomicThreadRefChecker { | 40 class AtomicThreadRefChecker { |
| 34 public: | 41 public: |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 122 // Synchronizes access to |sequence_| and |has_work_|. | 129 // Synchronizes access to |sequence_| and |has_work_|. |
| 123 SchedulerLock sequence_lock_; | 130 SchedulerLock sequence_lock_; |
| 124 scoped_refptr<Sequence> sequence_ = new Sequence; | 131 scoped_refptr<Sequence> sequence_ = new Sequence; |
| 125 bool has_work_ = false; | 132 bool has_work_ = false; |
| 126 | 133 |
| 127 AtomicThreadRefChecker thread_ref_checker_; | 134 AtomicThreadRefChecker thread_ref_checker_; |
| 128 | 135 |
| 129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); | 136 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); |
| 130 }; | 137 }; |
| 131 | 138 |
| 139 #if defined(OS_WIN) | |
| 140 | |
| 141 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate { | |
| 142 public: | |
| 143 SchedulerWorkerCOMDelegate(const std::string& thread_name, | |
| 144 TaskTracker* task_tracker) | |
| 145 : SchedulerWorkerDelegate(thread_name), task_tracker_(task_tracker) {} | |
|
fdoray
2017/03/21 15:23:12
~SchedulerWorkerCOMDelegate() {
DCHECK(!scoped_c
robliao
2017/03/21 20:00:20
Done.
| |
| 146 | |
| 147 // SchedulerWorker::Delegate: | |
| 148 void OnMainEntry(SchedulerWorker* worker) override { | |
| 149 SchedulerWorkerDelegate::OnMainEntry(worker); | |
| 150 | |
| 151 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>(); | |
| 152 } | |
| 153 | |
| 154 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override { | |
| 155 // This scheme below allows us to cover the following scenarios: | |
| 156 // * Tasks only come from SchedulerWorkerDelegate::GetWork(): | |
|
gab
2017/03/21 21:09:34
s/only come/are only coming/
here and below? Fee
robliao
2017/03/21 22:25:32
Rephrased
// This scheme below allows us to co
| |
| 157 // Only return the sequence from GetWork(). | |
| 158 // * Tasks only come from the Windows Message Queue: | |
| 159 // Only return the sequence from GetWorkFromWindowsMessageQueue(); | |
| 160 // * Tasks come from both SchedulerWorkerDelegate::GetWork() and | |
| 161 // the Windows Message Queue: | |
| 162 // Process sequences from each source round-robin style. | |
| 163 scoped_refptr<Sequence> sequence; | |
| 164 if (get_work_first_) { | |
| 165 sequence = SchedulerWorkerDelegate::GetWork(worker); | |
| 166 if (sequence) | |
| 167 get_work_first_ = false; | |
| 168 } | |
| 169 | |
| 170 if (!sequence) { | |
| 171 sequence = GetWorkFromWindowsMessageQueue(); | |
| 172 if (sequence) | |
| 173 get_work_first_ = true; | |
| 174 } | |
| 175 | |
| 176 if (!sequence && !get_work_first_) { | |
| 177 // This case is important if we checked the Windows Message Queue first | |
| 178 // and found there was no work. We don't want to return null immediately | |
| 179 // as that could cause the thread to go to sleep while work is waiting via | |
| 180 // SchedulerWorkerDelegate::GetWork(). As the same time, we don't want to | |
|
gab
2017/03/21 21:09:33
s/As/At/
(but overall I don't think this last sen
robliao
2017/03/21 22:25:32
Removed.
| |
| 181 // mark |get_work_first_| to continue to check the message queue first | |
| 182 // after this sequence is returned. | |
| 183 sequence = SchedulerWorkerDelegate::GetWork(worker); | |
| 184 } | |
| 185 return sequence; | |
| 186 } | |
| 187 | |
| 188 void OnMainExit() override { scoped_com_initializer_.reset(); } | |
| 189 | |
| 190 void WaitForWork(WaitableEvent* wake_up_event) override { | |
| 191 DCHECK(wake_up_event); | |
| 192 const TimeDelta sleep_time = GetSleepTimeout(); | |
| 193 const DWORD milliseconds_wait = | |
| 194 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds(); | |
| 195 HANDLE wake_up_event_handle = wake_up_event->handle(); | |
| 196 DWORD result = MsgWaitForMultipleObjectsEx( | |
| 197 1, &wake_up_event_handle, milliseconds_wait, QS_ALLINPUT, 0); | |
| 198 if (result == WAIT_OBJECT_0) { | |
| 199 // Reset the event since we woke up due to it. | |
| 200 wake_up_event->Reset(); | |
| 201 } | |
| 202 } | |
|
fdoray
2017/03/21 15:23:12
void ReEnqueueSequence(scoped_refptr<Sequence> seq
robliao
2017/03/21 20:00:19
This check is already covered by SchedulerWorkerDe
| |
| 203 | |
| 204 private: | |
| 205 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() { | |
| 206 MSG msg; | |
| 207 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) { | |
| 208 auto pump_message_task = | |
| 209 MakeUnique<Task>(FROM_HERE, | |
| 210 base::Bind( | |
| 211 [](MSG msg_in) { | |
| 212 MSG msg = msg_in; | |
|
gab
2017/03/21 21:09:34
Why do you need this extra variable?
robliao
2017/03/21 22:25:32
Because in the shuffle I lost a "const MSG& msg_in
gab
2017/03/22 16:16:04
sgtm
| |
| 213 TranslateMessage(&msg); | |
| 214 DispatchMessage(&msg); | |
| 215 }, | |
| 216 msg), | |
|
gab
2017/03/21 21:09:34
std::move(msg) ? Not sure if MSG is moveable but i
robliao
2017/03/21 22:25:32
sgtm. No qualms with that. It is just a struct und
| |
| 217 TaskTraits().MayBlock(), TimeDelta()); | |
| 218 if (task_tracker_->WillPostTask(pump_message_task.get())) { | |
|
gab
2017/03/21 21:09:33
Otherwise do we have to tell Windows we're droppin
robliao
2017/03/21 22:25:32
Generally, the only thing you can do with a messag
gab
2017/03/22 16:16:04
Got it, and I guess SendMessage would return with
robliao
2017/03/22 17:56:52
Hrm... that's a good question. One of the things a
| |
| 219 message_pump_sequence_->PushTask(std::move(pump_message_task)); | |
|
fdoray
2017/03/21 15:23:12
bool was_empty = message_pump_sequence_->PushTask(
robliao
2017/03/21 20:00:19
Done.
| |
| 220 return message_pump_sequence_; | |
| 221 } | |
| 222 } | |
| 223 return nullptr; | |
| 224 } | |
| 225 | |
| 226 bool get_work_first_ = true; | |
| 227 scoped_refptr<Sequence> message_pump_sequence_ = new Sequence; | |
|
fdoray
2017/03/21 15:23:12
>>const<< scoped_refptr<Sequence> message_pump_seq
robliao
2017/03/21 20:00:19
Done.
| |
| 228 TaskTracker* const task_tracker_; | |
| 229 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_; | |
| 230 | |
| 231 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate); | |
| 232 }; | |
| 233 | |
| 234 #endif // defined(OS_WIN) | |
| 235 | |
| 132 } // namespace | 236 } // namespace |
| 133 | 237 |
| 134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner | 238 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner |
| 135 : public SingleThreadTaskRunner { | 239 : public SingleThreadTaskRunner { |
| 136 public: | 240 public: |
| 137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the | 241 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the |
| 138 // lifetime of a dedicated |worker| for |traits|. | 242 // lifetime of a dedicated |worker| for |traits|. |
| 139 SchedulerSingleThreadTaskRunner( | 243 SchedulerSingleThreadTaskRunner( |
| 140 SchedulerSingleThreadTaskRunnerManager* const outer, | 244 SchedulerSingleThreadTaskRunnerManager* const outer, |
| 141 const TaskTraits& traits, | 245 const TaskTraits& traits, |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 234 #endif | 338 #endif |
| 235 } | 339 } |
| 236 | 340 |
| 237 scoped_refptr<SingleThreadTaskRunner> | 341 scoped_refptr<SingleThreadTaskRunner> |
| 238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( | 342 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( |
| 239 const TaskTraits& traits) { | 343 const TaskTraits& traits) { |
| 240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); | 344 size_t index = worker_pool_index_for_traits_callback_.Run(traits); |
| 241 DCHECK_LT(index, worker_pool_params_vector_.size()); | 345 DCHECK_LT(index, worker_pool_params_vector_.size()); |
| 242 return new SchedulerSingleThreadTaskRunner( | 346 return new SchedulerSingleThreadTaskRunner( |
| 243 this, traits, | 347 this, traits, |
| 244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); | 348 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index], |
| 349 DelegateType::REGULAR)); | |
| 245 } | 350 } |
| 246 | 351 |
| 352 #if defined(OS_WIN) | |
| 353 scoped_refptr<SingleThreadTaskRunner> | |
| 354 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits( | |
| 355 const TaskTraits& traits) { | |
| 356 size_t index = worker_pool_index_for_traits_callback_.Run(traits); | |
| 357 DCHECK_LT(index, worker_pool_params_vector_.size()); | |
| 358 return new SchedulerSingleThreadTaskRunner( | |
| 359 this, traits, | |
| 360 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index], | |
| 361 DelegateType::COM_STA)); | |
| 362 } | |
| 363 #endif // defined(OS_WIN) | |
| 364 | |
| 247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { | 365 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { |
| 248 decltype(workers_) local_workers; | 366 decltype(workers_) local_workers; |
| 249 { | 367 { |
| 250 AutoSchedulerLock auto_lock(workers_lock_); | 368 AutoSchedulerLock auto_lock(workers_lock_); |
| 251 local_workers = std::move(workers_); | 369 local_workers = std::move(workers_); |
| 252 } | 370 } |
| 253 | 371 |
| 254 for (const auto& worker : local_workers) | 372 for (const auto& worker : local_workers) |
| 255 worker->JoinForTesting(); | 373 worker->JoinForTesting(); |
| 256 | 374 |
| 257 { | 375 { |
| 258 AutoSchedulerLock auto_lock(workers_lock_); | 376 AutoSchedulerLock auto_lock(workers_lock_); |
| 259 DCHECK(workers_.empty()) | 377 DCHECK(workers_.empty()) |
| 260 << "New worker(s) unexpectedly registered during join."; | 378 << "New worker(s) unexpectedly registered during join."; |
| 261 workers_ = std::move(local_workers); | 379 workers_ = std::move(local_workers); |
| 262 } | 380 } |
| 263 } | 381 } |
| 264 | 382 |
| 265 SchedulerWorker* | 383 SchedulerWorker* |
| 266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( | 384 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( |
| 267 const SchedulerWorkerPoolParams& params) { | 385 const SchedulerWorkerPoolParams& params, |
| 386 DelegateType delegate_type) { | |
| 268 AutoSchedulerLock auto_lock(workers_lock_); | 387 AutoSchedulerLock auto_lock(workers_lock_); |
| 269 int id = next_worker_id_++; | 388 int id = next_worker_id_++; |
| 270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( | 389 |
| 271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); | 390 std::unique_ptr<SchedulerWorkerDelegate> delegate; |
|
fdoray
2017/03/21 15:23:12
Optional:
- Instantiate the right type of delegate
robliao
2017/03/21 20:00:19
I guess this comes down to how much duplication we
gab
2017/03/21 21:09:34
Or we make CreateAndRegisterSchedulerWorker<T> a t
robliao
2017/03/21 22:25:32
That is an interesting idea, but SchedulerWorkerCO
robliao
2017/03/22 08:29:42
I think I just came up with a scheme to make this
| |
| 391 switch (delegate_type) { | |
| 392 case DelegateType::REGULAR: | |
| 393 delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( | |
| 394 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); | |
| 395 break; | |
| 396 #if defined(OS_WIN) | |
| 397 case DelegateType::COM_STA: | |
| 398 delegate = MakeUnique<SchedulerWorkerCOMDelegate>( | |
| 399 base::StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA", id, | |
| 400 params.name().c_str()), | |
| 401 task_tracker_); | |
| 402 break; | |
| 403 #endif | |
| 404 } | |
| 405 | |
| 272 workers_.emplace_back(SchedulerWorker::Create( | 406 workers_.emplace_back(SchedulerWorker::Create( |
| 273 params.priority_hint(), std::move(delegate), task_tracker_, | 407 params.priority_hint(), std::move(delegate), task_tracker_, |
| 274 SchedulerWorker::InitialState::DETACHED)); | 408 SchedulerWorker::InitialState::DETACHED)); |
| 275 return workers_.back().get(); | 409 return workers_.back().get(); |
| 276 } | 410 } |
| 277 | 411 |
| 278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( | 412 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( |
| 279 SchedulerWorker* worker) { | 413 SchedulerWorker* worker) { |
| 280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing | 414 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing |
| 281 // |workers_lock_|. | 415 // |workers_lock_|. |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 299 }); | 433 }); |
| 300 DCHECK(worker_iter != workers_.end()); | 434 DCHECK(worker_iter != workers_.end()); |
| 301 worker_to_destroy = std::move(*worker_iter); | 435 worker_to_destroy = std::move(*worker_iter); |
| 302 workers_.erase(worker_iter); | 436 workers_.erase(worker_iter); |
| 303 } | 437 } |
| 304 worker_to_destroy->Cleanup(); | 438 worker_to_destroy->Cleanup(); |
| 305 } | 439 } |
| 306 | 440 |
| 307 } // namespace internal | 441 } // namespace internal |
| 308 } // namespace base | 442 } // namespace base |
| OLD | NEW |