Index: chrome/browser/sync/engine/syncer_thread2.cc |
diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc |
deleted file mode 100644 |
index b8bd9a95d25adf4936e3b0b40cae3ce633b38380..0000000000000000000000000000000000000000 |
--- a/chrome/browser/sync/engine/syncer_thread2.cc |
+++ /dev/null |
@@ -1,863 +0,0 @@ |
-// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-#include "chrome/browser/sync/engine/syncer_thread2.h" |
- |
-#include <algorithm> |
- |
-#include "base/rand_util.h" |
-#include "chrome/browser/sync/engine/syncer.h" |
- |
-using base::TimeDelta; |
-using base::TimeTicks; |
- |
-namespace browser_sync { |
- |
-using sessions::SyncSession; |
-using sessions::SyncSessionSnapshot; |
-using sessions::SyncSourceInfo; |
-using syncable::ModelTypePayloadMap; |
-using syncable::ModelTypeBitSet; |
-using sync_pb::GetUpdatesCallerInfo; |
- |
-namespace s3 { |
- |
-SyncerThread::DelayProvider::DelayProvider() {} |
-SyncerThread::DelayProvider::~DelayProvider() {} |
- |
-SyncerThread::WaitInterval::WaitInterval() {} |
-SyncerThread::WaitInterval::~WaitInterval() {} |
- |
-SyncerThread::SyncSessionJob::SyncSessionJob() {} |
-SyncerThread::SyncSessionJob::~SyncSessionJob() {} |
- |
-SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
- base::TimeTicks start, |
- linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
- const tracked_objects::Location& nudge_location) : purpose(purpose), |
- scheduled_start(start), |
- session(session), |
- is_canary_job(is_canary_job), |
- nudge_location(nudge_location) { |
-} |
- |
-TimeDelta SyncerThread::DelayProvider::GetDelay( |
- const base::TimeDelta& last_delay) { |
- return SyncerThread::GetRecommendedDelay(last_delay); |
-} |
- |
-GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
- NudgeSource source) { |
- switch (source) { |
- case NUDGE_SOURCE_NOTIFICATION: |
- return GetUpdatesCallerInfo::NOTIFICATION; |
- case NUDGE_SOURCE_LOCAL: |
- return GetUpdatesCallerInfo::LOCAL; |
- case NUDGE_SOURCE_CONTINUATION: |
- return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
- case NUDGE_SOURCE_UNKNOWN: |
- return GetUpdatesCallerInfo::UNKNOWN; |
- default: |
- NOTREACHED(); |
- return GetUpdatesCallerInfo::UNKNOWN; |
- } |
-} |
- |
-SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
- : mode(mode), had_nudge(false), length(length) { } |
- |
-SyncerThread::SyncerThread(sessions::SyncSessionContext* context, |
- Syncer* syncer) |
- : thread_("SyncEngine_SyncerThread"), |
- syncer_short_poll_interval_seconds_( |
- TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
- syncer_long_poll_interval_seconds_( |
- TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
- mode_(NORMAL_MODE), |
- server_connection_ok_(false), |
- delay_provider_(new DelayProvider()), |
- syncer_(syncer), |
- session_context_(context) { |
-} |
- |
-SyncerThread::~SyncerThread() { |
- DCHECK(!thread_.IsRunning()); |
-} |
- |
-void SyncerThread::CheckServerConnectionManagerStatus( |
- HttpResponse::ServerConnectionCode code) { |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
- << "Old mode: " << server_connection_ok_ << " Code: " << code; |
- // Note, be careful when adding cases here because if the SyncerThread |
- // thinks there is no valid connection as determined by this method, it |
- // will drop out of *all* forward progress sync loops (it won't poll and it |
- // will queue up Talk notifications but not actually call SyncShare) until |
- // some external action causes a ServerConnectionManager to broadcast that |
- // a valid connection has been re-established. |
- if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
- HttpResponse::SYNC_AUTH_ERROR == code) { |
- server_connection_ok_ = false; |
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
- << " new mode:" << server_connection_ok_; |
- } else if (HttpResponse::SERVER_CONNECTION_OK == code) { |
- server_connection_ok_ = true; |
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
- << " new mode:" << server_connection_ok_; |
- DoCanaryJob(); |
- } |
-} |
- |
-void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " |
- << MessageLoop::current()->thread_name(); |
- if (!thread_.IsRunning()) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " |
- << mode; |
- if (!thread_.Start()) { |
- NOTREACHED() << "Unable to start SyncerThread."; |
- return; |
- } |
- WatchConnectionManager(); |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::SendInitialSnapshot)); |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " |
- << mode; |
- |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); |
-} |
- |
-void SyncerThread::SendInitialSnapshot() { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, |
- SyncSourceInfo(), ModelSafeRoutingInfo(), |
- std::vector<ModelSafeWorker*>())); |
- SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
- sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); |
- event.snapshot = &snapshot; |
- session_context_->NotifyListeners(event); |
-} |
- |
-void SyncerThread::WatchConnectionManager() { |
- ServerConnectionManager* scm = session_context_->connection_manager(); |
- CheckServerConnectionManagerStatus(scm->server_status()); |
- scm->AddListener(this); |
-} |
- |
-void SyncerThread::StartImpl(Mode mode, |
- linked_ptr<ModeChangeCallback> callback) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " |
- << mode; |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- DCHECK(!session_context_->account_name().empty()); |
- DCHECK(syncer_.get()); |
- mode_ = mode; |
- AdjustPolling(NULL); // Will kick start poll timer if needed. |
- if (callback.get()) |
- callback->Run(); |
- |
- // We just changed our mode. See if there are any pending jobs that we could |
- // execute in the new mode. |
- DoPendingJobIfPossible(false); |
-} |
- |
-SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( |
- const SyncSessionJob& job) { |
- |
- DCHECK(wait_interval_.get()); |
- DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " |
- << wait_interval_->mode << "Wait interval had nudge : " |
- << wait_interval_->had_nudge << "is canary job : " |
- << job.is_canary_job; |
- |
- if (job.purpose == SyncSessionJob::POLL) |
- return DROP; |
- |
- DCHECK(job.purpose == SyncSessionJob::NUDGE || |
- job.purpose == SyncSessionJob::CONFIGURATION); |
- if (wait_interval_->mode == WaitInterval::THROTTLED) |
- return SAVE; |
- |
- DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
- if (job.purpose == SyncSessionJob::NUDGE) { |
- if (mode_ == CONFIGURATION_MODE) |
- return SAVE; |
- |
- // If we already had one nudge then just drop this nudge. We will retry |
- // later when the timer runs out. |
- return wait_interval_->had_nudge ? DROP : CONTINUE; |
- } |
- // This is a config job. |
- return job.is_canary_job ? CONTINUE : SAVE; |
-} |
- |
-SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( |
- const SyncSessionJob& job) { |
- if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) |
- return CONTINUE; |
- |
- if (wait_interval_.get()) |
- return DecideWhileInWaitInterval(job); |
- |
- if (mode_ == CONFIGURATION_MODE) { |
- if (job.purpose == SyncSessionJob::NUDGE) |
- return SAVE; |
- else if (job.purpose == SyncSessionJob::CONFIGURATION) |
- return CONTINUE; |
- else |
- return DROP; |
- } |
- |
- // We are in normal mode. |
- DCHECK_EQ(mode_, NORMAL_MODE); |
- DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
- |
- // Freshness condition |
- if (job.scheduled_start < last_sync_session_end_time_) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Dropping job because of freshness"; |
- return DROP; |
- } |
- |
- if (server_connection_ok_) |
- return CONTINUE; |
- |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Bad server connection. Using that to decide on job."; |
- return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
-} |
- |
-void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
- DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
- if (pending_nudge_.get() == NULL) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Creating a pending nudge job"; |
- SyncSession* s = job.session.get(); |
- scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
- s->delegate(), s->source(), s->routing_info(), s->workers())); |
- |
- SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
- make_linked_ptr(session.release()), false, job.nudge_location); |
- pending_nudge_.reset(new SyncSessionJob(new_job)); |
- |
- return; |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; |
- pending_nudge_->session->Coalesce(*(job.session.get())); |
- pending_nudge_->scheduled_start = job.scheduled_start; |
- |
- // Unfortunately the nudge location cannot be modified. So it stores the |
- // location of the first caller. |
-} |
- |
-bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { |
- JobProcessDecision decision = DecideOnJob(job); |
- VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " |
- << decision << " Job purpose " << job.purpose << "mode " << mode_; |
- if (decision != SAVE) |
- return decision == CONTINUE; |
- |
- DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
- SyncSessionJob::CONFIGURATION); |
- |
- SaveJob(job); |
- return false; |
-} |
- |
-void SyncerThread::SaveJob(const SyncSessionJob& job) { |
- DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); |
- if (job.purpose == SyncSessionJob::NUDGE) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; |
- InitOrCoalescePendingJob(job); |
- } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
- VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; |
- DCHECK(wait_interval_.get()); |
- DCHECK(mode_ == CONFIGURATION_MODE); |
- |
- SyncSession* old = job.session.get(); |
- SyncSession* s(new SyncSession(session_context_.get(), this, |
- old->source(), old->routing_info(), old->workers())); |
- SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
- make_linked_ptr(s), false, job.nudge_location); |
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
- } // drop the rest. |
-} |
- |
-// Functor for std::find_if to search by ModelSafeGroup. |
-struct ModelSafeWorkerGroupIs { |
- explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
- bool operator()(ModelSafeWorker* w) { |
- return group == w->GetModelSafeGroup(); |
- } |
- ModelSafeGroup group; |
-}; |
- |
-void SyncerThread::ScheduleClearUserData() { |
- if (!thread_.IsRunning()) { |
- NOTREACHED(); |
- return; |
- } |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::ScheduleClearUserDataImpl)); |
-} |
- |
-void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
- NudgeSource source, const ModelTypeBitSet& types, |
- const tracked_objects::Location& nudge_location) { |
- if (!thread_.IsRunning()) { |
- NOTREACHED(); |
- return; |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; |
- |
- ModelTypePayloadMap types_with_payloads = |
- syncable::ModelTypePayloadMapFromBitSet(types, std::string()); |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::ScheduleNudgeImpl, delay, |
- GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
- nudge_location)); |
-} |
- |
-void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
- NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
- const tracked_objects::Location& nudge_location) { |
- if (!thread_.IsRunning()) { |
- NOTREACHED(); |
- return; |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; |
- |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::ScheduleNudgeImpl, delay, |
- GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
- nudge_location)); |
-} |
- |
-void SyncerThread::ScheduleClearUserDataImpl() { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- SyncSession* session = new SyncSession(session_context_.get(), this, |
- SyncSourceInfo(), ModelSafeRoutingInfo(), |
- std::vector<ModelSafeWorker*>()); |
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
- SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); |
-} |
- |
-void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
- GetUpdatesCallerInfo::GetUpdatesSource source, |
- const ModelTypePayloadMap& types_with_payloads, |
- bool is_canary_job, const tracked_objects::Location& nudge_location) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; |
- // Note we currently nudge for all types regardless of the ones incurring |
- // the nudge. Doing different would throw off some syncer commands like |
- // CleanupDisabledTypes. We may want to change this in the future. |
- SyncSourceInfo info(source, types_with_payloads); |
- |
- SyncSession* session(CreateSyncSession(info)); |
- SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
- make_linked_ptr(session), is_canary_job, |
- nudge_location); |
- |
- session = NULL; |
- if (!ShouldRunJob(job)) |
- return; |
- |
- if (pending_nudge_.get()) { |
- if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" |
- << "we are in backoff"; |
- return; |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; |
- pending_nudge_->session->Coalesce(*(job.session.get())); |
- |
- if (!IsBackingOff()) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" |
- << " we are not in backoff and the job was coalesced"; |
- return; |
- } else { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Rescheduling pending nudge"; |
- SyncSession* s = pending_nudge_->session.get(); |
- job.session.reset(new SyncSession(s->context(), s->delegate(), |
- s->source(), s->routing_info(), s->workers())); |
- pending_nudge_.reset(); |
- } |
- } |
- |
- // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
- ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), |
- nudge_location); |
-} |
- |
-// Helper to extract the routing info and workers corresponding to types in |
-// |types| from |registrar|. |
-void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, |
- ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, |
- std::vector<ModelSafeWorker*>* workers) { |
- ModelSafeRoutingInfo r_tmp; |
- std::vector<ModelSafeWorker*> w_tmp; |
- registrar->GetModelSafeRoutingInfo(&r_tmp); |
- registrar->GetWorkers(&w_tmp); |
- |
- typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
- for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { |
- if (!types.test(i)) |
- continue; |
- syncable::ModelType t = syncable::ModelTypeFromInt(i); |
- DCHECK_EQ(1U, r_tmp.count(t)); |
- (*routes)[t] = r_tmp[t]; |
- iter it = std::find_if(w_tmp.begin(), w_tmp.end(), |
- ModelSafeWorkerGroupIs(r_tmp[t])); |
- if (it != w_tmp.end()) |
- workers->push_back(*it); |
- else |
- NOTREACHED(); |
- } |
- |
- iter it = std::find_if(w_tmp.begin(), w_tmp.end(), |
- ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
- if (it != w_tmp.end()) |
- workers->push_back(*it); |
- else |
- NOTREACHED(); |
-} |
- |
-void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { |
- if (!thread_.IsRunning()) { |
- NOTREACHED(); |
- return; |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; |
- ModelSafeRoutingInfo routes; |
- std::vector<ModelSafeWorker*> workers; |
- GetModelSafeParamsForTypes(types, session_context_->registrar(), |
- &routes, &workers); |
- |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
- this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
- GetUpdatesCallerInfo::FIRST_UPDATE)); |
-} |
- |
-void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
- const std::vector<ModelSafeWorker*>& workers, |
- const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- |
- VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; |
- // TODO(tim): config-specific GetUpdatesCallerInfo value? |
- SyncSession* session = new SyncSession(session_context_.get(), this, |
- SyncSourceInfo(source, |
- syncable::ModelTypePayloadMapFromRoutingInfo( |
- routing_info, std::string())), |
- routing_info, workers); |
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
- SyncSessionJob::CONFIGURATION, session, FROM_HERE); |
-} |
- |
-void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
- SyncSessionJob::SyncSessionJobPurpose purpose, |
- sessions::SyncSession* session, |
- const tracked_objects::Location& nudge_location) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- |
- SyncSessionJob job(purpose, TimeTicks::Now() + delay, |
- make_linked_ptr(session), false, nudge_location); |
- if (purpose == SyncSessionJob::NUDGE) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" |
- << " ScheduleSyncSessionJob"; |
- DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); |
- pending_nudge_.reset(new SyncSessionJob(job)); |
- } |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Posting job to execute in DoSyncSessionJob. Job purpose " |
- << job.purpose; |
- MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
- &SyncerThread::DoSyncSessionJob, job), |
- delay.InMilliseconds()); |
-} |
- |
-void SyncerThread::SetSyncerStepsForPurpose( |
- SyncSessionJob::SyncSessionJobPurpose purpose, |
- SyncerStep* start, SyncerStep* end) { |
- *end = SYNCER_END; |
- switch (purpose) { |
- case SyncSessionJob::CONFIGURATION: |
- *start = DOWNLOAD_UPDATES; |
- *end = APPLY_UPDATES; |
- return; |
- case SyncSessionJob::CLEAR_USER_DATA: |
- *start = CLEAR_PRIVATE_DATA; |
- return; |
- case SyncSessionJob::NUDGE: |
- case SyncSessionJob::POLL: |
- *start = SYNCER_BEGIN; |
- return; |
- default: |
- NOTREACHED(); |
- } |
-} |
- |
-void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- if (!ShouldRunJob(job)) |
- return; |
- |
- if (job.purpose == SyncSessionJob::NUDGE) { |
- DCHECK(pending_nudge_.get()); |
- if (pending_nudge_->session != job.session) |
- return; // Another nudge must have been scheduled in in the meantime. |
- pending_nudge_.reset(); |
- } |
- VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " |
- << job.purpose; |
- |
- SyncerStep begin(SYNCER_BEGIN); |
- SyncerStep end(SYNCER_END); |
- SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
- |
- bool has_more_to_sync = true; |
- while (ShouldRunJob(job) && has_more_to_sync) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " SyncerThread: Calling SyncShare."; |
- // Synchronously perform the sync session from this thread. |
- syncer_->SyncShare(job.session.get(), begin, end); |
- has_more_to_sync = job.session->HasMoreToSync(); |
- if (has_more_to_sync) |
- job.session->ResetTransientState(); |
- } |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " SyncerThread: Done SyncShare looping."; |
- FinishSyncSessionJob(job); |
-} |
- |
-void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { |
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
- // Whatever types were part of a configuration task will have had updates |
- // downloaded. For that reason, we make sure they get recorded in the |
- // event that they get disabled at a later time. |
- ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); |
- if (!r.empty()) { |
- ModelSafeRoutingInfo temp_r; |
- ModelSafeRoutingInfo old_info(old_job.session->routing_info()); |
- std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), |
- std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); |
- session_context_->set_previous_session_routing_info(temp_r); |
- } |
- } else { |
- session_context_->set_previous_session_routing_info( |
- old_job.session->routing_info()); |
- } |
-} |
- |
-void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- // Update timing information for how often datatypes are triggering nudges. |
- base::TimeTicks now = TimeTicks::Now(); |
- if (!last_sync_session_end_time_.is_null()) { |
- ModelTypePayloadMap::const_iterator iter; |
- for (iter = job.session->source().types.begin(); |
- iter != job.session->source().types.end(); |
- ++iter) { |
- syncable::PostTimeToTypeHistogram(iter->first, |
- now - last_sync_session_end_time_); |
- } |
- } |
- last_sync_session_end_time_ = now; |
- UpdateCarryoverSessionState(job); |
- if (IsSyncingCurrentlySilenced()) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " We are currently throttled. So not scheduling the next sync."; |
- SaveJob(job); |
- return; // Nothing to do. |
- } |
- |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Updating the next polling time after SyncMain"; |
- ScheduleNextSync(job); |
-} |
- |
-void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- DCHECK(!old_job.session->HasMoreToSync()); |
- // Note: |num_server_changes_remaining| > 0 here implies that we received a |
- // broken response while trying to download all updates, because the Syncer |
- // will loop until this value is exhausted. Also, if unsynced_handles exist |
- // but HasMoreToSync is false, this implies that the Syncer determined no |
- // forward progress was possible at this time (an error, such as an HTTP |
- // 500, is likely to have occurred during commit). |
- const bool work_to_do = |
- old_job.session->status_controller()->num_server_changes_remaining() > 0 |
- || old_job.session->status_controller()->unsynced_handles().size() > 0; |
- VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " |
- << work_to_do; |
- |
- AdjustPolling(&old_job); |
- |
- // TODO(tim): Old impl had special code if notifications disabled. Needed? |
- if (!work_to_do) { |
- // Success implies backoff relief. Note that if this was a "one-off" job |
- // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was |
- // work_to_do before it ran this wont have changed, as jobs like this don't |
- // run a full sync cycle. So we don't need special code here. |
- wait_interval_.reset(); |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Job suceeded so not scheduling more jobs"; |
- return; |
- } |
- |
- if (old_job.session->source().updates_source == |
- GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Job failed with source continuation"; |
- // We don't seem to have made forward progress. Start or extend backoff. |
- HandleConsecutiveContinuationError(old_job); |
- } else if (IsBackingOff()) { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " A nudge during backoff failed"; |
- // We weren't continuing but we're in backoff; must have been a nudge. |
- DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
- DCHECK(!wait_interval_->had_nudge); |
- wait_interval_->had_nudge = true; |
- wait_interval_->timer.Reset(); |
- } else { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " Failed. Schedule a job with continuation as source"; |
- // We weren't continuing and we aren't in backoff. Schedule a normal |
- // continuation. |
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
- ScheduleConfigImpl(old_job.session->routing_info(), |
- old_job.session->workers(), |
- GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
- } else { |
- // For all other purposes(nudge and poll) we schedule a retry nudge. |
- ScheduleNudgeImpl(TimeDelta::FromSeconds(0), |
- GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), |
- old_job.session->source().types, false, FROM_HERE); |
- } |
- } |
-} |
- |
-void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { |
- DCHECK(thread_.IsRunning()); |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- |
- TimeDelta poll = (!session_context_->notifications_enabled()) ? |
- syncer_short_poll_interval_seconds_ : |
- syncer_long_poll_interval_seconds_; |
- bool rate_changed = !poll_timer_.IsRunning() || |
- poll != poll_timer_.GetCurrentDelay(); |
- |
- if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) |
- poll_timer_.Reset(); |
- |
- if (!rate_changed) |
- return; |
- |
- // Adjust poll rate. |
- poll_timer_.Stop(); |
- poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); |
-} |
- |
-void SyncerThread::HandleConsecutiveContinuationError( |
- const SyncSessionJob& old_job) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- // This if conditions should be compiled out in retail builds. |
- if (IsBackingOff()) { |
- DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
- } |
- SyncSession* old = old_job.session.get(); |
- SyncSession* s(new SyncSession(session_context_.get(), this, |
- old->source(), old->routing_info(), old->workers())); |
- TimeDelta length = delay_provider_->GetDelay( |
- IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
- |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " In handle continuation error. Old job purpose is " |
- << old_job.purpose; |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " In Handle continuation error. The time delta(ms) is: " |
- << length.InMilliseconds(); |
- |
- // This will reset the had_nudge variable as well. |
- wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
- length)); |
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
- SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
- make_linked_ptr(s), false, FROM_HERE); |
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
- } else { |
- // We are not in configuration mode. So wait_interval's pending job |
- // should be null. |
- DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
- |
- // TODO(lipalani) - handle clear user data. |
- InitOrCoalescePendingJob(old_job); |
- } |
- wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); |
-} |
- |
-// static |
-TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { |
- if (last_delay.InSeconds() >= kMaxBackoffSeconds) |
- return TimeDelta::FromSeconds(kMaxBackoffSeconds); |
- |
- // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 |
- int64 backoff_s = |
- std::max(static_cast<int64>(1), |
- last_delay.InSeconds() * kBackoffRandomizationFactor); |
- |
- // Flip a coin to randomize backoff interval by +/- 50%. |
- int rand_sign = base::RandInt(0, 1) * 2 - 1; |
- |
- // Truncation is adequate for rounding here. |
- backoff_s = backoff_s + |
- (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); |
- |
- // Cap the backoff interval. |
- backoff_s = std::max(static_cast<int64>(1), |
- std::min(backoff_s, kMaxBackoffSeconds)); |
- |
- return TimeDelta::FromSeconds(backoff_s); |
-} |
- |
-void SyncerThread::Stop() { |
- VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; |
- syncer_->RequestEarlyExit(); // Safe to call from any thread. |
- session_context_->connection_manager()->RemoveListener(this); |
- thread_.Stop(); |
-} |
- |
-void SyncerThread::DoCanaryJob() { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; |
- DoPendingJobIfPossible(true); |
-} |
- |
-void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { |
- SyncSessionJob* job_to_execute = NULL; |
- if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
- && wait_interval_->pending_configure_job.get()) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; |
- job_to_execute = wait_interval_->pending_configure_job.get(); |
- } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; |
- // Pending jobs mostly have time from the past. Reset it so this job |
- // will get executed. |
- if (pending_nudge_->scheduled_start < TimeTicks::Now()) |
- pending_nudge_->scheduled_start = TimeTicks::Now(); |
- |
- scoped_ptr<SyncSession> session(CreateSyncSession( |
- pending_nudge_->session->source())); |
- |
- // Also the routing info might have been changed since we cached the |
- // pending nudge. Update it by coalescing to the latest. |
- pending_nudge_->session->Coalesce(*(session.get())); |
- // The pending nudge would be cleared in the DoSyncSessionJob function. |
- job_to_execute = pending_nudge_.get(); |
- } |
- |
- if (job_to_execute != NULL) { |
- VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; |
- SyncSessionJob copy = *job_to_execute; |
- copy.is_canary_job = is_canary_job; |
- DoSyncSessionJob(copy); |
- } |
-} |
- |
-SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { |
- ModelSafeRoutingInfo routes; |
- std::vector<ModelSafeWorker*> workers; |
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
- session_context_->registrar()->GetWorkers(&workers); |
- SyncSourceInfo info(source); |
- |
- SyncSession* session(new SyncSession(session_context_.get(), this, info, |
- routes, workers)); |
- |
- return session; |
-} |
- |
-void SyncerThread::PollTimerCallback() { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- ModelSafeRoutingInfo r; |
- ModelTypePayloadMap types_with_payloads = |
- syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
- SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
- SyncSession* s = CreateSyncSession(info); |
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, |
- FROM_HERE); |
-} |
- |
-void SyncerThread::Unthrottle() { |
- DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
- VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; |
- DoCanaryJob(); |
- wait_interval_.reset(); |
-} |
- |
-void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- session_context_->NotifyListeners(SyncEngineEvent(cause)); |
-} |
- |
-bool SyncerThread::IsBackingOff() const { |
- return wait_interval_.get() && wait_interval_->mode == |
- WaitInterval::EXPONENTIAL_BACKOFF; |
-} |
- |
-void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { |
- wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
- silenced_until - TimeTicks::Now())); |
- wait_interval_->timer.Start(wait_interval_->length, this, |
- &SyncerThread::Unthrottle); |
-} |
- |
-bool SyncerThread::IsSyncingCurrentlySilenced() { |
- return wait_interval_.get() && wait_interval_->mode == |
- WaitInterval::THROTTLED; |
-} |
- |
-void SyncerThread::OnReceivedShortPollIntervalUpdate( |
- const base::TimeDelta& new_interval) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- syncer_short_poll_interval_seconds_ = new_interval; |
-} |
- |
-void SyncerThread::OnReceivedLongPollIntervalUpdate( |
- const base::TimeDelta& new_interval) { |
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- syncer_long_poll_interval_seconds_ = new_interval; |
-} |
- |
-void SyncerThread::OnShouldStopSyncingPermanently() { |
- VLOG(2) << "SyncerThread(" << this << ")" |
- << " OnShouldStopSyncingPermanently"; |
- syncer_->RequestEarlyExit(); // Thread-safe. |
- Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
-} |
- |
-void SyncerThread::OnServerConnectionEvent( |
- const ServerConnectionEvent2& event) { |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
- &SyncerThread::CheckServerConnectionManagerStatus, |
- event.connection_code)); |
-} |
- |
-void SyncerThread::set_notifications_enabled(bool notifications_enabled) { |
- session_context_->set_notifications_enabled(notifications_enabled); |
-} |
- |
-} // s3 |
-} // browser_sync |