Chromium Code Reviews| Index: chrome/browser/sync/engine/syncer_thread2.cc |
| diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc |
| index e94215e1b5f809ac682d54865f0e2085d812261a..7123a6cb535d15184f626f337847a9c009f444ba 100644 |
| --- a/chrome/browser/sync/engine/syncer_thread2.cc |
| +++ b/chrome/browser/sync/engine/syncer_thread2.cc |
| @@ -23,45 +23,47 @@ using sync_pb::GetUpdatesCallerInfo; |
| namespace s3 { |
| -struct SyncerThread::WaitInterval { |
| - enum Mode { |
| - // A wait interval whose duration has been affected by exponential |
| - // backoff. |
| - // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. |
| - EXPONENTIAL_BACKOFF, |
| - // A server-initiated throttled interval. We do not allow any syncing |
| - // during such an interval. |
| - THROTTLED, |
| - }; |
| - Mode mode; |
| - |
| - // This bool is set to true if we have observed a nudge during this |
| - // interval and mode == EXPONENTIAL_BACKOFF. |
| - bool had_nudge; |
| - base::TimeDelta length; |
| - base::OneShotTimer<SyncerThread> timer; |
| - WaitInterval(Mode mode, base::TimeDelta length); |
| -}; |
| +SyncerThread::DelayProvider::DelayProvider() {} |
| +SyncerThread::DelayProvider::~DelayProvider() {} |
| -struct SyncerThread::SyncSessionJob { |
| - SyncSessionJobPurpose purpose; |
| - base::TimeTicks scheduled_start; |
| - linked_ptr<sessions::SyncSession> session; |
| +SyncerThread::WaitInterval::WaitInterval() {} |
| +SyncerThread::WaitInterval::~WaitInterval() {} |
| - // This is the location the nudge came from. used for debugging purpose. |
| - // In case of multiple nudges getting coalesced this stores the first nudge |
| - // that came in. |
| - tracked_objects::Location nudge_location; |
| -}; |
| +SyncerThread::SyncSessionJob::SyncSessionJob() {} |
| +SyncerThread::SyncSessionJob::~SyncSessionJob() {} |
| -SyncerThread::DelayProvider::DelayProvider() {} |
| -SyncerThread::DelayProvider::~DelayProvider() {} |
| +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
| + base::TimeTicks start, |
| + linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
| + const tracked_objects::Location& nudge_location) : purpose(purpose), |
| + scheduled_start(start), |
| + session(session), |
| + is_canary_job(is_canary_job), |
| + nudge_location(nudge_location) { |
| +} |
| TimeDelta SyncerThread::DelayProvider::GetDelay( |
| const base::TimeDelta& last_delay) { |
| return SyncerThread::GetRecommendedDelay(last_delay); |
| } |
| +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
| + NudgeSource source) { |
| + switch (source) { |
| + case NUDGE_SOURCE_NOTIFICATION: |
| + return GetUpdatesCallerInfo::NOTIFICATION; |
| + case NUDGE_SOURCE_LOCAL: |
| + return GetUpdatesCallerInfo::LOCAL; |
| + case NUDGE_SOURCE_CONTINUATION: |
| + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| + case NUDGE_SOURCE_UNKNOWN: |
| + return GetUpdatesCallerInfo::UNKNOWN; |
| + default: |
| + NOTREACHED(); |
| + return GetUpdatesCallerInfo::UNKNOWN; |
| + } |
| +} |
| + |
| SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
| : mode(mode), had_nudge(false), length(length) { } |
| @@ -73,7 +75,7 @@ SyncerThread::SyncerThread(sessions::SyncSessionContext* context, |
| syncer_long_poll_interval_seconds_( |
| TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| mode_(NORMAL_MODE), |
| - server_connection_ok_(false), |
| + server_connection_ok_(true), |
|
tim (not reviewing)
2011/04/15 00:21:43
this can't be right...
lipalani1
2011/04/15 01:24:39
ha ha!! Not sure if it is incorrect.
However I hav
|
| delay_provider_(new DelayProvider()), |
| syncer_(syncer), |
| session_context_(context) { |
| @@ -85,6 +87,9 @@ SyncerThread::~SyncerThread() { |
| void SyncerThread::CheckServerConnectionManagerStatus( |
| HttpResponse::ServerConnectionCode code) { |
| + |
| + VLOG(2) << this << " Server connection changed. Old mode:" |
| + << server_connection_ok_ << " Code: " << code; |
| // Note, be careful when adding cases here because if the SyncerThread |
| // thinks there is no valid connection as determined by this method, it |
| // will drop out of *all* forward progress sync loops (it won't poll and it |
| @@ -94,13 +99,21 @@ void SyncerThread::CheckServerConnectionManagerStatus( |
| if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
| HttpResponse::SYNC_AUTH_ERROR == code) { |
| server_connection_ok_ = false; |
| + VLOG(2) << this << " Server connection changed. new mode:" |
| + << server_connection_ok_; |
| } else if (HttpResponse::SERVER_CONNECTION_OK == code) { |
| server_connection_ok_ = true; |
| + VLOG(2) << this << " Server connection changed. new mode:" |
| + << server_connection_ok_; |
| + DoCanaryJob(); |
| } |
| } |
| void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
| + VLOG(2) << this << " Start called from thread " << |
| + MessageLoop::current()->thread_name(); |
| if (!thread_.IsRunning()) { |
| + VLOG(2) << this << " Starting thread with mode " << mode; |
| if (!thread_.Start()) { |
| NOTREACHED() << "Unable to start SyncerThread."; |
| return; |
| @@ -110,6 +123,8 @@ void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
| this, &SyncerThread::SendInitialSnapshot)); |
| } |
| + VLOG(2) << this << " Entering start with mode = " << mode; |
| + |
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); |
| } |
| @@ -133,6 +148,7 @@ void SyncerThread::WatchConnectionManager() { |
| void SyncerThread::StartImpl(Mode mode, |
| linked_ptr<ModeChangeCallback> callback) { |
| + VLOG(2) << this << " Doing StartImpl with mode " << mode; |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| DCHECK(!session_context_->account_name().empty()); |
| DCHECK(syncer_.get()); |
| @@ -140,75 +156,131 @@ void SyncerThread::StartImpl(Mode mode, |
| AdjustPolling(NULL); // Will kick start poll timer if needed. |
| if (callback.get()) |
| callback->Run(); |
| + |
| + // We just changed our mode. See if there are any pending jobs that we could |
| + // execute in the new mode. |
| + DoPendingJobIfPossible(false); |
| } |
| -bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, |
| - const TimeTicks& scheduled_start) { |
| - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( |
| + const SyncSessionJob& job) { |
| - // Check wait interval. |
| - if (wait_interval_.get()) { |
| - // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit |
| - // when throttled). |
| - if (wait_interval_->mode == WaitInterval::THROTTLED) |
| - return false; |
| + DCHECK(wait_interval_.get()); |
| + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
| - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| - if ((purpose != NUDGE) || wait_interval_->had_nudge) |
| - return false; |
| - } |
| + VLOG(2) << this << " Wait interval mode : " << wait_interval_->mode |
| + << "Wait interval had nudge : " << wait_interval_->had_nudge |
| + << "is canary job : " << job.is_canary_job; |
| - // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that |
| - // were intended for a normal sync if we are in configuration mode, and vice |
| - // versa. |
| - switch (mode_) { |
| - case CONFIGURATION_MODE: |
| - if (purpose != CONFIGURATION) |
| - return false; |
| - break; |
| - case NORMAL_MODE: |
| - if (purpose == CONFIGURATION) |
| - return false; |
| - break; |
| - default: |
| - NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; |
| - return false; |
| + if (job.purpose == SyncSessionJob::POLL) |
| + return DROP; |
| + |
| + DCHECK(job.purpose == SyncSessionJob::NUDGE || |
| + job.purpose == SyncSessionJob::CONFIGURATION); |
| + if (wait_interval_->mode == WaitInterval::THROTTLED) |
| + return SAVE; |
| + |
| + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| + if (job.purpose == SyncSessionJob::NUDGE) { |
| + if (mode_ == CONFIGURATION_MODE) |
| + return SAVE; |
| + |
| + // If we already had one nudge then just drop this nudge. We will retry |
| + // later when the timer runs out. |
| + return wait_interval_->had_nudge ? DROP : CONTINUE; |
| } |
| + // This is a config job. |
| + return job.is_canary_job ? CONTINUE : SAVE; |
| +} |
| + |
| +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( |
| + const SyncSessionJob& job) { |
| + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) |
| + return CONTINUE; |
| - // Continuation NUDGE tasks have priority over POLLs because they are the |
| - // only tasks that trigger exponential backoff, so this prevents them from |
| - // being starved from running (e.g. due to a very, very low poll interval, |
| - // such as 0ms). It's rare that this would ever matter in practice. |
| - if (purpose == POLL && (pending_nudge_.get() && |
| - pending_nudge_->session->source().updates_source == |
| - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { |
| - return false; |
| + if (wait_interval_.get()) |
| + return DecideWhileInWaitInterval(job); |
| + |
| + if (mode_ == CONFIGURATION_MODE) { |
| + if (job.purpose == SyncSessionJob::NUDGE) |
| + return SAVE; |
| + else if (job.purpose == SyncSessionJob::CONFIGURATION) |
| + return CONTINUE; |
| + else |
| + return DROP; |
| } |
| - // Freshness condition. |
| - if (purpose == NUDGE && |
| - (scheduled_start < last_sync_session_end_time_)) { |
| - return false; |
| + // We are in normal mode. |
| + DCHECK_EQ(mode_, NORMAL_MODE); |
| + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
| + |
| + // Freshness condition |
| + if (job.scheduled_start < last_sync_session_end_time_) { |
| + VLOG(2) << this << " Dropping job because of freshness"; |
| + return DROP; |
| } |
| - return server_connection_ok_; |
| + if (server_connection_ok_) |
| + return CONTINUE; |
| + |
| + VLOG(2) << this << " Bad server connection. Using that to decide on job."; |
| + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
| } |
| -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
| - NudgeSource source) { |
| - switch (source) { |
| - case NUDGE_SOURCE_NOTIFICATION: |
| - return GetUpdatesCallerInfo::NOTIFICATION; |
| - case NUDGE_SOURCE_LOCAL: |
| - return GetUpdatesCallerInfo::LOCAL; |
| - case NUDGE_SOURCE_CONTINUATION: |
| - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| - case NUDGE_SOURCE_UNKNOWN: |
| - return GetUpdatesCallerInfo::UNKNOWN; |
| - default: |
| - NOTREACHED(); |
| - return GetUpdatesCallerInfo::UNKNOWN; |
| +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
| + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
| + if (pending_nudge_.get() == NULL) { |
| + VLOG(2) << this << " Creating a pending nudge job"; |
| + SyncSession* s = job.session.get(); |
| + scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
| + s->delegate(), s->source(), s->routing_info(), s->workers())); |
| + |
| + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
| + make_linked_ptr(session.release()), false, job.nudge_location); |
| + pending_nudge_.reset(new SyncSessionJob(new_job)); |
| + |
| + return; |
| } |
| + |
| + VLOG(2) << this << " Coalescing a pending nudge"; |
| + pending_nudge_->session->Coalesce(*(job.session.get())); |
| + pending_nudge_->scheduled_start = job.scheduled_start; |
| + |
| + // Unfortunately the nudge location cannot be modified. So it stores the |
| + // location of the first caller. |
| +} |
| + |
| +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { |
| + JobProcessDecision decision = DecideOnJob(job); |
| + VLOG(2) << this << " Should run job, decision: " << decision |
| + << " Job purpose " << job.purpose << "mode " << mode_; |
| + if (decision != SAVE) |
| + return decision == CONTINUE; |
| + |
| + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
| + SyncSessionJob::CONFIGURATION); |
| + |
| + SaveJob(job); |
| + return false; |
| +} |
| + |
| +void SyncerThread::SaveJob(const SyncSessionJob& job) { |
| + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); |
| + if (job.purpose == SyncSessionJob::NUDGE) { |
| + VLOG(2) << this << " Saving a nudge job"; |
|
tim (not reviewing)
2011/04/15 00:21:43
using 'this' in these is a bit messy... prefix wit
lipalani1
2011/04/15 01:24:39
Done.
|
| + InitOrCoalescePendingJob(job); |
| + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
| + VLOG(2) << this << " Saving a configuration job"; |
| + DCHECK(wait_interval_.get()); |
| + DCHECK(mode_ == CONFIGURATION_MODE); |
| + |
| + SyncSession* old = job.session.get(); |
| + SyncSession* s(new SyncSession(session_context_.get(), this, |
| + old->source(), old->routing_info(), old->workers())); |
| + SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
| + make_linked_ptr(s), false, job.nudge_location); |
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
| + } // drop the rest. |
| } |
| // Functor for std::find_if to search by ModelSafeGroup. |
| @@ -237,11 +309,14 @@ void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
| return; |
| } |
| + VLOG(2) << this << " Nudge scheduled"; |
| + |
| ModelTypePayloadMap types_with_payloads = |
| syncable::ModelTypePayloadMapFromBitSet(types, std::string()); |
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| - this, &SyncerThread::ScheduleNudgeImpl, delay, source, |
| - types_with_payloads, nudge_location)); |
| + this, &SyncerThread::ScheduleNudgeImpl, delay, |
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
| + nudge_location)); |
| } |
| void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
| @@ -252,9 +327,12 @@ void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
| return; |
| } |
| + VLOG(2) << this << " Nudge scheduled with payloads"; |
| + |
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| - this, &SyncerThread::ScheduleNudgeImpl, delay, source, |
| - types_with_payloads, nudge_location)); |
| + this, &SyncerThread::ScheduleNudgeImpl, delay, |
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
| + nudge_location)); |
| } |
| void SyncerThread::ScheduleClearUserDataImpl() { |
| @@ -262,51 +340,56 @@ void SyncerThread::ScheduleClearUserDataImpl() { |
| SyncSession* session = new SyncSession(session_context_.get(), this, |
| SyncSourceInfo(), ModelSafeRoutingInfo(), |
| std::vector<ModelSafeWorker*>()); |
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session, |
| - FROM_HERE); |
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
| + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); |
| } |
| void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
| - NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
| - const tracked_objects::Location& nudge_location) { |
| + GetUpdatesCallerInfo::GetUpdatesSource source, |
| + const ModelTypePayloadMap& types_with_payloads, |
| + bool is_canary_job, const tracked_objects::Location& nudge_location) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - TimeTicks rough_start = TimeTicks::Now() + delay; |
| - if (!ShouldRunJob(NUDGE, rough_start)) { |
| - LOG(WARNING) << "Dropping nudge at scheduling time, source = " |
| - << source; |
| - return; |
| - } |
| + VLOG(2) << this << " Running Schedule nudge impl"; |
| // Note we currently nudge for all types regardless of the ones incurring |
| // the nudge. Doing different would throw off some syncer commands like |
| // CleanupDisabledTypes. We may want to change this in the future. |
| - ModelSafeRoutingInfo routes; |
| - std::vector<ModelSafeWorker*> workers; |
| - session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
| - session_context_->registrar()->GetWorkers(&workers); |
| - SyncSourceInfo info(GetUpdatesFromNudgeSource(source), |
| - types_with_payloads); |
| + SyncSourceInfo info(source, types_with_payloads); |
| - scoped_ptr<SyncSession> session(new SyncSession( |
| - session_context_.get(), this, info, routes, workers)); |
| + SyncSession* session(CreateSyncSession(info)); |
| + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
| + make_linked_ptr(session), is_canary_job, |
| + nudge_location); |
| + |
| + session = NULL; |
| + if (!ShouldRunJob(job)) |
| + return; |
| if (pending_nudge_.get()) { |
| - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) |
| + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
| + VLOG(2) << this << " Dropping the nudge because we are in backoff"; |
| return; |
| + } |
| - pending_nudge_->session->Coalesce(*session.get()); |
| + VLOG(2) << this << " Coalescing pending nudge"; |
| + pending_nudge_->session->Coalesce(*(job.session.get())); |
| if (!IsBackingOff()) { |
| + VLOG(2) << this << " Dropping a nudge because we are not in backoff" |
| + << " and the job was coalesced"; |
| return; |
| } else { |
| - // Re-schedule the current pending nudge. |
| + VLOG(2) << this << " Rescheduling pending nudge"; |
| SyncSession* s = pending_nudge_->session.get(); |
| - session.reset(new SyncSession(s->context(), s->delegate(), s->source(), |
| - s->routing_info(), s->workers())); |
| + job.session.reset(new SyncSession(s->context(), s->delegate(), |
| + s->source(), s->routing_info(), s->workers())); |
| pending_nudge_.reset(); |
| } |
| } |
| - ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location); |
| + |
| + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
| + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), |
| + nudge_location); |
| } |
| // Helper to extract the routing info and workers corresponding to types in |
| @@ -348,58 +431,67 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { |
| return; |
| } |
| + VLOG(2) << this << " Scheduling a config"; |
| ModelSafeRoutingInfo routes; |
| std::vector<ModelSafeWorker*> workers; |
| GetModelSafeParamsForTypes(types, session_context_->registrar(), |
| &routes, &workers); |
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| - this, &SyncerThread::ScheduleConfigImpl, routes, workers)); |
| + this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
| + GetUpdatesCallerInfo::FIRST_UPDATE)); |
| } |
| void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
| - const std::vector<ModelSafeWorker*>& workers) { |
| + const std::vector<ModelSafeWorker*>& workers, |
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + VLOG(2) << this << " ScheduleConfigImpl..."; |
| // TODO(tim): config-specific GetUpdatesCallerInfo value? |
| SyncSession* session = new SyncSession(session_context_.get(), this, |
| - SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE, |
| + SyncSourceInfo(source, |
| syncable::ModelTypePayloadMapFromRoutingInfo( |
| routing_info, std::string())), |
| routing_info, workers); |
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session, |
| - FROM_HERE); |
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
| + SyncSessionJob::CONFIGURATION, session, FROM_HERE); |
| } |
| void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
| - SyncSessionJobPurpose purpose, sessions::SyncSession* session, |
| + SyncSessionJob::SyncSessionJobPurpose purpose, |
| + sessions::SyncSession* session, |
| const tracked_objects::Location& nudge_location) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - SyncSessionJob job = {purpose, TimeTicks::Now() + delay, |
| - make_linked_ptr(session), nudge_location}; |
| - if (purpose == NUDGE) { |
| + SyncSessionJob job(purpose, TimeTicks::Now() + delay, |
| + make_linked_ptr(session), false, nudge_location); |
| + if (purpose == SyncSessionJob::NUDGE) { |
| + VLOG(2) << this << " Resetting pending_nudge in ScheduleSyncSessionJob"; |
| DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); |
| pending_nudge_.reset(new SyncSessionJob(job)); |
| } |
| + VLOG(2) << this << " Posting job to execute in DoSyncSessionJob. Job purpose " |
| + << job.purpose; |
| MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
| &SyncerThread::DoSyncSessionJob, job), |
| delay.InMilliseconds()); |
| } |
| -void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, |
| +void SyncerThread::SetSyncerStepsForPurpose( |
| + SyncSessionJob::SyncSessionJobPurpose purpose, |
| SyncerStep* start, SyncerStep* end) { |
| *end = SYNCER_END; |
| switch (purpose) { |
| - case CONFIGURATION: |
| + case SyncSessionJob::CONFIGURATION: |
| *start = DOWNLOAD_UPDATES; |
| *end = APPLY_UPDATES; |
| return; |
| - case CLEAR_USER_DATA: |
| + case SyncSessionJob::CLEAR_USER_DATA: |
| *start = CLEAR_PRIVATE_DATA; |
| return; |
| - case NUDGE: |
| - case POLL: |
| + case SyncSessionJob::NUDGE: |
| + case SyncSessionJob::POLL: |
| *start = SYNCER_BEGIN; |
| return; |
| default: |
| @@ -409,38 +501,36 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, |
| void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - if (!ShouldRunJob(job.purpose, job.scheduled_start)) { |
| - LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " |
| - << job.session->source().updates_source; |
| + if (!ShouldRunJob(job)) |
| return; |
| - } |
| - if (job.purpose == NUDGE) { |
| + if (job.purpose == SyncSessionJob::NUDGE) { |
| DCHECK(pending_nudge_.get()); |
| if (pending_nudge_->session != job.session) |
| return; // Another nudge must have been scheduled in in the meantime. |
| pending_nudge_.reset(); |
| } |
| + VLOG(2) << this << " DoSyncSessionJob. job purpose " << job.purpose; |
| SyncerStep begin(SYNCER_BEGIN); |
| SyncerStep end(SYNCER_END); |
| SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
| bool has_more_to_sync = true; |
| - while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { |
| - VLOG(1) << "SyncerThread: Calling SyncShare."; |
| + while (ShouldRunJob(job) && has_more_to_sync) { |
| + VLOG(2) << this << " SyncerThread: Calling SyncShare."; |
| // Synchronously perform the sync session from this thread. |
| syncer_->SyncShare(job.session.get(), begin, end); |
| has_more_to_sync = job.session->HasMoreToSync(); |
| if (has_more_to_sync) |
| job.session->ResetTransientState(); |
| } |
| - VLOG(1) << "SyncerThread: Done SyncShare looping."; |
| + VLOG(2) << this << " SyncerThread: Done SyncShare looping."; |
| FinishSyncSessionJob(job); |
| } |
| void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { |
| - if (old_job.purpose == CONFIGURATION) { |
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| // Whatever types were part of a configuration task will have had updates |
| // downloaded. For that reason, we make sure they get recorded in the |
| // event that they get disabled at a later time. |
| @@ -473,10 +563,14 @@ void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { |
| } |
| last_sync_session_end_time_ = now; |
| UpdateCarryoverSessionState(job); |
| - if (IsSyncingCurrentlySilenced()) |
| + if (IsSyncingCurrentlySilenced()) { |
| + VLOG(2) << this << " We are currently throttled. So not scheduling the next" |
| + << " sync"; |
| + SaveJob(job); |
| return; // Nothing to do. |
| + } |
| - VLOG(1) << "Updating the next polling time after SyncMain"; |
| + VLOG(2) << this << " Updating the next polling time after SyncMain"; |
| ScheduleNextSync(job); |
| } |
| @@ -492,35 +586,47 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
| const bool work_to_do = |
| old_job.session->status_controller()->num_server_changes_remaining() > 0 |
| || old_job.session->status_controller()->unsynced_handles().size() > 0; |
| - VLOG(1) << "syncer has work to do: " << work_to_do; |
| + VLOG(2) << this << " syncer has work to do: " << work_to_do; |
| AdjustPolling(&old_job); |
| // TODO(tim): Old impl had special code if notifications disabled. Needed? |
| if (!work_to_do) { |
| // Success implies backoff relief. Note that if this was a "one-off" job |
| - // (i.e. purpose == CLEAR_USER_DATA), if there was work_to_do before it |
| - // ran this wont have changed, as jobs like this don't run a full sync |
| - // cycle. So we don't need special code here. |
| + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was |
| + // work_to_do before it ran this wont have changed, as jobs like this don't |
| + // run a full sync cycle. So we don't need special code here. |
| wait_interval_.reset(); |
| + VLOG(2) << this << " Job suceeded so not scheduling more jobs"; |
| return; |
| } |
| if (old_job.session->source().updates_source == |
| GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
| + VLOG(2) << this << " Job failed with source continuation"; |
| // We don't seem to have made forward progress. Start or extend backoff. |
| HandleConsecutiveContinuationError(old_job); |
| } else if (IsBackingOff()) { |
| + VLOG(2) << this << " A nudge during backoff failed"; |
| // We weren't continuing but we're in backoff; must have been a nudge. |
| - DCHECK_EQ(NUDGE, old_job.purpose); |
| + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
| DCHECK(!wait_interval_->had_nudge); |
| wait_interval_->had_nudge = true; |
| wait_interval_->timer.Reset(); |
| } else { |
| + VLOG(2) << this << " Failed. Schedule a job with continuation as source"; |
| // We weren't continuing and we aren't in backoff. Schedule a normal |
| // continuation. |
| - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, |
| - old_job.session->source().types, FROM_HERE); |
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| + ScheduleConfigImpl(old_job.session->routing_info(), |
| + old_job.session->workers(), |
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
| + } else { |
| + // For all other purposes(nudge and poll) we schedule a retry nudge. |
| + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), |
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), |
| + old_job.session->source().types, false, FROM_HERE); |
| + } |
| } |
| } |
| @@ -534,7 +640,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { |
| bool rate_changed = !poll_timer_.IsRunning() || |
| poll != poll_timer_.GetCurrentDelay(); |
| - if (old_job && old_job->purpose != POLL && !rate_changed) |
| + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) |
| poll_timer_.Reset(); |
| if (!rate_changed) |
| @@ -548,17 +654,37 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { |
| void SyncerThread::HandleConsecutiveContinuationError( |
| const SyncSessionJob& old_job) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); |
| + // This if conditions should be compiled out in retail builds. |
| + if (IsBackingOff()) { |
| + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
| + } |
| SyncSession* old = old_job.session.get(); |
| SyncSession* s(new SyncSession(session_context_.get(), this, |
| old->source(), old->routing_info(), old->workers())); |
| TimeDelta length = delay_provider_->GetDelay( |
| IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
| + |
| + VLOG(2) << this << " In handle continuation error. Old job purpose is " |
| + << old_job.purpose; |
| + VLOG(2) << this << " In Handle continuation error. The time delta(ms)" |
| + " calculated is " |
| + << length.InMilliseconds(); |
| + |
| + // This will reset the had_nudge variable as well. |
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| length)); |
| - SyncSessionJob job = {NUDGE, TimeTicks::Now() + length, |
| - make_linked_ptr(s), FROM_HERE}; |
| - pending_nudge_.reset(new SyncSessionJob(job)); |
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
| + make_linked_ptr(s), false, FROM_HERE); |
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
| + } else { |
| + // We are not in configuration mode. So wait_interval's pending job |
| + // should be null. |
| + DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
| + |
| + // TODO(lipalani) - handle clear user data. |
| + InitOrCoalescePendingJob(old_job); |
| + } |
| wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); |
| } |
| @@ -587,33 +713,76 @@ TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { |
| } |
| void SyncerThread::Stop() { |
| + VLOG(2) << this << " stop called"; |
| syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| session_context_->connection_manager()->RemoveListener(this); |
| thread_.Stop(); |
| } |
| void SyncerThread::DoCanaryJob() { |
| - DCHECK(pending_nudge_.get()); |
| - wait_interval_->had_nudge = false; |
| - SyncSessionJob copy = *pending_nudge_; |
| - DoSyncSessionJob(copy); |
| + VLOG(2) << this << " Do canary job"; |
| + DoPendingJobIfPossible(true); |
| +} |
| + |
| +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { |
| + SyncSessionJob* job_to_execute = NULL; |
| + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
| + && wait_interval_->pending_configure_job.get()) { |
| + VLOG(2) << this << " Found pending configure job"; |
| + job_to_execute = wait_interval_->pending_configure_job.get(); |
| + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
| + VLOG(2) << this << " Found pending nudge job"; |
| + // Pending jobs mostly have time from the past. Reset it so this job |
| + // will get executed. |
| + if (pending_nudge_->scheduled_start < TimeTicks::Now()) |
| + pending_nudge_->scheduled_start = TimeTicks::Now(); |
| + |
| + scoped_ptr<SyncSession> session(CreateSyncSession( |
| + pending_nudge_->session->source())); |
| + |
| + // Also the routing info might have been changed since we cached the |
| + // pending nudge. Update it by coalescing to the latest. |
| + pending_nudge_->session->Coalesce(*(session.get())); |
| + // The pending nudge would be cleared in the DoSyncSessionJob function. |
| + job_to_execute = pending_nudge_.get(); |
| + } |
| + |
| + if (job_to_execute != NULL) { |
| + VLOG(2) << this << " Executing pending job"; |
| + SyncSessionJob copy = *job_to_execute; |
| + copy.is_canary_job = is_canary_job; |
| + DoSyncSessionJob(copy); |
| + } |
| +} |
| + |
| +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { |
| + ModelSafeRoutingInfo routes; |
| + std::vector<ModelSafeWorker*> workers; |
| + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
| + session_context_->registrar()->GetWorkers(&workers); |
| + SyncSourceInfo info(source); |
| + |
| + SyncSession* session(new SyncSession(session_context_.get(), this, info, |
| + routes, workers)); |
| + |
| + return session; |
| } |
| void SyncerThread::PollTimerCallback() { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| ModelSafeRoutingInfo r; |
| - std::vector<ModelSafeWorker*> w; |
| - session_context_->registrar()->GetModelSafeRoutingInfo(&r); |
| - session_context_->registrar()->GetWorkers(&w); |
| ModelTypePayloadMap types_with_payloads = |
| syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
| SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
| - SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); |
| - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE); |
| + SyncSession* s = CreateSyncSession(info); |
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, |
| + FROM_HERE); |
| } |
| void SyncerThread::Unthrottle() { |
| DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| + VLOG(2) << this << " Unthrottled.."; |
| + DoCanaryJob(); |
| wait_interval_.reset(); |
| } |
| @@ -652,6 +821,7 @@ void SyncerThread::OnReceivedLongPollIntervalUpdate( |
| } |
| void SyncerThread::OnShouldStopSyncingPermanently() { |
| + VLOG(2) << this << " OnShouldStopSyncingPermanently"; |
| syncer_->RequestEarlyExit(); // Thread-safe. |
| Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
| } |