| Index: sync/engine/sync_scheduler_impl.cc
|
| diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc
|
| index b2180fbded8e803e691895fe562d4c82dd5be815..4d1f9a8fe99c0fb1f1f07e51ce2c897a187bccf5 100644
|
| --- a/sync/engine/sync_scheduler_impl.cc
|
| +++ b/sync/engine/sync_scheduler_impl.cc
|
| @@ -9,6 +9,7 @@
|
|
|
| #include "base/auto_reset.h"
|
| #include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| @@ -83,8 +84,12 @@ ConfigurationParams::~ConfigurationParams() {}
|
|
|
| SyncSchedulerImpl::WaitInterval::WaitInterval()
|
| : mode(UNKNOWN),
|
| - had_nudge(false) {
|
| -}
|
| + had_nudge(false),
|
| + pending_configure_job(NULL) {}
|
| +
|
| +SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
|
| + : mode(mode), had_nudge(false), length(length),
|
| + pending_configure_job(NULL) {}
|
|
|
| SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
|
|
|
| @@ -100,39 +105,6 @@ const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
|
| return "";
|
| }
|
|
|
| -SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
|
| - : purpose(UNKNOWN),
|
| - is_canary_job(false) {
|
| -}
|
| -
|
| -SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
|
| -
|
| -SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
|
| - base::TimeTicks start,
|
| - linked_ptr<sessions::SyncSession> session,
|
| - bool is_canary_job,
|
| - const ConfigurationParams& config_params,
|
| - const tracked_objects::Location& from_here)
|
| - : purpose(purpose),
|
| - scheduled_start(start),
|
| - session(session),
|
| - is_canary_job(is_canary_job),
|
| - config_params(config_params),
|
| - from_here(from_here) {
|
| -}
|
| -
|
| -const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
|
| - SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
|
| - switch (purpose) {
|
| - ENUM_CASE(UNKNOWN);
|
| - ENUM_CASE(POLL);
|
| - ENUM_CASE(NUDGE);
|
| - ENUM_CASE(CONFIGURATION);
|
| - }
|
| - NOTREACHED();
|
| - return "";
|
| -}
|
| -
|
| GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
|
| NudgeSource source) {
|
| switch (source) {
|
| @@ -152,9 +124,6 @@ GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
|
| }
|
| }
|
|
|
| -SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
|
| - : mode(mode), had_nudge(false), length(length) { }
|
| -
|
| // Helper macros to log with the syncer thread name; useful when there
|
| // are multiple syncer threads involved.
|
|
|
| @@ -205,6 +174,7 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
|
| // Start with assuming everything is fine with the connection.
|
| // At the end of the sync cycle we would have the correct status.
|
| connection_code_(HttpResponse::SERVER_CONNECTION_OK),
|
| + pending_nudge_(NULL),
|
| delay_provider_(delay_provider),
|
| syncer_(syncer),
|
| session_context_(context),
|
| @@ -241,10 +211,25 @@ void SyncSchedulerImpl::OnConnectionStatusChange() {
|
|
|
| void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
|
| connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
|
| + // There could be a pending nudge or configuration job in several cases:
|
| + //
|
| + // 1. We're in exponential backoff.
|
| + // 2. We're silenced / throttled.
|
| + // 3. A nudge was saved previously due to not having a valid auth token.
|
| + // 4. A nudge was scheduled + saved while in configuration mode.
|
| + //
|
| + // In all cases except (2), we want to retry contacting the server. We
|
| + // call DoCanaryJob to achieve this, and note that nothing -- not even a
|
| + // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
|
| + // has the authority to do that is the Unthrottle timer.
|
| + scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
|
| + if (!pending.get())
|
| + return;
|
| +
|
| PostTask(FROM_HERE, "DoCanaryJob",
|
| base::Bind(&SyncSchedulerImpl::DoCanaryJob,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| -
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + base::Passed(&pending)));
|
| }
|
|
|
| void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
|
| @@ -280,11 +265,20 @@ void SyncSchedulerImpl::Start(Mode mode) {
|
| if (mode_ == NORMAL_MODE) {
|
| // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
|
| // has not yet completed.
|
| - DCHECK(!wait_interval_.get() ||
|
| - !wait_interval_->pending_configure_job.get());
|
| + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
|
| }
|
|
|
| - DoPendingJobIfPossible(false);
|
| + scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
|
| + if (pending.get()) {
|
| + // TODO(tim): We should be able to remove this...
|
| + scoped_ptr<SyncSession> session(CreateSyncSession(
|
| + pending->session()->source()));
|
| + // Also the routing info might have been changed since we cached the
|
| + // pending nudge. Update it by coalescing to the latest.
|
| + pending->mutable_session()->Coalesce(*session);
|
| + SDVLOG(2) << "Executing pending job. Good luck!";
|
| + DoSyncSessionJob(pending.Pass());
|
| + }
|
| }
|
| }
|
|
|
| @@ -328,7 +322,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
|
|
| // Only one configuration is allowed at a time. Verify we're not waiting
|
| // for a pending configure job.
|
| - DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
|
| + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
|
|
|
| ModelSafeRoutingInfo restricted_routes;
|
| BuildModelSafeParams(params.types_to_download,
|
| @@ -339,7 +333,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| // Only reconfigure if we have types to download.
|
| if (!params.types_to_download.Empty()) {
|
| DCHECK(!restricted_routes.empty());
|
| - linked_ptr<SyncSession> session(new SyncSession(
|
| + scoped_ptr<SyncSession> session(new SyncSession(
|
| session_context_,
|
| this,
|
| SyncSourceInfo(params.source,
|
| @@ -348,19 +342,18 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| std::string())),
|
| restricted_routes,
|
| session_context_->workers()));
|
| - SyncSessionJob job(SyncSessionJob::CONFIGURATION,
|
| - TimeTicks::Now(),
|
| - session,
|
| - false,
|
| - params,
|
| - FROM_HERE);
|
| - DoSyncSessionJob(job);
|
| + scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
|
| + SyncSessionJob::CONFIGURATION,
|
| + TimeTicks::Now(),
|
| + session.Pass(),
|
| + params,
|
| + FROM_HERE));
|
| + bool succeeded = DoSyncSessionJob(job.Pass());
|
|
|
| // If we failed, the job would have been saved as the pending configure
|
| // job and a wait interval would have been set.
|
| - if (!session->Succeeded()) {
|
| - DCHECK(wait_interval_.get() &&
|
| - wait_interval_->pending_configure_job.get());
|
| + if (!succeeded) {
|
| + DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
|
| return false;
|
| }
|
| } else {
|
| @@ -372,37 +365,42 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
|
| }
|
|
|
| SyncSchedulerImpl::JobProcessDecision
|
| -SyncSchedulerImpl::DecideWhileInWaitInterval(
|
| - const SyncSessionJob& job) {
|
| +SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| DCHECK(wait_interval_.get());
|
|
|
| SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
|
| << WaitInterval::GetModeString(wait_interval_->mode)
|
| << (wait_interval_->had_nudge ? " (had nudge)" : "")
|
| - << (job.is_canary_job ? " (canary)" : "");
|
| + << (job.is_canary() ? " (canary)" : "");
|
|
|
| - if (job.purpose == SyncSessionJob::POLL)
|
| + if (job.purpose() == SyncSessionJob::POLL)
|
| return DROP;
|
|
|
| - DCHECK(job.purpose == SyncSessionJob::NUDGE ||
|
| - job.purpose == SyncSessionJob::CONFIGURATION);
|
| + // If we save a job while in a WaitInterval, there is a well-defined moment
|
| + // in time in the future when it makes sense for that SAVE-worthy job to try
|
| + // running again -- the end of the WaitInterval.
|
| + DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
|
| + job.purpose() == SyncSessionJob::CONFIGURATION);
|
| +
|
| + // If throttled, there's a clock ticking to unthrottle. We want to get
|
| + // on the same train.
|
| if (wait_interval_->mode == WaitInterval::THROTTLED)
|
| return SAVE;
|
|
|
| DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
|
| - if (job.purpose == SyncSessionJob::NUDGE) {
|
| + if (job.purpose() == SyncSessionJob::NUDGE) {
|
| if (mode_ == CONFIGURATION_MODE)
|
| return SAVE;
|
|
|
| // If we already had one nudge then just drop this nudge. We will retry
|
| // later when the timer runs out.
|
| - if (!job.is_canary_job)
|
| + if (!job.is_canary())
|
| return wait_interval_->had_nudge ? DROP : CONTINUE;
|
| else // We are here because timer ran out. So retry.
|
| return CONTINUE;
|
| }
|
| - return job.is_canary_job ? CONTINUE : SAVE;
|
| + return job.is_canary() ? CONTINUE : SAVE;
|
| }
|
|
|
| SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| @@ -412,16 +410,22 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| // See if our type is throttled.
|
| ModelTypeSet throttled_types =
|
| session_context_->throttled_data_type_tracker()->GetThrottledTypes();
|
| - if (job.purpose == SyncSessionJob::NUDGE &&
|
| - job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
|
| + if (job.purpose() == SyncSessionJob::NUDGE &&
|
| + job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
|
| ModelTypeSet requested_types;
|
| for (ModelTypeInvalidationMap::const_iterator i =
|
| - job.session->source().types.begin();
|
| - i != job.session->source().types.end();
|
| + job.session()->source().types.begin();
|
| + i != job.session()->source().types.end();
|
| ++i) {
|
| requested_types.Put(i->first);
|
| }
|
|
|
| + // If all types are throttled, do not CONTINUE. Today, we don't treat
|
| + // a per-datatype "unthrottle" event as something that should force a
|
| + // canary job. For this reason, there's no good time to reschedule this job
|
| + // to run -- we'll lazily wait for an independent event to trigger a sync.
|
| + // Note that there may already be such an event if we're in a WaitInterval,
|
| + // so we can retry it then.
|
| if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
|
| return SAVE;
|
| }
|
| @@ -430,9 +434,9 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| return DecideWhileInWaitInterval(job);
|
|
|
| if (mode_ == CONFIGURATION_MODE) {
|
| - if (job.purpose == SyncSessionJob::NUDGE)
|
| - return SAVE;
|
| - else if (job.purpose == SyncSessionJob::CONFIGURATION)
|
| + if (job.purpose() == SyncSessionJob::NUDGE)
|
| + return SAVE; // Running requires a mode switch.
|
| + else if (job.purpose() == SyncSessionJob::CONFIGURATION)
|
| return CONTINUE;
|
| else
|
| return DROP;
|
| @@ -440,7 +444,7 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
|
|
| // We are in normal mode.
|
| DCHECK_EQ(mode_, NORMAL_MODE);
|
| - DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
|
| + DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION);
|
|
|
| // Note about some subtle scheduling semantics.
|
| //
|
| @@ -483,81 +487,59 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
|
| return CONTINUE;
|
|
|
| SDVLOG(2) << "No valid auth token. Using that to decide on job.";
|
| - return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
|
| + // Running the job would require updated auth, so we can't honour
|
| + // job.scheduled_start().
|
| + return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
|
| }
|
|
|
| -void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
|
| - if (pending_nudge_.get() == NULL) {
|
| - SDVLOG(2) << "Creating a pending nudge job";
|
| - SyncSession* s = job.session.get();
|
| -
|
| - // Get a fresh session with similar configuration as before (resets
|
| - // StatusController).
|
| - scoped_ptr<SyncSession> session(new SyncSession(s->context(),
|
| - s->delegate(), s->source(), s->routing_info(), s->workers()));
|
| -
|
| - SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
|
| - make_linked_ptr(session.release()), false,
|
| - ConfigurationParams(), job.from_here);
|
| - pending_nudge_.reset(new SyncSessionJob(new_job));
|
| +void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
|
| + DCHECK_EQ(DecideOnJob(*job), SAVE);
|
| + const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
|
| + if (is_nudge && pending_nudge_) {
|
| + SDVLOG(2) << "Coalescing a pending nudge";
|
| + // TODO(tim): This basically means we never use the more-careful coalescing
|
| + // logic in ScheduleNudgeImpl that takes the min of the two nudge start
|
| + // times, because we're calling this function first. Pull this out
|
| + // into a function to coalesce + set start times and reuse.
|
| + pending_nudge_->mutable_session()->Coalesce(*(job->session()));
|
| return;
|
| }
|
|
|
| - SDVLOG(2) << "Coalescing a pending nudge";
|
| - pending_nudge_->session->Coalesce(*(job.session.get()));
|
| - pending_nudge_->scheduled_start = job.scheduled_start;
|
| -
|
| - // Unfortunately the nudge location cannot be modified. So it stores the
|
| - // location of the first caller.
|
| -}
|
| -
|
| -bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - DCHECK(started_);
|
| -
|
| - JobProcessDecision decision = DecideOnJob(job);
|
| - SDVLOG(2) << "Should run "
|
| - << SyncSessionJob::GetPurposeString(job.purpose)
|
| - << " job in mode " << GetModeString(mode_)
|
| - << ": " << GetDecisionString(decision);
|
| - if (decision != SAVE)
|
| - return decision == CONTINUE;
|
| -
|
| - DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
|
| - SyncSessionJob::CONFIGURATION);
|
| -
|
| - SaveJob(job);
|
| - return false;
|
| -}
|
| -
|
| -void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
|
| - DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - if (job.purpose == SyncSessionJob::NUDGE) {
|
| - SDVLOG(2) << "Saving a nudge job";
|
| - InitOrCoalescePendingJob(job);
|
| - } else if (job.purpose == SyncSessionJob::CONFIGURATION){
|
| - SDVLOG(2) << "Saving a configuration job";
|
| - DCHECK(wait_interval_.get());
|
| - DCHECK(mode_ == CONFIGURATION_MODE);
|
| + scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon();
|
| + if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
|
| + // This job should be made the new canary.
|
| + if (is_nudge) {
|
| + pending_nudge_ = job_to_save.get();
|
| + } else {
|
| + SDVLOG(2) << "Saving a configuration job";
|
| + DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
|
| + DCHECK(!wait_interval_->pending_configure_job);
|
| + DCHECK_EQ(mode_, CONFIGURATION_MODE);
|
| + DCHECK(!job->config_params().ready_task.is_null());
|
| + // The only nudge that could exist is a scheduled canary nudge.
|
| + DCHECK(!unscheduled_nudge_storage_.get());
|
| + if (pending_nudge_) {
|
| + // Pre-empt the nudge canary and abandon the old nudge (owned by task).
|
| + unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
|
| + pending_nudge_ = unscheduled_nudge_storage_.get();
|
| + }
|
| + wait_interval_->pending_configure_job = job_to_save.get();
|
| + }
|
| + TimeDelta length =
|
| + wait_interval_->timer.desired_run_time() - TimeTicks::Now();
|
| + wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
|
| + TimeDelta::FromSeconds(0) : length;
|
| + RestartWaiting(job_to_save.Pass());
|
| + return;
|
| + }
|
|
|
| - // Config params should always get set.
|
| - DCHECK(!job.config_params.ready_task.is_null());
|
| - SyncSession* old = job.session.get();
|
| - SyncSession* s(new SyncSession(session_context_, this, old->source(),
|
| - old->routing_info(), old->workers()));
|
| - SyncSessionJob new_job(job.purpose,
|
| - TimeTicks::Now(),
|
| - make_linked_ptr(s),
|
| - false,
|
| - job.config_params,
|
| - job.from_here);
|
| - wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
|
| - } // drop the rest.
|
| - // TODO(sync): Is it okay to drop the rest? It's weird that
|
| - // SaveJob() only does what it says sometimes. (See
|
| - // http://crbug.com/90868.)
|
| + // Note that today there are no cases where we SAVE a CONFIGURATION job
|
| + // when we're not in a WaitInterval. See bug 147736.
|
| + DCHECK(is_nudge);
|
| + // There may or may not be a pending_configure_job. Either way this nudge
|
| + // is unschedulable.
|
| + pending_nudge_ = job_to_save.get();
|
| + unscheduled_nudge_storage_ = job_to_save.Pass();
|
| }
|
|
|
| // Functor for std::find_if to search by ModelSafeGroup.
|
| @@ -570,39 +552,39 @@ struct ModelSafeWorkerGroupIs {
|
| };
|
|
|
| void SyncSchedulerImpl::ScheduleNudgeAsync(
|
| - const TimeDelta& delay,
|
| + const TimeDelta& desired_delay,
|
| NudgeSource source, ModelTypeSet types,
|
| const tracked_objects::Location& nudge_location) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| SDVLOG_LOC(nudge_location, 2)
|
| - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
|
| + << "Nudge scheduled with delay "
|
| + << desired_delay.InMilliseconds() << " ms, "
|
| << "source " << GetNudgeSourceString(source) << ", "
|
| << "types " << ModelTypeSetToString(types);
|
|
|
| ModelTypeInvalidationMap invalidation_map =
|
| ModelTypeSetToInvalidationMap(types, std::string());
|
| - SyncSchedulerImpl::ScheduleNudgeImpl(delay,
|
| + SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
|
| GetUpdatesFromNudgeSource(source),
|
| invalidation_map,
|
| - false,
|
| nudge_location);
|
| }
|
|
|
| void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
|
| - const TimeDelta& delay,
|
| + const TimeDelta& desired_delay,
|
| NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
|
| const tracked_objects::Location& nudge_location) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| SDVLOG_LOC(nudge_location, 2)
|
| - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
|
| + << "Nudge scheduled with delay "
|
| + << desired_delay.InMilliseconds() << " ms, "
|
| << "source " << GetNudgeSourceString(source) << ", "
|
| << "payloads "
|
| << ModelTypeInvalidationMapToString(invalidation_map);
|
|
|
| - SyncSchedulerImpl::ScheduleNudgeImpl(delay,
|
| + SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
|
| GetUpdatesFromNudgeSource(source),
|
| invalidation_map,
|
| - false,
|
| nudge_location);
|
| }
|
|
|
| @@ -610,7 +592,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
|
| const TimeDelta& delay,
|
| GetUpdatesCallerInfo::GetUpdatesSource source,
|
| const ModelTypeInvalidationMap& invalidation_map,
|
| - bool is_canary_job, const tracked_objects::Location& nudge_location) {
|
| + const tracked_objects::Location& nudge_location) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
|
|
|
| @@ -619,44 +601,55 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
|
| << delay.InMilliseconds() << " ms, "
|
| << "source " << GetUpdatesSourceString(source) << ", "
|
| << "payloads "
|
| - << ModelTypeInvalidationMapToString(invalidation_map)
|
| - << (is_canary_job ? " (canary)" : "");
|
| + << ModelTypeInvalidationMapToString(invalidation_map);
|
|
|
| SyncSourceInfo info(source, invalidation_map);
|
| UpdateNudgeTimeRecords(info);
|
|
|
| - SyncSession* session(CreateSyncSession(info));
|
| - SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
|
| - make_linked_ptr(session), is_canary_job,
|
| - ConfigurationParams(), nudge_location);
|
| + scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
|
| + SyncSessionJob::NUDGE,
|
| + TimeTicks::Now() + delay,
|
| + CreateSyncSession(info).Pass(),
|
| + ConfigurationParams(),
|
| + nudge_location));
|
|
|
| - session = NULL;
|
| - if (!ShouldRunJob(job))
|
| - return;
|
| -
|
| - if (pending_nudge_.get()) {
|
| - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
|
| - SDVLOG(2) << "Dropping the nudge because we are in backoff";
|
| - return;
|
| + JobProcessDecision decision = DecideOnJob(*job);
|
| + SDVLOG(2) << "Should run "
|
| + << SyncSessionJob::GetPurposeString(job->purpose())
|
| + << " job " << job->session()
|
| + << " in mode " << GetModeString(mode_)
|
| + << ": " << GetDecisionString(decision);
|
| + if (decision != CONTINUE) {
|
| + // End of the line, though we may save the job for later.
|
| + if (decision == SAVE) {
|
| + HandleSaveJobDecision(job.Pass());
|
| + } else {
|
| + DCHECK_EQ(decision, DROP);
|
| }
|
| + return;
|
| + }
|
|
|
| - SDVLOG(2) << "Coalescing pending nudge";
|
| - pending_nudge_->session->Coalesce(*(job.session.get()));
|
| -
|
| + if (pending_nudge_) {
|
| SDVLOG(2) << "Rescheduling pending nudge";
|
| - SyncSession* s = pending_nudge_->session.get();
|
| - job.session.reset(new SyncSession(s->context(), s->delegate(),
|
| - s->source(), s->routing_info(), s->workers()));
|
| -
|
| - // Choose the start time as the earliest of the 2.
|
| - job.scheduled_start = std::min(job.scheduled_start,
|
| - pending_nudge_->scheduled_start);
|
| - pending_nudge_.reset();
|
| + pending_nudge_->mutable_session()->Coalesce(*(job->session()));
|
| + // Choose the start time as the earliest of the 2. Note that this means
|
| + // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
|
| + // but a nudge is already scheduled to go out, we'll send the (tab) commit
|
| + // without waiting.
|
| + pending_nudge_->set_scheduled_start(
|
| + std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
|
| + // Abandon the old task by cloning and replacing the session.
|
| + // It's possible that by "rescheduling" we're actually taking a job that
|
| + // was previously unscheduled and giving it wings, so take care to reset
|
| + // unscheduled nudge storage.
|
| + job = pending_nudge_->CloneAndAbandon();
|
| + unscheduled_nudge_storage_.reset();
|
| + pending_nudge_ = NULL;
|
| }
|
|
|
| // TODO(zea): Consider adding separate throttling/backoff for datatype
|
| // refresh requests.
|
| - ScheduleSyncSessionJob(job);
|
| + ScheduleSyncSessionJob(job.Pass());
|
| }
|
|
|
| const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
|
| @@ -677,29 +670,6 @@ const char* SyncSchedulerImpl::GetDecisionString(
|
| return "";
|
| }
|
|
|
| -// static
|
| -void SyncSchedulerImpl::SetSyncerStepsForPurpose(
|
| - SyncSessionJob::SyncSessionJobPurpose purpose,
|
| - SyncerStep* start,
|
| - SyncerStep* end) {
|
| - switch (purpose) {
|
| - case SyncSessionJob::CONFIGURATION:
|
| - *start = DOWNLOAD_UPDATES;
|
| - *end = APPLY_UPDATES;
|
| - return;
|
| - case SyncSessionJob::NUDGE:
|
| - case SyncSessionJob::POLL:
|
| - *start = SYNCER_BEGIN;
|
| - *end = SYNCER_END;
|
| - return;
|
| - default:
|
| - NOTREACHED();
|
| - *start = SYNCER_END;
|
| - *end = SYNCER_END;
|
| - return;
|
| - }
|
| -}
|
| -
|
| void SyncSchedulerImpl::PostTask(
|
| const tracked_objects::Location& from_here,
|
| const char* name, const base::Closure& task) {
|
| @@ -725,82 +695,93 @@ void SyncSchedulerImpl::PostDelayedTask(
|
| sync_loop_->PostDelayedTask(from_here, task, delay);
|
| }
|
|
|
| -void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) {
|
| +void SyncSchedulerImpl::ScheduleSyncSessionJob(
|
| + scoped_ptr<SyncSessionJob> job) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| if (no_scheduling_allowed_) {
|
| NOTREACHED() << "Illegal to schedule job while session in progress.";
|
| return;
|
| }
|
|
|
| - TimeDelta delay = job.scheduled_start - TimeTicks::Now();
|
| + TimeDelta delay = job->scheduled_start() - TimeTicks::Now();
|
| + tracked_objects::Location loc(job->from_location());
|
| if (delay < TimeDelta::FromMilliseconds(0))
|
| delay = TimeDelta::FromMilliseconds(0);
|
| - SDVLOG_LOC(job.from_here, 2)
|
| + SDVLOG_LOC(loc, 2)
|
| << "In ScheduleSyncSessionJob with "
|
| - << SyncSessionJob::GetPurposeString(job.purpose)
|
| + << SyncSessionJob::GetPurposeString(job->purpose())
|
| << " job and " << delay.InMilliseconds() << " ms delay";
|
|
|
| - DCHECK(job.purpose == SyncSessionJob::NUDGE ||
|
| - job.purpose == SyncSessionJob::POLL);
|
| - if (job.purpose == SyncSessionJob::NUDGE) {
|
| - SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge";
|
| - DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() ==
|
| - job.session);
|
| - pending_nudge_.reset(new SyncSessionJob(job));
|
| + DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
|
| + job->purpose() == SyncSessionJob::POLL);
|
| + if (job->purpose() == SyncSessionJob::NUDGE) {
|
| + SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to ";
|
| + DCHECK(!pending_nudge_ || pending_nudge_->session() ==
|
| + job->session());
|
| + pending_nudge_ = job.get();
|
| }
|
| - PostDelayedTask(job.from_here, "DoSyncSessionJob",
|
| - base::Bind(&SyncSchedulerImpl::DoSyncSessionJob,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - job),
|
| - delay);
|
| +
|
| + PostDelayedTask(loc, "DoSyncSessionJob",
|
| + base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + base::Passed(&job)),
|
| + delay);
|
| }
|
|
|
| -void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) {
|
| +bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| -
|
| - AutoReset<bool> protector(&no_scheduling_allowed_, true);
|
| - if (!ShouldRunJob(job)) {
|
| - SLOG(WARNING)
|
| - << "Not executing "
|
| - << SyncSessionJob::GetPurposeString(job.purpose) << " job from "
|
| - << GetUpdatesSourceString(job.session->source().updates_source);
|
| - return;
|
| - }
|
| -
|
| - if (job.purpose == SyncSessionJob::NUDGE) {
|
| - if (pending_nudge_.get() == NULL ||
|
| - pending_nudge_->session != job.session) {
|
| + if (job->purpose() == SyncSessionJob::NUDGE) {
|
| + if (pending_nudge_ == NULL ||
|
| + pending_nudge_->session() != job->session()) {
|
| + // |job| is abandoned.
|
| SDVLOG(2) << "Dropping a nudge in "
|
| << "DoSyncSessionJob because another nudge was scheduled";
|
| - return; // Another nudge must have been scheduled in in the meantime.
|
| + return false;
|
| }
|
| - pending_nudge_.reset();
|
| + pending_nudge_ = NULL;
|
|
|
| - // Create the session with the latest model safe table and use it to purge
|
| + // Rebase the session with the latest model safe table and use it to purge
|
| // and update any disabled or modified entries in the job.
|
| - scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
|
| + job->mutable_session()->RebaseRoutingInfoWithLatest(
|
| + session_context_->routing_info(), session_context_->workers());
|
| + }
|
|
|
| - job.session->RebaseRoutingInfoWithLatest(*session);
|
| + AutoReset<bool> protector(&no_scheduling_allowed_, true);
|
| + JobProcessDecision decision = DecideOnJob(*job);
|
| + SDVLOG(2) << "Should run "
|
| + << SyncSessionJob::GetPurposeString(job->purpose())
|
| + << " job " << job->session()
|
| + << " in mode " << GetModeString(mode_)
|
| + << " with source " << job->session()->source().updates_source
|
| + << ": " << GetDecisionString(decision);
|
| + if (decision != CONTINUE) {
|
| + if (decision == SAVE) {
|
| + HandleSaveJobDecision(job.Pass());
|
| + } else {
|
| + DCHECK_EQ(decision, DROP);
|
| + }
|
| + return false;
|
| }
|
| - SDVLOG(2) << "DoSyncSessionJob with "
|
| - << SyncSessionJob::GetPurposeString(job.purpose) << " job";
|
|
|
| - SyncerStep begin(SYNCER_END);
|
| - SyncerStep end(SYNCER_END);
|
| - SetSyncerStepsForPurpose(job.purpose, &begin, &end);
|
| + SDVLOG(2) << "DoSyncSessionJob with "
|
| + << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
|
|
|
| bool has_more_to_sync = true;
|
| - while (ShouldRunJob(job) && has_more_to_sync) {
|
| + bool premature_exit = false;
|
| + while (DecideOnJob(*job) == CONTINUE && has_more_to_sync) {
|
| SDVLOG(2) << "Calling SyncShare.";
|
| // Synchronously perform the sync session from this thread.
|
| - syncer_->SyncShare(job.session.get(), begin, end);
|
| - has_more_to_sync = job.session->HasMoreToSync();
|
| + premature_exit = !syncer_->SyncShare(job->mutable_session(),
|
| + job->start_step(),
|
| + job->end_step());
|
| +
|
| + has_more_to_sync = job->session()->HasMoreToSync();
|
| if (has_more_to_sync)
|
| - job.session->PrepareForAnotherSyncCycle();
|
| + job->mutable_session()->PrepareForAnotherSyncCycle();
|
| }
|
| SDVLOG(2) << "Done SyncShare looping.";
|
|
|
| - FinishSyncSessionJob(job);
|
| + return FinishSyncSessionJob(job.Pass(), premature_exit);
|
| }
|
|
|
| void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
|
| @@ -823,64 +804,82 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
|
|
|
| #define PER_DATA_TYPE_MACRO(type_str) \
|
| SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
|
| - SYNC_DATA_TYPE_HISTOGRAM(iter->first);
|
| + SYNC_DATA_TYPE_HISTOGRAM(iter->first);
|
| #undef PER_DATA_TYPE_MACRO
|
| }
|
| }
|
|
|
| -void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) {
|
| +bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job,
|
| + bool exited_prematurely) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| // Now update the status of the connection from SCM. We need this to decide
|
| - // whether we need to save/run future jobs. The notifications from SCM are not
|
| - // reliable.
|
| + // whether we need to save/run future jobs. The notifications from SCM are
|
| + // not reliable.
|
| //
|
| // TODO(rlarocque): crbug.com/110954
|
| // We should get rid of the notifications and it is probably not needed to
|
| - // maintain this status variable in 2 places. We should query it directly from
|
| - // SCM when needed.
|
| + // maintain this status variable in 2 places. We should query it directly
|
| + // from SCM when needed.
|
| ServerConnectionManager* scm = session_context_->connection_manager();
|
| UpdateServerConnectionManagerStatus(scm->server_status());
|
|
|
| - if (IsSyncingCurrentlySilenced()) {
|
| - SDVLOG(2) << "We are currently throttled; not scheduling the next sync.";
|
| - // TODO(sync): Investigate whether we need to check job.purpose
|
| - // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
|
| - SaveJob(job);
|
| - return; // Nothing to do.
|
| - } else if (job.session->Succeeded() &&
|
| - !job.config_params.ready_task.is_null()) {
|
| - // If this was a configuration job with a ready task, invoke it now that
|
| - // we finished successfully.
|
| + // Let job know that we're through syncing (calling SyncShare) at this point.
|
| + bool succeeded = false;
|
| + {
|
| AutoReset<bool> protector(&no_scheduling_allowed_, true);
|
| - job.config_params.ready_task.Run();
|
| + succeeded = job->Finish(exited_prematurely);
|
| }
|
|
|
| SDVLOG(2) << "Updating the next polling time after SyncMain";
|
| - ScheduleNextSync(job);
|
| + ScheduleNextSync(job.Pass(), succeeded);
|
| + return succeeded;
|
| }
|
|
|
| -void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) {
|
| +void SyncSchedulerImpl::ScheduleNextSync(
|
| + scoped_ptr<SyncSessionJob> finished_job, bool succeeded) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - DCHECK(!old_job.session->HasMoreToSync());
|
| + DCHECK(!finished_job->session()->HasMoreToSync());
|
|
|
| - AdjustPolling(&old_job);
|
| + AdjustPolling(finished_job.get());
|
|
|
| - if (old_job.session->Succeeded()) {
|
| + if (succeeded) {
|
| // Only reset backoff if we actually reached the server.
|
| - if (old_job.session->SuccessfullyReachedServer())
|
| + // It's possible that we reached the server on one attempt, then had an
|
| + // error on the next (or didn't perform some of the server-communicating
|
| + // commands). We want to verify that, for all commands attempted, we
|
| + // successfully spoke with the server. Therefore, we verify no errors
|
| + // and at least one SYNCER_OK.
|
| + if (finished_job->session()->DidReachServer())
|
| wait_interval_.reset();
|
| SDVLOG(2) << "Job succeeded so not scheduling more jobs";
|
| return;
|
| }
|
|
|
| - if (old_job.purpose == SyncSessionJob::POLL) {
|
| + if (IsSyncingCurrentlySilenced()) {
|
| + SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
|
| + // If we're here, it's because |job| was silenced until a server specified
|
| + // time. (Note, it had to be |job|, because DecideOnJob would not permit
|
| + // any job through while in WaitInterval::THROTTLED).
|
| + scoped_ptr<SyncSessionJob> clone = finished_job->Clone();
|
| + if (clone->purpose() == SyncSessionJob::NUDGE)
|
| + pending_nudge_ = clone.get();
|
| + else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
|
| + wait_interval_->pending_configure_job = clone.get();
|
| + else
|
| + clone.reset(); // Unthrottling is enough, no need to force a canary.
|
| +
|
| + RestartWaiting(clone.Pass());
|
| + return;
|
| + }
|
| +
|
| + if (finished_job->purpose() == SyncSessionJob::POLL) {
|
| return; // We don't retry POLL jobs.
|
| }
|
|
|
| // TODO(rlarocque): There's no reason why we should blindly backoff and retry
|
| // if we don't succeed. Some types of errors are not likely to disappear on
|
| - // their own. With the return values now available in the old_job.session, we
|
| - // should be able to detect such errors and only retry when we detect
|
| + // their own. With the return values now available in the old_job.session,
|
| + // we should be able to detect such errors and only retry when we detect
|
| // transient errors.
|
|
|
| if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
|
| @@ -888,22 +887,24 @@ void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) {
|
| // When in normal mode, we allow up to one nudge per backoff interval. It
|
| // appears that this was our nudge for this interval, and it failed.
|
| //
|
| - // Note: This does not prevent us from running canary jobs. For example, an
|
| - // IP address change might still result in another nudge being executed
|
| + // Note: This does not prevent us from running canary jobs. For example,
|
| + // an IP address change might still result in another nudge being executed
|
| // during this backoff interval.
|
| - SDVLOG(2) << "A nudge during backoff failed";
|
| -
|
| - DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
|
| + SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
|
| + DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
|
| DCHECK(!wait_interval_->had_nudge);
|
|
|
| wait_interval_->had_nudge = true;
|
| - InitOrCoalescePendingJob(old_job);
|
| - RestartWaiting();
|
| + DCHECK(!pending_nudge_);
|
| +
|
| + scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
|
| + pending_nudge_ = new_job.get();
|
| + RestartWaiting(new_job.Pass());
|
| } else {
|
| // Either this is the first failure or a consecutive failure after our
|
| // backoff timer expired. We handle it the same way in either case.
|
| SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
|
| - HandleContinuationError(old_job);
|
| + HandleContinuationError(finished_job.Pass());
|
| }
|
| }
|
|
|
| @@ -916,7 +917,7 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
|
| bool rate_changed = !poll_timer_.IsRunning() ||
|
| poll != poll_timer_.GetCurrentDelay();
|
|
|
| - if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
|
| + if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
|
| poll_timer_.Reset();
|
|
|
| if (!rate_changed)
|
| @@ -928,55 +929,61 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
|
| &SyncSchedulerImpl::PollTimerCallback);
|
| }
|
|
|
| -void SyncSchedulerImpl::RestartWaiting() {
|
| +void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
|
| CHECK(wait_interval_.get());
|
| wait_interval_->timer.Stop();
|
| - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
|
| - this, &SyncSchedulerImpl::DoCanaryJob);
|
| + DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
|
| + if (wait_interval_->mode == WaitInterval::THROTTLED) {
|
| + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
|
| + base::Bind(&SyncSchedulerImpl::Unthrottle,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + base::Passed(&job)));
|
| + } else {
|
| + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
|
| + base::Bind(&SyncSchedulerImpl::DoCanaryJob,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + base::Passed(&job)));
|
| + }
|
| }
|
|
|
| void SyncSchedulerImpl::HandleContinuationError(
|
| - const SyncSessionJob& old_job) {
|
| + scoped_ptr<SyncSessionJob> old_job) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| if (DCHECK_IS_ON()) {
|
| if (IsBackingOff()) {
|
| - DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
|
| + DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary());
|
| }
|
| }
|
|
|
| TimeDelta length = delay_provider_->GetDelay(
|
| IsBackingOff() ? wait_interval_->length :
|
| delay_provider_->GetInitialDelay(
|
| - old_job.session->status_controller().model_neutral_state()));
|
| + old_job->session()->status_controller().model_neutral_state()));
|
|
|
| SDVLOG(2) << "In handle continuation error with "
|
| - << SyncSessionJob::GetPurposeString(old_job.purpose)
|
| + << SyncSessionJob::GetPurposeString(old_job->purpose())
|
| << " job. The time delta(ms) is "
|
| << length.InMilliseconds();
|
|
|
| // This will reset the had_nudge variable as well.
|
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
|
| length));
|
| - if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE));
|
| + new_job->set_scheduled_start(TimeTicks::Now() + length);
|
| + if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
|
| SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
|
| // Config params should always get set.
|
| - DCHECK(!old_job.config_params.ready_task.is_null());
|
| - SyncSession* old = old_job.session.get();
|
| - SyncSession* s(new SyncSession(session_context_, this,
|
| - old->source(), old->routing_info(), old->workers()));
|
| - SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
|
| - make_linked_ptr(s), false, old_job.config_params,
|
| - FROM_HERE);
|
| - wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
|
| + DCHECK(!old_job->config_params().ready_task.is_null());
|
| + wait_interval_->pending_configure_job = new_job.get();
|
| } else {
|
| // We are not in configuration mode. So wait_interval's pending job
|
| // should be null.
|
| - DCHECK(wait_interval_->pending_configure_job.get() == NULL);
|
| -
|
| - // TODO(lipalani) - handle clear user data.
|
| - InitOrCoalescePendingJob(old_job);
|
| + DCHECK(wait_interval_->pending_configure_job == NULL);
|
| + DCHECK(!pending_nudge_);
|
| + pending_nudge_ = new_job.get();
|
| }
|
| - RestartWaiting();
|
| +
|
| + RestartWaiting(new_job.Pass());
|
| }
|
|
|
| void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
|
| @@ -1003,51 +1010,56 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
|
| callback.Run();
|
| }
|
|
|
| -void SyncSchedulerImpl::DoCanaryJob() {
|
| +void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| SDVLOG(2) << "Do canary job";
|
| - DoPendingJobIfPossible(true);
|
| +
|
| + // Only set canary privileges here, when we are about to run the job. This
|
| + // avoids confusion in managing canary bits during scheduling, when you
|
| + // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that
|
| + // was scheduled as canary, and send it to an "unscheduled" state.
|
| + to_be_canary->GrantCanaryPrivilege();
|
| +
|
| + if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
|
| + // TODO(tim): We should be able to remove this...
|
| + scoped_ptr<SyncSession> temp = CreateSyncSession(
|
| + to_be_canary->session()->source()).Pass();
|
| + // The routing info might have been changed since we cached the
|
| + // pending nudge. Update it by coalescing to the latest.
|
| + to_be_canary->mutable_session()->Coalesce(*(temp));
|
| + }
|
| + DoSyncSessionJob(to_be_canary.Pass());
|
| }
|
|
|
| -void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) {
|
| +scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| - SyncSessionJob* job_to_execute = NULL;
|
| + // If we find a scheduled pending_ job, abandon the old one and return a
|
| + // a clone. If unscheduled, just hand over ownership.
|
| + scoped_ptr<SyncSessionJob> candidate;
|
| if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
|
| - && wait_interval_->pending_configure_job.get()) {
|
| + && wait_interval_->pending_configure_job) {
|
| SDVLOG(2) << "Found pending configure job";
|
| - job_to_execute = wait_interval_->pending_configure_job.get();
|
| - } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
|
| + candidate =
|
| + wait_interval_->pending_configure_job->CloneAndAbandon().Pass();
|
| + wait_interval_->pending_configure_job = candidate.get();
|
| + } else if (mode_ == NORMAL_MODE && pending_nudge_) {
|
| SDVLOG(2) << "Found pending nudge job";
|
| -
|
| - scoped_ptr<SyncSession> session(CreateSyncSession(
|
| - pending_nudge_->session->source()));
|
| -
|
| - // Also the routing info might have been changed since we cached the
|
| - // pending nudge. Update it by coalescing to the latest.
|
| - pending_nudge_->session->Coalesce(*(session.get()));
|
| - // The pending nudge would be cleared in the DoSyncSessionJob function.
|
| - job_to_execute = pending_nudge_.get();
|
| - }
|
| -
|
| - if (job_to_execute != NULL) {
|
| - SDVLOG(2) << "Executing pending job";
|
| - SyncSessionJob copy = *job_to_execute;
|
| - copy.is_canary_job = is_canary_job;
|
| - DoSyncSessionJob(copy);
|
| + candidate = pending_nudge_->CloneAndAbandon();
|
| + pending_nudge_ = candidate.get();
|
| + unscheduled_nudge_storage_.reset();
|
| }
|
| + return candidate.Pass();
|
| }
|
|
|
| -SyncSession* SyncSchedulerImpl::CreateSyncSession(
|
| +scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession(
|
| const SyncSourceInfo& source) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| DVLOG(2) << "Creating sync session with routes "
|
| << ModelSafeRoutingInfoToString(session_context_->routing_info());
|
|
|
| SyncSourceInfo info(source);
|
| - SyncSession* session(new SyncSession(session_context_, this, info,
|
| + return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info,
|
| session_context_->routing_info(), session_context_->workers()));
|
| -
|
| - return session;
|
| }
|
|
|
| void SyncSchedulerImpl::PollTimerCallback() {
|
| @@ -1056,22 +1068,27 @@ void SyncSchedulerImpl::PollTimerCallback() {
|
| ModelTypeInvalidationMap invalidation_map =
|
| ModelSafeRoutingInfoToInvalidationMap(r, std::string());
|
| SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
|
| - SyncSession* s = CreateSyncSession(info);
|
| -
|
| - SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
|
| - make_linked_ptr(s),
|
| - false,
|
| - ConfigurationParams(),
|
| - FROM_HERE);
|
| -
|
| - ScheduleSyncSessionJob(job);
|
| + scoped_ptr<SyncSession> s(CreateSyncSession(info));
|
| + scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
|
| + TimeTicks::Now(),
|
| + s.Pass(),
|
| + ConfigurationParams(),
|
| + FROM_HERE));
|
| + ScheduleSyncSessionJob(job.Pass());
|
| }
|
|
|
| -void SyncSchedulerImpl::Unthrottle() {
|
| +void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
|
| - SDVLOG(2) << "Unthrottled.";
|
| - DoCanaryJob();
|
| + SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
|
| + << "canary.";
|
| + if (to_be_canary.get())
|
| + DoCanaryJob(to_be_canary.Pass());
|
| +
|
| + // TODO(tim): The way DecideOnJob works today, canary privileges aren't
|
| + // enough to bypass a THROTTLED wait interval, which would suggest we need
|
| + // to reset before DoCanaryJob (though trusting canary in DecideOnJob is
|
| + // probably the "right" thing to do). Bug 154216.
|
| wait_interval_.reset();
|
| }
|
|
|
| @@ -1091,8 +1108,6 @@ void SyncSchedulerImpl::OnSilencedUntil(
|
| DCHECK_EQ(MessageLoop::current(), sync_loop_);
|
| wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
|
| silenced_until - TimeTicks::Now()));
|
| - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
|
| - &SyncSchedulerImpl::Unthrottle);
|
| }
|
|
|
| bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
|
|
|