Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2509)

Unified Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 6812004: sync: Make nudge + config jobs reliable in SyncerThread2 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fixing CR feedback. Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/browser/sync/engine/syncer_thread2.cc
diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc
index e94215e1b5f809ac682d54865f0e2085d812261a..b0bae49bf2911552fe851281119180f2c0442b63 100644
--- a/chrome/browser/sync/engine/syncer_thread2.cc
+++ b/chrome/browser/sync/engine/syncer_thread2.cc
@@ -23,45 +23,47 @@ using sync_pb::GetUpdatesCallerInfo;
namespace s3 {
-struct SyncerThread::WaitInterval {
- enum Mode {
- // A wait interval whose duration has been affected by exponential
- // backoff.
- // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
- EXPONENTIAL_BACKOFF,
- // A server-initiated throttled interval. We do not allow any syncing
- // during such an interval.
- THROTTLED,
- };
- Mode mode;
-
- // This bool is set to true if we have observed a nudge during this
- // interval and mode == EXPONENTIAL_BACKOFF.
- bool had_nudge;
- base::TimeDelta length;
- base::OneShotTimer<SyncerThread> timer;
- WaitInterval(Mode mode, base::TimeDelta length);
-};
+SyncerThread::DelayProvider::DelayProvider() {}
+SyncerThread::DelayProvider::~DelayProvider() {}
-struct SyncerThread::SyncSessionJob {
- SyncSessionJobPurpose purpose;
- base::TimeTicks scheduled_start;
- linked_ptr<sessions::SyncSession> session;
+SyncerThread::WaitInterval::WaitInterval() {}
+SyncerThread::WaitInterval::~WaitInterval() {}
- // This is the location the nudge came from. used for debugging purpose.
- // In case of multiple nudges getting coalesced this stores the first nudge
- // that came in.
- tracked_objects::Location nudge_location;
-};
+SyncerThread::SyncSessionJob::SyncSessionJob() {}
+SyncerThread::SyncSessionJob::~SyncSessionJob() {}
-SyncerThread::DelayProvider::DelayProvider() {}
-SyncerThread::DelayProvider::~DelayProvider() {}
+SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
+ base::TimeTicks start,
+ linked_ptr<sessions::SyncSession> session, bool is_canary_job,
+ const tracked_objects::Location& nudge_location) : purpose(purpose),
+ scheduled_start(start),
+ session(session),
+ is_canary_job(is_canary_job),
+ nudge_location(nudge_location) {
+}
TimeDelta SyncerThread::DelayProvider::GetDelay(
const base::TimeDelta& last_delay) {
return SyncerThread::GetRecommendedDelay(last_delay);
}
+GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
+ NudgeSource source) {
+ switch (source) {
+ case NUDGE_SOURCE_NOTIFICATION:
+ return GetUpdatesCallerInfo::NOTIFICATION;
+ case NUDGE_SOURCE_LOCAL:
+ return GetUpdatesCallerInfo::LOCAL;
+ case NUDGE_SOURCE_CONTINUATION:
+ return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
+ case NUDGE_SOURCE_UNKNOWN:
+ return GetUpdatesCallerInfo::UNKNOWN;
+ default:
+ NOTREACHED();
+ return GetUpdatesCallerInfo::UNKNOWN;
+ }
+}
+
SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
: mode(mode), had_nudge(false), length(length) { }
@@ -96,6 +98,7 @@ void SyncerThread::CheckServerConnectionManagerStatus(
server_connection_ok_ = false;
} else if (HttpResponse::SERVER_CONNECTION_OK == code) {
server_connection_ok_ = true;
+ DoPendingJobIfPossible();
}
}
@@ -140,74 +143,119 @@ void SyncerThread::StartImpl(Mode mode,
AdjustPolling(NULL); // Will kick start poll timer if needed.
if (callback.get())
callback->Run();
+
+ // We just changed our mode. See if there are any pending jobs that we could
+ // execute in the new mode.
+ DoPendingJobIfPossible();
}
-bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose,
- const TimeTicks& scheduled_start) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
+SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
+ const SyncSessionJob& job) {
- // Check wait interval.
- if (wait_interval_.get()) {
- // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit
- // when throttled).
- if (wait_interval_->mode == WaitInterval::THROTTLED)
- return false;
+ DCHECK(wait_interval_.get());
+ DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
- DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
- if ((purpose != NUDGE) || wait_interval_->had_nudge)
- return false;
- }
+ if (job.purpose == SyncSessionJob::POLL)
+ return DROP;
- // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that
- // were intended for a normal sync if we are in configuration mode, and vice
- // versa.
- switch (mode_) {
- case CONFIGURATION_MODE:
- if (purpose != CONFIGURATION)
- return false;
- break;
- case NORMAL_MODE:
- if (purpose == CONFIGURATION)
- return false;
- break;
- default:
- NOTREACHED() << "Unknown SyncerThread Mode: " << mode_;
- return false;
+ DCHECK(job.purpose == SyncSessionJob::NUDGE ||
+ job.purpose == SyncSessionJob::CONFIGURATION);
+ if (wait_interval_->mode == WaitInterval::THROTTLED)
+ return SAVE;
+
+ DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
+ if (job.purpose == SyncSessionJob::NUDGE) {
+ if (mode_ == CONFIGURATION_MODE)
+ return SAVE;
+
+ // If we already had one nudge then just drop this nudge. We will retry
+ // later when the timer runs out.
+ return wait_interval_->had_nudge ? DROP : CONTINUE;
}
+ // This is a config job.
+ return job.is_canary_job ? CONTINUE : SAVE;
+
+}
+
+SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
+ const SyncSessionJob& job) {
+ if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
+ return CONTINUE;
+
+ if (wait_interval_.get())
+ return DecideWhileInWaitInterval(job);
- // Continuation NUDGE tasks have priority over POLLs because they are the
- // only tasks that trigger exponential backoff, so this prevents them from
- // being starved from running (e.g. due to a very, very low poll interval,
- // such as 0ms). It's rare that this would ever matter in practice.
- if (purpose == POLL && (pending_nudge_.get() &&
- pending_nudge_->session->source().updates_source ==
- GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) {
- return false;
+ if (mode_ == CONFIGURATION_MODE) {
+ if (job.purpose == SyncSessionJob::NUDGE)
+ return SAVE;
+ else if (job.purpose == SyncSessionJob::CONFIGURATION)
+ return CONTINUE;
+ else
+ return DROP;
}
- // Freshness condition.
- if (purpose == NUDGE &&
- (scheduled_start < last_sync_session_end_time_)) {
- return false;
+ // We are in normal mode.
+ DCHECK_EQ(mode_, NORMAL_MODE);
+ DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
+
+ // Freshness condition
+ if (job.scheduled_start < last_sync_session_end_time_)
+ return DROP;
+
+ if (server_connection_ok_)
+ return CONTINUE;
+
+ return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
+}
+
+void SyncerThread::UpdatePendingJob(const SyncSessionJob& job) {
+ DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
+ if (pending_nudge_.get() == NULL) {
+ SyncSession* s = job.session.get();
+ scoped_ptr<SyncSession> session(new SyncSession(s->context(),
+ s->delegate(), s->source(), s->routing_info(), s->workers()));
+
+ SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
+ make_linked_ptr(session.release()), false, job.nudge_location);
+ pending_nudge_.reset(new SyncSessionJob(new_job));
+
+ return;
}
- return server_connection_ok_;
+ pending_nudge_->session->Coalesce(*(job.session.get()));
+ pending_nudge_->scheduled_start = job.scheduled_start;
+
+ // Unfortunately the nudge location cannot be modified. So it stores the
+ // location of the first caller.
}
-GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
- NudgeSource source) {
- switch (source) {
- case NUDGE_SOURCE_NOTIFICATION:
- return GetUpdatesCallerInfo::NOTIFICATION;
- case NUDGE_SOURCE_LOCAL:
- return GetUpdatesCallerInfo::LOCAL;
- case NUDGE_SOURCE_CONTINUATION:
- return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
- case NUDGE_SOURCE_UNKNOWN:
- return GetUpdatesCallerInfo::UNKNOWN;
- default:
- NOTREACHED();
- return GetUpdatesCallerInfo::UNKNOWN;
+bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
+ JobProcessDecision decision = DecideOnJob(job);
+ if (decision != SAVE)
+ return decision == CONTINUE ? true : false;
tim (not reviewing) 2011/04/12 06:09:29 what I meant before is that this is equivalent to
lipalani1 2011/04/13 00:07:29 Done.
+
+ DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
+ SyncSessionJob::CONFIGURATION);
+
+ SaveJob(job);
+ return false;
+}
+
+void SyncerThread::SaveJob(const SyncSessionJob& job) {
+ DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
+ if (job.purpose == SyncSessionJob::NUDGE || job.purpose ==
+ SyncSessionJob::POLL) {
+ UpdatePendingJob(job);
tim (not reviewing) 2011/04/12 06:09:29 What I was saying before was this might be more re
lipalani1 2011/04/13 00:07:29 Hmm.. this is used in 2 places. SaveJob and Handle
+ } else {
+ DCHECK(wait_interval_.get());
+ DCHECK(mode_ == CONFIGURATION_MODE);
+
+ SyncSession* old = job.session.get();
+ SyncSession* s(new SyncSession(session_context_.get(), this,
+ old->source(), old->routing_info(), old->workers()));
+ SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
+ make_linked_ptr(s), false, job.nudge_location);
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
}
}
@@ -240,8 +288,9 @@ void SyncerThread::ScheduleNudge(const TimeDelta& delay,
ModelTypePayloadMap types_with_payloads =
syncable::ModelTypePayloadMapFromBitSet(types, std::string());
thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleNudgeImpl, delay, source,
- types_with_payloads, nudge_location));
+ this, &SyncerThread::ScheduleNudgeImpl, delay,
+ GetUpdatesFromNudgeSource(source), types_with_payloads, false,
+ nudge_location));
}
void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
@@ -253,8 +302,9 @@ void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
}
thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleNudgeImpl, delay, source,
- types_with_payloads, nudge_location));
+ this, &SyncerThread::ScheduleNudgeImpl, delay,
+ GetUpdatesFromNudgeSource(source), types_with_payloads, false,
+ nudge_location));
}
void SyncerThread::ScheduleClearUserDataImpl() {
@@ -262,51 +312,51 @@ void SyncerThread::ScheduleClearUserDataImpl() {
SyncSession* session = new SyncSession(session_context_.get(), this,
SyncSourceInfo(), ModelSafeRoutingInfo(),
std::vector<ModelSafeWorker*>());
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session,
- FROM_HERE);
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
+ SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
}
void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
- NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
- const tracked_objects::Location& nudge_location) {
+ GetUpdatesCallerInfo::GetUpdatesSource source,
+ const ModelTypePayloadMap& types_with_payloads,
+ bool is_canary_job, const tracked_objects::Location& nudge_location) {
DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- TimeTicks rough_start = TimeTicks::Now() + delay;
- if (!ShouldRunJob(NUDGE, rough_start)) {
- LOG(WARNING) << "Dropping nudge at scheduling time, source = "
- << source;
- return;
- }
// Note we currently nudge for all types regardless of the ones incurring
// the nudge. Doing different would throw off some syncer commands like
// CleanupDisabledTypes. We may want to change this in the future.
- ModelSafeRoutingInfo routes;
- std::vector<ModelSafeWorker*> workers;
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
- session_context_->registrar()->GetWorkers(&workers);
- SyncSourceInfo info(GetUpdatesFromNudgeSource(source),
- types_with_payloads);
+ SyncSourceInfo info(source, types_with_payloads);
- scoped_ptr<SyncSession> session(new SyncSession(
- session_context_.get(), this, info, routes, workers));
+ SyncSession* session(CreateSyncSession(info));
+
+ SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
+ make_linked_ptr(session), is_canary_job,
+ nudge_location);
+
+ session = NULL;
+ if (!ShouldRunJob(job))
+ return;
if (pending_nudge_.get()) {
if (IsBackingOff() && delay > TimeDelta::FromSeconds(1))
return;
- pending_nudge_->session->Coalesce(*session.get());
+ pending_nudge_->session->Coalesce(*(job.session.get()));
if (!IsBackingOff()) {
return;
} else {
// Re-schedule the current pending nudge.
SyncSession* s = pending_nudge_->session.get();
- session.reset(new SyncSession(s->context(), s->delegate(), s->source(),
- s->routing_info(), s->workers()));
+ job.session.reset(new SyncSession(s->context(), s->delegate(),
+ s->source(), s->routing_info(), s->workers()));
pending_nudge_.reset();
}
}
- ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location);
+
+ // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
+ ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
+ nudge_location);
}
// Helper to extract the routing info and workers corresponding to types in
@@ -354,31 +404,34 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
&routes, &workers);
thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleConfigImpl, routes, workers));
+ this, &SyncerThread::ScheduleConfigImpl, routes, workers,
+ GetUpdatesCallerInfo::FIRST_UPDATE));
}
void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
- const std::vector<ModelSafeWorker*>& workers) {
+ const std::vector<ModelSafeWorker*>& workers,
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
// TODO(tim): config-specific GetUpdatesCallerInfo value?
SyncSession* session = new SyncSession(session_context_.get(), this,
- SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE,
+ SyncSourceInfo(source,
syncable::ModelTypePayloadMapFromRoutingInfo(
routing_info, std::string())),
routing_info, workers);
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session,
- FROM_HERE);
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
+ SyncSessionJob::CONFIGURATION, session, FROM_HERE);
}
void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
- SyncSessionJobPurpose purpose, sessions::SyncSession* session,
+ SyncSessionJob::SyncSessionJobPurpose purpose,
+ sessions::SyncSession* session,
const tracked_objects::Location& nudge_location) {
DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- SyncSessionJob job = {purpose, TimeTicks::Now() + delay,
- make_linked_ptr(session), nudge_location};
- if (purpose == NUDGE) {
+ SyncSessionJob job(purpose, TimeTicks::Now() + delay,
+ make_linked_ptr(session), false, nudge_location);
+ if (purpose == SyncSessionJob::NUDGE) {
DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
pending_nudge_.reset(new SyncSessionJob(job));
}
@@ -387,19 +440,20 @@ void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
delay.InMilliseconds());
}
-void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose,
+void SyncerThread::SetSyncerStepsForPurpose(
+ SyncSessionJob::SyncSessionJobPurpose purpose,
SyncerStep* start, SyncerStep* end) {
*end = SYNCER_END;
switch (purpose) {
- case CONFIGURATION:
+ case SyncSessionJob::CONFIGURATION:
*start = DOWNLOAD_UPDATES;
*end = APPLY_UPDATES;
return;
- case CLEAR_USER_DATA:
+ case SyncSessionJob::CLEAR_USER_DATA:
*start = CLEAR_PRIVATE_DATA;
return;
- case NUDGE:
- case POLL:
+ case SyncSessionJob::NUDGE:
+ case SyncSessionJob::POLL:
*start = SYNCER_BEGIN;
return;
default:
@@ -409,13 +463,10 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose,
void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- if (!ShouldRunJob(job.purpose, job.scheduled_start)) {
- LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
- << job.session->source().updates_source;
+ if (!ShouldRunJob(job))
return;
- }
- if (job.purpose == NUDGE) {
+ if (job.purpose == SyncSessionJob::NUDGE) {
DCHECK(pending_nudge_.get());
if (pending_nudge_->session != job.session)
return; // Another nudge must have been scheduled in in the meantime.
@@ -427,7 +478,7 @@ void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
SetSyncerStepsForPurpose(job.purpose, &begin, &end);
bool has_more_to_sync = true;
- while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
+ while (ShouldRunJob(job) && has_more_to_sync) {
VLOG(1) << "SyncerThread: Calling SyncShare.";
// Synchronously perform the sync session from this thread.
syncer_->SyncShare(job.session.get(), begin, end);
@@ -440,7 +491,7 @@ void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
}
void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
- if (old_job.purpose == CONFIGURATION) {
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
// Whatever types were part of a configuration task will have had updates
// downloaded. For that reason, we make sure they get recorded in the
// event that they get disabled at a later time.
@@ -473,8 +524,11 @@ void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
}
last_sync_session_end_time_ = now;
UpdateCarryoverSessionState(job);
- if (IsSyncingCurrentlySilenced())
+ if (IsSyncingCurrentlySilenced()) {
+ // Let us save the job. So when we are unthrottled we can execute it.
+ SaveJob(job);
return; // Nothing to do.
+ }
VLOG(1) << "Updating the next polling time after SyncMain";
ScheduleNextSync(job);
@@ -499,9 +553,10 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
// TODO(tim): Old impl had special code if notifications disabled. Needed?
if (!work_to_do) {
// Success implies backoff relief. Note that if this was a "one-off" job
- // (i.e. purpose == CLEAR_USER_DATA), if there was work_to_do before it
- // ran this wont have changed, as jobs like this don't run a full sync
- // cycle. So we don't need special code here.
+ // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA),
+ // if there was work_to_do before it ran this wont have changed,
+ // as jobs like this don't run a full sync cycle. So we don't need
+ // special code here.
wait_interval_.reset();
return;
}
@@ -512,15 +567,23 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
HandleConsecutiveContinuationError(old_job);
} else if (IsBackingOff()) {
// We weren't continuing but we're in backoff; must have been a nudge.
- DCHECK_EQ(NUDGE, old_job.purpose);
+ DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
DCHECK(!wait_interval_->had_nudge);
wait_interval_->had_nudge = true;
wait_interval_->timer.Reset();
} else {
// We weren't continuing and we aren't in backoff. Schedule a normal
// continuation.
- ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION,
- old_job.session->source().types, FROM_HERE);
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
+ ScheduleConfigImpl(old_job.session->routing_info(),
+ old_job.session->workers(),
+ GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
+ } else {
+ // For all other purposes(nudge and poll) we schedule a retry nudge.
+ ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
+ GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
+ old_job.session->source().types, false, FROM_HERE);
+ }
}
}
@@ -534,7 +597,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
bool rate_changed = !poll_timer_.IsRunning() ||
poll != poll_timer_.GetCurrentDelay();
- if (old_job && old_job->purpose != POLL && !rate_changed)
+ if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
poll_timer_.Reset();
if (!rate_changed)
@@ -548,7 +611,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
void SyncerThread::HandleConsecutiveContinuationError(
const SyncSessionJob& old_job) {
DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning());
+ DCHECK(!IsBackingOff() || wait_interval_.get() != NULL);
tim (not reviewing) 2011/04/12 06:09:29 this appears to be the opposite condition from bef
lipalani1 2011/04/13 00:07:29 What we want to check is if wait interval pointer
SyncSession* old = old_job.session.get();
SyncSession* s(new SyncSession(session_context_.get(), this,
old->source(), old->routing_info(), old->workers()));
@@ -556,9 +619,21 @@ void SyncerThread::HandleConsecutiveContinuationError(
IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
length));
- SyncSessionJob job = {NUDGE, TimeTicks::Now() + length,
- make_linked_ptr(s), FROM_HERE};
- pending_nudge_.reset(new SyncSessionJob(job));
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
+ SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
+ make_linked_ptr(s), false, FROM_HERE);
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
+ } else {
+ // We are not in configuration mode. So wait_interval's pending job
+ // should be null.
+ DCHECK(wait_interval_->pending_configure_job.get() == NULL);
+
+ // No matter what type of job it is.(nudge or poll, it cannot be config)
+ // We are going to treat it as nudge when doing exponential back off
+ // retries.
+ // TODO(lipalani) - handle clear user data.
+ UpdatePendingJob(old_job);
+ }
wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
}
@@ -593,10 +668,59 @@ void SyncerThread::Stop() {
}
void SyncerThread::DoCanaryJob() {
- DCHECK(pending_nudge_.get());
+ // We should not be here unless wait interval was initialized due to
+ // throttling or backing off.
+ DCHECK(wait_interval_.get());
+
wait_interval_->had_nudge = false;
- SyncSessionJob copy = *pending_nudge_;
- DoSyncSessionJob(copy);
+
+ // We should have one of 2 things. Otherwise we shouldnt be running the timer.
+ DCHECK(wait_interval_->pending_configure_job.get() || pending_nudge_.get());
+ DoPendingJobIfPossible();
+}
+
+void SyncerThread::DoPendingJobIfPossible() {
+ SyncSessionJob* job_to_execute = NULL;
+ if (mode_ == CONFIGURATION_MODE) {
+ if (wait_interval_.get() && wait_interval_->pending_configure_job.get()) {
tim (not reviewing) 2011/04/12 06:09:29 combine this with the above if statement... less n
lipalani1 2011/04/13 00:07:29 Done.
+ job_to_execute = wait_interval_->pending_configure_job.get();
+ }
+ } else {
+ if (pending_nudge_.get()) {
tim (not reviewing) 2011/04/12 06:09:29 move this into 'else if' above.
lipalani1 2011/04/13 00:07:29 Done.
+ // Pending jobs mostly have time from the past. Reset it so this job
+ // will get executed.
+ if (pending_nudge_->scheduled_start < TimeTicks::Now())
+ pending_nudge_->scheduled_start = TimeTicks::Now();
+
+ scoped_ptr<SyncSession> session(CreateSyncSession(
+ pending_nudge_->session->source()));
+
+ // Also the routing info might have been changed since we cached the
+ // pending nudge. Update it by coalescing to the latest.
+ pending_nudge_->session->Coalesce(*(session.get()));
+ // The pending nudge would be cleared in the DoSyncSessionJob function.
+ job_to_execute = pending_nudge_.get();
+ }
+ }
+
+ if (job_to_execute != NULL) {
+ SyncSessionJob copy = *job_to_execute;
+ copy.is_canary_job = true;
+ DoSyncSessionJob(copy);
+ }
+}
+
+SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
tim (not reviewing) 2011/04/12 06:09:29 I think this could be used in 4 (?) more places if
lipalani1 2011/04/13 00:07:29 I have added one more place(polltimer) where this
+ ModelSafeRoutingInfo routes;
+ std::vector<ModelSafeWorker*> workers;
+ session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
+ session_context_->registrar()->GetWorkers(&workers);
+ SyncSourceInfo info(source);
+
+ SyncSession* session(new SyncSession(session_context_.get(), this, info,
+ routes, workers));
+
+ return session;
}
void SyncerThread::PollTimerCallback() {
@@ -609,11 +733,13 @@ void SyncerThread::PollTimerCallback() {
syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w);
tim (not reviewing) 2011/04/12 06:09:29 can this use CreateSyncSession?
lipalani1 2011/04/13 00:07:29 Done.
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE);
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
+ FROM_HERE);
}
void SyncerThread::Unthrottle() {
DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
+ DoCanaryJob();
wait_interval_.reset();
}

Powered by Google App Engine
This is Rietveld 408576698