Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(497)

Side by Side Diff: sync/internal_api/public/attachments/task_queue.h

Issue 554743004: Update AttachmentServiceImpl to retry attachment uploads. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Merge with master. Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sync/internal_api/public/attachments/attachment_uploader.h ('k') | sync/sync.gyp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
7
8 #include <deque>
9 #include <set>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/gtest_prod_util.h"
14 #include "base/macros.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/base/backoff_entry.h"
21
22 namespace syncer {
23
24 // A queue that dispatches tasks, ignores duplicates, and provides backoff
25 // semantics.
26 //
27 // |T| is the task type.
28 //
29 // For each task added to the queue, the HandleTaskCallback will eventually be
30 // invoked. For each invocation, the user of TaskQueue must call exactly one of
31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
32 //
33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
34 //
35 // Example usage:
36 //
37 // void Handle(const Foo& foo);
38 // ...
39 // TaskQueue<Foo> queue(base::Bind(&Handle),
40 // base::TimeDelta::FromSeconds(1),
41 // base::TimeDelta::FromMinutes(1));
42 // ...
43 // {
44 // Foo foo;
45 // // Add foo to the queue. At some point, Handle will be invoked in this
46 // // message loop.
47 // queue.AddToQueue(foo);
48 // }
49 // ...
50 // void Handle(const Foo& foo) {
51 // DoSomethingWith(foo);
52 // // We must call one of the three methods to tell the queue how we're
53 // // dealing with foo. Of course, we are free to call in the the context of
54 // // this HandleTaskCallback or outside the context if we so choose.
55 // if (SuccessfullyHandled(foo)) {
56 // queue.MarkAsSucceeded(foo);
57 // } else if (Failed(foo)) {
58 // queue.MarkAsFailed(foo);
59 // if (ShouldRetry(foo)) {
60 // queue.AddToQueue(foo);
61 // }
62 // } else {
63 // Cancel(foo);
64 // }
65 // }
66 //
67 template <typename T>
68 class TaskQueue : base::NonThreadSafe {
69 public:
70 // A callback provided by users of the TaskQueue to handle tasks.
71 //
72 // This callback is invoked by the queue with a task to be handled. The
73 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
74 // or |Cancel| to signify completion of the task.
75 typedef base::Callback<void(const T&)> HandleTaskCallback;
76
77 // Construct a TaskQueue.
78 //
79 // |callback| the callback to be invoked for handling tasks.
80 //
81 // |initial_backoff_delay| the initial amount of time the queue will wait
82 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
83 // zero. Subsequent failures will increase the delay up to
84 // |max_backoff_delay|.
85 //
86 // |max_backoff_delay| the maximum amount of time the queue will wait before
87 // dispatching tasks. May be zero. Must be greater than or equal to
88 // |initial_backoff_delay|.
89 TaskQueue(const HandleTaskCallback& callback,
90 const base::TimeDelta& initial_backoff_delay,
91 const base::TimeDelta& max_backoff_delay);
92
93 // Add |task| to the end of the queue.
94 //
95 // If |task| is already present (as determined by operator==) it is not added.
96 void AddToQueue(const T& task);
97
98 // Mark |task| as completing successfully.
99 //
100 // Marking a task as completing successfully will reduce or eliminate any
101 // backoff delay in effect.
102 //
103 // May only be called after the HandleTaskCallback has been invoked with
104 // |task|.
105 void MarkAsSucceeded(const T& task);
106
107 // Mark |task| as failed.
108 //
109 // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
110 // of subsequent tasks. Repeated failures will increase the delay.
111 //
112 // May only be called after the HandleTaskCallback has been invoked with
113 // |task|.
114 void MarkAsFailed(const T& task);
115
116 // Cancel |task|.
117 //
118 // |task| is removed from the queue and will not be retried. Does not affect
119 // the backoff delay.
120 //
121 // May only be called after the HandleTaskCallback has been invoked with
122 // |task|.
123 void Cancel(const T& task);
124
125 private:
126 FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry);
127
128 // Use |timer| for scheduled events.
129 //
130 // Used in tests. See also MockTimer.
131 void SetTimerForTest(scoped_ptr<base::Timer> timer);
132 void FinishTask(const T& task);
133 void ScheduleDispatch();
134 void Dispatch();
135 // Return true if we should dispatch tasks.
136 bool ShouldDispatch();
137
138 const HandleTaskCallback process_callback_;
139 net::BackoffEntry::Policy backoff_policy_;
140 scoped_ptr<net::BackoffEntry> backoff_entry_;
141 // The number of tasks currently being handled.
142 int num_in_progress_;
143 std::deque<T> queue_;
144 // The set of tasks in queue_ or currently being handled.
145 std::set<T> tasks_;
146 base::Closure dispatch_closure_;
147 scoped_ptr<base::Timer> backoff_timer_;
148 base::TimeDelta delay_;
149
150 // Must be last data member.
151 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
152
153 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
154 };
155
156 // The maximum number of tasks that may be concurrently executed. Think
157 // carefully before changing this value. The desired behavior of backoff may
158 // not be obvious when there is more than one concurrent task
159 const int kMaxConcurrentTasks = 1;
160
161 template <typename T>
162 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
163 const base::TimeDelta& initial_backoff_delay,
164 const base::TimeDelta& max_backoff_delay)
165 : process_callback_(callback),
166 backoff_policy_({}),
167 num_in_progress_(0),
168 weak_ptr_factory_(this) {
169 DCHECK_LE(initial_backoff_delay.InMicroseconds(),
170 max_backoff_delay.InMicroseconds());
171 backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
172 backoff_policy_.multiply_factor = 2.0;
173 backoff_policy_.jitter_factor = 0.1;
174 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
175 backoff_policy_.entry_lifetime_ms = -1;
176 backoff_policy_.always_use_initial_delay = false;
177 backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
178 dispatch_closure_ =
179 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
180 backoff_timer_.reset(new base::Timer(false, false));
181 }
182
183 template <typename T>
184 void TaskQueue<T>::AddToQueue(const T& task) {
185 DCHECK(CalledOnValidThread());
186 // Ignore duplicates.
187 if (tasks_.find(task) == tasks_.end()) {
188 queue_.push_back(task);
189 tasks_.insert(task);
190 }
191 ScheduleDispatch();
192 }
193
194 template <typename T>
195 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
196 DCHECK(CalledOnValidThread());
197 FinishTask(task);
198 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
199 // reschedule a dispatch.
200 backoff_timer_->Stop();
201 backoff_entry_->Reset();
202 ScheduleDispatch();
203 }
204
205 template <typename T>
206 void TaskQueue<T>::MarkAsFailed(const T& task) {
207 DCHECK(CalledOnValidThread());
208 FinishTask(task);
209 backoff_entry_->InformOfRequest(false);
210 ScheduleDispatch();
211 }
212
213 template <typename T>
214 void TaskQueue<T>::Cancel(const T& task) {
215 DCHECK(CalledOnValidThread());
216 FinishTask(task);
217 ScheduleDispatch();
218 }
219
220 template <typename T>
221 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
222 DCHECK(CalledOnValidThread());
223 DCHECK(timer.get());
224 backoff_timer_ = timer.Pass();
225 }
226
227 template <typename T>
228 void TaskQueue<T>::FinishTask(const T& task) {
229 DCHECK(CalledOnValidThread());
230 DCHECK_GE(num_in_progress_, 1);
231 --num_in_progress_;
232 const size_t num_erased = tasks_.erase(task);
233 DCHECK_EQ(1U, num_erased);
234 }
235
236 template <typename T>
237 void TaskQueue<T>::ScheduleDispatch() {
238 DCHECK(CalledOnValidThread());
239 if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
240 return;
241 }
242
243 backoff_timer_->Start(
244 FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
245 }
246
247 template <typename T>
248 void TaskQueue<T>::Dispatch() {
249 DCHECK(CalledOnValidThread());
250 if (!ShouldDispatch()) {
251 return;
252 }
253
254 DCHECK(!queue_.empty());
255 const T& task = queue_.front();
256 ++num_in_progress_;
257 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
258 base::MessageLoop::current()->PostTask(FROM_HERE,
259 base::Bind(process_callback_, task));
260 queue_.pop_front();
261 }
262
263 template <typename T>
264 bool TaskQueue<T>::ShouldDispatch() {
265 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
266 }
267
268 } // namespace syncer
269
270 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
OLDNEW
« no previous file with comments | « sync/internal_api/public/attachments/attachment_uploader.h ('k') | sync/sync.gyp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698