| Index: chrome/browser/sync/engine/syncer_thread2.cc
|
| diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc
|
| index e94215e1b5f809ac682d54865f0e2085d812261a..b8bd9a95d25adf4936e3b0b40cae3ce633b38380 100644
|
| --- a/chrome/browser/sync/engine/syncer_thread2.cc
|
| +++ b/chrome/browser/sync/engine/syncer_thread2.cc
|
| @@ -23,45 +23,47 @@ using sync_pb::GetUpdatesCallerInfo;
|
|
|
| namespace s3 {
|
|
|
| -struct SyncerThread::WaitInterval {
|
| - enum Mode {
|
| - // A wait interval whose duration has been affected by exponential
|
| - // backoff.
|
| - // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
|
| - EXPONENTIAL_BACKOFF,
|
| - // A server-initiated throttled interval. We do not allow any syncing
|
| - // during such an interval.
|
| - THROTTLED,
|
| - };
|
| - Mode mode;
|
| -
|
| - // This bool is set to true if we have observed a nudge during this
|
| - // interval and mode == EXPONENTIAL_BACKOFF.
|
| - bool had_nudge;
|
| - base::TimeDelta length;
|
| - base::OneShotTimer<SyncerThread> timer;
|
| - WaitInterval(Mode mode, base::TimeDelta length);
|
| -};
|
| +SyncerThread::DelayProvider::DelayProvider() {}
|
| +SyncerThread::DelayProvider::~DelayProvider() {}
|
|
|
| -struct SyncerThread::SyncSessionJob {
|
| - SyncSessionJobPurpose purpose;
|
| - base::TimeTicks scheduled_start;
|
| - linked_ptr<sessions::SyncSession> session;
|
| +SyncerThread::WaitInterval::WaitInterval() {}
|
| +SyncerThread::WaitInterval::~WaitInterval() {}
|
|
|
| - // This is the location the nudge came from. used for debugging purpose.
|
| - // In case of multiple nudges getting coalesced this stores the first nudge
|
| - // that came in.
|
| - tracked_objects::Location nudge_location;
|
| -};
|
| +SyncerThread::SyncSessionJob::SyncSessionJob() {}
|
| +SyncerThread::SyncSessionJob::~SyncSessionJob() {}
|
|
|
| -SyncerThread::DelayProvider::DelayProvider() {}
|
| -SyncerThread::DelayProvider::~DelayProvider() {}
|
| +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
|
| + base::TimeTicks start,
|
| + linked_ptr<sessions::SyncSession> session, bool is_canary_job,
|
| + const tracked_objects::Location& nudge_location) : purpose(purpose),
|
| + scheduled_start(start),
|
| + session(session),
|
| + is_canary_job(is_canary_job),
|
| + nudge_location(nudge_location) {
|
| +}
|
|
|
| TimeDelta SyncerThread::DelayProvider::GetDelay(
|
| const base::TimeDelta& last_delay) {
|
| return SyncerThread::GetRecommendedDelay(last_delay);
|
| }
|
|
|
| +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
|
| + NudgeSource source) {
|
| + switch (source) {
|
| + case NUDGE_SOURCE_NOTIFICATION:
|
| + return GetUpdatesCallerInfo::NOTIFICATION;
|
| + case NUDGE_SOURCE_LOCAL:
|
| + return GetUpdatesCallerInfo::LOCAL;
|
| + case NUDGE_SOURCE_CONTINUATION:
|
| + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
|
| + case NUDGE_SOURCE_UNKNOWN:
|
| + return GetUpdatesCallerInfo::UNKNOWN;
|
| + default:
|
| + NOTREACHED();
|
| + return GetUpdatesCallerInfo::UNKNOWN;
|
| + }
|
| +}
|
| +
|
| SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
|
| : mode(mode), had_nudge(false), length(length) { }
|
|
|
| @@ -85,6 +87,9 @@ SyncerThread::~SyncerThread() {
|
|
|
| void SyncerThread::CheckServerConnectionManagerStatus(
|
| HttpResponse::ServerConnectionCode code) {
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << "Old mode: " << server_connection_ok_ << " Code: " << code;
|
| // Note, be careful when adding cases here because if the SyncerThread
|
| // thinks there is no valid connection as determined by this method, it
|
| // will drop out of *all* forward progress sync loops (it won't poll and it
|
| @@ -94,13 +99,22 @@ void SyncerThread::CheckServerConnectionManagerStatus(
|
| if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
|
| HttpResponse::SYNC_AUTH_ERROR == code) {
|
| server_connection_ok_ = false;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << " new mode:" << server_connection_ok_;
|
| } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
|
| server_connection_ok_ = true;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << " new mode:" << server_connection_ok_;
|
| + DoCanaryJob();
|
| }
|
| }
|
|
|
| void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread "
|
| + << MessageLoop::current()->thread_name();
|
| if (!thread_.IsRunning()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode "
|
| + << mode;
|
| if (!thread_.Start()) {
|
| NOTREACHED() << "Unable to start SyncerThread.";
|
| return;
|
| @@ -110,6 +124,9 @@ void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
|
| this, &SyncerThread::SendInitialSnapshot));
|
| }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = "
|
| + << mode;
|
| +
|
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback)));
|
| }
|
| @@ -133,6 +150,8 @@ void SyncerThread::WatchConnectionManager() {
|
|
|
| void SyncerThread::StartImpl(Mode mode,
|
| linked_ptr<ModeChangeCallback> callback) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
|
| + << mode;
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| DCHECK(!session_context_->account_name().empty());
|
| DCHECK(syncer_.get());
|
| @@ -140,75 +159,135 @@ void SyncerThread::StartImpl(Mode mode,
|
| AdjustPolling(NULL); // Will kick start poll timer if needed.
|
| if (callback.get())
|
| callback->Run();
|
| +
|
| + // We just changed our mode. See if there are any pending jobs that we could
|
| + // execute in the new mode.
|
| + DoPendingJobIfPossible(false);
|
| }
|
|
|
| -bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose,
|
| - const TimeTicks& scheduled_start) {
|
| - DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
|
| + const SyncSessionJob& job) {
|
|
|
| - // Check wait interval.
|
| - if (wait_interval_.get()) {
|
| - // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit
|
| - // when throttled).
|
| - if (wait_interval_->mode == WaitInterval::THROTTLED)
|
| - return false;
|
| + DCHECK(wait_interval_.get());
|
| + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
|
|
|
| - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
|
| - if ((purpose != NUDGE) || wait_interval_->had_nudge)
|
| - return false;
|
| - }
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : "
|
| + << wait_interval_->mode << "Wait interval had nudge : "
|
| + << wait_interval_->had_nudge << "is canary job : "
|
| + << job.is_canary_job;
|
|
|
| - // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that
|
| - // were intended for a normal sync if we are in configuration mode, and vice
|
| - // versa.
|
| - switch (mode_) {
|
| - case CONFIGURATION_MODE:
|
| - if (purpose != CONFIGURATION)
|
| - return false;
|
| - break;
|
| - case NORMAL_MODE:
|
| - if (purpose == CONFIGURATION)
|
| - return false;
|
| - break;
|
| - default:
|
| - NOTREACHED() << "Unknown SyncerThread Mode: " << mode_;
|
| - return false;
|
| + if (job.purpose == SyncSessionJob::POLL)
|
| + return DROP;
|
| +
|
| + DCHECK(job.purpose == SyncSessionJob::NUDGE ||
|
| + job.purpose == SyncSessionJob::CONFIGURATION);
|
| + if (wait_interval_->mode == WaitInterval::THROTTLED)
|
| + return SAVE;
|
| +
|
| + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| + if (mode_ == CONFIGURATION_MODE)
|
| + return SAVE;
|
| +
|
| + // If we already had one nudge then just drop this nudge. We will retry
|
| + // later when the timer runs out.
|
| + return wait_interval_->had_nudge ? DROP : CONTINUE;
|
| }
|
| + // This is a config job.
|
| + return job.is_canary_job ? CONTINUE : SAVE;
|
| +}
|
| +
|
| +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
|
| + const SyncSessionJob& job) {
|
| + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
|
| + return CONTINUE;
|
|
|
| - // Continuation NUDGE tasks have priority over POLLs because they are the
|
| - // only tasks that trigger exponential backoff, so this prevents them from
|
| - // being starved from running (e.g. due to a very, very low poll interval,
|
| - // such as 0ms). It's rare that this would ever matter in practice.
|
| - if (purpose == POLL && (pending_nudge_.get() &&
|
| - pending_nudge_->session->source().updates_source ==
|
| - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) {
|
| - return false;
|
| + if (wait_interval_.get())
|
| + return DecideWhileInWaitInterval(job);
|
| +
|
| + if (mode_ == CONFIGURATION_MODE) {
|
| + if (job.purpose == SyncSessionJob::NUDGE)
|
| + return SAVE;
|
| + else if (job.purpose == SyncSessionJob::CONFIGURATION)
|
| + return CONTINUE;
|
| + else
|
| + return DROP;
|
| }
|
|
|
| - // Freshness condition.
|
| - if (purpose == NUDGE &&
|
| - (scheduled_start < last_sync_session_end_time_)) {
|
| - return false;
|
| + // We are in normal mode.
|
| + DCHECK_EQ(mode_, NORMAL_MODE);
|
| + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
|
| +
|
| + // Freshness condition
|
| + if (job.scheduled_start < last_sync_session_end_time_) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Dropping job because of freshness";
|
| + return DROP;
|
| }
|
|
|
| - return server_connection_ok_;
|
| + if (server_connection_ok_)
|
| + return CONTINUE;
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Bad server connection. Using that to decide on job.";
|
| + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
|
| }
|
|
|
| -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
|
| - NudgeSource source) {
|
| - switch (source) {
|
| - case NUDGE_SOURCE_NOTIFICATION:
|
| - return GetUpdatesCallerInfo::NOTIFICATION;
|
| - case NUDGE_SOURCE_LOCAL:
|
| - return GetUpdatesCallerInfo::LOCAL;
|
| - case NUDGE_SOURCE_CONTINUATION:
|
| - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
|
| - case NUDGE_SOURCE_UNKNOWN:
|
| - return GetUpdatesCallerInfo::UNKNOWN;
|
| - default:
|
| - NOTREACHED();
|
| - return GetUpdatesCallerInfo::UNKNOWN;
|
| +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
|
| + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
|
| + if (pending_nudge_.get() == NULL) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Creating a pending nudge job";
|
| + SyncSession* s = job.session.get();
|
| + scoped_ptr<SyncSession> session(new SyncSession(s->context(),
|
| + s->delegate(), s->source(), s->routing_info(), s->workers()));
|
| +
|
| + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
|
| + make_linked_ptr(session.release()), false, job.nudge_location);
|
| + pending_nudge_.reset(new SyncSessionJob(new_job));
|
| +
|
| + return;
|
| }
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
|
| + pending_nudge_->session->Coalesce(*(job.session.get()));
|
| + pending_nudge_->scheduled_start = job.scheduled_start;
|
| +
|
| + // Unfortunately the nudge location cannot be modified. So it stores the
|
| + // location of the first caller.
|
| +}
|
| +
|
| +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
|
| + JobProcessDecision decision = DecideOnJob(job);
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: "
|
| + << decision << " Job purpose " << job.purpose << "mode " << mode_;
|
| + if (decision != SAVE)
|
| + return decision == CONTINUE;
|
| +
|
| + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
|
| + SyncSessionJob::CONFIGURATION);
|
| +
|
| + SaveJob(job);
|
| + return false;
|
| +}
|
| +
|
| +void SyncerThread::SaveJob(const SyncSessionJob& job) {
|
| + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job";
|
| + InitOrCoalescePendingJob(job);
|
| + } else if (job.purpose == SyncSessionJob::CONFIGURATION){
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job";
|
| + DCHECK(wait_interval_.get());
|
| + DCHECK(mode_ == CONFIGURATION_MODE);
|
| +
|
| + SyncSession* old = job.session.get();
|
| + SyncSession* s(new SyncSession(session_context_.get(), this,
|
| + old->source(), old->routing_info(), old->workers()));
|
| + SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
|
| + make_linked_ptr(s), false, job.nudge_location);
|
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
|
| + } // drop the rest.
|
| }
|
|
|
| // Functor for std::find_if to search by ModelSafeGroup.
|
| @@ -237,11 +316,14 @@ void SyncerThread::ScheduleNudge(const TimeDelta& delay,
|
| return;
|
| }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled";
|
| +
|
| ModelTypePayloadMap types_with_payloads =
|
| syncable::ModelTypePayloadMapFromBitSet(types, std::string());
|
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| - this, &SyncerThread::ScheduleNudgeImpl, delay, source,
|
| - types_with_payloads, nudge_location));
|
| + this, &SyncerThread::ScheduleNudgeImpl, delay,
|
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false,
|
| + nudge_location));
|
| }
|
|
|
| void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
|
| @@ -252,9 +334,12 @@ void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
|
| return;
|
| }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
|
| +
|
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| - this, &SyncerThread::ScheduleNudgeImpl, delay, source,
|
| - types_with_payloads, nudge_location));
|
| + this, &SyncerThread::ScheduleNudgeImpl, delay,
|
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false,
|
| + nudge_location));
|
| }
|
|
|
| void SyncerThread::ScheduleClearUserDataImpl() {
|
| @@ -262,51 +347,58 @@ void SyncerThread::ScheduleClearUserDataImpl() {
|
| SyncSession* session = new SyncSession(session_context_.get(), this,
|
| SyncSourceInfo(), ModelSafeRoutingInfo(),
|
| std::vector<ModelSafeWorker*>());
|
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session,
|
| - FROM_HERE);
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
|
| + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
|
| }
|
|
|
| void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
|
| - NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
|
| - const tracked_objects::Location& nudge_location) {
|
| + GetUpdatesCallerInfo::GetUpdatesSource source,
|
| + const ModelTypePayloadMap& types_with_payloads,
|
| + bool is_canary_job, const tracked_objects::Location& nudge_location) {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| - TimeTicks rough_start = TimeTicks::Now() + delay;
|
| - if (!ShouldRunJob(NUDGE, rough_start)) {
|
| - LOG(WARNING) << "Dropping nudge at scheduling time, source = "
|
| - << source;
|
| - return;
|
| - }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
|
| // Note we currently nudge for all types regardless of the ones incurring
|
| // the nudge. Doing different would throw off some syncer commands like
|
| // CleanupDisabledTypes. We may want to change this in the future.
|
| - ModelSafeRoutingInfo routes;
|
| - std::vector<ModelSafeWorker*> workers;
|
| - session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| - session_context_->registrar()->GetWorkers(&workers);
|
| - SyncSourceInfo info(GetUpdatesFromNudgeSource(source),
|
| - types_with_payloads);
|
| + SyncSourceInfo info(source, types_with_payloads);
|
|
|
| - scoped_ptr<SyncSession> session(new SyncSession(
|
| - session_context_.get(), this, info, routes, workers));
|
| + SyncSession* session(CreateSyncSession(info));
|
| + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
|
| + make_linked_ptr(session), is_canary_job,
|
| + nudge_location);
|
| +
|
| + session = NULL;
|
| + if (!ShouldRunJob(job))
|
| + return;
|
|
|
| if (pending_nudge_.get()) {
|
| - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1))
|
| + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
|
| + << "we are in backoff";
|
| return;
|
| + }
|
|
|
| - pending_nudge_->session->Coalesce(*session.get());
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
|
| + pending_nudge_->session->Coalesce(*(job.session.get()));
|
|
|
| if (!IsBackingOff()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
|
| + << " we are not in backoff and the job was coalesced";
|
| return;
|
| } else {
|
| - // Re-schedule the current pending nudge.
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Rescheduling pending nudge";
|
| SyncSession* s = pending_nudge_->session.get();
|
| - session.reset(new SyncSession(s->context(), s->delegate(), s->source(),
|
| - s->routing_info(), s->workers()));
|
| + job.session.reset(new SyncSession(s->context(), s->delegate(),
|
| + s->source(), s->routing_info(), s->workers()));
|
| pending_nudge_.reset();
|
| }
|
| }
|
| - ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location);
|
| +
|
| + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
|
| + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
|
| + nudge_location);
|
| }
|
|
|
| // Helper to extract the routing info and workers corresponding to types in
|
| @@ -348,58 +440,69 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
|
| return;
|
| }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config";
|
| ModelSafeRoutingInfo routes;
|
| std::vector<ModelSafeWorker*> workers;
|
| GetModelSafeParamsForTypes(types, session_context_->registrar(),
|
| &routes, &workers);
|
|
|
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| - this, &SyncerThread::ScheduleConfigImpl, routes, workers));
|
| + this, &SyncerThread::ScheduleConfigImpl, routes, workers,
|
| + GetUpdatesCallerInfo::FIRST_UPDATE));
|
| }
|
|
|
| void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
|
| - const std::vector<ModelSafeWorker*>& workers) {
|
| + const std::vector<ModelSafeWorker*>& workers,
|
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
|
| // TODO(tim): config-specific GetUpdatesCallerInfo value?
|
| SyncSession* session = new SyncSession(session_context_.get(), this,
|
| - SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE,
|
| + SyncSourceInfo(source,
|
| syncable::ModelTypePayloadMapFromRoutingInfo(
|
| routing_info, std::string())),
|
| routing_info, workers);
|
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session,
|
| - FROM_HERE);
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
|
| + SyncSessionJob::CONFIGURATION, session, FROM_HERE);
|
| }
|
|
|
| void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
|
| - SyncSessionJobPurpose purpose, sessions::SyncSession* session,
|
| + SyncSessionJob::SyncSessionJobPurpose purpose,
|
| + sessions::SyncSession* session,
|
| const tracked_objects::Location& nudge_location) {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
|
|
| - SyncSessionJob job = {purpose, TimeTicks::Now() + delay,
|
| - make_linked_ptr(session), nudge_location};
|
| - if (purpose == NUDGE) {
|
| + SyncSessionJob job(purpose, TimeTicks::Now() + delay,
|
| + make_linked_ptr(session), false, nudge_location);
|
| + if (purpose == SyncSessionJob::NUDGE) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
|
| + << " ScheduleSyncSessionJob";
|
| DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
|
| pending_nudge_.reset(new SyncSessionJob(job));
|
| }
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Posting job to execute in DoSyncSessionJob. Job purpose "
|
| + << job.purpose;
|
| MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
|
| &SyncerThread::DoSyncSessionJob, job),
|
| delay.InMilliseconds());
|
| }
|
|
|
| -void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose,
|
| +void SyncerThread::SetSyncerStepsForPurpose(
|
| + SyncSessionJob::SyncSessionJobPurpose purpose,
|
| SyncerStep* start, SyncerStep* end) {
|
| *end = SYNCER_END;
|
| switch (purpose) {
|
| - case CONFIGURATION:
|
| + case SyncSessionJob::CONFIGURATION:
|
| *start = DOWNLOAD_UPDATES;
|
| *end = APPLY_UPDATES;
|
| return;
|
| - case CLEAR_USER_DATA:
|
| + case SyncSessionJob::CLEAR_USER_DATA:
|
| *start = CLEAR_PRIVATE_DATA;
|
| return;
|
| - case NUDGE:
|
| - case POLL:
|
| + case SyncSessionJob::NUDGE:
|
| + case SyncSessionJob::POLL:
|
| *start = SYNCER_BEGIN;
|
| return;
|
| default:
|
| @@ -409,38 +512,39 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose,
|
|
|
| void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| - if (!ShouldRunJob(job.purpose, job.scheduled_start)) {
|
| - LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
|
| - << job.session->source().updates_source;
|
| + if (!ShouldRunJob(job))
|
| return;
|
| - }
|
|
|
| - if (job.purpose == NUDGE) {
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| DCHECK(pending_nudge_.get());
|
| if (pending_nudge_->session != job.session)
|
| return; // Another nudge must have been scheduled in in the meantime.
|
| pending_nudge_.reset();
|
| }
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
|
| + << job.purpose;
|
|
|
| SyncerStep begin(SYNCER_BEGIN);
|
| SyncerStep end(SYNCER_END);
|
| SetSyncerStepsForPurpose(job.purpose, &begin, &end);
|
|
|
| bool has_more_to_sync = true;
|
| - while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
|
| - VLOG(1) << "SyncerThread: Calling SyncShare.";
|
| + while (ShouldRunJob(job) && has_more_to_sync) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " SyncerThread: Calling SyncShare.";
|
| // Synchronously perform the sync session from this thread.
|
| syncer_->SyncShare(job.session.get(), begin, end);
|
| has_more_to_sync = job.session->HasMoreToSync();
|
| if (has_more_to_sync)
|
| job.session->ResetTransientState();
|
| }
|
| - VLOG(1) << "SyncerThread: Done SyncShare looping.";
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " SyncerThread: Done SyncShare looping.";
|
| FinishSyncSessionJob(job);
|
| }
|
|
|
| void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
|
| - if (old_job.purpose == CONFIGURATION) {
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| // Whatever types were part of a configuration task will have had updates
|
| // downloaded. For that reason, we make sure they get recorded in the
|
| // event that they get disabled at a later time.
|
| @@ -473,10 +577,15 @@ void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
|
| }
|
| last_sync_session_end_time_ = now;
|
| UpdateCarryoverSessionState(job);
|
| - if (IsSyncingCurrentlySilenced())
|
| + if (IsSyncingCurrentlySilenced()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " We are currently throttled. So not scheduling the next sync.";
|
| + SaveJob(job);
|
| return; // Nothing to do.
|
| + }
|
|
|
| - VLOG(1) << "Updating the next polling time after SyncMain";
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Updating the next polling time after SyncMain";
|
| ScheduleNextSync(job);
|
| }
|
|
|
| @@ -492,35 +601,52 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
|
| const bool work_to_do =
|
| old_job.session->status_controller()->num_server_changes_remaining() > 0
|
| || old_job.session->status_controller()->unsynced_handles().size() > 0;
|
| - VLOG(1) << "syncer has work to do: " << work_to_do;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: "
|
| + << work_to_do;
|
|
|
| AdjustPolling(&old_job);
|
|
|
| // TODO(tim): Old impl had special code if notifications disabled. Needed?
|
| if (!work_to_do) {
|
| // Success implies backoff relief. Note that if this was a "one-off" job
|
| - // (i.e. purpose == CLEAR_USER_DATA), if there was work_to_do before it
|
| - // ran this wont have changed, as jobs like this don't run a full sync
|
| - // cycle. So we don't need special code here.
|
| + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
|
| + // work_to_do before it ran this wont have changed, as jobs like this don't
|
| + // run a full sync cycle. So we don't need special code here.
|
| wait_interval_.reset();
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Job suceeded so not scheduling more jobs";
|
| return;
|
| }
|
|
|
| if (old_job.session->source().updates_source ==
|
| GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Job failed with source continuation";
|
| // We don't seem to have made forward progress. Start or extend backoff.
|
| HandleConsecutiveContinuationError(old_job);
|
| } else if (IsBackingOff()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " A nudge during backoff failed";
|
| // We weren't continuing but we're in backoff; must have been a nudge.
|
| - DCHECK_EQ(NUDGE, old_job.purpose);
|
| + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
|
| DCHECK(!wait_interval_->had_nudge);
|
| wait_interval_->had_nudge = true;
|
| wait_interval_->timer.Reset();
|
| } else {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Failed. Schedule a job with continuation as source";
|
| // We weren't continuing and we aren't in backoff. Schedule a normal
|
| // continuation.
|
| - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION,
|
| - old_job.session->source().types, FROM_HERE);
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + ScheduleConfigImpl(old_job.session->routing_info(),
|
| + old_job.session->workers(),
|
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
|
| + } else {
|
| + // For all other purposes(nudge and poll) we schedule a retry nudge.
|
| + ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
|
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
|
| + old_job.session->source().types, false, FROM_HERE);
|
| + }
|
| }
|
| }
|
|
|
| @@ -534,7 +660,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
|
| bool rate_changed = !poll_timer_.IsRunning() ||
|
| poll != poll_timer_.GetCurrentDelay();
|
|
|
| - if (old_job && old_job->purpose != POLL && !rate_changed)
|
| + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
|
| poll_timer_.Reset();
|
|
|
| if (!rate_changed)
|
| @@ -548,17 +674,38 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
|
| void SyncerThread::HandleConsecutiveContinuationError(
|
| const SyncSessionJob& old_job) {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| - DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning());
|
| + // This if conditions should be compiled out in retail builds.
|
| + if (IsBackingOff()) {
|
| + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
|
| + }
|
| SyncSession* old = old_job.session.get();
|
| SyncSession* s(new SyncSession(session_context_.get(), this,
|
| old->source(), old->routing_info(), old->workers()));
|
| TimeDelta length = delay_provider_->GetDelay(
|
| IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " In handle continuation error. Old job purpose is "
|
| + << old_job.purpose;
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " In Handle continuation error. The time delta(ms) is: "
|
| + << length.InMilliseconds();
|
| +
|
| + // This will reset the had_nudge variable as well.
|
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
|
| length));
|
| - SyncSessionJob job = {NUDGE, TimeTicks::Now() + length,
|
| - make_linked_ptr(s), FROM_HERE};
|
| - pending_nudge_.reset(new SyncSessionJob(job));
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
|
| + make_linked_ptr(s), false, FROM_HERE);
|
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
|
| + } else {
|
| + // We are not in configuration mode. So wait_interval's pending job
|
| + // should be null.
|
| + DCHECK(wait_interval_->pending_configure_job.get() == NULL);
|
| +
|
| + // TODO(lipalani) - handle clear user data.
|
| + InitOrCoalescePendingJob(old_job);
|
| + }
|
| wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
|
| }
|
|
|
| @@ -587,33 +734,76 @@ TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
|
| }
|
|
|
| void SyncerThread::Stop() {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " stop called";
|
| syncer_->RequestEarlyExit(); // Safe to call from any thread.
|
| session_context_->connection_manager()->RemoveListener(this);
|
| thread_.Stop();
|
| }
|
|
|
| void SyncerThread::DoCanaryJob() {
|
| - DCHECK(pending_nudge_.get());
|
| - wait_interval_->had_nudge = false;
|
| - SyncSessionJob copy = *pending_nudge_;
|
| - DoSyncSessionJob(copy);
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job";
|
| + DoPendingJobIfPossible(true);
|
| +}
|
| +
|
| +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
|
| + SyncSessionJob* job_to_execute = NULL;
|
| + if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
|
| + && wait_interval_->pending_configure_job.get()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job";
|
| + job_to_execute = wait_interval_->pending_configure_job.get();
|
| + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job";
|
| + // Pending jobs mostly have time from the past. Reset it so this job
|
| + // will get executed.
|
| + if (pending_nudge_->scheduled_start < TimeTicks::Now())
|
| + pending_nudge_->scheduled_start = TimeTicks::Now();
|
| +
|
| + scoped_ptr<SyncSession> session(CreateSyncSession(
|
| + pending_nudge_->session->source()));
|
| +
|
| + // Also the routing info might have been changed since we cached the
|
| + // pending nudge. Update it by coalescing to the latest.
|
| + pending_nudge_->session->Coalesce(*(session.get()));
|
| + // The pending nudge would be cleared in the DoSyncSessionJob function.
|
| + job_to_execute = pending_nudge_.get();
|
| + }
|
| +
|
| + if (job_to_execute != NULL) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job";
|
| + SyncSessionJob copy = *job_to_execute;
|
| + copy.is_canary_job = is_canary_job;
|
| + DoSyncSessionJob(copy);
|
| + }
|
| +}
|
| +
|
| +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
|
| + ModelSafeRoutingInfo routes;
|
| + std::vector<ModelSafeWorker*> workers;
|
| + session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| + session_context_->registrar()->GetWorkers(&workers);
|
| + SyncSourceInfo info(source);
|
| +
|
| + SyncSession* session(new SyncSession(session_context_.get(), this, info,
|
| + routes, workers));
|
| +
|
| + return session;
|
| }
|
|
|
| void SyncerThread::PollTimerCallback() {
|
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| ModelSafeRoutingInfo r;
|
| - std::vector<ModelSafeWorker*> w;
|
| - session_context_->registrar()->GetModelSafeRoutingInfo(&r);
|
| - session_context_->registrar()->GetWorkers(&w);
|
| ModelTypePayloadMap types_with_payloads =
|
| syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
|
| SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
|
| - SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w);
|
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE);
|
| + SyncSession* s = CreateSyncSession(info);
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
|
| + FROM_HERE);
|
| }
|
|
|
| void SyncerThread::Unthrottle() {
|
| DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled..";
|
| + DoCanaryJob();
|
| wait_interval_.reset();
|
| }
|
|
|
| @@ -652,6 +842,8 @@ void SyncerThread::OnReceivedLongPollIntervalUpdate(
|
| }
|
|
|
| void SyncerThread::OnShouldStopSyncingPermanently() {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " OnShouldStopSyncingPermanently";
|
| syncer_->RequestEarlyExit(); // Thread-safe.
|
| Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
|
| }
|
|
|