Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: components/sync/model_impl/attachments/task_queue.h

Issue 2915453002: Deprecate NonThreadSafe in components/sync in favor of SequenceChecker. (Closed)
Patch Set: fix comment Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ 5 #ifndef COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
6 #define COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ 6 #define COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
7 7
8 #include <stddef.h> 8 #include <stddef.h>
9 9
10 #include <deque> 10 #include <deque>
11 #include <memory> 11 #include <memory>
12 #include <set> 12 #include <set>
13 #include <utility> 13 #include <utility>
14 14
15 #include "base/bind.h" 15 #include "base/bind.h"
16 #include "base/callback.h" 16 #include "base/callback.h"
17 #include "base/macros.h" 17 #include "base/macros.h"
18 #include "base/memory/ptr_util.h" 18 #include "base/memory/ptr_util.h"
19 #include "base/memory/weak_ptr.h" 19 #include "base/memory/weak_ptr.h"
20 #include "base/threading/non_thread_safe.h" 20 #include "base/sequence_checker.h"
21 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/time/time.h" 22 #include "base/time/time.h"
23 #include "base/timer/timer.h" 23 #include "base/timer/timer.h"
24 #include "net/base/backoff_entry.h" 24 #include "net/base/backoff_entry.h"
25 25
26 namespace syncer { 26 namespace syncer {
27 27
28 // A queue that dispatches tasks, ignores duplicates, and provides backoff 28 // A queue that dispatches tasks, ignores duplicates, and provides backoff
29 // semantics. 29 // semantics.
30 // 30 //
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 // queue.MarkAsFailed(foo); 62 // queue.MarkAsFailed(foo);
63 // if (ShouldRetry(foo)) { 63 // if (ShouldRetry(foo)) {
64 // queue.AddToQueue(foo); 64 // queue.AddToQueue(foo);
65 // } 65 // }
66 // } else { 66 // } else {
67 // Cancel(foo); 67 // Cancel(foo);
68 // } 68 // }
69 // } 69 // }
70 // 70 //
71 template <typename T> 71 template <typename T>
72 class TaskQueue : base::NonThreadSafe { 72 class TaskQueue {
73 public: 73 public:
74 // A callback provided by users of the TaskQueue to handle tasks. 74 // A callback provided by users of the TaskQueue to handle tasks.
75 // 75 //
76 // This callback is invoked by the queue with a task to be handled. The 76 // This callback is invoked by the queue with a task to be handled. The
77 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, 77 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
78 // or |Cancel| to signify completion of the task. 78 // or |Cancel| to signify completion of the task.
79 using HandleTaskCallback = base::Callback<void(const T&)>; 79 using HandleTaskCallback = base::Callback<void(const T&)>;
80 80
81 // Construct a TaskQueue. 81 // Construct a TaskQueue.
82 // 82 //
83 // |callback| the callback to be invoked for handling tasks. 83 // |callback| the callback to be invoked for handling tasks.
84 // 84 //
85 // |initial_backoff_delay| the initial amount of time the queue will wait 85 // |initial_backoff_delay| the initial amount of time the queue will wait
86 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be 86 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
87 // zero. Subsequent failures will increase the delay up to 87 // zero. Subsequent failures will increase the delay up to
88 // |max_backoff_delay|. 88 // |max_backoff_delay|.
89 // 89 //
90 // |max_backoff_delay| the maximum amount of time the queue will wait before 90 // |max_backoff_delay| the maximum amount of time the queue will wait before
91 // dispatching tasks. May be zero. Must be greater than or equal to 91 // dispatching tasks. May be zero. Must be greater than or equal to
92 // |initial_backoff_delay|. 92 // |initial_backoff_delay|.
93 TaskQueue(const HandleTaskCallback& callback, 93 TaskQueue(const HandleTaskCallback& callback,
94 const base::TimeDelta& initial_backoff_delay, 94 const base::TimeDelta& initial_backoff_delay,
95 const base::TimeDelta& max_backoff_delay); 95 const base::TimeDelta& max_backoff_delay);
96 96
97 ~TaskQueue();
98
97 // Add |task| to the end of the queue. 99 // Add |task| to the end of the queue.
98 // 100 //
99 // If |task| is already present (as determined by operator==) it is not added. 101 // If |task| is already present (as determined by operator==) it is not added.
100 void AddToQueue(const T& task); 102 void AddToQueue(const T& task);
101 103
102 // Mark |task| as completing successfully. 104 // Mark |task| as completing successfully.
103 // 105 //
104 // Marking a task as completing successfully will reduce or eliminate any 106 // Marking a task as completing successfully will reduce or eliminate any
105 // backoff delay in effect. 107 // backoff delay in effect.
106 // 108 //
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 std::unique_ptr<net::BackoffEntry> backoff_entry_; 152 std::unique_ptr<net::BackoffEntry> backoff_entry_;
151 // The number of tasks currently being handled. 153 // The number of tasks currently being handled.
152 int num_in_progress_; 154 int num_in_progress_;
153 std::deque<T> queue_; 155 std::deque<T> queue_;
154 // The set of tasks in queue_ or currently being handled. 156 // The set of tasks in queue_ or currently being handled.
155 std::set<T> tasks_; 157 std::set<T> tasks_;
156 base::Closure dispatch_closure_; 158 base::Closure dispatch_closure_;
157 std::unique_ptr<base::Timer> backoff_timer_; 159 std::unique_ptr<base::Timer> backoff_timer_;
158 base::TimeDelta delay_; 160 base::TimeDelta delay_;
159 161
162 SEQUENCE_CHECKER(sequence_checker_);
163
160 // Must be last data member. 164 // Must be last data member.
161 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; 165 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
162 166
163 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 167 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
164 }; 168 };
165 169
166 // The maximum number of tasks that may be concurrently executed. Think 170 // The maximum number of tasks that may be concurrently executed. Think
167 // carefully before changing this value. The desired behavior of backoff may 171 // carefully before changing this value. The desired behavior of backoff may
168 // not be obvious when there is more than one concurrent task 172 // not be obvious when there is more than one concurrent task
169 const int kMaxConcurrentTasks = 1; 173 const int kMaxConcurrentTasks = 1;
(...skipping 14 matching lines...) Expand all
184 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); 188 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
185 backoff_policy_.entry_lifetime_ms = -1; 189 backoff_policy_.entry_lifetime_ms = -1;
186 backoff_policy_.always_use_initial_delay = false; 190 backoff_policy_.always_use_initial_delay = false;
187 backoff_entry_ = base::MakeUnique<net::BackoffEntry>(&backoff_policy_); 191 backoff_entry_ = base::MakeUnique<net::BackoffEntry>(&backoff_policy_);
188 dispatch_closure_ = 192 dispatch_closure_ =
189 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); 193 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
190 backoff_timer_ = base::MakeUnique<base::Timer>(false, false); 194 backoff_timer_ = base::MakeUnique<base::Timer>(false, false);
191 } 195 }
192 196
193 template <typename T> 197 template <typename T>
198 TaskQueue<T>::~TaskQueue() {
199 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
200 }
201
202 template <typename T>
194 void TaskQueue<T>::AddToQueue(const T& task) { 203 void TaskQueue<T>::AddToQueue(const T& task) {
195 DCHECK(CalledOnValidThread()); 204 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
196 // Ignore duplicates. 205 // Ignore duplicates.
197 if (tasks_.find(task) == tasks_.end()) { 206 if (tasks_.find(task) == tasks_.end()) {
198 queue_.push_back(task); 207 queue_.push_back(task);
199 tasks_.insert(task); 208 tasks_.insert(task);
200 } 209 }
201 ScheduleDispatch(); 210 ScheduleDispatch();
202 } 211 }
203 212
204 template <typename T> 213 template <typename T>
205 void TaskQueue<T>::MarkAsSucceeded(const T& task) { 214 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
206 DCHECK(CalledOnValidThread()); 215 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
207 FinishTask(task); 216 FinishTask(task);
208 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and 217 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
209 // reschedule a dispatch. 218 // reschedule a dispatch.
210 backoff_timer_->Stop(); 219 backoff_timer_->Stop();
211 backoff_entry_->Reset(); 220 backoff_entry_->Reset();
212 ScheduleDispatch(); 221 ScheduleDispatch();
213 } 222 }
214 223
215 template <typename T> 224 template <typename T>
216 void TaskQueue<T>::MarkAsFailed(const T& task) { 225 void TaskQueue<T>::MarkAsFailed(const T& task) {
217 DCHECK(CalledOnValidThread()); 226 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
218 FinishTask(task); 227 FinishTask(task);
219 backoff_entry_->InformOfRequest(false); 228 backoff_entry_->InformOfRequest(false);
220 ScheduleDispatch(); 229 ScheduleDispatch();
221 } 230 }
222 231
223 template <typename T> 232 template <typename T>
224 void TaskQueue<T>::Cancel(const T& task) { 233 void TaskQueue<T>::Cancel(const T& task) {
225 DCHECK(CalledOnValidThread()); 234 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
226 FinishTask(task); 235 FinishTask(task);
227 ScheduleDispatch(); 236 ScheduleDispatch();
228 } 237 }
229 238
230 template <typename T> 239 template <typename T>
231 void TaskQueue<T>::ResetBackoff() { 240 void TaskQueue<T>::ResetBackoff() {
232 backoff_timer_->Stop(); 241 backoff_timer_->Stop();
233 backoff_entry_->Reset(); 242 backoff_entry_->Reset();
234 ScheduleDispatch(); 243 ScheduleDispatch();
235 } 244 }
236 245
237 template <typename T> 246 template <typename T>
238 void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) { 247 void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) {
239 DCHECK(CalledOnValidThread()); 248 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
240 DCHECK(timer.get()); 249 DCHECK(timer.get());
241 backoff_timer_ = std::move(timer); 250 backoff_timer_ = std::move(timer);
242 } 251 }
243 252
244 template <typename T> 253 template <typename T>
245 void TaskQueue<T>::FinishTask(const T& task) { 254 void TaskQueue<T>::FinishTask(const T& task) {
246 DCHECK(CalledOnValidThread()); 255 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
247 DCHECK_GE(num_in_progress_, 1); 256 DCHECK_GE(num_in_progress_, 1);
248 --num_in_progress_; 257 --num_in_progress_;
249 const size_t num_erased = tasks_.erase(task); 258 const size_t num_erased = tasks_.erase(task);
250 DCHECK_EQ(1U, num_erased); 259 DCHECK_EQ(1U, num_erased);
251 } 260 }
252 261
253 template <typename T> 262 template <typename T>
254 void TaskQueue<T>::ScheduleDispatch() { 263 void TaskQueue<T>::ScheduleDispatch() {
255 DCHECK(CalledOnValidThread()); 264 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
256 if (backoff_timer_->IsRunning() || !ShouldDispatch()) { 265 if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
257 return; 266 return;
258 } 267 }
259 268
260 backoff_timer_->Start(FROM_HERE, backoff_entry_->GetTimeUntilRelease(), 269 backoff_timer_->Start(FROM_HERE, backoff_entry_->GetTimeUntilRelease(),
261 dispatch_closure_); 270 dispatch_closure_);
262 } 271 }
263 272
264 template <typename T> 273 template <typename T>
265 void TaskQueue<T>::Dispatch() { 274 void TaskQueue<T>::Dispatch() {
266 DCHECK(CalledOnValidThread()); 275 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
267 if (!ShouldDispatch()) { 276 if (!ShouldDispatch()) {
268 return; 277 return;
269 } 278 }
270 279
271 DCHECK(!queue_.empty()); 280 DCHECK(!queue_.empty());
272 const T& task = queue_.front(); 281 const T& task = queue_.front();
273 ++num_in_progress_; 282 ++num_in_progress_;
274 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); 283 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
275 base::ThreadTaskRunnerHandle::Get()->PostTask( 284 base::ThreadTaskRunnerHandle::Get()->PostTask(
276 FROM_HERE, base::Bind(process_callback_, task)); 285 FROM_HERE, base::Bind(process_callback_, task));
277 queue_.pop_front(); 286 queue_.pop_front();
278 } 287 }
279 288
280 template <typename T> 289 template <typename T>
281 bool TaskQueue<T>::ShouldDispatch() { 290 bool TaskQueue<T>::ShouldDispatch() {
282 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); 291 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
283 } 292 }
284 293
285 } // namespace syncer 294 } // namespace syncer
286 295
287 #endif // COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ 296 #endif // COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
OLDNEW
« no previous file with comments | « components/sync/model_impl/attachments/attachment_service_impl.cc ('k') | components/sync/model_impl/model_type_store_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698