Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3935)

Unified Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 6874018: make new syncer thread the default. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Upload before submit. Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread2.h ('k') | chrome/browser/sync/engine/syncer_thread2_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: chrome/browser/sync/engine/syncer_thread2.cc
diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc
deleted file mode 100644
index b8bd9a95d25adf4936e3b0b40cae3ce633b38380..0000000000000000000000000000000000000000
--- a/chrome/browser/sync/engine/syncer_thread2.cc
+++ /dev/null
@@ -1,863 +0,0 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/engine/syncer_thread2.h"
-
-#include <algorithm>
-
-#include "base/rand_util.h"
-#include "chrome/browser/sync/engine/syncer.h"
-
-using base::TimeDelta;
-using base::TimeTicks;
-
-namespace browser_sync {
-
-using sessions::SyncSession;
-using sessions::SyncSessionSnapshot;
-using sessions::SyncSourceInfo;
-using syncable::ModelTypePayloadMap;
-using syncable::ModelTypeBitSet;
-using sync_pb::GetUpdatesCallerInfo;
-
-namespace s3 {
-
-SyncerThread::DelayProvider::DelayProvider() {}
-SyncerThread::DelayProvider::~DelayProvider() {}
-
-SyncerThread::WaitInterval::WaitInterval() {}
-SyncerThread::WaitInterval::~WaitInterval() {}
-
-SyncerThread::SyncSessionJob::SyncSessionJob() {}
-SyncerThread::SyncSessionJob::~SyncSessionJob() {}
-
-SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
- base::TimeTicks start,
- linked_ptr<sessions::SyncSession> session, bool is_canary_job,
- const tracked_objects::Location& nudge_location) : purpose(purpose),
- scheduled_start(start),
- session(session),
- is_canary_job(is_canary_job),
- nudge_location(nudge_location) {
-}
-
-TimeDelta SyncerThread::DelayProvider::GetDelay(
- const base::TimeDelta& last_delay) {
- return SyncerThread::GetRecommendedDelay(last_delay);
-}
-
-GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
- NudgeSource source) {
- switch (source) {
- case NUDGE_SOURCE_NOTIFICATION:
- return GetUpdatesCallerInfo::NOTIFICATION;
- case NUDGE_SOURCE_LOCAL:
- return GetUpdatesCallerInfo::LOCAL;
- case NUDGE_SOURCE_CONTINUATION:
- return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
- case NUDGE_SOURCE_UNKNOWN:
- return GetUpdatesCallerInfo::UNKNOWN;
- default:
- NOTREACHED();
- return GetUpdatesCallerInfo::UNKNOWN;
- }
-}
-
-SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
- : mode(mode), had_nudge(false), length(length) { }
-
-SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
- Syncer* syncer)
- : thread_("SyncEngine_SyncerThread"),
- syncer_short_poll_interval_seconds_(
- TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
- syncer_long_poll_interval_seconds_(
- TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
- mode_(NORMAL_MODE),
- server_connection_ok_(false),
- delay_provider_(new DelayProvider()),
- syncer_(syncer),
- session_context_(context) {
-}
-
-SyncerThread::~SyncerThread() {
- DCHECK(!thread_.IsRunning());
-}
-
-void SyncerThread::CheckServerConnectionManagerStatus(
- HttpResponse::ServerConnectionCode code) {
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
- << "Old mode: " << server_connection_ok_ << " Code: " << code;
- // Note, be careful when adding cases here because if the SyncerThread
- // thinks there is no valid connection as determined by this method, it
- // will drop out of *all* forward progress sync loops (it won't poll and it
- // will queue up Talk notifications but not actually call SyncShare) until
- // some external action causes a ServerConnectionManager to broadcast that
- // a valid connection has been re-established.
- if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
- HttpResponse::SYNC_AUTH_ERROR == code) {
- server_connection_ok_ = false;
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
- << " new mode:" << server_connection_ok_;
- } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
- server_connection_ok_ = true;
- VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
- << " new mode:" << server_connection_ok_;
- DoCanaryJob();
- }
-}
-
-void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread "
- << MessageLoop::current()->thread_name();
- if (!thread_.IsRunning()) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode "
- << mode;
- if (!thread_.Start()) {
- NOTREACHED() << "Unable to start SyncerThread.";
- return;
- }
- WatchConnectionManager();
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::SendInitialSnapshot));
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = "
- << mode;
-
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback)));
-}
-
-void SyncerThread::SendInitialSnapshot() {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
- SyncSourceInfo(), ModelSafeRoutingInfo(),
- std::vector<ModelSafeWorker*>()));
- SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
- sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
- event.snapshot = &snapshot;
- session_context_->NotifyListeners(event);
-}
-
-void SyncerThread::WatchConnectionManager() {
- ServerConnectionManager* scm = session_context_->connection_manager();
- CheckServerConnectionManagerStatus(scm->server_status());
- scm->AddListener(this);
-}
-
-void SyncerThread::StartImpl(Mode mode,
- linked_ptr<ModeChangeCallback> callback) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
- << mode;
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- DCHECK(!session_context_->account_name().empty());
- DCHECK(syncer_.get());
- mode_ = mode;
- AdjustPolling(NULL); // Will kick start poll timer if needed.
- if (callback.get())
- callback->Run();
-
- // We just changed our mode. See if there are any pending jobs that we could
- // execute in the new mode.
- DoPendingJobIfPossible(false);
-}
-
-SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
- const SyncSessionJob& job) {
-
- DCHECK(wait_interval_.get());
- DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : "
- << wait_interval_->mode << "Wait interval had nudge : "
- << wait_interval_->had_nudge << "is canary job : "
- << job.is_canary_job;
-
- if (job.purpose == SyncSessionJob::POLL)
- return DROP;
-
- DCHECK(job.purpose == SyncSessionJob::NUDGE ||
- job.purpose == SyncSessionJob::CONFIGURATION);
- if (wait_interval_->mode == WaitInterval::THROTTLED)
- return SAVE;
-
- DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
- if (job.purpose == SyncSessionJob::NUDGE) {
- if (mode_ == CONFIGURATION_MODE)
- return SAVE;
-
- // If we already had one nudge then just drop this nudge. We will retry
- // later when the timer runs out.
- return wait_interval_->had_nudge ? DROP : CONTINUE;
- }
- // This is a config job.
- return job.is_canary_job ? CONTINUE : SAVE;
-}
-
-SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
- const SyncSessionJob& job) {
- if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
- return CONTINUE;
-
- if (wait_interval_.get())
- return DecideWhileInWaitInterval(job);
-
- if (mode_ == CONFIGURATION_MODE) {
- if (job.purpose == SyncSessionJob::NUDGE)
- return SAVE;
- else if (job.purpose == SyncSessionJob::CONFIGURATION)
- return CONTINUE;
- else
- return DROP;
- }
-
- // We are in normal mode.
- DCHECK_EQ(mode_, NORMAL_MODE);
- DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
-
- // Freshness condition
- if (job.scheduled_start < last_sync_session_end_time_) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Dropping job because of freshness";
- return DROP;
- }
-
- if (server_connection_ok_)
- return CONTINUE;
-
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Bad server connection. Using that to decide on job.";
- return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
-}
-
-void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
- DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
- if (pending_nudge_.get() == NULL) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Creating a pending nudge job";
- SyncSession* s = job.session.get();
- scoped_ptr<SyncSession> session(new SyncSession(s->context(),
- s->delegate(), s->source(), s->routing_info(), s->workers()));
-
- SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
- make_linked_ptr(session.release()), false, job.nudge_location);
- pending_nudge_.reset(new SyncSessionJob(new_job));
-
- return;
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
- pending_nudge_->session->Coalesce(*(job.session.get()));
- pending_nudge_->scheduled_start = job.scheduled_start;
-
- // Unfortunately the nudge location cannot be modified. So it stores the
- // location of the first caller.
-}
-
-bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
- JobProcessDecision decision = DecideOnJob(job);
- VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: "
- << decision << " Job purpose " << job.purpose << "mode " << mode_;
- if (decision != SAVE)
- return decision == CONTINUE;
-
- DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
- SyncSessionJob::CONFIGURATION);
-
- SaveJob(job);
- return false;
-}
-
-void SyncerThread::SaveJob(const SyncSessionJob& job) {
- DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
- if (job.purpose == SyncSessionJob::NUDGE) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job";
- InitOrCoalescePendingJob(job);
- } else if (job.purpose == SyncSessionJob::CONFIGURATION){
- VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job";
- DCHECK(wait_interval_.get());
- DCHECK(mode_ == CONFIGURATION_MODE);
-
- SyncSession* old = job.session.get();
- SyncSession* s(new SyncSession(session_context_.get(), this,
- old->source(), old->routing_info(), old->workers()));
- SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
- make_linked_ptr(s), false, job.nudge_location);
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
- } // drop the rest.
-}
-
-// Functor for std::find_if to search by ModelSafeGroup.
-struct ModelSafeWorkerGroupIs {
- explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
- bool operator()(ModelSafeWorker* w) {
- return group == w->GetModelSafeGroup();
- }
- ModelSafeGroup group;
-};
-
-void SyncerThread::ScheduleClearUserData() {
- if (!thread_.IsRunning()) {
- NOTREACHED();
- return;
- }
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleClearUserDataImpl));
-}
-
-void SyncerThread::ScheduleNudge(const TimeDelta& delay,
- NudgeSource source, const ModelTypeBitSet& types,
- const tracked_objects::Location& nudge_location) {
- if (!thread_.IsRunning()) {
- NOTREACHED();
- return;
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled";
-
- ModelTypePayloadMap types_with_payloads =
- syncable::ModelTypePayloadMapFromBitSet(types, std::string());
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleNudgeImpl, delay,
- GetUpdatesFromNudgeSource(source), types_with_payloads, false,
- nudge_location));
-}
-
-void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
- NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
- const tracked_objects::Location& nudge_location) {
- if (!thread_.IsRunning()) {
- NOTREACHED();
- return;
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
-
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleNudgeImpl, delay,
- GetUpdatesFromNudgeSource(source), types_with_payloads, false,
- nudge_location));
-}
-
-void SyncerThread::ScheduleClearUserDataImpl() {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- SyncSession* session = new SyncSession(session_context_.get(), this,
- SyncSourceInfo(), ModelSafeRoutingInfo(),
- std::vector<ModelSafeWorker*>());
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
- SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
-}
-
-void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
- GetUpdatesCallerInfo::GetUpdatesSource source,
- const ModelTypePayloadMap& types_with_payloads,
- bool is_canary_job, const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
- // Note we currently nudge for all types regardless of the ones incurring
- // the nudge. Doing different would throw off some syncer commands like
- // CleanupDisabledTypes. We may want to change this in the future.
- SyncSourceInfo info(source, types_with_payloads);
-
- SyncSession* session(CreateSyncSession(info));
- SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
- make_linked_ptr(session), is_canary_job,
- nudge_location);
-
- session = NULL;
- if (!ShouldRunJob(job))
- return;
-
- if (pending_nudge_.get()) {
- if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
- << "we are in backoff";
- return;
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
- pending_nudge_->session->Coalesce(*(job.session.get()));
-
- if (!IsBackingOff()) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
- << " we are not in backoff and the job was coalesced";
- return;
- } else {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Rescheduling pending nudge";
- SyncSession* s = pending_nudge_->session.get();
- job.session.reset(new SyncSession(s->context(), s->delegate(),
- s->source(), s->routing_info(), s->workers()));
- pending_nudge_.reset();
- }
- }
-
- // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
- ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
- nudge_location);
-}
-
-// Helper to extract the routing info and workers corresponding to types in
-// |types| from |registrar|.
-void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
- ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
- std::vector<ModelSafeWorker*>* workers) {
- ModelSafeRoutingInfo r_tmp;
- std::vector<ModelSafeWorker*> w_tmp;
- registrar->GetModelSafeRoutingInfo(&r_tmp);
- registrar->GetWorkers(&w_tmp);
-
- typedef std::vector<ModelSafeWorker*>::const_iterator iter;
- for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
- if (!types.test(i))
- continue;
- syncable::ModelType t = syncable::ModelTypeFromInt(i);
- DCHECK_EQ(1U, r_tmp.count(t));
- (*routes)[t] = r_tmp[t];
- iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
- ModelSafeWorkerGroupIs(r_tmp[t]));
- if (it != w_tmp.end())
- workers->push_back(*it);
- else
- NOTREACHED();
- }
-
- iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
- ModelSafeWorkerGroupIs(GROUP_PASSIVE));
- if (it != w_tmp.end())
- workers->push_back(*it);
- else
- NOTREACHED();
-}
-
-void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
- if (!thread_.IsRunning()) {
- NOTREACHED();
- return;
- }
-
- VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config";
- ModelSafeRoutingInfo routes;
- std::vector<ModelSafeWorker*> workers;
- GetModelSafeParamsForTypes(types, session_context_->registrar(),
- &routes, &workers);
-
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- this, &SyncerThread::ScheduleConfigImpl, routes, workers,
- GetUpdatesCallerInfo::FIRST_UPDATE));
-}
-
-void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
- const std::vector<ModelSafeWorker*>& workers,
- const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
-
- VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
- // TODO(tim): config-specific GetUpdatesCallerInfo value?
- SyncSession* session = new SyncSession(session_context_.get(), this,
- SyncSourceInfo(source,
- syncable::ModelTypePayloadMapFromRoutingInfo(
- routing_info, std::string())),
- routing_info, workers);
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
- SyncSessionJob::CONFIGURATION, session, FROM_HERE);
-}
-
-void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
- SyncSessionJob::SyncSessionJobPurpose purpose,
- sessions::SyncSession* session,
- const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
-
- SyncSessionJob job(purpose, TimeTicks::Now() + delay,
- make_linked_ptr(session), false, nudge_location);
- if (purpose == SyncSessionJob::NUDGE) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
- << " ScheduleSyncSessionJob";
- DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
- pending_nudge_.reset(new SyncSessionJob(job));
- }
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Posting job to execute in DoSyncSessionJob. Job purpose "
- << job.purpose;
- MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
- &SyncerThread::DoSyncSessionJob, job),
- delay.InMilliseconds());
-}
-
-void SyncerThread::SetSyncerStepsForPurpose(
- SyncSessionJob::SyncSessionJobPurpose purpose,
- SyncerStep* start, SyncerStep* end) {
- *end = SYNCER_END;
- switch (purpose) {
- case SyncSessionJob::CONFIGURATION:
- *start = DOWNLOAD_UPDATES;
- *end = APPLY_UPDATES;
- return;
- case SyncSessionJob::CLEAR_USER_DATA:
- *start = CLEAR_PRIVATE_DATA;
- return;
- case SyncSessionJob::NUDGE:
- case SyncSessionJob::POLL:
- *start = SYNCER_BEGIN;
- return;
- default:
- NOTREACHED();
- }
-}
-
-void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- if (!ShouldRunJob(job))
- return;
-
- if (job.purpose == SyncSessionJob::NUDGE) {
- DCHECK(pending_nudge_.get());
- if (pending_nudge_->session != job.session)
- return; // Another nudge must have been scheduled in in the meantime.
- pending_nudge_.reset();
- }
- VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
- << job.purpose;
-
- SyncerStep begin(SYNCER_BEGIN);
- SyncerStep end(SYNCER_END);
- SetSyncerStepsForPurpose(job.purpose, &begin, &end);
-
- bool has_more_to_sync = true;
- while (ShouldRunJob(job) && has_more_to_sync) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " SyncerThread: Calling SyncShare.";
- // Synchronously perform the sync session from this thread.
- syncer_->SyncShare(job.session.get(), begin, end);
- has_more_to_sync = job.session->HasMoreToSync();
- if (has_more_to_sync)
- job.session->ResetTransientState();
- }
- VLOG(2) << "SyncerThread(" << this << ")"
- << " SyncerThread: Done SyncShare looping.";
- FinishSyncSessionJob(job);
-}
-
-void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
- // Whatever types were part of a configuration task will have had updates
- // downloaded. For that reason, we make sure they get recorded in the
- // event that they get disabled at a later time.
- ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
- if (!r.empty()) {
- ModelSafeRoutingInfo temp_r;
- ModelSafeRoutingInfo old_info(old_job.session->routing_info());
- std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
- std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
- session_context_->set_previous_session_routing_info(temp_r);
- }
- } else {
- session_context_->set_previous_session_routing_info(
- old_job.session->routing_info());
- }
-}
-
-void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- // Update timing information for how often datatypes are triggering nudges.
- base::TimeTicks now = TimeTicks::Now();
- if (!last_sync_session_end_time_.is_null()) {
- ModelTypePayloadMap::const_iterator iter;
- for (iter = job.session->source().types.begin();
- iter != job.session->source().types.end();
- ++iter) {
- syncable::PostTimeToTypeHistogram(iter->first,
- now - last_sync_session_end_time_);
- }
- }
- last_sync_session_end_time_ = now;
- UpdateCarryoverSessionState(job);
- if (IsSyncingCurrentlySilenced()) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " We are currently throttled. So not scheduling the next sync.";
- SaveJob(job);
- return; // Nothing to do.
- }
-
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Updating the next polling time after SyncMain";
- ScheduleNextSync(job);
-}
-
-void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- DCHECK(!old_job.session->HasMoreToSync());
- // Note: |num_server_changes_remaining| > 0 here implies that we received a
- // broken response while trying to download all updates, because the Syncer
- // will loop until this value is exhausted. Also, if unsynced_handles exist
- // but HasMoreToSync is false, this implies that the Syncer determined no
- // forward progress was possible at this time (an error, such as an HTTP
- // 500, is likely to have occurred during commit).
- const bool work_to_do =
- old_job.session->status_controller()->num_server_changes_remaining() > 0
- || old_job.session->status_controller()->unsynced_handles().size() > 0;
- VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: "
- << work_to_do;
-
- AdjustPolling(&old_job);
-
- // TODO(tim): Old impl had special code if notifications disabled. Needed?
- if (!work_to_do) {
- // Success implies backoff relief. Note that if this was a "one-off" job
- // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
- // work_to_do before it ran this wont have changed, as jobs like this don't
- // run a full sync cycle. So we don't need special code here.
- wait_interval_.reset();
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Job suceeded so not scheduling more jobs";
- return;
- }
-
- if (old_job.session->source().updates_source ==
- GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Job failed with source continuation";
- // We don't seem to have made forward progress. Start or extend backoff.
- HandleConsecutiveContinuationError(old_job);
- } else if (IsBackingOff()) {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " A nudge during backoff failed";
- // We weren't continuing but we're in backoff; must have been a nudge.
- DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
- DCHECK(!wait_interval_->had_nudge);
- wait_interval_->had_nudge = true;
- wait_interval_->timer.Reset();
- } else {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " Failed. Schedule a job with continuation as source";
- // We weren't continuing and we aren't in backoff. Schedule a normal
- // continuation.
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
- ScheduleConfigImpl(old_job.session->routing_info(),
- old_job.session->workers(),
- GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
- } else {
- // For all other purposes(nudge and poll) we schedule a retry nudge.
- ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
- GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
- old_job.session->source().types, false, FROM_HERE);
- }
- }
-}
-
-void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
- DCHECK(thread_.IsRunning());
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
-
- TimeDelta poll = (!session_context_->notifications_enabled()) ?
- syncer_short_poll_interval_seconds_ :
- syncer_long_poll_interval_seconds_;
- bool rate_changed = !poll_timer_.IsRunning() ||
- poll != poll_timer_.GetCurrentDelay();
-
- if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
- poll_timer_.Reset();
-
- if (!rate_changed)
- return;
-
- // Adjust poll rate.
- poll_timer_.Stop();
- poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
-}
-
-void SyncerThread::HandleConsecutiveContinuationError(
- const SyncSessionJob& old_job) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- // This if conditions should be compiled out in retail builds.
- if (IsBackingOff()) {
- DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
- }
- SyncSession* old = old_job.session.get();
- SyncSession* s(new SyncSession(session_context_.get(), this,
- old->source(), old->routing_info(), old->workers()));
- TimeDelta length = delay_provider_->GetDelay(
- IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
-
- VLOG(2) << "SyncerThread(" << this << ")"
- << " In handle continuation error. Old job purpose is "
- << old_job.purpose;
- VLOG(2) << "SyncerThread(" << this << ")"
- << " In Handle continuation error. The time delta(ms) is: "
- << length.InMilliseconds();
-
- // This will reset the had_nudge variable as well.
- wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
- length));
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
- SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
- make_linked_ptr(s), false, FROM_HERE);
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
- } else {
- // We are not in configuration mode. So wait_interval's pending job
- // should be null.
- DCHECK(wait_interval_->pending_configure_job.get() == NULL);
-
- // TODO(lipalani) - handle clear user data.
- InitOrCoalescePendingJob(old_job);
- }
- wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
-}
-
-// static
-TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
- if (last_delay.InSeconds() >= kMaxBackoffSeconds)
- return TimeDelta::FromSeconds(kMaxBackoffSeconds);
-
- // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
- int64 backoff_s =
- std::max(static_cast<int64>(1),
- last_delay.InSeconds() * kBackoffRandomizationFactor);
-
- // Flip a coin to randomize backoff interval by +/- 50%.
- int rand_sign = base::RandInt(0, 1) * 2 - 1;
-
- // Truncation is adequate for rounding here.
- backoff_s = backoff_s +
- (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
-
- // Cap the backoff interval.
- backoff_s = std::max(static_cast<int64>(1),
- std::min(backoff_s, kMaxBackoffSeconds));
-
- return TimeDelta::FromSeconds(backoff_s);
-}
-
-void SyncerThread::Stop() {
- VLOG(2) << "SyncerThread(" << this << ")" << " stop called";
- syncer_->RequestEarlyExit(); // Safe to call from any thread.
- session_context_->connection_manager()->RemoveListener(this);
- thread_.Stop();
-}
-
-void SyncerThread::DoCanaryJob() {
- VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job";
- DoPendingJobIfPossible(true);
-}
-
-void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
- SyncSessionJob* job_to_execute = NULL;
- if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
- && wait_interval_->pending_configure_job.get()) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job";
- job_to_execute = wait_interval_->pending_configure_job.get();
- } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job";
- // Pending jobs mostly have time from the past. Reset it so this job
- // will get executed.
- if (pending_nudge_->scheduled_start < TimeTicks::Now())
- pending_nudge_->scheduled_start = TimeTicks::Now();
-
- scoped_ptr<SyncSession> session(CreateSyncSession(
- pending_nudge_->session->source()));
-
- // Also the routing info might have been changed since we cached the
- // pending nudge. Update it by coalescing to the latest.
- pending_nudge_->session->Coalesce(*(session.get()));
- // The pending nudge would be cleared in the DoSyncSessionJob function.
- job_to_execute = pending_nudge_.get();
- }
-
- if (job_to_execute != NULL) {
- VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job";
- SyncSessionJob copy = *job_to_execute;
- copy.is_canary_job = is_canary_job;
- DoSyncSessionJob(copy);
- }
-}
-
-SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
- ModelSafeRoutingInfo routes;
- std::vector<ModelSafeWorker*> workers;
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
- session_context_->registrar()->GetWorkers(&workers);
- SyncSourceInfo info(source);
-
- SyncSession* session(new SyncSession(session_context_.get(), this, info,
- routes, workers));
-
- return session;
-}
-
-void SyncerThread::PollTimerCallback() {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- ModelSafeRoutingInfo r;
- ModelTypePayloadMap types_with_payloads =
- syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
- SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
- SyncSession* s = CreateSyncSession(info);
- ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
- FROM_HERE);
-}
-
-void SyncerThread::Unthrottle() {
- DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
- VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled..";
- DoCanaryJob();
- wait_interval_.reset();
-}
-
-void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- session_context_->NotifyListeners(SyncEngineEvent(cause));
-}
-
-bool SyncerThread::IsBackingOff() const {
- return wait_interval_.get() && wait_interval_->mode ==
- WaitInterval::EXPONENTIAL_BACKOFF;
-}
-
-void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
- wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
- silenced_until - TimeTicks::Now()));
- wait_interval_->timer.Start(wait_interval_->length, this,
- &SyncerThread::Unthrottle);
-}
-
-bool SyncerThread::IsSyncingCurrentlySilenced() {
- return wait_interval_.get() && wait_interval_->mode ==
- WaitInterval::THROTTLED;
-}
-
-void SyncerThread::OnReceivedShortPollIntervalUpdate(
- const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- syncer_short_poll_interval_seconds_ = new_interval;
-}
-
-void SyncerThread::OnReceivedLongPollIntervalUpdate(
- const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
- syncer_long_poll_interval_seconds_ = new_interval;
-}
-
-void SyncerThread::OnShouldStopSyncingPermanently() {
- VLOG(2) << "SyncerThread(" << this << ")"
- << " OnShouldStopSyncingPermanently";
- syncer_->RequestEarlyExit(); // Thread-safe.
- Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
-}
-
-void SyncerThread::OnServerConnectionEvent(
- const ServerConnectionEvent2& event) {
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
- &SyncerThread::CheckServerConnectionManagerStatus,
- event.connection_code));
-}
-
-void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
- session_context_->set_notifications_enabled(notifications_enabled);
-}
-
-} // s3
-} // browser_sync
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread2.h ('k') | chrome/browser/sync/engine/syncer_thread2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698