Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <cstring> | 8 #include <cstring> |
| 9 | 9 |
| 10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/bind_helpers.h" | |
| 12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
| 13 #include "base/location.h" | 14 #include "base/location.h" |
| 14 #include "base/logging.h" | 15 #include "base/logging.h" |
| 15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
| 16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
| 17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
| 18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
| 19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
| 20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
| 21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 76 : source(source), | 77 : source(source), |
| 77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
| 78 routing_info(routing_info), | 79 routing_info(routing_info), |
| 79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
| 80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
| 81 } | 82 } |
| 82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
| 83 | 84 |
| 84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
| 85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
| 86 had_nudge(false) { | 87 had_nudge(false), |
| 87 } | 88 pending_configure_job(NULL) {} |
| 89 | |
| 90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
| 91 : mode(mode), had_nudge(false), length(length), | |
| 92 pending_configure_job(NULL) {} | |
| 88 | 93 |
| 89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
| 90 | 95 |
| 91 #define ENUM_CASE(x) case x: return #x; break; | 96 #define ENUM_CASE(x) case x: return #x; break; |
| 92 | 97 |
| 93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
| 94 switch (mode) { | 99 switch (mode) { |
| 95 ENUM_CASE(UNKNOWN); | 100 ENUM_CASE(UNKNOWN); |
| 96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 101 ENUM_CASE(EXPONENTIAL_BACKOFF); |
| 97 ENUM_CASE(THROTTLED); | 102 ENUM_CASE(THROTTLED); |
| 98 } | 103 } |
| 99 NOTREACHED(); | 104 NOTREACHED(); |
| 100 return ""; | 105 return ""; |
| 101 } | 106 } |
| 102 | 107 |
| 103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
| 104 : purpose(UNKNOWN), | |
| 105 is_canary_job(false) { | |
| 106 } | |
| 107 | |
| 108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
| 109 | |
| 110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
| 111 base::TimeTicks start, | |
| 112 linked_ptr<sessions::SyncSession> session, | |
| 113 bool is_canary_job, | |
| 114 const ConfigurationParams& config_params, | |
| 115 const tracked_objects::Location& from_here) | |
| 116 : purpose(purpose), | |
| 117 scheduled_start(start), | |
| 118 session(session), | |
| 119 is_canary_job(is_canary_job), | |
| 120 config_params(config_params), | |
| 121 from_here(from_here) { | |
| 122 } | |
| 123 | |
| 124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
| 125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
| 126 switch (purpose) { | |
| 127 ENUM_CASE(UNKNOWN); | |
| 128 ENUM_CASE(POLL); | |
| 129 ENUM_CASE(NUDGE); | |
| 130 ENUM_CASE(CONFIGURATION); | |
| 131 } | |
| 132 NOTREACHED(); | |
| 133 return ""; | |
| 134 } | |
| 135 | |
| 136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
| 137 NudgeSource source) { | 109 NudgeSource source) { |
| 138 switch (source) { | 110 switch (source) { |
| 139 case NUDGE_SOURCE_NOTIFICATION: | 111 case NUDGE_SOURCE_NOTIFICATION: |
| 140 return GetUpdatesCallerInfo::NOTIFICATION; | 112 return GetUpdatesCallerInfo::NOTIFICATION; |
| 141 case NUDGE_SOURCE_LOCAL: | 113 case NUDGE_SOURCE_LOCAL: |
| 142 return GetUpdatesCallerInfo::LOCAL; | 114 return GetUpdatesCallerInfo::LOCAL; |
| 143 case NUDGE_SOURCE_CONTINUATION: | 115 case NUDGE_SOURCE_CONTINUATION: |
| 144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| 145 case NUDGE_SOURCE_LOCAL_REFRESH: | 117 case NUDGE_SOURCE_LOCAL_REFRESH: |
| 146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 118 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
| 147 case NUDGE_SOURCE_UNKNOWN: | 119 case NUDGE_SOURCE_UNKNOWN: |
| 148 return GetUpdatesCallerInfo::UNKNOWN; | 120 return GetUpdatesCallerInfo::UNKNOWN; |
| 149 default: | 121 default: |
| 150 NOTREACHED(); | 122 NOTREACHED(); |
| 151 return GetUpdatesCallerInfo::UNKNOWN; | 123 return GetUpdatesCallerInfo::UNKNOWN; |
| 152 } | 124 } |
| 153 } | 125 } |
| 154 | 126 |
| 155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
| 156 : mode(mode), had_nudge(false), length(length) { } | |
| 157 | |
| 158 // Helper macros to log with the syncer thread name; useful when there | 127 // Helper macros to log with the syncer thread name; useful when there |
| 159 // are multiple syncer threads involved. | 128 // are multiple syncer threads involved. |
| 160 | 129 |
| 161 #define SLOG(severity) LOG(severity) << name_ << ": " | 130 #define SLOG(severity) LOG(severity) << name_ << ": " |
| 162 | 131 |
| 163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
| 164 | 133 |
| 165 #define SDVLOG_LOC(from_here, verbose_level) \ | 134 #define SDVLOG_LOC(from_here, verbose_level) \ |
| 166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 135 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
| 167 | 136 |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 198 syncer_short_poll_interval_seconds_( | 167 syncer_short_poll_interval_seconds_( |
| 199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
| 200 syncer_long_poll_interval_seconds_( | 169 syncer_long_poll_interval_seconds_( |
| 201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| 202 sessions_commit_delay_( | 171 sessions_commit_delay_( |
| 203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
| 204 mode_(NORMAL_MODE), | 173 mode_(NORMAL_MODE), |
| 205 // Start with assuming everything is fine with the connection. | 174 // Start with assuming everything is fine with the connection. |
| 206 // At the end of the sync cycle we would have the correct status. | 175 // At the end of the sync cycle we would have the correct status. |
| 207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 176 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
| 177 pending_nudge_(NULL), | |
| 208 delay_provider_(delay_provider), | 178 delay_provider_(delay_provider), |
| 209 syncer_(syncer), | 179 syncer_(syncer), |
| 210 session_context_(context), | 180 session_context_(context), |
| 211 no_scheduling_allowed_(false) { | 181 no_scheduling_allowed_(false) { |
| 212 DCHECK(sync_loop_); | 182 DCHECK(sync_loop_); |
| 213 } | 183 } |
| 214 | 184 |
| 215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 185 SyncSchedulerImpl::~SyncSchedulerImpl() { |
| 216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 186 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 217 StopImpl(base::Closure()); | 187 StopImpl(base::Closure()); |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 204 void SyncSchedulerImpl::OnConnectionStatusChange() { |
| 235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
| 236 // Optimistically assume that the connection is fixed and try | 206 // Optimistically assume that the connection is fixed and try |
| 237 // connecting. | 207 // connecting. |
| 238 OnServerConnectionErrorFixed(); | 208 OnServerConnectionErrorFixed(); |
| 239 } | 209 } |
| 240 } | 210 } |
| 241 | 211 |
| 242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
| 243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
| 214 // There could be a pending nudge or configuration job in several cases: | |
| 215 // | |
| 216 // 1. We're in exponential backoff. | |
| 217 // 2. We're silenced / throttled. | |
| 218 // 3. A nudge was saved previously due to not having a valid auth token. | |
| 219 // 4. A nudge was scheduled + saved while in configuration mode. | |
| 220 // | |
| 221 // In all cases except (2), we want to retry contacting the server. We | |
| 222 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
| 223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
| 224 // has the authority to do that is the Unthrottle timer. | |
| 225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
| 226 if (!pending.get()) | |
| 227 return; | |
| 228 | |
| 244 PostTask(FROM_HERE, "DoCanaryJob", | 229 PostTask(FROM_HERE, "DoCanaryJob", |
| 245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 230 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
| 246 weak_ptr_factory_.GetWeakPtr())); | 231 weak_ptr_factory_.GetWeakPtr(), |
| 247 | 232 base::Passed(&pending))); |
| 248 } | 233 } |
| 249 | 234 |
| 250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
| 251 HttpResponse::ServerConnectionCode code) { | 236 HttpResponse::ServerConnectionCode code) { |
| 252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 237 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 253 SDVLOG(2) << "New server connection code: " | 238 SDVLOG(2) << "New server connection code: " |
| 254 << HttpResponse::GetServerConnectionCodeString(code); | 239 << HttpResponse::GetServerConnectionCodeString(code); |
| 255 | 240 |
| 256 connection_code_ = code; | 241 connection_code_ = code; |
| 257 } | 242 } |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 273 Mode old_mode = mode_; | 258 Mode old_mode = mode_; |
| 274 mode_ = mode; | 259 mode_ = mode; |
| 275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 260 AdjustPolling(NULL); // Will kick start poll timer if needed. |
| 276 | 261 |
| 277 if (old_mode != mode_) { | 262 if (old_mode != mode_) { |
| 278 // We just changed our mode. See if there are any pending jobs that we could | 263 // We just changed our mode. See if there are any pending jobs that we could |
| 279 // execute in the new mode. | 264 // execute in the new mode. |
| 280 if (mode_ == NORMAL_MODE) { | 265 if (mode_ == NORMAL_MODE) { |
| 281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
| 282 // has not yet completed. | 267 // has not yet completed. |
| 283 DCHECK(!wait_interval_.get() || | 268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
| 284 !wait_interval_->pending_configure_job.get()); | |
| 285 } | 269 } |
| 286 | 270 |
| 287 DoPendingJobIfPossible(false); | 271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
| 272 if (pending.get()) { | |
| 273 // TODO(tim): We should be able to remove this... | |
| 274 scoped_ptr<SyncSession> session(CreateSyncSession( | |
| 275 pending->session()->source())); | |
| 276 // Also the routing info might have been changed since we cached the | |
| 277 // pending nudge. Update it by coalescing to the latest. | |
| 278 pending->mutable_session()->Coalesce(*(session)); | |
| 279 SDVLOG(2) << "Executing pending job. Good luck!"; | |
| 280 DoSyncSessionJob(pending.Pass()); | |
| 281 } | |
| 288 } | 282 } |
| 289 } | 283 } |
| 290 | 284 |
| 291 void SyncSchedulerImpl::SendInitialSnapshot() { | 285 void SyncSchedulerImpl::SendInitialSnapshot() { |
| 292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 286 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
| 294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 288 SyncSourceInfo(), ModelSafeRoutingInfo(), |
| 295 std::vector<ModelSafeWorker*>())); | 289 std::vector<ModelSafeWorker*>())); |
| 296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
| 297 event.snapshot = dummy->TakeSnapshot(); | 291 event.snapshot = dummy->TakeSnapshot(); |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 321 bool SyncSchedulerImpl::ScheduleConfiguration( | 315 bool SyncSchedulerImpl::ScheduleConfiguration( |
| 322 const ConfigurationParams& params) { | 316 const ConfigurationParams& params) { |
| 323 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 317 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
| 325 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 319 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
| 326 DCHECK(!params.ready_task.is_null()); | 320 DCHECK(!params.ready_task.is_null()); |
| 327 SDVLOG(2) << "Reconfiguring syncer."; | 321 SDVLOG(2) << "Reconfiguring syncer."; |
| 328 | 322 |
| 329 // Only one configuration is allowed at a time. Verify we're not waiting | 323 // Only one configuration is allowed at a time. Verify we're not waiting |
| 330 // for a pending configure job. | 324 // for a pending configure job. |
| 331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
| 332 | 326 |
| 333 ModelSafeRoutingInfo restricted_routes; | 327 ModelSafeRoutingInfo restricted_routes; |
| 334 BuildModelSafeParams(params.types_to_download, | 328 BuildModelSafeParams(params.types_to_download, |
| 335 params.routing_info, | 329 params.routing_info, |
| 336 &restricted_routes); | 330 &restricted_routes); |
| 337 session_context_->set_routing_info(params.routing_info); | 331 session_context_->set_routing_info(params.routing_info); |
| 338 | 332 |
| 339 // Only reconfigure if we have types to download. | 333 // Only reconfigure if we have types to download. |
| 340 if (!params.types_to_download.Empty()) { | 334 if (!params.types_to_download.Empty()) { |
| 341 DCHECK(!restricted_routes.empty()); | 335 DCHECK(!restricted_routes.empty()); |
| 342 linked_ptr<SyncSession> session(new SyncSession( | 336 scoped_ptr<SyncSession> session(new SyncSession( |
| 343 session_context_, | 337 session_context_, |
| 344 this, | 338 this, |
| 345 SyncSourceInfo(params.source, | 339 SyncSourceInfo(params.source, |
| 346 ModelSafeRoutingInfoToInvalidationMap( | 340 ModelSafeRoutingInfoToInvalidationMap( |
| 347 restricted_routes, | 341 restricted_routes, |
| 348 std::string())), | 342 std::string())), |
| 349 restricted_routes, | 343 restricted_routes, |
| 350 session_context_->workers())); | 344 session_context_->workers())); |
| 351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
| 352 TimeTicks::Now(), | 346 SyncSessionJob::CONFIGURATION, |
| 353 session, | 347 TimeTicks::Now(), |
| 354 false, | 348 session.Pass(), |
| 355 params, | 349 params, |
| 356 FROM_HERE); | 350 FROM_HERE)); |
| 357 DoSyncSessionJob(job); | 351 bool succeeded = DoSyncSessionJob(job.Pass()); |
| 358 | 352 |
| 359 // If we failed, the job would have been saved as the pending configure | 353 // If we failed, the job would have been saved as the pending configure |
| 360 // job and a wait interval would have been set. | 354 // job and a wait interval would have been set. |
| 361 if (!session->Succeeded()) { | 355 if (!succeeded) { |
| 362 DCHECK(wait_interval_.get() && | 356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
| 363 wait_interval_->pending_configure_job.get()); | |
| 364 return false; | 357 return false; |
| 365 } | 358 } |
| 366 } else { | 359 } else { |
| 367 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 360 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
| 368 params.ready_task.Run(); | 361 params.ready_task.Run(); |
| 369 } | 362 } |
| 370 | 363 |
| 371 return true; | 364 return true; |
| 372 } | 365 } |
| 373 | 366 |
| 374 SyncSchedulerImpl::JobProcessDecision | 367 SyncSchedulerImpl::JobProcessDecision |
| 375 SyncSchedulerImpl::DecideWhileInWaitInterval( | 368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) { |
|
akalin
2012/10/17 17:39:17
why does this need a pointer?
tim (not reviewing)
2012/10/17 20:33:32
Every callsite has a pointer, so it seems simpler
akalin
2012/10/17 22:55:45
I don't know...const references are the usual styl
| |
| 376 const SyncSessionJob& job) { | |
| 377 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 369 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 378 DCHECK(wait_interval_.get()); | 370 DCHECK(wait_interval_.get()); |
| 379 | 371 |
| 380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
| 381 << WaitInterval::GetModeString(wait_interval_->mode) | 373 << WaitInterval::GetModeString(wait_interval_->mode) |
| 382 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 374 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
| 383 << (job.is_canary_job ? " (canary)" : ""); | 375 << (job->is_canary() ? " (canary)" : ""); |
| 384 | 376 |
| 385 if (job.purpose == SyncSessionJob::POLL) | 377 if (job->purpose() == SyncSessionJob::POLL) |
| 386 return DROP; | 378 return DROP; |
| 387 | 379 |
| 388 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 380 // If we save a job while in a WaitInterval, there is a well-defined moment |
| 389 job.purpose == SyncSessionJob::CONFIGURATION); | 381 // in time in the future when it makes sense for that SAVE-worthy job to try |
| 382 // running again -- the end of the WaitInterval. | |
| 383 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | |
| 384 job->purpose() == SyncSessionJob::CONFIGURATION); | |
| 385 | |
| 386 // If throttled, there's a clock ticking to unthrottle. We want to get | |
| 387 // on the same train. | |
| 390 if (wait_interval_->mode == WaitInterval::THROTTLED) | 388 if (wait_interval_->mode == WaitInterval::THROTTLED) |
| 391 return SAVE; | 389 return SAVE; |
| 392 | 390 |
| 393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| 394 if (job.purpose == SyncSessionJob::NUDGE) { | 392 if (job->purpose() == SyncSessionJob::NUDGE) { |
| 395 if (mode_ == CONFIGURATION_MODE) | 393 if (mode_ == CONFIGURATION_MODE) |
| 396 return SAVE; | 394 return SAVE; |
| 397 | 395 |
| 398 // If we already had one nudge then just drop this nudge. We will retry | 396 // If we already had one nudge then just drop this nudge. We will retry |
| 399 // later when the timer runs out. | 397 // later when the timer runs out. |
| 400 if (!job.is_canary_job) | 398 if (!job->is_canary()) |
| 401 return wait_interval_->had_nudge ? DROP : CONTINUE; | 399 return wait_interval_->had_nudge ? DROP : CONTINUE; |
| 402 else // We are here because timer ran out. So retry. | 400 else // We are here because timer ran out. So retry. |
| 403 return CONTINUE; | 401 return CONTINUE; |
| 404 } | 402 } |
| 405 return job.is_canary_job ? CONTINUE : SAVE; | 403 return job->is_canary() ? CONTINUE : SAVE; |
| 406 } | 404 } |
| 407 | 405 |
| 408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
| 409 const SyncSessionJob& job) { | 407 const SyncSessionJob* job) { |
|
akalin
2012/10/17 17:39:17
why does this need a pointer?
tim (not reviewing)
2012/10/17 20:33:32
Every callsite has a pointer, so it seems simpler
akalin
2012/10/17 22:55:45
same.
| |
| 410 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 408 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 411 | 409 |
| 412 // See if our type is throttled. | 410 // See if our type is throttled. |
| 413 ModelTypeSet throttled_types = | 411 ModelTypeSet throttled_types = |
| 414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 412 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
| 415 if (job.purpose == SyncSessionJob::NUDGE && | 413 if (job->purpose() == SyncSessionJob::NUDGE && |
| 416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 414 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
| 417 ModelTypeSet requested_types; | 415 ModelTypeSet requested_types; |
| 418 for (ModelTypeInvalidationMap::const_iterator i = | 416 for (ModelTypeInvalidationMap::const_iterator i = |
| 419 job.session->source().types.begin(); | 417 job->session()->source().types.begin(); |
| 420 i != job.session->source().types.end(); | 418 i != job->session()->source().types.end(); |
| 421 ++i) { | 419 ++i) { |
| 422 requested_types.Put(i->first); | 420 requested_types.Put(i->first); |
| 423 } | 421 } |
| 424 | 422 |
| 423 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
| 424 // a per-datatype "unthrottle" event as something that should force a | |
| 425 // canary job. For this reason, there's no good time to reschedule this job | |
| 426 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
| 427 // Note that there may already be such an event if we're in a WaitInterval, | |
| 428 // so we can retry it then. | |
| 425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
| 426 return SAVE; | 430 return SAVE; |
| 427 } | 431 } |
| 428 | 432 |
| 429 if (wait_interval_.get()) | 433 if (wait_interval_.get()) |
| 430 return DecideWhileInWaitInterval(job); | 434 return DecideWhileInWaitInterval(job); |
| 431 | 435 |
| 432 if (mode_ == CONFIGURATION_MODE) { | 436 if (mode_ == CONFIGURATION_MODE) { |
| 433 if (job.purpose == SyncSessionJob::NUDGE) | 437 if (job->purpose() == SyncSessionJob::NUDGE) |
| 434 return SAVE; | 438 return SAVE; // Running requires a mode switch. |
| 435 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 439 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
| 436 return CONTINUE; | 440 return CONTINUE; |
| 437 else | 441 else |
| 438 return DROP; | 442 return DROP; |
| 439 } | 443 } |
| 440 | 444 |
| 441 // We are in normal mode. | 445 // We are in normal mode. |
| 442 DCHECK_EQ(mode_, NORMAL_MODE); | 446 DCHECK_EQ(mode_, NORMAL_MODE); |
| 443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 447 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION); |
| 444 | 448 |
| 445 // Note about some subtle scheduling semantics. | 449 // Note about some subtle scheduling semantics. |
| 446 // | 450 // |
| 447 // It's possible at this point that |job| is known to be unnecessary, and | 451 // It's possible at this point that |job| is known to be unnecessary, and |
| 448 // dropping it would be perfectly safe and correct. Consider | 452 // dropping it would be perfectly safe and correct. Consider |
| 449 // | 453 // |
| 450 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 454 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
| 451 // the time that the last successful all-datatype NUDGE completed. | 455 // the time that the last successful all-datatype NUDGE completed. |
| 452 // | 456 // |
| 453 // 2) |job| is a NUDGE (for any combination of types) with a | 457 // 2) |job| is a NUDGE (for any combination of types) with a |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 476 // * It's not strictly "impossible", but it would be reentrant and hence | 480 // * It's not strictly "impossible", but it would be reentrant and hence |
| 477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
| 478 // legal side effect of any of the work being done as part of a sync cycle. | 482 // legal side effect of any of the work being done as part of a sync cycle. |
| 479 // See |no_scheduling_allowed_| for details. | 483 // See |no_scheduling_allowed_| for details. |
| 480 | 484 |
| 481 // Decision now rests on state of auth tokens. | 485 // Decision now rests on state of auth tokens. |
| 482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 486 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
| 483 return CONTINUE; | 487 return CONTINUE; |
| 484 | 488 |
| 485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 489 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
| 486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 490 // Running the job would require updated auth, so we can't honour |
| 491 // job.scheduled_start(). | |
| 492 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
| 487 } | 493 } |
| 488 | 494 |
| 489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 495 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) { |
|
akalin
2012/10/17 17:39:17
the name for this is unclear -- "ShouldRunJob" imp
tim (not reviewing)
2012/10/17 20:33:32
SaveIfNecessary works, although there's also the c
akalin
2012/10/17 22:55:45
I see. See my comment below.
| |
| 490 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
| 491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | |
| 492 if (pending_nudge_.get() == NULL) { | |
| 493 SDVLOG(2) << "Creating a pending nudge job"; | |
| 494 SyncSession* s = job.session.get(); | |
| 495 | |
| 496 // Get a fresh session with similar configuration as before (resets | |
| 497 // StatusController). | |
| 498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | |
| 499 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
| 500 | |
| 501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
| 502 make_linked_ptr(session.release()), false, | |
| 503 ConfigurationParams(), job.from_here); | |
| 504 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
| 505 return; | |
| 506 } | |
| 507 | |
| 508 SDVLOG(2) << "Coalescing a pending nudge"; | |
| 509 pending_nudge_->session->Coalesce(*(job.session.get())); | |
| 510 pending_nudge_->scheduled_start = job.scheduled_start; | |
| 511 | |
| 512 // Unfortunately the nudge location cannot be modified. So it stores the | |
| 513 // location of the first caller. | |
| 514 } | |
| 515 | |
| 516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | |
| 517 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 496 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 518 DCHECK(started_); | 497 DCHECK(started_); |
| 519 | 498 |
| 520 JobProcessDecision decision = DecideOnJob(job); | 499 JobProcessDecision decision = DecideOnJob(job); |
| 521 SDVLOG(2) << "Should run " | 500 SDVLOG(2) << "Should run " |
| 522 << SyncSessionJob::GetPurposeString(job.purpose) | 501 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 523 << " job in mode " << GetModeString(mode_) | 502 << " job " << job->session() |
| 503 << " in mode " << GetModeString(mode_) | |
| 524 << ": " << GetDecisionString(decision); | 504 << ": " << GetDecisionString(decision); |
| 525 if (decision != SAVE) | 505 if (decision == DROP) |
| 526 return decision == CONTINUE; | 506 return false; |
| 507 if (decision == CONTINUE) | |
| 508 return true; | |
| 509 DCHECK_EQ(decision, SAVE); | |
| 527 | 510 |
| 528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 511 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
| 529 SyncSessionJob::CONFIGURATION); | 512 if (is_nudge && pending_nudge_) { |
| 513 SDVLOG(2) << "Coalescing a pending nudge"; | |
| 514 // TODO(tim): This basically means we never use the more-careful coalescing | |
|
rlarocque
2012/10/08 19:18:05
After some more thought, I'm not sure this ever ma
| |
| 515 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
| 516 // times, because we're calling this function first. Pull this out | |
| 517 // into a function to coalesce + set start times and reuse. | |
| 518 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | |
| 519 return false; | |
| 520 } | |
| 530 | 521 |
| 531 SaveJob(job); | 522 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); |
| 523 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
| 524 // This job should be made the new canary. | |
| 525 if (is_nudge) { | |
| 526 pending_nudge_ = job_to_save.get(); | |
| 527 } else { | |
| 528 SDVLOG(2) << "Saving a configuration job"; | |
| 529 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
| 530 DCHECK(!wait_interval_->pending_configure_job); | |
| 531 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
| 532 DCHECK(!job->config_params().ready_task.is_null()); | |
| 533 // The only nudge that could exist is a scheduled canary nudge. | |
| 534 DCHECK(!unscheduled_nudge_storage_.get()); | |
| 535 if (pending_nudge_) { | |
| 536 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
| 537 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
| 538 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
| 539 } | |
| 540 wait_interval_->pending_configure_job = job_to_save.get(); | |
| 541 } | |
| 542 TimeDelta length = | |
| 543 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
| 544 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
| 545 TimeDelta::FromSeconds(0) : length; | |
| 546 RestartWaiting(job_to_save.Pass()); | |
| 547 return false; | |
| 548 } | |
| 549 | |
| 550 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
| 551 // when we're not in a WaitInterval. See bug 147736. | |
| 552 DCHECK(is_nudge); | |
| 553 // There may or may not be a pending_configure_job. Either way this nudge | |
| 554 // is unschedulable. | |
| 555 pending_nudge_ = job_to_save.get(); | |
| 556 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
| 532 return false; | 557 return false; |
| 533 } | 558 } |
| 534 | 559 |
| 535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
| 536 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
| 537 if (job.purpose == SyncSessionJob::NUDGE) { | |
| 538 SDVLOG(2) << "Saving a nudge job"; | |
| 539 InitOrCoalescePendingJob(job); | |
| 540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
| 541 SDVLOG(2) << "Saving a configuration job"; | |
| 542 DCHECK(wait_interval_.get()); | |
| 543 DCHECK(mode_ == CONFIGURATION_MODE); | |
| 544 | |
| 545 // Config params should always get set. | |
| 546 DCHECK(!job.config_params.ready_task.is_null()); | |
| 547 SyncSession* old = job.session.get(); | |
| 548 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
| 549 old->routing_info(), old->workers())); | |
| 550 SyncSessionJob new_job(job.purpose, | |
| 551 TimeTicks::Now(), | |
| 552 make_linked_ptr(s), | |
| 553 false, | |
| 554 job.config_params, | |
| 555 job.from_here); | |
| 556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
| 557 } // drop the rest. | |
| 558 // TODO(sync): Is it okay to drop the rest? It's weird that | |
| 559 // SaveJob() only does what it says sometimes. (See | |
| 560 // http://crbug.com/90868.) | |
| 561 } | |
| 562 | |
| 563 // Functor for std::find_if to search by ModelSafeGroup. | 560 // Functor for std::find_if to search by ModelSafeGroup. |
| 564 struct ModelSafeWorkerGroupIs { | 561 struct ModelSafeWorkerGroupIs { |
| 565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 562 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
| 566 bool operator()(ModelSafeWorker* w) { | 563 bool operator()(ModelSafeWorker* w) { |
| 567 return group == w->GetModelSafeGroup(); | 564 return group == w->GetModelSafeGroup(); |
| 568 } | 565 } |
| 569 ModelSafeGroup group; | 566 ModelSafeGroup group; |
| 570 }; | 567 }; |
| 571 | 568 |
| 572 void SyncSchedulerImpl::ScheduleNudgeAsync( | 569 void SyncSchedulerImpl::ScheduleNudgeAsync( |
| 573 const TimeDelta& delay, | 570 const TimeDelta& desired_delay, |
| 574 NudgeSource source, ModelTypeSet types, | 571 NudgeSource source, ModelTypeSet types, |
| 575 const tracked_objects::Location& nudge_location) { | 572 const tracked_objects::Location& nudge_location) { |
| 576 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 573 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 577 SDVLOG_LOC(nudge_location, 2) | 574 SDVLOG_LOC(nudge_location, 2) |
| 578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 575 << "Nudge scheduled with delay " |
| 576 << desired_delay.InMilliseconds() << " ms, " | |
| 579 << "source " << GetNudgeSourceString(source) << ", " | 577 << "source " << GetNudgeSourceString(source) << ", " |
| 580 << "types " << ModelTypeSetToString(types); | 578 << "types " << ModelTypeSetToString(types); |
| 581 | 579 |
| 582 ModelTypeInvalidationMap invalidation_map = | 580 ModelTypeInvalidationMap invalidation_map = |
| 583 ModelTypeSetToInvalidationMap(types, std::string()); | 581 ModelTypeSetToInvalidationMap(types, std::string()); |
| 584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 582 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
| 585 GetUpdatesFromNudgeSource(source), | 583 GetUpdatesFromNudgeSource(source), |
| 586 invalidation_map, | 584 invalidation_map, |
| 587 false, | |
| 588 nudge_location); | 585 nudge_location); |
| 589 } | 586 } |
| 590 | 587 |
| 591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 588 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
| 592 const TimeDelta& delay, | 589 const TimeDelta& desired_delay, |
| 593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, | 590 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
| 594 const tracked_objects::Location& nudge_location) { | 591 const tracked_objects::Location& nudge_location) { |
| 595 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 592 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 596 SDVLOG_LOC(nudge_location, 2) | 593 SDVLOG_LOC(nudge_location, 2) |
| 597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 594 << "Nudge scheduled with delay " |
| 595 << desired_delay.InMilliseconds() << " ms, " | |
| 598 << "source " << GetNudgeSourceString(source) << ", " | 596 << "source " << GetNudgeSourceString(source) << ", " |
| 599 << "payloads " | 597 << "payloads " |
| 600 << ModelTypeInvalidationMapToString(invalidation_map); | 598 << ModelTypeInvalidationMapToString(invalidation_map); |
| 601 | 599 |
| 602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 600 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
| 603 GetUpdatesFromNudgeSource(source), | 601 GetUpdatesFromNudgeSource(source), |
| 604 invalidation_map, | 602 invalidation_map, |
| 605 false, | |
| 606 nudge_location); | 603 nudge_location); |
| 607 } | 604 } |
| 608 | 605 |
| 609 void SyncSchedulerImpl::ScheduleNudgeImpl( | 606 void SyncSchedulerImpl::ScheduleNudgeImpl( |
| 610 const TimeDelta& delay, | 607 const TimeDelta& delay, |
| 611 GetUpdatesCallerInfo::GetUpdatesSource source, | 608 GetUpdatesCallerInfo::GetUpdatesSource source, |
| 612 const ModelTypeInvalidationMap& invalidation_map, | 609 const ModelTypeInvalidationMap& invalidation_map, |
| 613 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 610 const tracked_objects::Location& nudge_location) { |
| 614 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 611 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; | 612 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
| 616 | 613 |
| 617 SDVLOG_LOC(nudge_location, 2) | 614 SDVLOG_LOC(nudge_location, 2) |
| 618 << "In ScheduleNudgeImpl with delay " | 615 << "In ScheduleNudgeImpl with delay " |
| 619 << delay.InMilliseconds() << " ms, " | 616 << delay.InMilliseconds() << " ms, " |
| 620 << "source " << GetUpdatesSourceString(source) << ", " | 617 << "source " << GetUpdatesSourceString(source) << ", " |
| 621 << "payloads " | 618 << "payloads " |
| 622 << ModelTypeInvalidationMapToString(invalidation_map) | 619 << ModelTypeInvalidationMapToString(invalidation_map); |
| 623 << (is_canary_job ? " (canary)" : ""); | |
| 624 | 620 |
| 625 SyncSourceInfo info(source, invalidation_map); | 621 SyncSourceInfo info(source, invalidation_map); |
| 626 UpdateNudgeTimeRecords(info); | 622 UpdateNudgeTimeRecords(info); |
| 627 | 623 |
| 628 SyncSession* session(CreateSyncSession(info)); | 624 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
| 629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 625 SyncSessionJob::NUDGE, |
| 630 make_linked_ptr(session), is_canary_job, | 626 TimeTicks::Now() + delay, |
| 631 ConfigurationParams(), nudge_location); | 627 CreateSyncSession(info).Pass(), |
| 628 ConfigurationParams(), | |
| 629 nudge_location)); | |
| 632 | 630 |
| 633 session = NULL; | 631 if (!ShouldRunJobSaveIfNecessary(job.get())) |
|
akalin
2012/10/17 17:39:17
Naively, I'd expect to be able to split this call
tim (not reviewing)
2012/10/17 20:33:32
Yes. !ShouldRunJob doesn't mean you can forget ab
akalin
2012/10/17 22:55:45
I see. See comment below...
| |
| 634 if (!ShouldRunJob(job)) | |
| 635 return; | 632 return; |
| 636 | 633 |
| 637 if (pending_nudge_.get()) { | 634 if (pending_nudge_) { |
| 638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
| 639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
| 640 return; | |
| 641 } | |
| 642 | |
| 643 SDVLOG(2) << "Coalescing pending nudge"; | |
| 644 pending_nudge_->session->Coalesce(*(job.session.get())); | |
| 645 | |
| 646 SDVLOG(2) << "Rescheduling pending nudge"; | 635 SDVLOG(2) << "Rescheduling pending nudge"; |
| 647 SyncSession* s = pending_nudge_->session.get(); | 636 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
| 648 job.session.reset(new SyncSession(s->context(), s->delegate(), | 637 // Choose the start time as the earliest of the 2. Note that this means |
| 649 s->source(), s->routing_info(), s->workers())); | 638 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
| 650 | 639 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
| 651 // Choose the start time as the earliest of the 2. | 640 // without waiting. |
| 652 job.scheduled_start = std::min(job.scheduled_start, | 641 pending_nudge_->set_scheduled_start( |
| 653 pending_nudge_->scheduled_start); | 642 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
| 654 pending_nudge_.reset(); | 643 // Abandon the old task by cloning and replacing the session. |
| 644 // It's possible that by "rescheduling" we're actually taking a job that | |
| 645 // was previously unscheduled and giving it wings, so take care to reset | |
| 646 // unscheduled nudge storage. | |
| 647 job = pending_nudge_->CloneAndAbandon(); | |
| 648 unscheduled_nudge_storage_.reset(); | |
| 649 pending_nudge_ = NULL; | |
| 655 } | 650 } |
| 656 | 651 |
| 657 // TODO(zea): Consider adding separate throttling/backoff for datatype | 652 // TODO(zea): Consider adding separate throttling/backoff for datatype |
| 658 // refresh requests. | 653 // refresh requests. |
| 659 ScheduleSyncSessionJob(job); | 654 ScheduleSyncSessionJob(job.Pass()); |
|
akalin
2012/10/17 17:39:17
Isn't the job abandoned at this point? Do we stil
tim (not reviewing)
2012/10/17 20:33:32
Err, did you mean to put this comment elsewhere? T
akalin
2012/10/17 22:55:45
Right, I was misunderstanding the flow.
So it loo
| |
| 660 } | 655 } |
| 661 | 656 |
| 662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 657 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
| 663 switch (mode) { | 658 switch (mode) { |
| 664 ENUM_CASE(CONFIGURATION_MODE); | 659 ENUM_CASE(CONFIGURATION_MODE); |
| 665 ENUM_CASE(NORMAL_MODE); | 660 ENUM_CASE(NORMAL_MODE); |
| 666 } | 661 } |
| 667 return ""; | 662 return ""; |
| 668 } | 663 } |
| 669 | 664 |
| 670 const char* SyncSchedulerImpl::GetDecisionString( | 665 const char* SyncSchedulerImpl::GetDecisionString( |
| 671 SyncSchedulerImpl::JobProcessDecision mode) { | 666 SyncSchedulerImpl::JobProcessDecision mode) { |
| 672 switch (mode) { | 667 switch (mode) { |
| 673 ENUM_CASE(CONTINUE); | 668 ENUM_CASE(CONTINUE); |
| 674 ENUM_CASE(SAVE); | 669 ENUM_CASE(SAVE); |
| 675 ENUM_CASE(DROP); | 670 ENUM_CASE(DROP); |
| 676 } | 671 } |
| 677 return ""; | 672 return ""; |
| 678 } | 673 } |
| 679 | 674 |
| 680 // static | |
| 681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
| 682 SyncSessionJob::SyncSessionJobPurpose purpose, | |
| 683 SyncerStep* start, | |
| 684 SyncerStep* end) { | |
| 685 switch (purpose) { | |
| 686 case SyncSessionJob::CONFIGURATION: | |
| 687 *start = DOWNLOAD_UPDATES; | |
| 688 *end = APPLY_UPDATES; | |
| 689 return; | |
| 690 case SyncSessionJob::NUDGE: | |
| 691 case SyncSessionJob::POLL: | |
| 692 *start = SYNCER_BEGIN; | |
| 693 *end = SYNCER_END; | |
| 694 return; | |
| 695 default: | |
| 696 NOTREACHED(); | |
| 697 *start = SYNCER_END; | |
| 698 *end = SYNCER_END; | |
| 699 return; | |
| 700 } | |
| 701 } | |
| 702 | |
| 703 void SyncSchedulerImpl::PostTask( | 675 void SyncSchedulerImpl::PostTask( |
| 704 const tracked_objects::Location& from_here, | 676 const tracked_objects::Location& from_here, |
| 705 const char* name, const base::Closure& task) { | 677 const char* name, const base::Closure& task) { |
| 706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 678 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
| 707 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 679 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 708 if (!started_) { | 680 if (!started_) { |
| 709 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 681 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
| 710 return; | 682 return; |
| 711 } | 683 } |
| 712 sync_loop_->PostTask(from_here, task); | 684 sync_loop_->PostTask(from_here, task); |
| 713 } | 685 } |
| 714 | 686 |
| 715 void SyncSchedulerImpl::PostDelayedTask( | 687 void SyncSchedulerImpl::PostDelayedTask( |
| 716 const tracked_objects::Location& from_here, | 688 const tracked_objects::Location& from_here, |
| 717 const char* name, const base::Closure& task, base::TimeDelta delay) { | 689 const char* name, const base::Closure& task, base::TimeDelta delay) { |
| 718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 690 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
| 719 << delay.InMilliseconds() << " ms delay"; | 691 << delay.InMilliseconds() << " ms delay"; |
| 720 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 692 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 721 if (!started_) { | 693 if (!started_) { |
| 722 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 694 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
| 723 return; | 695 return; |
| 724 } | 696 } |
| 725 sync_loop_->PostDelayedTask(from_here, task, delay); | 697 sync_loop_->PostDelayedTask(from_here, task, delay); |
| 726 } | 698 } |
| 727 | 699 |
| 728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 700 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
| 701 scoped_ptr<SyncSessionJob> job) { | |
| 729 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 702 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 730 if (no_scheduling_allowed_) { | 703 if (no_scheduling_allowed_) { |
| 731 NOTREACHED() << "Illegal to schedule job while session in progress."; | 704 NOTREACHED() << "Illegal to schedule job while session in progress."; |
| 732 return; | 705 return; |
| 733 } | 706 } |
| 734 | 707 |
| 735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 708 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
| 709 tracked_objects::Location loc(job->from_location()); | |
| 736 if (delay < TimeDelta::FromMilliseconds(0)) | 710 if (delay < TimeDelta::FromMilliseconds(0)) |
| 737 delay = TimeDelta::FromMilliseconds(0); | 711 delay = TimeDelta::FromMilliseconds(0); |
| 738 SDVLOG_LOC(job.from_here, 2) | 712 SDVLOG_LOC(loc, 2) |
| 739 << "In ScheduleSyncSessionJob with " | 713 << "In ScheduleSyncSessionJob with " |
| 740 << SyncSessionJob::GetPurposeString(job.purpose) | 714 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 741 << " job and " << delay.InMilliseconds() << " ms delay"; | 715 << " job and " << delay.InMilliseconds() << " ms delay"; |
| 742 | 716 |
| 743 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 717 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
| 744 job.purpose == SyncSessionJob::POLL); | 718 job->purpose() == SyncSessionJob::POLL); |
| 745 if (job.purpose == SyncSessionJob::NUDGE) { | 719 if (job->purpose() == SyncSessionJob::NUDGE) { |
| 746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 720 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; |
| 747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 721 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
| 748 job.session); | 722 job->session()); |
| 749 pending_nudge_.reset(new SyncSessionJob(job)); | 723 pending_nudge_ = job.get(); |
| 750 } | 724 } |
| 751 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 725 |
| 752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 726 PostDelayedTask(loc, "DoSyncSessionJob", |
| 753 weak_ptr_factory_.GetWeakPtr(), | 727 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
| 754 job), | 728 weak_ptr_factory_.GetWeakPtr(), |
| 755 delay); | 729 base::Passed(&job)), |
| 730 delay); | |
| 756 } | 731 } |
| 757 | 732 |
| 758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 733 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
| 759 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 734 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 735 if (job->purpose() == SyncSessionJob::NUDGE) { | |
| 736 if (pending_nudge_ == NULL || | |
| 737 pending_nudge_->session() != job->session()) { | |
| 738 // |job| is abandoned. | |
| 739 SDVLOG(2) << "Dropping a nudge in " | |
| 740 << "DoSyncSessionJob because another nudge was scheduled"; | |
| 741 return false; | |
| 742 } | |
| 743 pending_nudge_ = NULL; | |
| 744 | |
| 745 // Rebase the session with the latest model safe table and use it to purge | |
| 746 // and update any disabled or modified entries in the job. | |
| 747 job->mutable_session()->RebaseRoutingInfoWithLatest( | |
| 748 session_context_->routing_info(), session_context_->workers()); | |
| 749 } | |
| 760 | 750 |
| 761 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 751 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 762 if (!ShouldRunJob(job)) { | 752 GetUpdatesCallerInfo::GetUpdatesSource source( |
| 753 job->session()->source().updates_source); | |
| 754 if (!ShouldRunJobSaveIfNecessary(job.get())) { | |
| 763 SLOG(WARNING) | 755 SLOG(WARNING) |
| 764 << "Not executing " | 756 << "Not executing " |
| 765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 757 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from " |
| 766 << GetUpdatesSourceString(job.session->source().updates_source); | 758 << GetUpdatesSourceString(source); |
| 767 return; | 759 return false; |
| 768 } | 760 } |
| 769 | 761 |
| 770 if (job.purpose == SyncSessionJob::NUDGE) { | |
| 771 if (pending_nudge_.get() == NULL || | |
| 772 pending_nudge_->session != job.session) { | |
| 773 SDVLOG(2) << "Dropping a nudge in " | |
| 774 << "DoSyncSessionJob because another nudge was scheduled"; | |
| 775 return; // Another nudge must have been scheduled in in the meantime. | |
| 776 } | |
| 777 pending_nudge_.reset(); | |
| 778 | |
| 779 // Create the session with the latest model safe table and use it to purge | |
| 780 // and update any disabled or modified entries in the job. | |
| 781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
| 782 | |
| 783 job.session->RebaseRoutingInfoWithLatest(*session); | |
| 784 } | |
| 785 SDVLOG(2) << "DoSyncSessionJob with " | 762 SDVLOG(2) << "DoSyncSessionJob with " |
| 786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 763 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
| 787 | |
| 788 SyncerStep begin(SYNCER_END); | |
| 789 SyncerStep end(SYNCER_END); | |
| 790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
| 791 | 764 |
| 792 bool has_more_to_sync = true; | 765 bool has_more_to_sync = true; |
| 793 while (ShouldRunJob(job) && has_more_to_sync) { | 766 bool premature_exit = false; |
| 767 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) { | |
| 794 SDVLOG(2) << "Calling SyncShare."; | 768 SDVLOG(2) << "Calling SyncShare."; |
| 795 // Synchronously perform the sync session from this thread. | 769 // Synchronously perform the sync session from this thread. |
| 796 syncer_->SyncShare(job.session.get(), begin, end); | 770 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
| 797 has_more_to_sync = job.session->HasMoreToSync(); | 771 job->start_step(), |
| 772 job->end_step()); | |
| 773 | |
| 774 has_more_to_sync = job->session()->HasMoreToSync(); | |
| 798 if (has_more_to_sync) | 775 if (has_more_to_sync) |
| 799 job.session->PrepareForAnotherSyncCycle(); | 776 job->mutable_session()->PrepareForAnotherSyncCycle(); |
| 800 } | 777 } |
| 801 SDVLOG(2) << "Done SyncShare looping."; | 778 SDVLOG(2) << "Done SyncShare looping."; |
| 802 | 779 |
| 803 FinishSyncSessionJob(job); | 780 return FinishSyncSessionJob(job.Pass(), premature_exit); |
| 804 } | 781 } |
| 805 | 782 |
| 806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 783 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
| 807 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 784 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 808 | 785 |
| 809 // We are interested in recording time between local nudges for datatypes. | 786 // We are interested in recording time between local nudges for datatypes. |
| 810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 787 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
| 811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 788 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
| 812 return; | 789 return; |
| 813 | 790 |
| 814 base::TimeTicks now = TimeTicks::Now(); | 791 base::TimeTicks now = TimeTicks::Now(); |
| 815 // Update timing information for how often datatypes are triggering nudges. | 792 // Update timing information for how often datatypes are triggering nudges. |
| 816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 793 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
| 817 iter != info.types.end(); | 794 iter != info.types.end(); |
| 818 ++iter) { | 795 ++iter) { |
| 819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 796 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
| 820 last_local_nudges_by_model_type_[iter->first] = now; | 797 last_local_nudges_by_model_type_[iter->first] = now; |
| 821 if (previous.is_null()) | 798 if (previous.is_null()) |
| 822 continue; | 799 continue; |
| 823 | 800 |
| 824 #define PER_DATA_TYPE_MACRO(type_str) \ | 801 #define PER_DATA_TYPE_MACRO(type_str) \ |
| 825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 802 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
| 826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 803 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
| 827 #undef PER_DATA_TYPE_MACRO | 804 #undef PER_DATA_TYPE_MACRO |
| 828 } | 805 } |
| 829 } | 806 } |
| 830 | 807 |
| 831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 808 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
| 809 bool exited_prematurely) { | |
| 832 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 810 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 833 // Now update the status of the connection from SCM. We need this to decide | 811 // Now update the status of the connection from SCM. We need this to decide |
| 834 // whether we need to save/run future jobs. The notifications from SCM are not | 812 // whether we need to save/run future jobs. The notifications from SCM are |
| 835 // reliable. | 813 // not reliable. |
| 836 // | 814 // |
| 837 // TODO(rlarocque): crbug.com/110954 | 815 // TODO(rlarocque): crbug.com/110954 |
| 838 // We should get rid of the notifications and it is probably not needed to | 816 // We should get rid of the notifications and it is probably not needed to |
| 839 // maintain this status variable in 2 places. We should query it directly from | 817 // maintain this status variable in 2 places. We should query it directly |
| 840 // SCM when needed. | 818 // from SCM when needed. |
| 841 ServerConnectionManager* scm = session_context_->connection_manager(); | 819 ServerConnectionManager* scm = session_context_->connection_manager(); |
| 842 UpdateServerConnectionManagerStatus(scm->server_status()); | 820 UpdateServerConnectionManagerStatus(scm->server_status()); |
| 843 | 821 |
| 844 if (IsSyncingCurrentlySilenced()) { | 822 // Let job know that we're through syncing (calling SyncShare) at this point. |
| 845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 823 bool succeeded = false; |
| 846 // TODO(sync): Investigate whether we need to check job.purpose | 824 { |
| 847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | |
| 848 SaveJob(job); | |
| 849 return; // Nothing to do. | |
| 850 } else if (job.session->Succeeded() && | |
| 851 !job.config_params.ready_task.is_null()) { | |
| 852 // If this was a configuration job with a ready task, invoke it now that | |
| 853 // we finished successfully. | |
| 854 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 825 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 855 job.config_params.ready_task.Run(); | 826 succeeded = job->Finish(exited_prematurely); |
| 856 } | 827 } |
| 857 | 828 |
| 858 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 829 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
| 859 ScheduleNextSync(job); | 830 ScheduleNextSync(job.Pass(), succeeded); |
| 831 return succeeded; | |
| 860 } | 832 } |
| 861 | 833 |
| 862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 834 void SyncSchedulerImpl::ScheduleNextSync( |
| 835 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) { | |
| 863 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 836 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 864 DCHECK(!old_job.session->HasMoreToSync()); | 837 DCHECK(!finished_job->session()->HasMoreToSync()); |
| 865 | 838 |
| 866 AdjustPolling(&old_job); | 839 AdjustPolling(finished_job.get()); |
| 867 | 840 |
| 868 if (old_job.session->Succeeded()) { | 841 if (succeeded) { |
| 869 // Only reset backoff if we actually reached the server. | 842 // Only reset backoff if we actually reached the server. |
| 870 if (old_job.session->SuccessfullyReachedServer()) | 843 // It's possible that we reached the server on one attempt, then had an |
| 844 // error on the next (or didn't perform some of the server-communicating | |
| 845 // commands). We want to verify that, for all commands attempted, we | |
| 846 // successfully spoke with the server. Therefore, we verify no errors | |
| 847 // and at least one SYNCER_OK. | |
| 848 if (finished_job->session()->DidReachServer()) | |
| 871 wait_interval_.reset(); | 849 wait_interval_.reset(); |
| 872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 850 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
| 873 return; | 851 return; |
| 874 } | 852 } |
| 875 | 853 |
| 876 if (old_job.purpose == SyncSessionJob::POLL) { | 854 if (IsSyncingCurrentlySilenced()) { |
| 855 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | |
| 856 // If we're here, it's because |job| was silenced until a server specified | |
| 857 // time. (Note, it had to be |job|, because DecideOnJob would not permit | |
| 858 // any job through while in WaitInterval::THROTTLED). | |
| 859 scoped_ptr<SyncSessionJob> clone = finished_job->Clone(); | |
| 860 if (clone->purpose() == SyncSessionJob::NUDGE) | |
| 861 pending_nudge_ = clone.get(); | |
| 862 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | |
| 863 wait_interval_->pending_configure_job = clone.get(); | |
| 864 else | |
| 865 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
| 866 | |
| 867 RestartWaiting(clone.Pass()); | |
| 868 return; | |
| 869 } | |
| 870 | |
| 871 if (finished_job->purpose() == SyncSessionJob::POLL) { | |
| 877 return; // We don't retry POLL jobs. | 872 return; // We don't retry POLL jobs. |
| 878 } | 873 } |
| 879 | 874 |
| 880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 875 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
| 881 // if we don't succeed. Some types of errors are not likely to disappear on | 876 // if we don't succeed. Some types of errors are not likely to disappear on |
| 882 // their own. With the return values now available in the old_job.session, we | 877 // their own. With the return values now available in the old_job.session, |
| 883 // should be able to detect such errors and only retry when we detect | 878 // we should be able to detect such errors and only retry when we detect |
| 884 // transient errors. | 879 // transient errors. |
| 885 | 880 |
| 886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 881 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
| 887 mode_ == NORMAL_MODE) { | 882 mode_ == NORMAL_MODE) { |
| 888 // When in normal mode, we allow up to one nudge per backoff interval. It | 883 // When in normal mode, we allow up to one nudge per backoff interval. It |
| 889 // appears that this was our nudge for this interval, and it failed. | 884 // appears that this was our nudge for this interval, and it failed. |
| 890 // | 885 // |
| 891 // Note: This does not prevent us from running canary jobs. For example, an | 886 // Note: This does not prevent us from running canary jobs. For example, |
| 892 // IP address change might still result in another nudge being executed | 887 // an IP address change might still result in another nudge being executed |
| 893 // during this backoff interval. | 888 // during this backoff interval. |
| 894 SDVLOG(2) << "A nudge during backoff failed"; | 889 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
| 895 | 890 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
| 896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
| 897 DCHECK(!wait_interval_->had_nudge); | 891 DCHECK(!wait_interval_->had_nudge); |
| 898 | 892 |
| 899 wait_interval_->had_nudge = true; | 893 wait_interval_->had_nudge = true; |
| 900 InitOrCoalescePendingJob(old_job); | 894 DCHECK(!pending_nudge_); |
| 901 RestartWaiting(); | 895 |
| 896 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | |
| 897 pending_nudge_ = new_job.get(); | |
| 898 RestartWaiting(new_job.Pass()); | |
| 902 } else { | 899 } else { |
| 903 // Either this is the first failure or a consecutive failure after our | 900 // Either this is the first failure or a consecutive failure after our |
| 904 // backoff timer expired. We handle it the same way in either case. | 901 // backoff timer expired. We handle it the same way in either case. |
| 905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 902 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
| 906 HandleContinuationError(old_job); | 903 HandleContinuationError(finished_job.get()); |
| 907 } | 904 } |
| 908 } | 905 } |
| 909 | 906 |
| 910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 907 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
| 911 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 908 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 912 | 909 |
| 913 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 910 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
| 914 syncer_short_poll_interval_seconds_ : | 911 syncer_short_poll_interval_seconds_ : |
| 915 syncer_long_poll_interval_seconds_; | 912 syncer_long_poll_interval_seconds_; |
| 916 bool rate_changed = !poll_timer_.IsRunning() || | 913 bool rate_changed = !poll_timer_.IsRunning() || |
| 917 poll != poll_timer_.GetCurrentDelay(); | 914 poll != poll_timer_.GetCurrentDelay(); |
| 918 | 915 |
| 919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 916 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
| 920 poll_timer_.Reset(); | 917 poll_timer_.Reset(); |
| 921 | 918 |
| 922 if (!rate_changed) | 919 if (!rate_changed) |
| 923 return; | 920 return; |
| 924 | 921 |
| 925 // Adjust poll rate. | 922 // Adjust poll rate. |
| 926 poll_timer_.Stop(); | 923 poll_timer_.Stop(); |
| 927 poll_timer_.Start(FROM_HERE, poll, this, | 924 poll_timer_.Start(FROM_HERE, poll, this, |
| 928 &SyncSchedulerImpl::PollTimerCallback); | 925 &SyncSchedulerImpl::PollTimerCallback); |
| 929 } | 926 } |
| 930 | 927 |
| 931 void SyncSchedulerImpl::RestartWaiting() { | 928 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
| 932 CHECK(wait_interval_.get()); | 929 CHECK(wait_interval_.get()); |
| 933 wait_interval_->timer.Stop(); | 930 wait_interval_->timer.Stop(); |
| 934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 931 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
| 935 this, &SyncSchedulerImpl::DoCanaryJob); | 932 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| 933 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
| 934 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
| 935 weak_ptr_factory_.GetWeakPtr(), | |
| 936 base::Passed(&job))); | |
| 937 } else { | |
| 938 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
| 939 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
| 940 weak_ptr_factory_.GetWeakPtr(), | |
| 941 base::Passed(&job))); | |
| 942 } | |
| 936 } | 943 } |
| 937 | 944 |
| 938 void SyncSchedulerImpl::HandleContinuationError( | 945 void SyncSchedulerImpl::HandleContinuationError( |
| 939 const SyncSessionJob& old_job) { | 946 const SyncSessionJob* old_job) { |
| 940 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 947 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 941 if (DCHECK_IS_ON()) { | 948 if (DCHECK_IS_ON()) { |
| 942 if (IsBackingOff()) { | 949 if (IsBackingOff()) { |
| 943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 950 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
| 944 } | 951 } |
| 945 } | 952 } |
| 946 | 953 |
| 947 TimeDelta length = delay_provider_->GetDelay( | 954 TimeDelta length = delay_provider_->GetDelay( |
| 948 IsBackingOff() ? wait_interval_->length : | 955 IsBackingOff() ? wait_interval_->length : |
| 949 delay_provider_->GetInitialDelay( | 956 delay_provider_->GetInitialDelay( |
| 950 old_job.session->status_controller().model_neutral_state())); | 957 old_job->session()->status_controller().model_neutral_state())); |
| 951 | 958 |
| 952 SDVLOG(2) << "In handle continuation error with " | 959 SDVLOG(2) << "In handle continuation error with " |
| 953 << SyncSessionJob::GetPurposeString(old_job.purpose) | 960 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
| 954 << " job. The time delta(ms) is " | 961 << " job. The time delta(ms) is " |
| 955 << length.InMilliseconds(); | 962 << length.InMilliseconds(); |
| 956 | 963 |
| 957 // This will reset the had_nudge variable as well. | 964 // This will reset the had_nudge variable as well. |
| 958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 965 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| 959 length)); | 966 length)); |
| 960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 967 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); |
| 968 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
| 969 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
| 961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 970 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
| 962 // Config params should always get set. | 971 // Config params should always get set. |
| 963 DCHECK(!old_job.config_params.ready_task.is_null()); | 972 DCHECK(!old_job->config_params().ready_task.is_null()); |
| 964 SyncSession* old = old_job.session.get(); | 973 wait_interval_->pending_configure_job = new_job.get(); |
| 965 SyncSession* s(new SyncSession(session_context_, this, | |
| 966 old->source(), old->routing_info(), old->workers())); | |
| 967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
| 968 make_linked_ptr(s), false, old_job.config_params, | |
| 969 FROM_HERE); | |
| 970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
| 971 } else { | 974 } else { |
| 972 // We are not in configuration mode. So wait_interval's pending job | 975 // We are not in configuration mode. So wait_interval's pending job |
| 973 // should be null. | 976 // should be null. |
| 974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 977 DCHECK(wait_interval_->pending_configure_job == NULL); |
| 978 DCHECK(!pending_nudge_); | |
| 979 pending_nudge_ = new_job.get(); | |
| 980 } | |
| 975 | 981 |
| 976 // TODO(lipalani) - handle clear user data. | 982 RestartWaiting(new_job.Pass()); |
| 977 InitOrCoalescePendingJob(old_job); | |
| 978 } | |
| 979 RestartWaiting(); | |
| 980 } | 983 } |
| 981 | 984 |
| 982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 985 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
| 983 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 986 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| 984 DCHECK(weak_handle_this_.IsInitialized()); | 987 DCHECK(weak_handle_this_.IsInitialized()); |
| 985 SDVLOG(3) << "Posting StopImpl"; | 988 SDVLOG(3) << "Posting StopImpl"; |
| 986 weak_handle_this_.Call(FROM_HERE, | 989 weak_handle_this_.Call(FROM_HERE, |
| 987 &SyncSchedulerImpl::StopImpl, | 990 &SyncSchedulerImpl::StopImpl, |
| 988 callback); | 991 callback); |
| 989 } | 992 } |
| 990 | 993 |
| 991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 994 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
| 992 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 995 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 993 SDVLOG(2) << "StopImpl called"; | 996 SDVLOG(2) << "StopImpl called"; |
| 994 | 997 |
| 995 // Kill any in-flight method calls. | 998 // Kill any in-flight method calls. |
| 996 weak_ptr_factory_.InvalidateWeakPtrs(); | 999 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 997 wait_interval_.reset(); | 1000 wait_interval_.reset(); |
| 998 poll_timer_.Stop(); | 1001 poll_timer_.Stop(); |
| 999 if (started_) { | 1002 if (started_) { |
| 1000 started_ = false; | 1003 started_ = false; |
| 1001 } | 1004 } |
| 1002 if (!callback.is_null()) | 1005 if (!callback.is_null()) |
| 1003 callback.Run(); | 1006 callback.Run(); |
| 1004 } | 1007 } |
| 1005 | 1008 |
| 1006 void SyncSchedulerImpl::DoCanaryJob() { | 1009 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
| 1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1010 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1008 SDVLOG(2) << "Do canary job"; | 1011 SDVLOG(2) << "Do canary job"; |
| 1009 DoPendingJobIfPossible(true); | 1012 |
| 1013 // Only set canary privileges here, when we are about to run the job. This | |
| 1014 // avoids confusion in managing canary bits during scheduling, when you | |
| 1015 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
| 1016 // was scheduled as canary, and send it to an "unscheduled" state. | |
| 1017 to_be_canary->GrantCanaryPrivilege(); | |
| 1018 | |
| 1019 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
| 1020 // TODO(tim): We should be able to remove this... | |
| 1021 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
| 1022 to_be_canary->session()->source()).Pass(); | |
| 1023 // The routing info might have been changed since we cached the | |
| 1024 // pending nudge. Update it by coalescing to the latest. | |
| 1025 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
| 1026 } | |
| 1027 DoSyncSessionJob(to_be_canary.Pass()); | |
| 1010 } | 1028 } |
| 1011 | 1029 |
| 1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1030 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
| 1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1014 SyncSessionJob* job_to_execute = NULL; | 1032 // If we find a scheduled pending_ job, abandon the old one and return a |
| 1033 // a clone. If unscheduled, just hand over ownership. | |
| 1034 scoped_ptr<SyncSessionJob> candidate; | |
| 1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1035 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
| 1016 && wait_interval_->pending_configure_job.get()) { | 1036 && wait_interval_->pending_configure_job) { |
| 1017 SDVLOG(2) << "Found pending configure job"; | 1037 SDVLOG(2) << "Found pending configure job"; |
| 1018 job_to_execute = wait_interval_->pending_configure_job.get(); | 1038 candidate = |
| 1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1039 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
| 1040 wait_interval_->pending_configure_job = candidate.get(); | |
| 1041 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
| 1020 SDVLOG(2) << "Found pending nudge job"; | 1042 SDVLOG(2) << "Found pending nudge job"; |
| 1021 | 1043 candidate = pending_nudge_->CloneAndAbandon(); |
| 1022 scoped_ptr<SyncSession> session(CreateSyncSession( | 1044 pending_nudge_ = candidate.get(); |
| 1023 pending_nudge_->session->source())); | 1045 unscheduled_nudge_storage_.reset(); |
| 1024 | |
| 1025 // Also the routing info might have been changed since we cached the | |
| 1026 // pending nudge. Update it by coalescing to the latest. | |
| 1027 pending_nudge_->session->Coalesce(*(session.get())); | |
| 1028 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
| 1029 job_to_execute = pending_nudge_.get(); | |
| 1030 } | 1046 } |
| 1031 | 1047 return candidate.Pass(); |
| 1032 if (job_to_execute != NULL) { | |
| 1033 SDVLOG(2) << "Executing pending job"; | |
| 1034 SyncSessionJob copy = *job_to_execute; | |
| 1035 copy.is_canary_job = is_canary_job; | |
| 1036 DoSyncSessionJob(copy); | |
| 1037 } | |
| 1038 } | 1048 } |
| 1039 | 1049 |
| 1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1050 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
| 1041 const SyncSourceInfo& source) { | 1051 const SyncSourceInfo& source) { |
| 1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1052 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1043 DVLOG(2) << "Creating sync session with routes " | 1053 DVLOG(2) << "Creating sync session with routes " |
| 1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1054 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
| 1045 | 1055 |
| 1046 SyncSourceInfo info(source); | 1056 SyncSourceInfo info(source); |
| 1047 SyncSession* session(new SyncSession(session_context_, this, info, | 1057 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
| 1048 session_context_->routing_info(), session_context_->workers())); | 1058 session_context_->routing_info(), session_context_->workers())); |
| 1049 | |
| 1050 return session; | |
| 1051 } | 1059 } |
| 1052 | 1060 |
| 1053 void SyncSchedulerImpl::PollTimerCallback() { | 1061 void SyncSchedulerImpl::PollTimerCallback() { |
| 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1062 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1055 ModelSafeRoutingInfo r; | 1063 ModelSafeRoutingInfo r; |
| 1056 ModelTypeInvalidationMap invalidation_map = | 1064 ModelTypeInvalidationMap invalidation_map = |
| 1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | 1065 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
| 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | 1066 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
| 1059 SyncSession* s = CreateSyncSession(info); | 1067 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
| 1060 | 1068 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
| 1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1069 TimeTicks::Now(), |
| 1062 make_linked_ptr(s), | 1070 s.Pass(), |
| 1063 false, | 1071 ConfigurationParams(), |
| 1064 ConfigurationParams(), | 1072 FROM_HERE)); |
| 1065 FROM_HERE); | 1073 ScheduleSyncSessionJob(job.Pass()); |
| 1066 | |
| 1067 ScheduleSyncSessionJob(job); | |
| 1068 } | 1074 } |
| 1069 | 1075 |
| 1070 void SyncSchedulerImpl::Unthrottle() { | 1076 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
| 1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1077 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1078 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| 1073 SDVLOG(2) << "Unthrottled."; | 1079 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
| 1074 DoCanaryJob(); | 1080 << "canary."; |
| 1081 if (to_be_canary.get()) | |
| 1082 DoCanaryJob(to_be_canary.Pass()); | |
| 1083 | |
| 1084 // TODO(tim): The way DecideOnJob works today, canary privileges aren't | |
| 1085 // enough to bypass a THROTTLED wait interval, which would suggest we need | |
| 1086 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is | |
| 1087 // probably the "right" thing to do). Bug 154216. | |
| 1075 wait_interval_.reset(); | 1088 wait_interval_.reset(); |
| 1076 } | 1089 } |
| 1077 | 1090 |
| 1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1091 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
| 1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1092 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1093 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
| 1081 } | 1094 } |
| 1082 | 1095 |
| 1083 bool SyncSchedulerImpl::IsBackingOff() const { | 1096 bool SyncSchedulerImpl::IsBackingOff() const { |
| 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1097 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1085 return wait_interval_.get() && wait_interval_->mode == | 1098 return wait_interval_.get() && wait_interval_->mode == |
| 1086 WaitInterval::EXPONENTIAL_BACKOFF; | 1099 WaitInterval::EXPONENTIAL_BACKOFF; |
| 1087 } | 1100 } |
| 1088 | 1101 |
| 1089 void SyncSchedulerImpl::OnSilencedUntil( | 1102 void SyncSchedulerImpl::OnSilencedUntil( |
| 1090 const base::TimeTicks& silenced_until) { | 1103 const base::TimeTicks& silenced_until) { |
| 1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1104 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1105 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
| 1093 silenced_until - TimeTicks::Now())); | 1106 silenced_until - TimeTicks::Now())); |
| 1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
| 1095 &SyncSchedulerImpl::Unthrottle); | |
| 1096 } | 1107 } |
| 1097 | 1108 |
| 1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1109 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
| 1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1110 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1100 return wait_interval_.get() && wait_interval_->mode == | 1111 return wait_interval_.get() && wait_interval_->mode == |
| 1101 WaitInterval::THROTTLED; | 1112 WaitInterval::THROTTLED; |
| 1102 } | 1113 } |
| 1103 | 1114 |
| 1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1115 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
| 1105 const base::TimeDelta& new_interval) { | 1116 const base::TimeDelta& new_interval) { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1159 | 1170 |
| 1160 #undef SDVLOG_LOC | 1171 #undef SDVLOG_LOC |
| 1161 | 1172 |
| 1162 #undef SDVLOG | 1173 #undef SDVLOG |
| 1163 | 1174 |
| 1164 #undef SLOG | 1175 #undef SLOG |
| 1165 | 1176 |
| 1166 #undef ENUM_CASE | 1177 #undef ENUM_CASE |
| 1167 | 1178 |
| 1168 } // namespace syncer | 1179 } // namespace syncer |
| OLD | NEW |