Chromium Code Reviews| Index: chrome/browser/sync/engine/syncer_thread2.cc |
| diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc |
| index e94215e1b5f809ac682d54865f0e2085d812261a..d10bc331bb697cbe609b0a25cbc9e5fdf03a2d59 100644 |
| --- a/chrome/browser/sync/engine/syncer_thread2.cc |
| +++ b/chrome/browser/sync/engine/syncer_thread2.cc |
| @@ -23,6 +23,17 @@ using sync_pb::GetUpdatesCallerInfo; |
| namespace s3 { |
| +struct SyncerThread::SyncSessionJob { |
| + SyncSessionJobPurpose purpose; |
| + base::TimeTicks scheduled_start; |
| + linked_ptr<sessions::SyncSession> session; |
| + |
| + // This is the location the nudge came from. used for debugging purpose. |
| + // In case of multiple nudges getting coalesced this stores the first nudge |
| + // that came in. |
| + tracked_objects::Location nudge_location; |
| +}; |
| + |
| struct SyncerThread::WaitInterval { |
| enum Mode { |
| // A wait interval whose duration has been affected by exponential |
| @@ -40,20 +51,10 @@ struct SyncerThread::WaitInterval { |
| bool had_nudge; |
| base::TimeDelta length; |
| base::OneShotTimer<SyncerThread> timer; |
| + scoped_ptr<SyncSessionJob> pending_job; |
| WaitInterval(Mode mode, base::TimeDelta length); |
| }; |
| -struct SyncerThread::SyncSessionJob { |
| - SyncSessionJobPurpose purpose; |
| - base::TimeTicks scheduled_start; |
| - linked_ptr<sessions::SyncSession> session; |
| - |
| - // This is the location the nudge came from. used for debugging purpose. |
| - // In case of multiple nudges getting coalesced this stores the first nudge |
| - // that came in. |
| - tracked_objects::Location nudge_location; |
| -}; |
| - |
| SyncerThread::DelayProvider::DelayProvider() {} |
| SyncerThread::DelayProvider::~DelayProvider() {} |
| @@ -76,7 +77,8 @@ SyncerThread::SyncerThread(sessions::SyncSessionContext* context, |
| server_connection_ok_(false), |
| delay_provider_(new DelayProvider()), |
| syncer_(syncer), |
| - session_context_(context) { |
| + session_context_(context), |
| + saved_nudge_(false) { |
| } |
| SyncerThread::~SyncerThread() { |
| @@ -140,58 +142,127 @@ void SyncerThread::StartImpl(Mode mode, |
| AdjustPolling(NULL); // Will kick start poll timer if needed. |
| if (callback.get()) |
| callback->Run(); |
| + |
| + if (mode_ == NORMAL_MODE && saved_nudge_ == true) { |
| + syncable::ModelTypePayloadMap map; |
| + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_NOTIFICATION, |
| + map, FROM_HERE); |
| + } |
| } |
| -bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, |
| - const TimeTicks& scheduled_start) { |
| - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| +SyncerThread::JobProcessDecision SyncerThread::ShouldRunNudgeJob( |
| + const base::TimeTicks& scheduled_start) { |
| + if (wait_interval_.get()) { |
| + // Means we are either in throttled or exponential back off. |
| + // However it is also possible that we are in exponential back off |
| + // and we are retrying.(In that case the timer ran out but wait_interval |
| + // is not yet cleared.) wait_interval_.timer_running is false; |
| + if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| + return SAVE; |
| + } |
| - // Check wait interval. |
| + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| + if (mode_ == CONFIGURATION) { |
| + return SAVE; |
| + } |
| + // The mode is normal. We already had a nudge. No point retrying. |
| + if (wait_interval_->had_nudge) { |
| + return DROP; |
| + } |
| + // Either this is the first nudge or we are in exponential backoff |
| + // and we are trying because our timer ran out.(In either case had_nudge |
| + // is false) |
| + return CONTINUE; |
| + } |
| + |
| + // We are not in any kind of backoff. |
| + if (mode_ == CONFIGURATION) { |
| + return SAVE; |
| + } |
| + // Freshness condition |
| + if (scheduled_start < last_sync_session_end_time_) { |
| + return DROP; |
| + } |
| + |
| + return CONTINUE; |
| +} |
| + |
| +// Note: We should never be dropping a config request. |
| +SyncerThread::JobProcessDecision SyncerThread::ShouldRunConfigureJob() { |
|
tim (not reviewing)
2011/04/07 06:13:07
wouldn't it result in a substantial amount less co
lipalani1
2011/04/07 18:35:45
The original function did its job well in terms of
|
| if (wait_interval_.get()) { |
| - // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit |
| - // when throttled). |
| - if (wait_interval_->mode == WaitInterval::THROTTLED) |
| - return false; |
| + if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| + return SAVE; |
| + } |
| DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| - if ((purpose != NUDGE) || wait_interval_->had_nudge) |
| - return false; |
| - } |
| - |
| - // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that |
| - // were intended for a normal sync if we are in configuration mode, and vice |
| - // versa. |
| - switch (mode_) { |
| - case CONFIGURATION_MODE: |
| - if (purpose != CONFIGURATION) |
| - return false; |
| - break; |
| - case NORMAL_MODE: |
| - if (purpose == CONFIGURATION) |
| - return false; |
| - break; |
| - default: |
| - NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; |
| - return false; |
| + DCHECK(mode_ == CONFIGURATION_MODE); |
| + if (wait_interval_->timer.IsRunning() == true) { |
| + return SAVE; |
| + } |
| + |
| + return CONTINUE; |
| } |
| - // Continuation NUDGE tasks have priority over POLLs because they are the |
| - // only tasks that trigger exponential backoff, so this prevents them from |
| - // being starved from running (e.g. due to a very, very low poll interval, |
| - // such as 0ms). It's rare that this would ever matter in practice. |
| - if (purpose == POLL && (pending_nudge_.get() && |
| - pending_nudge_->session->source().updates_source == |
| - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { |
| - return false; |
| + // We are not in any kind of back off. |
| + DCHECK(mode_ == CONFIGURATION_MODE); |
| + return CONTINUE; |
| +} |
| + |
| +SyncerThread::JobProcessDecision SyncerThread::ShouldRunJobDefaultImpl() { |
| + if (wait_interval_.get()) { |
| + return DROP; |
| + } |
| + |
| + if (mode_ == CONFIGURATION) { |
| + return DROP; |
| + } |
| + |
| + return CONTINUE; |
| +} |
| + |
| +SyncerThread::JobProcessDecision SyncerThread::ShouldRunJob( |
| + SyncSessionJobPurpose purpose, |
| + const TimeTicks& scheduled_start) { |
| + if (purpose == NUDGE) { |
| + return ShouldRunNudgeJob(scheduled_start); |
| + } else if (purpose == CONFIGURATION) { |
| + return ShouldRunConfigureJob(); |
| + } else { |
| + return ShouldRunJobDefaultImpl(); |
| } |
| +} |
| - // Freshness condition. |
| - if (purpose == NUDGE && |
| - (scheduled_start < last_sync_session_end_time_)) { |
| +bool SyncerThread::ProcessJob(const SyncSessionJob& job) { |
|
tim (not reviewing)
2011/04/07 06:13:07
It's pretty confusing that we now have both Proces
lipalani1
2011/04/07 18:35:45
Will do.
Regarding job creation it was also done
|
| + JobProcessDecision decision = ShouldRunJob(job.purpose, job.scheduled_start); |
| + if (decision == DROP) { |
| return false; |
| } |
| - return server_connection_ok_; |
| + if (decision == CONTINUE) { |
| + return true; |
| + } |
| + |
| + DCHECK(job.purpose == NUDGE || job.purpose == CONFIGURATION); |
| + if (job.purpose == NUDGE) { |
| + saved_nudge_ = true; |
| + } else { |
| + DCHECK(wait_interval_.get()); |
| + DCHECK(mode_ == CONFIGURATION_MODE); |
| + |
| + // Save off the nudge if we had one already stored. |
| + if (wait_interval_->pending_job.get()) { |
| + if (wait_interval_->pending_job->purpose == NUDGE) { |
| + saved_nudge_ = true; |
| + } |
| + } |
| + SyncSession* old = job.session.get(); |
| + SyncSession* s(new SyncSession(session_context_.get(), this, |
| + old->source(), old->routing_info(), old->workers())); |
| + SyncSessionJob new_job = {job.purpose, TimeTicks::Now(), |
| + make_linked_ptr(s), job.nudge_location}; |
| + wait_interval_->pending_job.reset(new SyncSessionJob(new_job)); |
| + } |
| + return false; |
| } |
| GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
| @@ -270,12 +341,6 @@ void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
| NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
| const tracked_objects::Location& nudge_location) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - TimeTicks rough_start = TimeTicks::Now() + delay; |
| - if (!ShouldRunJob(NUDGE, rough_start)) { |
| - LOG(WARNING) << "Dropping nudge at scheduling time, source = " |
| - << source; |
| - return; |
| - } |
| // Note we currently nudge for all types regardless of the ones incurring |
| // the nudge. Doing different would throw off some syncer commands like |
| @@ -287,26 +352,36 @@ void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
| SyncSourceInfo info(GetUpdatesFromNudgeSource(source), |
| types_with_payloads); |
| - scoped_ptr<SyncSession> session(new SyncSession( |
| + SyncSession* session(new SyncSession( |
| session_context_.get(), this, info, routes, workers)); |
| + SyncSessionJob job = {NUDGE, TimeTicks::Now() + delay, |
| + make_linked_ptr(session), nudge_location}; |
| + |
| + session = NULL; |
| + if (!ProcessJob(job)) { |
|
tim (not reviewing)
2011/04/07 06:13:07
We only had to move this down because we must pass
lipalani1
2011/04/07 18:35:45
The other option as I mentioned was to pass only t
|
| + return; |
| + } |
| + |
| if (pending_nudge_.get()) { |
| if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) |
| return; |
| - pending_nudge_->session->Coalesce(*session.get()); |
| + pending_nudge_->session->Coalesce(*(job.session.get())); |
| if (!IsBackingOff()) { |
| return; |
| } else { |
| // Re-schedule the current pending nudge. |
| SyncSession* s = pending_nudge_->session.get(); |
| - session.reset(new SyncSession(s->context(), s->delegate(), s->source(), |
| - s->routing_info(), s->workers())); |
| + job.session.reset(new SyncSession(s->context(), s->delegate(), |
| + s->source(), s->routing_info(), s->workers())); |
| pending_nudge_.reset(); |
| } |
| } |
| - ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location); |
| + |
| + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
|
tim (not reviewing)
2011/04/07 06:13:07
if we do that then there is no point to having Sch
lipalani1
2011/04/07 18:35:45
Same as above.
On 2011/04/07 06:13:07, timsteele w
|
| + ScheduleSyncSessionJob(delay, NUDGE, job.session.release(), nudge_location); |
| } |
| // Helper to extract the routing info and workers corresponding to types in |
| @@ -354,21 +429,23 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { |
| &routes, &workers); |
| thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| - this, &SyncerThread::ScheduleConfigImpl, routes, workers)); |
| + this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
| + GetUpdatesCallerInfo::FIRST_UPDATE)); |
| } |
| void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
| - const std::vector<ModelSafeWorker*>& workers) { |
| + const std::vector<ModelSafeWorker*>& workers, |
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| // TODO(tim): config-specific GetUpdatesCallerInfo value? |
| SyncSession* session = new SyncSession(session_context_.get(), this, |
| - SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE, |
| + SyncSourceInfo(source, |
| syncable::ModelTypePayloadMapFromRoutingInfo( |
| routing_info, std::string())), |
| routing_info, workers); |
| ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session, |
| - FROM_HERE); |
| + FROM_HERE); |
| } |
| void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
| @@ -409,7 +486,7 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, |
| void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
| DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| - if (!ShouldRunJob(job.purpose, job.scheduled_start)) { |
| + if (!ProcessJob(job)) { |
| LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " |
| << job.session->source().updates_source; |
| return; |
| @@ -420,6 +497,7 @@ void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
| if (pending_nudge_->session != job.session) |
| return; // Another nudge must have been scheduled in in the meantime. |
| pending_nudge_.reset(); |
| + saved_nudge_ = false; |
| } |
| SyncerStep begin(SYNCER_BEGIN); |
| @@ -427,7 +505,7 @@ void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
| SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
| bool has_more_to_sync = true; |
| - while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { |
| + while (ProcessJob(job) && has_more_to_sync) { |
| VLOG(1) << "SyncerThread: Calling SyncShare."; |
| // Synchronously perform the sync session from this thread. |
| syncer_->SyncShare(job.session.get(), begin, end); |
| @@ -519,8 +597,14 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
| } else { |
| // We weren't continuing and we aren't in backoff. Schedule a normal |
| // continuation. |
| - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, |
| - old_job.session->source().types, FROM_HERE); |
| + if (old_job.purpose == NUDGE) { |
| + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, |
| + old_job.session->source().types, FROM_HERE); |
| + } else if (old_job.purpose == CONFIGURATION) { |
| + ScheduleConfigImpl(old_job.session->routing_info(), |
| + old_job.session->workers(), |
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
| + } // Drop the rest. |
| } |
| } |
| @@ -556,9 +640,9 @@ void SyncerThread::HandleConsecutiveContinuationError( |
| IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| length)); |
| - SyncSessionJob job = {NUDGE, TimeTicks::Now() + length, |
| + SyncSessionJob job = {old_job.purpose, TimeTicks::Now() + length, |
| make_linked_ptr(s), FROM_HERE}; |
| - pending_nudge_.reset(new SyncSessionJob(job)); |
| + wait_interval_->pending_job.reset(new SyncSessionJob(job)); |
| wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); |
| } |
| @@ -593,9 +677,13 @@ void SyncerThread::Stop() { |
| } |
| void SyncerThread::DoCanaryJob() { |
| - DCHECK(pending_nudge_.get()); |
| wait_interval_->had_nudge = false; |
| - SyncSessionJob copy = *pending_nudge_; |
| + SyncSessionJob copy = *(wait_interval_->pending_job); |
| + if (copy.purpose == CONFIGURATION) { |
| + DCHECK(mode_ == CONFIGURATION_MODE); |
| + } else { |
| + pending_nudge_.reset(new SyncSessionJob(copy)); |
| + } |
| DoSyncSessionJob(copy); |
| } |
| @@ -614,6 +702,7 @@ void SyncerThread::PollTimerCallback() { |
| void SyncerThread::Unthrottle() { |
| DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| + DoCanaryJob(); |
| wait_interval_.reset(); |
| } |