Index: chrome/browser/sync_file_system/sync_process_runner.cc |
diff --git a/chrome/browser/sync_file_system/sync_process_runner.cc b/chrome/browser/sync_file_system/sync_process_runner.cc |
index 7096cab5c715c76e50272a4041bccceeaf66dfe4..5b91b3544cf991a7e7060a038b6295bc1194b734 100644 |
--- a/chrome/browser/sync_file_system/sync_process_runner.cc |
+++ b/chrome/browser/sync_file_system/sync_process_runner.cc |
@@ -63,20 +63,16 @@ SyncProcessRunner::SyncProcessRunner( |
const std::string& name, |
Client* client, |
scoped_ptr<TimerHelper> timer_helper, |
- int max_parallel_task) |
+ size_t max_parallel_task) |
: name_(name), |
client_(client), |
max_parallel_task_(max_parallel_task), |
running_tasks_(0), |
timer_helper_(timer_helper.Pass()), |
- current_delay_(0), |
- last_delay_(0), |
+ service_state_(SYNC_SERVICE_RUNNING), |
pending_changes_(0), |
factory_(this) { |
- DCHECK_LE(1, max_parallel_task_); |
- |
- DCHECK_EQ(1, max_parallel_task_) |
- << "Parellel task execution is not yet implemented."; |
+ DCHECK_LE(1u, max_parallel_task_); |
if (!timer_helper_) |
timer_helper_.reset(new BaseTimerHelper); |
} |
@@ -84,38 +80,69 @@ SyncProcessRunner::SyncProcessRunner( |
SyncProcessRunner::~SyncProcessRunner() {} |
void SyncProcessRunner::Schedule() { |
- int64 delay = kSyncDelayInMilliseconds; |
if (pending_changes_ == 0) { |
ScheduleInternal(kSyncDelayMaxInMilliseconds); |
return; |
} |
- switch (GetServiceState()) { |
+ |
+ SyncServiceState last_service_state = service_state_; |
+ service_state_ = GetServiceState(); |
+ |
+ switch (service_state_) { |
case SYNC_SERVICE_RUNNING: |
+ ResetThrottling(); |
if (pending_changes_ > kPendingChangeThresholdForFastSync) |
- delay = kSyncDelayFastInMilliseconds; |
+ ScheduleInternal(kSyncDelayFastInMilliseconds); |
else |
- delay = kSyncDelayInMilliseconds; |
- break; |
+ ScheduleInternal(kSyncDelayInMilliseconds); |
+ return; |
case SYNC_SERVICE_TEMPORARY_UNAVAILABLE: |
- delay = kSyncDelaySlowInMilliseconds; |
- if (last_delay_ >= kSyncDelaySlowInMilliseconds) |
- delay = last_delay_ * 2; |
- if (delay >= kSyncDelayMaxInMilliseconds) |
- delay = kSyncDelayMaxInMilliseconds; |
- break; |
+ if (last_service_state != service_state_) |
+ ThrottleSync(kSyncDelaySlowInMilliseconds); |
+ ScheduleInternal(kSyncDelaySlowInMilliseconds); |
+ return; |
case SYNC_SERVICE_AUTHENTICATION_REQUIRED: |
case SYNC_SERVICE_DISABLED: |
- delay = kSyncDelayMaxInMilliseconds; |
- break; |
+ if (last_service_state != service_state_) |
+ ThrottleSync(kSyncDelaySlowInMilliseconds); |
+ ScheduleInternal(kSyncDelayMaxInMilliseconds); |
+ return; |
} |
- ScheduleInternal(delay); |
+ |
+ NOTREACHED(); |
+ ScheduleInternal(kSyncDelayMaxInMilliseconds); |
} |
-void SyncProcessRunner::ScheduleIfNotRunning() { |
- if (!timer_helper_->IsRunning()) |
- Schedule(); |
+void SyncProcessRunner::ThrottleSync(int64 base_delay) { |
+ base::TimeTicks now = timer_helper_->Now(); |
+ base::TimeDelta elapsed = std::min(now, throttle_until_) - throttle_from_; |
+ DCHECK(base::TimeDelta() <= elapsed); |
+ |
+ throttle_from_ = now; |
+ // Extend throttling duration by twice the elapsed time. |
+ // That is, if the backoff repeats in a short period, the throttling period |
+ // doesn't grow exponentially. If the backoff happens on the end of |
+ // throttling period, it causes another throttling period that is twice as |
+ // long as previous. |
+ base::TimeDelta base_delay_delta = |
+ base::TimeDelta::FromMilliseconds(base_delay); |
+ const base::TimeDelta max_delay = |
+ base::TimeDelta::FromMilliseconds(kSyncDelayMaxInMilliseconds); |
+ throttle_until_ = |
+ std::min(now + max_delay, |
+ std::max(now + base_delay_delta, throttle_until_ + 2 * elapsed)); |
+} |
+ |
+void SyncProcessRunner::ResetOldThrottling() { |
+ if (throttle_until_ < base::TimeTicks::Now()) |
+ ResetThrottling(); |
+} |
+ |
+void SyncProcessRunner::ResetThrottling() { |
+ throttle_from_ = base::TimeTicks(); |
+ throttle_until_ = base::TimeTicks(); |
} |
void SyncProcessRunner::OnChangesUpdated( |
@@ -143,62 +170,71 @@ SyncServiceState SyncProcessRunner::GetServiceState() { |
void SyncProcessRunner::Finished(const base::TimeTicks& start_time, |
SyncStatusCode status) { |
- DCHECK_LT(0, running_tasks_); |
+ DCHECK_LT(0u, running_tasks_); |
DCHECK_LE(running_tasks_, max_parallel_task_); |
--running_tasks_; |
util::Log(logging::LOG_VERBOSE, FROM_HERE, |
- "[%s] * Finished (elapsed: %" PRId64 " sec)", |
- name_.c_str(), |
- (timer_helper_->Now() - start_time).InSeconds()); |
+ "[%s] * Finished (elapsed: %" PRId64 " ms)", name_.c_str(), |
+ (timer_helper_->Now() - start_time).InMilliseconds()); |
+ |
if (status == SYNC_STATUS_NO_CHANGE_TO_SYNC || |
- status == SYNC_STATUS_FILE_BUSY) |
+ status == SYNC_STATUS_FILE_BUSY) { |
ScheduleInternal(kSyncDelayMaxInMilliseconds); |
- else if (!WasSuccessfulSync(status) && |
- GetServiceState() == SYNC_SERVICE_RUNNING) |
- ScheduleInternal(kSyncDelayWithSyncError); |
+ return; |
+ } |
+ |
+ if (WasSuccessfulSync(status)) |
+ ResetOldThrottling(); |
else |
- Schedule(); |
+ ThrottleSync(kSyncDelayWithSyncError); |
+ |
+ Schedule(); |
} |
void SyncProcessRunner::Run() { |
if (running_tasks_ >= max_parallel_task_) |
return; |
++running_tasks_; |
- last_scheduled_ = timer_helper_->Now(); |
- last_delay_ = current_delay_; |
+ base::TimeTicks now = timer_helper_->Now(); |
+ last_run_ = now; |
util::Log(logging::LOG_VERBOSE, FROM_HERE, |
"[%s] * Started", name_.c_str()); |
StartSync(base::Bind(&SyncProcessRunner::Finished, factory_.GetWeakPtr(), |
- last_scheduled_)); |
+ now)); |
+ if (running_tasks_ < max_parallel_task_) |
+ Schedule(); |
} |
void SyncProcessRunner::ScheduleInternal(int64 delay) { |
- base::TimeDelta time_to_next = base::TimeDelta::FromMilliseconds(delay); |
+ base::TimeTicks now = timer_helper_->Now(); |
+ base::TimeTicks next_scheduled; |
if (timer_helper_->IsRunning()) { |
- if (current_delay_ == delay) |
- return; |
- |
- base::TimeDelta elapsed = timer_helper_->Now() - last_scheduled_; |
- if (elapsed < time_to_next) { |
- time_to_next = time_to_next - elapsed; |
- } else { |
- time_to_next = base::TimeDelta::FromMilliseconds( |
- kSyncDelayFastInMilliseconds); |
+ next_scheduled = last_run_ + base::TimeDelta::FromMilliseconds(delay); |
+ if (next_scheduled < now) { |
+ next_scheduled = |
+ now + base::TimeDelta::FromMilliseconds(kSyncDelayFastInMilliseconds); |
} |
+ } else { |
+ next_scheduled = now + base::TimeDelta::FromMilliseconds(delay); |
} |
- if (current_delay_ != delay) { |
- util::Log(logging::LOG_VERBOSE, FROM_HERE, |
- "[%s] Scheduling task in %" PRId64 " secs", |
- name_.c_str(), time_to_next.InSeconds()); |
- } |
- current_delay_ = delay; |
+ if (next_scheduled < throttle_until_) |
+ next_scheduled = throttle_until_; |
+ |
+ if (timer_helper_->IsRunning() && last_scheduled_ == next_scheduled) |
+ return; |
+ |
+ util::Log(logging::LOG_VERBOSE, FROM_HERE, |
+ "[%s] Scheduling task in %" PRId64 " ms", |
+ name_.c_str(), (next_scheduled - now).InMilliseconds()); |
+ |
+ last_scheduled_ = next_scheduled; |
timer_helper_->Start( |
- FROM_HERE, time_to_next, |
+ FROM_HERE, next_scheduled - now, |
base::Bind(&SyncProcessRunner::Run, base::Unretained(this))); |
} |