| Index: sync/engine/sync_scheduler_impl.cc
|
| diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc
|
| index 7fd9ccda52bcc93e70568725ec07c9ba4b4e314b..b03b3c85bb16613012941a5627f6a7425922c3f7 100644
|
| --- a/sync/engine/sync_scheduler_impl.cc
|
| +++ b/sync/engine/sync_scheduler_impl.cc
|
| @@ -83,13 +83,10 @@ ConfigurationParams::ConfigurationParams(
|
| ConfigurationParams::~ConfigurationParams() {}
|
|
|
| SyncSchedulerImpl::WaitInterval::WaitInterval()
|
| - : mode(UNKNOWN),
|
| - had_nudge(false),
|
| - pending_configure_job(NULL) {}
|
| + : mode(UNKNOWN) {}
|
|
|
| SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
|
| - : mode(mode), had_nudge(false), length(length),
|
| - pending_configure_job(NULL) {}
|
| + : mode(mode), length(length) {}
|
|
|
| SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
|
|
|
| @@ -160,7 +157,6 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
|
| weak_handle_this_(MakeWeakHandle(
|
| weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
|
| name_(name),
|
| - sync_loop_(MessageLoop::current()),
|
| started_(false),
|
| syncer_short_poll_interval_seconds_(
|
| TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
|
| @@ -169,23 +165,19 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
|
| sessions_commit_delay_(
|
| TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
|
| mode_(NORMAL_MODE),
|
| - // Start with assuming everything is fine with the connection.
|
| - // At the end of the sync cycle we would have the correct status.
|
| - pending_nudge_(NULL),
|
| delay_provider_(delay_provider),
|
| syncer_(syncer),
|
| session_context_(context),
|
| no_scheduling_allowed_(false) {
|
| - DCHECK(sync_loop_);
|
| }
|
|
|
| SyncSchedulerImpl::~SyncSchedulerImpl() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| StopImpl(base::Closure());
|
| }
|
|
|
| void SyncSchedulerImpl::OnCredentialsUpdated() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| if (HttpResponse::SYNC_AUTH_ERROR ==
|
| session_context_->connection_manager()->server_status()) {
|
| @@ -214,14 +206,11 @@ void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
|
| // call DoCanaryJob to achieve this, and note that nothing -- not even a
|
| // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
|
| // has the authority to do that is the Unthrottle timer.
|
| - scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
|
| - if (!pending.get())
|
| - return;
|
| - DoCanaryJob(pending.Pass());
|
| + TryCanaryJob();
|
| }
|
|
|
| void SyncSchedulerImpl::Start(Mode mode) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| std::string thread_name = MessageLoop::current()->thread_name();
|
| if (thread_name.empty())
|
| thread_name = "<Main thread>";
|
| @@ -238,25 +227,15 @@ void SyncSchedulerImpl::Start(Mode mode) {
|
| mode_ = mode;
|
| AdjustPolling(NULL); // Will kick start poll timer if needed.
|
|
|
| - if (old_mode != mode_) {
|
| - // We just changed our mode. See if there are any pending jobs that we could
|
| - // execute in the new mode.
|
| - if (mode_ == NORMAL_MODE) {
|
| - // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
|
| - // has not yet completed.
|
| - DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
|
| - }
|
| -
|
| - scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
|
| - if (pending.get()) {
|
| - SDVLOG(2) << "Executing pending job. Good luck!";
|
| - DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY);
|
| - }
|
| + if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) {
|
| + // We just got back to normal mode. Let's try to run the work that was
|
| + // queued up while we were configuring.
|
| + DoNudgeSyncSessionJob(NORMAL_PRIORITY);
|
| }
|
| }
|
|
|
| void SyncSchedulerImpl::SendInitialSnapshot() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| scoped_ptr<SyncSession> dummy(new SyncSession(
|
| session_context_, this, SyncSourceInfo()));
|
| SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
|
| @@ -286,7 +265,7 @@ void BuildModelSafeParams(
|
|
|
| bool SyncSchedulerImpl::ScheduleConfiguration(
|
| const ConfigurationParams& params) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
|
| DCHECK_EQ(CONFIGURATION_MODE, mode_);
|
| DCHECK(!params.ready_task.is_null());
|
| @@ -295,7 +274,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
|
|
| // Only one configuration is allowed at a time. Verify we're not waiting
|
| // for a pending configure job.
|
| - DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
|
| + DCHECK(!pending_configure_job_);
|
|
|
| ModelSafeRoutingInfo restricted_routes;
|
| BuildModelSafeParams(params.types_to_download,
|
| @@ -306,7 +285,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| // Only reconfigure if we have types to download.
|
| if (!params.types_to_download.Empty()) {
|
| DCHECK(!restricted_routes.empty());
|
| - scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
|
| + pending_configure_job_.reset(new SyncSessionJob(
|
| SyncSessionJob::CONFIGURATION,
|
| TimeTicks::Now(),
|
| SyncSourceInfo(params.source,
|
| @@ -314,13 +293,15 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| restricted_routes,
|
| std::string())),
|
| params));
|
| - bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY);
|
| + bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
|
|
|
| // If we failed, the job would have been saved as the pending configure
|
| // job and a wait interval would have been set.
|
| if (!succeeded) {
|
| - DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
|
| + DCHECK(pending_configure_job_);
|
| return false;
|
| + } else {
|
| + DCHECK(!pending_configure_job_);
|
| }
|
| } else {
|
| SDVLOG(2) << "No change in routing info, calling ready task directly.";
|
| @@ -333,13 +314,12 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| SyncSchedulerImpl::JobProcessDecision
|
| SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
|
| JobPriority priority) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(wait_interval_.get());
|
| DCHECK_NE(job.purpose(), SyncSessionJob::POLL);
|
|
|
| SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
|
| << WaitInterval::GetModeString(wait_interval_->mode)
|
| - << (wait_interval_->had_nudge ? " (had nudge)" : "")
|
| << ((priority == CANARY_PRIORITY) ? " (canary)" : "");
|
|
|
| // If we save a job while in a WaitInterval, there is a well-defined moment
|
| @@ -358,11 +338,9 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
|
| if (mode_ == CONFIGURATION_MODE)
|
| return SAVE;
|
|
|
| - // If we already had one nudge then just drop this nudge. We will retry
|
| - // later when the timer runs out.
|
| if (priority == NORMAL_PRIORITY)
|
| - return wait_interval_->had_nudge ? DROP : CONTINUE;
|
| - else // We are here because timer ran out. So retry.
|
| + return DROP;
|
| + else // Either backoff has ended, or we have permission to bypass it.
|
| return CONTINUE;
|
| }
|
| return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE;
|
| @@ -371,7 +349,7 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
|
| SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| const SyncSessionJob& job,
|
| JobPriority priority) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| // POLL jobs do not call this function.
|
| DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
|
| @@ -449,69 +427,11 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
|
| }
|
|
|
| -void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
|
| - const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
|
| - if (is_nudge && pending_nudge_) {
|
| - SDVLOG(2) << "Coalescing a pending nudge";
|
| - // TODO(tim): This basically means we never use the more-careful coalescing
|
| - // logic in ScheduleNudgeImpl that takes the min of the two nudge start
|
| - // times, because we're calling this function first. Pull this out
|
| - // into a function to coalesce + set start times and reuse.
|
| - pending_nudge_->CoalesceSources(job->source_info());
|
| - return;
|
| - }
|
| -
|
| - scoped_ptr<SyncSessionJob> job_to_save = job->Clone();
|
| - if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
|
| - // This job should be made the new canary.
|
| - if (is_nudge) {
|
| - pending_nudge_ = job_to_save.get();
|
| - } else {
|
| - SDVLOG(2) << "Saving a configuration job";
|
| - DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
|
| - DCHECK(!wait_interval_->pending_configure_job);
|
| - DCHECK_EQ(mode_, CONFIGURATION_MODE);
|
| - DCHECK(!job->config_params().ready_task.is_null());
|
| - // The only nudge that could exist is a scheduled canary nudge.
|
| - DCHECK(!unscheduled_nudge_storage_.get());
|
| - if (pending_nudge_) {
|
| - // Pre-empt the nudge canary and abandon the old nudge (owned by task).
|
| - unscheduled_nudge_storage_ = pending_nudge_->Clone();
|
| - pending_nudge_ = unscheduled_nudge_storage_.get();
|
| - }
|
| - wait_interval_->pending_configure_job = job_to_save.get();
|
| - }
|
| - TimeDelta length =
|
| - wait_interval_->timer.desired_run_time() - TimeTicks::Now();
|
| - wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
|
| - TimeDelta::FromSeconds(0) : length;
|
| - RestartWaiting(job_to_save.Pass());
|
| - return;
|
| - }
|
| -
|
| - // Note that today there are no cases where we SAVE a CONFIGURATION job
|
| - // when we're not in a WaitInterval. See bug 147736.
|
| - DCHECK(is_nudge);
|
| - // There may or may not be a pending_configure_job. Either way this nudge
|
| - // is unschedulable.
|
| - pending_nudge_ = job_to_save.get();
|
| - unscheduled_nudge_storage_ = job_to_save.Pass();
|
| -}
|
| -
|
| -// Functor for std::find_if to search by ModelSafeGroup.
|
| -struct ModelSafeWorkerGroupIs {
|
| - explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
|
| - bool operator()(ModelSafeWorker* w) {
|
| - return group == w->GetModelSafeGroup();
|
| - }
|
| - ModelSafeGroup group;
|
| -};
|
| -
|
| void SyncSchedulerImpl::ScheduleNudgeAsync(
|
| const TimeDelta& desired_delay,
|
| NudgeSource source, ModelTypeSet types,
|
| const tracked_objects::Location& nudge_location) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| SDVLOG_LOC(nudge_location, 2)
|
| << "Nudge scheduled with delay "
|
| << desired_delay.InMilliseconds() << " ms, "
|
| @@ -530,7 +450,7 @@ void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
|
| const TimeDelta& desired_delay,
|
| NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
|
| const tracked_objects::Location& nudge_location) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| SDVLOG_LOC(nudge_location, 2)
|
| << "Nudge scheduled with delay "
|
| << desired_delay.InMilliseconds() << " ms, "
|
| @@ -552,7 +472,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
|
| GetUpdatesCallerInfo::GetUpdatesSource source,
|
| const ModelTypeInvalidationMap& invalidation_map,
|
| const tracked_objects::Location& nudge_location) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
|
|
|
| if (no_scheduling_allowed_) {
|
| @@ -586,51 +506,43 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
|
| << SyncSessionJob::GetPurposeString(job->purpose())
|
| << " in mode " << GetModeString(mode_)
|
| << ": " << GetDecisionString(decision);
|
| - if (decision != CONTINUE) {
|
| - // End of the line, though we may save the job for later.
|
| - if (decision == SAVE) {
|
| - HandleSaveJobDecision(job.Pass());
|
| - } else {
|
| - DCHECK_EQ(decision, DROP);
|
| - }
|
| + if (decision == DROP) {
|
| return;
|
| }
|
|
|
| - if (pending_nudge_) {
|
| - SDVLOG(2) << "Rescheduling pending nudge";
|
| - pending_nudge_->CoalesceSources(job->source_info());
|
| - // Choose the start time as the earliest of the 2. Note that this means
|
| - // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
|
| - // but a nudge is already scheduled to go out, we'll send the (tab) commit
|
| - // without waiting.
|
| - pending_nudge_->set_scheduled_start(
|
| - std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
|
| - // Abandon the old task by cloning and replacing the session.
|
| - // It's possible that by "rescheduling" we're actually taking a job that
|
| - // was previously unscheduled and giving it wings, so take care to reset
|
| - // unscheduled nudge storage.
|
| - job = pending_nudge_->Clone();
|
| - pending_nudge_ = NULL;
|
| - unscheduled_nudge_storage_.reset();
|
| - // It's also possible we took a canary job, since we allow one nudge
|
| - // per backoff interval.
|
| - DCHECK(!wait_interval_ || !wait_interval_->had_nudge);
|
| + // Try to coalesce in both SAVE and CONTINUE cases.
|
| + if (pending_nudge_job_) {
|
| + pending_nudge_job_->CoalesceSources(job->source_info());
|
| + if (decision == CONTINUE) {
|
| + // Only update the scheduled_start if we're going to reschedule.
|
| + pending_nudge_job_->set_scheduled_start(
|
| + std::min(job->scheduled_start(),
|
| + pending_nudge_job_->scheduled_start()));
|
| + }
|
| + } else {
|
| + pending_nudge_job_ = job.Pass();
|
| + }
|
| +
|
| + if (decision == SAVE) {
|
| + return;
|
| }
|
|
|
| - TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now();
|
| + TimeDelta run_delay =
|
| + pending_nudge_job_->scheduled_start() - TimeTicks::Now();
|
| if (run_delay < TimeDelta::FromMilliseconds(0))
|
| run_delay = TimeDelta::FromMilliseconds(0);
|
| SDVLOG_LOC(nudge_location, 2)
|
| << "Scheduling a nudge with "
|
| << run_delay.InMilliseconds() << " ms delay";
|
|
|
| - pending_nudge_ = job.get();
|
| - PostDelayedTask(nudge_location, "DoSyncSessionJob",
|
| - base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - base::Passed(&job),
|
| - NORMAL_PRIORITY),
|
| - run_delay);
|
| + if (started_) {
|
| + pending_wakeup_timer_.Start(
|
| + nudge_location,
|
| + run_delay,
|
| + base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + NORMAL_PRIORITY));
|
| + }
|
| }
|
|
|
| const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
|
| @@ -651,27 +563,9 @@ const char* SyncSchedulerImpl::GetDecisionString(
|
| return "";
|
| }
|
|
|
| -void SyncSchedulerImpl::PostDelayedTask(
|
| - const tracked_objects::Location& from_here,
|
| - const char* name, const base::Closure& task, base::TimeDelta delay) {
|
| - SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
|
| - << delay.InMilliseconds() << " ms delay";
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - if (!started_) {
|
| - SDVLOG(1) << "Not posting task as scheduler is stopped.";
|
| - return;
|
| - }
|
| - // This cancels the previous task, if one existed.
|
| - pending_wakeup_.Reset(task);
|
| - sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay);
|
| -}
|
| -
|
| -bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
|
| - JobPriority priority) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - if (job->purpose() == SyncSessionJob::NUDGE) {
|
| - pending_nudge_ = NULL;
|
| - }
|
| +bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job,
|
| + JobPriority priority) {
|
| + DCHECK(CalledOnValidThread());
|
|
|
| base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
|
| JobProcessDecision decision = DecideOnJob(*job, priority);
|
| @@ -682,7 +576,11 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
|
| << ": " << GetDecisionString(decision);
|
| if (decision != CONTINUE) {
|
| if (decision == SAVE) {
|
| - HandleSaveJobDecision(job.Pass());
|
| + if (job->purpose() == SyncSessionJob::CONFIGURATION) {
|
| + pending_configure_job_ = job.Pass();
|
| + } else {
|
| + pending_nudge_job_ = job.Pass();
|
| + }
|
| } else {
|
| DCHECK_EQ(decision, DROP);
|
| }
|
| @@ -707,15 +605,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
|
| // If we're here, it's because |job| was silenced until a server specified
|
| // time. (Note, it had to be |job|, because DecideOnJob would not permit
|
| // any job through while in WaitInterval::THROTTLED).
|
| - scoped_ptr<SyncSessionJob> clone = job->Clone();
|
| - if (clone->purpose() == SyncSessionJob::NUDGE)
|
| - pending_nudge_ = clone.get();
|
| - else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
|
| - wait_interval_->pending_configure_job = clone.get();
|
| + if (job->purpose() == SyncSessionJob::NUDGE)
|
| + pending_nudge_job_ = job.Pass();
|
| + else if (job->purpose() == SyncSessionJob::CONFIGURATION)
|
| + pending_configure_job_ = job.Pass();
|
| else
|
| NOTREACHED();
|
|
|
| - RestartWaiting(clone.Pass());
|
| + RestartWaiting();
|
| return success;
|
| }
|
|
|
| @@ -725,6 +622,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
|
| return success;
|
| }
|
|
|
| +void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
|
| + DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority);
|
| +}
|
| +
|
| +bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
|
| + return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority);
|
| +}
|
| +
|
| bool SyncSchedulerImpl::ShouldPoll() {
|
| if (wait_interval_.get()) {
|
| SDVLOG(2) << "Not running poll in wait interval.";
|
| @@ -747,8 +652,15 @@ bool SyncSchedulerImpl::ShouldPoll() {
|
| return true;
|
| }
|
|
|
| -void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
|
| - DCHECK_EQ(job->purpose(), SyncSessionJob::POLL);
|
| +void SyncSchedulerImpl::DoPollSyncSessionJob() {
|
| + ModelSafeRoutingInfo r;
|
| + ModelTypeInvalidationMap invalidation_map =
|
| + ModelSafeRoutingInfoToInvalidationMap(r, std::string());
|
| + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
|
| + scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
|
| + TimeTicks::Now(),
|
| + info,
|
| + ConfigurationParams()));
|
|
|
| base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
|
|
|
| @@ -766,17 +678,16 @@ void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
|
| FinishSyncSessionJob(job.get(), premature_exit, &session);
|
|
|
| if (IsSyncingCurrentlySilenced()) {
|
| - // This will start the countdown to unthrottle. Other kinds of jobs would
|
| - // schedule themselves as the post-unthrottle canary. A poll job is not
|
| - // that urgent, so it does not get to be the canary. We still need to start
|
| - // the timer regardless. Otherwise there could be no one to clear the
|
| - // WaitInterval when the throttling expires.
|
| - RestartWaiting(scoped_ptr<SyncSessionJob>());
|
| + // Normally we would only call RestartWaiting() if we had a
|
| + // pending_nudge_job_ or pending_configure_job_ set. In this case, it's
|
| + // possible that neither is set. We create the wait interval anyway because
|
| + // we need it to make sure we get unthrottled on time.
|
| + RestartWaiting();
|
| }
|
| }
|
|
|
| void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| // We are interested in recording time between local nudges for datatypes.
|
| // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
|
| @@ -803,7 +714,7 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
|
| bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
|
| bool exited_prematurely,
|
| SyncSession* session) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| // Let job know that we're through syncing (calling SyncShare) at this point.
|
| bool succeeded = false;
|
| @@ -831,7 +742,7 @@ bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
|
| void SyncSchedulerImpl::ScheduleNextSync(
|
| scoped_ptr<SyncSessionJob> finished_job,
|
| SyncSession* session) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION
|
| || finished_job->purpose() == SyncSessionJob::NUDGE);
|
|
|
| @@ -841,34 +752,12 @@ void SyncSchedulerImpl::ScheduleNextSync(
|
| // we should be able to detect such errors and only retry when we detect
|
| // transient errors.
|
|
|
| - if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
|
| - mode_ == NORMAL_MODE) {
|
| - // When in normal mode, we allow up to one nudge per backoff interval. It
|
| - // appears that this was our nudge for this interval, and it failed.
|
| - //
|
| - // Note: This does not prevent us from running canary jobs. For example,
|
| - // an IP address change might still result in another nudge being executed
|
| - // during this backoff interval.
|
| - SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
|
| - DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
|
| - DCHECK(!wait_interval_->had_nudge);
|
| -
|
| - wait_interval_->had_nudge = true;
|
| - DCHECK(!pending_nudge_);
|
| -
|
| - scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
|
| - pending_nudge_ = new_job.get();
|
| - RestartWaiting(new_job.Pass());
|
| - } else {
|
| - // Either this is the first failure or a consecutive failure after our
|
| - // backoff timer expired. We handle it the same way in either case.
|
| - SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
|
| - HandleContinuationError(finished_job.Pass(), session);
|
| - }
|
| + SDVLOG(2) << "SyncShare job failed; will start or update backoff";
|
| + HandleContinuationError(finished_job.Pass(), session);
|
| }
|
|
|
| void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| TimeDelta poll = (!session_context_->notifications_enabled()) ?
|
| syncer_short_poll_interval_seconds_ :
|
| @@ -888,28 +777,28 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
|
| &SyncSchedulerImpl::PollTimerCallback);
|
| }
|
|
|
| -void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
|
| +void SyncSchedulerImpl::RestartWaiting() {
|
| CHECK(wait_interval_.get());
|
| - wait_interval_->timer.Stop();
|
| DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
|
| if (wait_interval_->mode == WaitInterval::THROTTLED) {
|
| - pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - base::Passed(&job)));
|
| -
|
| + pending_wakeup_timer_.Start(
|
| + FROM_HERE,
|
| + wait_interval_->length,
|
| + base::Bind(&SyncSchedulerImpl::Unthrottle,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| } else {
|
| - pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - base::Passed(&job)));
|
| + pending_wakeup_timer_.Start(
|
| + FROM_HERE,
|
| + wait_interval_->length,
|
| + base::Bind(&SyncSchedulerImpl::TryCanaryJob,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| }
|
| - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
|
| - pending_wakeup_.callback());
|
| }
|
|
|
| void SyncSchedulerImpl::HandleContinuationError(
|
| scoped_ptr<SyncSessionJob> old_job,
|
| SyncSession* session) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
|
|
| TimeDelta length = delay_provider_->GetDelay(
|
| IsBackingOff() ? wait_interval_->length :
|
| @@ -921,26 +810,24 @@ void SyncSchedulerImpl::HandleContinuationError(
|
| << " job. The time delta(ms) is "
|
| << length.InMilliseconds();
|
|
|
| - // This will reset the had_nudge variable as well.
|
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
|
| length));
|
| NotifyRetryTime(base::Time::Now() + length);
|
| - scoped_ptr<SyncSessionJob> new_job(old_job->Clone());
|
| - new_job->set_scheduled_start(TimeTicks::Now() + length);
|
| + old_job->set_scheduled_start(TimeTicks::Now() + length);
|
| if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
|
| SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
|
| // Config params should always get set.
|
| DCHECK(!old_job->config_params().ready_task.is_null());
|
| - wait_interval_->pending_configure_job = new_job.get();
|
| + DCHECK(!pending_configure_job_);
|
| + pending_configure_job_ = old_job.Pass();
|
| } else {
|
| - // We are not in configuration mode. So wait_interval's pending job
|
| - // should be null.
|
| - DCHECK(wait_interval_->pending_configure_job == NULL);
|
| - DCHECK(!pending_nudge_);
|
| - pending_nudge_ = new_job.get();
|
| + // We're not in configure mode so we should not have a configure job.
|
| + DCHECK(!pending_configure_job_);
|
| + DCHECK(!pending_nudge_job_);
|
| + pending_nudge_job_ = old_job.Pass();
|
| }
|
|
|
| - RestartWaiting(new_job.Pass());
|
| + RestartWaiting();
|
| }
|
|
|
| void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
|
| @@ -953,7 +840,7 @@ void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
|
| }
|
|
|
| void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| SDVLOG(2) << "StopImpl called";
|
|
|
| // Kill any in-flight method calls.
|
| @@ -961,9 +848,9 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
|
| wait_interval_.reset();
|
| NotifyRetryTime(base::Time());
|
| poll_timer_.Stop();
|
| - pending_nudge_ = NULL;
|
| - unscheduled_nudge_storage_.reset();
|
| - pending_wakeup_.Cancel();
|
| + pending_wakeup_timer_.Stop();
|
| + pending_nudge_job_.reset();
|
| + pending_configure_job_.reset();
|
| if (started_) {
|
| started_ = false;
|
| }
|
| @@ -971,48 +858,24 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
|
| callback.Run();
|
| }
|
|
|
| -void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - SDVLOG(2) << "Do canary job";
|
| -
|
| - // This is the only place where we invoke DoSyncSessionJob with canary
|
| - // privileges. Everyone else should use NORMAL_PRIORITY.
|
| - DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY);
|
| -}
|
| +// This is the only place where we invoke DoSyncSessionJob with canary
|
| +// privileges. Everyone else should use NORMAL_PRIORITY.
|
| +void SyncSchedulerImpl::TryCanaryJob() {
|
| + DCHECK(CalledOnValidThread());
|
|
|
| -scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - // If we find a scheduled pending_ job, abandon the old one and return a
|
| - // a clone. If unscheduled, just hand over ownership.
|
| - scoped_ptr<SyncSessionJob> candidate;
|
| - if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
|
| - && wait_interval_->pending_configure_job) {
|
| - SDVLOG(2) << "Found pending configure job";
|
| - candidate =
|
| - wait_interval_->pending_configure_job->Clone().Pass();
|
| - wait_interval_->pending_configure_job = candidate.get();
|
| - } else if (mode_ == NORMAL_MODE && pending_nudge_) {
|
| - SDVLOG(2) << "Found pending nudge job";
|
| - candidate = pending_nudge_->Clone();
|
| - pending_nudge_ = candidate.get();
|
| - unscheduled_nudge_storage_.reset();
|
| + if (mode_ == CONFIGURATION_MODE && pending_configure_job_) {
|
| + SDVLOG(2) << "Found pending configure job; will run as canary";
|
| + DoConfigurationSyncSessionJob(CANARY_PRIORITY);
|
| + } else if (mode_ == NORMAL_MODE && pending_nudge_job_) {
|
| + SDVLOG(2) << "Found pending nudge job; will run as canary";
|
| + DoNudgeSyncSessionJob(CANARY_PRIORITY);
|
| + } else {
|
| + SDVLOG(2) << "Found no work to do; will not run a canary";
|
| }
|
| - // If we took a job and there's a wait interval, we took the pending canary.
|
| - if (candidate && wait_interval_)
|
| - wait_interval_->timer.Stop();
|
| - return candidate.Pass();
|
| }
|
|
|
| void SyncSchedulerImpl::PollTimerCallback() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - ModelSafeRoutingInfo r;
|
| - ModelTypeInvalidationMap invalidation_map =
|
| - ModelSafeRoutingInfoToInvalidationMap(r, std::string());
|
| - SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
|
| - scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
|
| - TimeTicks::Now(),
|
| - info,
|
| - ConfigurationParams()));
|
| + DCHECK(CalledOnValidThread());
|
| if (no_scheduling_allowed_) {
|
| // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
|
| // functions that are called only on the sync thread. This function is also
|
| @@ -1024,16 +887,12 @@ void SyncSchedulerImpl::PollTimerCallback() {
|
| return;
|
| }
|
|
|
| - DoPollSyncSessionJob(job.Pass());
|
| + DoPollSyncSessionJob();
|
| }
|
|
|
| -void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| +void SyncSchedulerImpl::Unthrottle() {
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
|
| - DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() ||
|
| - wait_interval_->pending_configure_job == to_be_canary.get());
|
| - SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
|
| - << "canary.";
|
|
|
| // We're no longer throttled, so clear the wait interval.
|
| wait_interval_.reset();
|
| @@ -1044,15 +903,11 @@ void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
|
| // was just created (e.g via ScheduleNudgeImpl). The main implication is
|
| // that we're careful to update routing info (etc) with such potentially
|
| // stale canary jobs.
|
| - if (to_be_canary.get()) {
|
| - DoCanaryJob(to_be_canary.Pass());
|
| - } else {
|
| - DCHECK(!unscheduled_nudge_storage_.get());
|
| - }
|
| + TryCanaryJob();
|
| }
|
|
|
| void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| session_context_->NotifyListeners(SyncEngineEvent(cause));
|
| }
|
|
|
| @@ -1063,45 +918,45 @@ void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
|
| }
|
|
|
| bool SyncSchedulerImpl::IsBackingOff() const {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| return wait_interval_.get() && wait_interval_->mode ==
|
| WaitInterval::EXPONENTIAL_BACKOFF;
|
| }
|
|
|
| void SyncSchedulerImpl::OnSilencedUntil(
|
| const base::TimeTicks& silenced_until) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
|
| silenced_until - TimeTicks::Now()));
|
| NotifyRetryTime(base::Time::Now() + wait_interval_->length);
|
| }
|
|
|
| bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| return wait_interval_.get() && wait_interval_->mode ==
|
| WaitInterval::THROTTLED;
|
| }
|
|
|
| void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
|
| const base::TimeDelta& new_interval) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| syncer_short_poll_interval_seconds_ = new_interval;
|
| }
|
|
|
| void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
|
| const base::TimeDelta& new_interval) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| syncer_long_poll_interval_seconds_ = new_interval;
|
| }
|
|
|
| void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
|
| const base::TimeDelta& new_delay) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| sessions_commit_delay_ = new_delay;
|
| }
|
|
|
| void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| SDVLOG(2) << "OnShouldStopSyncingPermanently";
|
| syncer_->RequestEarlyExit(); // Thread-safe.
|
| Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
|
| @@ -1109,7 +964,7 @@ void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
|
|
|
| void SyncSchedulerImpl::OnActionableError(
|
| const sessions::SyncSessionSnapshot& snap) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| SDVLOG(2) << "OnActionableError";
|
| SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
|
| event.snapshot = snap;
|
| @@ -1118,7 +973,7 @@ void SyncSchedulerImpl::OnActionableError(
|
|
|
| void SyncSchedulerImpl::OnSyncProtocolError(
|
| const sessions::SyncSessionSnapshot& snapshot) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| if (ShouldRequestEarlyExit(
|
| snapshot.model_neutral_state().sync_protocol_error)) {
|
| SDVLOG(2) << "Sync Scheduler requesting early exit.";
|
| @@ -1129,12 +984,12 @@ void SyncSchedulerImpl::OnSyncProtocolError(
|
| }
|
|
|
| void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| session_context_->set_notifications_enabled(notifications_enabled);
|
| }
|
|
|
| base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| + DCHECK(CalledOnValidThread());
|
| return sessions_commit_delay_;
|
| }
|
|
|
|
|