| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ | 5 #ifndef COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ |
| 6 #define COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ | 6 #define COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ |
| 7 | 7 |
| 8 #include <stddef.h> | 8 #include <stddef.h> |
| 9 | 9 |
| 10 #include <deque> | 10 #include <deque> |
| 11 #include <memory> | 11 #include <memory> |
| 12 #include <set> | 12 #include <set> |
| 13 #include <utility> | 13 #include <utility> |
| 14 | 14 |
| 15 #include "base/bind.h" | 15 #include "base/bind.h" |
| 16 #include "base/callback.h" | 16 #include "base/callback.h" |
| 17 #include "base/macros.h" | 17 #include "base/macros.h" |
| 18 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
| 19 #include "base/memory/weak_ptr.h" | 19 #include "base/memory/weak_ptr.h" |
| 20 #include "base/threading/non_thread_safe.h" | 20 #include "base/sequence_checker.h" |
| 21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 22 #include "base/time/time.h" | 22 #include "base/time/time.h" |
| 23 #include "base/timer/timer.h" | 23 #include "base/timer/timer.h" |
| 24 #include "net/base/backoff_entry.h" | 24 #include "net/base/backoff_entry.h" |
| 25 | 25 |
| 26 namespace syncer { | 26 namespace syncer { |
| 27 | 27 |
| 28 // A queue that dispatches tasks, ignores duplicates, and provides backoff | 28 // A queue that dispatches tasks, ignores duplicates, and provides backoff |
| 29 // semantics. | 29 // semantics. |
| 30 // | 30 // |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 62 // queue.MarkAsFailed(foo); | 62 // queue.MarkAsFailed(foo); |
| 63 // if (ShouldRetry(foo)) { | 63 // if (ShouldRetry(foo)) { |
| 64 // queue.AddToQueue(foo); | 64 // queue.AddToQueue(foo); |
| 65 // } | 65 // } |
| 66 // } else { | 66 // } else { |
| 67 // Cancel(foo); | 67 // Cancel(foo); |
| 68 // } | 68 // } |
| 69 // } | 69 // } |
| 70 // | 70 // |
| 71 template <typename T> | 71 template <typename T> |
| 72 class TaskQueue : base::NonThreadSafe { | 72 class TaskQueue { |
| 73 public: | 73 public: |
| 74 // A callback provided by users of the TaskQueue to handle tasks. | 74 // A callback provided by users of the TaskQueue to handle tasks. |
| 75 // | 75 // |
| 76 // This callback is invoked by the queue with a task to be handled. The | 76 // This callback is invoked by the queue with a task to be handled. The |
| 77 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, | 77 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, |
| 78 // or |Cancel| to signify completion of the task. | 78 // or |Cancel| to signify completion of the task. |
| 79 using HandleTaskCallback = base::Callback<void(const T&)>; | 79 using HandleTaskCallback = base::Callback<void(const T&)>; |
| 80 | 80 |
| 81 // Construct a TaskQueue. | 81 // Construct a TaskQueue. |
| 82 // | 82 // |
| 83 // |callback| the callback to be invoked for handling tasks. | 83 // |callback| the callback to be invoked for handling tasks. |
| 84 // | 84 // |
| 85 // |initial_backoff_delay| the initial amount of time the queue will wait | 85 // |initial_backoff_delay| the initial amount of time the queue will wait |
| 86 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be | 86 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be |
| 87 // zero. Subsequent failures will increase the delay up to | 87 // zero. Subsequent failures will increase the delay up to |
| 88 // |max_backoff_delay|. | 88 // |max_backoff_delay|. |
| 89 // | 89 // |
| 90 // |max_backoff_delay| the maximum amount of time the queue will wait before | 90 // |max_backoff_delay| the maximum amount of time the queue will wait before |
| 91 // dispatching tasks. May be zero. Must be greater than or equal to | 91 // dispatching tasks. May be zero. Must be greater than or equal to |
| 92 // |initial_backoff_delay|. | 92 // |initial_backoff_delay|. |
| 93 TaskQueue(const HandleTaskCallback& callback, | 93 TaskQueue(const HandleTaskCallback& callback, |
| 94 const base::TimeDelta& initial_backoff_delay, | 94 const base::TimeDelta& initial_backoff_delay, |
| 95 const base::TimeDelta& max_backoff_delay); | 95 const base::TimeDelta& max_backoff_delay); |
| 96 | 96 |
| 97 ~TaskQueue(); |
| 98 |
| 97 // Add |task| to the end of the queue. | 99 // Add |task| to the end of the queue. |
| 98 // | 100 // |
| 99 // If |task| is already present (as determined by operator==) it is not added. | 101 // If |task| is already present (as determined by operator==) it is not added. |
| 100 void AddToQueue(const T& task); | 102 void AddToQueue(const T& task); |
| 101 | 103 |
| 102 // Mark |task| as completing successfully. | 104 // Mark |task| as completing successfully. |
| 103 // | 105 // |
| 104 // Marking a task as completing successfully will reduce or eliminate any | 106 // Marking a task as completing successfully will reduce or eliminate any |
| 105 // backoff delay in effect. | 107 // backoff delay in effect. |
| 106 // | 108 // |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 150 std::unique_ptr<net::BackoffEntry> backoff_entry_; | 152 std::unique_ptr<net::BackoffEntry> backoff_entry_; |
| 151 // The number of tasks currently being handled. | 153 // The number of tasks currently being handled. |
| 152 int num_in_progress_; | 154 int num_in_progress_; |
| 153 std::deque<T> queue_; | 155 std::deque<T> queue_; |
| 154 // The set of tasks in queue_ or currently being handled. | 156 // The set of tasks in queue_ or currently being handled. |
| 155 std::set<T> tasks_; | 157 std::set<T> tasks_; |
| 156 base::Closure dispatch_closure_; | 158 base::Closure dispatch_closure_; |
| 157 std::unique_ptr<base::Timer> backoff_timer_; | 159 std::unique_ptr<base::Timer> backoff_timer_; |
| 158 base::TimeDelta delay_; | 160 base::TimeDelta delay_; |
| 159 | 161 |
| 162 SEQUENCE_CHECKER(sequence_checker_); |
| 163 |
| 160 // Must be last data member. | 164 // Must be last data member. |
| 161 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; | 165 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; |
| 162 | 166 |
| 163 DISALLOW_COPY_AND_ASSIGN(TaskQueue); | 167 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
| 164 }; | 168 }; |
| 165 | 169 |
| 166 // The maximum number of tasks that may be concurrently executed. Think | 170 // The maximum number of tasks that may be concurrently executed. Think |
| 167 // carefully before changing this value. The desired behavior of backoff may | 171 // carefully before changing this value. The desired behavior of backoff may |
| 168 // not be obvious when there is more than one concurrent task | 172 // not be obvious when there is more than one concurrent task |
| 169 const int kMaxConcurrentTasks = 1; | 173 const int kMaxConcurrentTasks = 1; |
| (...skipping 14 matching lines...) Expand all Loading... |
| 184 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); | 188 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); |
| 185 backoff_policy_.entry_lifetime_ms = -1; | 189 backoff_policy_.entry_lifetime_ms = -1; |
| 186 backoff_policy_.always_use_initial_delay = false; | 190 backoff_policy_.always_use_initial_delay = false; |
| 187 backoff_entry_ = base::MakeUnique<net::BackoffEntry>(&backoff_policy_); | 191 backoff_entry_ = base::MakeUnique<net::BackoffEntry>(&backoff_policy_); |
| 188 dispatch_closure_ = | 192 dispatch_closure_ = |
| 189 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); | 193 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); |
| 190 backoff_timer_ = base::MakeUnique<base::Timer>(false, false); | 194 backoff_timer_ = base::MakeUnique<base::Timer>(false, false); |
| 191 } | 195 } |
| 192 | 196 |
| 193 template <typename T> | 197 template <typename T> |
| 198 TaskQueue<T>::~TaskQueue() { |
| 199 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 200 } |
| 201 |
| 202 template <typename T> |
| 194 void TaskQueue<T>::AddToQueue(const T& task) { | 203 void TaskQueue<T>::AddToQueue(const T& task) { |
| 195 DCHECK(CalledOnValidThread()); | 204 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 196 // Ignore duplicates. | 205 // Ignore duplicates. |
| 197 if (tasks_.find(task) == tasks_.end()) { | 206 if (tasks_.find(task) == tasks_.end()) { |
| 198 queue_.push_back(task); | 207 queue_.push_back(task); |
| 199 tasks_.insert(task); | 208 tasks_.insert(task); |
| 200 } | 209 } |
| 201 ScheduleDispatch(); | 210 ScheduleDispatch(); |
| 202 } | 211 } |
| 203 | 212 |
| 204 template <typename T> | 213 template <typename T> |
| 205 void TaskQueue<T>::MarkAsSucceeded(const T& task) { | 214 void TaskQueue<T>::MarkAsSucceeded(const T& task) { |
| 206 DCHECK(CalledOnValidThread()); | 215 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 207 FinishTask(task); | 216 FinishTask(task); |
| 208 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and | 217 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and |
| 209 // reschedule a dispatch. | 218 // reschedule a dispatch. |
| 210 backoff_timer_->Stop(); | 219 backoff_timer_->Stop(); |
| 211 backoff_entry_->Reset(); | 220 backoff_entry_->Reset(); |
| 212 ScheduleDispatch(); | 221 ScheduleDispatch(); |
| 213 } | 222 } |
| 214 | 223 |
| 215 template <typename T> | 224 template <typename T> |
| 216 void TaskQueue<T>::MarkAsFailed(const T& task) { | 225 void TaskQueue<T>::MarkAsFailed(const T& task) { |
| 217 DCHECK(CalledOnValidThread()); | 226 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 218 FinishTask(task); | 227 FinishTask(task); |
| 219 backoff_entry_->InformOfRequest(false); | 228 backoff_entry_->InformOfRequest(false); |
| 220 ScheduleDispatch(); | 229 ScheduleDispatch(); |
| 221 } | 230 } |
| 222 | 231 |
| 223 template <typename T> | 232 template <typename T> |
| 224 void TaskQueue<T>::Cancel(const T& task) { | 233 void TaskQueue<T>::Cancel(const T& task) { |
| 225 DCHECK(CalledOnValidThread()); | 234 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 226 FinishTask(task); | 235 FinishTask(task); |
| 227 ScheduleDispatch(); | 236 ScheduleDispatch(); |
| 228 } | 237 } |
| 229 | 238 |
| 230 template <typename T> | 239 template <typename T> |
| 231 void TaskQueue<T>::ResetBackoff() { | 240 void TaskQueue<T>::ResetBackoff() { |
| 232 backoff_timer_->Stop(); | 241 backoff_timer_->Stop(); |
| 233 backoff_entry_->Reset(); | 242 backoff_entry_->Reset(); |
| 234 ScheduleDispatch(); | 243 ScheduleDispatch(); |
| 235 } | 244 } |
| 236 | 245 |
| 237 template <typename T> | 246 template <typename T> |
| 238 void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) { | 247 void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) { |
| 239 DCHECK(CalledOnValidThread()); | 248 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 240 DCHECK(timer.get()); | 249 DCHECK(timer.get()); |
| 241 backoff_timer_ = std::move(timer); | 250 backoff_timer_ = std::move(timer); |
| 242 } | 251 } |
| 243 | 252 |
| 244 template <typename T> | 253 template <typename T> |
| 245 void TaskQueue<T>::FinishTask(const T& task) { | 254 void TaskQueue<T>::FinishTask(const T& task) { |
| 246 DCHECK(CalledOnValidThread()); | 255 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 247 DCHECK_GE(num_in_progress_, 1); | 256 DCHECK_GE(num_in_progress_, 1); |
| 248 --num_in_progress_; | 257 --num_in_progress_; |
| 249 const size_t num_erased = tasks_.erase(task); | 258 const size_t num_erased = tasks_.erase(task); |
| 250 DCHECK_EQ(1U, num_erased); | 259 DCHECK_EQ(1U, num_erased); |
| 251 } | 260 } |
| 252 | 261 |
| 253 template <typename T> | 262 template <typename T> |
| 254 void TaskQueue<T>::ScheduleDispatch() { | 263 void TaskQueue<T>::ScheduleDispatch() { |
| 255 DCHECK(CalledOnValidThread()); | 264 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 256 if (backoff_timer_->IsRunning() || !ShouldDispatch()) { | 265 if (backoff_timer_->IsRunning() || !ShouldDispatch()) { |
| 257 return; | 266 return; |
| 258 } | 267 } |
| 259 | 268 |
| 260 backoff_timer_->Start(FROM_HERE, backoff_entry_->GetTimeUntilRelease(), | 269 backoff_timer_->Start(FROM_HERE, backoff_entry_->GetTimeUntilRelease(), |
| 261 dispatch_closure_); | 270 dispatch_closure_); |
| 262 } | 271 } |
| 263 | 272 |
| 264 template <typename T> | 273 template <typename T> |
| 265 void TaskQueue<T>::Dispatch() { | 274 void TaskQueue<T>::Dispatch() { |
| 266 DCHECK(CalledOnValidThread()); | 275 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| 267 if (!ShouldDispatch()) { | 276 if (!ShouldDispatch()) { |
| 268 return; | 277 return; |
| 269 } | 278 } |
| 270 | 279 |
| 271 DCHECK(!queue_.empty()); | 280 DCHECK(!queue_.empty()); |
| 272 const T& task = queue_.front(); | 281 const T& task = queue_.front(); |
| 273 ++num_in_progress_; | 282 ++num_in_progress_; |
| 274 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); | 283 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); |
| 275 base::ThreadTaskRunnerHandle::Get()->PostTask( | 284 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 276 FROM_HERE, base::Bind(process_callback_, task)); | 285 FROM_HERE, base::Bind(process_callback_, task)); |
| 277 queue_.pop_front(); | 286 queue_.pop_front(); |
| 278 } | 287 } |
| 279 | 288 |
| 280 template <typename T> | 289 template <typename T> |
| 281 bool TaskQueue<T>::ShouldDispatch() { | 290 bool TaskQueue<T>::ShouldDispatch() { |
| 282 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); | 291 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); |
| 283 } | 292 } |
| 284 | 293 |
| 285 } // namespace syncer | 294 } // namespace syncer |
| 286 | 295 |
| 287 #endif // COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ | 296 #endif // COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_ |
| OLD | NEW |