OLD | NEW |
(Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |
| 6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |
| 7 |
| 8 #include <deque> |
| 9 #include <set> |
| 10 |
| 11 #include "base/bind.h" |
| 12 #include "base/callback.h" |
| 13 #include "base/gtest_prod_util.h" |
| 14 #include "base/macros.h" |
| 15 #include "base/memory/weak_ptr.h" |
| 16 #include "base/message_loop/message_loop.h" |
| 17 #include "base/threading/non_thread_safe.h" |
| 18 #include "base/time/time.h" |
| 19 #include "base/timer/timer.h" |
| 20 #include "net/base/backoff_entry.h" |
| 21 |
| 22 namespace syncer { |
| 23 |
| 24 // A queue that dispatches tasks, ignores duplicates, and provides backoff |
| 25 // semantics. |
| 26 // |
| 27 // |T| is the task type. |
| 28 // |
| 29 // For each task added to the queue, the HandleTaskCallback will eventually be |
| 30 // invoked. For each invocation, the user of TaskQueue must call exactly one of |
| 31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|. |
| 32 // |
| 33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task). |
| 34 // |
| 35 // Example usage: |
| 36 // |
| 37 // void Handle(const Foo& foo); |
| 38 // ... |
| 39 // TaskQueue<Foo> queue(base::Bind(&Handle), |
| 40 // base::TimeDelta::FromSeconds(1), |
| 41 // base::TimeDelta::FromMinutes(1)); |
| 42 // ... |
| 43 // { |
| 44 // Foo foo; |
| 45 // // Add foo to the queue. At some point, Handle will be invoked in this |
| 46 // // message loop. |
| 47 // queue.AddToQueue(foo); |
| 48 // } |
| 49 // ... |
| 50 // void Handle(const Foo& foo) { |
| 51 // DoSomethingWith(foo); |
| 52 // // We must call one of the three methods to tell the queue how we're |
| 53 // // dealing with foo. Of course, we are free to call in the the context of |
| 54 // // this HandleTaskCallback or outside the context if we so choose. |
| 55 // if (SuccessfullyHandled(foo)) { |
| 56 // queue.MarkAsSucceeded(foo); |
| 57 // } else if (Failed(foo)) { |
| 58 // queue.MarkAsFailed(foo); |
| 59 // if (ShouldRetry(foo)) { |
| 60 // queue.AddToQueue(foo); |
| 61 // } |
| 62 // } else { |
| 63 // Cancel(foo); |
| 64 // } |
| 65 // } |
| 66 // |
| 67 template <typename T> |
| 68 class TaskQueue : base::NonThreadSafe { |
| 69 public: |
| 70 // A callback provided by users of the TaskQueue to handle tasks. |
| 71 // |
| 72 // This callback is invoked by the queue with a task to be handled. The |
| 73 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, |
| 74 // or |Cancel| to signify completion of the task. |
| 75 typedef base::Callback<void(const T&)> HandleTaskCallback; |
| 76 |
| 77 // Construct a TaskQueue. |
| 78 // |
| 79 // |callback| the callback to be invoked for handling tasks. |
| 80 // |
| 81 // |initial_backoff_delay| the initial amount of time the queue will wait |
| 82 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be |
| 83 // zero. Subsequent failures will increase the delay up to |
| 84 // |max_backoff_delay|. |
| 85 // |
| 86 // |max_backoff_delay| the maximum amount of time the queue will wait before |
| 87 // dispatching tasks. May be zero. Must be greater than or equal to |
| 88 // |initial_backoff_delay|. |
| 89 TaskQueue(const HandleTaskCallback& callback, |
| 90 const base::TimeDelta& initial_backoff_delay, |
| 91 const base::TimeDelta& max_backoff_delay); |
| 92 |
| 93 // Add |task| to the end of the queue. |
| 94 // |
| 95 // If |task| is already present (as determined by operator==) it is not added. |
| 96 void AddToQueue(const T& task); |
| 97 |
| 98 // Mark |task| as completing successfully. |
| 99 // |
| 100 // Marking a task as completing successfully will reduce or eliminate any |
| 101 // backoff delay in effect. |
| 102 // |
| 103 // May only be called after the HandleTaskCallback has been invoked with |
| 104 // |task|. |
| 105 void MarkAsSucceeded(const T& task); |
| 106 |
| 107 // Mark |task| as failed. |
| 108 // |
| 109 // Marking a task as failed will cause a backoff, i.e. a delay in dispatching |
| 110 // of subsequent tasks. Repeated failures will increase the delay. |
| 111 // |
| 112 // May only be called after the HandleTaskCallback has been invoked with |
| 113 // |task|. |
| 114 void MarkAsFailed(const T& task); |
| 115 |
| 116 // Cancel |task|. |
| 117 // |
| 118 // |task| is removed from the queue and will not be retried. Does not affect |
| 119 // the backoff delay. |
| 120 // |
| 121 // May only be called after the HandleTaskCallback has been invoked with |
| 122 // |task|. |
| 123 void Cancel(const T& task); |
| 124 |
| 125 private: |
| 126 FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry); |
| 127 |
| 128 // Use |timer| for scheduled events. |
| 129 // |
| 130 // Used in tests. See also MockTimer. |
| 131 void SetTimerForTest(scoped_ptr<base::Timer> timer); |
| 132 void FinishTask(const T& task); |
| 133 void ScheduleDispatch(); |
| 134 void Dispatch(); |
| 135 // Return true if we should dispatch tasks. |
| 136 bool ShouldDispatch(); |
| 137 |
| 138 const HandleTaskCallback process_callback_; |
| 139 net::BackoffEntry::Policy backoff_policy_; |
| 140 scoped_ptr<net::BackoffEntry> backoff_entry_; |
| 141 // The number of tasks currently being handled. |
| 142 int num_in_progress_; |
| 143 std::deque<T> queue_; |
| 144 // The set of tasks in queue_ or currently being handled. |
| 145 std::set<T> tasks_; |
| 146 base::Closure dispatch_closure_; |
| 147 scoped_ptr<base::Timer> backoff_timer_; |
| 148 base::TimeDelta delay_; |
| 149 |
| 150 // Must be last data member. |
| 151 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; |
| 152 |
| 153 DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
| 154 }; |
| 155 |
| 156 // The maximum number of tasks that may be concurrently executed. Think |
| 157 // carefully before changing this value. The desired behavior of backoff may |
| 158 // not be obvious when there is more than one concurrent task |
| 159 const int kMaxConcurrentTasks = 1; |
| 160 |
| 161 template <typename T> |
| 162 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback, |
| 163 const base::TimeDelta& initial_backoff_delay, |
| 164 const base::TimeDelta& max_backoff_delay) |
| 165 : process_callback_(callback), |
| 166 backoff_policy_({}), |
| 167 num_in_progress_(0), |
| 168 weak_ptr_factory_(this) { |
| 169 DCHECK_LE(initial_backoff_delay.InMicroseconds(), |
| 170 max_backoff_delay.InMicroseconds()); |
| 171 backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds(); |
| 172 backoff_policy_.multiply_factor = 2.0; |
| 173 backoff_policy_.jitter_factor = 0.1; |
| 174 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); |
| 175 backoff_policy_.entry_lifetime_ms = -1; |
| 176 backoff_policy_.always_use_initial_delay = false; |
| 177 backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_)); |
| 178 dispatch_closure_ = |
| 179 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); |
| 180 backoff_timer_.reset(new base::Timer(false, false)); |
| 181 } |
| 182 |
| 183 template <typename T> |
| 184 void TaskQueue<T>::AddToQueue(const T& task) { |
| 185 DCHECK(CalledOnValidThread()); |
| 186 // Ignore duplicates. |
| 187 if (tasks_.find(task) == tasks_.end()) { |
| 188 queue_.push_back(task); |
| 189 tasks_.insert(task); |
| 190 } |
| 191 ScheduleDispatch(); |
| 192 } |
| 193 |
| 194 template <typename T> |
| 195 void TaskQueue<T>::MarkAsSucceeded(const T& task) { |
| 196 DCHECK(CalledOnValidThread()); |
| 197 FinishTask(task); |
| 198 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and |
| 199 // reschedule a dispatch. |
| 200 backoff_timer_->Stop(); |
| 201 backoff_entry_->Reset(); |
| 202 ScheduleDispatch(); |
| 203 } |
| 204 |
| 205 template <typename T> |
| 206 void TaskQueue<T>::MarkAsFailed(const T& task) { |
| 207 DCHECK(CalledOnValidThread()); |
| 208 FinishTask(task); |
| 209 backoff_entry_->InformOfRequest(false); |
| 210 ScheduleDispatch(); |
| 211 } |
| 212 |
| 213 template <typename T> |
| 214 void TaskQueue<T>::Cancel(const T& task) { |
| 215 DCHECK(CalledOnValidThread()); |
| 216 FinishTask(task); |
| 217 ScheduleDispatch(); |
| 218 } |
| 219 |
| 220 template <typename T> |
| 221 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) { |
| 222 DCHECK(CalledOnValidThread()); |
| 223 DCHECK(timer.get()); |
| 224 backoff_timer_ = timer.Pass(); |
| 225 } |
| 226 |
| 227 template <typename T> |
| 228 void TaskQueue<T>::FinishTask(const T& task) { |
| 229 DCHECK(CalledOnValidThread()); |
| 230 DCHECK_GE(num_in_progress_, 1); |
| 231 --num_in_progress_; |
| 232 const size_t num_erased = tasks_.erase(task); |
| 233 DCHECK_EQ(1U, num_erased); |
| 234 } |
| 235 |
| 236 template <typename T> |
| 237 void TaskQueue<T>::ScheduleDispatch() { |
| 238 DCHECK(CalledOnValidThread()); |
| 239 if (backoff_timer_->IsRunning() || !ShouldDispatch()) { |
| 240 return; |
| 241 } |
| 242 |
| 243 backoff_timer_->Start( |
| 244 FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_); |
| 245 } |
| 246 |
| 247 template <typename T> |
| 248 void TaskQueue<T>::Dispatch() { |
| 249 DCHECK(CalledOnValidThread()); |
| 250 if (!ShouldDispatch()) { |
| 251 return; |
| 252 } |
| 253 |
| 254 DCHECK(!queue_.empty()); |
| 255 const T& task = queue_.front(); |
| 256 ++num_in_progress_; |
| 257 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); |
| 258 base::MessageLoop::current()->PostTask(FROM_HERE, |
| 259 base::Bind(process_callback_, task)); |
| 260 queue_.pop_front(); |
| 261 } |
| 262 |
| 263 template <typename T> |
| 264 bool TaskQueue<T>::ShouldDispatch() { |
| 265 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); |
| 266 } |
| 267 |
| 268 } // namespace syncer |
| 269 |
| 270 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |
OLD | NEW |