Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2765703002: Add the COM STA Task Runner (Closed)
Patch Set: CR Feedback Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <memory> 8 #include <memory>
9 #include <string> 9 #include <string>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/callback.h" 12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h" 16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h" 17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h" 18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h" 19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h" 20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h" 21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h" 22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h" 23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h" 24 #include "base/time/time.h"
25 25
26 #if defined(OS_WIN)
27 #include <windows.h>
28
29 #include "base/win/scoped_com_initializer.h"
30 #endif // defined(OS_WIN)
31
26 namespace base { 32 namespace base {
27 namespace internal { 33 namespace internal {
28 34
29 namespace { 35 namespace {
30 36
31 // Allows for checking the PlatformThread::CurrentRef() against a set 37 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks. 38 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker { 39 class AtomicThreadRefChecker {
34 public: 40 public:
35 AtomicThreadRefChecker() = default; 41 AtomicThreadRefChecker() = default;
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 // Synchronizes access to |sequence_| and |has_work_|. 128 // Synchronizes access to |sequence_| and |has_work_|.
123 SchedulerLock sequence_lock_; 129 SchedulerLock sequence_lock_;
124 scoped_refptr<Sequence> sequence_ = new Sequence; 130 scoped_refptr<Sequence> sequence_ = new Sequence;
125 bool has_work_ = false; 131 bool has_work_ = false;
126 132
127 AtomicThreadRefChecker thread_ref_checker_; 133 AtomicThreadRefChecker thread_ref_checker_;
128 134
129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); 135 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
130 }; 136 };
131 137
138 #if defined(OS_WIN)
139
140 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
141 public:
142 SchedulerWorkerCOMDelegate(const std::string& thread_name,
143 TaskTracker* task_tracker)
144 : SchedulerWorkerDelegate(thread_name), task_tracker_(task_tracker) {}
145
146 ~SchedulerWorkerCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
147
148 // SchedulerWorker::Delegate:
149 void OnMainEntry(SchedulerWorker* worker) override {
150 SchedulerWorkerDelegate::OnMainEntry(worker);
151
152 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>();
153 }
154
155 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
156 // This scheme below allows us to cover the following scenarios:
157 // * Only SchedulerWorkerDelegate::GetWork() has work:
158 // Always return the sequence from GetWork().
159 // * Only the Windows Message Queue has work:
160 // Always return the sequence from GetWorkFromWindowsMessageQueue();
161 // * Both SchedulerWorkerDelegate::GetWork() and the Windows Message Queue
162 // have work:
163 // Process sequences from each source round-robin style.
164 scoped_refptr<Sequence> sequence;
165 if (get_work_first_) {
166 sequence = SchedulerWorkerDelegate::GetWork(worker);
167 if (sequence)
168 get_work_first_ = false;
169 }
170
171 if (!sequence) {
172 sequence = GetWorkFromWindowsMessageQueue();
173 if (sequence)
174 get_work_first_ = true;
175 }
176
177 if (!sequence && !get_work_first_) {
178 // This case is important if we checked the Windows Message Queue first
179 // and found there was no work. We don't want to return null immediately
180 // as that could cause the thread to go to sleep while work is waiting via
181 // SchedulerWorkerDelegate::GetWork().
182 sequence = SchedulerWorkerDelegate::GetWork(worker);
183 }
184 return sequence;
185 }
186
187 void OnMainExit() override { scoped_com_initializer_.reset(); }
188
189 void WaitForWork(WaitableEvent* wake_up_event) override {
190 DCHECK(wake_up_event);
191 const TimeDelta sleep_time = GetSleepTimeout();
192 const DWORD milliseconds_wait =
193 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds();
194 HANDLE wake_up_event_handle = wake_up_event->handle();
195 DWORD result = MsgWaitForMultipleObjectsEx(
196 1, &wake_up_event_handle, milliseconds_wait, QS_ALLINPUT, 0);
197 if (result == WAIT_OBJECT_0) {
198 // Reset the event since we woke up due to it.
199 wake_up_event->Reset();
200 }
201 }
202
203 private:
204 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() {
205 MSG msg;
206 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
207 auto pump_message_task =
208 MakeUnique<Task>(FROM_HERE,
209 base::Bind(
fdoray 2017/03/22 12:20:19 no base::
robliao 2017/03/22 17:56:52 Done.
210 [](MSG msg) {
211 TranslateMessage(&msg);
212 DispatchMessage(&msg);
213 },
214 std::move(msg)),
gab 2017/03/22 16:16:04 #include <utility>
robliao 2017/03/22 17:56:52 Done.
215 TaskTraits().MayBlock(), TimeDelta());
216 if (task_tracker_->WillPostTask(pump_message_task.get())) {
217 bool was_empty =
218 message_pump_sequence_->PushTask(std::move(pump_message_task));
219 DCHECK(was_empty) << "GetWorkFromWindowsMessageQueue() does not expect "
220 "queueing of pump tasks.";
221 return message_pump_sequence_;
222 }
223 }
224 return nullptr;
225 }
226
227 bool get_work_first_ = true;
228 const scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
229 TaskTracker* const task_tracker_;
230 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
231
232 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate);
233 };
234
235 #endif // defined(OS_WIN)
236
132 } // namespace 237 } // namespace
133 238
134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner 239 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
135 : public SingleThreadTaskRunner { 240 : public SingleThreadTaskRunner {
136 public: 241 public:
137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the 242 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
138 // lifetime of a dedicated |worker| for |traits|. 243 // lifetime of a dedicated |worker| for |traits|.
139 SchedulerSingleThreadTaskRunner( 244 SchedulerSingleThreadTaskRunner(
140 SchedulerSingleThreadTaskRunnerManager* const outer, 245 SchedulerSingleThreadTaskRunnerManager* const outer,
141 const TaskTraits& traits, 246 const TaskTraits& traits,
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
234 #endif 339 #endif
235 } 340 }
236 341
237 scoped_refptr<SingleThreadTaskRunner> 342 scoped_refptr<SingleThreadTaskRunner>
238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( 343 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
239 const TaskTraits& traits) { 344 const TaskTraits& traits) {
240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); 345 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
241 DCHECK_LT(index, worker_pool_params_vector_.size()); 346 DCHECK_LT(index, worker_pool_params_vector_.size());
242 return new SchedulerSingleThreadTaskRunner( 347 return new SchedulerSingleThreadTaskRunner(
243 this, traits, 348 this, traits,
244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); 349 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
350 DelegateType::DEFAULT));
245 } 351 }
246 352
353 #if defined(OS_WIN)
354 scoped_refptr<SingleThreadTaskRunner>
355 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits(
356 const TaskTraits& traits) {
357 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
358 DCHECK_LT(index, worker_pool_params_vector_.size());
359 return new SchedulerSingleThreadTaskRunner(
360 this, traits,
361 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
362 DelegateType::COM_STA));
363 }
364 #endif // defined(OS_WIN)
365
247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { 366 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
248 decltype(workers_) local_workers; 367 decltype(workers_) local_workers;
249 { 368 {
250 AutoSchedulerLock auto_lock(workers_lock_); 369 AutoSchedulerLock auto_lock(workers_lock_);
251 local_workers = std::move(workers_); 370 local_workers = std::move(workers_);
252 } 371 }
253 372
254 for (const auto& worker : local_workers) 373 for (const auto& worker : local_workers)
255 worker->JoinForTesting(); 374 worker->JoinForTesting();
256 375
257 { 376 {
258 AutoSchedulerLock auto_lock(workers_lock_); 377 AutoSchedulerLock auto_lock(workers_lock_);
259 DCHECK(workers_.empty()) 378 DCHECK(workers_.empty())
260 << "New worker(s) unexpectedly registered during join."; 379 << "New worker(s) unexpectedly registered during join.";
261 workers_ = std::move(local_workers); 380 workers_ = std::move(local_workers);
262 } 381 }
263 } 382 }
264 383
265 SchedulerWorker* 384 SchedulerWorker*
266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( 385 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
267 const SchedulerWorkerPoolParams& params) { 386 const SchedulerWorkerPoolParams& params,
387 DelegateType delegate_type) {
268 AutoSchedulerLock auto_lock(workers_lock_); 388 AutoSchedulerLock auto_lock(workers_lock_);
269 int id = next_worker_id_++; 389 int id = next_worker_id_++;
270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( 390
271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); 391 std::unique_ptr<SchedulerWorkerDelegate> delegate;
392 switch (delegate_type) {
393 case DelegateType::DEFAULT:
394 delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
fdoray 2017/03/22 12:20:19 no base::
robliao 2017/03/22 17:56:52 Done.
395 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
396 break;
397 #if defined(OS_WIN)
398 case DelegateType::COM_STA:
399 delegate = MakeUnique<SchedulerWorkerCOMDelegate>(
400 base::StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA", id,
fdoray 2017/03/22 12:20:19 no base::
robliao 2017/03/22 17:56:52 Done.
401 params.name().c_str()),
402 task_tracker_);
403 break;
404 #endif
405 }
406
272 workers_.emplace_back(SchedulerWorker::Create( 407 workers_.emplace_back(SchedulerWorker::Create(
273 params.priority_hint(), std::move(delegate), task_tracker_, 408 params.priority_hint(), std::move(delegate), task_tracker_,
274 SchedulerWorker::InitialState::DETACHED)); 409 SchedulerWorker::InitialState::DETACHED));
275 return workers_.back().get(); 410 return workers_.back().get();
276 } 411 }
277 412
278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( 413 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
279 SchedulerWorker* worker) { 414 SchedulerWorker* worker) {
280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing 415 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
281 // |workers_lock_|. 416 // |workers_lock_|.
(...skipping 17 matching lines...) Expand all
299 }); 434 });
300 DCHECK(worker_iter != workers_.end()); 435 DCHECK(worker_iter != workers_.end());
301 worker_to_destroy = std::move(*worker_iter); 436 worker_to_destroy = std::move(*worker_iter);
302 workers_.erase(worker_iter); 437 workers_.erase(worker_iter);
303 } 438 }
304 worker_to_destroy->Cleanup(); 439 worker_to_destroy->Cleanup();
305 } 440 }
306 441
307 } // namespace internal 442 } // namespace internal
308 } // namespace base 443 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698