Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(224)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2765703002: Add the COM STA Task Runner (Closed)
Patch Set: Nits Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <memory> 8 #include <memory>
9 #include <string> 9 #include <string>
10 #include <utility>
10 11
11 #include "base/bind.h" 12 #include "base/bind.h"
12 #include "base/callback.h" 13 #include "base/callback.h"
13 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h" 15 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h" 16 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h" 17 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h" 18 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h" 19 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h" 20 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h" 21 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h" 22 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h" 23 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h" 24 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h" 25 #include "base/time/time.h"
25 26
27 #if defined(OS_WIN)
28 #include <windows.h>
29
30 #include "base/win/scoped_com_initializer.h"
31 #endif // defined(OS_WIN)
32
26 namespace base { 33 namespace base {
27 namespace internal { 34 namespace internal {
28 35
29 namespace { 36 namespace {
30 37
31 // Allows for checking the PlatformThread::CurrentRef() against a set 38 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks. 39 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker { 40 class AtomicThreadRefChecker {
34 public: 41 public:
35 AtomicThreadRefChecker() = default; 42 AtomicThreadRefChecker() = default;
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 // Synchronizes access to |sequence_| and |has_work_|. 129 // Synchronizes access to |sequence_| and |has_work_|.
123 SchedulerLock sequence_lock_; 130 SchedulerLock sequence_lock_;
124 scoped_refptr<Sequence> sequence_ = new Sequence; 131 scoped_refptr<Sequence> sequence_ = new Sequence;
125 bool has_work_ = false; 132 bool has_work_ = false;
126 133
127 AtomicThreadRefChecker thread_ref_checker_; 134 AtomicThreadRefChecker thread_ref_checker_;
128 135
129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); 136 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
130 }; 137 };
131 138
139 #if defined(OS_WIN)
140
141 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
142 public:
143 SchedulerWorkerCOMDelegate(const std::string& thread_name,
144 TaskTracker* task_tracker)
145 : SchedulerWorkerDelegate(thread_name), task_tracker_(task_tracker) {}
146
147 ~SchedulerWorkerCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
148
149 // SchedulerWorker::Delegate:
150 void OnMainEntry(SchedulerWorker* worker) override {
151 SchedulerWorkerDelegate::OnMainEntry(worker);
152
153 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>();
154 }
155
156 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
157 // This scheme below allows us to cover the following scenarios:
158 // * Only SchedulerWorkerDelegate::GetWork() has work:
159 // Always return the sequence from GetWork().
160 // * Only the Windows Message Queue has work:
161 // Always return the sequence from GetWorkFromWindowsMessageQueue();
162 // * Both SchedulerWorkerDelegate::GetWork() and the Windows Message Queue
163 // have work:
164 // Process sequences from each source round-robin style.
165 scoped_refptr<Sequence> sequence;
166 if (get_work_first_) {
167 sequence = SchedulerWorkerDelegate::GetWork(worker);
168 if (sequence)
169 get_work_first_ = false;
170 }
171
172 if (!sequence) {
173 sequence = GetWorkFromWindowsMessageQueue();
174 if (sequence)
175 get_work_first_ = true;
176 }
177
178 if (!sequence && !get_work_first_) {
179 // This case is important if we checked the Windows Message Queue first
180 // and found there was no work. We don't want to return null immediately
181 // as that could cause the thread to go to sleep while work is waiting via
182 // SchedulerWorkerDelegate::GetWork().
183 sequence = SchedulerWorkerDelegate::GetWork(worker);
184 }
185 return sequence;
186 }
187
188 void OnMainExit() override { scoped_com_initializer_.reset(); }
189
190 void WaitForWork(WaitableEvent* wake_up_event) override {
191 DCHECK(wake_up_event);
192 const TimeDelta sleep_time = GetSleepTimeout();
193 const DWORD milliseconds_wait =
194 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds();
195 HANDLE wake_up_event_handle = wake_up_event->handle();
196 DWORD result = MsgWaitForMultipleObjectsEx(
197 1, &wake_up_event_handle, milliseconds_wait, QS_ALLINPUT, 0);
198 if (result == WAIT_OBJECT_0) {
199 // Reset the event since we woke up due to it.
200 wake_up_event->Reset();
201 }
202 }
203
204 private:
205 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() {
206 MSG msg;
207 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
208 auto pump_message_task =
209 MakeUnique<Task>(FROM_HERE,
210 Bind(
211 [](MSG msg) {
212 TranslateMessage(&msg);
213 DispatchMessage(&msg);
214 },
215 std::move(msg)),
216 TaskTraits().MayBlock(), TimeDelta());
217 if (task_tracker_->WillPostTask(pump_message_task.get())) {
218 bool was_empty =
219 message_pump_sequence_->PushTask(std::move(pump_message_task));
220 DCHECK(was_empty) << "GetWorkFromWindowsMessageQueue() does not expect "
221 "queueing of pump tasks.";
222 return message_pump_sequence_;
223 }
224 }
225 return nullptr;
226 }
227
228 bool get_work_first_ = true;
229 const scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
230 TaskTracker* const task_tracker_;
231 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
232
233 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate);
234 };
235
236 #endif // defined(OS_WIN)
237
132 } // namespace 238 } // namespace
133 239
134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner 240 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
135 : public SingleThreadTaskRunner { 241 : public SingleThreadTaskRunner {
136 public: 242 public:
137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the 243 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
138 // lifetime of a dedicated |worker| for |traits|. 244 // lifetime of a dedicated |worker| for |traits|.
139 SchedulerSingleThreadTaskRunner( 245 SchedulerSingleThreadTaskRunner(
140 SchedulerSingleThreadTaskRunnerManager* const outer, 246 SchedulerSingleThreadTaskRunnerManager* const outer,
141 const TaskTraits& traits, 247 const TaskTraits& traits,
(...skipping 18 matching lines...) Expand all
160 } else { 266 } else {
161 outer_->delayed_task_manager_->AddDelayedTask( 267 outer_->delayed_task_manager_->AddDelayedTask(
162 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow, 268 std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow,
163 Unretained(this))); 269 Unretained(this)));
164 } 270 }
165 return true; 271 return true;
166 } 272 }
167 273
168 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 274 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
169 const Closure& closure, 275 const Closure& closure,
170 base::TimeDelta delay) override { 276 TimeDelta delay) override {
171 // Tasks are never nested within the task scheduler. 277 // Tasks are never nested within the task scheduler.
172 return PostDelayedTask(from_here, closure, delay); 278 return PostDelayedTask(from_here, closure, delay);
173 } 279 }
174 280
175 bool RunsTasksOnCurrentThread() const override { 281 bool RunsTasksOnCurrentThread() const override {
176 return GetDelegate()->RunsTasksOnCurrentThread(); 282 return GetDelegate()->RunsTasksOnCurrentThread();
177 } 283 }
178 284
179 private: 285 private:
180 ~SchedulerSingleThreadTaskRunner() override { 286 ~SchedulerSingleThreadTaskRunner() override {
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 subtle::NoBarrier_Load(&workers_unregistered_during_join_); 336 subtle::NoBarrier_Load(&workers_unregistered_during_join_);
231 DCHECK_EQ(workers_unregistered_during_join, workers_.size()) 337 DCHECK_EQ(workers_unregistered_during_join, workers_.size())
232 << "There cannot be outstanding SingleThreadTaskRunners upon destruction " 338 << "There cannot be outstanding SingleThreadTaskRunners upon destruction "
233 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler"; 339 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler";
234 #endif 340 #endif
235 } 341 }
236 342
237 scoped_refptr<SingleThreadTaskRunner> 343 scoped_refptr<SingleThreadTaskRunner>
238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( 344 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
239 const TaskTraits& traits) { 345 const TaskTraits& traits) {
240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); 346 return CreateSingleThreadTaskRunnerWithDelegate<SchedulerWorkerDelegate>(
241 DCHECK_LT(index, worker_pool_params_vector_.size()); 347 traits);
242 return new SchedulerSingleThreadTaskRunner(
243 this, traits,
244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
245 } 348 }
246 349
350 #if defined(OS_WIN)
351 scoped_refptr<SingleThreadTaskRunner>
352 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits(
353 const TaskTraits& traits) {
354 return CreateSingleThreadTaskRunnerWithDelegate<SchedulerWorkerCOMDelegate>(
355 traits);
356 }
357 #endif // defined(OS_WIN)
358
247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { 359 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
248 decltype(workers_) local_workers; 360 decltype(workers_) local_workers;
249 { 361 {
250 AutoSchedulerLock auto_lock(workers_lock_); 362 AutoSchedulerLock auto_lock(workers_lock_);
251 local_workers = std::move(workers_); 363 local_workers = std::move(workers_);
252 } 364 }
253 365
254 for (const auto& worker : local_workers) 366 for (const auto& worker : local_workers)
255 worker->JoinForTesting(); 367 worker->JoinForTesting();
256 368
257 { 369 {
258 AutoSchedulerLock auto_lock(workers_lock_); 370 AutoSchedulerLock auto_lock(workers_lock_);
259 DCHECK(workers_.empty()) 371 DCHECK(workers_.empty())
260 << "New worker(s) unexpectedly registered during join."; 372 << "New worker(s) unexpectedly registered during join.";
261 workers_ = std::move(local_workers); 373 workers_ = std::move(local_workers);
262 } 374 }
263 } 375 }
264 376
377 template <typename DelegateType>
378 scoped_refptr<SingleThreadTaskRunner> SchedulerSingleThreadTaskRunnerManager::
379 CreateSingleThreadTaskRunnerWithDelegate(const TaskTraits& traits) {
380 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
381 DCHECK_LT(index, worker_pool_params_vector_.size());
382 return new SchedulerSingleThreadTaskRunner(
383 this, traits,
384 CreateAndRegisterSchedulerWorker<DelegateType>(
385 worker_pool_params_vector_[index]));
386 }
387
388 template <>
389 std::unique_ptr<SchedulerWorkerDelegate>
390 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate<
391 SchedulerWorkerDelegate>(const SchedulerWorkerPoolParams& params, int id) {
392 return MakeUnique<SchedulerWorkerDelegate>(StringPrintf(
393 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
394 }
395
396 #if defined(OS_WIN)
397 template <>
398 std::unique_ptr<SchedulerWorkerDelegate>
399 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate<
400 SchedulerWorkerCOMDelegate>(const SchedulerWorkerPoolParams& params,
401 int id) {
402 return MakeUnique<SchedulerWorkerCOMDelegate>(
403 StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA", id,
404 params.name().c_str()),
405 task_tracker_);
406 }
407 #endif // defined(OS_WIN)
408
409 template <typename DelegateType>
265 SchedulerWorker* 410 SchedulerWorker*
266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( 411 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
267 const SchedulerWorkerPoolParams& params) { 412 const SchedulerWorkerPoolParams& params) {
268 AutoSchedulerLock auto_lock(workers_lock_); 413 AutoSchedulerLock auto_lock(workers_lock_);
269 int id = next_worker_id_++; 414 int id = next_worker_id_++;
270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( 415
271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
272 workers_.emplace_back(SchedulerWorker::Create( 416 workers_.emplace_back(SchedulerWorker::Create(
273 params.priority_hint(), std::move(delegate), task_tracker_, 417 params.priority_hint(),
418 CreateSchedulerWorkerDelegate<DelegateType>(params, id), task_tracker_,
274 SchedulerWorker::InitialState::DETACHED)); 419 SchedulerWorker::InitialState::DETACHED));
275 return workers_.back().get(); 420 return workers_.back().get();
276 } 421 }
277 422
278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( 423 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
279 SchedulerWorker* worker) { 424 SchedulerWorker* worker) {
280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing 425 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
281 // |workers_lock_|. 426 // |workers_lock_|.
282 scoped_refptr<SchedulerWorker> worker_to_destroy; 427 scoped_refptr<SchedulerWorker> worker_to_destroy;
283 { 428 {
(...skipping 15 matching lines...) Expand all
299 }); 444 });
300 DCHECK(worker_iter != workers_.end()); 445 DCHECK(worker_iter != workers_.end());
301 worker_to_destroy = std::move(*worker_iter); 446 worker_to_destroy = std::move(*worker_iter);
302 workers_.erase(worker_iter); 447 workers_.erase(worker_iter);
303 } 448 }
304 worker_to_destroy->Cleanup(); 449 worker_to_destroy->Cleanup();
305 } 450 }
306 451
307 } // namespace internal 452 } // namespace internal
308 } // namespace base 453 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698