Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(128)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2765703002: Add the COM STA Task Runner (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <memory> 8 #include <memory>
9 #include <string> 9 #include <string>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/callback.h" 12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h" 16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h" 17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h" 18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h" 19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h" 20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h" 21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h" 22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h" 23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h" 24 #include "base/time/time.h"
25 #include "build/build_config.h"
fdoray 2017/03/21 15:23:12 Not needed if you include it in the .h.
robliao 2017/03/21 20:00:19 Done.
26
27 #if defined(OS_WIN)
28 #include <windows.h>
29
30 #include "base/win/scoped_com_initializer.h"
31 #endif
gab 2017/03/21 21:09:34 #endif // defined(OS_WIN)
robliao 2017/03/21 22:25:32 Ah, I thought we didn't want these for certain sho
gab 2017/03/22 16:16:04 Yeah, it's kind of an hand-wavy rule, for tight sc
25 32
26 namespace base { 33 namespace base {
27 namespace internal { 34 namespace internal {
28 35
29 namespace { 36 namespace {
30 37
31 // Allows for checking the PlatformThread::CurrentRef() against a set 38 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks. 39 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker { 40 class AtomicThreadRefChecker {
34 public: 41 public:
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 // Synchronizes access to |sequence_| and |has_work_|. 129 // Synchronizes access to |sequence_| and |has_work_|.
123 SchedulerLock sequence_lock_; 130 SchedulerLock sequence_lock_;
124 scoped_refptr<Sequence> sequence_ = new Sequence; 131 scoped_refptr<Sequence> sequence_ = new Sequence;
125 bool has_work_ = false; 132 bool has_work_ = false;
126 133
127 AtomicThreadRefChecker thread_ref_checker_; 134 AtomicThreadRefChecker thread_ref_checker_;
128 135
129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); 136 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
130 }; 137 };
131 138
139 #if defined(OS_WIN)
140
141 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
142 public:
143 SchedulerWorkerCOMDelegate(const std::string& thread_name,
144 TaskTracker* task_tracker)
145 : SchedulerWorkerDelegate(thread_name), task_tracker_(task_tracker) {}
fdoray 2017/03/21 15:23:12 ~SchedulerWorkerCOMDelegate() { DCHECK(!scoped_c
robliao 2017/03/21 20:00:20 Done.
146
147 // SchedulerWorker::Delegate:
148 void OnMainEntry(SchedulerWorker* worker) override {
149 SchedulerWorkerDelegate::OnMainEntry(worker);
150
151 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>();
152 }
153
154 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
155 // This scheme below allows us to cover the following scenarios:
156 // * Tasks only come from SchedulerWorkerDelegate::GetWork():
gab 2017/03/21 21:09:34 s/only come/are only coming/ here and below? Fee
robliao 2017/03/21 22:25:32 Rephrased // This scheme below allows us to co
157 // Only return the sequence from GetWork().
158 // * Tasks only come from the Windows Message Queue:
159 // Only return the sequence from GetWorkFromWindowsMessageQueue();
160 // * Tasks come from both SchedulerWorkerDelegate::GetWork() and
161 // the Windows Message Queue:
162 // Process sequences from each source round-robin style.
163 scoped_refptr<Sequence> sequence;
164 if (get_work_first_) {
165 sequence = SchedulerWorkerDelegate::GetWork(worker);
166 if (sequence)
167 get_work_first_ = false;
168 }
169
170 if (!sequence) {
171 sequence = GetWorkFromWindowsMessageQueue();
172 if (sequence)
173 get_work_first_ = true;
174 }
175
176 if (!sequence && !get_work_first_) {
177 // This case is important if we checked the Windows Message Queue first
178 // and found there was no work. We don't want to return null immediately
179 // as that could cause the thread to go to sleep while work is waiting via
180 // SchedulerWorkerDelegate::GetWork(). As the same time, we don't want to
gab 2017/03/21 21:09:33 s/As/At/ (but overall I don't think this last sen
robliao 2017/03/21 22:25:32 Removed.
181 // mark |get_work_first_| to continue to check the message queue first
182 // after this sequence is returned.
183 sequence = SchedulerWorkerDelegate::GetWork(worker);
184 }
185 return sequence;
186 }
187
188 void OnMainExit() override { scoped_com_initializer_.reset(); }
189
190 void WaitForWork(WaitableEvent* wake_up_event) override {
191 DCHECK(wake_up_event);
192 const TimeDelta sleep_time = GetSleepTimeout();
193 const DWORD milliseconds_wait =
194 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds();
195 HANDLE wake_up_event_handle = wake_up_event->handle();
196 DWORD result = MsgWaitForMultipleObjectsEx(
197 1, &wake_up_event_handle, milliseconds_wait, QS_ALLINPUT, 0);
198 if (result == WAIT_OBJECT_0) {
199 // Reset the event since we woke up due to it.
200 wake_up_event->Reset();
201 }
202 }
fdoray 2017/03/21 15:23:12 void ReEnqueueSequence(scoped_refptr<Sequence> seq
robliao 2017/03/21 20:00:19 This check is already covered by SchedulerWorkerDe
203
204 private:
205 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() {
206 MSG msg;
207 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
208 auto pump_message_task =
209 MakeUnique<Task>(FROM_HERE,
210 base::Bind(
211 [](MSG msg_in) {
212 MSG msg = msg_in;
gab 2017/03/21 21:09:34 Why do you need this extra variable?
robliao 2017/03/21 22:25:32 Because in the shuffle I lost a "const MSG& msg_in
gab 2017/03/22 16:16:04 sgtm
213 TranslateMessage(&msg);
214 DispatchMessage(&msg);
215 },
216 msg),
gab 2017/03/21 21:09:34 std::move(msg) ? Not sure if MSG is moveable but i
robliao 2017/03/21 22:25:32 sgtm. No qualms with that. It is just a struct und
217 TaskTraits().MayBlock(), TimeDelta());
218 if (task_tracker_->WillPostTask(pump_message_task.get())) {
gab 2017/03/21 21:09:33 Otherwise do we have to tell Windows we're droppin
robliao 2017/03/21 22:25:32 Generally, the only thing you can do with a messag
gab 2017/03/22 16:16:04 Got it, and I guess SendMessage would return with
robliao 2017/03/22 17:56:52 Hrm... that's a good question. One of the things a
219 message_pump_sequence_->PushTask(std::move(pump_message_task));
fdoray 2017/03/21 15:23:12 bool was_empty = message_pump_sequence_->PushTask(
robliao 2017/03/21 20:00:19 Done.
220 return message_pump_sequence_;
221 }
222 }
223 return nullptr;
224 }
225
226 bool get_work_first_ = true;
227 scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
fdoray 2017/03/21 15:23:12 >>const<< scoped_refptr<Sequence> message_pump_seq
robliao 2017/03/21 20:00:19 Done.
228 TaskTracker* const task_tracker_;
229 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
230
231 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate);
232 };
233
234 #endif // defined(OS_WIN)
235
132 } // namespace 236 } // namespace
133 237
134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner 238 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
135 : public SingleThreadTaskRunner { 239 : public SingleThreadTaskRunner {
136 public: 240 public:
137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the 241 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
138 // lifetime of a dedicated |worker| for |traits|. 242 // lifetime of a dedicated |worker| for |traits|.
139 SchedulerSingleThreadTaskRunner( 243 SchedulerSingleThreadTaskRunner(
140 SchedulerSingleThreadTaskRunnerManager* const outer, 244 SchedulerSingleThreadTaskRunnerManager* const outer,
141 const TaskTraits& traits, 245 const TaskTraits& traits,
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
234 #endif 338 #endif
235 } 339 }
236 340
237 scoped_refptr<SingleThreadTaskRunner> 341 scoped_refptr<SingleThreadTaskRunner>
238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( 342 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
239 const TaskTraits& traits) { 343 const TaskTraits& traits) {
240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); 344 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
241 DCHECK_LT(index, worker_pool_params_vector_.size()); 345 DCHECK_LT(index, worker_pool_params_vector_.size());
242 return new SchedulerSingleThreadTaskRunner( 346 return new SchedulerSingleThreadTaskRunner(
243 this, traits, 347 this, traits,
244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); 348 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
349 DelegateType::REGULAR));
245 } 350 }
246 351
352 #if defined(OS_WIN)
353 scoped_refptr<SingleThreadTaskRunner>
354 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits(
355 const TaskTraits& traits) {
356 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
357 DCHECK_LT(index, worker_pool_params_vector_.size());
358 return new SchedulerSingleThreadTaskRunner(
359 this, traits,
360 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
361 DelegateType::COM_STA));
362 }
363 #endif // defined(OS_WIN)
364
247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { 365 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
248 decltype(workers_) local_workers; 366 decltype(workers_) local_workers;
249 { 367 {
250 AutoSchedulerLock auto_lock(workers_lock_); 368 AutoSchedulerLock auto_lock(workers_lock_);
251 local_workers = std::move(workers_); 369 local_workers = std::move(workers_);
252 } 370 }
253 371
254 for (const auto& worker : local_workers) 372 for (const auto& worker : local_workers)
255 worker->JoinForTesting(); 373 worker->JoinForTesting();
256 374
257 { 375 {
258 AutoSchedulerLock auto_lock(workers_lock_); 376 AutoSchedulerLock auto_lock(workers_lock_);
259 DCHECK(workers_.empty()) 377 DCHECK(workers_.empty())
260 << "New worker(s) unexpectedly registered during join."; 378 << "New worker(s) unexpectedly registered during join.";
261 workers_ = std::move(local_workers); 379 workers_ = std::move(local_workers);
262 } 380 }
263 } 381 }
264 382
265 SchedulerWorker* 383 SchedulerWorker*
266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( 384 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
267 const SchedulerWorkerPoolParams& params) { 385 const SchedulerWorkerPoolParams& params,
386 DelegateType delegate_type) {
268 AutoSchedulerLock auto_lock(workers_lock_); 387 AutoSchedulerLock auto_lock(workers_lock_);
269 int id = next_worker_id_++; 388 int id = next_worker_id_++;
270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( 389
271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); 390 std::unique_ptr<SchedulerWorkerDelegate> delegate;
fdoray 2017/03/21 15:23:12 Optional: - Instantiate the right type of delegate
robliao 2017/03/21 20:00:19 I guess this comes down to how much duplication we
gab 2017/03/21 21:09:34 Or we make CreateAndRegisterSchedulerWorker<T> a t
robliao 2017/03/21 22:25:32 That is an interesting idea, but SchedulerWorkerCO
robliao 2017/03/22 08:29:42 I think I just came up with a scheme to make this
391 switch (delegate_type) {
392 case DelegateType::REGULAR:
393 delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
394 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
395 break;
396 #if defined(OS_WIN)
397 case DelegateType::COM_STA:
398 delegate = MakeUnique<SchedulerWorkerCOMDelegate>(
399 base::StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA", id,
400 params.name().c_str()),
401 task_tracker_);
402 break;
403 #endif
404 }
405
272 workers_.emplace_back(SchedulerWorker::Create( 406 workers_.emplace_back(SchedulerWorker::Create(
273 params.priority_hint(), std::move(delegate), task_tracker_, 407 params.priority_hint(), std::move(delegate), task_tracker_,
274 SchedulerWorker::InitialState::DETACHED)); 408 SchedulerWorker::InitialState::DETACHED));
275 return workers_.back().get(); 409 return workers_.back().get();
276 } 410 }
277 411
278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( 412 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
279 SchedulerWorker* worker) { 413 SchedulerWorker* worker) {
280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing 414 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
281 // |workers_lock_|. 415 // |workers_lock_|.
(...skipping 17 matching lines...) Expand all
299 }); 433 });
300 DCHECK(worker_iter != workers_.end()); 434 DCHECK(worker_iter != workers_.end());
301 worker_to_destroy = std::move(*worker_iter); 435 worker_to_destroy = std::move(*worker_iter);
302 workers_.erase(worker_iter); 436 workers_.erase(worker_iter);
303 } 437 }
304 worker_to_destroy->Cleanup(); 438 worker_to_destroy->Cleanup();
305 } 439 }
306 440
307 } // namespace internal 441 } // namespace internal
308 } // namespace base 442 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698