Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(543)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.h

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 5 #ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
6 #define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 6 #define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
7 7
8 #include <stddef.h> 8 #include <stddef.h>
9 9
10 #include <memory> 10 #include <memory>
(...skipping 12 matching lines...) Expand all
23 #include "base/task_scheduler/scheduler_lock.h" 23 #include "base/task_scheduler/scheduler_lock.h"
24 #include "base/task_scheduler/scheduler_worker.h" 24 #include "base/task_scheduler/scheduler_worker.h"
25 #include "base/task_scheduler/scheduler_worker_pool.h" 25 #include "base/task_scheduler/scheduler_worker_pool.h"
26 #include "base/task_scheduler/scheduler_worker_stack.h" 26 #include "base/task_scheduler/scheduler_worker_stack.h"
27 #include "base/task_scheduler/sequence.h" 27 #include "base/task_scheduler/sequence.h"
28 #include "base/task_scheduler/task.h" 28 #include "base/task_scheduler/task.h"
29 #include "base/task_scheduler/task_traits.h" 29 #include "base/task_scheduler/task_traits.h"
30 #include "base/threading/platform_thread.h" 30 #include "base/threading/platform_thread.h"
31 31
32 namespace base { 32 namespace base {
33
34 class TimeDelta;
35
33 namespace internal { 36 namespace internal {
34 37
35 class DelayedTaskManager; 38 class DelayedTaskManager;
36 class TaskTracker; 39 class TaskTracker;
37 40
38 // A pool of workers that run Tasks. This class is thread-safe. 41 // A pool of workers that run Tasks. This class is thread-safe.
39 class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { 42 class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
40 public: 43 public:
41 enum class IORestriction { 44 enum class IORestriction {
42 ALLOWED, 45 ALLOWED,
43 DISALLOWED, 46 DISALLOWED,
44 }; 47 };
45 48
46 // Callback invoked when a Sequence isn't empty after a worker pops a Task 49 // Callback invoked when a Sequence isn't empty after a worker pops a Task
47 // from it. 50 // from it.
48 using ReEnqueueSequenceCallback = Callback<void(scoped_refptr<Sequence>)>; 51 using ReEnqueueSequenceCallback = Callback<void(scoped_refptr<Sequence>)>;
49 52
50 // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in 53 // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in
51 // production; it is always leaked. In tests, it can only be destroyed after 54 // production; it is always leaked. In tests, it can only be destroyed after
52 // JoinForTesting() has returned. 55 // JoinForTesting() has returned.
53 ~SchedulerWorkerPoolImpl() override; 56 ~SchedulerWorkerPoolImpl() override;
54 57
55 // Creates a SchedulerWorkerPoolImpl labeled |name| with up to |max_threads| 58 // Creates a SchedulerWorkerPoolImpl labeled |name| with up to |max_threads|
56 // threads of priority |thread_priority|. |io_restriction| indicates whether 59 // threads of priority |thread_priority|. |io_restriction| indicates whether
57 // Tasks on the constructed worker pool are allowed to make I/O calls. 60 // Tasks on the constructed worker pool are allowed to make I/O calls.
58 // |re_enqueue_sequence_callback| will be invoked after a worker of this 61 // |suggested_reclaim_time| sets a suggestion on when to reclaim idle threads.
59 // worker pool tries to run a Task. |task_tracker| is used to handle shutdown 62 // The worker pool is free to ignore this value for performance or correctness
60 // behavior of Tasks. |delayed_task_manager| handles Tasks posted with a 63 // reasons. |re_enqueue_sequence_callback| will be invoked after a worker of
61 // delay. Returns nullptr on failure to create a worker pool with at least one 64 // this worker pool tries to run a Task. |task_tracker| is used to handle
62 // thread. 65 // shutdown behavior of Tasks. |delayed_task_manager| handles Tasks posted
66 // with a delay. Returns nullptr on failure to create a worker pool with at
67 // least one thread.
63 static std::unique_ptr<SchedulerWorkerPoolImpl> Create( 68 static std::unique_ptr<SchedulerWorkerPoolImpl> Create(
64 StringPiece name, 69 StringPiece name,
65 ThreadPriority thread_priority, 70 ThreadPriority thread_priority,
66 size_t max_threads, 71 size_t max_threads,
67 IORestriction io_restriction, 72 IORestriction io_restriction,
73 const TimeDelta& suggested_reclaim_time,
68 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 74 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
69 TaskTracker* task_tracker, 75 TaskTracker* task_tracker,
70 DelayedTaskManager* delayed_task_manager); 76 DelayedTaskManager* delayed_task_manager);
71 77
72 // Waits until all workers are idle. 78 // Waits until all workers are idle.
73 void WaitForAllWorkersIdleForTesting(); 79 void WaitForAllWorkersIdleForTesting();
74 80
75 // Joins all workers of this worker pool. Tasks that are already running are 81 // Joins all workers of this worker pool. Tasks that are already running are
76 // allowed to complete their execution. This can only be called once. 82 // allowed to complete their execution. This can only be called once.
77 void JoinForTesting(); 83 void JoinForTesting();
78 84
79 // SchedulerWorkerPool: 85 // SchedulerWorkerPool:
80 scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits( 86 scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
81 const TaskTraits& traits, 87 const TaskTraits& traits,
82 ExecutionMode execution_mode) override; 88 ExecutionMode execution_mode) override;
83 void ReEnqueueSequence(scoped_refptr<Sequence> sequence, 89 void ReEnqueueSequence(scoped_refptr<Sequence> sequence,
84 const SequenceSortKey& sequence_sort_key) override; 90 const SequenceSortKey& sequence_sort_key) override;
85 bool PostTaskWithSequence(std::unique_ptr<Task> task, 91 bool PostTaskWithSequence(std::unique_ptr<Task> task,
86 scoped_refptr<Sequence> sequence, 92 scoped_refptr<Sequence> sequence,
87 SchedulerWorker* worker) override; 93 SchedulerWorker* worker) override;
88 void PostTaskWithSequenceNow(std::unique_ptr<Task> task, 94 void PostTaskWithSequenceNow(std::unique_ptr<Task> task,
89 scoped_refptr<Sequence> sequence, 95 scoped_refptr<Sequence> sequence,
90 SchedulerWorker* worker) override; 96 SchedulerWorker* worker) override;
91 97
98 void RegisterSingleThreadTaskRunner(SchedulerWorker* worker);
99 void UnregisterSingleThreadTaskRunner(SchedulerWorker* worker);
fdoray 2016/07/04 19:41:32 I would would be better if these methods weren't h
robliao 2016/07/07 17:49:16 Done.
100
92 private: 101 private:
93 class SchedulerWorkerDelegateImpl; 102 class SchedulerWorkerDelegateImpl;
94 103
95 SchedulerWorkerPoolImpl(StringPiece name, 104 SchedulerWorkerPoolImpl(StringPiece name,
96 IORestriction io_restriction, 105 IORestriction io_restriction,
97 TaskTracker* task_tracker, 106 TaskTracker* task_tracker,
98 DelayedTaskManager* delayed_task_manager); 107 DelayedTaskManager* delayed_task_manager);
99 108
100 bool Initialize( 109 bool Initialize(
101 ThreadPriority thread_priority, 110 ThreadPriority thread_priority,
102 size_t max_threads, 111 size_t max_threads,
112 const TimeDelta& suggested_reclaim_time,
103 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback); 113 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
104 114
105 // Wakes up the last worker from this worker pool to go idle, if any. 115 // Wakes up the last worker from this worker pool to go idle, if any.
106 void WakeUpOneWorker(); 116 void WakeUpOneWorker();
107 117
108 // Adds |worker| to |idle_workers_stack_|. 118 // Adds |worker| to |idle_workers_stack_|.
109 void AddToIdleWorkersStack(SchedulerWorker* worker); 119 void AddToIdleWorkersStack(SchedulerWorker* worker);
110 120
121 // Peeks from |idle_workers_stack_|.
122 const SchedulerWorker* PeekAtIdleWorkersStack() const;
123
111 // Removes |worker| from |idle_workers_stack_|. 124 // Removes |worker| from |idle_workers_stack_|.
112 void RemoveFromIdleWorkersStack(SchedulerWorker* worker); 125 void RemoveFromIdleWorkersStack(SchedulerWorker* worker);
113 126
127 // True if JoinForTesting() has been called.
128 bool HasJoinedForTesting();
129
114 // The name of this worker pool, used to label its worker threads. 130 // The name of this worker pool, used to label its worker threads.
115 const std::string name_; 131 const std::string name_;
116 132
117 // All worker owned by this worker pool. Only modified during initialization 133 // All worker owned by this worker pool. Only modified during initialization
118 // of the worker pool. 134 // of the worker pool.
119 std::vector<std::unique_ptr<SchedulerWorker>> workers_; 135 std::vector<std::unique_ptr<SchedulerWorker>> workers_;
120 136
121 // Synchronizes access to |next_worker_index_|. 137 // Synchronizes access to |next_worker_index_|.
122 SchedulerLock next_worker_index_lock_; 138 SchedulerLock next_worker_index_lock_;
123 139
124 // Index of the worker that will be assigned to the next single-threaded 140 // Index of the worker that will be assigned to the next single-threaded
125 // TaskRunner returned by this pool. 141 // TaskRunner returned by this pool.
126 size_t next_worker_index_ = 0; 142 size_t next_worker_index_ = 0;
127 143
128 // PriorityQueue from which all threads of this worker pool get work. 144 // PriorityQueue from which all threads of this worker pool get work.
129 PriorityQueue shared_priority_queue_; 145 PriorityQueue shared_priority_queue_;
130 146
131 // Indicates whether Tasks on this worker pool are allowed to make I/O calls. 147 // Indicates whether Tasks on this worker pool are allowed to make I/O calls.
132 const IORestriction io_restriction_; 148 const IORestriction io_restriction_;
133 149
134 // Synchronizes access to |idle_workers_stack_| and 150 // Synchronizes access to |idle_workers_stack_| and
135 // |idle_workers_stack_cv_for_testing_|. Has |shared_priority_queue_|'s 151 // |idle_workers_stack_cv_for_testing_|. Has |shared_priority_queue_|'s
136 // lock as its predecessor so that a worker can be pushed to 152 // lock as its predecessor so that a worker can be pushed to
137 // |idle_workers_stack_| within the scope of a Transaction (more 153 // |idle_workers_stack_| within the scope of a Transaction (more
138 // details in GetWork()). 154 // details in GetWork()).
139 SchedulerLock idle_workers_stack_lock_; 155 mutable SchedulerLock idle_workers_stack_lock_;
140 156
141 // Stack of idle workers. 157 // Stack of idle workers.
142 SchedulerWorkerStack idle_workers_stack_; 158 SchedulerWorkerStack idle_workers_stack_;
143 159
144 // Signaled when all workers become idle. 160 // Signaled when all workers become idle.
145 std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_; 161 std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_;
146 162
147 // Signaled once JoinForTesting() has returned. 163 // Signaled once JoinForTesting() has returned.
148 WaitableEvent join_for_testing_returned_; 164 WaitableEvent join_for_testing_returned_;
149 165
166 // Synchronizes access to |join_for_testing_called_|.
167 SchedulerLock join_for_testing_called_lock_;
fdoray 2016/07/04 19:41:32 Elsewhere in base/task_scheduler, we use a manual
robliao 2016/07/07 17:49:16 Here, that seems like an abuse of the WaitableEven
fdoray 2016/07/07 20:45:57 It would be nice to have an AtomicBool class. Sinc
gab 2016/07/13 21:07:55 We have CancellationFlag. It's name is a bit too s
gab 2016/07/20 01:45:21 ping ^^
robliao 2016/07/20 19:44:00 Yup. I see your pending change for the atomic flag
gab 2016/07/20 20:26:31 Which change? It sure isn't mine or I code in my s
robliao 2016/07/20 22:18:03 Ah, I guess I misread it and it was fdoray's https
168
169 // Indicates to the delegates that JoinForTesting() has been called.
170 bool join_for_testing_called_;
171
150 #if DCHECK_IS_ON() 172 #if DCHECK_IS_ON()
151 // Signaled when all workers have been created. 173 // Signaled when all workers have been created.
152 WaitableEvent workers_created_; 174 WaitableEvent workers_created_;
153 #endif 175 #endif
154 176
155 TaskTracker* const task_tracker_; 177 TaskTracker* const task_tracker_;
156 DelayedTaskManager* const delayed_task_manager_; 178 DelayedTaskManager* const delayed_task_manager_;
157 179
158 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl); 180 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl);
159 }; 181 };
160 182
161 } // namespace internal 183 } // namespace internal
162 } // namespace base 184 } // namespace base
163 185
164 #endif // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 186 #endif // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698