Index: sync/internal_api/public/attachments/task_queue.h |
diff --git a/sync/internal_api/public/attachments/task_queue.h b/sync/internal_api/public/attachments/task_queue.h |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2466f3a3e3b34142ca2db6d6f8ec705940ff70e6 |
--- /dev/null |
+++ b/sync/internal_api/public/attachments/task_queue.h |
@@ -0,0 +1,270 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |
+#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |
+ |
+#include <deque> |
+#include <set> |
+ |
+#include "base/bind.h" |
+#include "base/callback.h" |
+#include "base/gtest_prod_util.h" |
+#include "base/macros.h" |
+#include "base/memory/weak_ptr.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/threading/non_thread_safe.h" |
+#include "base/time/time.h" |
+#include "base/timer/timer.h" |
+#include "net/base/backoff_entry.h" |
+ |
+namespace syncer { |
+ |
+// A queue that dispatches tasks, ignores duplicates, and provides backoff |
+// semantics. |
+// |
+// |T| is the task type. |
+// |
+// For each task added to the queue, the HandleTaskCallback will eventually be |
+// invoked. For each invocation, the user of TaskQueue must call exactly one of |
+// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|. |
+// |
+// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task). |
+// |
+// Example usage: |
+// |
+// void Handle(const Foo& foo); |
+// ... |
+// TaskQueue<Foo> queue(base::Bind(&Handle), |
+// base::TimeDelta::FromSeconds(1), |
+// base::TimeDelta::FromMinutes(1)); |
+// ... |
+// { |
+// Foo foo; |
+// // Add foo to the queue. At some point, Handle will be invoked in this |
+// // message loop. |
+// queue.AddToQueue(foo); |
+// } |
+// ... |
+// void Handle(const Foo& foo) { |
+// DoSomethingWith(foo); |
+// // We must call one of the three methods to tell the queue how we're |
+// // dealing with foo. Of course, we are free to call in the the context of |
+// // this HandleTaskCallback or outside the context if we so choose. |
+// if (SuccessfullyHandled(foo)) { |
+// queue.MarkAsSucceeded(foo); |
+// } else if (Failed(foo)) { |
+// queue.MarkAsFailed(foo); |
+// if (ShouldRetry(foo)) { |
+// queue.AddToQueue(foo); |
+// } |
+// } else { |
+// Cancel(foo); |
+// } |
+// } |
+// |
+template <typename T> |
+class TaskQueue : base::NonThreadSafe { |
+ public: |
+ // A callback provided by users of the TaskQueue to handle tasks. |
+ // |
+ // This callback is invoked by the queue with a task to be handled. The |
+ // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, |
+ // or |Cancel| to signify completion of the task. |
+ typedef base::Callback<void(const T&)> HandleTaskCallback; |
+ |
+ // Construct a TaskQueue. |
+ // |
+ // |callback| the callback to be invoked for handling tasks. |
+ // |
+ // |initial_backoff_delay| the initial amount of time the queue will wait |
+ // before dispatching tasks after a failed task (see |MarkAsFailed|). May be |
+ // zero. Subsequent failures will increase the delay up to |
+ // |max_backoff_delay|. |
+ // |
+ // |max_backoff_delay| the maximum amount of time the queue will wait before |
+ // dispatching tasks. May be zero. Must be greater than or equal to |
+ // |initial_backoff_delay|. |
+ TaskQueue(const HandleTaskCallback& callback, |
+ const base::TimeDelta& initial_backoff_delay, |
+ const base::TimeDelta& max_backoff_delay); |
+ |
+ // Add |task| to the end of the queue. |
+ // |
+ // If |task| is already present (as determined by operator==) it is not added. |
+ void AddToQueue(const T& task); |
+ |
+ // Mark |task| as completing successfully. |
+ // |
+ // Marking a task as completing successfully will reduce or eliminate any |
+ // backoff delay in effect. |
+ // |
+ // May only be called after the HandleTaskCallback has been invoked with |
+ // |task|. |
+ void MarkAsSucceeded(const T& task); |
+ |
+ // Mark |task| as failed. |
+ // |
+ // Marking a task as failed will cause a backoff, i.e. a delay in dispatching |
+ // of subsequent tasks. Repeated failures will increase the delay. |
+ // |
+ // May only be called after the HandleTaskCallback has been invoked with |
+ // |task|. |
+ void MarkAsFailed(const T& task); |
+ |
+ // Cancel |task|. |
+ // |
+ // |task| is removed from the queue and will not be retried. Does not affect |
+ // the backoff delay. |
+ // |
+ // May only be called after the HandleTaskCallback has been invoked with |
+ // |task|. |
+ void Cancel(const T& task); |
+ |
+ private: |
+ FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry); |
+ |
+ // Use |timer| for scheduled events. |
+ // |
+ // Used in tests. See also MockTimer. |
+ void SetTimerForTest(scoped_ptr<base::Timer> timer); |
+ void FinishTask(const T& task); |
+ void ScheduleDispatch(); |
+ void Dispatch(); |
+ // Return true if we should dispatch tasks. |
+ bool ShouldDispatch(); |
+ |
+ const HandleTaskCallback process_callback_; |
+ net::BackoffEntry::Policy backoff_policy_; |
+ scoped_ptr<net::BackoffEntry> backoff_entry_; |
+ // The number of tasks currently being handled. |
+ int num_in_progress_; |
+ std::deque<T> queue_; |
+ // The set of tasks in queue_ or currently being handled. |
+ std::set<T> tasks_; |
+ base::Closure dispatch_closure_; |
+ scoped_ptr<base::Timer> backoff_timer_; |
+ base::TimeDelta delay_; |
+ |
+ // Must be last data member. |
+ base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
+}; |
+ |
+// The maximum number of tasks that may be concurrently executed. Think |
+// carefully before changing this value. The desired behavior of backoff may |
+// not be obvious when there is more than one concurrent task |
+const int kMaxConcurrentTasks = 1; |
+ |
+template <typename T> |
+TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback, |
+ const base::TimeDelta& initial_backoff_delay, |
+ const base::TimeDelta& max_backoff_delay) |
+ : process_callback_(callback), |
+ backoff_policy_({}), |
+ num_in_progress_(0), |
+ weak_ptr_factory_(this) { |
+ DCHECK_LE(initial_backoff_delay.InMicroseconds(), |
+ max_backoff_delay.InMicroseconds()); |
+ backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds(); |
+ backoff_policy_.multiply_factor = 2.0; |
+ backoff_policy_.jitter_factor = 0.1; |
+ backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); |
+ backoff_policy_.entry_lifetime_ms = -1; |
+ backoff_policy_.always_use_initial_delay = false; |
+ backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_)); |
+ dispatch_closure_ = |
+ base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); |
+ backoff_timer_.reset(new base::Timer(false, false)); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::AddToQueue(const T& task) { |
+ DCHECK(CalledOnValidThread()); |
+ // Ignore duplicates. |
+ if (tasks_.find(task) == tasks_.end()) { |
+ queue_.push_back(task); |
+ tasks_.insert(task); |
+ } |
+ ScheduleDispatch(); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::MarkAsSucceeded(const T& task) { |
+ DCHECK(CalledOnValidThread()); |
+ FinishTask(task); |
+ // The task succeeded. Stop any pending timer, reset (clear) the backoff, and |
+ // reschedule a dispatch. |
+ backoff_timer_->Stop(); |
+ backoff_entry_->Reset(); |
+ ScheduleDispatch(); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::MarkAsFailed(const T& task) { |
+ DCHECK(CalledOnValidThread()); |
+ FinishTask(task); |
+ backoff_entry_->InformOfRequest(false); |
+ ScheduleDispatch(); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::Cancel(const T& task) { |
+ DCHECK(CalledOnValidThread()); |
+ FinishTask(task); |
+ ScheduleDispatch(); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK(timer.get()); |
+ backoff_timer_ = timer.Pass(); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::FinishTask(const T& task) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_GE(num_in_progress_, 1); |
+ --num_in_progress_; |
+ const size_t num_erased = tasks_.erase(task); |
+ DCHECK_EQ(1U, num_erased); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::ScheduleDispatch() { |
+ DCHECK(CalledOnValidThread()); |
+ if (backoff_timer_->IsRunning() || !ShouldDispatch()) { |
+ return; |
+ } |
+ |
+ backoff_timer_->Start( |
+ FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_); |
+} |
+ |
+template <typename T> |
+void TaskQueue<T>::Dispatch() { |
+ DCHECK(CalledOnValidThread()); |
+ if (!ShouldDispatch()) { |
+ return; |
+ } |
+ |
+ DCHECK(!queue_.empty()); |
+ const T& task = queue_.front(); |
+ ++num_in_progress_; |
+ DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); |
+ base::MessageLoop::current()->PostTask(FROM_HERE, |
+ base::Bind(process_callback_, task)); |
+ queue_.pop_front(); |
+} |
+ |
+template <typename T> |
+bool TaskQueue<T>::ShouldDispatch() { |
+ return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); |
+} |
+ |
+} // namespace syncer |
+ |
+#endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ |