Chromium Code Reviews| Index: chrome/browser/sync/engine/syncer_thread2.cc |
| diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..20be3c7711ef7316fc9a7c847cea9cbfa81bc307 |
| --- /dev/null |
| +++ b/chrome/browser/sync/engine/syncer_thread2.cc |
| @@ -0,0 +1,452 @@ |
| +// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "chrome/browser/sync/engine/syncer_thread2.h" |
| + |
| +#include "base/rand_util.h" |
| +#include "chrome/browser/sync/engine/syncer.h" |
| + |
| +using base::TimeDelta; |
| +using base::TimeTicks; |
| + |
| +namespace browser_sync { |
| + |
| +using sessions::SyncSession; |
| +using sessions::SyncSessionSnapshot; |
| +using sessions::SyncSourceInfo; |
| +using syncable::ModelTypeBitSet; |
| +using sync_pb::GetUpdatesCallerInfo; |
| + |
| +namespace s3 { |
| + |
| +SyncerThread::SyncerThread(sessions::SyncSessionContext* context, |
| + Syncer* syncer) |
| + : thread_("SyncEngine_SyncerThread"), |
| + syncer_short_poll_interval_seconds_( |
| + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
| + syncer_long_poll_interval_seconds_( |
| + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| + server_connection_ok_(false), |
| + syncer_(syncer), |
| + session_context_(context) { |
| +} |
| + |
| +SyncerThread::~SyncerThread() { |
| + DCHECK(!thread_.IsRunning()); |
| +} |
| + |
| +void SyncerThread::Start(Mode mode) { |
| + if (!thread_.IsRunning() && !thread_.Start()) { |
| + NOTREACHED() << "Unable to start SyncerThread."; |
| + return; |
| + } |
| + |
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| + this, &SyncerThread::StartImpl, mode)); |
| +} |
| + |
| +void SyncerThread::StartImpl(Mode mode) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + DCHECK(!session_context_->account_name().empty()); |
| + DCHECK(syncer_.get()); |
| + mode_ = mode; |
| + AdjustPolling(NULL); // Will kick start poll timer if needed. |
| +} |
| + |
| +bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, |
| + const TimeTicks& scheduled_start) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + |
| + // Check wait interval. |
| + if (wait_interval_.get()) { |
| + if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| + return false; |
| + } else { |
|
akalin
2011/01/20 00:47:57
no need for else since the if returns.
tim (not reviewing)
2011/01/25 03:23:05
Done.
|
| + DCHECK_EQ(wait_interval_->mode, |
| + WaitInterval::EXPONENTIAL_BACKOFF); |
| + switch (purpose) { |
|
akalin
2011/01/20 00:47:57
isn't this clearer as:
DCHECK(purpose == POLL ||
tim (not reviewing)
2011/01/25 03:23:05
Done.
|
| + case SyncSessionJob::NUDGE: |
| + if (!wait_interval_->had_nudge) |
| + break; |
| + default: |
| + DCHECK(purpose == SyncSessionJob::POLL || |
| + purpose == SyncSessionJob::NUDGE); |
| + return false; |
| + } |
| + } |
| + } |
| + |
| + // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that |
| + // were intended for a normal sync if we are in configuration mode, and vice |
| + // versa. |
| + switch (mode_) { |
| + case CONFIGURATION_MODE: |
| + if (purpose != SyncSessionJob::CONFIGURATION) |
| + return false; |
| + break; |
| + case NORMAL_MODE: |
| + if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) |
| + return false; |
| + break; |
| + default: |
| + NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; |
| + return false; |
| + } |
| + |
| + // Freshness condition. |
| + if (purpose == SyncSessionJob::NUDGE && |
| + (scheduled_start < last_sync_session_end_time_)) { |
| + return false; |
| + } |
| + |
| + return server_connection_ok_; |
| +} |
| + |
| +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
|
akalin
2011/01/20 00:47:57
anon namespace?
tim (not reviewing)
2011/01/25 03:23:05
Done.
|
| + NudgeSource source) { |
| + switch (source) { |
| + case NUDGE_SOURCE_NOTIFICATION: |
| + return GetUpdatesCallerInfo::NOTIFICATION; |
| + case NUDGE_SOURCE_LOCAL: |
| + return GetUpdatesCallerInfo::LOCAL; |
| + case NUDGE_SOURCE_CONTINUATION: |
| + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| + case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: |
| + return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; |
| + case NUDGE_SOURCE_UNKNOWN: |
| + default: |
| + return GetUpdatesCallerInfo::UNKNOWN; |
| + } |
| +} |
| + |
| +void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
| + NudgeSource source, const ModelTypeBitSet& types) { |
| + if (!thread_.IsRunning()) { |
| + NOTREACHED(); |
| + return; |
| + } |
| + |
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| + this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); |
| +} |
| + |
| +// Functor for std::find_if to search by ModelSafeGroup. |
|
akalin
2011/01/20 00:47:57
anon namespace?
tim (not reviewing)
2011/01/25 03:23:05
Done.
|
| +struct WorkerGroupIs { |
| + explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} |
| + bool operator()(ModelSafeWorker* w) { |
| + return group == w->GetModelSafeGroup(); |
| + } |
| + ModelSafeGroup group; |
| +}; |
| + |
| +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
| + NudgeSource source, const ModelTypeBitSet& model_types) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + TimeTicks rough_start = TimeTicks::Now() + delay; |
| + if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) |
| + return; |
| + |
| + // Note we currently nudge for all types regardless of the ones incurring |
| + // the nudge. Doing different would throw off some syncer commands like |
| + // CleanupDisabledTypes. We may want to change this in the future. |
| + ModelSafeRoutingInfo routes; |
| + std::vector<ModelSafeWorker*> workers; |
| + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
| + session_context_->registrar()->GetWorkers(&workers); |
| + SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); |
| + |
| + scoped_ptr<SyncSession> session(new SyncSession( |
| + session_context_.get(), this, info, routes, workers)); |
| + |
| + if (pending_nudge_.get()) { |
| + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) |
| + return; |
| + |
| + pending_nudge_->session->Coalesce(session.get()); |
| + if (!IsBackingOff()) |
| + return; |
| + } |
| + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); |
| +} |
| + |
| +// Helper to extract the routing info and workers corresponding to types in |
| +// |types| from |registrar|. |
| +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, |
| + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, |
| + std::vector<ModelSafeWorker*>* workers) { |
| + ModelSafeRoutingInfo r_tmp; |
| + std::vector<ModelSafeWorker*> w_tmp; |
| + registrar->GetModelSafeRoutingInfo(&r_tmp); |
| + registrar->GetWorkers(&w_tmp); |
| + |
| + typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
| + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { |
| + if (!types.test(i)) |
| + continue; |
| + syncable::ModelType t = syncable::ModelTypeFromInt(i); |
| + DCHECK_EQ(1U, r_tmp.count(t)); |
| + (*routes)[t] = r_tmp[t]; |
| + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); |
| + DCHECK(w_tmp.end() != it); |
|
akalin
2011/01/20 00:47:57
maybe:
if (it != w_tmp.end() {
workers->push_ba
tim (not reviewing)
2011/01/25 03:23:05
Yeah, probably safer.
|
| + workers->push_back(*it); |
| + } |
| + |
| + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), |
| + WorkerGroupIs(GROUP_PASSIVE)); |
| + DCHECK(w_tmp.end() != it); |
|
akalin
2011/01/20 00:47:57
here, too
tim (not reviewing)
2011/01/25 03:23:05
Done.
|
| + workers->push_back(*it); |
| +} |
| + |
| +void SyncerThread::ScheduleConfig(const TimeDelta& delay, |
| + const ModelTypeBitSet& types) { |
| + if (!thread_.IsRunning()) { |
| + NOTREACHED(); |
| + return; |
| + } |
| + |
| + ModelSafeRoutingInfo routes; |
| + std::vector<ModelSafeWorker*> workers; |
| + GetModelSafeParamsForTypes(types, session_context_->registrar(), |
| + &routes, &workers); |
| + |
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| + this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); |
| +} |
| + |
| +void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, |
| + const ModelSafeRoutingInfo& routing_info, |
| + const std::vector<ModelSafeWorker*>& workers) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + NOTIMPLEMENTED() << "TODO(tim)"; |
| +} |
| + |
| +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
| + SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + SyncSessionJob job = {purpose, TimeTicks::Now() + delay, |
| + make_linked_ptr(session)}; |
| + if (purpose == SyncSessionJob::NUDGE) { |
| + DCHECK(!pending_nudge_.get()); |
| + pending_nudge_.reset(new SyncSessionJob(job)); |
| + } |
| + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
| + &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); |
| +} |
| + |
| +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + |
| + if (job.purpose == SyncSessionJob::NUDGE) { |
| + DCHECK(pending_nudge_.get()); |
| + if (pending_nudge_->session != job.session) |
| + return; // Another nudge must have been scheduled in in the meantime. |
| + pending_nudge_.reset(); |
| + } else if (job.purpose == SyncSessionJob::CONFIGURATION) { |
| + NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; |
| + } |
| + |
| + bool has_more_to_sync = true; |
| + while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { |
| + VLOG(1) << "SyncerThread: Calling SyncShare."; |
| + // Synchronously perform the sync session from this thread. |
| + syncer_->SyncShare(job.session.get()); |
| + has_more_to_sync = job.session->HasMoreToSync(); |
| + if (has_more_to_sync) |
| + job.session->ResetTransientState(); |
| + } |
| + VLOG(1) << "SyncerThread: Done SyncShare looping."; |
| + FinishSyncSessionJob(job); |
| +} |
| + |
| +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + // Update timing information for how often datatypes are triggering nudges. |
| + base::TimeTicks now = TimeTicks::Now(); |
| + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; |
| + i < job.session->source().second.size() && |
| + !last_sync_session_end_time_.is_null(); |
| + ++i) { |
| + if (job.session->source().second[i]) { |
| + syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), |
| + now - last_sync_session_end_time_); |
| + } |
| + } |
| + last_sync_session_end_time_ = now; |
| + if (IsSyncingCurrentlySilenced()) |
| + return; // Nothing to do. |
| + |
| + VLOG(1) << "Updating the next polling time after SyncMain"; |
| + ScheduleNextSync(job); |
| +} |
| + |
| +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + const bool work_to_do = |
| + old_job.session->status_controller()->num_server_changes_remaining() > 0 |
| + || old_job.session->status_controller()->unsynced_handles().size() > 0; |
| + VLOG(1) << "syncer has work to do: " << work_to_do; |
| + |
| + AdjustPolling(&old_job); |
| + |
| + // TODO(tim): Old impl had special code if notifications disabled. Needed? |
| + if (!work_to_do) |
| + return; |
| + |
| + if (old_job.session->source().first == |
| + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
| + // We don't seem to have made forward progress. Start or extend backoff. |
| + HandleConsecutiveContinuationError(old_job); |
| + } else if (IsBackingOff()) { |
| + // We weren't continuing but we're in backoff; must have been a nudge. |
| + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
| + DCHECK(!wait_interval_->had_nudge); |
| + wait_interval_->had_nudge = true; |
| + wait_interval_->timer.Reset(); |
| + } else { |
| + // We weren't continuing and we aren't in backoff. Schedule a normal |
| + // continuation. |
| + ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)), |
| + NUDGE_SOURCE_CONTINUATION, |
|
Nicolas Zea
2011/01/20 20:01:57
Just for my understanding here, is this called whe
tim (not reviewing)
2011/01/25 03:23:05
Typically this means some sort of error occurred b
|
| + old_job.session->source().second); |
| + } |
| +} |
| + |
| +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { |
| + DCHECK(thread_.IsRunning()); |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + |
| + TimeDelta poll = (!session_context_->notifications_enabled()) ? |
| + syncer_short_poll_interval_seconds_ : |
| + syncer_long_poll_interval_seconds_; |
| + bool rate_changed = !poll_timer_.IsRunning() || |
| + poll != poll_timer_.GetCurrentDelay(); |
| + |
| + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) |
| + poll_timer_.Reset(); |
| + |
| + if (!rate_changed) |
| + return; |
| + |
| + // Adjust poll rate. |
| + poll_timer_.Stop(); |
| + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); |
| +} |
| + |
| +void SyncerThread::HandleConsecutiveContinuationError( |
| + const SyncSessionJob& old_job) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); |
| + SyncSession* old = old_job.session.get(); |
| + SyncSession* s(new SyncSession(session_context_.get(), this, |
| + old->source(), old->routing_info(), old->workers())); |
| + TimeDelta length = GetRecommendedDelay( |
| + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
| + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| + length)); |
| + SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, |
| + make_linked_ptr(s)}; |
| + pending_nudge_.reset(new SyncSessionJob(job)); |
| + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); |
| +} |
| + |
| +// static |
| +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { |
| + if (last_delay.InSeconds() >= kMaxBackoffSeconds) |
| + return TimeDelta::FromSeconds(kMaxBackoffSeconds); |
| + |
| + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 |
| + int64 backoff_s = |
| + std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor); |
| + |
| + // Flip a coin to randomize backoff interval by +/- 50%. |
| + int rand_sign = base::RandInt(0, 1) * 2 - 1; |
| + |
| + // Truncation is adequate for rounding here. |
| + backoff_s = backoff_s + |
| + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); |
| + |
| + // Cap the backoff interval. |
| + backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds)); |
| + |
| + return TimeDelta::FromSeconds(backoff_s); |
| +} |
| + |
| +void SyncerThread::Stop() { |
| + syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| + thread_.Stop(); |
| + Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); |
| +} |
| + |
| +void SyncerThread::DoCanaryJob() { |
| + DCHECK(pending_nudge_.get()); |
| + wait_interval_->had_nudge = false; |
| + DoSyncSessionJob(*pending_nudge_); |
| +} |
| + |
| +void SyncerThread::PollTimerCallback() { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + ModelSafeRoutingInfo r; |
| + std::vector<ModelSafeWorker*> w; |
| + session_context_->registrar()->GetModelSafeRoutingInfo(&r); |
| + session_context_->registrar()->GetWorkers(&w); |
| + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); |
| + SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); |
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); |
| +} |
| + |
| +void SyncerThread::Unthrottle() { |
| + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| + wait_interval_.reset(); |
| +} |
| + |
| +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + session_context_->NotifyListeners(SyncEngineEvent(cause)); |
| +} |
| + |
| +bool SyncerThread::IsBackingOff() const { |
| + return wait_interval_.get() && wait_interval_->mode == |
| + WaitInterval::EXPONENTIAL_BACKOFF; |
| +} |
| + |
| +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { |
| + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
| + silenced_until - TimeTicks::Now())); |
| + wait_interval_->timer.Start(wait_interval_->length, this, |
| + &SyncerThread::Unthrottle); |
| +} |
| + |
| +bool SyncerThread::IsSyncingCurrentlySilenced() { |
|
akalin
2011/01/20 00:47:57
const method?
tim (not reviewing)
2011/01/25 03:23:05
Yeah, probably should be - I started to make this
|
| + return wait_interval_.get() && wait_interval_->mode == |
| + WaitInterval::THROTTLED; |
| +} |
| + |
| +void SyncerThread::OnReceivedShortPollIntervalUpdate( |
| + const base::TimeDelta& new_interval) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + syncer_short_poll_interval_seconds_ = new_interval; |
| +} |
| + |
| +void SyncerThread::OnReceivedLongPollIntervalUpdate( |
| + const base::TimeDelta& new_interval) { |
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| + syncer_long_poll_interval_seconds_ = new_interval; |
| +} |
| + |
| +void SyncerThread::OnShouldStopSyncingPermanently() { |
| + syncer_->RequestEarlyExit(); // Thread-safe. |
| + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
| +} |
| + |
| +void SyncerThread::OnServerConnectionEvent( |
| + const ServerConnectionEvent& event) { |
| + NOTIMPLEMENTED(); |
| +} |
| + |
| +void SyncerThread::set_notifications_enabled(bool notifications_enabled) { |
| + session_context_->set_notifications_enabled(notifications_enabled); |
| +} |
| + |
| +} // s3 |
| +} // browser_sync |