Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(661)

Side by Side Diff: sync/internal_api/public/attachments/task_queue.h

Issue 554743004: Update AttachmentServiceImpl to retry attachment uploads. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Check ShouldDispatch in Dispatch. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
7
8 #include <deque>
9 #include <set>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/gtest_prod_util.h"
14 #include "base/macros.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/base/backoff_entry.h"
21
22 namespace syncer {
23
24 // A queue that dispatches tasks, ignores duplicates, and provides retry with
25 // backoff semantics.
26 //
27 // |T| is the task type.
28 template <typename T>
pavely 2014/09/09 19:42:23 It would be useful to put comment with example of
maniscalco 2014/09/09 21:15:44 Good idea. Done.
29 class TaskQueue : base::NonThreadSafe {
30 public:
31 // A callback provided by users of the TaskQueue to handle tasks.
32 //
33 // This callback is invoked by the queue with a task to be handled. The
34 // callee is expected to (eventually) call |MarkAsSucceeded| or |Cancel| to
35 // signify completion of the task, or call |Retry| if the task should be
36 // retried later.
37 typedef base::Callback<void(const T&)> HandleTaskCallback;
38
39 // Construct a TaskQueue.
40 //
41 // |callback| the callback to be invoked for handling tasks.
42 //
43 // |initial_backoff_delay| the initial amount of time the queue will wait
44 // before redispatching |Retry|'d tasks. May be zero.
45 //
46 // |max_backoff_delay| the maximum amount of time the queue will wait before
47 // redispatching |Retry|'d tasks. May be zero. Must be greater than or equal
48 // to |initial_backoff_delay|.
49 TaskQueue(const HandleTaskCallback& callback,
50 const base::TimeDelta& initial_backoff_delay,
51 const base::TimeDelta& max_backoff_delay);
52
53 // Add |task| to the end of the queue.
54 //
55 // If |task| is already present (as determined by operator==) it is not added.
56 void AddToQueue(const T& task);
57
58 // Mark |task| as completing successfully.
59 //
60 // Marking as task as completing successfully will reduce or eliminate any
61 // backoff delay in effect.
62 //
63 // May only be called after the HandleTaskCallback has been invoked with
64 // |task|.
65 void MarkAsSucceeded(const T& task);
66
67 // Cancel |task|.
68 //
69 // |task| is removed from the queue and will not be retried. Does not affect
70 // the backoff delay.
71 //
72 // May only be called after the HandleTaskCallback has been invoked with
73 // |task|.
74 void Cancel(const T& task);
75
76 // Retry |task|.
77 //
78 // Indiciates that |task| failed and should be retried. Increase backoff
79 // delay and add |task| to the back of the queue.
80 //
81 // May only be called after the HandleTaskCallback has been invoked with
82 // |task|.
83 void Retry(const T& task);
84
85 private:
86 FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry);
87
88 // Use |timer| for scheduled events.
89 //
90 // Used in tests. See also MockTimer.
91 void SetTimerForTest(scoped_ptr<base::Timer> timer);
92 void FinishTask(const T& task);
93 void ScheduleDispatch();
94 void Dispatch();
95 // Return true if we should dispatch tasks.
96 bool ShouldDispatch();
97
98 const HandleTaskCallback process_callback_;
99 net::BackoffEntry::Policy backoff_policy_;
100 scoped_ptr<net::BackoffEntry> backoff_entry_;
101 // The number of tasks currently being handled.
102 int num_in_progress_;
103 std::deque<T> queue_;
104 // The set of tasks in queue_ or currently being handled.
105 std::set<T> tasks_;
106 base::Closure dispatch_closure_;
107 scoped_ptr<base::Timer> backoff_timer_;
108 base::TimeDelta delay_;
109
110 // Must be last data member.
111 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
112
113 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
114 };
115
116 // The maximum number of tasks that may be concurrently executed. Think
117 // carefully before changing this value. The desired behavior of backoff may
118 // not be obvious when there is more than one concurrent task
119 const int kMaxConcurrentTasks = 1;
120
121 template <typename T>
122 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
123 const base::TimeDelta& initial_backoff_delay,
124 const base::TimeDelta& max_backoff_delay)
125 : process_callback_(callback),
126 backoff_policy_({}),
127 num_in_progress_(0),
128 weak_ptr_factory_(this) {
129 DCHECK_LE(initial_backoff_delay.InMicroseconds(),
130 max_backoff_delay.InMicroseconds());
131 backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
132 backoff_policy_.multiply_factor = 2.0;
133 backoff_policy_.jitter_factor = 0.1;
134 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
135 backoff_policy_.entry_lifetime_ms = -1;
136 backoff_policy_.always_use_initial_delay = false;
137 backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
138 dispatch_closure_ =
139 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
140 backoff_timer_.reset(new base::Timer(false, false));
141 }
142
143 template <typename T>
144 void TaskQueue<T>::AddToQueue(const T& task) {
145 DCHECK(CalledOnValidThread());
146 // Ignore duplicates.
147 if (tasks_.find(task) == tasks_.end()) {
148 queue_.push_back(task);
149 tasks_.insert(task);
150 }
151 ScheduleDispatch();
152 }
153
154 template <typename T>
155 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
156 DCHECK(CalledOnValidThread());
157 FinishTask(task);
158 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
159 // reschedule a dispatch.
160 backoff_timer_->Stop();
161 backoff_entry_->Reset();
162 ScheduleDispatch();
163 }
164
165 template <typename T>
166 void TaskQueue<T>::Cancel(const T& task) {
167 DCHECK(CalledOnValidThread());
168 FinishTask(task);
169 ScheduleDispatch();
170 }
171
172 template <typename T>
173 void TaskQueue<T>::Retry(const T& task) {
174 DCHECK(CalledOnValidThread());
175 DCHECK_GE(num_in_progress_, 1);
176 --num_in_progress_;
177 DCHECK(tasks_.find(task) != tasks_.end());
178 backoff_entry_->InformOfRequest(false);
179 queue_.push_back(task);
180 ScheduleDispatch();
181 }
182
183 template <typename T>
184 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
185 DCHECK(CalledOnValidThread());
186 DCHECK(timer.get());
187 backoff_timer_ = timer.Pass();
188 }
189
190 template <typename T>
191 void TaskQueue<T>::FinishTask(const T& task) {
192 DCHECK(CalledOnValidThread());
193 DCHECK_GE(num_in_progress_, 1);
194 --num_in_progress_;
195 const size_t num_erased = tasks_.erase(task);
196 DCHECK_EQ(1U, num_erased);
197 }
198
199 template <typename T>
200 void TaskQueue<T>::ScheduleDispatch() {
201 DCHECK(CalledOnValidThread());
202 if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
203 return;
204 }
205
206 backoff_timer_->Start(
207 FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
208 }
209
210 template <typename T>
211 void TaskQueue<T>::Dispatch() {
212 DCHECK(CalledOnValidThread());
213 if (!ShouldDispatch()) {
214 return;
215 }
216
217 DCHECK(!queue_.empty());
218 const T& task = queue_.front();
219 queue_.pop_front();
pavely 2014/09/09 19:42:23 task variable doesn't own task. pop_front will del
maniscalco 2014/09/09 21:15:44 Yikes. Good catch. Fixed. I mistakenly thought
220 ++num_in_progress_;
221 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
222 base::MessageLoop::current()->PostTask(FROM_HERE,
223 base::Bind(process_callback_, task));
224 }
225
226 template <typename T>
227 bool TaskQueue<T>::ShouldDispatch() {
228 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
229 }
230
231 } // namespace syncer
232
233 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698