Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <cstring> | 8 #include <cstring> |
| 9 | 9 |
| 10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 77 : source(source), | 77 : source(source), |
| 78 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
| 79 routing_info(routing_info), | 79 routing_info(routing_info), |
| 80 ready_task(ready_task) { | 80 ready_task(ready_task) { |
| 81 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
| 82 } | 82 } |
| 83 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
| 84 | 84 |
| 85 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
| 86 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
| 87 had_nudge(false), | 87 had_nudge(false) {} |
| 88 pending_configure_job(NULL) {} | |
| 89 | 88 |
| 90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | 89 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
| 91 : mode(mode), had_nudge(false), length(length), | 90 : mode(mode), had_nudge(false), length(length) {} |
| 92 pending_configure_job(NULL) {} | |
| 93 | 91 |
| 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 92 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
| 95 | 93 |
| 96 #define ENUM_CASE(x) case x: return #x; break; | 94 #define ENUM_CASE(x) case x: return #x; break; |
| 97 | 95 |
| 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 96 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
| 99 switch (mode) { | 97 switch (mode) { |
| 100 ENUM_CASE(UNKNOWN); | 98 ENUM_CASE(UNKNOWN); |
| 101 ENUM_CASE(EXPONENTIAL_BACKOFF); | 99 ENUM_CASE(EXPONENTIAL_BACKOFF); |
| 102 ENUM_CASE(THROTTLED); | 100 ENUM_CASE(THROTTLED); |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 162 name_(name), | 160 name_(name), |
| 163 sync_loop_(MessageLoop::current()), | 161 sync_loop_(MessageLoop::current()), |
| 164 started_(false), | 162 started_(false), |
| 165 syncer_short_poll_interval_seconds_( | 163 syncer_short_poll_interval_seconds_( |
| 166 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 164 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
| 167 syncer_long_poll_interval_seconds_( | 165 syncer_long_poll_interval_seconds_( |
| 168 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 166 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| 169 sessions_commit_delay_( | 167 sessions_commit_delay_( |
| 170 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 168 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
| 171 mode_(NORMAL_MODE), | 169 mode_(NORMAL_MODE), |
| 172 // Start with assuming everything is fine with the connection. | |
| 173 // At the end of the sync cycle we would have the correct status. | |
| 174 pending_nudge_(NULL), | |
| 175 delay_provider_(delay_provider), | 170 delay_provider_(delay_provider), |
| 176 syncer_(syncer), | 171 syncer_(syncer), |
| 177 session_context_(context), | 172 session_context_(context), |
| 178 no_scheduling_allowed_(false) { | 173 no_scheduling_allowed_(false) { |
| 179 DCHECK(sync_loop_); | 174 DCHECK(sync_loop_); |
| 180 } | 175 } |
| 181 | 176 |
| 182 SyncSchedulerImpl::~SyncSchedulerImpl() { | 177 SyncSchedulerImpl::~SyncSchedulerImpl() { |
| 183 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 178 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 184 StopImpl(base::Closure()); | 179 StopImpl(base::Closure()); |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 207 // | 202 // |
| 208 // 1. We're in exponential backoff. | 203 // 1. We're in exponential backoff. |
| 209 // 2. We're silenced / throttled. | 204 // 2. We're silenced / throttled. |
| 210 // 3. A nudge was saved previously due to not having a valid auth token. | 205 // 3. A nudge was saved previously due to not having a valid auth token. |
| 211 // 4. A nudge was scheduled + saved while in configuration mode. | 206 // 4. A nudge was scheduled + saved while in configuration mode. |
| 212 // | 207 // |
| 213 // In all cases except (2), we want to retry contacting the server. We | 208 // In all cases except (2), we want to retry contacting the server. We |
| 214 // call DoCanaryJob to achieve this, and note that nothing -- not even a | 209 // call DoCanaryJob to achieve this, and note that nothing -- not even a |
| 215 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | 210 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
| 216 // has the authority to do that is the Unthrottle timer. | 211 // has the authority to do that is the Unthrottle timer. |
| 217 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | 212 TryCanaryJob(); |
| 218 if (!pending.get()) | |
| 219 return; | |
| 220 DoCanaryJob(pending.Pass()); | |
| 221 } | 213 } |
| 222 | 214 |
| 223 void SyncSchedulerImpl::Start(Mode mode) { | 215 void SyncSchedulerImpl::Start(Mode mode) { |
| 224 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 216 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 225 std::string thread_name = MessageLoop::current()->thread_name(); | 217 std::string thread_name = MessageLoop::current()->thread_name(); |
| 226 if (thread_name.empty()) | 218 if (thread_name.empty()) |
| 227 thread_name = "<Main thread>"; | 219 thread_name = "<Main thread>"; |
| 228 SDVLOG(2) << "Start called from thread " | 220 SDVLOG(2) << "Start called from thread " |
| 229 << thread_name << " with mode " << GetModeString(mode); | 221 << thread_name << " with mode " << GetModeString(mode); |
| 230 if (!started_) { | 222 if (!started_) { |
| 231 started_ = true; | 223 started_ = true; |
| 232 SendInitialSnapshot(); | 224 SendInitialSnapshot(); |
| 233 } | 225 } |
| 234 | 226 |
| 235 DCHECK(!session_context_->account_name().empty()); | 227 DCHECK(!session_context_->account_name().empty()); |
| 236 DCHECK(syncer_.get()); | 228 DCHECK(syncer_.get()); |
| 237 Mode old_mode = mode_; | 229 Mode old_mode = mode_; |
| 238 mode_ = mode; | 230 mode_ = mode; |
| 239 AdjustPolling(NULL); // Will kick start poll timer if needed. | 231 AdjustPolling(NULL); // Will kick start poll timer if needed. |
| 240 | 232 |
| 241 if (old_mode != mode_) { | 233 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { |
| 242 // We just changed our mode. See if there are any pending jobs that we could | 234 // We just got back to normal mode. Let's try to run the work that was |
| 243 // execute in the new mode. | 235 // queued up while we were configuring. |
| 244 if (mode_ == NORMAL_MODE) { | 236 DoNudgeSyncSessionJob(NORMAL_PRIORITY); |
| 245 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | |
| 246 // has not yet completed. | |
| 247 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | |
| 248 } | |
| 249 | |
| 250 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
| 251 if (pending.get()) { | |
| 252 SDVLOG(2) << "Executing pending job. Good luck!"; | |
| 253 DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY); | |
| 254 } | |
| 255 } | 237 } |
| 256 } | 238 } |
| 257 | 239 |
| 258 void SyncSchedulerImpl::SendInitialSnapshot() { | 240 void SyncSchedulerImpl::SendInitialSnapshot() { |
| 259 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 241 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 260 scoped_ptr<SyncSession> dummy(new SyncSession( | 242 scoped_ptr<SyncSession> dummy(new SyncSession( |
| 261 session_context_, this, SyncSourceInfo())); | 243 session_context_, this, SyncSourceInfo())); |
| 262 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 244 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
| 263 event.snapshot = dummy->TakeSnapshot(); | 245 event.snapshot = dummy->TakeSnapshot(); |
| 264 session_context_->NotifyListeners(event); | 246 session_context_->NotifyListeners(event); |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 288 const ConfigurationParams& params) { | 270 const ConfigurationParams& params) { |
| 289 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 271 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 290 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 272 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
| 291 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 273 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
| 292 DCHECK(!params.ready_task.is_null()); | 274 DCHECK(!params.ready_task.is_null()); |
| 293 CHECK(started_) << "Scheduler must be running to configure."; | 275 CHECK(started_) << "Scheduler must be running to configure."; |
| 294 SDVLOG(2) << "Reconfiguring syncer."; | 276 SDVLOG(2) << "Reconfiguring syncer."; |
| 295 | 277 |
| 296 // Only one configuration is allowed at a time. Verify we're not waiting | 278 // Only one configuration is allowed at a time. Verify we're not waiting |
| 297 // for a pending configure job. | 279 // for a pending configure job. |
| 298 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | 280 DCHECK(!pending_configure_job_); |
| 299 | 281 |
| 300 ModelSafeRoutingInfo restricted_routes; | 282 ModelSafeRoutingInfo restricted_routes; |
| 301 BuildModelSafeParams(params.types_to_download, | 283 BuildModelSafeParams(params.types_to_download, |
| 302 params.routing_info, | 284 params.routing_info, |
| 303 &restricted_routes); | 285 &restricted_routes); |
| 304 session_context_->set_routing_info(restricted_routes); | 286 session_context_->set_routing_info(restricted_routes); |
| 305 | 287 |
| 306 // Only reconfigure if we have types to download. | 288 // Only reconfigure if we have types to download. |
| 307 if (!params.types_to_download.Empty()) { | 289 if (!params.types_to_download.Empty()) { |
| 308 DCHECK(!restricted_routes.empty()); | 290 DCHECK(!restricted_routes.empty()); |
| 309 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 291 pending_configure_job_.reset(new SyncSessionJob( |
| 310 SyncSessionJob::CONFIGURATION, | 292 SyncSessionJob::CONFIGURATION, |
| 311 TimeTicks::Now(), | 293 TimeTicks::Now(), |
| 312 SyncSourceInfo(params.source, | 294 SyncSourceInfo(params.source, |
| 313 ModelSafeRoutingInfoToInvalidationMap( | 295 ModelSafeRoutingInfoToInvalidationMap( |
| 314 restricted_routes, | 296 restricted_routes, |
| 315 std::string())), | 297 std::string())), |
| 316 params)); | 298 params)); |
| 317 bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); | 299 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); |
| 318 | 300 |
| 319 // If we failed, the job would have been saved as the pending configure | 301 // If we failed, the job would have been saved as the pending configure |
| 320 // job and a wait interval would have been set. | 302 // job and a wait interval would have been set. |
| 321 if (!succeeded) { | 303 if (!succeeded) { |
| 322 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); | 304 DCHECK(pending_configure_job_); |
| 323 return false; | 305 return false; |
| 306 } else { | |
| 307 DCHECK(!pending_configure_job_); | |
| 324 } | 308 } |
| 325 } else { | 309 } else { |
| 326 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 310 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
| 327 params.ready_task.Run(); | 311 params.ready_task.Run(); |
| 328 } | 312 } |
| 329 | 313 |
| 330 return true; | 314 return true; |
| 331 } | 315 } |
| 332 | 316 |
| 333 SyncSchedulerImpl::JobProcessDecision | 317 SyncSchedulerImpl::JobProcessDecision |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 355 | 339 |
| 356 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 340 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| 357 if (job.purpose() == SyncSessionJob::NUDGE) { | 341 if (job.purpose() == SyncSessionJob::NUDGE) { |
| 358 if (mode_ == CONFIGURATION_MODE) | 342 if (mode_ == CONFIGURATION_MODE) |
| 359 return SAVE; | 343 return SAVE; |
| 360 | 344 |
| 361 // If we already had one nudge then just drop this nudge. We will retry | 345 // If we already had one nudge then just drop this nudge. We will retry |
| 362 // later when the timer runs out. | 346 // later when the timer runs out. |
| 363 if (priority == NORMAL_PRIORITY) | 347 if (priority == NORMAL_PRIORITY) |
| 364 return wait_interval_->had_nudge ? DROP : CONTINUE; | 348 return wait_interval_->had_nudge ? DROP : CONTINUE; |
| 365 else // We are here because timer ran out. So retry. | 349 else // We are here because timer ran out. So retry. |
|
tim (not reviewing)
2013/04/04 18:07:30
Just noticed this clause is redundant and somewhat
rlarocque
2013/04/05 00:48:51
I was planning to leave it alone for now. There's
| |
| 366 return CONTINUE; | 350 return CONTINUE; |
| 367 } | 351 } |
| 368 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; | 352 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; |
| 369 } | 353 } |
| 370 | 354 |
| 371 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 355 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
| 372 const SyncSessionJob& job, | 356 const SyncSessionJob& job, |
| 373 JobPriority priority) { | 357 JobPriority priority) { |
| 374 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 358 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 375 | 359 |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 442 // Decision now rests on state of auth tokens. | 426 // Decision now rests on state of auth tokens. |
| 443 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 427 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
| 444 return CONTINUE; | 428 return CONTINUE; |
| 445 | 429 |
| 446 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 430 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
| 447 // Running the job would require updated auth, so we can't honour | 431 // Running the job would require updated auth, so we can't honour |
| 448 // job.scheduled_start(). | 432 // job.scheduled_start(). |
| 449 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | 433 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; |
| 450 } | 434 } |
| 451 | 435 |
| 452 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { | |
| 453 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; | |
| 454 if (is_nudge && pending_nudge_) { | |
| 455 SDVLOG(2) << "Coalescing a pending nudge"; | |
| 456 // TODO(tim): This basically means we never use the more-careful coalescing | |
| 457 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
| 458 // times, because we're calling this function first. Pull this out | |
| 459 // into a function to coalesce + set start times and reuse. | |
| 460 pending_nudge_->CoalesceSources(job->source_info()); | |
| 461 return; | |
| 462 } | |
| 463 | |
| 464 scoped_ptr<SyncSessionJob> job_to_save = job->Clone(); | |
| 465 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
| 466 // This job should be made the new canary. | |
| 467 if (is_nudge) { | |
| 468 pending_nudge_ = job_to_save.get(); | |
| 469 } else { | |
| 470 SDVLOG(2) << "Saving a configuration job"; | |
| 471 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
| 472 DCHECK(!wait_interval_->pending_configure_job); | |
| 473 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
| 474 DCHECK(!job->config_params().ready_task.is_null()); | |
| 475 // The only nudge that could exist is a scheduled canary nudge. | |
| 476 DCHECK(!unscheduled_nudge_storage_.get()); | |
| 477 if (pending_nudge_) { | |
| 478 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
| 479 unscheduled_nudge_storage_ = pending_nudge_->Clone(); | |
| 480 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
| 481 } | |
| 482 wait_interval_->pending_configure_job = job_to_save.get(); | |
| 483 } | |
| 484 TimeDelta length = | |
| 485 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
| 486 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
| 487 TimeDelta::FromSeconds(0) : length; | |
| 488 RestartWaiting(job_to_save.Pass()); | |
| 489 return; | |
| 490 } | |
| 491 | |
| 492 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
| 493 // when we're not in a WaitInterval. See bug 147736. | |
| 494 DCHECK(is_nudge); | |
| 495 // There may or may not be a pending_configure_job. Either way this nudge | |
| 496 // is unschedulable. | |
| 497 pending_nudge_ = job_to_save.get(); | |
| 498 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
| 499 } | |
| 500 | |
| 501 // Functor for std::find_if to search by ModelSafeGroup. | |
| 502 struct ModelSafeWorkerGroupIs { | |
| 503 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
| 504 bool operator()(ModelSafeWorker* w) { | |
| 505 return group == w->GetModelSafeGroup(); | |
| 506 } | |
| 507 ModelSafeGroup group; | |
| 508 }; | |
| 509 | |
| 510 void SyncSchedulerImpl::ScheduleNudgeAsync( | 436 void SyncSchedulerImpl::ScheduleNudgeAsync( |
| 511 const TimeDelta& desired_delay, | 437 const TimeDelta& desired_delay, |
| 512 NudgeSource source, ModelTypeSet types, | 438 NudgeSource source, ModelTypeSet types, |
| 513 const tracked_objects::Location& nudge_location) { | 439 const tracked_objects::Location& nudge_location) { |
| 514 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 440 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 515 SDVLOG_LOC(nudge_location, 2) | 441 SDVLOG_LOC(nudge_location, 2) |
| 516 << "Nudge scheduled with delay " | 442 << "Nudge scheduled with delay " |
| 517 << desired_delay.InMilliseconds() << " ms, " | 443 << desired_delay.InMilliseconds() << " ms, " |
| 518 << "source " << GetNudgeSourceString(source) << ", " | 444 << "source " << GetNudgeSourceString(source) << ", " |
| 519 << "types " << ModelTypeSetToString(types); | 445 << "types " << ModelTypeSetToString(types); |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 579 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 505 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
| 580 SyncSessionJob::NUDGE, | 506 SyncSessionJob::NUDGE, |
| 581 TimeTicks::Now() + delay, | 507 TimeTicks::Now() + delay, |
| 582 info, | 508 info, |
| 583 ConfigurationParams())); | 509 ConfigurationParams())); |
| 584 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); | 510 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); |
| 585 SDVLOG(2) << "Should run " | 511 SDVLOG(2) << "Should run " |
| 586 << SyncSessionJob::GetPurposeString(job->purpose()) | 512 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 587 << " in mode " << GetModeString(mode_) | 513 << " in mode " << GetModeString(mode_) |
| 588 << ": " << GetDecisionString(decision); | 514 << ": " << GetDecisionString(decision); |
| 589 if (decision != CONTINUE) { | 515 if (decision == DROP) { |
| 590 // End of the line, though we may save the job for later. | |
| 591 if (decision == SAVE) { | |
| 592 HandleSaveJobDecision(job.Pass()); | |
| 593 } else { | |
| 594 DCHECK_EQ(decision, DROP); | |
| 595 } | |
| 596 return; | 516 return; |
| 597 } | 517 } |
| 598 | 518 |
| 599 if (pending_nudge_) { | 519 // Try to coalesce in both SAVE and CONTINUE cases. |
| 600 SDVLOG(2) << "Rescheduling pending nudge"; | 520 if (pending_nudge_job_) { |
| 601 pending_nudge_->CoalesceSources(job->source_info()); | 521 pending_nudge_job_->CoalesceSources(job->source_info()); |
| 602 // Choose the start time as the earliest of the 2. Note that this means | 522 if (decision == CONTINUE) { |
| 603 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) | 523 // Only update the scheduled_start if we're going to reschedule. |
| 604 // but a nudge is already scheduled to go out, we'll send the (tab) commit | 524 pending_nudge_job_->set_scheduled_start( |
| 605 // without waiting. | 525 std::min(job->scheduled_start(), |
| 606 pending_nudge_->set_scheduled_start( | 526 pending_nudge_job_->scheduled_start())); |
| 607 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); | 527 } |
| 608 // Abandon the old task by cloning and replacing the session. | 528 } else { |
| 609 // It's possible that by "rescheduling" we're actually taking a job that | 529 pending_nudge_job_ = job.Pass(); |
| 610 // was previously unscheduled and giving it wings, so take care to reset | |
| 611 // unscheduled nudge storage. | |
| 612 job = pending_nudge_->Clone(); | |
| 613 pending_nudge_ = NULL; | |
| 614 unscheduled_nudge_storage_.reset(); | |
| 615 // It's also possible we took a canary job, since we allow one nudge | |
| 616 // per backoff interval. | |
| 617 DCHECK(!wait_interval_ || !wait_interval_->had_nudge); | |
| 618 } | 530 } |
| 619 | 531 |
| 620 TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); | 532 if (decision == SAVE) { |
| 533 return; | |
| 534 } | |
| 535 | |
| 536 TimeDelta run_delay = | |
| 537 pending_nudge_job_->scheduled_start() - TimeTicks::Now(); | |
| 621 if (run_delay < TimeDelta::FromMilliseconds(0)) | 538 if (run_delay < TimeDelta::FromMilliseconds(0)) |
| 622 run_delay = TimeDelta::FromMilliseconds(0); | 539 run_delay = TimeDelta::FromMilliseconds(0); |
| 623 SDVLOG_LOC(nudge_location, 2) | 540 SDVLOG_LOC(nudge_location, 2) |
| 624 << "Scheduling a nudge with " | 541 << "Scheduling a nudge with " |
| 625 << run_delay.InMilliseconds() << " ms delay"; | 542 << run_delay.InMilliseconds() << " ms delay"; |
| 626 | 543 |
| 627 pending_nudge_ = job.get(); | 544 PostDelayedTask( |
| 628 PostDelayedTask(nudge_location, "DoSyncSessionJob", | 545 nudge_location, |
| 629 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), | 546 "DoSyncSessionJob", |
| 630 weak_ptr_factory_.GetWeakPtr(), | 547 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoNudgeSyncSessionJob), |
| 631 base::Passed(&job), | 548 weak_ptr_factory_.GetWeakPtr(), |
| 632 NORMAL_PRIORITY), | 549 NORMAL_PRIORITY), |
| 633 run_delay); | 550 run_delay); |
| 634 } | 551 } |
| 635 | 552 |
| 636 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 553 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
| 637 switch (mode) { | 554 switch (mode) { |
| 638 ENUM_CASE(CONFIGURATION_MODE); | 555 ENUM_CASE(CONFIGURATION_MODE); |
| 639 ENUM_CASE(NORMAL_MODE); | 556 ENUM_CASE(NORMAL_MODE); |
| 640 } | 557 } |
| 641 return ""; | 558 return ""; |
| 642 } | 559 } |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 659 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 576 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 660 if (!started_) { | 577 if (!started_) { |
| 661 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 578 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
| 662 return; | 579 return; |
| 663 } | 580 } |
| 664 // This cancels the previous task, if one existed. | 581 // This cancels the previous task, if one existed. |
| 665 pending_wakeup_.Reset(task); | 582 pending_wakeup_.Reset(task); |
| 666 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); | 583 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); |
| 667 } | 584 } |
| 668 | 585 |
| 669 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, | 586 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, |
| 670 JobPriority priority) { | 587 JobPriority priority) { |
| 671 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 588 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 672 if (job->purpose() == SyncSessionJob::NUDGE) { | |
| 673 pending_nudge_ = NULL; | |
| 674 } | |
| 675 | 589 |
| 676 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 590 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 677 JobProcessDecision decision = DecideOnJob(*job, priority); | 591 JobProcessDecision decision = DecideOnJob(*job, priority); |
| 678 SDVLOG(2) << "Should run " | 592 SDVLOG(2) << "Should run " |
| 679 << SyncSessionJob::GetPurposeString(job->purpose()) | 593 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 680 << " in mode " << GetModeString(mode_) | 594 << " in mode " << GetModeString(mode_) |
| 681 << " with source " << job->source_info().updates_source | 595 << " with source " << job->source_info().updates_source |
| 682 << ": " << GetDecisionString(decision); | 596 << ": " << GetDecisionString(decision); |
| 683 if (decision != CONTINUE) { | 597 if (decision != CONTINUE) { |
| 684 if (decision == SAVE) { | 598 if (decision == SAVE) { |
| 685 HandleSaveJobDecision(job.Pass()); | 599 if (job->purpose() == SyncSessionJob::CONFIGURATION) { |
| 600 pending_configure_job_ = job.Pass(); | |
| 601 | |
| 602 // It's very unlikely, but possible, that the WaitInterval's wakeup task | |
| 603 // isn't actually active at this point. Sometimes the WaitInterval gets | |
| 604 // preempted by a nudge-while-in-backoff. We can't just assume that | |
| 605 // there's a task waiting to wake us up once the WaitInterval expires; | |
| 606 // we need to ensure it by rescheduling the WaitInterval (while taking | |
| 607 // into account any time already spent waiting, of course). | |
| 608 ResumeWaiting(); | |
|
rlarocque
2013/04/02 23:01:12
Here's a paragraph that I had originally intended
tim (not reviewing)
2013/04/04 18:07:30
It looks like we actually keep the WaitInterval ti
| |
| 609 } else { | |
| 610 pending_nudge_job_ = job.Pass(); | |
| 611 } | |
| 686 } else { | 612 } else { |
| 687 DCHECK_EQ(decision, DROP); | 613 DCHECK_EQ(decision, DROP); |
| 688 } | 614 } |
| 689 return false; | 615 return false; |
| 690 } | 616 } |
| 691 | 617 |
| 692 DVLOG(2) << "Creating sync session with routes " | 618 DVLOG(2) << "Creating sync session with routes " |
| 693 << ModelSafeRoutingInfoToString(session_context_->routing_info()) | 619 << ModelSafeRoutingInfoToString(session_context_->routing_info()) |
| 694 << "and purpose " << job->purpose(); | 620 << "and purpose " << job->purpose(); |
| 695 SyncSession session(session_context_, this, job->source_info()); | 621 SyncSession session(session_context_, this, job->source_info()); |
| 696 bool premature_exit = !syncer_->SyncShare(&session, | 622 bool premature_exit = !syncer_->SyncShare(&session, |
| 697 job->start_step(), | 623 job->start_step(), |
| 698 job->end_step()); | 624 job->end_step()); |
| 699 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 625 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
| 700 | 626 |
| 701 bool success = FinishSyncSessionJob(job.get(), | 627 bool success = FinishSyncSessionJob(job.get(), |
| 702 premature_exit, | 628 premature_exit, |
| 703 &session); | 629 &session); |
| 704 | 630 |
| 705 if (IsSyncingCurrentlySilenced()) { | 631 if (IsSyncingCurrentlySilenced()) { |
| 706 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | 632 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
| 707 // If we're here, it's because |job| was silenced until a server specified | 633 // If we're here, it's because |job| was silenced until a server specified |
| 708 // time. (Note, it had to be |job|, because DecideOnJob would not permit | 634 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
| 709 // any job through while in WaitInterval::THROTTLED). | 635 // any job through while in WaitInterval::THROTTLED). |
| 710 scoped_ptr<SyncSessionJob> clone = job->Clone(); | 636 if (job->purpose() == SyncSessionJob::NUDGE) |
| 711 if (clone->purpose() == SyncSessionJob::NUDGE) | 637 pending_nudge_job_ = job.Pass(); |
| 712 pending_nudge_ = clone.get(); | 638 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
| 713 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | 639 pending_configure_job_ = job.Pass(); |
| 714 wait_interval_->pending_configure_job = clone.get(); | |
| 715 else | 640 else |
| 716 NOTREACHED(); | 641 NOTREACHED(); |
| 717 | 642 |
| 718 RestartWaiting(clone.Pass()); | 643 RestartWaiting(); |
| 719 return success; | 644 return success; |
| 720 } | 645 } |
| 721 | 646 |
| 722 if (!success) | 647 if (!success) |
| 723 ScheduleNextSync(job.Pass(), &session); | 648 ScheduleNextSync(job.Pass(), &session); |
| 724 | 649 |
| 725 return success; | 650 return success; |
| 726 } | 651 } |
| 727 | 652 |
| 653 bool SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { | |
|
tim (not reviewing)
2013/04/04 00:22:11
Digging around, it looks like these Do*SSJ methods
rlarocque
2013/04/04 00:59:48
Your suggestion would work for now, but it wouldn'
| |
| 654 return DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority); | |
| 655 } | |
| 656 | |
| 657 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { | |
| 658 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority); | |
| 659 } | |
| 660 | |
| 728 bool SyncSchedulerImpl::ShouldPoll() { | 661 bool SyncSchedulerImpl::ShouldPoll() { |
| 729 if (wait_interval_.get()) { | 662 if (wait_interval_.get()) { |
| 730 SDVLOG(2) << "Not running poll in wait interval."; | 663 SDVLOG(2) << "Not running poll in wait interval."; |
| 731 return false; | 664 return false; |
| 732 } | 665 } |
| 733 | 666 |
| 734 if (mode_ == CONFIGURATION_MODE) { | 667 if (mode_ == CONFIGURATION_MODE) { |
| 735 SDVLOG(2) << "Not running poll in configuration mode."; | 668 SDVLOG(2) << "Not running poll in configuration mode."; |
| 736 return false; | 669 return false; |
| 737 } | 670 } |
| 738 | 671 |
| 739 // TODO(rlarocque): Refactor decision-making logic common to all types | 672 // TODO(rlarocque): Refactor decision-making logic common to all types |
| 740 // of jobs into a shared function. | 673 // of jobs into a shared function. |
| 741 | 674 |
| 742 if (session_context_->connection_manager()->HasInvalidAuthToken()) { | 675 if (session_context_->connection_manager()->HasInvalidAuthToken()) { |
| 743 SDVLOG(2) << "Not running poll because auth token is invalid."; | 676 SDVLOG(2) << "Not running poll because auth token is invalid."; |
| 744 return false; | 677 return false; |
| 745 } | 678 } |
| 746 | 679 |
| 747 return true; | 680 return true; |
| 748 } | 681 } |
| 749 | 682 |
| 750 void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { | 683 void SyncSchedulerImpl::DoPollSyncSessionJob() { |
| 751 DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); | 684 ModelSafeRoutingInfo r; |
| 685 ModelTypeInvalidationMap invalidation_map = | |
| 686 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | |
| 687 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | |
| 688 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
| 689 TimeTicks::Now(), | |
| 690 info, | |
| 691 ConfigurationParams())); | |
| 752 | 692 |
| 753 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 693 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 754 | 694 |
| 755 if (!ShouldPoll()) | 695 if (!ShouldPoll()) |
| 756 return; | 696 return; |
| 757 | 697 |
| 758 DVLOG(2) << "Polling with routes " | 698 DVLOG(2) << "Polling with routes " |
| 759 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 699 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
| 760 SyncSession session(session_context_, this, job->source_info()); | 700 SyncSession session(session_context_, this, job->source_info()); |
| 761 bool premature_exit = !syncer_->SyncShare(&session, | 701 bool premature_exit = !syncer_->SyncShare(&session, |
| 762 job->start_step(), | 702 job->start_step(), |
| 763 job->end_step()); | 703 job->end_step()); |
| 764 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 704 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
| 765 | 705 |
| 766 FinishSyncSessionJob(job.get(), premature_exit, &session); | 706 FinishSyncSessionJob(job.get(), premature_exit, &session); |
| 767 | 707 |
| 768 if (IsSyncingCurrentlySilenced()) { | 708 if (IsSyncingCurrentlySilenced()) { |
| 769 // This will start the countdown to unthrottle. Other kinds of jobs would | 709 // Normally we would only call RestartWaiting() if we had a |
| 770 // schedule themselves as the post-unthrottle canary. A poll job is not | 710 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's |
| 771 // that urgent, so it does not get to be the canary. We still need to start | 711 // possible that neither is set. We create the wait interval anyway because |
| 772 // the timer regardless. Otherwise there could be no one to clear the | 712 // we need it to make sure we get unthrottled on time. |
| 773 // WaitInterval when the throttling expires. | 713 RestartWaiting(); |
| 774 RestartWaiting(scoped_ptr<SyncSessionJob>()); | |
| 775 } | 714 } |
| 776 } | 715 } |
| 777 | 716 |
| 778 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 717 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
| 779 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 718 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 780 | 719 |
| 781 // We are interested in recording time between local nudges for datatypes. | 720 // We are interested in recording time between local nudges for datatypes. |
| 782 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 721 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
| 783 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 722 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
| 784 return; | 723 return; |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 847 // appears that this was our nudge for this interval, and it failed. | 786 // appears that this was our nudge for this interval, and it failed. |
| 848 // | 787 // |
| 849 // Note: This does not prevent us from running canary jobs. For example, | 788 // Note: This does not prevent us from running canary jobs. For example, |
| 850 // an IP address change might still result in another nudge being executed | 789 // an IP address change might still result in another nudge being executed |
| 851 // during this backoff interval. | 790 // during this backoff interval. |
| 852 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; | 791 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
| 853 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); | 792 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
| 854 DCHECK(!wait_interval_->had_nudge); | 793 DCHECK(!wait_interval_->had_nudge); |
| 855 | 794 |
| 856 wait_interval_->had_nudge = true; | 795 wait_interval_->had_nudge = true; |
| 857 DCHECK(!pending_nudge_); | 796 DCHECK(!pending_nudge_job_); |
| 858 | 797 |
| 859 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | 798 pending_nudge_job_ = finished_job.Pass(); |
| 860 pending_nudge_ = new_job.get(); | 799 RestartWaiting(); |
| 861 RestartWaiting(new_job.Pass()); | |
| 862 } else { | 800 } else { |
| 863 // Either this is the first failure or a consecutive failure after our | 801 // Either this is the first failure or a consecutive failure after our |
| 864 // backoff timer expired. We handle it the same way in either case. | 802 // backoff timer expired. We handle it the same way in either case. |
| 865 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 803 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
| 866 HandleContinuationError(finished_job.Pass(), session); | 804 HandleContinuationError(finished_job.Pass(), session); |
| 867 } | 805 } |
| 868 } | 806 } |
| 869 | 807 |
| 870 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 808 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
| 871 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 809 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 872 | 810 |
| 873 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 811 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
| 874 syncer_short_poll_interval_seconds_ : | 812 syncer_short_poll_interval_seconds_ : |
| 875 syncer_long_poll_interval_seconds_; | 813 syncer_long_poll_interval_seconds_; |
| 876 bool rate_changed = !poll_timer_.IsRunning() || | 814 bool rate_changed = !poll_timer_.IsRunning() || |
| 877 poll != poll_timer_.GetCurrentDelay(); | 815 poll != poll_timer_.GetCurrentDelay(); |
| 878 | 816 |
| 879 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) | 817 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
| 880 poll_timer_.Reset(); | 818 poll_timer_.Reset(); |
| 881 | 819 |
| 882 if (!rate_changed) | 820 if (!rate_changed) |
| 883 return; | 821 return; |
| 884 | 822 |
| 885 // Adjust poll rate. | 823 // Adjust poll rate. |
| 886 poll_timer_.Stop(); | 824 poll_timer_.Stop(); |
| 887 poll_timer_.Start(FROM_HERE, poll, this, | 825 poll_timer_.Start(FROM_HERE, poll, this, |
| 888 &SyncSchedulerImpl::PollTimerCallback); | 826 &SyncSchedulerImpl::PollTimerCallback); |
| 889 } | 827 } |
| 890 | 828 |
| 891 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { | 829 void SyncSchedulerImpl::ResumeWaiting() { |
| 830 TimeDelta length = | |
| 831 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
| 832 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
| 833 TimeDelta::FromSeconds(0) : length; | |
| 834 RestartWaiting(); | |
| 835 } | |
| 836 | |
| 837 void SyncSchedulerImpl::RestartWaiting() { | |
| 892 CHECK(wait_interval_.get()); | 838 CHECK(wait_interval_.get()); |
| 893 wait_interval_->timer.Stop(); | 839 wait_interval_->timer.Stop(); |
| 894 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | 840 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
| 895 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 841 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| 896 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, | 842 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, |
| 897 weak_ptr_factory_.GetWeakPtr(), | 843 weak_ptr_factory_.GetWeakPtr())); |
| 898 base::Passed(&job))); | |
| 899 | 844 |
| 900 } else { | 845 } else { |
| 901 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 846 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::TryCanaryJob, |
| 902 weak_ptr_factory_.GetWeakPtr(), | 847 weak_ptr_factory_.GetWeakPtr())); |
| 903 base::Passed(&job))); | |
| 904 } | 848 } |
| 905 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 849 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
| 906 pending_wakeup_.callback()); | 850 pending_wakeup_.callback()); |
| 907 } | 851 } |
| 908 | 852 |
| 909 void SyncSchedulerImpl::HandleContinuationError( | 853 void SyncSchedulerImpl::HandleContinuationError( |
| 910 scoped_ptr<SyncSessionJob> old_job, | 854 scoped_ptr<SyncSessionJob> old_job, |
| 911 SyncSession* session) { | 855 SyncSession* session) { |
| 912 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 856 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 913 | 857 |
| 914 TimeDelta length = delay_provider_->GetDelay( | 858 TimeDelta length = delay_provider_->GetDelay( |
| 915 IsBackingOff() ? wait_interval_->length : | 859 IsBackingOff() ? wait_interval_->length : |
| 916 delay_provider_->GetInitialDelay( | 860 delay_provider_->GetInitialDelay( |
| 917 session->status_controller().model_neutral_state())); | 861 session->status_controller().model_neutral_state())); |
| 918 | 862 |
| 919 SDVLOG(2) << "In handle continuation error with " | 863 SDVLOG(2) << "In handle continuation error with " |
| 920 << SyncSessionJob::GetPurposeString(old_job->purpose()) | 864 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
| 921 << " job. The time delta(ms) is " | 865 << " job. The time delta(ms) is " |
| 922 << length.InMilliseconds(); | 866 << length.InMilliseconds(); |
| 923 | 867 |
| 924 // This will reset the had_nudge variable as well. | 868 // This will reset the had_nudge variable as well. |
| 925 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 869 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| 926 length)); | 870 length)); |
| 927 NotifyRetryTime(base::Time::Now() + length); | 871 NotifyRetryTime(base::Time::Now() + length); |
| 928 scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); | 872 old_job->set_scheduled_start(TimeTicks::Now() + length); |
| 929 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
| 930 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | 873 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { |
| 931 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 874 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
| 932 // Config params should always get set. | 875 // Config params should always get set. |
| 933 DCHECK(!old_job->config_params().ready_task.is_null()); | 876 DCHECK(!old_job->config_params().ready_task.is_null()); |
| 934 wait_interval_->pending_configure_job = new_job.get(); | 877 DCHECK(!pending_configure_job_); |
| 878 pending_configure_job_ = old_job.Pass(); | |
| 935 } else { | 879 } else { |
| 936 // We are not in configuration mode. So wait_interval's pending job | 880 // We're not in configure mode so we should not have a configure job. |
| 937 // should be null. | 881 DCHECK(!pending_configure_job_); |
| 938 DCHECK(wait_interval_->pending_configure_job == NULL); | 882 DCHECK(!pending_nudge_job_); |
| 939 DCHECK(!pending_nudge_); | 883 pending_nudge_job_ = old_job.Pass(); |
| 940 pending_nudge_ = new_job.get(); | |
| 941 } | 884 } |
| 942 | 885 |
| 943 RestartWaiting(new_job.Pass()); | 886 RestartWaiting(); |
| 944 } | 887 } |
| 945 | 888 |
| 946 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 889 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
| 947 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 890 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| 948 DCHECK(weak_handle_this_.IsInitialized()); | 891 DCHECK(weak_handle_this_.IsInitialized()); |
| 949 SDVLOG(3) << "Posting StopImpl"; | 892 SDVLOG(3) << "Posting StopImpl"; |
| 950 weak_handle_this_.Call(FROM_HERE, | 893 weak_handle_this_.Call(FROM_HERE, |
| 951 &SyncSchedulerImpl::StopImpl, | 894 &SyncSchedulerImpl::StopImpl, |
| 952 callback); | 895 callback); |
| 953 } | 896 } |
| 954 | 897 |
| 955 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 898 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
| 956 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 899 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 957 SDVLOG(2) << "StopImpl called"; | 900 SDVLOG(2) << "StopImpl called"; |
| 958 | 901 |
| 959 // Kill any in-flight method calls. | 902 // Kill any in-flight method calls. |
| 960 weak_ptr_factory_.InvalidateWeakPtrs(); | 903 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 961 wait_interval_.reset(); | 904 wait_interval_.reset(); |
| 962 NotifyRetryTime(base::Time()); | 905 NotifyRetryTime(base::Time()); |
| 963 poll_timer_.Stop(); | 906 poll_timer_.Stop(); |
| 964 pending_nudge_ = NULL; | |
| 965 unscheduled_nudge_storage_.reset(); | |
| 966 pending_wakeup_.Cancel(); | 907 pending_wakeup_.Cancel(); |
|
tim (not reviewing)
2013/04/04 00:22:11
Can we rename pending_wakeup_ to pending_wakeup_ev
rlarocque
2013/04/04 00:59:48
Done.
| |
| 908 pending_nudge_job_.reset(); | |
| 909 pending_configure_job_.reset(); | |
| 967 if (started_) { | 910 if (started_) { |
| 968 started_ = false; | 911 started_ = false; |
| 969 } | 912 } |
| 970 if (!callback.is_null()) | 913 if (!callback.is_null()) |
| 971 callback.Run(); | 914 callback.Run(); |
| 972 } | 915 } |
| 973 | 916 |
| 974 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { | 917 // This is the only place where we invoke DoSyncSessionJob with canary |
| 918 // privileges. Everyone else should use NORMAL_PRIORITY. | |
| 919 void SyncSchedulerImpl::TryCanaryJob() { | |
| 975 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 920 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 976 SDVLOG(2) << "Do canary job"; | |
| 977 | 921 |
| 978 // This is the only place where we invoke DoSyncSessionJob with canary | 922 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { |
|
tim (not reviewing)
2013/04/04 18:07:30
There's an implicit point here that canary jobs ha
| |
| 979 // privileges. Everyone else should use NORMAL_PRIORITY. | 923 SDVLOG(2) << "Found pending configure job; will run as canary"; |
| 980 DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); | 924 DoConfigurationSyncSessionJob(CANARY_PRIORITY); |
| 981 } | 925 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { |
| 982 | 926 SDVLOG(2) << "Found pending nudge job; will run as canary"; |
| 983 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { | 927 DoNudgeSyncSessionJob(CANARY_PRIORITY); |
| 984 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 928 } else { |
| 985 // If we find a scheduled pending_ job, abandon the old one and return a | 929 SDVLOG(2) << "Found no work to do; will not run a canary"; |
| 986 // a clone. If unscheduled, just hand over ownership. | |
| 987 scoped_ptr<SyncSessionJob> candidate; | |
| 988 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | |
| 989 && wait_interval_->pending_configure_job) { | |
| 990 SDVLOG(2) << "Found pending configure job"; | |
| 991 candidate = | |
| 992 wait_interval_->pending_configure_job->Clone().Pass(); | |
| 993 wait_interval_->pending_configure_job = candidate.get(); | |
| 994 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
| 995 SDVLOG(2) << "Found pending nudge job"; | |
| 996 candidate = pending_nudge_->Clone(); | |
| 997 pending_nudge_ = candidate.get(); | |
| 998 unscheduled_nudge_storage_.reset(); | |
| 999 } | 930 } |
| 1000 // If we took a job and there's a wait interval, we took the pending canary. | |
| 1001 if (candidate && wait_interval_) | |
| 1002 wait_interval_->timer.Stop(); | |
| 1003 return candidate.Pass(); | |
| 1004 } | 931 } |
| 1005 | 932 |
| 1006 void SyncSchedulerImpl::PollTimerCallback() { | 933 void SyncSchedulerImpl::PollTimerCallback() { |
| 1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 934 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1008 ModelSafeRoutingInfo r; | |
| 1009 ModelTypeInvalidationMap invalidation_map = | |
| 1010 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | |
| 1011 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | |
| 1012 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
| 1013 TimeTicks::Now(), | |
| 1014 info, | |
| 1015 ConfigurationParams())); | |
| 1016 if (no_scheduling_allowed_) { | 935 if (no_scheduling_allowed_) { |
| 1017 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in | 936 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in |
| 1018 // functions that are called only on the sync thread. This function is also | 937 // functions that are called only on the sync thread. This function is also |
| 1019 // called only on the sync thread, and only when it is posted by an expiring | 938 // called only on the sync thread, and only when it is posted by an expiring |
| 1020 // timer. If we find that no_scheduling_allowed_ is set here, then | 939 // timer. If we find that no_scheduling_allowed_ is set here, then |
| 1021 // something is very wrong. Maybe someone mistakenly called us directly, or | 940 // something is very wrong. Maybe someone mistakenly called us directly, or |
| 1022 // mishandled the book-keeping for no_scheduling_allowed_. | 941 // mishandled the book-keeping for no_scheduling_allowed_. |
| 1023 NOTREACHED() << "Illegal to schedule job while session in progress."; | 942 NOTREACHED() << "Illegal to schedule job while session in progress."; |
| 1024 return; | 943 return; |
| 1025 } | 944 } |
| 1026 | 945 |
| 1027 DoPollSyncSessionJob(job.Pass()); | 946 DoPollSyncSessionJob(); |
| 1028 } | 947 } |
| 1029 | 948 |
| 1030 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { | 949 void SyncSchedulerImpl::Unthrottle() { |
| 1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 950 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1032 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 951 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| 1033 DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() || | |
| 1034 wait_interval_->pending_configure_job == to_be_canary.get()); | |
| 1035 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") | |
| 1036 << "canary."; | |
| 1037 | 952 |
| 1038 // We're no longer throttled, so clear the wait interval. | 953 // We're no longer throttled, so clear the wait interval. |
| 1039 wait_interval_.reset(); | 954 wait_interval_.reset(); |
| 1040 NotifyRetryTime(base::Time()); | 955 NotifyRetryTime(base::Time()); |
| 1041 | 956 |
| 1042 // We treat this as a 'canary' in the sense that it was originally scheduled | 957 // We treat this as a 'canary' in the sense that it was originally scheduled |
| 1043 // to run some time ago, failed, and we now want to retry, versus a job that | 958 // to run some time ago, failed, and we now want to retry, versus a job that |
| 1044 // was just created (e.g via ScheduleNudgeImpl). The main implication is | 959 // was just created (e.g via ScheduleNudgeImpl). The main implication is |
| 1045 // that we're careful to update routing info (etc) with such potentially | 960 // that we're careful to update routing info (etc) with such potentially |
| 1046 // stale canary jobs. | 961 // stale canary jobs. |
| 1047 if (to_be_canary.get()) { | 962 TryCanaryJob(); |
| 1048 DoCanaryJob(to_be_canary.Pass()); | |
| 1049 } else { | |
| 1050 DCHECK(!unscheduled_nudge_storage_.get()); | |
| 1051 } | |
| 1052 } | 963 } |
| 1053 | 964 |
| 1054 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 965 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
| 1055 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 966 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1056 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 967 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
| 1057 } | 968 } |
| 1058 | 969 |
| 1059 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { | 970 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { |
| 1060 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); | 971 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); |
| 1061 event.retry_time = retry_time; | 972 event.retry_time = retry_time; |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1140 | 1051 |
| 1141 #undef SDVLOG_LOC | 1052 #undef SDVLOG_LOC |
| 1142 | 1053 |
| 1143 #undef SDVLOG | 1054 #undef SDVLOG |
| 1144 | 1055 |
| 1145 #undef SLOG | 1056 #undef SLOG |
| 1146 | 1057 |
| 1147 #undef ENUM_CASE | 1058 #undef ENUM_CASE |
| 1148 | 1059 |
| 1149 } // namespace syncer | 1060 } // namespace syncer |
| OLD | NEW |