Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 13422003: sync: Refactor job ownership in SyncScheduler (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Attempt to fix subtle problem with nudge-in-backoff handling Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 : source(source), 77 : source(source),
78 types_to_download(types_to_download), 78 types_to_download(types_to_download),
79 routing_info(routing_info), 79 routing_info(routing_info),
80 ready_task(ready_task) { 80 ready_task(ready_task) {
81 DCHECK(!ready_task.is_null()); 81 DCHECK(!ready_task.is_null());
82 } 82 }
83 ConfigurationParams::~ConfigurationParams() {} 83 ConfigurationParams::~ConfigurationParams() {}
84 84
85 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 SyncSchedulerImpl::WaitInterval::WaitInterval()
86 : mode(UNKNOWN), 86 : mode(UNKNOWN),
87 had_nudge(false), 87 had_nudge(false) {}
88 pending_configure_job(NULL) {}
89 88
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 89 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
91 : mode(mode), had_nudge(false), length(length), 90 : mode(mode), had_nudge(false), length(length) {}
92 pending_configure_job(NULL) {}
93 91
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 92 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
95 93
96 #define ENUM_CASE(x) case x: return #x; break; 94 #define ENUM_CASE(x) case x: return #x; break;
97 95
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 96 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
99 switch (mode) { 97 switch (mode) {
100 ENUM_CASE(UNKNOWN); 98 ENUM_CASE(UNKNOWN);
101 ENUM_CASE(EXPONENTIAL_BACKOFF); 99 ENUM_CASE(EXPONENTIAL_BACKOFF);
102 ENUM_CASE(THROTTLED); 100 ENUM_CASE(THROTTLED);
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
162 name_(name), 160 name_(name),
163 sync_loop_(MessageLoop::current()), 161 sync_loop_(MessageLoop::current()),
164 started_(false), 162 started_(false),
165 syncer_short_poll_interval_seconds_( 163 syncer_short_poll_interval_seconds_(
166 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 164 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
167 syncer_long_poll_interval_seconds_( 165 syncer_long_poll_interval_seconds_(
168 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 166 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
169 sessions_commit_delay_( 167 sessions_commit_delay_(
170 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 168 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
171 mode_(NORMAL_MODE), 169 mode_(NORMAL_MODE),
172 // Start with assuming everything is fine with the connection.
173 // At the end of the sync cycle we would have the correct status.
174 pending_nudge_(NULL),
175 delay_provider_(delay_provider), 170 delay_provider_(delay_provider),
176 syncer_(syncer), 171 syncer_(syncer),
177 session_context_(context), 172 session_context_(context),
178 no_scheduling_allowed_(false) { 173 no_scheduling_allowed_(false) {
179 DCHECK(sync_loop_); 174 DCHECK(sync_loop_);
180 } 175 }
181 176
182 SyncSchedulerImpl::~SyncSchedulerImpl() { 177 SyncSchedulerImpl::~SyncSchedulerImpl() {
183 DCHECK_EQ(MessageLoop::current(), sync_loop_); 178 DCHECK_EQ(MessageLoop::current(), sync_loop_);
184 StopImpl(base::Closure()); 179 StopImpl(base::Closure());
(...skipping 22 matching lines...) Expand all
207 // 202 //
208 // 1. We're in exponential backoff. 203 // 1. We're in exponential backoff.
209 // 2. We're silenced / throttled. 204 // 2. We're silenced / throttled.
210 // 3. A nudge was saved previously due to not having a valid auth token. 205 // 3. A nudge was saved previously due to not having a valid auth token.
211 // 4. A nudge was scheduled + saved while in configuration mode. 206 // 4. A nudge was scheduled + saved while in configuration mode.
212 // 207 //
213 // In all cases except (2), we want to retry contacting the server. We 208 // In all cases except (2), we want to retry contacting the server. We
214 // call DoCanaryJob to achieve this, and note that nothing -- not even a 209 // call DoCanaryJob to achieve this, and note that nothing -- not even a
215 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 210 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
216 // has the authority to do that is the Unthrottle timer. 211 // has the authority to do that is the Unthrottle timer.
217 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); 212 TryCanaryJob();
218 if (!pending.get())
219 return;
220 DoCanaryJob(pending.Pass());
221 } 213 }
222 214
223 void SyncSchedulerImpl::Start(Mode mode) { 215 void SyncSchedulerImpl::Start(Mode mode) {
224 DCHECK_EQ(MessageLoop::current(), sync_loop_); 216 DCHECK_EQ(MessageLoop::current(), sync_loop_);
225 std::string thread_name = MessageLoop::current()->thread_name(); 217 std::string thread_name = MessageLoop::current()->thread_name();
226 if (thread_name.empty()) 218 if (thread_name.empty())
227 thread_name = "<Main thread>"; 219 thread_name = "<Main thread>";
228 SDVLOG(2) << "Start called from thread " 220 SDVLOG(2) << "Start called from thread "
229 << thread_name << " with mode " << GetModeString(mode); 221 << thread_name << " with mode " << GetModeString(mode);
230 if (!started_) { 222 if (!started_) {
231 started_ = true; 223 started_ = true;
232 SendInitialSnapshot(); 224 SendInitialSnapshot();
233 } 225 }
234 226
235 DCHECK(!session_context_->account_name().empty()); 227 DCHECK(!session_context_->account_name().empty());
236 DCHECK(syncer_.get()); 228 DCHECK(syncer_.get());
237 Mode old_mode = mode_; 229 Mode old_mode = mode_;
238 mode_ = mode; 230 mode_ = mode;
239 AdjustPolling(NULL); // Will kick start poll timer if needed. 231 AdjustPolling(NULL); // Will kick start poll timer if needed.
240 232
241 if (old_mode != mode_) { 233 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) {
242 // We just changed our mode. See if there are any pending jobs that we could 234 // We just got back to normal mode. Let's try to run the work that was
243 // execute in the new mode. 235 // queued up while we were configuring.
244 if (mode_ == NORMAL_MODE) { 236 DoNudgeSyncSessionJob(NORMAL_PRIORITY);
245 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
246 // has not yet completed.
247 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
248 }
249
250 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
251 if (pending.get()) {
252 SDVLOG(2) << "Executing pending job. Good luck!";
253 DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY);
254 }
255 } 237 }
256 } 238 }
257 239
258 void SyncSchedulerImpl::SendInitialSnapshot() { 240 void SyncSchedulerImpl::SendInitialSnapshot() {
259 DCHECK_EQ(MessageLoop::current(), sync_loop_); 241 DCHECK_EQ(MessageLoop::current(), sync_loop_);
260 scoped_ptr<SyncSession> dummy(new SyncSession( 242 scoped_ptr<SyncSession> dummy(new SyncSession(
261 session_context_, this, SyncSourceInfo())); 243 session_context_, this, SyncSourceInfo()));
262 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 244 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
263 event.snapshot = dummy->TakeSnapshot(); 245 event.snapshot = dummy->TakeSnapshot();
264 session_context_->NotifyListeners(event); 246 session_context_->NotifyListeners(event);
(...skipping 23 matching lines...) Expand all
288 const ConfigurationParams& params) { 270 const ConfigurationParams& params) {
289 DCHECK_EQ(MessageLoop::current(), sync_loop_); 271 DCHECK_EQ(MessageLoop::current(), sync_loop_);
290 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 272 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
291 DCHECK_EQ(CONFIGURATION_MODE, mode_); 273 DCHECK_EQ(CONFIGURATION_MODE, mode_);
292 DCHECK(!params.ready_task.is_null()); 274 DCHECK(!params.ready_task.is_null());
293 CHECK(started_) << "Scheduler must be running to configure."; 275 CHECK(started_) << "Scheduler must be running to configure.";
294 SDVLOG(2) << "Reconfiguring syncer."; 276 SDVLOG(2) << "Reconfiguring syncer.";
295 277
296 // Only one configuration is allowed at a time. Verify we're not waiting 278 // Only one configuration is allowed at a time. Verify we're not waiting
297 // for a pending configure job. 279 // for a pending configure job.
298 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); 280 DCHECK(!pending_configure_job_);
299 281
300 ModelSafeRoutingInfo restricted_routes; 282 ModelSafeRoutingInfo restricted_routes;
301 BuildModelSafeParams(params.types_to_download, 283 BuildModelSafeParams(params.types_to_download,
302 params.routing_info, 284 params.routing_info,
303 &restricted_routes); 285 &restricted_routes);
304 session_context_->set_routing_info(restricted_routes); 286 session_context_->set_routing_info(restricted_routes);
305 287
306 // Only reconfigure if we have types to download. 288 // Only reconfigure if we have types to download.
307 if (!params.types_to_download.Empty()) { 289 if (!params.types_to_download.Empty()) {
308 DCHECK(!restricted_routes.empty()); 290 DCHECK(!restricted_routes.empty());
309 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 291 pending_configure_job_.reset(new SyncSessionJob(
310 SyncSessionJob::CONFIGURATION, 292 SyncSessionJob::CONFIGURATION,
311 TimeTicks::Now(), 293 TimeTicks::Now(),
312 SyncSourceInfo(params.source, 294 SyncSourceInfo(params.source,
313 ModelSafeRoutingInfoToInvalidationMap( 295 ModelSafeRoutingInfoToInvalidationMap(
314 restricted_routes, 296 restricted_routes,
315 std::string())), 297 std::string())),
316 params)); 298 params));
317 bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); 299 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
318 300
319 // If we failed, the job would have been saved as the pending configure 301 // If we failed, the job would have been saved as the pending configure
320 // job and a wait interval would have been set. 302 // job and a wait interval would have been set.
321 if (!succeeded) { 303 if (!succeeded) {
322 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); 304 DCHECK(pending_configure_job_);
323 return false; 305 return false;
306 } else {
307 DCHECK(!pending_configure_job_);
324 } 308 }
325 } else { 309 } else {
326 SDVLOG(2) << "No change in routing info, calling ready task directly."; 310 SDVLOG(2) << "No change in routing info, calling ready task directly.";
327 params.ready_task.Run(); 311 params.ready_task.Run();
328 } 312 }
329 313
330 return true; 314 return true;
331 } 315 }
332 316
333 SyncSchedulerImpl::JobProcessDecision 317 SyncSchedulerImpl::JobProcessDecision
(...skipping 21 matching lines...) Expand all
355 339
356 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 340 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
357 if (job.purpose() == SyncSessionJob::NUDGE) { 341 if (job.purpose() == SyncSessionJob::NUDGE) {
358 if (mode_ == CONFIGURATION_MODE) 342 if (mode_ == CONFIGURATION_MODE)
359 return SAVE; 343 return SAVE;
360 344
361 // If we already had one nudge then just drop this nudge. We will retry 345 // If we already had one nudge then just drop this nudge. We will retry
362 // later when the timer runs out. 346 // later when the timer runs out.
363 if (priority == NORMAL_PRIORITY) 347 if (priority == NORMAL_PRIORITY)
364 return wait_interval_->had_nudge ? DROP : CONTINUE; 348 return wait_interval_->had_nudge ? DROP : CONTINUE;
365 else // We are here because timer ran out. So retry. 349 else // We are here because timer ran out. So retry.
tim (not reviewing) 2013/04/04 18:07:30 Just noticed this clause is redundant and somewhat
rlarocque 2013/04/05 00:48:51 I was planning to leave it alone for now. There's
366 return CONTINUE; 350 return CONTINUE;
367 } 351 }
368 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; 352 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE;
369 } 353 }
370 354
371 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 355 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
372 const SyncSessionJob& job, 356 const SyncSessionJob& job,
373 JobPriority priority) { 357 JobPriority priority) {
374 DCHECK_EQ(MessageLoop::current(), sync_loop_); 358 DCHECK_EQ(MessageLoop::current(), sync_loop_);
375 359
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
442 // Decision now rests on state of auth tokens. 426 // Decision now rests on state of auth tokens.
443 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 427 if (!session_context_->connection_manager()->HasInvalidAuthToken())
444 return CONTINUE; 428 return CONTINUE;
445 429
446 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 430 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
447 // Running the job would require updated auth, so we can't honour 431 // Running the job would require updated auth, so we can't honour
448 // job.scheduled_start(). 432 // job.scheduled_start().
449 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; 433 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
450 } 434 }
451 435
452 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
453 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
454 if (is_nudge && pending_nudge_) {
455 SDVLOG(2) << "Coalescing a pending nudge";
456 // TODO(tim): This basically means we never use the more-careful coalescing
457 // logic in ScheduleNudgeImpl that takes the min of the two nudge start
458 // times, because we're calling this function first. Pull this out
459 // into a function to coalesce + set start times and reuse.
460 pending_nudge_->CoalesceSources(job->source_info());
461 return;
462 }
463
464 scoped_ptr<SyncSessionJob> job_to_save = job->Clone();
465 if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
466 // This job should be made the new canary.
467 if (is_nudge) {
468 pending_nudge_ = job_to_save.get();
469 } else {
470 SDVLOG(2) << "Saving a configuration job";
471 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
472 DCHECK(!wait_interval_->pending_configure_job);
473 DCHECK_EQ(mode_, CONFIGURATION_MODE);
474 DCHECK(!job->config_params().ready_task.is_null());
475 // The only nudge that could exist is a scheduled canary nudge.
476 DCHECK(!unscheduled_nudge_storage_.get());
477 if (pending_nudge_) {
478 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
479 unscheduled_nudge_storage_ = pending_nudge_->Clone();
480 pending_nudge_ = unscheduled_nudge_storage_.get();
481 }
482 wait_interval_->pending_configure_job = job_to_save.get();
483 }
484 TimeDelta length =
485 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
486 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
487 TimeDelta::FromSeconds(0) : length;
488 RestartWaiting(job_to_save.Pass());
489 return;
490 }
491
492 // Note that today there are no cases where we SAVE a CONFIGURATION job
493 // when we're not in a WaitInterval. See bug 147736.
494 DCHECK(is_nudge);
495 // There may or may not be a pending_configure_job. Either way this nudge
496 // is unschedulable.
497 pending_nudge_ = job_to_save.get();
498 unscheduled_nudge_storage_ = job_to_save.Pass();
499 }
500
501 // Functor for std::find_if to search by ModelSafeGroup.
502 struct ModelSafeWorkerGroupIs {
503 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
504 bool operator()(ModelSafeWorker* w) {
505 return group == w->GetModelSafeGroup();
506 }
507 ModelSafeGroup group;
508 };
509
510 void SyncSchedulerImpl::ScheduleNudgeAsync( 436 void SyncSchedulerImpl::ScheduleNudgeAsync(
511 const TimeDelta& desired_delay, 437 const TimeDelta& desired_delay,
512 NudgeSource source, ModelTypeSet types, 438 NudgeSource source, ModelTypeSet types,
513 const tracked_objects::Location& nudge_location) { 439 const tracked_objects::Location& nudge_location) {
514 DCHECK_EQ(MessageLoop::current(), sync_loop_); 440 DCHECK_EQ(MessageLoop::current(), sync_loop_);
515 SDVLOG_LOC(nudge_location, 2) 441 SDVLOG_LOC(nudge_location, 2)
516 << "Nudge scheduled with delay " 442 << "Nudge scheduled with delay "
517 << desired_delay.InMilliseconds() << " ms, " 443 << desired_delay.InMilliseconds() << " ms, "
518 << "source " << GetNudgeSourceString(source) << ", " 444 << "source " << GetNudgeSourceString(source) << ", "
519 << "types " << ModelTypeSetToString(types); 445 << "types " << ModelTypeSetToString(types);
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
579 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 505 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
580 SyncSessionJob::NUDGE, 506 SyncSessionJob::NUDGE,
581 TimeTicks::Now() + delay, 507 TimeTicks::Now() + delay,
582 info, 508 info,
583 ConfigurationParams())); 509 ConfigurationParams()));
584 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); 510 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY);
585 SDVLOG(2) << "Should run " 511 SDVLOG(2) << "Should run "
586 << SyncSessionJob::GetPurposeString(job->purpose()) 512 << SyncSessionJob::GetPurposeString(job->purpose())
587 << " in mode " << GetModeString(mode_) 513 << " in mode " << GetModeString(mode_)
588 << ": " << GetDecisionString(decision); 514 << ": " << GetDecisionString(decision);
589 if (decision != CONTINUE) { 515 if (decision == DROP) {
590 // End of the line, though we may save the job for later.
591 if (decision == SAVE) {
592 HandleSaveJobDecision(job.Pass());
593 } else {
594 DCHECK_EQ(decision, DROP);
595 }
596 return; 516 return;
597 } 517 }
598 518
599 if (pending_nudge_) { 519 // Try to coalesce in both SAVE and CONTINUE cases.
600 SDVLOG(2) << "Rescheduling pending nudge"; 520 if (pending_nudge_job_) {
601 pending_nudge_->CoalesceSources(job->source_info()); 521 pending_nudge_job_->CoalesceSources(job->source_info());
602 // Choose the start time as the earliest of the 2. Note that this means 522 if (decision == CONTINUE) {
603 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) 523 // Only update the scheduled_start if we're going to reschedule.
604 // but a nudge is already scheduled to go out, we'll send the (tab) commit 524 pending_nudge_job_->set_scheduled_start(
605 // without waiting. 525 std::min(job->scheduled_start(),
606 pending_nudge_->set_scheduled_start( 526 pending_nudge_job_->scheduled_start()));
607 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); 527 }
608 // Abandon the old task by cloning and replacing the session. 528 } else {
609 // It's possible that by "rescheduling" we're actually taking a job that 529 pending_nudge_job_ = job.Pass();
610 // was previously unscheduled and giving it wings, so take care to reset
611 // unscheduled nudge storage.
612 job = pending_nudge_->Clone();
613 pending_nudge_ = NULL;
614 unscheduled_nudge_storage_.reset();
615 // It's also possible we took a canary job, since we allow one nudge
616 // per backoff interval.
617 DCHECK(!wait_interval_ || !wait_interval_->had_nudge);
618 } 530 }
619 531
620 TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); 532 if (decision == SAVE) {
533 return;
534 }
535
536 TimeDelta run_delay =
537 pending_nudge_job_->scheduled_start() - TimeTicks::Now();
621 if (run_delay < TimeDelta::FromMilliseconds(0)) 538 if (run_delay < TimeDelta::FromMilliseconds(0))
622 run_delay = TimeDelta::FromMilliseconds(0); 539 run_delay = TimeDelta::FromMilliseconds(0);
623 SDVLOG_LOC(nudge_location, 2) 540 SDVLOG_LOC(nudge_location, 2)
624 << "Scheduling a nudge with " 541 << "Scheduling a nudge with "
625 << run_delay.InMilliseconds() << " ms delay"; 542 << run_delay.InMilliseconds() << " ms delay";
626 543
627 pending_nudge_ = job.get(); 544 PostDelayedTask(
628 PostDelayedTask(nudge_location, "DoSyncSessionJob", 545 nudge_location,
629 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), 546 "DoSyncSessionJob",
630 weak_ptr_factory_.GetWeakPtr(), 547 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoNudgeSyncSessionJob),
631 base::Passed(&job), 548 weak_ptr_factory_.GetWeakPtr(),
632 NORMAL_PRIORITY), 549 NORMAL_PRIORITY),
633 run_delay); 550 run_delay);
634 } 551 }
635 552
636 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 553 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
637 switch (mode) { 554 switch (mode) {
638 ENUM_CASE(CONFIGURATION_MODE); 555 ENUM_CASE(CONFIGURATION_MODE);
639 ENUM_CASE(NORMAL_MODE); 556 ENUM_CASE(NORMAL_MODE);
640 } 557 }
641 return ""; 558 return "";
642 } 559 }
(...skipping 16 matching lines...) Expand all
659 DCHECK_EQ(MessageLoop::current(), sync_loop_); 576 DCHECK_EQ(MessageLoop::current(), sync_loop_);
660 if (!started_) { 577 if (!started_) {
661 SDVLOG(1) << "Not posting task as scheduler is stopped."; 578 SDVLOG(1) << "Not posting task as scheduler is stopped.";
662 return; 579 return;
663 } 580 }
664 // This cancels the previous task, if one existed. 581 // This cancels the previous task, if one existed.
665 pending_wakeup_.Reset(task); 582 pending_wakeup_.Reset(task);
666 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); 583 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay);
667 } 584 }
668 585
669 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, 586 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job,
670 JobPriority priority) { 587 JobPriority priority) {
671 DCHECK_EQ(MessageLoop::current(), sync_loop_); 588 DCHECK_EQ(MessageLoop::current(), sync_loop_);
672 if (job->purpose() == SyncSessionJob::NUDGE) {
673 pending_nudge_ = NULL;
674 }
675 589
676 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 590 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
677 JobProcessDecision decision = DecideOnJob(*job, priority); 591 JobProcessDecision decision = DecideOnJob(*job, priority);
678 SDVLOG(2) << "Should run " 592 SDVLOG(2) << "Should run "
679 << SyncSessionJob::GetPurposeString(job->purpose()) 593 << SyncSessionJob::GetPurposeString(job->purpose())
680 << " in mode " << GetModeString(mode_) 594 << " in mode " << GetModeString(mode_)
681 << " with source " << job->source_info().updates_source 595 << " with source " << job->source_info().updates_source
682 << ": " << GetDecisionString(decision); 596 << ": " << GetDecisionString(decision);
683 if (decision != CONTINUE) { 597 if (decision != CONTINUE) {
684 if (decision == SAVE) { 598 if (decision == SAVE) {
685 HandleSaveJobDecision(job.Pass()); 599 if (job->purpose() == SyncSessionJob::CONFIGURATION) {
600 pending_configure_job_ = job.Pass();
601
602 // It's very unlikely, but possible, that the WaitInterval's wakeup task
603 // isn't actually active at this point. Sometimes the WaitInterval gets
604 // preempted by a nudge-while-in-backoff. We can't just assume that
605 // there's a task waiting to wake us up once the WaitInterval expires;
606 // we need to ensure it by rescheduling the WaitInterval (while taking
607 // into account any time already spent waiting, of course).
608 ResumeWaiting();
rlarocque 2013/04/02 23:01:12 Here's a paragraph that I had originally intended
tim (not reviewing) 2013/04/04 18:07:30 It looks like we actually keep the WaitInterval ti
609 } else {
610 pending_nudge_job_ = job.Pass();
611 }
686 } else { 612 } else {
687 DCHECK_EQ(decision, DROP); 613 DCHECK_EQ(decision, DROP);
688 } 614 }
689 return false; 615 return false;
690 } 616 }
691 617
692 DVLOG(2) << "Creating sync session with routes " 618 DVLOG(2) << "Creating sync session with routes "
693 << ModelSafeRoutingInfoToString(session_context_->routing_info()) 619 << ModelSafeRoutingInfoToString(session_context_->routing_info())
694 << "and purpose " << job->purpose(); 620 << "and purpose " << job->purpose();
695 SyncSession session(session_context_, this, job->source_info()); 621 SyncSession session(session_context_, this, job->source_info());
696 bool premature_exit = !syncer_->SyncShare(&session, 622 bool premature_exit = !syncer_->SyncShare(&session,
697 job->start_step(), 623 job->start_step(),
698 job->end_step()); 624 job->end_step());
699 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; 625 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
700 626
701 bool success = FinishSyncSessionJob(job.get(), 627 bool success = FinishSyncSessionJob(job.get(),
702 premature_exit, 628 premature_exit,
703 &session); 629 &session);
704 630
705 if (IsSyncingCurrentlySilenced()) { 631 if (IsSyncingCurrentlySilenced()) {
706 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; 632 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
707 // If we're here, it's because |job| was silenced until a server specified 633 // If we're here, it's because |job| was silenced until a server specified
708 // time. (Note, it had to be |job|, because DecideOnJob would not permit 634 // time. (Note, it had to be |job|, because DecideOnJob would not permit
709 // any job through while in WaitInterval::THROTTLED). 635 // any job through while in WaitInterval::THROTTLED).
710 scoped_ptr<SyncSessionJob> clone = job->Clone(); 636 if (job->purpose() == SyncSessionJob::NUDGE)
711 if (clone->purpose() == SyncSessionJob::NUDGE) 637 pending_nudge_job_ = job.Pass();
712 pending_nudge_ = clone.get(); 638 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
713 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) 639 pending_configure_job_ = job.Pass();
714 wait_interval_->pending_configure_job = clone.get();
715 else 640 else
716 NOTREACHED(); 641 NOTREACHED();
717 642
718 RestartWaiting(clone.Pass()); 643 RestartWaiting();
719 return success; 644 return success;
720 } 645 }
721 646
722 if (!success) 647 if (!success)
723 ScheduleNextSync(job.Pass(), &session); 648 ScheduleNextSync(job.Pass(), &session);
724 649
725 return success; 650 return success;
726 } 651 }
727 652
653 bool SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
tim (not reviewing) 2013/04/04 00:22:11 Digging around, it looks like these Do*SSJ methods
rlarocque 2013/04/04 00:59:48 Your suggestion would work for now, but it wouldn'
654 return DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority);
655 }
656
657 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
658 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority);
659 }
660
728 bool SyncSchedulerImpl::ShouldPoll() { 661 bool SyncSchedulerImpl::ShouldPoll() {
729 if (wait_interval_.get()) { 662 if (wait_interval_.get()) {
730 SDVLOG(2) << "Not running poll in wait interval."; 663 SDVLOG(2) << "Not running poll in wait interval.";
731 return false; 664 return false;
732 } 665 }
733 666
734 if (mode_ == CONFIGURATION_MODE) { 667 if (mode_ == CONFIGURATION_MODE) {
735 SDVLOG(2) << "Not running poll in configuration mode."; 668 SDVLOG(2) << "Not running poll in configuration mode.";
736 return false; 669 return false;
737 } 670 }
738 671
739 // TODO(rlarocque): Refactor decision-making logic common to all types 672 // TODO(rlarocque): Refactor decision-making logic common to all types
740 // of jobs into a shared function. 673 // of jobs into a shared function.
741 674
742 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 675 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
743 SDVLOG(2) << "Not running poll because auth token is invalid."; 676 SDVLOG(2) << "Not running poll because auth token is invalid.";
744 return false; 677 return false;
745 } 678 }
746 679
747 return true; 680 return true;
748 } 681 }
749 682
750 void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { 683 void SyncSchedulerImpl::DoPollSyncSessionJob() {
751 DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); 684 ModelSafeRoutingInfo r;
685 ModelTypeInvalidationMap invalidation_map =
686 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
687 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
688 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
689 TimeTicks::Now(),
690 info,
691 ConfigurationParams()));
752 692
753 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 693 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
754 694
755 if (!ShouldPoll()) 695 if (!ShouldPoll())
756 return; 696 return;
757 697
758 DVLOG(2) << "Polling with routes " 698 DVLOG(2) << "Polling with routes "
759 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 699 << ModelSafeRoutingInfoToString(session_context_->routing_info());
760 SyncSession session(session_context_, this, job->source_info()); 700 SyncSession session(session_context_, this, job->source_info());
761 bool premature_exit = !syncer_->SyncShare(&session, 701 bool premature_exit = !syncer_->SyncShare(&session,
762 job->start_step(), 702 job->start_step(),
763 job->end_step()); 703 job->end_step());
764 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; 704 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
765 705
766 FinishSyncSessionJob(job.get(), premature_exit, &session); 706 FinishSyncSessionJob(job.get(), premature_exit, &session);
767 707
768 if (IsSyncingCurrentlySilenced()) { 708 if (IsSyncingCurrentlySilenced()) {
769 // This will start the countdown to unthrottle. Other kinds of jobs would 709 // Normally we would only call RestartWaiting() if we had a
770 // schedule themselves as the post-unthrottle canary. A poll job is not 710 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's
771 // that urgent, so it does not get to be the canary. We still need to start 711 // possible that neither is set. We create the wait interval anyway because
772 // the timer regardless. Otherwise there could be no one to clear the 712 // we need it to make sure we get unthrottled on time.
773 // WaitInterval when the throttling expires. 713 RestartWaiting();
774 RestartWaiting(scoped_ptr<SyncSessionJob>());
775 } 714 }
776 } 715 }
777 716
778 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 717 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
779 DCHECK_EQ(MessageLoop::current(), sync_loop_); 718 DCHECK_EQ(MessageLoop::current(), sync_loop_);
780 719
781 // We are interested in recording time between local nudges for datatypes. 720 // We are interested in recording time between local nudges for datatypes.
782 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 721 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
783 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 722 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
784 return; 723 return;
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
847 // appears that this was our nudge for this interval, and it failed. 786 // appears that this was our nudge for this interval, and it failed.
848 // 787 //
849 // Note: This does not prevent us from running canary jobs. For example, 788 // Note: This does not prevent us from running canary jobs. For example,
850 // an IP address change might still result in another nudge being executed 789 // an IP address change might still result in another nudge being executed
851 // during this backoff interval. 790 // during this backoff interval.
852 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; 791 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
853 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); 792 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
854 DCHECK(!wait_interval_->had_nudge); 793 DCHECK(!wait_interval_->had_nudge);
855 794
856 wait_interval_->had_nudge = true; 795 wait_interval_->had_nudge = true;
857 DCHECK(!pending_nudge_); 796 DCHECK(!pending_nudge_job_);
858 797
859 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); 798 pending_nudge_job_ = finished_job.Pass();
860 pending_nudge_ = new_job.get(); 799 RestartWaiting();
861 RestartWaiting(new_job.Pass());
862 } else { 800 } else {
863 // Either this is the first failure or a consecutive failure after our 801 // Either this is the first failure or a consecutive failure after our
864 // backoff timer expired. We handle it the same way in either case. 802 // backoff timer expired. We handle it the same way in either case.
865 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; 803 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
866 HandleContinuationError(finished_job.Pass(), session); 804 HandleContinuationError(finished_job.Pass(), session);
867 } 805 }
868 } 806 }
869 807
870 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 808 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
871 DCHECK_EQ(MessageLoop::current(), sync_loop_); 809 DCHECK_EQ(MessageLoop::current(), sync_loop_);
872 810
873 TimeDelta poll = (!session_context_->notifications_enabled()) ? 811 TimeDelta poll = (!session_context_->notifications_enabled()) ?
874 syncer_short_poll_interval_seconds_ : 812 syncer_short_poll_interval_seconds_ :
875 syncer_long_poll_interval_seconds_; 813 syncer_long_poll_interval_seconds_;
876 bool rate_changed = !poll_timer_.IsRunning() || 814 bool rate_changed = !poll_timer_.IsRunning() ||
877 poll != poll_timer_.GetCurrentDelay(); 815 poll != poll_timer_.GetCurrentDelay();
878 816
879 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) 817 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
880 poll_timer_.Reset(); 818 poll_timer_.Reset();
881 819
882 if (!rate_changed) 820 if (!rate_changed)
883 return; 821 return;
884 822
885 // Adjust poll rate. 823 // Adjust poll rate.
886 poll_timer_.Stop(); 824 poll_timer_.Stop();
887 poll_timer_.Start(FROM_HERE, poll, this, 825 poll_timer_.Start(FROM_HERE, poll, this,
888 &SyncSchedulerImpl::PollTimerCallback); 826 &SyncSchedulerImpl::PollTimerCallback);
889 } 827 }
890 828
891 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { 829 void SyncSchedulerImpl::ResumeWaiting() {
830 TimeDelta length =
831 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
832 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
833 TimeDelta::FromSeconds(0) : length;
834 RestartWaiting();
835 }
836
837 void SyncSchedulerImpl::RestartWaiting() {
892 CHECK(wait_interval_.get()); 838 CHECK(wait_interval_.get());
893 wait_interval_->timer.Stop(); 839 wait_interval_->timer.Stop();
894 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 840 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
895 if (wait_interval_->mode == WaitInterval::THROTTLED) { 841 if (wait_interval_->mode == WaitInterval::THROTTLED) {
896 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, 842 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle,
897 weak_ptr_factory_.GetWeakPtr(), 843 weak_ptr_factory_.GetWeakPtr()));
898 base::Passed(&job)));
899 844
900 } else { 845 } else {
901 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, 846 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::TryCanaryJob,
902 weak_ptr_factory_.GetWeakPtr(), 847 weak_ptr_factory_.GetWeakPtr()));
903 base::Passed(&job)));
904 } 848 }
905 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, 849 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
906 pending_wakeup_.callback()); 850 pending_wakeup_.callback());
907 } 851 }
908 852
909 void SyncSchedulerImpl::HandleContinuationError( 853 void SyncSchedulerImpl::HandleContinuationError(
910 scoped_ptr<SyncSessionJob> old_job, 854 scoped_ptr<SyncSessionJob> old_job,
911 SyncSession* session) { 855 SyncSession* session) {
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); 856 DCHECK_EQ(MessageLoop::current(), sync_loop_);
913 857
914 TimeDelta length = delay_provider_->GetDelay( 858 TimeDelta length = delay_provider_->GetDelay(
915 IsBackingOff() ? wait_interval_->length : 859 IsBackingOff() ? wait_interval_->length :
916 delay_provider_->GetInitialDelay( 860 delay_provider_->GetInitialDelay(
917 session->status_controller().model_neutral_state())); 861 session->status_controller().model_neutral_state()));
918 862
919 SDVLOG(2) << "In handle continuation error with " 863 SDVLOG(2) << "In handle continuation error with "
920 << SyncSessionJob::GetPurposeString(old_job->purpose()) 864 << SyncSessionJob::GetPurposeString(old_job->purpose())
921 << " job. The time delta(ms) is " 865 << " job. The time delta(ms) is "
922 << length.InMilliseconds(); 866 << length.InMilliseconds();
923 867
924 // This will reset the had_nudge variable as well. 868 // This will reset the had_nudge variable as well.
925 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 869 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
926 length)); 870 length));
927 NotifyRetryTime(base::Time::Now() + length); 871 NotifyRetryTime(base::Time::Now() + length);
928 scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); 872 old_job->set_scheduled_start(TimeTicks::Now() + length);
929 new_job->set_scheduled_start(TimeTicks::Now() + length);
930 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { 873 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
931 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 874 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
932 // Config params should always get set. 875 // Config params should always get set.
933 DCHECK(!old_job->config_params().ready_task.is_null()); 876 DCHECK(!old_job->config_params().ready_task.is_null());
934 wait_interval_->pending_configure_job = new_job.get(); 877 DCHECK(!pending_configure_job_);
878 pending_configure_job_ = old_job.Pass();
935 } else { 879 } else {
936 // We are not in configuration mode. So wait_interval's pending job 880 // We're not in configure mode so we should not have a configure job.
937 // should be null. 881 DCHECK(!pending_configure_job_);
938 DCHECK(wait_interval_->pending_configure_job == NULL); 882 DCHECK(!pending_nudge_job_);
939 DCHECK(!pending_nudge_); 883 pending_nudge_job_ = old_job.Pass();
940 pending_nudge_ = new_job.get();
941 } 884 }
942 885
943 RestartWaiting(new_job.Pass()); 886 RestartWaiting();
944 } 887 }
945 888
946 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 889 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
947 syncer_->RequestEarlyExit(); // Safe to call from any thread. 890 syncer_->RequestEarlyExit(); // Safe to call from any thread.
948 DCHECK(weak_handle_this_.IsInitialized()); 891 DCHECK(weak_handle_this_.IsInitialized());
949 SDVLOG(3) << "Posting StopImpl"; 892 SDVLOG(3) << "Posting StopImpl";
950 weak_handle_this_.Call(FROM_HERE, 893 weak_handle_this_.Call(FROM_HERE,
951 &SyncSchedulerImpl::StopImpl, 894 &SyncSchedulerImpl::StopImpl,
952 callback); 895 callback);
953 } 896 }
954 897
955 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 898 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
956 DCHECK_EQ(MessageLoop::current(), sync_loop_); 899 DCHECK_EQ(MessageLoop::current(), sync_loop_);
957 SDVLOG(2) << "StopImpl called"; 900 SDVLOG(2) << "StopImpl called";
958 901
959 // Kill any in-flight method calls. 902 // Kill any in-flight method calls.
960 weak_ptr_factory_.InvalidateWeakPtrs(); 903 weak_ptr_factory_.InvalidateWeakPtrs();
961 wait_interval_.reset(); 904 wait_interval_.reset();
962 NotifyRetryTime(base::Time()); 905 NotifyRetryTime(base::Time());
963 poll_timer_.Stop(); 906 poll_timer_.Stop();
964 pending_nudge_ = NULL;
965 unscheduled_nudge_storage_.reset();
966 pending_wakeup_.Cancel(); 907 pending_wakeup_.Cancel();
tim (not reviewing) 2013/04/04 00:22:11 Can we rename pending_wakeup_ to pending_wakeup_ev
rlarocque 2013/04/04 00:59:48 Done.
908 pending_nudge_job_.reset();
909 pending_configure_job_.reset();
967 if (started_) { 910 if (started_) {
968 started_ = false; 911 started_ = false;
969 } 912 }
970 if (!callback.is_null()) 913 if (!callback.is_null())
971 callback.Run(); 914 callback.Run();
972 } 915 }
973 916
974 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { 917 // This is the only place where we invoke DoSyncSessionJob with canary
918 // privileges. Everyone else should use NORMAL_PRIORITY.
919 void SyncSchedulerImpl::TryCanaryJob() {
975 DCHECK_EQ(MessageLoop::current(), sync_loop_); 920 DCHECK_EQ(MessageLoop::current(), sync_loop_);
976 SDVLOG(2) << "Do canary job";
977 921
978 // This is the only place where we invoke DoSyncSessionJob with canary 922 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) {
tim (not reviewing) 2013/04/04 18:07:30 There's an implicit point here that canary jobs ha
979 // privileges. Everyone else should use NORMAL_PRIORITY. 923 SDVLOG(2) << "Found pending configure job; will run as canary";
980 DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); 924 DoConfigurationSyncSessionJob(CANARY_PRIORITY);
981 } 925 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) {
982 926 SDVLOG(2) << "Found pending nudge job; will run as canary";
983 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { 927 DoNudgeSyncSessionJob(CANARY_PRIORITY);
984 DCHECK_EQ(MessageLoop::current(), sync_loop_); 928 } else {
985 // If we find a scheduled pending_ job, abandon the old one and return a 929 SDVLOG(2) << "Found no work to do; will not run a canary";
986 // a clone. If unscheduled, just hand over ownership.
987 scoped_ptr<SyncSessionJob> candidate;
988 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
989 && wait_interval_->pending_configure_job) {
990 SDVLOG(2) << "Found pending configure job";
991 candidate =
992 wait_interval_->pending_configure_job->Clone().Pass();
993 wait_interval_->pending_configure_job = candidate.get();
994 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
995 SDVLOG(2) << "Found pending nudge job";
996 candidate = pending_nudge_->Clone();
997 pending_nudge_ = candidate.get();
998 unscheduled_nudge_storage_.reset();
999 } 930 }
1000 // If we took a job and there's a wait interval, we took the pending canary.
1001 if (candidate && wait_interval_)
1002 wait_interval_->timer.Stop();
1003 return candidate.Pass();
1004 } 931 }
1005 932
1006 void SyncSchedulerImpl::PollTimerCallback() { 933 void SyncSchedulerImpl::PollTimerCallback() {
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); 934 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1008 ModelSafeRoutingInfo r;
1009 ModelTypeInvalidationMap invalidation_map =
1010 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
1011 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
1012 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
1013 TimeTicks::Now(),
1014 info,
1015 ConfigurationParams()));
1016 if (no_scheduling_allowed_) { 935 if (no_scheduling_allowed_) {
1017 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in 936 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
1018 // functions that are called only on the sync thread. This function is also 937 // functions that are called only on the sync thread. This function is also
1019 // called only on the sync thread, and only when it is posted by an expiring 938 // called only on the sync thread, and only when it is posted by an expiring
1020 // timer. If we find that no_scheduling_allowed_ is set here, then 939 // timer. If we find that no_scheduling_allowed_ is set here, then
1021 // something is very wrong. Maybe someone mistakenly called us directly, or 940 // something is very wrong. Maybe someone mistakenly called us directly, or
1022 // mishandled the book-keeping for no_scheduling_allowed_. 941 // mishandled the book-keeping for no_scheduling_allowed_.
1023 NOTREACHED() << "Illegal to schedule job while session in progress."; 942 NOTREACHED() << "Illegal to schedule job while session in progress.";
1024 return; 943 return;
1025 } 944 }
1026 945
1027 DoPollSyncSessionJob(job.Pass()); 946 DoPollSyncSessionJob();
1028 } 947 }
1029 948
1030 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { 949 void SyncSchedulerImpl::Unthrottle() {
1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); 950 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1032 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 951 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1033 DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() ||
1034 wait_interval_->pending_configure_job == to_be_canary.get());
1035 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
1036 << "canary.";
1037 952
1038 // We're no longer throttled, so clear the wait interval. 953 // We're no longer throttled, so clear the wait interval.
1039 wait_interval_.reset(); 954 wait_interval_.reset();
1040 NotifyRetryTime(base::Time()); 955 NotifyRetryTime(base::Time());
1041 956
1042 // We treat this as a 'canary' in the sense that it was originally scheduled 957 // We treat this as a 'canary' in the sense that it was originally scheduled
1043 // to run some time ago, failed, and we now want to retry, versus a job that 958 // to run some time ago, failed, and we now want to retry, versus a job that
1044 // was just created (e.g via ScheduleNudgeImpl). The main implication is 959 // was just created (e.g via ScheduleNudgeImpl). The main implication is
1045 // that we're careful to update routing info (etc) with such potentially 960 // that we're careful to update routing info (etc) with such potentially
1046 // stale canary jobs. 961 // stale canary jobs.
1047 if (to_be_canary.get()) { 962 TryCanaryJob();
1048 DoCanaryJob(to_be_canary.Pass());
1049 } else {
1050 DCHECK(!unscheduled_nudge_storage_.get());
1051 }
1052 } 963 }
1053 964
1054 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 965 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1055 DCHECK_EQ(MessageLoop::current(), sync_loop_); 966 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1056 session_context_->NotifyListeners(SyncEngineEvent(cause)); 967 session_context_->NotifyListeners(SyncEngineEvent(cause));
1057 } 968 }
1058 969
1059 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 970 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
1060 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); 971 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED);
1061 event.retry_time = retry_time; 972 event.retry_time = retry_time;
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
1140 1051
1141 #undef SDVLOG_LOC 1052 #undef SDVLOG_LOC
1142 1053
1143 #undef SDVLOG 1054 #undef SDVLOG
1144 1055
1145 #undef SLOG 1056 #undef SLOG
1146 1057
1147 #undef ENUM_CASE 1058 #undef ENUM_CASE
1148 1059
1149 } // namespace syncer 1060 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698