OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
77 : source(source), | 77 : source(source), |
78 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
79 routing_info(routing_info), | 79 routing_info(routing_info), |
80 ready_task(ready_task) { | 80 ready_task(ready_task) { |
81 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
82 } | 82 } |
83 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
84 | 84 |
85 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
86 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
87 had_nudge(false), | 87 had_nudge(false) {} |
88 pending_configure_job(NULL) {} | |
89 | 88 |
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | 89 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
91 : mode(mode), had_nudge(false), length(length), | 90 : mode(mode), had_nudge(false), length(length) {} |
92 pending_configure_job(NULL) {} | |
93 | 91 |
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 92 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
95 | 93 |
96 #define ENUM_CASE(x) case x: return #x; break; | 94 #define ENUM_CASE(x) case x: return #x; break; |
97 | 95 |
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 96 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
99 switch (mode) { | 97 switch (mode) { |
100 ENUM_CASE(UNKNOWN); | 98 ENUM_CASE(UNKNOWN); |
101 ENUM_CASE(EXPONENTIAL_BACKOFF); | 99 ENUM_CASE(EXPONENTIAL_BACKOFF); |
102 ENUM_CASE(THROTTLED); | 100 ENUM_CASE(THROTTLED); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
162 name_(name), | 160 name_(name), |
163 sync_loop_(MessageLoop::current()), | 161 sync_loop_(MessageLoop::current()), |
164 started_(false), | 162 started_(false), |
165 syncer_short_poll_interval_seconds_( | 163 syncer_short_poll_interval_seconds_( |
166 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 164 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
167 syncer_long_poll_interval_seconds_( | 165 syncer_long_poll_interval_seconds_( |
168 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 166 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
169 sessions_commit_delay_( | 167 sessions_commit_delay_( |
170 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 168 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
171 mode_(NORMAL_MODE), | 169 mode_(NORMAL_MODE), |
172 // Start with assuming everything is fine with the connection. | |
173 // At the end of the sync cycle we would have the correct status. | |
174 pending_nudge_(NULL), | |
175 delay_provider_(delay_provider), | 170 delay_provider_(delay_provider), |
176 syncer_(syncer), | 171 syncer_(syncer), |
177 session_context_(context), | 172 session_context_(context), |
178 no_scheduling_allowed_(false) { | 173 no_scheduling_allowed_(false) { |
179 DCHECK(sync_loop_); | 174 DCHECK(sync_loop_); |
180 } | 175 } |
181 | 176 |
182 SyncSchedulerImpl::~SyncSchedulerImpl() { | 177 SyncSchedulerImpl::~SyncSchedulerImpl() { |
183 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 178 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
184 StopImpl(base::Closure()); | 179 StopImpl(base::Closure()); |
(...skipping 22 matching lines...) Expand all Loading... | |
207 // | 202 // |
208 // 1. We're in exponential backoff. | 203 // 1. We're in exponential backoff. |
209 // 2. We're silenced / throttled. | 204 // 2. We're silenced / throttled. |
210 // 3. A nudge was saved previously due to not having a valid auth token. | 205 // 3. A nudge was saved previously due to not having a valid auth token. |
211 // 4. A nudge was scheduled + saved while in configuration mode. | 206 // 4. A nudge was scheduled + saved while in configuration mode. |
212 // | 207 // |
213 // In all cases except (2), we want to retry contacting the server. We | 208 // In all cases except (2), we want to retry contacting the server. We |
214 // call DoCanaryJob to achieve this, and note that nothing -- not even a | 209 // call DoCanaryJob to achieve this, and note that nothing -- not even a |
215 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | 210 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
216 // has the authority to do that is the Unthrottle timer. | 211 // has the authority to do that is the Unthrottle timer. |
217 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | 212 TryCanaryJob(); |
218 if (!pending.get()) | |
219 return; | |
220 DoCanaryJob(pending.Pass()); | |
221 } | 213 } |
222 | 214 |
223 void SyncSchedulerImpl::Start(Mode mode) { | 215 void SyncSchedulerImpl::Start(Mode mode) { |
224 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 216 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
225 std::string thread_name = MessageLoop::current()->thread_name(); | 217 std::string thread_name = MessageLoop::current()->thread_name(); |
226 if (thread_name.empty()) | 218 if (thread_name.empty()) |
227 thread_name = "<Main thread>"; | 219 thread_name = "<Main thread>"; |
228 SDVLOG(2) << "Start called from thread " | 220 SDVLOG(2) << "Start called from thread " |
229 << thread_name << " with mode " << GetModeString(mode); | 221 << thread_name << " with mode " << GetModeString(mode); |
230 if (!started_) { | 222 if (!started_) { |
231 started_ = true; | 223 started_ = true; |
232 SendInitialSnapshot(); | 224 SendInitialSnapshot(); |
233 } | 225 } |
234 | 226 |
235 DCHECK(!session_context_->account_name().empty()); | 227 DCHECK(!session_context_->account_name().empty()); |
236 DCHECK(syncer_.get()); | 228 DCHECK(syncer_.get()); |
237 Mode old_mode = mode_; | 229 Mode old_mode = mode_; |
238 mode_ = mode; | 230 mode_ = mode; |
239 AdjustPolling(NULL); // Will kick start poll timer if needed. | 231 AdjustPolling(NULL); // Will kick start poll timer if needed. |
240 | 232 |
241 if (old_mode != mode_) { | 233 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { |
242 // We just changed our mode. See if there are any pending jobs that we could | 234 // We just got back to normal mode. Let's try to run the work that was |
243 // execute in the new mode. | 235 // queued up while we were configuring. |
244 if (mode_ == NORMAL_MODE) { | 236 DoNudgeSyncSessionJob(NORMAL_PRIORITY); |
245 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | |
246 // has not yet completed. | |
247 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | |
248 } | |
249 | |
250 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
251 if (pending.get()) { | |
252 SDVLOG(2) << "Executing pending job. Good luck!"; | |
253 DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY); | |
254 } | |
255 } | 237 } |
256 } | 238 } |
257 | 239 |
258 void SyncSchedulerImpl::SendInitialSnapshot() { | 240 void SyncSchedulerImpl::SendInitialSnapshot() { |
259 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 241 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
260 scoped_ptr<SyncSession> dummy(new SyncSession( | 242 scoped_ptr<SyncSession> dummy(new SyncSession( |
261 session_context_, this, SyncSourceInfo())); | 243 session_context_, this, SyncSourceInfo())); |
262 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 244 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
263 event.snapshot = dummy->TakeSnapshot(); | 245 event.snapshot = dummy->TakeSnapshot(); |
264 session_context_->NotifyListeners(event); | 246 session_context_->NotifyListeners(event); |
(...skipping 23 matching lines...) Expand all Loading... | |
288 const ConfigurationParams& params) { | 270 const ConfigurationParams& params) { |
289 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 271 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
290 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 272 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
291 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 273 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
292 DCHECK(!params.ready_task.is_null()); | 274 DCHECK(!params.ready_task.is_null()); |
293 CHECK(started_) << "Scheduler must be running to configure."; | 275 CHECK(started_) << "Scheduler must be running to configure."; |
294 SDVLOG(2) << "Reconfiguring syncer."; | 276 SDVLOG(2) << "Reconfiguring syncer."; |
295 | 277 |
296 // Only one configuration is allowed at a time. Verify we're not waiting | 278 // Only one configuration is allowed at a time. Verify we're not waiting |
297 // for a pending configure job. | 279 // for a pending configure job. |
298 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | 280 DCHECK(!pending_configure_job_); |
299 | 281 |
300 ModelSafeRoutingInfo restricted_routes; | 282 ModelSafeRoutingInfo restricted_routes; |
301 BuildModelSafeParams(params.types_to_download, | 283 BuildModelSafeParams(params.types_to_download, |
302 params.routing_info, | 284 params.routing_info, |
303 &restricted_routes); | 285 &restricted_routes); |
304 session_context_->set_routing_info(restricted_routes); | 286 session_context_->set_routing_info(restricted_routes); |
305 | 287 |
306 // Only reconfigure if we have types to download. | 288 // Only reconfigure if we have types to download. |
307 if (!params.types_to_download.Empty()) { | 289 if (!params.types_to_download.Empty()) { |
308 DCHECK(!restricted_routes.empty()); | 290 DCHECK(!restricted_routes.empty()); |
309 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 291 pending_configure_job_.reset(new SyncSessionJob( |
310 SyncSessionJob::CONFIGURATION, | 292 SyncSessionJob::CONFIGURATION, |
311 TimeTicks::Now(), | 293 TimeTicks::Now(), |
312 SyncSourceInfo(params.source, | 294 SyncSourceInfo(params.source, |
313 ModelSafeRoutingInfoToInvalidationMap( | 295 ModelSafeRoutingInfoToInvalidationMap( |
314 restricted_routes, | 296 restricted_routes, |
315 std::string())), | 297 std::string())), |
316 params)); | 298 params)); |
317 bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); | 299 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); |
318 | 300 |
319 // If we failed, the job would have been saved as the pending configure | 301 // If we failed, the job would have been saved as the pending configure |
320 // job and a wait interval would have been set. | 302 // job and a wait interval would have been set. |
321 if (!succeeded) { | 303 if (!succeeded) { |
322 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); | 304 DCHECK(pending_configure_job_); |
323 return false; | 305 return false; |
306 } else { | |
307 DCHECK(!pending_configure_job_); | |
324 } | 308 } |
325 } else { | 309 } else { |
326 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 310 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
327 params.ready_task.Run(); | 311 params.ready_task.Run(); |
328 } | 312 } |
329 | 313 |
330 return true; | 314 return true; |
331 } | 315 } |
332 | 316 |
333 SyncSchedulerImpl::JobProcessDecision | 317 SyncSchedulerImpl::JobProcessDecision |
(...skipping 21 matching lines...) Expand all Loading... | |
355 | 339 |
356 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 340 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
357 if (job.purpose() == SyncSessionJob::NUDGE) { | 341 if (job.purpose() == SyncSessionJob::NUDGE) { |
358 if (mode_ == CONFIGURATION_MODE) | 342 if (mode_ == CONFIGURATION_MODE) |
359 return SAVE; | 343 return SAVE; |
360 | 344 |
361 // If we already had one nudge then just drop this nudge. We will retry | 345 // If we already had one nudge then just drop this nudge. We will retry |
362 // later when the timer runs out. | 346 // later when the timer runs out. |
363 if (priority == NORMAL_PRIORITY) | 347 if (priority == NORMAL_PRIORITY) |
364 return wait_interval_->had_nudge ? DROP : CONTINUE; | 348 return wait_interval_->had_nudge ? DROP : CONTINUE; |
365 else // We are here because timer ran out. So retry. | 349 else // We are here because timer ran out. So retry. |
tim (not reviewing)
2013/04/04 18:07:30
Just noticed this clause is redundant and somewhat
rlarocque
2013/04/05 00:48:51
I was planning to leave it alone for now. There's
| |
366 return CONTINUE; | 350 return CONTINUE; |
367 } | 351 } |
368 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; | 352 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; |
369 } | 353 } |
370 | 354 |
371 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 355 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
372 const SyncSessionJob& job, | 356 const SyncSessionJob& job, |
373 JobPriority priority) { | 357 JobPriority priority) { |
374 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 358 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
375 | 359 |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
442 // Decision now rests on state of auth tokens. | 426 // Decision now rests on state of auth tokens. |
443 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 427 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
444 return CONTINUE; | 428 return CONTINUE; |
445 | 429 |
446 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 430 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
447 // Running the job would require updated auth, so we can't honour | 431 // Running the job would require updated auth, so we can't honour |
448 // job.scheduled_start(). | 432 // job.scheduled_start(). |
449 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | 433 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; |
450 } | 434 } |
451 | 435 |
452 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { | |
453 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; | |
454 if (is_nudge && pending_nudge_) { | |
455 SDVLOG(2) << "Coalescing a pending nudge"; | |
456 // TODO(tim): This basically means we never use the more-careful coalescing | |
457 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
458 // times, because we're calling this function first. Pull this out | |
459 // into a function to coalesce + set start times and reuse. | |
460 pending_nudge_->CoalesceSources(job->source_info()); | |
461 return; | |
462 } | |
463 | |
464 scoped_ptr<SyncSessionJob> job_to_save = job->Clone(); | |
465 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
466 // This job should be made the new canary. | |
467 if (is_nudge) { | |
468 pending_nudge_ = job_to_save.get(); | |
469 } else { | |
470 SDVLOG(2) << "Saving a configuration job"; | |
471 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
472 DCHECK(!wait_interval_->pending_configure_job); | |
473 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
474 DCHECK(!job->config_params().ready_task.is_null()); | |
475 // The only nudge that could exist is a scheduled canary nudge. | |
476 DCHECK(!unscheduled_nudge_storage_.get()); | |
477 if (pending_nudge_) { | |
478 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
479 unscheduled_nudge_storage_ = pending_nudge_->Clone(); | |
480 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
481 } | |
482 wait_interval_->pending_configure_job = job_to_save.get(); | |
483 } | |
484 TimeDelta length = | |
485 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
486 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
487 TimeDelta::FromSeconds(0) : length; | |
488 RestartWaiting(job_to_save.Pass()); | |
489 return; | |
490 } | |
491 | |
492 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
493 // when we're not in a WaitInterval. See bug 147736. | |
494 DCHECK(is_nudge); | |
495 // There may or may not be a pending_configure_job. Either way this nudge | |
496 // is unschedulable. | |
497 pending_nudge_ = job_to_save.get(); | |
498 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
499 } | |
500 | |
501 // Functor for std::find_if to search by ModelSafeGroup. | |
502 struct ModelSafeWorkerGroupIs { | |
503 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
504 bool operator()(ModelSafeWorker* w) { | |
505 return group == w->GetModelSafeGroup(); | |
506 } | |
507 ModelSafeGroup group; | |
508 }; | |
509 | |
510 void SyncSchedulerImpl::ScheduleNudgeAsync( | 436 void SyncSchedulerImpl::ScheduleNudgeAsync( |
511 const TimeDelta& desired_delay, | 437 const TimeDelta& desired_delay, |
512 NudgeSource source, ModelTypeSet types, | 438 NudgeSource source, ModelTypeSet types, |
513 const tracked_objects::Location& nudge_location) { | 439 const tracked_objects::Location& nudge_location) { |
514 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 440 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
515 SDVLOG_LOC(nudge_location, 2) | 441 SDVLOG_LOC(nudge_location, 2) |
516 << "Nudge scheduled with delay " | 442 << "Nudge scheduled with delay " |
517 << desired_delay.InMilliseconds() << " ms, " | 443 << desired_delay.InMilliseconds() << " ms, " |
518 << "source " << GetNudgeSourceString(source) << ", " | 444 << "source " << GetNudgeSourceString(source) << ", " |
519 << "types " << ModelTypeSetToString(types); | 445 << "types " << ModelTypeSetToString(types); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
579 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 505 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
580 SyncSessionJob::NUDGE, | 506 SyncSessionJob::NUDGE, |
581 TimeTicks::Now() + delay, | 507 TimeTicks::Now() + delay, |
582 info, | 508 info, |
583 ConfigurationParams())); | 509 ConfigurationParams())); |
584 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); | 510 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); |
585 SDVLOG(2) << "Should run " | 511 SDVLOG(2) << "Should run " |
586 << SyncSessionJob::GetPurposeString(job->purpose()) | 512 << SyncSessionJob::GetPurposeString(job->purpose()) |
587 << " in mode " << GetModeString(mode_) | 513 << " in mode " << GetModeString(mode_) |
588 << ": " << GetDecisionString(decision); | 514 << ": " << GetDecisionString(decision); |
589 if (decision != CONTINUE) { | 515 if (decision == DROP) { |
590 // End of the line, though we may save the job for later. | |
591 if (decision == SAVE) { | |
592 HandleSaveJobDecision(job.Pass()); | |
593 } else { | |
594 DCHECK_EQ(decision, DROP); | |
595 } | |
596 return; | 516 return; |
597 } | 517 } |
598 | 518 |
599 if (pending_nudge_) { | 519 // Try to coalesce in both SAVE and CONTINUE cases. |
600 SDVLOG(2) << "Rescheduling pending nudge"; | 520 if (pending_nudge_job_) { |
601 pending_nudge_->CoalesceSources(job->source_info()); | 521 pending_nudge_job_->CoalesceSources(job->source_info()); |
602 // Choose the start time as the earliest of the 2. Note that this means | 522 if (decision == CONTINUE) { |
603 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) | 523 // Only update the scheduled_start if we're going to reschedule. |
604 // but a nudge is already scheduled to go out, we'll send the (tab) commit | 524 pending_nudge_job_->set_scheduled_start( |
605 // without waiting. | 525 std::min(job->scheduled_start(), |
606 pending_nudge_->set_scheduled_start( | 526 pending_nudge_job_->scheduled_start())); |
607 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); | 527 } |
608 // Abandon the old task by cloning and replacing the session. | 528 } else { |
609 // It's possible that by "rescheduling" we're actually taking a job that | 529 pending_nudge_job_ = job.Pass(); |
610 // was previously unscheduled and giving it wings, so take care to reset | |
611 // unscheduled nudge storage. | |
612 job = pending_nudge_->Clone(); | |
613 pending_nudge_ = NULL; | |
614 unscheduled_nudge_storage_.reset(); | |
615 // It's also possible we took a canary job, since we allow one nudge | |
616 // per backoff interval. | |
617 DCHECK(!wait_interval_ || !wait_interval_->had_nudge); | |
618 } | 530 } |
619 | 531 |
620 TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); | 532 if (decision == SAVE) { |
533 return; | |
534 } | |
535 | |
536 TimeDelta run_delay = | |
537 pending_nudge_job_->scheduled_start() - TimeTicks::Now(); | |
621 if (run_delay < TimeDelta::FromMilliseconds(0)) | 538 if (run_delay < TimeDelta::FromMilliseconds(0)) |
622 run_delay = TimeDelta::FromMilliseconds(0); | 539 run_delay = TimeDelta::FromMilliseconds(0); |
623 SDVLOG_LOC(nudge_location, 2) | 540 SDVLOG_LOC(nudge_location, 2) |
624 << "Scheduling a nudge with " | 541 << "Scheduling a nudge with " |
625 << run_delay.InMilliseconds() << " ms delay"; | 542 << run_delay.InMilliseconds() << " ms delay"; |
626 | 543 |
627 pending_nudge_ = job.get(); | 544 PostDelayedTask( |
628 PostDelayedTask(nudge_location, "DoSyncSessionJob", | 545 nudge_location, |
629 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), | 546 "DoSyncSessionJob", |
630 weak_ptr_factory_.GetWeakPtr(), | 547 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoNudgeSyncSessionJob), |
631 base::Passed(&job), | 548 weak_ptr_factory_.GetWeakPtr(), |
632 NORMAL_PRIORITY), | 549 NORMAL_PRIORITY), |
633 run_delay); | 550 run_delay); |
634 } | 551 } |
635 | 552 |
636 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 553 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
637 switch (mode) { | 554 switch (mode) { |
638 ENUM_CASE(CONFIGURATION_MODE); | 555 ENUM_CASE(CONFIGURATION_MODE); |
639 ENUM_CASE(NORMAL_MODE); | 556 ENUM_CASE(NORMAL_MODE); |
640 } | 557 } |
641 return ""; | 558 return ""; |
642 } | 559 } |
(...skipping 16 matching lines...) Expand all Loading... | |
659 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 576 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
660 if (!started_) { | 577 if (!started_) { |
661 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 578 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
662 return; | 579 return; |
663 } | 580 } |
664 // This cancels the previous task, if one existed. | 581 // This cancels the previous task, if one existed. |
665 pending_wakeup_.Reset(task); | 582 pending_wakeup_.Reset(task); |
666 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); | 583 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); |
667 } | 584 } |
668 | 585 |
669 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, | 586 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, |
670 JobPriority priority) { | 587 JobPriority priority) { |
671 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 588 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
672 if (job->purpose() == SyncSessionJob::NUDGE) { | |
673 pending_nudge_ = NULL; | |
674 } | |
675 | 589 |
676 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 590 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
677 JobProcessDecision decision = DecideOnJob(*job, priority); | 591 JobProcessDecision decision = DecideOnJob(*job, priority); |
678 SDVLOG(2) << "Should run " | 592 SDVLOG(2) << "Should run " |
679 << SyncSessionJob::GetPurposeString(job->purpose()) | 593 << SyncSessionJob::GetPurposeString(job->purpose()) |
680 << " in mode " << GetModeString(mode_) | 594 << " in mode " << GetModeString(mode_) |
681 << " with source " << job->source_info().updates_source | 595 << " with source " << job->source_info().updates_source |
682 << ": " << GetDecisionString(decision); | 596 << ": " << GetDecisionString(decision); |
683 if (decision != CONTINUE) { | 597 if (decision != CONTINUE) { |
684 if (decision == SAVE) { | 598 if (decision == SAVE) { |
685 HandleSaveJobDecision(job.Pass()); | 599 if (job->purpose() == SyncSessionJob::CONFIGURATION) { |
600 pending_configure_job_ = job.Pass(); | |
601 | |
602 // It's very unlikely, but possible, that the WaitInterval's wakeup task | |
603 // isn't actually active at this point. Sometimes the WaitInterval gets | |
604 // preempted by a nudge-while-in-backoff. We can't just assume that | |
605 // there's a task waiting to wake us up once the WaitInterval expires; | |
606 // we need to ensure it by rescheduling the WaitInterval (while taking | |
607 // into account any time already spent waiting, of course). | |
608 ResumeWaiting(); | |
rlarocque
2013/04/02 23:01:12
Here's a paragraph that I had originally intended
tim (not reviewing)
2013/04/04 18:07:30
It looks like we actually keep the WaitInterval ti
| |
609 } else { | |
610 pending_nudge_job_ = job.Pass(); | |
611 } | |
686 } else { | 612 } else { |
687 DCHECK_EQ(decision, DROP); | 613 DCHECK_EQ(decision, DROP); |
688 } | 614 } |
689 return false; | 615 return false; |
690 } | 616 } |
691 | 617 |
692 DVLOG(2) << "Creating sync session with routes " | 618 DVLOG(2) << "Creating sync session with routes " |
693 << ModelSafeRoutingInfoToString(session_context_->routing_info()) | 619 << ModelSafeRoutingInfoToString(session_context_->routing_info()) |
694 << "and purpose " << job->purpose(); | 620 << "and purpose " << job->purpose(); |
695 SyncSession session(session_context_, this, job->source_info()); | 621 SyncSession session(session_context_, this, job->source_info()); |
696 bool premature_exit = !syncer_->SyncShare(&session, | 622 bool premature_exit = !syncer_->SyncShare(&session, |
697 job->start_step(), | 623 job->start_step(), |
698 job->end_step()); | 624 job->end_step()); |
699 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 625 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
700 | 626 |
701 bool success = FinishSyncSessionJob(job.get(), | 627 bool success = FinishSyncSessionJob(job.get(), |
702 premature_exit, | 628 premature_exit, |
703 &session); | 629 &session); |
704 | 630 |
705 if (IsSyncingCurrentlySilenced()) { | 631 if (IsSyncingCurrentlySilenced()) { |
706 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | 632 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
707 // If we're here, it's because |job| was silenced until a server specified | 633 // If we're here, it's because |job| was silenced until a server specified |
708 // time. (Note, it had to be |job|, because DecideOnJob would not permit | 634 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
709 // any job through while in WaitInterval::THROTTLED). | 635 // any job through while in WaitInterval::THROTTLED). |
710 scoped_ptr<SyncSessionJob> clone = job->Clone(); | 636 if (job->purpose() == SyncSessionJob::NUDGE) |
711 if (clone->purpose() == SyncSessionJob::NUDGE) | 637 pending_nudge_job_ = job.Pass(); |
712 pending_nudge_ = clone.get(); | 638 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
713 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | 639 pending_configure_job_ = job.Pass(); |
714 wait_interval_->pending_configure_job = clone.get(); | |
715 else | 640 else |
716 NOTREACHED(); | 641 NOTREACHED(); |
717 | 642 |
718 RestartWaiting(clone.Pass()); | 643 RestartWaiting(); |
719 return success; | 644 return success; |
720 } | 645 } |
721 | 646 |
722 if (!success) | 647 if (!success) |
723 ScheduleNextSync(job.Pass(), &session); | 648 ScheduleNextSync(job.Pass(), &session); |
724 | 649 |
725 return success; | 650 return success; |
726 } | 651 } |
727 | 652 |
653 bool SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { | |
tim (not reviewing)
2013/04/04 00:22:11
Digging around, it looks like these Do*SSJ methods
rlarocque
2013/04/04 00:59:48
Your suggestion would work for now, but it wouldn'
| |
654 return DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority); | |
655 } | |
656 | |
657 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { | |
658 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority); | |
659 } | |
660 | |
728 bool SyncSchedulerImpl::ShouldPoll() { | 661 bool SyncSchedulerImpl::ShouldPoll() { |
729 if (wait_interval_.get()) { | 662 if (wait_interval_.get()) { |
730 SDVLOG(2) << "Not running poll in wait interval."; | 663 SDVLOG(2) << "Not running poll in wait interval."; |
731 return false; | 664 return false; |
732 } | 665 } |
733 | 666 |
734 if (mode_ == CONFIGURATION_MODE) { | 667 if (mode_ == CONFIGURATION_MODE) { |
735 SDVLOG(2) << "Not running poll in configuration mode."; | 668 SDVLOG(2) << "Not running poll in configuration mode."; |
736 return false; | 669 return false; |
737 } | 670 } |
738 | 671 |
739 // TODO(rlarocque): Refactor decision-making logic common to all types | 672 // TODO(rlarocque): Refactor decision-making logic common to all types |
740 // of jobs into a shared function. | 673 // of jobs into a shared function. |
741 | 674 |
742 if (session_context_->connection_manager()->HasInvalidAuthToken()) { | 675 if (session_context_->connection_manager()->HasInvalidAuthToken()) { |
743 SDVLOG(2) << "Not running poll because auth token is invalid."; | 676 SDVLOG(2) << "Not running poll because auth token is invalid."; |
744 return false; | 677 return false; |
745 } | 678 } |
746 | 679 |
747 return true; | 680 return true; |
748 } | 681 } |
749 | 682 |
750 void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { | 683 void SyncSchedulerImpl::DoPollSyncSessionJob() { |
751 DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); | 684 ModelSafeRoutingInfo r; |
685 ModelTypeInvalidationMap invalidation_map = | |
686 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | |
687 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | |
688 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
689 TimeTicks::Now(), | |
690 info, | |
691 ConfigurationParams())); | |
752 | 692 |
753 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 693 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
754 | 694 |
755 if (!ShouldPoll()) | 695 if (!ShouldPoll()) |
756 return; | 696 return; |
757 | 697 |
758 DVLOG(2) << "Polling with routes " | 698 DVLOG(2) << "Polling with routes " |
759 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 699 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
760 SyncSession session(session_context_, this, job->source_info()); | 700 SyncSession session(session_context_, this, job->source_info()); |
761 bool premature_exit = !syncer_->SyncShare(&session, | 701 bool premature_exit = !syncer_->SyncShare(&session, |
762 job->start_step(), | 702 job->start_step(), |
763 job->end_step()); | 703 job->end_step()); |
764 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 704 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
765 | 705 |
766 FinishSyncSessionJob(job.get(), premature_exit, &session); | 706 FinishSyncSessionJob(job.get(), premature_exit, &session); |
767 | 707 |
768 if (IsSyncingCurrentlySilenced()) { | 708 if (IsSyncingCurrentlySilenced()) { |
769 // This will start the countdown to unthrottle. Other kinds of jobs would | 709 // Normally we would only call RestartWaiting() if we had a |
770 // schedule themselves as the post-unthrottle canary. A poll job is not | 710 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's |
771 // that urgent, so it does not get to be the canary. We still need to start | 711 // possible that neither is set. We create the wait interval anyway because |
772 // the timer regardless. Otherwise there could be no one to clear the | 712 // we need it to make sure we get unthrottled on time. |
773 // WaitInterval when the throttling expires. | 713 RestartWaiting(); |
774 RestartWaiting(scoped_ptr<SyncSessionJob>()); | |
775 } | 714 } |
776 } | 715 } |
777 | 716 |
778 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 717 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
779 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 718 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
780 | 719 |
781 // We are interested in recording time between local nudges for datatypes. | 720 // We are interested in recording time between local nudges for datatypes. |
782 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 721 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
783 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 722 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
784 return; | 723 return; |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
847 // appears that this was our nudge for this interval, and it failed. | 786 // appears that this was our nudge for this interval, and it failed. |
848 // | 787 // |
849 // Note: This does not prevent us from running canary jobs. For example, | 788 // Note: This does not prevent us from running canary jobs. For example, |
850 // an IP address change might still result in another nudge being executed | 789 // an IP address change might still result in another nudge being executed |
851 // during this backoff interval. | 790 // during this backoff interval. |
852 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; | 791 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
853 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); | 792 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
854 DCHECK(!wait_interval_->had_nudge); | 793 DCHECK(!wait_interval_->had_nudge); |
855 | 794 |
856 wait_interval_->had_nudge = true; | 795 wait_interval_->had_nudge = true; |
857 DCHECK(!pending_nudge_); | 796 DCHECK(!pending_nudge_job_); |
858 | 797 |
859 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | 798 pending_nudge_job_ = finished_job.Pass(); |
860 pending_nudge_ = new_job.get(); | 799 RestartWaiting(); |
861 RestartWaiting(new_job.Pass()); | |
862 } else { | 800 } else { |
863 // Either this is the first failure or a consecutive failure after our | 801 // Either this is the first failure or a consecutive failure after our |
864 // backoff timer expired. We handle it the same way in either case. | 802 // backoff timer expired. We handle it the same way in either case. |
865 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 803 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
866 HandleContinuationError(finished_job.Pass(), session); | 804 HandleContinuationError(finished_job.Pass(), session); |
867 } | 805 } |
868 } | 806 } |
869 | 807 |
870 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 808 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
871 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 809 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
872 | 810 |
873 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 811 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
874 syncer_short_poll_interval_seconds_ : | 812 syncer_short_poll_interval_seconds_ : |
875 syncer_long_poll_interval_seconds_; | 813 syncer_long_poll_interval_seconds_; |
876 bool rate_changed = !poll_timer_.IsRunning() || | 814 bool rate_changed = !poll_timer_.IsRunning() || |
877 poll != poll_timer_.GetCurrentDelay(); | 815 poll != poll_timer_.GetCurrentDelay(); |
878 | 816 |
879 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) | 817 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
880 poll_timer_.Reset(); | 818 poll_timer_.Reset(); |
881 | 819 |
882 if (!rate_changed) | 820 if (!rate_changed) |
883 return; | 821 return; |
884 | 822 |
885 // Adjust poll rate. | 823 // Adjust poll rate. |
886 poll_timer_.Stop(); | 824 poll_timer_.Stop(); |
887 poll_timer_.Start(FROM_HERE, poll, this, | 825 poll_timer_.Start(FROM_HERE, poll, this, |
888 &SyncSchedulerImpl::PollTimerCallback); | 826 &SyncSchedulerImpl::PollTimerCallback); |
889 } | 827 } |
890 | 828 |
891 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { | 829 void SyncSchedulerImpl::ResumeWaiting() { |
830 TimeDelta length = | |
831 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
832 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
833 TimeDelta::FromSeconds(0) : length; | |
834 RestartWaiting(); | |
835 } | |
836 | |
837 void SyncSchedulerImpl::RestartWaiting() { | |
892 CHECK(wait_interval_.get()); | 838 CHECK(wait_interval_.get()); |
893 wait_interval_->timer.Stop(); | 839 wait_interval_->timer.Stop(); |
894 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | 840 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
895 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 841 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
896 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, | 842 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, |
897 weak_ptr_factory_.GetWeakPtr(), | 843 weak_ptr_factory_.GetWeakPtr())); |
898 base::Passed(&job))); | |
899 | 844 |
900 } else { | 845 } else { |
901 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 846 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::TryCanaryJob, |
902 weak_ptr_factory_.GetWeakPtr(), | 847 weak_ptr_factory_.GetWeakPtr())); |
903 base::Passed(&job))); | |
904 } | 848 } |
905 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 849 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
906 pending_wakeup_.callback()); | 850 pending_wakeup_.callback()); |
907 } | 851 } |
908 | 852 |
909 void SyncSchedulerImpl::HandleContinuationError( | 853 void SyncSchedulerImpl::HandleContinuationError( |
910 scoped_ptr<SyncSessionJob> old_job, | 854 scoped_ptr<SyncSessionJob> old_job, |
911 SyncSession* session) { | 855 SyncSession* session) { |
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 856 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
913 | 857 |
914 TimeDelta length = delay_provider_->GetDelay( | 858 TimeDelta length = delay_provider_->GetDelay( |
915 IsBackingOff() ? wait_interval_->length : | 859 IsBackingOff() ? wait_interval_->length : |
916 delay_provider_->GetInitialDelay( | 860 delay_provider_->GetInitialDelay( |
917 session->status_controller().model_neutral_state())); | 861 session->status_controller().model_neutral_state())); |
918 | 862 |
919 SDVLOG(2) << "In handle continuation error with " | 863 SDVLOG(2) << "In handle continuation error with " |
920 << SyncSessionJob::GetPurposeString(old_job->purpose()) | 864 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
921 << " job. The time delta(ms) is " | 865 << " job. The time delta(ms) is " |
922 << length.InMilliseconds(); | 866 << length.InMilliseconds(); |
923 | 867 |
924 // This will reset the had_nudge variable as well. | 868 // This will reset the had_nudge variable as well. |
925 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 869 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
926 length)); | 870 length)); |
927 NotifyRetryTime(base::Time::Now() + length); | 871 NotifyRetryTime(base::Time::Now() + length); |
928 scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); | 872 old_job->set_scheduled_start(TimeTicks::Now() + length); |
929 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
930 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | 873 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { |
931 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 874 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
932 // Config params should always get set. | 875 // Config params should always get set. |
933 DCHECK(!old_job->config_params().ready_task.is_null()); | 876 DCHECK(!old_job->config_params().ready_task.is_null()); |
934 wait_interval_->pending_configure_job = new_job.get(); | 877 DCHECK(!pending_configure_job_); |
878 pending_configure_job_ = old_job.Pass(); | |
935 } else { | 879 } else { |
936 // We are not in configuration mode. So wait_interval's pending job | 880 // We're not in configure mode so we should not have a configure job. |
937 // should be null. | 881 DCHECK(!pending_configure_job_); |
938 DCHECK(wait_interval_->pending_configure_job == NULL); | 882 DCHECK(!pending_nudge_job_); |
939 DCHECK(!pending_nudge_); | 883 pending_nudge_job_ = old_job.Pass(); |
940 pending_nudge_ = new_job.get(); | |
941 } | 884 } |
942 | 885 |
943 RestartWaiting(new_job.Pass()); | 886 RestartWaiting(); |
944 } | 887 } |
945 | 888 |
946 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 889 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
947 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 890 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
948 DCHECK(weak_handle_this_.IsInitialized()); | 891 DCHECK(weak_handle_this_.IsInitialized()); |
949 SDVLOG(3) << "Posting StopImpl"; | 892 SDVLOG(3) << "Posting StopImpl"; |
950 weak_handle_this_.Call(FROM_HERE, | 893 weak_handle_this_.Call(FROM_HERE, |
951 &SyncSchedulerImpl::StopImpl, | 894 &SyncSchedulerImpl::StopImpl, |
952 callback); | 895 callback); |
953 } | 896 } |
954 | 897 |
955 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 898 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
956 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 899 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
957 SDVLOG(2) << "StopImpl called"; | 900 SDVLOG(2) << "StopImpl called"; |
958 | 901 |
959 // Kill any in-flight method calls. | 902 // Kill any in-flight method calls. |
960 weak_ptr_factory_.InvalidateWeakPtrs(); | 903 weak_ptr_factory_.InvalidateWeakPtrs(); |
961 wait_interval_.reset(); | 904 wait_interval_.reset(); |
962 NotifyRetryTime(base::Time()); | 905 NotifyRetryTime(base::Time()); |
963 poll_timer_.Stop(); | 906 poll_timer_.Stop(); |
964 pending_nudge_ = NULL; | |
965 unscheduled_nudge_storage_.reset(); | |
966 pending_wakeup_.Cancel(); | 907 pending_wakeup_.Cancel(); |
tim (not reviewing)
2013/04/04 00:22:11
Can we rename pending_wakeup_ to pending_wakeup_ev
rlarocque
2013/04/04 00:59:48
Done.
| |
908 pending_nudge_job_.reset(); | |
909 pending_configure_job_.reset(); | |
967 if (started_) { | 910 if (started_) { |
968 started_ = false; | 911 started_ = false; |
969 } | 912 } |
970 if (!callback.is_null()) | 913 if (!callback.is_null()) |
971 callback.Run(); | 914 callback.Run(); |
972 } | 915 } |
973 | 916 |
974 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { | 917 // This is the only place where we invoke DoSyncSessionJob with canary |
918 // privileges. Everyone else should use NORMAL_PRIORITY. | |
919 void SyncSchedulerImpl::TryCanaryJob() { | |
975 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 920 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
976 SDVLOG(2) << "Do canary job"; | |
977 | 921 |
978 // This is the only place where we invoke DoSyncSessionJob with canary | 922 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { |
tim (not reviewing)
2013/04/04 18:07:30
There's an implicit point here that canary jobs ha
| |
979 // privileges. Everyone else should use NORMAL_PRIORITY. | 923 SDVLOG(2) << "Found pending configure job; will run as canary"; |
980 DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); | 924 DoConfigurationSyncSessionJob(CANARY_PRIORITY); |
981 } | 925 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { |
982 | 926 SDVLOG(2) << "Found pending nudge job; will run as canary"; |
983 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { | 927 DoNudgeSyncSessionJob(CANARY_PRIORITY); |
984 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 928 } else { |
985 // If we find a scheduled pending_ job, abandon the old one and return a | 929 SDVLOG(2) << "Found no work to do; will not run a canary"; |
986 // a clone. If unscheduled, just hand over ownership. | |
987 scoped_ptr<SyncSessionJob> candidate; | |
988 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | |
989 && wait_interval_->pending_configure_job) { | |
990 SDVLOG(2) << "Found pending configure job"; | |
991 candidate = | |
992 wait_interval_->pending_configure_job->Clone().Pass(); | |
993 wait_interval_->pending_configure_job = candidate.get(); | |
994 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
995 SDVLOG(2) << "Found pending nudge job"; | |
996 candidate = pending_nudge_->Clone(); | |
997 pending_nudge_ = candidate.get(); | |
998 unscheduled_nudge_storage_.reset(); | |
999 } | 930 } |
1000 // If we took a job and there's a wait interval, we took the pending canary. | |
1001 if (candidate && wait_interval_) | |
1002 wait_interval_->timer.Stop(); | |
1003 return candidate.Pass(); | |
1004 } | 931 } |
1005 | 932 |
1006 void SyncSchedulerImpl::PollTimerCallback() { | 933 void SyncSchedulerImpl::PollTimerCallback() { |
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 934 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1008 ModelSafeRoutingInfo r; | |
1009 ModelTypeInvalidationMap invalidation_map = | |
1010 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | |
1011 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | |
1012 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
1013 TimeTicks::Now(), | |
1014 info, | |
1015 ConfigurationParams())); | |
1016 if (no_scheduling_allowed_) { | 935 if (no_scheduling_allowed_) { |
1017 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in | 936 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in |
1018 // functions that are called only on the sync thread. This function is also | 937 // functions that are called only on the sync thread. This function is also |
1019 // called only on the sync thread, and only when it is posted by an expiring | 938 // called only on the sync thread, and only when it is posted by an expiring |
1020 // timer. If we find that no_scheduling_allowed_ is set here, then | 939 // timer. If we find that no_scheduling_allowed_ is set here, then |
1021 // something is very wrong. Maybe someone mistakenly called us directly, or | 940 // something is very wrong. Maybe someone mistakenly called us directly, or |
1022 // mishandled the book-keeping for no_scheduling_allowed_. | 941 // mishandled the book-keeping for no_scheduling_allowed_. |
1023 NOTREACHED() << "Illegal to schedule job while session in progress."; | 942 NOTREACHED() << "Illegal to schedule job while session in progress."; |
1024 return; | 943 return; |
1025 } | 944 } |
1026 | 945 |
1027 DoPollSyncSessionJob(job.Pass()); | 946 DoPollSyncSessionJob(); |
1028 } | 947 } |
1029 | 948 |
1030 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { | 949 void SyncSchedulerImpl::Unthrottle() { |
1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 950 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1032 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 951 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1033 DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() || | |
1034 wait_interval_->pending_configure_job == to_be_canary.get()); | |
1035 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") | |
1036 << "canary."; | |
1037 | 952 |
1038 // We're no longer throttled, so clear the wait interval. | 953 // We're no longer throttled, so clear the wait interval. |
1039 wait_interval_.reset(); | 954 wait_interval_.reset(); |
1040 NotifyRetryTime(base::Time()); | 955 NotifyRetryTime(base::Time()); |
1041 | 956 |
1042 // We treat this as a 'canary' in the sense that it was originally scheduled | 957 // We treat this as a 'canary' in the sense that it was originally scheduled |
1043 // to run some time ago, failed, and we now want to retry, versus a job that | 958 // to run some time ago, failed, and we now want to retry, versus a job that |
1044 // was just created (e.g via ScheduleNudgeImpl). The main implication is | 959 // was just created (e.g via ScheduleNudgeImpl). The main implication is |
1045 // that we're careful to update routing info (etc) with such potentially | 960 // that we're careful to update routing info (etc) with such potentially |
1046 // stale canary jobs. | 961 // stale canary jobs. |
1047 if (to_be_canary.get()) { | 962 TryCanaryJob(); |
1048 DoCanaryJob(to_be_canary.Pass()); | |
1049 } else { | |
1050 DCHECK(!unscheduled_nudge_storage_.get()); | |
1051 } | |
1052 } | 963 } |
1053 | 964 |
1054 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 965 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1055 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 966 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1056 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 967 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1057 } | 968 } |
1058 | 969 |
1059 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { | 970 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { |
1060 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); | 971 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); |
1061 event.retry_time = retry_time; | 972 event.retry_time = retry_time; |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1140 | 1051 |
1141 #undef SDVLOG_LOC | 1052 #undef SDVLOG_LOC |
1142 | 1053 |
1143 #undef SDVLOG | 1054 #undef SDVLOG |
1144 | 1055 |
1145 #undef SLOG | 1056 #undef SLOG |
1146 | 1057 |
1147 #undef ENUM_CASE | 1058 #undef ENUM_CASE |
1148 | 1059 |
1149 } // namespace syncer | 1060 } // namespace syncer |
OLD | NEW |