Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1738)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2762703002: FOR REFERENCE ONLY Task Scheduler COM Task Runner (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h" 5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <memory> 8 #include <memory>
9 #include <string> 9 #include <string>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/callback.h" 12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h" 16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h" 17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h" 18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h" 19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h" 20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h" 21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h" 22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h" 23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h" 24 #include "base/time/time.h"
25 #include "build/build_config.h"
26
27 #if defined(OS_WIN)
28 #include <windows.h>
29
30 #include "base/win/scoped_com_initializer.h"
31 #endif
25 32
26 namespace base { 33 namespace base {
27 namespace internal { 34 namespace internal {
28 35
29 namespace { 36 namespace {
30 37
31 // Allows for checking the PlatformThread::CurrentRef() against a set 38 // Allows for checking the PlatformThread::CurrentRef() against a set
32 // PlatformThreadRef atomically without using locks. 39 // PlatformThreadRef atomically without using locks.
33 class AtomicThreadRefChecker { 40 class AtomicThreadRefChecker {
34 public: 41 public:
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 // Synchronizes access to |sequence_| and |has_work_|. 129 // Synchronizes access to |sequence_| and |has_work_|.
123 SchedulerLock sequence_lock_; 130 SchedulerLock sequence_lock_;
124 scoped_refptr<Sequence> sequence_ = new Sequence; 131 scoped_refptr<Sequence> sequence_ = new Sequence;
125 bool has_work_ = false; 132 bool has_work_ = false;
126 133
127 AtomicThreadRefChecker thread_ref_checker_; 134 AtomicThreadRefChecker thread_ref_checker_;
128 135
129 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate); 136 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
130 }; 137 };
131 138
139 #if defined(OS_WIN)
140
141 void PumpOneMessage() {
142 MSG msg;
143 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
144 TranslateMessage(&msg);
145 DispatchMessage(&msg);
146 }
147 }
148
149 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
150 public:
151 SchedulerWorkerCOMDelegate(const std::string& thread_name)
152 : SchedulerWorkerDelegate(thread_name) {}
153
154 // SchedulerWorker::Delegate:
155 void OnMainEntry(SchedulerWorker* worker) override {
156 SchedulerWorkerDelegate::OnMainEntry(worker);
157
158 scoped_com_initializer_ = MakeUnique<win::ScopedCOMInitializer>();
159 }
160
161 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
162 pump_one_message_ = !pump_one_message_;
163 if (pump_one_message_) {
164 message_pump_sequence_->PushTask(MakeUnique<Task>(
165 FROM_HERE, base::Bind(&PumpOneMessage), TaskTraits().MayBlock(),
166 TimeDelta(), Task::TaskType::INTERNAL));
167 return message_pump_sequence_;
168 }
169 return SchedulerWorkerDelegate::GetWork(worker);
170 }
171
172 void OnMainExit() override { scoped_com_initializer_.reset(); }
173
174 void WaitForWork(WaitableEvent* wake_up_event) override {
175 DCHECK(wake_up_event);
176 const TimeDelta sleep_time = GetSleepTimeout();
177 const DWORD milliseconds_wait =
178 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds();
179 HANDLE wake_up_event_handle = wake_up_event->handle();
180 DWORD result = MsgWaitForMultipleObjectsEx(
181 1, &wake_up_event_handle, milliseconds_wait, QS_ALLEVENTS, 0);
182 if (result == WAIT_OBJECT_0) {
183 // Reset the event since we woke up due to it.
184 wake_up_event->Reset();
185 }
186 }
187
188 private:
189 bool pump_one_message_ = true;
190 scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
191 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
192
193 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate);
194 };
195
196 #endif // defined(OS_WIN)
197
132 } // namespace 198 } // namespace
133 199
134 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner 200 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
135 : public SingleThreadTaskRunner { 201 : public SingleThreadTaskRunner {
136 public: 202 public:
137 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the 203 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
138 // lifetime of a dedicated |worker| for |traits|. 204 // lifetime of a dedicated |worker| for |traits|.
139 SchedulerSingleThreadTaskRunner( 205 SchedulerSingleThreadTaskRunner(
140 SchedulerSingleThreadTaskRunnerManager* const outer, 206 SchedulerSingleThreadTaskRunnerManager* const outer,
141 const TaskTraits& traits, 207 const TaskTraits& traits,
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
234 #endif 300 #endif
235 } 301 }
236 302
237 scoped_refptr<SingleThreadTaskRunner> 303 scoped_refptr<SingleThreadTaskRunner>
238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits( 304 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
239 const TaskTraits& traits) { 305 const TaskTraits& traits) {
240 size_t index = worker_pool_index_for_traits_callback_.Run(traits); 306 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
241 DCHECK_LT(index, worker_pool_params_vector_.size()); 307 DCHECK_LT(index, worker_pool_params_vector_.size());
242 return new SchedulerSingleThreadTaskRunner( 308 return new SchedulerSingleThreadTaskRunner(
243 this, traits, 309 this, traits,
244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index])); 310 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
311 DelegateType::REGULAR));
312 }
313
314 scoped_refptr<SingleThreadTaskRunner>
315 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits(
316 const TaskTraits& traits) {
317 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
318 DCHECK_LT(index, worker_pool_params_vector_.size());
319 return new SchedulerSingleThreadTaskRunner(
320 this, traits,
321 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index],
322 DelegateType::COM_STA));
245 } 323 }
246 324
247 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() { 325 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
248 decltype(workers_) local_workers; 326 decltype(workers_) local_workers;
249 { 327 {
250 AutoSchedulerLock auto_lock(workers_lock_); 328 AutoSchedulerLock auto_lock(workers_lock_);
251 local_workers = std::move(workers_); 329 local_workers = std::move(workers_);
252 } 330 }
253 331
254 for (const auto& worker : local_workers) 332 for (const auto& worker : local_workers)
255 worker->JoinForTesting(); 333 worker->JoinForTesting();
256 334
257 { 335 {
258 AutoSchedulerLock auto_lock(workers_lock_); 336 AutoSchedulerLock auto_lock(workers_lock_);
259 DCHECK(workers_.empty()) 337 DCHECK(workers_.empty())
260 << "New worker(s) unexpectedly registered during join."; 338 << "New worker(s) unexpectedly registered during join.";
261 workers_ = std::move(local_workers); 339 workers_ = std::move(local_workers);
262 } 340 }
263 } 341 }
264 342
265 SchedulerWorker* 343 SchedulerWorker*
266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker( 344 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
267 const SchedulerWorkerPoolParams& params) { 345 const SchedulerWorkerPoolParams& params,
346 DelegateType delegate_type) {
268 AutoSchedulerLock auto_lock(workers_lock_); 347 AutoSchedulerLock auto_lock(workers_lock_);
269 int id = next_worker_id_++; 348 int id = next_worker_id_++;
270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf( 349
271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str())); 350 auto delegate =
351 delegate_type == DelegateType::COM_STA
352 ? MakeUnique<SchedulerWorkerCOMDelegate>(
353 base::StringPrintf("TaskSchedulerSingleThreadWorker%d%sCOMSTA",
354 id, params.name().c_str()))
355 : MakeUnique<SchedulerWorkerDelegate>(
356 base::StringPrintf("TaskSchedulerSingleThreadWorker%d%s", id,
357 params.name().c_str()));
272 workers_.emplace_back(SchedulerWorker::Create( 358 workers_.emplace_back(SchedulerWorker::Create(
273 params.priority_hint(), std::move(delegate), task_tracker_, 359 params.priority_hint(), std::move(delegate), task_tracker_,
274 SchedulerWorker::InitialState::DETACHED)); 360 SchedulerWorker::InitialState::DETACHED));
275 return workers_.back().get(); 361 return workers_.back().get();
276 } 362 }
277 363
278 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker( 364 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
279 SchedulerWorker* worker) { 365 SchedulerWorker* worker) {
280 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing 366 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
281 // |workers_lock_|. 367 // |workers_lock_|.
(...skipping 17 matching lines...) Expand all
299 }); 385 });
300 DCHECK(worker_iter != workers_.end()); 386 DCHECK(worker_iter != workers_.end());
301 worker_to_destroy = std::move(*worker_iter); 387 worker_to_destroy = std::move(*worker_iter);
302 workers_.erase(worker_iter); 388 workers_.erase(worker_iter);
303 } 389 }
304 worker_to_destroy->Cleanup(); 390 worker_to_destroy->Cleanup();
305 } 391 }
306 392
307 } // namespace internal 393 } // namespace internal
308 } // namespace base 394 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698