Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "chrome/browser/sync/engine/syncer_thread2.h" | |
| 6 | |
| 7 #include "base/rand_util.h" | |
| 8 #include "chrome/browser/sync/engine/syncer.h" | |
| 9 | |
| 10 using base::TimeDelta; | |
| 11 using base::TimeTicks; | |
| 12 | |
| 13 namespace browser_sync { | |
| 14 | |
| 15 using sessions::SyncSession; | |
| 16 using sessions::SyncSessionSnapshot; | |
| 17 using sessions::SyncSourceInfo; | |
| 18 using syncable::ModelTypeBitSet; | |
| 19 using sync_pb::GetUpdatesCallerInfo; | |
| 20 | |
| 21 namespace s3 { | |
| 22 | |
| 23 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, | |
| 24 Syncer* syncer) | |
| 25 : thread_("SyncEngine_SyncerThread"), | |
| 26 syncer_short_poll_interval_seconds_( | |
| 27 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | |
| 28 syncer_long_poll_interval_seconds_( | |
| 29 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | |
| 30 server_connection_ok_(false), | |
| 31 syncer_(syncer), | |
| 32 session_context_(context) { | |
| 33 } | |
| 34 | |
| 35 SyncerThread::~SyncerThread() { | |
| 36 DCHECK(!thread_.IsRunning()); | |
| 37 } | |
| 38 | |
| 39 void SyncerThread::Start(Mode mode) { | |
| 40 if (!thread_.IsRunning() && !thread_.Start()) { | |
| 41 NOTREACHED() << "Unable to start SyncerThread."; | |
| 42 return; | |
| 43 } | |
| 44 | |
| 45 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
| 46 this, &SyncerThread::StartImpl, mode)); | |
| 47 } | |
| 48 | |
| 49 void SyncerThread::StartImpl(Mode mode) { | |
| 50 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 51 DCHECK(!session_context_->account_name().empty()); | |
| 52 DCHECK(syncer_.get()); | |
| 53 mode_ = mode; | |
| 54 AdjustPolling(NULL); // Will kick start poll timer if needed. | |
| 55 } | |
| 56 | |
| 57 bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, | |
| 58 const TimeTicks& scheduled_start) { | |
| 59 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 60 | |
| 61 // Check wait interval. | |
| 62 if (wait_interval_.get()) { | |
| 63 if (wait_interval_->mode == WaitInterval::THROTTLED) { | |
| 64 return false; | |
| 65 } else { | |
| 66 DCHECK_EQ(wait_interval_->mode, | |
| 67 WaitInterval::EXPONENTIAL_BACKOFF); | |
| 68 switch (purpose) { | |
| 69 case SyncSessionJob::NUDGE: | |
| 70 if (!wait_interval_->had_nudge) | |
|
Nicolas Zea
2010/12/23 21:49:20
Why don't we back off if we haven't had a nudge?
tim (not reviewing)
2010/12/24 04:23:47
the code allows one nudge per backoff interval (ak
| |
| 71 break; | |
| 72 default: | |
| 73 DCHECK(purpose == SyncSessionJob::POLL || | |
| 74 purpose == SyncSessionJob::NUDGE); | |
| 75 return false; | |
| 76 } | |
| 77 } | |
| 78 } | |
| 79 | |
| 80 // Mode / purpose contract. | |
|
Nicolas Zea
2010/12/23 21:49:20
An explanation of when the contract might be viola
tim (not reviewing)
2010/12/24 04:23:47
Ok. I'll adapt the 'Mode' comment in the header a
| |
| 81 switch (mode_) { | |
| 82 case CONFIGURATION_MODE: | |
| 83 if (purpose != SyncSessionJob::CONFIGURATION) | |
| 84 return false; | |
| 85 break; | |
| 86 case NORMAL_MODE: | |
| 87 if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) | |
| 88 return false; | |
| 89 break; | |
| 90 default: | |
| 91 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; | |
| 92 return false; | |
| 93 } | |
| 94 | |
| 95 // Freshness condition. | |
| 96 if (purpose == SyncSessionJob::NUDGE && | |
| 97 (scheduled_start < last_sync_session_end_time_)) { | |
| 98 return false; | |
| 99 } | |
| 100 | |
| 101 return server_connection_ok_; | |
| 102 } | |
| 103 | |
| 104 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | |
| 105 NudgeSource source) { | |
| 106 switch (source) { | |
| 107 case NUDGE_SOURCE_NOTIFICATION: | |
| 108 return GetUpdatesCallerInfo::NOTIFICATION; | |
| 109 case NUDGE_SOURCE_LOCAL: | |
| 110 return GetUpdatesCallerInfo::LOCAL; | |
| 111 case NUDGE_SOURCE_CONTINUATION: | |
| 112 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | |
| 113 case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: | |
| 114 return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; | |
| 115 case NUDGE_SOURCE_UNKNOWN: | |
| 116 default: | |
| 117 return GetUpdatesCallerInfo::UNKNOWN; | |
| 118 } | |
| 119 } | |
| 120 | |
| 121 void SyncerThread::ScheduleNudge(const TimeDelta& delay, | |
| 122 NudgeSource source, const ModelTypeBitSet& types) { | |
| 123 if (!thread_.IsRunning()) { | |
| 124 NOTREACHED(); | |
| 125 return; | |
| 126 } | |
| 127 | |
| 128 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
| 129 this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); | |
| 130 } | |
| 131 | |
| 132 // Functor for std::find_if to search by ModelSafeGroup. | |
| 133 struct WorkerGroupIs { | |
| 134 explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
| 135 bool operator()(ModelSafeWorker* w) { | |
| 136 return group == w->GetModelSafeGroup(); | |
| 137 } | |
| 138 ModelSafeGroup group; | |
| 139 }; | |
| 140 | |
| 141 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, | |
| 142 NudgeSource source, const ModelTypeBitSet& model_types) { | |
| 143 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 144 TimeTicks rough_start = TimeTicks::Now() + delay; | |
| 145 if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) | |
| 146 return; | |
| 147 | |
| 148 // Note we currently nudge for all types regardless of the ones incurring | |
| 149 // the nudge. Doing different would throw off some syncer commands like | |
| 150 // CleanupDisabledTypes. We may want to change this in the future. | |
| 151 ModelSafeRoutingInfo routes; | |
| 152 std::vector<ModelSafeWorker*> workers; | |
| 153 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); | |
| 154 session_context_->registrar()->GetWorkers(&workers); | |
| 155 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); | |
| 156 | |
| 157 scoped_refptr<SyncSession> session = new SyncSession( | |
| 158 session_context_.get(), this, info, routes, workers); | |
| 159 | |
| 160 if (pending_nudge_.get()) { | |
| 161 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) | |
| 162 return; | |
| 163 | |
| 164 pending_nudge_->session->Coalesce(session); | |
| 165 if (!IsBackingOff()) | |
| 166 return; | |
| 167 } | |
| 168 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); | |
| 169 } | |
| 170 | |
| 171 // Helper to extract the routing info and workers corresponding to types in | |
| 172 // |types| from |registrar|. | |
| 173 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, | |
| 174 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, | |
| 175 std::vector<ModelSafeWorker*>* workers) { | |
| 176 ModelSafeRoutingInfo r_tmp; | |
| 177 std::vector<ModelSafeWorker*> w_tmp; | |
| 178 registrar->GetModelSafeRoutingInfo(&r_tmp); | |
| 179 registrar->GetWorkers(&w_tmp); | |
| 180 | |
| 181 typedef std::vector<ModelSafeWorker*>::const_iterator iter; | |
| 182 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { | |
| 183 if (!types.test(i)) | |
| 184 continue; | |
| 185 syncable::ModelType t = syncable::ModelTypeFromInt(i); | |
| 186 DCHECK_EQ(1U, r_tmp.count(t)); | |
| 187 (*routes)[t] = r_tmp[t]; | |
| 188 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); | |
| 189 DCHECK(w_tmp.end() != it); | |
| 190 workers->push_back(*it); | |
| 191 } | |
| 192 | |
| 193 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), | |
| 194 WorkerGroupIs(GROUP_PASSIVE)); | |
| 195 DCHECK(w_tmp.end() != it); | |
| 196 workers->push_back(*it); | |
| 197 } | |
| 198 | |
| 199 void SyncerThread::ScheduleConfig(const TimeDelta& delay, | |
| 200 const ModelTypeBitSet& types) { | |
| 201 if (!thread_.IsRunning()) { | |
| 202 NOTREACHED(); | |
| 203 return; | |
| 204 } | |
| 205 | |
| 206 ModelSafeRoutingInfo routes; | |
| 207 std::vector<ModelSafeWorker*> workers; | |
| 208 GetModelSafeParamsForTypes(types, session_context_->registrar(), | |
| 209 &routes, &workers); | |
| 210 | |
| 211 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
| 212 this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); | |
| 213 } | |
| 214 | |
| 215 void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, | |
| 216 const ModelSafeRoutingInfo& routing_info, | |
| 217 const std::vector<ModelSafeWorker*>& workers) { | |
| 218 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 219 NOTIMPLEMENTED() << "TODO(tim)"; | |
| 220 } | |
| 221 | |
| 222 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, | |
| 223 SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { | |
| 224 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 225 SyncSessionJob job = {purpose, TimeTicks::Now() + delay, session}; | |
| 226 if (purpose == SyncSessionJob::NUDGE) { | |
| 227 DCHECK(!pending_nudge_.get()); | |
| 228 pending_nudge_.reset(new SyncSessionJob(job)); | |
| 229 } | |
| 230 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, | |
| 231 &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); | |
| 232 } | |
| 233 | |
| 234 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { | |
| 235 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 236 | |
| 237 if (job.purpose == SyncSessionJob::NUDGE) { | |
| 238 DCHECK(pending_nudge_.get()); | |
| 239 if (pending_nudge_->session != job.session) | |
| 240 return; // Another nudge must have been scheduled in in the meantime. | |
| 241 pending_nudge_.reset(); | |
| 242 } else if (job.purpose == SyncSessionJob::CONFIGURATION) { | |
| 243 NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; | |
| 244 } | |
| 245 | |
| 246 bool has_more_to_sync = true; | |
| 247 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { | |
| 248 VLOG(1) << "SyncerThread: Calling SyncShare."; | |
| 249 // Synchronously perform the sync session from this thread. | |
| 250 syncer_->SyncShare(job.session); | |
| 251 has_more_to_sync = job.session->HasMoreToSync(); | |
| 252 if (has_more_to_sync) | |
| 253 job.session->ResetTransientState(); | |
| 254 } | |
| 255 VLOG(1) << "SyncerThread: Done SyncShare looping."; | |
| 256 FinishSyncSessionJob(job); | |
| 257 } | |
| 258 | |
| 259 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { | |
| 260 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 261 // Update timing information for how often datatypes are triggering nudges. | |
| 262 base::TimeTicks now = TimeTicks::Now(); | |
| 263 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; | |
| 264 i < job.session->source().second.size() && | |
| 265 !last_sync_session_end_time_.is_null(); | |
| 266 ++i) { | |
| 267 if (job.session->source().second[i]) { | |
| 268 syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), | |
| 269 now - last_sync_session_end_time_); | |
| 270 } | |
| 271 } | |
| 272 last_sync_session_end_time_ = now; | |
| 273 if (IsSyncingCurrentlySilenced()) | |
| 274 return; // Nothing to do. | |
| 275 | |
| 276 VLOG(1) << "Updating the next polling time after SyncMain"; | |
| 277 ScheduleNextSync(job); | |
| 278 } | |
| 279 | |
| 280 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { | |
| 281 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 282 const bool work_to_do = | |
| 283 old_job.session->status_controller()->num_server_changes_remaining() > | |
| 284 old_job.session->status_controller()->ComputeMaxLocalTimestamp() || | |
| 285 old_job.session->status_controller()->unsynced_handles().size() > 0; | |
| 286 VLOG(1) << "syncer has work to do: " << work_to_do; | |
| 287 | |
| 288 AdjustPolling(&old_job); | |
| 289 | |
| 290 // TODO(tim): Old impl had special code if notifications disabled. Needed? | |
| 291 if (!work_to_do) | |
| 292 return; | |
| 293 | |
| 294 if (old_job.session->source().first == | |
| 295 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { | |
| 296 // We don't seem to have made forward progress. Start or extend backoff. | |
| 297 HandleConsecutiveContinuationError(old_job); | |
| 298 } else if (IsBackingOff()) { | |
| 299 // We weren't continuing but we're in backoff; must have been a nudge. | |
| 300 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
| 301 DCHECK(!wait_interval_->had_nudge); | |
| 302 wait_interval_->had_nudge = true; | |
| 303 wait_interval_->timer.Reset(); | |
| 304 } else { | |
| 305 // We weren't continuing and we aren't in backoff. Schedule a normal | |
| 306 // continuation. | |
| 307 ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)), | |
| 308 NUDGE_SOURCE_CONTINUATION, | |
| 309 old_job.session->source().second); | |
| 310 } | |
| 311 } | |
| 312 | |
| 313 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { | |
| 314 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 315 TimeDelta poll = (!session_context_->notifications_enabled()) ? | |
| 316 syncer_short_poll_interval_seconds_ : | |
| 317 syncer_long_poll_interval_seconds_; | |
| 318 bool rate_changed = !poll_timer_.IsRunning() || | |
| 319 poll != poll_timer_.GetCurrentDelay(); | |
| 320 | |
| 321 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | |
| 322 poll_timer_.Reset(); | |
| 323 | |
| 324 if (!rate_changed) | |
| 325 return; | |
| 326 | |
| 327 // Adjust poll rate. | |
| 328 poll_timer_.Stop(); | |
| 329 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); | |
| 330 } | |
| 331 | |
| 332 void SyncerThread::HandleConsecutiveContinuationError( | |
| 333 const SyncSessionJob& old_job) { | |
| 334 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 335 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); | |
| 336 SyncSession* old = old_job.session; | |
| 337 SyncSession* s(new SyncSession(session_context_.get(), this, | |
| 338 old->source(), old->routing_info(), old->workers())); | |
| 339 TimeDelta length = GetRecommendedDelay( | |
| 340 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); | |
| 341 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | |
| 342 length)); | |
| 343 SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, s}; | |
| 344 pending_nudge_.reset(new SyncSessionJob(job)); | |
| 345 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); | |
| 346 } | |
| 347 | |
| 348 // static | |
| 349 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { | |
| 350 if (last_delay.InSeconds() >= kMaxBackoffSeconds) | |
| 351 return TimeDelta::FromSeconds(kMaxBackoffSeconds); | |
| 352 | |
| 353 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 | |
| 354 int64 backoff_s = | |
| 355 std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor); | |
| 356 | |
| 357 // Flip a coin to randomize backoff interval by +/- 50%. | |
| 358 int rand_sign = base::RandInt(0, 1) * 2 - 1; | |
| 359 | |
| 360 // Truncation is adequate for rounding here. | |
| 361 backoff_s = backoff_s + | |
| 362 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); | |
| 363 | |
| 364 // Cap the backoff interval. | |
| 365 backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds)); | |
| 366 | |
| 367 return TimeDelta::FromSeconds(backoff_s); | |
| 368 } | |
| 369 | |
| 370 void SyncerThread::Stop() { | |
| 371 syncer_->RequestEarlyExit(); // Safe to call from any thread. | |
| 372 thread_.Stop(); | |
| 373 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); | |
| 374 } | |
| 375 | |
| 376 void SyncerThread::DoCanaryJob() { | |
| 377 DCHECK(pending_nudge_.get()); | |
| 378 wait_interval_->had_nudge = false; | |
| 379 DoSyncSessionJob(*pending_nudge_); | |
| 380 } | |
| 381 | |
| 382 void SyncerThread::PollTimerCallback() { | |
| 383 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 384 ModelSafeRoutingInfo r; | |
| 385 std::vector<ModelSafeWorker*> w; | |
| 386 session_context_->registrar()->GetModelSafeRoutingInfo(&r); | |
| 387 session_context_->registrar()->GetWorkers(&w); | |
| 388 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); | |
| 389 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); | |
| 390 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); | |
| 391 } | |
| 392 | |
| 393 void SyncerThread::Unthrottle() { | |
| 394 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | |
| 395 wait_interval_.reset(); | |
| 396 } | |
| 397 | |
| 398 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { | |
| 399 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
| 400 session_context_->NotifyListeners(SyncEngineEvent(cause)); | |
| 401 } | |
| 402 | |
| 403 bool SyncerThread::IsBackingOff() const { | |
| 404 return wait_interval_.get() && wait_interval_->mode == | |
| 405 WaitInterval::EXPONENTIAL_BACKOFF; | |
| 406 } | |
| 407 | |
| 408 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { | |
| 409 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | |
| 410 silenced_until - TimeTicks::Now())); | |
| 411 wait_interval_->timer.Start(wait_interval_->length, this, | |
| 412 &SyncerThread::Unthrottle); | |
| 413 } | |
| 414 | |
| 415 bool SyncerThread::IsSyncingCurrentlySilenced() { | |
| 416 return wait_interval_.get() && wait_interval_->mode == | |
| 417 WaitInterval::THROTTLED; | |
| 418 } | |
| 419 | |
| 420 void SyncerThread::OnReceivedShortPollIntervalUpdate( | |
| 421 const base::TimeDelta& new_interval) { | |
| 422 syncer_short_poll_interval_seconds_ = new_interval; | |
| 423 } | |
| 424 | |
| 425 void SyncerThread::OnReceivedLongPollIntervalUpdate( | |
| 426 const base::TimeDelta& new_interval) { | |
| 427 syncer_long_poll_interval_seconds_ = new_interval; | |
| 428 } | |
| 429 | |
| 430 void SyncerThread::OnShouldStopSyncingPermanently() { | |
| 431 syncer_->RequestEarlyExit(); // Thread-safe. | |
| 432 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | |
| 433 } | |
| 434 | |
| 435 void SyncerThread::OnServerConnectionEvent( | |
| 436 const ServerConnectionEvent& event) { | |
| 437 NOTIMPLEMENTED(); | |
| 438 } | |
| 439 | |
| 440 } // s3 | |
| 441 } // browser_sync | |
| OLD | NEW |