Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <cstring> | 8 #include <cstring> |
| 9 | 9 |
| 10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/bind_helpers.h" | |
| 12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
| 13 #include "base/location.h" | 14 #include "base/location.h" |
| 14 #include "base/logging.h" | 15 #include "base/logging.h" |
| 15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
| 16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
| 17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
| 18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
| 19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
| 20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
| 21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 76 : source(source), | 77 : source(source), |
| 77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
| 78 routing_info(routing_info), | 79 routing_info(routing_info), |
| 79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
| 80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
| 81 } | 82 } |
| 82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
| 83 | 84 |
| 84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
| 85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
| 86 had_nudge(false) { | 87 had_nudge(false), |
| 88 pending_configure_job(NULL) { | |
|
akalin
2012/09/25 22:37:24
three different styles for empty function bodies h
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
| 87 } | 89 } |
| 88 | 90 |
| 91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
| 92 : mode(mode), had_nudge(false), length(length), | |
| 93 pending_configure_job(NULL) { } | |
| 94 | |
| 89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
| 90 | 96 |
| 91 #define ENUM_CASE(x) case x: return #x; break; | 97 #define ENUM_CASE(x) case x: return #x; break; |
| 92 | 98 |
| 93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
| 94 switch (mode) { | 100 switch (mode) { |
| 95 ENUM_CASE(UNKNOWN); | 101 ENUM_CASE(UNKNOWN); |
| 96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 102 ENUM_CASE(EXPONENTIAL_BACKOFF); |
| 97 ENUM_CASE(THROTTLED); | 103 ENUM_CASE(THROTTLED); |
| 98 } | 104 } |
| 99 NOTREACHED(); | 105 NOTREACHED(); |
| 100 return ""; | 106 return ""; |
| 101 } | 107 } |
| 102 | 108 |
| 103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
| 104 : purpose(UNKNOWN), | |
| 105 is_canary_job(false) { | |
| 106 } | |
| 107 | |
| 108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
| 109 | |
| 110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
| 111 base::TimeTicks start, | |
| 112 linked_ptr<sessions::SyncSession> session, | |
| 113 bool is_canary_job, | |
| 114 const ConfigurationParams& config_params, | |
| 115 const tracked_objects::Location& from_here) | |
| 116 : purpose(purpose), | |
| 117 scheduled_start(start), | |
| 118 session(session), | |
| 119 is_canary_job(is_canary_job), | |
| 120 config_params(config_params), | |
| 121 from_here(from_here) { | |
| 122 } | |
| 123 | |
| 124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
| 125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
| 126 switch (purpose) { | |
| 127 ENUM_CASE(UNKNOWN); | |
| 128 ENUM_CASE(POLL); | |
| 129 ENUM_CASE(NUDGE); | |
| 130 ENUM_CASE(CONFIGURATION); | |
| 131 } | |
| 132 NOTREACHED(); | |
| 133 return ""; | |
| 134 } | |
| 135 | |
| 136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
| 137 NudgeSource source) { | 110 NudgeSource source) { |
| 138 switch (source) { | 111 switch (source) { |
| 139 case NUDGE_SOURCE_NOTIFICATION: | 112 case NUDGE_SOURCE_NOTIFICATION: |
| 140 return GetUpdatesCallerInfo::NOTIFICATION; | 113 return GetUpdatesCallerInfo::NOTIFICATION; |
| 141 case NUDGE_SOURCE_LOCAL: | 114 case NUDGE_SOURCE_LOCAL: |
| 142 return GetUpdatesCallerInfo::LOCAL; | 115 return GetUpdatesCallerInfo::LOCAL; |
| 143 case NUDGE_SOURCE_CONTINUATION: | 116 case NUDGE_SOURCE_CONTINUATION: |
| 144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 117 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
| 145 case NUDGE_SOURCE_LOCAL_REFRESH: | 118 case NUDGE_SOURCE_LOCAL_REFRESH: |
| 146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 119 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
| 147 case NUDGE_SOURCE_UNKNOWN: | 120 case NUDGE_SOURCE_UNKNOWN: |
| 148 return GetUpdatesCallerInfo::UNKNOWN; | 121 return GetUpdatesCallerInfo::UNKNOWN; |
| 149 default: | 122 default: |
| 150 NOTREACHED(); | 123 NOTREACHED(); |
| 151 return GetUpdatesCallerInfo::UNKNOWN; | 124 return GetUpdatesCallerInfo::UNKNOWN; |
| 152 } | 125 } |
| 153 } | 126 } |
| 154 | 127 |
| 155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
| 156 : mode(mode), had_nudge(false), length(length) { } | |
| 157 | |
| 158 // Helper macros to log with the syncer thread name; useful when there | 128 // Helper macros to log with the syncer thread name; useful when there |
| 159 // are multiple syncer threads involved. | 129 // are multiple syncer threads involved. |
| 160 | 130 |
| 161 #define SLOG(severity) LOG(severity) << name_ << ": " | 131 #define SLOG(severity) LOG(severity) << name_ << ": " |
| 162 | 132 |
| 163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 133 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
| 164 | 134 |
| 165 #define SDVLOG_LOC(from_here, verbose_level) \ | 135 #define SDVLOG_LOC(from_here, verbose_level) \ |
| 166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 136 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
| 167 | 137 |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 198 syncer_short_poll_interval_seconds_( | 168 syncer_short_poll_interval_seconds_( |
| 199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 169 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
| 200 syncer_long_poll_interval_seconds_( | 170 syncer_long_poll_interval_seconds_( |
| 201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 171 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| 202 sessions_commit_delay_( | 172 sessions_commit_delay_( |
| 203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 173 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
| 204 mode_(NORMAL_MODE), | 174 mode_(NORMAL_MODE), |
| 205 // Start with assuming everything is fine with the connection. | 175 // Start with assuming everything is fine with the connection. |
| 206 // At the end of the sync cycle we would have the correct status. | 176 // At the end of the sync cycle we would have the correct status. |
| 207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 177 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
| 178 pending_nudge_(NULL), | |
| 208 delay_provider_(delay_provider), | 179 delay_provider_(delay_provider), |
| 209 syncer_(syncer), | 180 syncer_(syncer), |
| 210 session_context_(context), | 181 session_context_(context), |
| 211 no_scheduling_allowed_(false) { | 182 no_scheduling_allowed_(false) { |
| 212 DCHECK(sync_loop_); | 183 DCHECK(sync_loop_); |
| 213 } | 184 } |
| 214 | 185 |
| 215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 186 SyncSchedulerImpl::~SyncSchedulerImpl() { |
| 216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 187 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 217 StopImpl(base::Closure()); | 188 StopImpl(base::Closure()); |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 205 void SyncSchedulerImpl::OnConnectionStatusChange() { |
| 235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 206 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
| 236 // Optimistically assume that the connection is fixed and try | 207 // Optimistically assume that the connection is fixed and try |
| 237 // connecting. | 208 // connecting. |
| 238 OnServerConnectionErrorFixed(); | 209 OnServerConnectionErrorFixed(); |
| 239 } | 210 } |
| 240 } | 211 } |
| 241 | 212 |
| 242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 213 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
| 243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 214 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
| 215 // There could be a pending nudge or configuration job in several cases: | |
| 216 // | |
| 217 // 1. We're in exponential backoff. | |
| 218 // 2. We're silenced / throttled. | |
| 219 // 3. A nudge was saved previously due to not having a valid auth token. | |
| 220 // 4. A nudge was scheduled + saved while in configuration mode. | |
| 221 // | |
| 222 // In all cases except (2), we want to retry contacting the server. We | |
| 223 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
| 224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
| 225 // has the authority to do that is the Unthrottle timer. | |
| 226 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
| 227 if (!pending.get()) | |
| 228 return; | |
| 229 | |
| 244 PostTask(FROM_HERE, "DoCanaryJob", | 230 PostTask(FROM_HERE, "DoCanaryJob", |
| 245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 231 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
| 246 weak_ptr_factory_.GetWeakPtr())); | 232 weak_ptr_factory_.GetWeakPtr(), |
| 247 | 233 base::Passed(&pending))); |
| 248 } | 234 } |
| 249 | 235 |
| 250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 236 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
| 251 HttpResponse::ServerConnectionCode code) { | 237 HttpResponse::ServerConnectionCode code) { |
| 252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 238 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 253 SDVLOG(2) << "New server connection code: " | 239 SDVLOG(2) << "New server connection code: " |
| 254 << HttpResponse::GetServerConnectionCodeString(code); | 240 << HttpResponse::GetServerConnectionCodeString(code); |
| 255 | 241 |
| 256 connection_code_ = code; | 242 connection_code_ = code; |
| 257 } | 243 } |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 273 Mode old_mode = mode_; | 259 Mode old_mode = mode_; |
| 274 mode_ = mode; | 260 mode_ = mode; |
| 275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 261 AdjustPolling(NULL); // Will kick start poll timer if needed. |
| 276 | 262 |
| 277 if (old_mode != mode_) { | 263 if (old_mode != mode_) { |
| 278 // We just changed our mode. See if there are any pending jobs that we could | 264 // We just changed our mode. See if there are any pending jobs that we could |
| 279 // execute in the new mode. | 265 // execute in the new mode. |
| 280 if (mode_ == NORMAL_MODE) { | 266 if (mode_ == NORMAL_MODE) { |
| 281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 267 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
| 282 // has not yet completed. | 268 // has not yet completed. |
| 283 DCHECK(!wait_interval_.get() || | 269 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
| 284 !wait_interval_->pending_configure_job.get()); | |
| 285 } | 270 } |
| 286 | 271 |
| 287 DoPendingJobIfPossible(false); | 272 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
| 273 if (pending.get()) { | |
| 274 // TODO(tim): We should be able to remove this... | |
| 275 scoped_ptr<SyncSession> session(CreateSyncSession( | |
| 276 pending->session()->source())); | |
| 277 // Also the routing info might have been changed since we cached the | |
| 278 // pending nudge. Update it by coalescing to the latest. | |
| 279 pending->mutable_session()->Coalesce(*(session)); | |
| 280 SDVLOG(2) << "Executing pending job. Good luck!"; | |
| 281 DoSyncSessionJob(pending.Pass()); | |
| 282 } | |
| 288 } | 283 } |
| 289 } | 284 } |
| 290 | 285 |
| 291 void SyncSchedulerImpl::SendInitialSnapshot() { | 286 void SyncSchedulerImpl::SendInitialSnapshot() { |
| 292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 287 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 288 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
| 294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 289 SyncSourceInfo(), ModelSafeRoutingInfo(), |
| 295 std::vector<ModelSafeWorker*>())); | 290 std::vector<ModelSafeWorker*>())); |
| 296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 291 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
| 297 event.snapshot = dummy->TakeSnapshot(); | 292 event.snapshot = dummy->TakeSnapshot(); |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 321 bool SyncSchedulerImpl::ScheduleConfiguration( | 316 bool SyncSchedulerImpl::ScheduleConfiguration( |
| 322 const ConfigurationParams& params) { | 317 const ConfigurationParams& params) { |
| 323 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 318 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 319 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
| 325 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 320 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
| 326 DCHECK(!params.ready_task.is_null()); | 321 DCHECK(!params.ready_task.is_null()); |
| 327 SDVLOG(2) << "Reconfiguring syncer."; | 322 SDVLOG(2) << "Reconfiguring syncer."; |
| 328 | 323 |
| 329 // Only one configuration is allowed at a time. Verify we're not waiting | 324 // Only one configuration is allowed at a time. Verify we're not waiting |
| 330 // for a pending configure job. | 325 // for a pending configure job. |
| 331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 326 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
| 332 | 327 |
| 333 ModelSafeRoutingInfo restricted_routes; | 328 ModelSafeRoutingInfo restricted_routes; |
| 334 BuildModelSafeParams(params.types_to_download, | 329 BuildModelSafeParams(params.types_to_download, |
| 335 params.routing_info, | 330 params.routing_info, |
| 336 &restricted_routes); | 331 &restricted_routes); |
| 337 session_context_->set_routing_info(params.routing_info); | 332 session_context_->set_routing_info(params.routing_info); |
| 338 | 333 |
| 339 // Only reconfigure if we have types to download. | 334 // Only reconfigure if we have types to download. |
| 340 if (!params.types_to_download.Empty()) { | 335 if (!params.types_to_download.Empty()) { |
| 341 DCHECK(!restricted_routes.empty()); | 336 DCHECK(!restricted_routes.empty()); |
| 342 linked_ptr<SyncSession> session(new SyncSession( | 337 scoped_ptr<SyncSession> session(new SyncSession( |
| 343 session_context_, | 338 session_context_, |
| 344 this, | 339 this, |
| 345 SyncSourceInfo(params.source, | 340 SyncSourceInfo(params.source, |
| 346 ModelSafeRoutingInfoToStateMap( | 341 ModelSafeRoutingInfoToStateMap( |
| 347 restricted_routes, | 342 restricted_routes, |
| 348 std::string())), | 343 std::string())), |
| 349 restricted_routes, | 344 restricted_routes, |
| 350 session_context_->workers())); | 345 session_context_->workers())); |
| 351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 346 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
| 352 TimeTicks::Now(), | 347 SyncSessionJob::CONFIGURATION, |
| 353 session, | 348 TimeTicks::Now(), |
| 354 false, | 349 session.Pass(), |
| 355 params, | 350 params, |
| 356 FROM_HERE); | 351 FROM_HERE)); |
| 357 DoSyncSessionJob(job); | 352 bool succeeded = DoSyncSessionJob(job.Pass()); |
| 358 | 353 |
| 359 // If we failed, the job would have been saved as the pending configure | 354 // If we failed, the job would have been saved as the pending configure |
| 360 // job and a wait interval would have been set. | 355 // job and a wait interval would have been set. |
| 361 if (!session->Succeeded()) { | 356 if (!succeeded) { |
| 362 DCHECK(wait_interval_.get() && | 357 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
| 363 wait_interval_->pending_configure_job.get()); | |
| 364 return false; | 358 return false; |
| 365 } | 359 } |
| 366 } else { | 360 } else { |
| 367 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 361 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
| 368 params.ready_task.Run(); | 362 params.ready_task.Run(); |
| 369 } | 363 } |
| 370 | 364 |
| 371 return true; | 365 return true; |
| 372 } | 366 } |
| 373 | 367 |
| 374 SyncSchedulerImpl::JobProcessDecision | 368 SyncSchedulerImpl::JobProcessDecision |
| 375 SyncSchedulerImpl::DecideWhileInWaitInterval( | 369 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) { |
| 376 const SyncSessionJob& job) { | |
| 377 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 370 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 378 DCHECK(wait_interval_.get()); | 371 DCHECK(wait_interval_.get()); |
| 379 | 372 |
| 380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 373 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
| 381 << WaitInterval::GetModeString(wait_interval_->mode) | 374 << WaitInterval::GetModeString(wait_interval_->mode) |
| 382 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 375 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
| 383 << (job.is_canary_job ? " (canary)" : ""); | 376 << (job->is_canary() ? " (canary)" : ""); |
| 384 | 377 |
| 385 if (job.purpose == SyncSessionJob::POLL) | 378 if (job->purpose() == SyncSessionJob::POLL) |
| 386 return DROP; | 379 return DROP; |
| 387 | 380 |
| 388 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 381 // If we save a job while in a WaitInterval, there is a well-defined moment |
| 389 job.purpose == SyncSessionJob::CONFIGURATION); | 382 // in time in the future when it makes sense for that SAVE-worthy job to try |
| 383 // running again -- the end of the WaitInterval. | |
| 384 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | |
| 385 job->purpose() == SyncSessionJob::CONFIGURATION); | |
| 386 | |
| 387 // If throttled, there's a clock ticking to unthrottle. We want to get | |
| 388 // on the same train. | |
| 390 if (wait_interval_->mode == WaitInterval::THROTTLED) | 389 if (wait_interval_->mode == WaitInterval::THROTTLED) |
| 391 return SAVE; | 390 return SAVE; |
| 392 | 391 |
| 393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 392 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| 394 if (job.purpose == SyncSessionJob::NUDGE) { | 393 if (job->purpose() == SyncSessionJob::NUDGE) { |
| 395 if (mode_ == CONFIGURATION_MODE) | 394 if (mode_ == CONFIGURATION_MODE) |
| 396 return SAVE; | 395 return SAVE; |
| 397 | 396 |
| 398 // If we already had one nudge then just drop this nudge. We will retry | 397 // If we already had one nudge then just drop this nudge. We will retry |
| 399 // later when the timer runs out. | 398 // later when the timer runs out. |
| 400 if (!job.is_canary_job) | 399 if (!job->is_canary()) |
| 401 return wait_interval_->had_nudge ? DROP : CONTINUE; | 400 return wait_interval_->had_nudge ? DROP : CONTINUE; |
| 402 else // We are here because timer ran out. So retry. | 401 else // We are here because timer ran out. So retry. |
| 403 return CONTINUE; | 402 return CONTINUE; |
| 404 } | 403 } |
| 405 return job.is_canary_job ? CONTINUE : SAVE; | 404 return job->is_canary() ? CONTINUE : SAVE; |
| 406 } | 405 } |
| 407 | 406 |
| 408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 407 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
| 409 const SyncSessionJob& job) { | 408 const SyncSessionJob* job) { |
| 410 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 409 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 411 | 410 |
| 412 // See if our type is throttled. | 411 // See if our type is throttled. |
| 413 ModelTypeSet throttled_types = | 412 ModelTypeSet throttled_types = |
| 414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 413 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
| 415 if (job.purpose == SyncSessionJob::NUDGE && | 414 if (job->purpose() == SyncSessionJob::NUDGE && |
| 416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 415 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
| 417 ModelTypeSet requested_types; | 416 ModelTypeSet requested_types; |
| 418 for (ModelTypeStateMap::const_iterator i = | 417 for (ModelTypeStateMap::const_iterator i = |
| 419 job.session->source().types.begin(); | 418 job->session()->source().types.begin(); |
| 420 i != job.session->source().types.end(); | 419 i != job->session()->source().types.end(); |
| 421 ++i) { | 420 ++i) { |
| 422 requested_types.Put(i->first); | 421 requested_types.Put(i->first); |
| 423 } | 422 } |
| 424 | 423 |
| 424 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
| 425 // a per-datatype "unthrottle" event as something that should force a | |
| 426 // canary job. For this reason, there's no good time to reschedule this job | |
| 427 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
| 428 // Note that there may already be such an event if we're in a WaitInterval, | |
| 429 // so we can retry it then. | |
| 425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 430 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
|
rlarocque
2012/09/24 21:15:14
This isn't related to your change, but your commen
tim (not reviewing)
2012/10/08 00:20:03
Can you define 'forget'? The job would have accoun
rlarocque
2012/10/08 19:18:04
When we calculate the items to commit in GetCommit
tim (not reviewing)
2012/10/11 17:35:14
Hm, I see. Good point. I filed crbug.com/155296 t
| |
| 426 return SAVE; | 431 return SAVE; |
| 427 } | 432 } |
| 428 | 433 |
| 429 if (wait_interval_.get()) | 434 if (wait_interval_.get()) |
| 430 return DecideWhileInWaitInterval(job); | 435 return DecideWhileInWaitInterval(job); |
| 431 | 436 |
| 432 if (mode_ == CONFIGURATION_MODE) { | 437 if (mode_ == CONFIGURATION_MODE) { |
| 433 if (job.purpose == SyncSessionJob::NUDGE) | 438 if (job->purpose() == SyncSessionJob::NUDGE) |
| 434 return SAVE; | 439 return SAVE; // Running requires a mode switch. |
| 435 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 440 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
| 436 return CONTINUE; | 441 return CONTINUE; |
| 437 else | 442 else |
| 438 return DROP; | 443 return DROP; |
| 439 } | 444 } |
| 440 | 445 |
| 441 // We are in normal mode. | 446 // We are in normal mode. |
| 442 DCHECK_EQ(mode_, NORMAL_MODE); | 447 DCHECK_EQ(mode_, NORMAL_MODE); |
| 443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 448 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION); |
| 444 | 449 |
| 445 // Note about some subtle scheduling semantics. | 450 // Note about some subtle scheduling semantics. |
| 446 // | 451 // |
| 447 // It's possible at this point that |job| is known to be unnecessary, and | 452 // It's possible at this point that |job| is known to be unnecessary, and |
| 448 // dropping it would be perfectly safe and correct. Consider | 453 // dropping it would be perfectly safe and correct. Consider |
| 449 // | 454 // |
| 450 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 455 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
| 451 // the time that the last successful all-datatype NUDGE completed. | 456 // the time that the last successful all-datatype NUDGE completed. |
| 452 // | 457 // |
| 453 // 2) |job| is a NUDGE (for any combination of types) with a | 458 // 2) |job| is a NUDGE (for any combination of types) with a |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 476 // * It's not strictly "impossible", but it would be reentrant and hence | 481 // * It's not strictly "impossible", but it would be reentrant and hence |
| 477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 482 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
| 478 // legal side effect of any of the work being done as part of a sync cycle. | 483 // legal side effect of any of the work being done as part of a sync cycle. |
| 479 // See |no_scheduling_allowed_| for details. | 484 // See |no_scheduling_allowed_| for details. |
| 480 | 485 |
| 481 // Decision now rests on state of auth tokens. | 486 // Decision now rests on state of auth tokens. |
| 482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 487 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
| 483 return CONTINUE; | 488 return CONTINUE; |
| 484 | 489 |
| 485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 490 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
| 486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 491 // Running the job would require updated auth, so we can't honour |
| 492 // job.scheduled_start(). | |
| 493 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
| 487 } | 494 } |
| 488 | 495 |
| 489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 496 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) { |
| 490 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
| 491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | |
| 492 if (pending_nudge_.get() == NULL) { | |
| 493 SDVLOG(2) << "Creating a pending nudge job"; | |
| 494 SyncSession* s = job.session.get(); | |
| 495 | |
| 496 // Get a fresh session with similar configuration as before (resets | |
| 497 // StatusController). | |
| 498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | |
| 499 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
| 500 | |
| 501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
| 502 make_linked_ptr(session.release()), false, | |
| 503 ConfigurationParams(), job.from_here); | |
| 504 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
| 505 return; | |
| 506 } | |
| 507 | |
| 508 SDVLOG(2) << "Coalescing a pending nudge"; | |
| 509 pending_nudge_->session->Coalesce(*(job.session.get())); | |
| 510 pending_nudge_->scheduled_start = job.scheduled_start; | |
| 511 | |
| 512 // Unfortunately the nudge location cannot be modified. So it stores the | |
| 513 // location of the first caller. | |
| 514 } | |
| 515 | |
| 516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | |
| 517 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 497 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 518 DCHECK(started_); | 498 DCHECK(started_); |
| 519 | 499 |
| 520 JobProcessDecision decision = DecideOnJob(job); | 500 JobProcessDecision decision = DecideOnJob(job); |
| 521 SDVLOG(2) << "Should run " | 501 SDVLOG(2) << "Should run " |
| 522 << SyncSessionJob::GetPurposeString(job.purpose) | 502 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 523 << " job in mode " << GetModeString(mode_) | 503 << " job " << job->session() |
| 504 << " in mode " << GetModeString(mode_) | |
| 524 << ": " << GetDecisionString(decision); | 505 << ": " << GetDecisionString(decision); |
| 525 if (decision != SAVE) | 506 if (decision != SAVE) |
|
rlarocque
2012/09/24 21:15:14
These lines have always bugged me. It's not clear
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
| 526 return decision == CONTINUE; | 507 return decision == CONTINUE; |
| 527 | 508 |
| 528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 509 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
| 529 SyncSessionJob::CONFIGURATION); | 510 if (is_nudge && pending_nudge_) { |
| 511 SDVLOG(2) << "Coalescing a pending nudge"; | |
| 512 // TODO(tim): This basically means we never use the more-careful coalescing | |
|
rlarocque
2012/09/24 21:15:14
This is a regression, right? How/Why did this hap
tim (not reviewing)
2012/10/08 00:20:03
It is, but I suspect it has been this way for quit
rlarocque
2012/10/08 19:18:04
Sounds good. I was confused for a bit there; I di
| |
| 513 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
| 514 // times, because we're calling this function first. Pull this out | |
| 515 // into a function to coalesce + set start times and reuse. | |
| 516 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | |
| 517 pending_nudge_->set_scheduled_start(job->scheduled_start()); | |
|
rlarocque
2012/09/24 21:15:14
When a user is actively using their browser, I exp
tim (not reviewing)
2012/10/08 00:20:03
Hmm. Afaik, this set_scheduled_start call doesn't
| |
| 518 return false; | |
| 519 } | |
| 530 | 520 |
| 531 SaveJob(job); | 521 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); |
|
rlarocque
2012/09/24 21:15:14
This makes me think ShouldRunJobSaveIfNecessary()
tim (not reviewing)
2012/10/08 00:20:03
Well, note the intended contract here is simpler t
| |
| 522 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
| 523 // This job should be made the new canary. | |
| 524 if (is_nudge) { | |
| 525 pending_nudge_ = job_to_save.get(); | |
| 526 } else { | |
| 527 SDVLOG(2) << "Saving a configuration job"; | |
| 528 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
| 529 DCHECK(!wait_interval_->pending_configure_job); | |
| 530 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
| 531 DCHECK(!job->config_params().ready_task.is_null()); | |
| 532 // The only nudge that could exist is a scheduled canary nudge. | |
| 533 DCHECK(!unscheduled_nudge_storage_.get()); | |
| 534 if (pending_nudge_) { | |
| 535 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
| 536 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
|
rlarocque
2012/09/24 21:15:14
What effect do these two lines have?
tim (not reviewing)
2012/10/08 00:20:03
By "pre-empt" I mean cancel the pending nudge job
| |
| 537 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
| 538 } | |
| 539 wait_interval_->pending_configure_job = job_to_save.get(); | |
| 540 } | |
| 541 wait_interval_->length = | |
|
rlarocque
2012/09/24 21:15:14
Could this end up being zero or negative? What ef
tim (not reviewing)
2012/10/08 00:20:03
Hmph. Good catch - I had fixed this since I hit a
| |
| 542 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
| 543 RestartWaiting(job_to_save.Pass()); | |
| 544 return false; | |
| 545 } | |
| 546 | |
| 547 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
| 548 // when we're not in a WaitInterval. See bug 147736. | |
| 549 DCHECK(is_nudge); | |
| 550 // There may or may not be a pending_configure_job. Either way this nudge | |
| 551 // is unschedulable. | |
| 552 pending_nudge_ = job_to_save.get(); | |
| 553 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
| 532 return false; | 554 return false; |
| 533 } | 555 } |
| 534 | 556 |
| 535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
| 536 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
| 537 if (job.purpose == SyncSessionJob::NUDGE) { | |
| 538 SDVLOG(2) << "Saving a nudge job"; | |
| 539 InitOrCoalescePendingJob(job); | |
| 540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
| 541 SDVLOG(2) << "Saving a configuration job"; | |
| 542 DCHECK(wait_interval_.get()); | |
| 543 DCHECK(mode_ == CONFIGURATION_MODE); | |
| 544 | |
| 545 // Config params should always get set. | |
| 546 DCHECK(!job.config_params.ready_task.is_null()); | |
| 547 SyncSession* old = job.session.get(); | |
| 548 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
| 549 old->routing_info(), old->workers())); | |
| 550 SyncSessionJob new_job(job.purpose, | |
| 551 TimeTicks::Now(), | |
| 552 make_linked_ptr(s), | |
| 553 false, | |
| 554 job.config_params, | |
| 555 job.from_here); | |
| 556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
| 557 } // drop the rest. | |
| 558 // TODO(sync): Is it okay to drop the rest? It's weird that | |
| 559 // SaveJob() only does what it says sometimes. (See | |
| 560 // http://crbug.com/90868.) | |
| 561 } | |
| 562 | |
| 563 // Functor for std::find_if to search by ModelSafeGroup. | 557 // Functor for std::find_if to search by ModelSafeGroup. |
| 564 struct ModelSafeWorkerGroupIs { | 558 struct ModelSafeWorkerGroupIs { |
| 565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 559 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
| 566 bool operator()(ModelSafeWorker* w) { | 560 bool operator()(ModelSafeWorker* w) { |
| 567 return group == w->GetModelSafeGroup(); | 561 return group == w->GetModelSafeGroup(); |
| 568 } | 562 } |
| 569 ModelSafeGroup group; | 563 ModelSafeGroup group; |
| 570 }; | 564 }; |
| 571 | 565 |
| 572 void SyncSchedulerImpl::ScheduleNudgeAsync( | 566 void SyncSchedulerImpl::ScheduleNudgeAsync( |
| 573 const TimeDelta& delay, | 567 const TimeDelta& delay, |
| 574 NudgeSource source, ModelTypeSet types, | 568 NudgeSource source, ModelTypeSet types, |
| 575 const tracked_objects::Location& nudge_location) { | 569 const tracked_objects::Location& nudge_location) { |
| 576 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 570 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 577 SDVLOG_LOC(nudge_location, 2) | 571 SDVLOG_LOC(nudge_location, 2) |
| 578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 572 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
| 579 << "source " << GetNudgeSourceString(source) << ", " | 573 << "source " << GetNudgeSourceString(source) << ", " |
| 580 << "types " << ModelTypeSetToString(types); | 574 << "types " << ModelTypeSetToString(types); |
| 581 | 575 |
| 582 ModelTypeStateMap type_state_map = | 576 ModelTypeStateMap type_state_map = |
| 583 ModelTypeSetToStateMap(types, std::string()); | 577 ModelTypeSetToStateMap(types, std::string()); |
| 584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 578 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
| 585 GetUpdatesFromNudgeSource(source), | 579 GetUpdatesFromNudgeSource(source), |
| 586 type_state_map, | 580 type_state_map, |
| 587 false, | |
| 588 nudge_location); | 581 nudge_location); |
| 589 } | 582 } |
| 590 | 583 |
| 591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 584 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
| 592 const TimeDelta& delay, | 585 const TimeDelta& delay, |
| 593 NudgeSource source, const ModelTypeStateMap& type_state_map, | 586 NudgeSource source, const ModelTypeStateMap& type_state_map, |
| 594 const tracked_objects::Location& nudge_location) { | 587 const tracked_objects::Location& nudge_location) { |
| 595 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 588 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 596 SDVLOG_LOC(nudge_location, 2) | 589 SDVLOG_LOC(nudge_location, 2) |
| 597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 590 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
| 598 << "source " << GetNudgeSourceString(source) << ", " | 591 << "source " << GetNudgeSourceString(source) << ", " |
| 599 << "payloads " | 592 << "payloads " |
| 600 << ModelTypeStateMapToString(type_state_map); | 593 << ModelTypeStateMapToString(type_state_map); |
| 601 | 594 |
| 602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 595 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
| 603 GetUpdatesFromNudgeSource(source), | 596 GetUpdatesFromNudgeSource(source), |
| 604 type_state_map, | 597 type_state_map, |
| 605 false, | |
| 606 nudge_location); | 598 nudge_location); |
| 607 } | 599 } |
| 608 | 600 |
| 609 void SyncSchedulerImpl::ScheduleNudgeImpl( | 601 void SyncSchedulerImpl::ScheduleNudgeImpl( |
| 610 const TimeDelta& delay, | 602 const TimeDelta& delay, |
| 611 GetUpdatesCallerInfo::GetUpdatesSource source, | 603 GetUpdatesCallerInfo::GetUpdatesSource source, |
| 612 const ModelTypeStateMap& type_state_map, | 604 const ModelTypeStateMap& type_state_map, |
| 613 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 605 const tracked_objects::Location& nudge_location) { |
| 614 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 606 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 615 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; | 607 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; |
| 616 | 608 |
| 617 SDVLOG_LOC(nudge_location, 2) | 609 SDVLOG_LOC(nudge_location, 2) |
| 618 << "In ScheduleNudgeImpl with delay " | 610 << "In ScheduleNudgeImpl with delay " |
| 619 << delay.InMilliseconds() << " ms, " | 611 << delay.InMilliseconds() << " ms, " |
| 620 << "source " << GetUpdatesSourceString(source) << ", " | 612 << "source " << GetUpdatesSourceString(source) << ", " |
| 621 << "payloads " | 613 << "payloads " |
| 622 << ModelTypeStateMapToString(type_state_map) | 614 << ModelTypeStateMapToString(type_state_map); |
| 623 << (is_canary_job ? " (canary)" : ""); | |
| 624 | 615 |
| 625 SyncSourceInfo info(source, type_state_map); | 616 SyncSourceInfo info(source, type_state_map); |
| 626 UpdateNudgeTimeRecords(info); | 617 UpdateNudgeTimeRecords(info); |
| 627 | 618 |
| 628 SyncSession* session(CreateSyncSession(info)); | 619 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
| 629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 620 SyncSessionJob::NUDGE, |
| 630 make_linked_ptr(session), is_canary_job, | 621 TimeTicks::Now() + delay, |
| 631 ConfigurationParams(), nudge_location); | 622 CreateSyncSession(info).Pass(), |
| 623 ConfigurationParams(), | |
| 624 nudge_location)); | |
| 632 | 625 |
| 633 session = NULL; | 626 if (!ShouldRunJobSaveIfNecessary(job.get())) |
| 634 if (!ShouldRunJob(job)) | |
| 635 return; | 627 return; |
| 636 | 628 |
| 637 if (pending_nudge_.get()) { | 629 if (pending_nudge_) { |
| 638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
| 639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
| 640 return; | |
| 641 } | |
| 642 | |
| 643 SDVLOG(2) << "Coalescing pending nudge"; | |
| 644 pending_nudge_->session->Coalesce(*(job.session.get())); | |
| 645 | |
| 646 SDVLOG(2) << "Rescheduling pending nudge"; | 630 SDVLOG(2) << "Rescheduling pending nudge"; |
| 647 SyncSession* s = pending_nudge_->session.get(); | 631 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
| 648 job.session.reset(new SyncSession(s->context(), s->delegate(), | 632 // Choose the start time as the earliest of the 2. Note that this means |
| 649 s->source(), s->routing_info(), s->workers())); | 633 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
| 650 | 634 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
| 651 // Choose the start time as the earliest of the 2. | 635 // without waiting. |
| 652 job.scheduled_start = std::min(job.scheduled_start, | 636 pending_nudge_->set_scheduled_start( |
| 653 pending_nudge_->scheduled_start); | 637 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
| 654 pending_nudge_.reset(); | 638 // Abandon the old task by cloning and replacing the session. |
| 639 // It's possible that by "rescheduling" we're actually taking a job that | |
| 640 // was previously unscheduled and giving it wings, so take care to reset | |
| 641 // unscheduled nudge storage. | |
| 642 job = pending_nudge_->CloneAndAbandon(); | |
| 643 unscheduled_nudge_storage_.reset(); | |
| 644 pending_nudge_ = NULL; | |
| 655 } | 645 } |
| 656 | 646 |
| 657 // TODO(zea): Consider adding separate throttling/backoff for datatype | 647 // TODO(zea): Consider adding separate throttling/backoff for datatype |
| 658 // refresh requests. | 648 // refresh requests. |
| 659 ScheduleSyncSessionJob(job); | 649 ScheduleSyncSessionJob(job.Pass()); |
| 660 } | 650 } |
| 661 | 651 |
| 662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 652 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
| 663 switch (mode) { | 653 switch (mode) { |
| 664 ENUM_CASE(CONFIGURATION_MODE); | 654 ENUM_CASE(CONFIGURATION_MODE); |
| 665 ENUM_CASE(NORMAL_MODE); | 655 ENUM_CASE(NORMAL_MODE); |
| 666 } | 656 } |
| 667 return ""; | 657 return ""; |
| 668 } | 658 } |
| 669 | 659 |
| 670 const char* SyncSchedulerImpl::GetDecisionString( | 660 const char* SyncSchedulerImpl::GetDecisionString( |
| 671 SyncSchedulerImpl::JobProcessDecision mode) { | 661 SyncSchedulerImpl::JobProcessDecision mode) { |
| 672 switch (mode) { | 662 switch (mode) { |
| 673 ENUM_CASE(CONTINUE); | 663 ENUM_CASE(CONTINUE); |
| 674 ENUM_CASE(SAVE); | 664 ENUM_CASE(SAVE); |
| 675 ENUM_CASE(DROP); | 665 ENUM_CASE(DROP); |
| 676 } | 666 } |
| 677 return ""; | 667 return ""; |
| 678 } | 668 } |
| 679 | 669 |
| 680 // static | |
| 681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
| 682 SyncSessionJob::SyncSessionJobPurpose purpose, | |
| 683 SyncerStep* start, | |
| 684 SyncerStep* end) { | |
| 685 switch (purpose) { | |
| 686 case SyncSessionJob::CONFIGURATION: | |
| 687 *start = DOWNLOAD_UPDATES; | |
| 688 *end = APPLY_UPDATES; | |
| 689 return; | |
| 690 case SyncSessionJob::NUDGE: | |
| 691 case SyncSessionJob::POLL: | |
| 692 *start = SYNCER_BEGIN; | |
| 693 *end = SYNCER_END; | |
| 694 return; | |
| 695 default: | |
| 696 NOTREACHED(); | |
| 697 *start = SYNCER_END; | |
| 698 *end = SYNCER_END; | |
| 699 return; | |
| 700 } | |
| 701 } | |
| 702 | |
| 703 void SyncSchedulerImpl::PostTask( | 670 void SyncSchedulerImpl::PostTask( |
| 704 const tracked_objects::Location& from_here, | 671 const tracked_objects::Location& from_here, |
| 705 const char* name, const base::Closure& task) { | 672 const char* name, const base::Closure& task) { |
| 706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 673 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
| 707 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 674 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 708 if (!started_) { | 675 if (!started_) { |
| 709 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 676 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
| 710 return; | 677 return; |
| 711 } | 678 } |
| 712 sync_loop_->PostTask(from_here, task); | 679 sync_loop_->PostTask(from_here, task); |
| 713 } | 680 } |
| 714 | 681 |
| 715 void SyncSchedulerImpl::PostDelayedTask( | 682 void SyncSchedulerImpl::PostDelayedTask( |
| 716 const tracked_objects::Location& from_here, | 683 const tracked_objects::Location& from_here, |
| 717 const char* name, const base::Closure& task, base::TimeDelta delay) { | 684 const char* name, const base::Closure& task, base::TimeDelta delay) { |
| 718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 685 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
| 719 << delay.InMilliseconds() << " ms delay"; | 686 << delay.InMilliseconds() << " ms delay"; |
| 720 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 687 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 721 if (!started_) { | 688 if (!started_) { |
| 722 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 689 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
| 723 return; | 690 return; |
| 724 } | 691 } |
| 725 sync_loop_->PostDelayedTask(from_here, task, delay); | 692 sync_loop_->PostDelayedTask(from_here, task, delay); |
| 726 } | 693 } |
| 727 | 694 |
| 728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 695 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
| 696 scoped_ptr<SyncSessionJob> job) { | |
| 729 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 697 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 730 if (no_scheduling_allowed_) { | 698 if (no_scheduling_allowed_) { |
| 731 NOTREACHED() << "Illegal to schedule job while session in progress."; | 699 NOTREACHED() << "Illegal to schedule job while session in progress."; |
| 732 return; | 700 return; |
| 733 } | 701 } |
| 734 | 702 |
| 735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 703 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
| 736 if (delay < TimeDelta::FromMilliseconds(0)) | 704 if (delay < TimeDelta::FromMilliseconds(0)) |
| 737 delay = TimeDelta::FromMilliseconds(0); | 705 delay = TimeDelta::FromMilliseconds(0); |
| 738 SDVLOG_LOC(job.from_here, 2) | 706 SDVLOG_LOC(job->from_location(), 2) |
| 739 << "In ScheduleSyncSessionJob with " | 707 << "In ScheduleSyncSessionJob with " |
| 740 << SyncSessionJob::GetPurposeString(job.purpose) | 708 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 741 << " job and " << delay.InMilliseconds() << " ms delay"; | 709 << " job and " << delay.InMilliseconds() << " ms delay"; |
| 742 | 710 |
| 743 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 711 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
| 744 job.purpose == SyncSessionJob::POLL); | 712 job->purpose() == SyncSessionJob::POLL); |
| 745 if (job.purpose == SyncSessionJob::NUDGE) { | 713 if (job->purpose() == SyncSessionJob::NUDGE) { |
| 746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 714 SDVLOG_LOC(job->from_location(), 2) << "Resetting pending_nudge to "; |
| 747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 715 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
| 748 job.session); | 716 job->session()); |
| 749 pending_nudge_.reset(new SyncSessionJob(job)); | 717 pending_nudge_ = job.get(); |
| 750 } | 718 } |
| 751 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 719 |
| 752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 720 tracked_objects::Location loc(job->from_location()); |
| 753 weak_ptr_factory_.GetWeakPtr(), | 721 PostDelayedTask(loc, "DoSyncSessionJob", |
|
akalin
2012/09/25 22:37:24
why not just inline job->from_location() here? Or
tim (not reviewing)
2012/10/08 00:20:03
:( Because base::Passed takes ownership of the job
| |
| 754 job), | 722 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
| 755 delay); | 723 weak_ptr_factory_.GetWeakPtr(), |
| 724 base::Passed(&job)), | |
| 725 delay); | |
| 756 } | 726 } |
| 757 | 727 |
| 758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 728 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
| 759 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 729 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 730 if (job->purpose() == SyncSessionJob::NUDGE) { | |
|
rlarocque
2012/09/24 21:15:14
I get the feeling that there's not much point in s
tim (not reviewing)
2012/10/08 00:20:03
There's still a chunk of common code, but also, fo
rlarocque
2012/10/08 19:18:04
I see your point, but I think the call sites shoul
tim (not reviewing)
2012/10/11 17:35:14
I'm not convinced we should do this yet. I still
rlarocque
2012/10/11 18:15:08
I think it would be easier to define a function "D
| |
| 731 if (pending_nudge_ == NULL || | |
| 732 pending_nudge_->session() != job->session()) { | |
| 733 // |job| is abandoned. | |
| 734 SDVLOG(2) << "Dropping a nudge in " | |
| 735 << "DoSyncSessionJob because another nudge was scheduled"; | |
| 736 return false; | |
| 737 } | |
| 738 pending_nudge_ = NULL; | |
| 739 | |
| 740 // Rebase the session with the latest model safe table and use it to purge | |
| 741 // and update any disabled or modified entries in the job. | |
| 742 job->mutable_session()->RebaseRoutingInfoWithLatest( | |
| 743 session_context_->routing_info(), session_context_->workers()); | |
| 744 } | |
| 760 | 745 |
| 761 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 746 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 762 if (!ShouldRunJob(job)) { | 747 GetUpdatesCallerInfo::GetUpdatesSource source( |
| 748 job->session()->source().updates_source); | |
| 749 if (!ShouldRunJobSaveIfNecessary(job.get())) { | |
| 763 SLOG(WARNING) | 750 SLOG(WARNING) |
| 764 << "Not executing " | 751 << "Not executing " |
| 765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 752 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from " |
| 766 << GetUpdatesSourceString(job.session->source().updates_source); | 753 << GetUpdatesSourceString(source); |
| 767 return; | 754 return false; |
| 768 } | 755 } |
| 769 | 756 |
| 770 if (job.purpose == SyncSessionJob::NUDGE) { | |
| 771 if (pending_nudge_.get() == NULL || | |
| 772 pending_nudge_->session != job.session) { | |
| 773 SDVLOG(2) << "Dropping a nudge in " | |
| 774 << "DoSyncSessionJob because another nudge was scheduled"; | |
| 775 return; // Another nudge must have been scheduled in in the meantime. | |
| 776 } | |
| 777 pending_nudge_.reset(); | |
| 778 | |
| 779 // Create the session with the latest model safe table and use it to purge | |
| 780 // and update any disabled or modified entries in the job. | |
| 781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
| 782 | |
| 783 job.session->RebaseRoutingInfoWithLatest(*session); | |
| 784 } | |
| 785 SDVLOG(2) << "DoSyncSessionJob with " | 757 SDVLOG(2) << "DoSyncSessionJob with " |
| 786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 758 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
| 787 | |
| 788 SyncerStep begin(SYNCER_END); | |
| 789 SyncerStep end(SYNCER_END); | |
| 790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
| 791 | 759 |
| 792 bool has_more_to_sync = true; | 760 bool has_more_to_sync = true; |
| 793 while (ShouldRunJob(job) && has_more_to_sync) { | 761 bool premature_exit = false; |
| 762 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) { | |
| 794 SDVLOG(2) << "Calling SyncShare."; | 763 SDVLOG(2) << "Calling SyncShare."; |
| 795 // Synchronously perform the sync session from this thread. | 764 // Synchronously perform the sync session from this thread. |
| 796 syncer_->SyncShare(job.session.get(), begin, end); | 765 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
| 797 has_more_to_sync = job.session->HasMoreToSync(); | 766 job->start_step(), |
| 767 job->end_step()); | |
| 768 | |
| 769 has_more_to_sync = job->session()->HasMoreToSync(); | |
| 798 if (has_more_to_sync) | 770 if (has_more_to_sync) |
| 799 job.session->PrepareForAnotherSyncCycle(); | 771 job->mutable_session()->PrepareForAnotherSyncCycle(); |
| 800 } | 772 } |
| 801 SDVLOG(2) << "Done SyncShare looping."; | 773 SDVLOG(2) << "Done SyncShare looping."; |
| 802 | 774 |
| 803 FinishSyncSessionJob(job); | 775 FinishSyncSessionJob(job.get(), premature_exit); |
|
akalin
2012/10/03 00:11:34
can you make SyncSessionJob::Finish() return the v
tim (not reviewing)
2012/10/08 00:20:03
Yeah, that works! Done.
| |
| 776 return job->Succeeded(); | |
| 804 } | 777 } |
| 805 | 778 |
| 806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 779 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
| 807 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 780 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 808 | 781 |
| 809 // We are interested in recording time between local nudges for datatypes. | 782 // We are interested in recording time between local nudges for datatypes. |
| 810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 783 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
| 811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 784 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
| 812 return; | 785 return; |
| 813 | 786 |
| 814 base::TimeTicks now = TimeTicks::Now(); | 787 base::TimeTicks now = TimeTicks::Now(); |
| 815 // Update timing information for how often datatypes are triggering nudges. | 788 // Update timing information for how often datatypes are triggering nudges. |
| 816 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); | 789 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); |
| 817 iter != info.types.end(); | 790 iter != info.types.end(); |
| 818 ++iter) { | 791 ++iter) { |
| 819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 792 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
| 820 last_local_nudges_by_model_type_[iter->first] = now; | 793 last_local_nudges_by_model_type_[iter->first] = now; |
| 821 if (previous.is_null()) | 794 if (previous.is_null()) |
| 822 continue; | 795 continue; |
| 823 | 796 |
| 824 #define PER_DATA_TYPE_MACRO(type_str) \ | 797 #define PER_DATA_TYPE_MACRO(type_str) \ |
| 825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 798 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
| 826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 799 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
| 827 #undef PER_DATA_TYPE_MACRO | 800 #undef PER_DATA_TYPE_MACRO |
| 828 } | 801 } |
| 829 } | 802 } |
| 830 | 803 |
| 831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 804 void SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
| 805 bool exited_prematurely) { | |
| 832 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 806 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 833 // Now update the status of the connection from SCM. We need this to decide | 807 // Now update the status of the connection from SCM. We need this to decide |
| 834 // whether we need to save/run future jobs. The notifications from SCM are not | 808 // whether we need to save/run future jobs. The notifications from SCM are |
| 835 // reliable. | 809 // not reliable. |
| 836 // | 810 // |
| 837 // TODO(rlarocque): crbug.com/110954 | 811 // TODO(rlarocque): crbug.com/110954 |
| 838 // We should get rid of the notifications and it is probably not needed to | 812 // We should get rid of the notifications and it is probably not needed to |
| 839 // maintain this status variable in 2 places. We should query it directly from | 813 // maintain this status variable in 2 places. We should query it directly |
| 840 // SCM when needed. | 814 // from SCM when needed. |
| 841 ServerConnectionManager* scm = session_context_->connection_manager(); | 815 ServerConnectionManager* scm = session_context_->connection_manager(); |
| 842 UpdateServerConnectionManagerStatus(scm->server_status()); | 816 UpdateServerConnectionManagerStatus(scm->server_status()); |
| 843 | 817 |
| 844 if (IsSyncingCurrentlySilenced()) { | 818 if (IsSyncingCurrentlySilenced()) { |
| 845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 819 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
| 846 // TODO(sync): Investigate whether we need to check job.purpose | 820 // If we're here, it's because |job| was silenced until a server specified |
| 847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | 821 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
| 848 SaveJob(job); | 822 // any job through while in WaitInterval::THROTTLED). |
| 849 return; // Nothing to do. | 823 scoped_ptr<SyncSessionJob> clone = job->CloneAndAbandon(); |
|
akalin
2012/10/03 00:11:34
Assume you implement my comment above re. passing
tim (not reviewing)
2012/10/08 00:20:03
Interesting point. In general the logic is if you
| |
| 850 } else if (job.session->Succeeded() && | 824 if (job->purpose() == SyncSessionJob::NUDGE) |
| 851 !job.config_params.ready_task.is_null()) { | 825 pending_nudge_ = clone.get(); |
| 852 // If this was a configuration job with a ready task, invoke it now that | 826 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
| 853 // we finished successfully. | 827 wait_interval_->pending_configure_job = clone.get(); |
| 828 else | |
| 829 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
| 830 | |
| 831 RestartWaiting(clone.Pass()); | |
| 832 return; | |
| 833 } | |
| 834 | |
| 835 // Let job know that we're through syncing (calling SyncShare) at this point. | |
| 836 { | |
| 854 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 837 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 855 job.config_params.ready_task.Run(); | 838 job->Finish(exited_prematurely); |
| 856 } | 839 } |
| 857 | 840 |
| 858 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 841 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
| 859 ScheduleNextSync(job); | 842 ScheduleNextSync(job); |
| 860 } | 843 } |
| 861 | 844 |
| 862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 845 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob* old_job) { |
| 863 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 846 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 864 DCHECK(!old_job.session->HasMoreToSync()); | 847 DCHECK(!old_job->session()->HasMoreToSync()); |
| 865 | 848 |
| 866 AdjustPolling(&old_job); | 849 AdjustPolling(old_job); |
| 867 | 850 |
| 868 if (old_job.session->Succeeded()) { | 851 if (old_job->Succeeded()) { |
| 869 // Only reset backoff if we actually reached the server. | 852 // Only reset backoff if we actually reached the server. |
| 870 if (old_job.session->SuccessfullyReachedServer()) | 853 // It's possible that we reached the server on one attempt, then had an |
| 854 // error on the next (or didn't perform some of the server-communicating | |
| 855 // commands). We want to verify that, for all commands attempted, we | |
| 856 // successfully spoke with the server. Therefore, we verify no errors | |
| 857 // and at least one SYNCER_OK. | |
| 858 if (old_job->session()->DidReachServer()) | |
| 871 wait_interval_.reset(); | 859 wait_interval_.reset(); |
| 872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 860 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
| 873 return; | 861 return; |
| 874 } | 862 } |
| 875 | 863 |
| 876 if (old_job.purpose == SyncSessionJob::POLL) { | 864 if (old_job->purpose() == SyncSessionJob::POLL) { |
| 877 return; // We don't retry POLL jobs. | 865 return; // We don't retry POLL jobs. |
| 878 } | 866 } |
| 879 | 867 |
| 880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 868 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
| 881 // if we don't succeed. Some types of errors are not likely to disappear on | 869 // if we don't succeed. Some types of errors are not likely to disappear on |
| 882 // their own. With the return values now available in the old_job.session, we | 870 // their own. With the return values now available in the old_job.session, |
| 883 // should be able to detect such errors and only retry when we detect | 871 // we should be able to detect such errors and only retry when we detect |
| 884 // transient errors. | 872 // transient errors. |
| 885 | 873 |
| 886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 874 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
| 887 mode_ == NORMAL_MODE) { | 875 mode_ == NORMAL_MODE) { |
| 888 // When in normal mode, we allow up to one nudge per backoff interval. It | 876 // When in normal mode, we allow up to one nudge per backoff interval. It |
| 889 // appears that this was our nudge for this interval, and it failed. | 877 // appears that this was our nudge for this interval, and it failed. |
| 890 // | 878 // |
| 891 // Note: This does not prevent us from running canary jobs. For example, an | 879 // Note: This does not prevent us from running canary jobs. For example, |
| 892 // IP address change might still result in another nudge being executed | 880 // an IP address change might still result in another nudge being executed |
| 893 // during this backoff interval. | 881 // during this backoff interval. |
| 894 SDVLOG(2) << "A nudge during backoff failed"; | 882 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
| 895 | 883 DCHECK_EQ(SyncSessionJob::NUDGE, old_job->purpose()); |
| 896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
| 897 DCHECK(!wait_interval_->had_nudge); | 884 DCHECK(!wait_interval_->had_nudge); |
| 898 | 885 |
| 899 wait_interval_->had_nudge = true; | 886 wait_interval_->had_nudge = true; |
| 900 InitOrCoalescePendingJob(old_job); | 887 DCHECK(!pending_nudge_); |
| 901 RestartWaiting(); | 888 |
| 889 scoped_ptr<SyncSessionJob> new_job = old_job->Clone(); | |
|
akalin
2012/10/03 00:11:34
same question here -- can you pass ownership to th
tim (not reviewing)
2012/10/08 00:20:03
I did change this around a bit, see my response ab
| |
| 890 pending_nudge_ = new_job.get(); | |
| 891 RestartWaiting(new_job.Pass()); | |
| 902 } else { | 892 } else { |
| 903 // Either this is the first failure or a consecutive failure after our | 893 // Either this is the first failure or a consecutive failure after our |
| 904 // backoff timer expired. We handle it the same way in either case. | 894 // backoff timer expired. We handle it the same way in either case. |
| 905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 895 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
| 906 HandleContinuationError(old_job); | 896 HandleContinuationError(old_job); |
| 907 } | 897 } |
| 908 } | 898 } |
| 909 | 899 |
| 910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 900 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
| 911 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 901 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 912 | 902 |
| 913 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 903 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
| 914 syncer_short_poll_interval_seconds_ : | 904 syncer_short_poll_interval_seconds_ : |
| 915 syncer_long_poll_interval_seconds_; | 905 syncer_long_poll_interval_seconds_; |
| 916 bool rate_changed = !poll_timer_.IsRunning() || | 906 bool rate_changed = !poll_timer_.IsRunning() || |
| 917 poll != poll_timer_.GetCurrentDelay(); | 907 poll != poll_timer_.GetCurrentDelay(); |
| 918 | 908 |
| 919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 909 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
| 920 poll_timer_.Reset(); | 910 poll_timer_.Reset(); |
| 921 | 911 |
| 922 if (!rate_changed) | 912 if (!rate_changed) |
| 923 return; | 913 return; |
| 924 | 914 |
| 925 // Adjust poll rate. | 915 // Adjust poll rate. |
| 926 poll_timer_.Stop(); | 916 poll_timer_.Stop(); |
| 927 poll_timer_.Start(FROM_HERE, poll, this, | 917 poll_timer_.Start(FROM_HERE, poll, this, |
| 928 &SyncSchedulerImpl::PollTimerCallback); | 918 &SyncSchedulerImpl::PollTimerCallback); |
| 929 } | 919 } |
| 930 | 920 |
| 931 void SyncSchedulerImpl::RestartWaiting() { | 921 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
| 932 CHECK(wait_interval_.get()); | 922 CHECK(wait_interval_.get()); |
| 933 wait_interval_->timer.Stop(); | 923 wait_interval_->timer.Stop(); |
| 934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 924 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| 935 this, &SyncSchedulerImpl::DoCanaryJob); | 925 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
| 926 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
| 927 weak_ptr_factory_.GetWeakPtr(), | |
| 928 base::Passed(&job))); | |
| 929 } else { | |
| 930 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
| 931 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
| 932 weak_ptr_factory_.GetWeakPtr(), | |
| 933 base::Passed(&job))); | |
| 934 } | |
| 936 } | 935 } |
| 937 | 936 |
| 938 void SyncSchedulerImpl::HandleContinuationError( | 937 void SyncSchedulerImpl::HandleContinuationError( |
| 939 const SyncSessionJob& old_job) { | 938 const SyncSessionJob* old_job) { |
|
akalin
2012/10/03 00:11:34
similar to the above, can you pass ownership to th
tim (not reviewing)
2012/10/08 00:20:03
I'm going with the logic that a job has a well def
| |
| 940 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 939 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 941 if (DCHECK_IS_ON()) { | 940 if (DCHECK_IS_ON()) { |
| 942 if (IsBackingOff()) { | 941 if (IsBackingOff()) { |
| 943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 942 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
| 944 } | 943 } |
| 945 } | 944 } |
| 946 | 945 |
| 947 TimeDelta length = delay_provider_->GetDelay( | 946 TimeDelta length = delay_provider_->GetDelay( |
| 948 IsBackingOff() ? wait_interval_->length : | 947 IsBackingOff() ? wait_interval_->length : |
| 949 delay_provider_->GetInitialDelay( | 948 delay_provider_->GetInitialDelay( |
| 950 old_job.session->status_controller().model_neutral_state())); | 949 old_job->session()->status_controller().model_neutral_state())); |
| 951 | 950 |
| 952 SDVLOG(2) << "In handle continuation error with " | 951 SDVLOG(2) << "In handle continuation error with " |
| 953 << SyncSessionJob::GetPurposeString(old_job.purpose) | 952 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
| 954 << " job. The time delta(ms) is " | 953 << " job. The time delta(ms) is " |
| 955 << length.InMilliseconds(); | 954 << length.InMilliseconds(); |
| 956 | 955 |
| 957 // This will reset the had_nudge variable as well. | 956 // This will reset the had_nudge variable as well. |
| 958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 957 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| 959 length)); | 958 length)); |
| 960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 959 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); |
| 960 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
| 961 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
| 961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 962 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
| 962 // Config params should always get set. | 963 // Config params should always get set. |
| 963 DCHECK(!old_job.config_params.ready_task.is_null()); | 964 DCHECK(!old_job->config_params().ready_task.is_null()); |
| 964 SyncSession* old = old_job.session.get(); | 965 wait_interval_->pending_configure_job = new_job.get(); |
| 965 SyncSession* s(new SyncSession(session_context_, this, | |
| 966 old->source(), old->routing_info(), old->workers())); | |
| 967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
| 968 make_linked_ptr(s), false, old_job.config_params, | |
| 969 FROM_HERE); | |
| 970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
| 971 } else { | 966 } else { |
| 972 // We are not in configuration mode. So wait_interval's pending job | 967 // We are not in configuration mode. So wait_interval's pending job |
| 973 // should be null. | 968 // should be null. |
| 974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 969 DCHECK(wait_interval_->pending_configure_job == NULL); |
| 970 DCHECK(!pending_nudge_); | |
| 971 pending_nudge_ = new_job.get(); | |
| 972 } | |
| 975 | 973 |
| 976 // TODO(lipalani) - handle clear user data. | 974 RestartWaiting(new_job.Pass()); |
| 977 InitOrCoalescePendingJob(old_job); | |
| 978 } | |
| 979 RestartWaiting(); | |
| 980 } | 975 } |
| 981 | 976 |
| 982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 977 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
| 983 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 978 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| 984 DCHECK(weak_handle_this_.IsInitialized()); | 979 DCHECK(weak_handle_this_.IsInitialized()); |
| 985 SDVLOG(3) << "Posting StopImpl"; | 980 SDVLOG(3) << "Posting StopImpl"; |
| 986 weak_handle_this_.Call(FROM_HERE, | 981 weak_handle_this_.Call(FROM_HERE, |
| 987 &SyncSchedulerImpl::StopImpl, | 982 &SyncSchedulerImpl::StopImpl, |
| 988 callback); | 983 callback); |
| 989 } | 984 } |
| 990 | 985 |
| 991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 986 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
| 992 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 987 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 993 SDVLOG(2) << "StopImpl called"; | 988 SDVLOG(2) << "StopImpl called"; |
| 994 | 989 |
| 995 // Kill any in-flight method calls. | 990 // Kill any in-flight method calls. |
| 996 weak_ptr_factory_.InvalidateWeakPtrs(); | 991 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 997 wait_interval_.reset(); | 992 wait_interval_.reset(); |
| 998 poll_timer_.Stop(); | 993 poll_timer_.Stop(); |
| 999 if (started_) { | 994 if (started_) { |
| 1000 started_ = false; | 995 started_ = false; |
| 1001 } | 996 } |
| 1002 if (!callback.is_null()) | 997 if (!callback.is_null()) |
| 1003 callback.Run(); | 998 callback.Run(); |
| 1004 } | 999 } |
| 1005 | 1000 |
| 1006 void SyncSchedulerImpl::DoCanaryJob() { | 1001 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
| 1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1002 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1008 SDVLOG(2) << "Do canary job"; | 1003 SDVLOG(2) << "Do canary job"; |
| 1009 DoPendingJobIfPossible(true); | 1004 |
| 1005 // Only set canary privlieges here, when we are about to run the job. This | |
|
akalin
2012/09/25 22:37:24
privileges
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
| 1006 // avoids confusion in managing canary bits during scheduling, when you | |
| 1007 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
| 1008 // was scheduled as canary, and send it to an "unscheduled" state. | |
| 1009 to_be_canary->GrantCanaryPrivilege(); | |
| 1010 | |
| 1011 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
| 1012 // TODO(tim): We should be able to remove this... | |
| 1013 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
| 1014 to_be_canary->session()->source()).Pass(); | |
| 1015 // The routing info might have been changed since we cached the | |
| 1016 // pending nudge. Update it by coalescing to the latest. | |
| 1017 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
| 1018 } | |
| 1019 DoSyncSessionJob(to_be_canary.Pass()); | |
| 1010 } | 1020 } |
| 1011 | 1021 |
| 1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1022 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
| 1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1023 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1014 SyncSessionJob* job_to_execute = NULL; | 1024 // If we find a scheduled pending_ job, abandon the old one and return a |
| 1025 // a clone. If unscheduled, just hand over ownership. | |
| 1026 scoped_ptr<SyncSessionJob> candidate; | |
| 1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1027 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
| 1016 && wait_interval_->pending_configure_job.get()) { | 1028 && wait_interval_->pending_configure_job) { |
| 1017 SDVLOG(2) << "Found pending configure job"; | 1029 SDVLOG(2) << "Found pending configure job"; |
| 1018 job_to_execute = wait_interval_->pending_configure_job.get(); | 1030 candidate = |
| 1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1031 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
| 1032 wait_interval_->pending_configure_job = candidate.get(); | |
| 1033 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
| 1020 SDVLOG(2) << "Found pending nudge job"; | 1034 SDVLOG(2) << "Found pending nudge job"; |
| 1021 | 1035 candidate = pending_nudge_->CloneAndAbandon(); |
| 1022 scoped_ptr<SyncSession> session(CreateSyncSession( | 1036 pending_nudge_ = candidate.get(); |
| 1023 pending_nudge_->session->source())); | 1037 unscheduled_nudge_storage_.reset(); |
| 1024 | |
| 1025 // Also the routing info might have been changed since we cached the | |
| 1026 // pending nudge. Update it by coalescing to the latest. | |
| 1027 pending_nudge_->session->Coalesce(*(session.get())); | |
| 1028 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
| 1029 job_to_execute = pending_nudge_.get(); | |
| 1030 } | 1038 } |
| 1031 | 1039 return candidate.Pass(); |
| 1032 if (job_to_execute != NULL) { | |
| 1033 SDVLOG(2) << "Executing pending job"; | |
| 1034 SyncSessionJob copy = *job_to_execute; | |
| 1035 copy.is_canary_job = is_canary_job; | |
| 1036 DoSyncSessionJob(copy); | |
| 1037 } | |
| 1038 } | 1040 } |
| 1039 | 1041 |
| 1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1042 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
| 1041 const SyncSourceInfo& source) { | 1043 const SyncSourceInfo& source) { |
| 1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1044 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1043 DVLOG(2) << "Creating sync session with routes " | 1045 DVLOG(2) << "Creating sync session with routes " |
| 1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1046 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
| 1045 | 1047 |
| 1046 SyncSourceInfo info(source); | 1048 SyncSourceInfo info(source); |
| 1047 SyncSession* session(new SyncSession(session_context_, this, info, | 1049 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
| 1048 session_context_->routing_info(), session_context_->workers())); | 1050 session_context_->routing_info(), session_context_->workers())); |
| 1049 | |
| 1050 return session; | |
| 1051 } | 1051 } |
| 1052 | 1052 |
| 1053 void SyncSchedulerImpl::PollTimerCallback() { | 1053 void SyncSchedulerImpl::PollTimerCallback() { |
| 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1055 ModelSafeRoutingInfo r; | 1055 ModelSafeRoutingInfo r; |
| 1056 ModelTypeStateMap type_state_map = | 1056 ModelTypeStateMap type_state_map = |
| 1057 ModelSafeRoutingInfoToStateMap(r, std::string()); | 1057 ModelSafeRoutingInfoToStateMap(r, std::string()); |
| 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); | 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); |
| 1059 SyncSession* s = CreateSyncSession(info); | 1059 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
| 1060 | 1060 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
| 1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1061 TimeTicks::Now(), |
| 1062 make_linked_ptr(s), | 1062 s.Pass(), |
| 1063 false, | 1063 ConfigurationParams(), |
| 1064 ConfigurationParams(), | 1064 FROM_HERE)); |
| 1065 FROM_HERE); | 1065 ScheduleSyncSessionJob(job.Pass()); |
| 1066 | |
| 1067 ScheduleSyncSessionJob(job); | |
| 1068 } | 1066 } |
| 1069 | 1067 |
| 1070 void SyncSchedulerImpl::Unthrottle() { | 1068 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
| 1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1069 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1070 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| 1073 SDVLOG(2) << "Unthrottled."; | 1071 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
| 1074 DoCanaryJob(); | 1072 << "canary."; |
| 1073 if (to_be_canary.get()) | |
| 1074 DoCanaryJob(to_be_canary.Pass()); | |
| 1075 | |
| 1076 // TODO(tim): ?! This must have been broken. The way DecideOnJob works today | |
|
rlarocque
2012/09/24 21:15:14
Agreed, but this comment won't make much sense oth
tim (not reviewing)
2012/10/08 00:20:03
Ah, right. I had left this as a comment for mysel
rlarocque
2012/10/08 19:18:04
On second thought, this function resets the wait_i
tim (not reviewing)
2012/10/11 17:35:14
You're right. It should mean we're good to go on t
| |
| 1077 // canary privileges aren't enough to bypass a THROTTLED wait interval, which | |
| 1078 // would suggest we need to reset first (though trusting canary in Decide is | |
| 1079 // probably the "right" thing to do). | |
| 1075 wait_interval_.reset(); | 1080 wait_interval_.reset(); |
| 1076 } | 1081 } |
| 1077 | 1082 |
| 1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1083 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
| 1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1085 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
| 1081 } | 1086 } |
| 1082 | 1087 |
| 1083 bool SyncSchedulerImpl::IsBackingOff() const { | 1088 bool SyncSchedulerImpl::IsBackingOff() const { |
| 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1089 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1085 return wait_interval_.get() && wait_interval_->mode == | 1090 return wait_interval_.get() && wait_interval_->mode == |
| 1086 WaitInterval::EXPONENTIAL_BACKOFF; | 1091 WaitInterval::EXPONENTIAL_BACKOFF; |
| 1087 } | 1092 } |
| 1088 | 1093 |
| 1089 void SyncSchedulerImpl::OnSilencedUntil( | 1094 void SyncSchedulerImpl::OnSilencedUntil( |
| 1090 const base::TimeTicks& silenced_until) { | 1095 const base::TimeTicks& silenced_until) { |
| 1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1097 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
| 1093 silenced_until - TimeTicks::Now())); | 1098 silenced_until - TimeTicks::Now())); |
| 1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
| 1095 &SyncSchedulerImpl::Unthrottle); | |
| 1096 } | 1099 } |
| 1097 | 1100 |
| 1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1101 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
| 1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1102 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1100 return wait_interval_.get() && wait_interval_->mode == | 1103 return wait_interval_.get() && wait_interval_->mode == |
| 1101 WaitInterval::THROTTLED; | 1104 WaitInterval::THROTTLED; |
| 1102 } | 1105 } |
| 1103 | 1106 |
| 1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1107 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
| 1105 const base::TimeDelta& new_interval) { | 1108 const base::TimeDelta& new_interval) { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1159 | 1162 |
| 1160 #undef SDVLOG_LOC | 1163 #undef SDVLOG_LOC |
| 1161 | 1164 |
| 1162 #undef SDVLOG | 1165 #undef SDVLOG |
| 1163 | 1166 |
| 1164 #undef SLOG | 1167 #undef SLOG |
| 1165 | 1168 |
| 1166 #undef ENUM_CASE | 1169 #undef ENUM_CASE |
| 1167 | 1170 |
| 1168 } // namespace syncer | 1171 } // namespace syncer |
| OLD | NEW |