Index: sync/engine/sync_scheduler_impl.cc |
diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc |
index 7fd9ccda52bcc93e70568725ec07c9ba4b4e314b..b03b3c85bb16613012941a5627f6a7425922c3f7 100644 |
--- a/sync/engine/sync_scheduler_impl.cc |
+++ b/sync/engine/sync_scheduler_impl.cc |
@@ -83,13 +83,10 @@ ConfigurationParams::ConfigurationParams( |
ConfigurationParams::~ConfigurationParams() {} |
SyncSchedulerImpl::WaitInterval::WaitInterval() |
- : mode(UNKNOWN), |
- had_nudge(false), |
- pending_configure_job(NULL) {} |
+ : mode(UNKNOWN) {} |
SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
- : mode(mode), had_nudge(false), length(length), |
- pending_configure_job(NULL) {} |
+ : mode(mode), length(length) {} |
SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
@@ -160,7 +157,6 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, |
weak_handle_this_(MakeWeakHandle( |
weak_ptr_factory_for_weak_handle_.GetWeakPtr())), |
name_(name), |
- sync_loop_(MessageLoop::current()), |
started_(false), |
syncer_short_poll_interval_seconds_( |
TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
@@ -169,23 +165,19 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, |
sessions_commit_delay_( |
TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
mode_(NORMAL_MODE), |
- // Start with assuming everything is fine with the connection. |
- // At the end of the sync cycle we would have the correct status. |
- pending_nudge_(NULL), |
delay_provider_(delay_provider), |
syncer_(syncer), |
session_context_(context), |
no_scheduling_allowed_(false) { |
- DCHECK(sync_loop_); |
} |
SyncSchedulerImpl::~SyncSchedulerImpl() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
StopImpl(base::Closure()); |
} |
void SyncSchedulerImpl::OnCredentialsUpdated() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
if (HttpResponse::SYNC_AUTH_ERROR == |
session_context_->connection_manager()->server_status()) { |
@@ -214,14 +206,11 @@ void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
// call DoCanaryJob to achieve this, and note that nothing -- not even a |
// canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
// has the authority to do that is the Unthrottle timer. |
- scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
- if (!pending.get()) |
- return; |
- DoCanaryJob(pending.Pass()); |
+ TryCanaryJob(); |
} |
void SyncSchedulerImpl::Start(Mode mode) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
std::string thread_name = MessageLoop::current()->thread_name(); |
if (thread_name.empty()) |
thread_name = "<Main thread>"; |
@@ -238,25 +227,15 @@ void SyncSchedulerImpl::Start(Mode mode) { |
mode_ = mode; |
AdjustPolling(NULL); // Will kick start poll timer if needed. |
- if (old_mode != mode_) { |
- // We just changed our mode. See if there are any pending jobs that we could |
- // execute in the new mode. |
- if (mode_ == NORMAL_MODE) { |
- // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
- // has not yet completed. |
- DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
- } |
- |
- scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
- if (pending.get()) { |
- SDVLOG(2) << "Executing pending job. Good luck!"; |
- DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY); |
- } |
+ if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { |
+ // We just got back to normal mode. Let's try to run the work that was |
+ // queued up while we were configuring. |
+ DoNudgeSyncSessionJob(NORMAL_PRIORITY); |
} |
} |
void SyncSchedulerImpl::SendInitialSnapshot() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
scoped_ptr<SyncSession> dummy(new SyncSession( |
session_context_, this, SyncSourceInfo())); |
SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
@@ -286,7 +265,7 @@ void BuildModelSafeParams( |
bool SyncSchedulerImpl::ScheduleConfiguration( |
const ConfigurationParams& params) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
DCHECK_EQ(CONFIGURATION_MODE, mode_); |
DCHECK(!params.ready_task.is_null()); |
@@ -295,7 +274,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration( |
// Only one configuration is allowed at a time. Verify we're not waiting |
// for a pending configure job. |
- DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
+ DCHECK(!pending_configure_job_); |
ModelSafeRoutingInfo restricted_routes; |
BuildModelSafeParams(params.types_to_download, |
@@ -306,7 +285,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration( |
// Only reconfigure if we have types to download. |
if (!params.types_to_download.Empty()) { |
DCHECK(!restricted_routes.empty()); |
- scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
+ pending_configure_job_.reset(new SyncSessionJob( |
SyncSessionJob::CONFIGURATION, |
TimeTicks::Now(), |
SyncSourceInfo(params.source, |
@@ -314,13 +293,15 @@ bool SyncSchedulerImpl::ScheduleConfiguration( |
restricted_routes, |
std::string())), |
params)); |
- bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); |
+ bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); |
// If we failed, the job would have been saved as the pending configure |
// job and a wait interval would have been set. |
if (!succeeded) { |
- DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
+ DCHECK(pending_configure_job_); |
return false; |
+ } else { |
+ DCHECK(!pending_configure_job_); |
} |
} else { |
SDVLOG(2) << "No change in routing info, calling ready task directly."; |
@@ -333,13 +314,12 @@ bool SyncSchedulerImpl::ScheduleConfiguration( |
SyncSchedulerImpl::JobProcessDecision |
SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, |
JobPriority priority) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
DCHECK(wait_interval_.get()); |
DCHECK_NE(job.purpose(), SyncSessionJob::POLL); |
SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
<< WaitInterval::GetModeString(wait_interval_->mode) |
- << (wait_interval_->had_nudge ? " (had nudge)" : "") |
<< ((priority == CANARY_PRIORITY) ? " (canary)" : ""); |
// If we save a job while in a WaitInterval, there is a well-defined moment |
@@ -358,11 +338,9 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, |
if (mode_ == CONFIGURATION_MODE) |
return SAVE; |
- // If we already had one nudge then just drop this nudge. We will retry |
- // later when the timer runs out. |
if (priority == NORMAL_PRIORITY) |
- return wait_interval_->had_nudge ? DROP : CONTINUE; |
- else // We are here because timer ran out. So retry. |
+ return DROP; |
+ else // Either backoff has ended, or we have permission to bypass it. |
return CONTINUE; |
} |
return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; |
@@ -371,7 +349,7 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, |
SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
const SyncSessionJob& job, |
JobPriority priority) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
// POLL jobs do not call this function. |
DCHECK(job.purpose() == SyncSessionJob::NUDGE || |
@@ -449,69 +427,11 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; |
} |
-void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { |
- const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
- if (is_nudge && pending_nudge_) { |
- SDVLOG(2) << "Coalescing a pending nudge"; |
- // TODO(tim): This basically means we never use the more-careful coalescing |
- // logic in ScheduleNudgeImpl that takes the min of the two nudge start |
- // times, because we're calling this function first. Pull this out |
- // into a function to coalesce + set start times and reuse. |
- pending_nudge_->CoalesceSources(job->source_info()); |
- return; |
- } |
- |
- scoped_ptr<SyncSessionJob> job_to_save = job->Clone(); |
- if (wait_interval_.get() && !wait_interval_->pending_configure_job) { |
- // This job should be made the new canary. |
- if (is_nudge) { |
- pending_nudge_ = job_to_save.get(); |
- } else { |
- SDVLOG(2) << "Saving a configuration job"; |
- DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); |
- DCHECK(!wait_interval_->pending_configure_job); |
- DCHECK_EQ(mode_, CONFIGURATION_MODE); |
- DCHECK(!job->config_params().ready_task.is_null()); |
- // The only nudge that could exist is a scheduled canary nudge. |
- DCHECK(!unscheduled_nudge_storage_.get()); |
- if (pending_nudge_) { |
- // Pre-empt the nudge canary and abandon the old nudge (owned by task). |
- unscheduled_nudge_storage_ = pending_nudge_->Clone(); |
- pending_nudge_ = unscheduled_nudge_storage_.get(); |
- } |
- wait_interval_->pending_configure_job = job_to_save.get(); |
- } |
- TimeDelta length = |
- wait_interval_->timer.desired_run_time() - TimeTicks::Now(); |
- wait_interval_->length = length < TimeDelta::FromSeconds(0) ? |
- TimeDelta::FromSeconds(0) : length; |
- RestartWaiting(job_to_save.Pass()); |
- return; |
- } |
- |
- // Note that today there are no cases where we SAVE a CONFIGURATION job |
- // when we're not in a WaitInterval. See bug 147736. |
- DCHECK(is_nudge); |
- // There may or may not be a pending_configure_job. Either way this nudge |
- // is unschedulable. |
- pending_nudge_ = job_to_save.get(); |
- unscheduled_nudge_storage_ = job_to_save.Pass(); |
-} |
- |
-// Functor for std::find_if to search by ModelSafeGroup. |
-struct ModelSafeWorkerGroupIs { |
- explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
- bool operator()(ModelSafeWorker* w) { |
- return group == w->GetModelSafeGroup(); |
- } |
- ModelSafeGroup group; |
-}; |
- |
void SyncSchedulerImpl::ScheduleNudgeAsync( |
const TimeDelta& desired_delay, |
NudgeSource source, ModelTypeSet types, |
const tracked_objects::Location& nudge_location) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
SDVLOG_LOC(nudge_location, 2) |
<< "Nudge scheduled with delay " |
<< desired_delay.InMilliseconds() << " ms, " |
@@ -530,7 +450,7 @@ void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
const TimeDelta& desired_delay, |
NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
const tracked_objects::Location& nudge_location) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
SDVLOG_LOC(nudge_location, 2) |
<< "Nudge scheduled with delay " |
<< desired_delay.InMilliseconds() << " ms, " |
@@ -552,7 +472,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( |
GetUpdatesCallerInfo::GetUpdatesSource source, |
const ModelTypeInvalidationMap& invalidation_map, |
const tracked_objects::Location& nudge_location) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
if (no_scheduling_allowed_) { |
@@ -586,51 +506,43 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( |
<< SyncSessionJob::GetPurposeString(job->purpose()) |
<< " in mode " << GetModeString(mode_) |
<< ": " << GetDecisionString(decision); |
- if (decision != CONTINUE) { |
- // End of the line, though we may save the job for later. |
- if (decision == SAVE) { |
- HandleSaveJobDecision(job.Pass()); |
- } else { |
- DCHECK_EQ(decision, DROP); |
- } |
+ if (decision == DROP) { |
return; |
} |
- if (pending_nudge_) { |
- SDVLOG(2) << "Rescheduling pending nudge"; |
- pending_nudge_->CoalesceSources(job->source_info()); |
- // Choose the start time as the earliest of the 2. Note that this means |
- // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
- // but a nudge is already scheduled to go out, we'll send the (tab) commit |
- // without waiting. |
- pending_nudge_->set_scheduled_start( |
- std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
- // Abandon the old task by cloning and replacing the session. |
- // It's possible that by "rescheduling" we're actually taking a job that |
- // was previously unscheduled and giving it wings, so take care to reset |
- // unscheduled nudge storage. |
- job = pending_nudge_->Clone(); |
- pending_nudge_ = NULL; |
- unscheduled_nudge_storage_.reset(); |
- // It's also possible we took a canary job, since we allow one nudge |
- // per backoff interval. |
- DCHECK(!wait_interval_ || !wait_interval_->had_nudge); |
+ // Try to coalesce in both SAVE and CONTINUE cases. |
+ if (pending_nudge_job_) { |
+ pending_nudge_job_->CoalesceSources(job->source_info()); |
+ if (decision == CONTINUE) { |
+ // Only update the scheduled_start if we're going to reschedule. |
+ pending_nudge_job_->set_scheduled_start( |
+ std::min(job->scheduled_start(), |
+ pending_nudge_job_->scheduled_start())); |
+ } |
+ } else { |
+ pending_nudge_job_ = job.Pass(); |
+ } |
+ |
+ if (decision == SAVE) { |
+ return; |
} |
- TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); |
+ TimeDelta run_delay = |
+ pending_nudge_job_->scheduled_start() - TimeTicks::Now(); |
if (run_delay < TimeDelta::FromMilliseconds(0)) |
run_delay = TimeDelta::FromMilliseconds(0); |
SDVLOG_LOC(nudge_location, 2) |
<< "Scheduling a nudge with " |
<< run_delay.InMilliseconds() << " ms delay"; |
- pending_nudge_ = job.get(); |
- PostDelayedTask(nudge_location, "DoSyncSessionJob", |
- base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
- weak_ptr_factory_.GetWeakPtr(), |
- base::Passed(&job), |
- NORMAL_PRIORITY), |
- run_delay); |
+ if (started_) { |
+ pending_wakeup_timer_.Start( |
+ nudge_location, |
+ run_delay, |
+ base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ NORMAL_PRIORITY)); |
+ } |
} |
const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
@@ -651,27 +563,9 @@ const char* SyncSchedulerImpl::GetDecisionString( |
return ""; |
} |
-void SyncSchedulerImpl::PostDelayedTask( |
- const tracked_objects::Location& from_here, |
- const char* name, const base::Closure& task, base::TimeDelta delay) { |
- SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
- << delay.InMilliseconds() << " ms delay"; |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- if (!started_) { |
- SDVLOG(1) << "Not posting task as scheduler is stopped."; |
- return; |
- } |
- // This cancels the previous task, if one existed. |
- pending_wakeup_.Reset(task); |
- sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); |
-} |
- |
-bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
- JobPriority priority) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- if (job->purpose() == SyncSessionJob::NUDGE) { |
- pending_nudge_ = NULL; |
- } |
+bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, |
+ JobPriority priority) { |
+ DCHECK(CalledOnValidThread()); |
base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
JobProcessDecision decision = DecideOnJob(*job, priority); |
@@ -682,7 +576,11 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
<< ": " << GetDecisionString(decision); |
if (decision != CONTINUE) { |
if (decision == SAVE) { |
- HandleSaveJobDecision(job.Pass()); |
+ if (job->purpose() == SyncSessionJob::CONFIGURATION) { |
+ pending_configure_job_ = job.Pass(); |
+ } else { |
+ pending_nudge_job_ = job.Pass(); |
+ } |
} else { |
DCHECK_EQ(decision, DROP); |
} |
@@ -707,15 +605,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
// If we're here, it's because |job| was silenced until a server specified |
// time. (Note, it had to be |job|, because DecideOnJob would not permit |
// any job through while in WaitInterval::THROTTLED). |
- scoped_ptr<SyncSessionJob> clone = job->Clone(); |
- if (clone->purpose() == SyncSessionJob::NUDGE) |
- pending_nudge_ = clone.get(); |
- else if (clone->purpose() == SyncSessionJob::CONFIGURATION) |
- wait_interval_->pending_configure_job = clone.get(); |
+ if (job->purpose() == SyncSessionJob::NUDGE) |
+ pending_nudge_job_ = job.Pass(); |
+ else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
+ pending_configure_job_ = job.Pass(); |
else |
NOTREACHED(); |
- RestartWaiting(clone.Pass()); |
+ RestartWaiting(); |
return success; |
} |
@@ -725,6 +622,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
return success; |
} |
+void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { |
+ DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority); |
+} |
+ |
+bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { |
+ return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority); |
+} |
+ |
bool SyncSchedulerImpl::ShouldPoll() { |
if (wait_interval_.get()) { |
SDVLOG(2) << "Not running poll in wait interval."; |
@@ -747,8 +652,15 @@ bool SyncSchedulerImpl::ShouldPoll() { |
return true; |
} |
-void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
- DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); |
+void SyncSchedulerImpl::DoPollSyncSessionJob() { |
+ ModelSafeRoutingInfo r; |
+ ModelTypeInvalidationMap invalidation_map = |
+ ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
+ SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
+ scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
+ TimeTicks::Now(), |
+ info, |
+ ConfigurationParams())); |
base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
@@ -766,17 +678,16 @@ void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
FinishSyncSessionJob(job.get(), premature_exit, &session); |
if (IsSyncingCurrentlySilenced()) { |
- // This will start the countdown to unthrottle. Other kinds of jobs would |
- // schedule themselves as the post-unthrottle canary. A poll job is not |
- // that urgent, so it does not get to be the canary. We still need to start |
- // the timer regardless. Otherwise there could be no one to clear the |
- // WaitInterval when the throttling expires. |
- RestartWaiting(scoped_ptr<SyncSessionJob>()); |
+ // Normally we would only call RestartWaiting() if we had a |
+ // pending_nudge_job_ or pending_configure_job_ set. In this case, it's |
+ // possible that neither is set. We create the wait interval anyway because |
+ // we need it to make sure we get unthrottled on time. |
+ RestartWaiting(); |
} |
} |
void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
// We are interested in recording time between local nudges for datatypes. |
// TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
@@ -803,7 +714,7 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
bool exited_prematurely, |
SyncSession* session) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
// Let job know that we're through syncing (calling SyncShare) at this point. |
bool succeeded = false; |
@@ -831,7 +742,7 @@ bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
void SyncSchedulerImpl::ScheduleNextSync( |
scoped_ptr<SyncSessionJob> finished_job, |
SyncSession* session) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION |
|| finished_job->purpose() == SyncSessionJob::NUDGE); |
@@ -841,34 +752,12 @@ void SyncSchedulerImpl::ScheduleNextSync( |
// we should be able to detect such errors and only retry when we detect |
// transient errors. |
- if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
- mode_ == NORMAL_MODE) { |
- // When in normal mode, we allow up to one nudge per backoff interval. It |
- // appears that this was our nudge for this interval, and it failed. |
- // |
- // Note: This does not prevent us from running canary jobs. For example, |
- // an IP address change might still result in another nudge being executed |
- // during this backoff interval. |
- SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
- DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
- DCHECK(!wait_interval_->had_nudge); |
- |
- wait_interval_->had_nudge = true; |
- DCHECK(!pending_nudge_); |
- |
- scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); |
- pending_nudge_ = new_job.get(); |
- RestartWaiting(new_job.Pass()); |
- } else { |
- // Either this is the first failure or a consecutive failure after our |
- // backoff timer expired. We handle it the same way in either case. |
- SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
- HandleContinuationError(finished_job.Pass(), session); |
- } |
+ SDVLOG(2) << "SyncShare job failed; will start or update backoff"; |
+ HandleContinuationError(finished_job.Pass(), session); |
} |
void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
TimeDelta poll = (!session_context_->notifications_enabled()) ? |
syncer_short_poll_interval_seconds_ : |
@@ -888,28 +777,28 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
&SyncSchedulerImpl::PollTimerCallback); |
} |
-void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
+void SyncSchedulerImpl::RestartWaiting() { |
CHECK(wait_interval_.get()); |
- wait_interval_->timer.Stop(); |
DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
if (wait_interval_->mode == WaitInterval::THROTTLED) { |
- pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, |
- weak_ptr_factory_.GetWeakPtr(), |
- base::Passed(&job))); |
- |
+ pending_wakeup_timer_.Start( |
+ FROM_HERE, |
+ wait_interval_->length, |
+ base::Bind(&SyncSchedulerImpl::Unthrottle, |
+ weak_ptr_factory_.GetWeakPtr())); |
} else { |
- pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
- weak_ptr_factory_.GetWeakPtr(), |
- base::Passed(&job))); |
+ pending_wakeup_timer_.Start( |
+ FROM_HERE, |
+ wait_interval_->length, |
+ base::Bind(&SyncSchedulerImpl::TryCanaryJob, |
+ weak_ptr_factory_.GetWeakPtr())); |
} |
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
- pending_wakeup_.callback()); |
} |
void SyncSchedulerImpl::HandleContinuationError( |
scoped_ptr<SyncSessionJob> old_job, |
SyncSession* session) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
TimeDelta length = delay_provider_->GetDelay( |
IsBackingOff() ? wait_interval_->length : |
@@ -921,26 +810,24 @@ void SyncSchedulerImpl::HandleContinuationError( |
<< " job. The time delta(ms) is " |
<< length.InMilliseconds(); |
- // This will reset the had_nudge variable as well. |
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
length)); |
NotifyRetryTime(base::Time::Now() + length); |
- scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); |
- new_job->set_scheduled_start(TimeTicks::Now() + length); |
+ old_job->set_scheduled_start(TimeTicks::Now() + length); |
if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { |
SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
// Config params should always get set. |
DCHECK(!old_job->config_params().ready_task.is_null()); |
- wait_interval_->pending_configure_job = new_job.get(); |
+ DCHECK(!pending_configure_job_); |
+ pending_configure_job_ = old_job.Pass(); |
} else { |
- // We are not in configuration mode. So wait_interval's pending job |
- // should be null. |
- DCHECK(wait_interval_->pending_configure_job == NULL); |
- DCHECK(!pending_nudge_); |
- pending_nudge_ = new_job.get(); |
+ // We're not in configure mode so we should not have a configure job. |
+ DCHECK(!pending_configure_job_); |
+ DCHECK(!pending_nudge_job_); |
+ pending_nudge_job_ = old_job.Pass(); |
} |
- RestartWaiting(new_job.Pass()); |
+ RestartWaiting(); |
} |
void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
@@ -953,7 +840,7 @@ void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
} |
void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
SDVLOG(2) << "StopImpl called"; |
// Kill any in-flight method calls. |
@@ -961,9 +848,9 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
wait_interval_.reset(); |
NotifyRetryTime(base::Time()); |
poll_timer_.Stop(); |
- pending_nudge_ = NULL; |
- unscheduled_nudge_storage_.reset(); |
- pending_wakeup_.Cancel(); |
+ pending_wakeup_timer_.Stop(); |
+ pending_nudge_job_.reset(); |
+ pending_configure_job_.reset(); |
if (started_) { |
started_ = false; |
} |
@@ -971,48 +858,24 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
callback.Run(); |
} |
-void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- SDVLOG(2) << "Do canary job"; |
- |
- // This is the only place where we invoke DoSyncSessionJob with canary |
- // privileges. Everyone else should use NORMAL_PRIORITY. |
- DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); |
-} |
+// This is the only place where we invoke DoSyncSessionJob with canary |
+// privileges. Everyone else should use NORMAL_PRIORITY. |
+void SyncSchedulerImpl::TryCanaryJob() { |
+ DCHECK(CalledOnValidThread()); |
-scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- // If we find a scheduled pending_ job, abandon the old one and return a |
- // a clone. If unscheduled, just hand over ownership. |
- scoped_ptr<SyncSessionJob> candidate; |
- if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
- && wait_interval_->pending_configure_job) { |
- SDVLOG(2) << "Found pending configure job"; |
- candidate = |
- wait_interval_->pending_configure_job->Clone().Pass(); |
- wait_interval_->pending_configure_job = candidate.get(); |
- } else if (mode_ == NORMAL_MODE && pending_nudge_) { |
- SDVLOG(2) << "Found pending nudge job"; |
- candidate = pending_nudge_->Clone(); |
- pending_nudge_ = candidate.get(); |
- unscheduled_nudge_storage_.reset(); |
+ if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { |
+ SDVLOG(2) << "Found pending configure job; will run as canary"; |
+ DoConfigurationSyncSessionJob(CANARY_PRIORITY); |
+ } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { |
+ SDVLOG(2) << "Found pending nudge job; will run as canary"; |
+ DoNudgeSyncSessionJob(CANARY_PRIORITY); |
+ } else { |
+ SDVLOG(2) << "Found no work to do; will not run a canary"; |
} |
- // If we took a job and there's a wait interval, we took the pending canary. |
- if (candidate && wait_interval_) |
- wait_interval_->timer.Stop(); |
- return candidate.Pass(); |
} |
void SyncSchedulerImpl::PollTimerCallback() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- ModelSafeRoutingInfo r; |
- ModelTypeInvalidationMap invalidation_map = |
- ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
- SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
- scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
- TimeTicks::Now(), |
- info, |
- ConfigurationParams())); |
+ DCHECK(CalledOnValidThread()); |
if (no_scheduling_allowed_) { |
// The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in |
// functions that are called only on the sync thread. This function is also |
@@ -1024,16 +887,12 @@ void SyncSchedulerImpl::PollTimerCallback() { |
return; |
} |
- DoPollSyncSessionJob(job.Pass()); |
+ DoPollSyncSessionJob(); |
} |
-void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+void SyncSchedulerImpl::Unthrottle() { |
+ DCHECK(CalledOnValidThread()); |
DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
- DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() || |
- wait_interval_->pending_configure_job == to_be_canary.get()); |
- SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
- << "canary."; |
// We're no longer throttled, so clear the wait interval. |
wait_interval_.reset(); |
@@ -1044,15 +903,11 @@ void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
// was just created (e.g via ScheduleNudgeImpl). The main implication is |
// that we're careful to update routing info (etc) with such potentially |
// stale canary jobs. |
- if (to_be_canary.get()) { |
- DoCanaryJob(to_be_canary.Pass()); |
- } else { |
- DCHECK(!unscheduled_nudge_storage_.get()); |
- } |
+ TryCanaryJob(); |
} |
void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
session_context_->NotifyListeners(SyncEngineEvent(cause)); |
} |
@@ -1063,45 +918,45 @@ void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { |
} |
bool SyncSchedulerImpl::IsBackingOff() const { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
return wait_interval_.get() && wait_interval_->mode == |
WaitInterval::EXPONENTIAL_BACKOFF; |
} |
void SyncSchedulerImpl::OnSilencedUntil( |
const base::TimeTicks& silenced_until) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
silenced_until - TimeTicks::Now())); |
NotifyRetryTime(base::Time::Now() + wait_interval_->length); |
} |
bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
return wait_interval_.get() && wait_interval_->mode == |
WaitInterval::THROTTLED; |
} |
void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
const base::TimeDelta& new_interval) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
syncer_short_poll_interval_seconds_ = new_interval; |
} |
void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( |
const base::TimeDelta& new_interval) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
syncer_long_poll_interval_seconds_ = new_interval; |
} |
void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( |
const base::TimeDelta& new_delay) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
sessions_commit_delay_ = new_delay; |
} |
void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
SDVLOG(2) << "OnShouldStopSyncingPermanently"; |
syncer_->RequestEarlyExit(); // Thread-safe. |
Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
@@ -1109,7 +964,7 @@ void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { |
void SyncSchedulerImpl::OnActionableError( |
const sessions::SyncSessionSnapshot& snap) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
SDVLOG(2) << "OnActionableError"; |
SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); |
event.snapshot = snap; |
@@ -1118,7 +973,7 @@ void SyncSchedulerImpl::OnActionableError( |
void SyncSchedulerImpl::OnSyncProtocolError( |
const sessions::SyncSessionSnapshot& snapshot) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
if (ShouldRequestEarlyExit( |
snapshot.model_neutral_state().sync_protocol_error)) { |
SDVLOG(2) << "Sync Scheduler requesting early exit."; |
@@ -1129,12 +984,12 @@ void SyncSchedulerImpl::OnSyncProtocolError( |
} |
void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
session_context_->set_notifications_enabled(notifications_enabled); |
} |
base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(CalledOnValidThread()); |
return sessions_commit_delay_; |
} |