Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 13743003: sync: Finish the SyncScheduler refactor (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix ModelNeutralState forward decl Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
(...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after
218 << thread_name << " with mode " << GetModeString(mode); 218 << thread_name << " with mode " << GetModeString(mode);
219 if (!started_) { 219 if (!started_) {
220 started_ = true; 220 started_ = true;
221 SendInitialSnapshot(); 221 SendInitialSnapshot();
222 } 222 }
223 223
224 DCHECK(!session_context_->account_name().empty()); 224 DCHECK(!session_context_->account_name().empty());
225 DCHECK(syncer_.get()); 225 DCHECK(syncer_.get());
226 Mode old_mode = mode_; 226 Mode old_mode = mode_;
227 mode_ = mode; 227 mode_ = mode;
228 AdjustPolling(NULL); // Will kick start poll timer if needed. 228 AdjustPolling(false); // Will kick start poll timer if needed.
229 229
230 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { 230 if (old_mode != mode_ && mode_ == NORMAL_MODE && !nudge_tracker_.IsEmpty()) {
231 // We just got back to normal mode. Let's try to run the work that was 231 // We just got back to normal mode. Let's try to run the work that was
232 // queued up while we were configuring. 232 // queued up while we were configuring.
233 DoNudgeSyncSessionJob(NORMAL_PRIORITY); 233 DoNudgeSyncSessionJob(NORMAL_PRIORITY);
234 } 234 }
235 } 235 }
236 236
237 void SyncSchedulerImpl::SendInitialSnapshot() { 237 void SyncSchedulerImpl::SendInitialSnapshot() {
238 DCHECK(CalledOnValidThread()); 238 DCHECK(CalledOnValidThread());
239 scoped_ptr<SyncSession> dummy(new SyncSession( 239 scoped_ptr<SyncSession> dummy(new SyncSession(
240 session_context_, this, SyncSourceInfo())); 240 session_context_, this, SyncSourceInfo()));
(...skipping 26 matching lines...) Expand all
267 const ConfigurationParams& params) { 267 const ConfigurationParams& params) {
268 DCHECK(CalledOnValidThread()); 268 DCHECK(CalledOnValidThread());
269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
270 DCHECK_EQ(CONFIGURATION_MODE, mode_); 270 DCHECK_EQ(CONFIGURATION_MODE, mode_);
271 DCHECK(!params.ready_task.is_null()); 271 DCHECK(!params.ready_task.is_null());
272 CHECK(started_) << "Scheduler must be running to configure."; 272 CHECK(started_) << "Scheduler must be running to configure.";
273 SDVLOG(2) << "Reconfiguring syncer."; 273 SDVLOG(2) << "Reconfiguring syncer.";
274 274
275 // Only one configuration is allowed at a time. Verify we're not waiting 275 // Only one configuration is allowed at a time. Verify we're not waiting
276 // for a pending configure job. 276 // for a pending configure job.
277 DCHECK(!pending_configure_job_); 277 DCHECK(!pending_configure_params_);
278 278
279 ModelSafeRoutingInfo restricted_routes; 279 ModelSafeRoutingInfo restricted_routes;
280 BuildModelSafeParams(params.types_to_download, 280 BuildModelSafeParams(params.types_to_download,
281 params.routing_info, 281 params.routing_info,
282 &restricted_routes); 282 &restricted_routes);
283 session_context_->set_routing_info(restricted_routes); 283 session_context_->set_routing_info(restricted_routes);
284 284
285 // Only reconfigure if we have types to download. 285 // Only reconfigure if we have types to download.
286 if (!params.types_to_download.Empty()) { 286 if (!params.types_to_download.Empty()) {
287 DCHECK(!restricted_routes.empty()); 287 pending_configure_params_.reset(new ConfigurationParams(params));
288 pending_configure_job_.reset(new SyncSessionJob(
289 SyncSessionJob::CONFIGURATION,
290 TimeTicks::Now(),
291 SyncSourceInfo(params.source,
292 ModelSafeRoutingInfoToInvalidationMap(
293 restricted_routes,
294 std::string())),
295 params));
296 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); 288 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
297 289
298 // If we failed, the job would have been saved as the pending configure 290 // If we failed, the job would have been saved as the pending configure
299 // job and a wait interval would have been set. 291 // job and a wait interval would have been set.
300 if (!succeeded) { 292 if (!succeeded) {
301 DCHECK(pending_configure_job_); 293 DCHECK(pending_configure_params_);
302 return false;
303 } else { 294 } else {
304 DCHECK(!pending_configure_job_); 295 DCHECK(!pending_configure_params_);
305 } 296 }
297 return succeeded;
306 } else { 298 } else {
307 SDVLOG(2) << "No change in routing info, calling ready task directly."; 299 SDVLOG(2) << "No change in routing info, calling ready task directly.";
308 params.ready_task.Run(); 300 params.ready_task.Run();
309 } 301 }
310 302
311 return true; 303 return true;
312 } 304 }
313 305
314 SyncSchedulerImpl::JobProcessDecision 306 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
315 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
316 JobPriority priority) {
317 DCHECK(CalledOnValidThread()); 307 DCHECK(CalledOnValidThread());
318 DCHECK(wait_interval_.get()); 308 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
319 DCHECK_NE(job.purpose(), SyncSessionJob::POLL); 309 SDVLOG(1) << "Unable to run a job because we're throttled.";
310 return false;
311 }
320 312
321 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 313 if (wait_interval_
322 << WaitInterval::GetModeString(wait_interval_->mode) 314 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
323 << ((priority == CANARY_PRIORITY) ? " (canary)" : ""); 315 && priority != CANARY_PRIORITY) {
316 SDVLOG(1) << "Unable to run a job because we're backing off.";
317 return false;
318 }
324 319
325 // If we save a job while in a WaitInterval, there is a well-defined moment 320 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
326 // in time in the future when it makes sense for that SAVE-worthy job to try 321 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
327 // running again -- the end of the WaitInterval. 322 return false;
328 DCHECK(job.purpose() == SyncSessionJob::NUDGE || 323 }
329 job.purpose() == SyncSessionJob::CONFIGURATION);
330 324
331 // If throttled, there's a clock ticking to unthrottle. We want to get 325 return true;
332 // on the same train.
333 if (wait_interval_->mode == WaitInterval::THROTTLED)
334 return SAVE;
335
336 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
337 if (job.purpose() == SyncSessionJob::NUDGE) {
338 if (mode_ == CONFIGURATION_MODE)
339 return SAVE;
340
341 if (priority == NORMAL_PRIORITY)
342 return DROP;
343 else // Either backoff has ended, or we have permission to bypass it.
344 return CONTINUE;
345 }
346 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE;
347 } 326 }
348 327
349 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 328 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
350 const SyncSessionJob& job,
351 JobPriority priority) {
352 DCHECK(CalledOnValidThread()); 329 DCHECK(CalledOnValidThread());
353 330
354 // POLL jobs do not call this function. 331 if (!CanRunJobNow(priority)) {
355 DCHECK(job.purpose() == SyncSessionJob::NUDGE || 332 SDVLOG(1) << "Unable to run a nudge job right now";
356 job.purpose() == SyncSessionJob::CONFIGURATION); 333 return false;
334 }
357 335
358 // See if our type is throttled. 336 // If all types are throttled, do not continue. Today, we don't treat a
337 // per-datatype "unthrottle" event as something that should force a canary
338 // job. For this reason, there's no good time to reschedule this job to run
339 // -- we'll lazily wait for an independent event to trigger a sync.
359 ModelTypeSet throttled_types = 340 ModelTypeSet throttled_types =
360 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 341 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
361 if (job.purpose() == SyncSessionJob::NUDGE && 342 if (!nudge_tracker_.GetLocallyModifiedTypes().Empty() &&
362 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { 343 throttled_types.HasAll(nudge_tracker_.GetLocallyModifiedTypes())) {
tim (not reviewing) 2013/04/15 22:37:49 Can you add a todo that we should probably PruneTh
rlarocque 2013/04/16 01:30:26 Done.
363 ModelTypeSet requested_types; 344 SDVLOG(1) << "Not running a nudge because we're fully datatype throttled.";
364 for (ModelTypeInvalidationMap::const_iterator i = 345 return false;
365 job.source_info().types.begin(); i != job.source_info().types.end();
366 ++i) {
367 requested_types.Put(i->first);
368 }
369
370 // If all types are throttled, do not CONTINUE. Today, we don't treat
371 // a per-datatype "unthrottle" event as something that should force a
372 // canary job. For this reason, there's no good time to reschedule this job
373 // to run -- we'll lazily wait for an independent event to trigger a sync.
374 // Note that there may already be such an event if we're in a WaitInterval,
375 // so we can retry it then.
376 if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
377 return DROP; // TODO(tim): Don't drop. http://crbug.com/177659
378 } 346 }
379 347
380 if (wait_interval_.get())
381 return DecideWhileInWaitInterval(job, priority);
382
383 if (mode_ == CONFIGURATION_MODE) { 348 if (mode_ == CONFIGURATION_MODE) {
384 if (job.purpose() == SyncSessionJob::NUDGE) 349 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
385 return SAVE; // Running requires a mode switch. 350 return false;
386 else // Implies job.purpose() == SyncSessionJob::CONFIGURATION.
387 return CONTINUE;
388 } 351 }
389 352
390 // We are in normal mode. 353 return true;
391 DCHECK_EQ(mode_, NORMAL_MODE);
392 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION);
393
394 // Note about some subtle scheduling semantics.
395 //
396 // It's possible at this point that |job| is known to be unnecessary, and
397 // dropping it would be perfectly safe and correct. Consider
398 //
399 // 1) |job| is a NUDGE (for any combination of types) with a
400 // |scheduled_start| time that is less than the time that the last
401 // successful all-datatype NUDGE completed, and it has a NOTIFICATION
402 // GetUpdatesCallerInfo value yet offers no new notification hint.
403 //
404 // 2) |job| is a NUDGE with a |scheduled_start| time that is less than
405 // the time that the last successful matching-datatype NUDGE completed,
406 // and payloads (hints) are identical to that last successful NUDGE.
407 //
408 // We avoid cases 1 and 2 by externally synchronizing NUDGE requests --
409 // scheduling a NUDGE requires command of the sync thread, which is
410 // impossible* from outside of SyncScheduler if a NUDGE is taking place.
411 // And if you have command of the sync thread when scheduling a NUDGE and a
412 // previous NUDGE exists, they will be coalesced and the stale job will be
413 // cancelled via the session-equality check in DoSyncSessionJob.
414 //
415 // * It's not strictly "impossible", but it would be reentrant and hence
416 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a
417 // legal side effect of any of the work being done as part of a sync cycle.
418 // See |no_scheduling_allowed_| for details.
419
420 // Decision now rests on state of auth tokens.
421 if (!session_context_->connection_manager()->HasInvalidAuthToken())
422 return CONTINUE;
423
424 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
425 // Running the job would require updated auth, so we can't honour
426 // job.scheduled_start().
427 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
428 } 354 }
429 355
430 void SyncSchedulerImpl::ScheduleNudgeAsync( 356 void SyncSchedulerImpl::ScheduleNudgeAsync(
431 const TimeDelta& desired_delay, 357 const TimeDelta& desired_delay,
432 NudgeSource source, ModelTypeSet types, 358 NudgeSource source, ModelTypeSet types,
433 const tracked_objects::Location& nudge_location) { 359 const tracked_objects::Location& nudge_location) {
434 DCHECK(CalledOnValidThread()); 360 DCHECK(CalledOnValidThread());
435 SDVLOG_LOC(nudge_location, 2) 361 SDVLOG_LOC(nudge_location, 2)
436 << "Nudge scheduled with delay " 362 << "Nudge scheduled with delay "
437 << desired_delay.InMilliseconds() << " ms, " 363 << desired_delay.InMilliseconds() << " ms, "
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
489 SDVLOG_LOC(nudge_location, 2) 415 SDVLOG_LOC(nudge_location, 2)
490 << "In ScheduleNudgeImpl with delay " 416 << "In ScheduleNudgeImpl with delay "
491 << delay.InMilliseconds() << " ms, " 417 << delay.InMilliseconds() << " ms, "
492 << "source " << GetUpdatesSourceString(source) << ", " 418 << "source " << GetUpdatesSourceString(source) << ", "
493 << "payloads " 419 << "payloads "
494 << ModelTypeInvalidationMapToString(invalidation_map); 420 << ModelTypeInvalidationMapToString(invalidation_map);
495 421
496 SyncSourceInfo info(source, invalidation_map); 422 SyncSourceInfo info(source, invalidation_map);
497 UpdateNudgeTimeRecords(info); 423 UpdateNudgeTimeRecords(info);
498 424
499 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 425 // Coalesce the new nudge information with any existing information.
500 SyncSessionJob::NUDGE, 426 nudge_tracker_.CoalesceSources(info);
501 TimeTicks::Now() + delay, 427
502 info, 428 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
503 ConfigurationParams())); 429 return;
504 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); 430
505 SDVLOG(2) << "Should run " 431 if (!started_) {
506 << SyncSessionJob::GetPurposeString(job->purpose()) 432 SDVLOG_LOC(nudge_location, 2)
507 << " in mode " << GetModeString(mode_) 433 << "Schedule not started; not running a nudge.";
508 << ": " << GetDecisionString(decision);
509 if (decision == DROP) {
510 return; 434 return;
511 } 435 }
512 436
513 // Try to coalesce in both SAVE and CONTINUE cases. 437 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
514 if (pending_nudge_job_) { 438 if (!scheduled_nudge_time_.is_null() &&
515 pending_nudge_job_->CoalesceSources(job->source_info()); 439 (scheduled_nudge_time_ < incoming_run_time)) {
516 if (decision == CONTINUE) { 440 // Old job arrives sooner than this one. Don't reschedule it.
517 // Only update the scheduled_start if we're going to reschedule.
518 pending_nudge_job_->set_scheduled_start(
519 std::min(job->scheduled_start(),
520 pending_nudge_job_->scheduled_start()));
521 }
522 } else {
523 pending_nudge_job_ = job.Pass();
524 }
525
526 if (decision == SAVE) {
527 return; 441 return;
528 } 442 }
529 443
530 TimeDelta run_delay = 444 // Either there is no existing nudge in flight or the incoming nudge arrives
531 pending_nudge_job_->scheduled_start() - TimeTicks::Now(); 445 // sooner than it does. We reschedule in either case.
tim (not reviewing) 2013/04/15 22:37:49 'than it does' is confusing. '... or the incoming
rlarocque 2013/04/16 01:30:26 Done.
532 if (run_delay < TimeDelta::FromMilliseconds(0))
533 run_delay = TimeDelta::FromMilliseconds(0);
534 SDVLOG_LOC(nudge_location, 2) 446 SDVLOG_LOC(nudge_location, 2)
535 << "Scheduling a nudge with " 447 << "Scheduling a nudge with "
536 << run_delay.InMilliseconds() << " ms delay"; 448 << delay.InMilliseconds() << " ms delay";
537 449 scheduled_nudge_time_ = incoming_run_time;
538 if (started_) { 450 pending_wakeup_timer_.Start(
539 pending_wakeup_timer_.Start( 451 nudge_location,
540 nudge_location, 452 delay,
541 run_delay, 453 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob,
542 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob, 454 weak_ptr_factory_.GetWeakPtr(),
543 weak_ptr_factory_.GetWeakPtr(), 455 NORMAL_PRIORITY));
544 NORMAL_PRIORITY));
545 }
546 } 456 }
547 457
548 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 458 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
549 switch (mode) { 459 switch (mode) {
550 ENUM_CASE(CONFIGURATION_MODE); 460 ENUM_CASE(CONFIGURATION_MODE);
551 ENUM_CASE(NORMAL_MODE); 461 ENUM_CASE(NORMAL_MODE);
552 } 462 }
553 return ""; 463 return "";
554 } 464 }
555 465
556 const char* SyncSchedulerImpl::GetDecisionString( 466 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
557 SyncSchedulerImpl::JobProcessDecision mode) { 467 DCHECK(CalledOnValidThread());
558 switch (mode) { 468
559 ENUM_CASE(CONTINUE); 469 if (!CanRunNudgeJobNow(priority)) {
560 ENUM_CASE(SAVE); 470 return;
tim (not reviewing) 2013/04/15 22:37:49 nit - remain consistent with {} on single line ifs
rlarocque 2013/04/16 01:30:26 Done.
561 ENUM_CASE(DROP);
562 } 471 }
563 return ""; 472
473 DVLOG(2) << "Will run normal mode sync cycle with routing info "
474 << ModelSafeRoutingInfoToString(session_context_->routing_info());
475 SyncSession session(session_context_, this, nudge_tracker_.source_info());
476 bool premature_exit = !syncer_->SyncShare(&session, SYNCER_BEGIN, SYNCER_END);
477 AdjustPolling(true);
478
479 if (!premature_exit
480 && !sessions::HasSyncerError(
tim (not reviewing) 2013/04/15 22:37:49 nit - at least the && should go on the previous li
rlarocque 2013/04/16 01:30:26 Done.
481 session.status_controller().model_neutral_state())) {
482 SDVLOG(2) << "Normal mode sync cycle succeeded";
483
484 // That cycle took care of any outstanding work we had.
485 nudge_tracker_.Reset();
486 scheduled_nudge_time_ = base::TimeTicks();
487
488 // If we're here, then we successfully reached the syncer. End all backoff.
tim (not reviewing) 2013/04/15 22:37:49 s/reached the syncer/reached the server
rlarocque 2013/04/16 01:30:26 Done.
489 wait_interval_.reset();
490 NotifyRetryTime(base::Time());
491 } else if (IsSyncingCurrentlySilenced()) {
492 SDVLOG(2) << "Normal mode sync cycle got throttled.";
493 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
494 // to do is start the timer.
495 RestartWaiting();
496 } else {
497 UpdateExponentialBackoff(session.status_controller().model_neutral_state());
498 SDVLOG(2) << "Normal mode sync cycle failed.";
499 RestartWaiting();
500 }
564 } 501 }
565 502
566 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, 503 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
567 JobPriority priority) {
568 DCHECK(CalledOnValidThread()); 504 DCHECK(CalledOnValidThread());
505 DCHECK_EQ(mode_, CONFIGURATION_MODE);
569 506
570 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 507 if (!CanRunJobNow(priority)) {
571 JobProcessDecision decision = DecideOnJob(*job, priority); 508 SDVLOG(2) << "Unable to run configure job right now.";
572 SDVLOG(2) << "Should run "
573 << SyncSessionJob::GetPurposeString(job->purpose())
574 << " in mode " << GetModeString(mode_)
575 << " with source " << job->source_info().updates_source
576 << ": " << GetDecisionString(decision);
577 if (decision != CONTINUE) {
578 if (decision == SAVE) {
579 if (job->purpose() == SyncSessionJob::CONFIGURATION) {
580 pending_configure_job_ = job.Pass();
581 } else {
582 pending_nudge_job_ = job.Pass();
583 }
584 } else {
585 DCHECK_EQ(decision, DROP);
586 }
587 return false; 509 return false;
588 } 510 }
589 511
590 DVLOG(2) << "Creating sync session with routes " 512 SDVLOG(2) << "Will run configure SyncShare with routes "
591 << ModelSafeRoutingInfoToString(session_context_->routing_info()) 513 << ModelSafeRoutingInfoToString(session_context_->routing_info());
592 << "and purpose " << job->purpose(); 514 SyncSourceInfo source_info(pending_configure_params_->source,
593 SyncSession session(session_context_, this, job->source_info()); 515 ModelSafeRoutingInfoToInvalidationMap(
516 session_context_->routing_info(),
517 std::string()));
518 SyncSession session(session_context_, this, source_info);
594 bool premature_exit = !syncer_->SyncShare(&session, 519 bool premature_exit = !syncer_->SyncShare(&session,
595 job->start_step(), 520 DOWNLOAD_UPDATES,
596 job->end_step()); 521 APPLY_UPDATES);
597 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; 522 AdjustPolling(true);
598 523
599 bool success = FinishSyncSessionJob(job.get(), 524 if (!premature_exit
600 premature_exit, 525 && !sessions::HasSyncerError(
601 &session); 526 session.status_controller().model_neutral_state())) {
527 SDVLOG(2) << "Configure succeeded";
528 pending_configure_params_->ready_task.Run();
529 pending_configure_params_.reset();
602 530
603 if (IsSyncingCurrentlySilenced()) { 531 // If we're here, then we successfully reached the server. End all backoff.
604 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; 532 wait_interval_.reset();
605 // If we're here, it's because |job| was silenced until a server specified 533 NotifyRetryTime(base::Time());
606 // time. (Note, it had to be |job|, because DecideOnJob would not permit 534 return true;
607 // any job through while in WaitInterval::THROTTLED). 535 } else if (IsSyncingCurrentlySilenced()) {
608 if (job->purpose() == SyncSessionJob::NUDGE) 536 // This shouldn't happen, but we try to respect the server's request anyway.
tim (not reviewing) 2013/04/15 22:37:49 Why shouldn't this happen any more than it should
rlarocque 2013/04/16 01:30:26 I was under the impression that the server only th
tim (not reviewing) 2013/04/16 20:16:24 Yeah, global throttling is more of a panic switch
rlarocque 2013/04/16 22:20:05 I didn't think this would be a good idea, but I im
609 pending_nudge_job_ = job.Pass(); 537 SDVLOG(2) << "Was throttled during configure request.";
610 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
611 pending_configure_job_ = job.Pass();
612 else
613 NOTREACHED();
614
615 RestartWaiting(); 538 RestartWaiting();
616 return success; 539 return false;
617 } 540 } else {
618 541 UpdateExponentialBackoff(session.status_controller().model_neutral_state());
619 if (!success) 542 SDVLOG(2) << "Configure failed. Will back off for "
620 ScheduleNextSync(job.Pass(), &session); 543 << wait_interval_->length.InMilliseconds() << "ms.";
621 544 RestartWaiting();
622 return success;
623 }
624
625 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
626 DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority);
627 }
628
629 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
630 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority);
631 }
632
633 bool SyncSchedulerImpl::ShouldPoll() {
634 if (wait_interval_.get()) {
635 SDVLOG(2) << "Not running poll in wait interval.";
636 return false; 545 return false;
637 } 546 }
638
639 if (mode_ == CONFIGURATION_MODE) {
640 SDVLOG(2) << "Not running poll in configuration mode.";
641 return false;
642 }
643
644 // TODO(rlarocque): Refactor decision-making logic common to all types
645 // of jobs into a shared function.
646
647 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
648 SDVLOG(2) << "Not running poll because auth token is invalid.";
649 return false;
650 }
651
652 return true;
653 } 547 }
654 548
655 void SyncSchedulerImpl::DoPollSyncSessionJob() { 549 void SyncSchedulerImpl::DoPollSyncSessionJob() {
656 ModelSafeRoutingInfo r; 550 ModelSafeRoutingInfo r;
657 ModelTypeInvalidationMap invalidation_map = 551 ModelTypeInvalidationMap invalidation_map =
658 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); 552 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
659 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); 553 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
660 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
661 TimeTicks::Now(),
662 info,
663 ConfigurationParams()));
664
665 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 554 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
666 555
667 if (!ShouldPoll()) 556 if (!CanRunJobNow(NORMAL_PRIORITY)) {
557 SDVLOG(2) << "Unable to run a poll job right now.";
668 return; 558 return;
559 }
669 560
670 DVLOG(2) << "Polling with routes " 561 if (mode_ != NORMAL_MODE) {
562 SDVLOG(2) << "Not running poll job in configure mode.";
563 return;
564 }
565
566 SDVLOG(2) << "Polling with routes "
671 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 567 << ModelSafeRoutingInfoToString(session_context_->routing_info());
672 SyncSession session(session_context_, this, job->source_info()); 568 SyncSession session(session_context_, this, info);
673 bool premature_exit = !syncer_->SyncShare(&session, 569 syncer_->SyncShare(&session, SYNCER_BEGIN, SYNCER_END);
674 job->start_step(),
675 job->end_step());
676 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
677 570
678 FinishSyncSessionJob(job.get(), premature_exit, &session); 571 AdjustPolling(false);
679 572
680 if (IsSyncingCurrentlySilenced()) { 573 if (IsSyncingCurrentlySilenced()) {
681 // Normally we would only call RestartWaiting() if we had a 574 SDVLOG(2) << "Poll request got us throttled.";
682 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's 575 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
683 // possible that neither is set. We create the wait interval anyway because 576 // to do is start the timer.
684 // we need it to make sure we get unthrottled on time.
685 RestartWaiting(); 577 RestartWaiting();
686 } 578 }
687 } 579 }
688 580
689 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 581 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
690 DCHECK(CalledOnValidThread()); 582 DCHECK(CalledOnValidThread());
691 583
692 // We are interested in recording time between local nudges for datatypes. 584 // We are interested in recording time between local nudges for datatypes.
693 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 585 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
694 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 586 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
695 return; 587 return;
696 588
697 base::TimeTicks now = TimeTicks::Now(); 589 base::TimeTicks now = TimeTicks::Now();
698 // Update timing information for how often datatypes are triggering nudges. 590 // Update timing information for how often datatypes are triggering nudges.
699 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); 591 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin();
700 iter != info.types.end(); 592 iter != info.types.end();
701 ++iter) { 593 ++iter) {
702 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 594 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
703 last_local_nudges_by_model_type_[iter->first] = now; 595 last_local_nudges_by_model_type_[iter->first] = now;
704 if (previous.is_null()) 596 if (previous.is_null())
705 continue; 597 continue;
706 598
707 #define PER_DATA_TYPE_MACRO(type_str) \ 599 #define PER_DATA_TYPE_MACRO(type_str) \
708 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 600 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
709 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 601 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
710 #undef PER_DATA_TYPE_MACRO 602 #undef PER_DATA_TYPE_MACRO
711 } 603 }
712 } 604 }
713 605
714 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, 606 void SyncSchedulerImpl::AdjustPolling(bool force_reset) {
715 bool exited_prematurely,
716 SyncSession* session) {
717 DCHECK(CalledOnValidThread());
718
719 // Let job know that we're through syncing (calling SyncShare) at this point.
720 bool succeeded = false;
721 {
722 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
723 succeeded = job->Finish(exited_prematurely, session);
724 }
725
726 SDVLOG(2) << "Updating the next polling time after SyncMain";
727
728 AdjustPolling(job);
729
730 if (succeeded) {
731 // No job currently supported by the scheduler could succeed without
732 // successfully reaching the server. Therefore, if we make it here, it is
733 // appropriate to reset the backoff interval.
734 wait_interval_.reset();
735 NotifyRetryTime(base::Time());
736 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
737 }
738
739 return succeeded;
740 }
741
742 void SyncSchedulerImpl::ScheduleNextSync(
743 scoped_ptr<SyncSessionJob> finished_job,
744 SyncSession* session) {
745 DCHECK(CalledOnValidThread());
746 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION
747 || finished_job->purpose() == SyncSessionJob::NUDGE);
748
749 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
750 // if we don't succeed. Some types of errors are not likely to disappear on
751 // their own. With the return values now available in the old_job.session,
752 // we should be able to detect such errors and only retry when we detect
753 // transient errors.
754
755 SDVLOG(2) << "SyncShare job failed; will start or update backoff";
756 HandleContinuationError(finished_job.Pass(), session);
757 }
758
759 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
760 DCHECK(CalledOnValidThread()); 607 DCHECK(CalledOnValidThread());
761 608
762 TimeDelta poll = (!session_context_->notifications_enabled()) ? 609 TimeDelta poll = (!session_context_->notifications_enabled()) ?
763 syncer_short_poll_interval_seconds_ : 610 syncer_short_poll_interval_seconds_ :
764 syncer_long_poll_interval_seconds_; 611 syncer_long_poll_interval_seconds_;
765 bool rate_changed = !poll_timer_.IsRunning() || 612 bool rate_changed = !poll_timer_.IsRunning() ||
766 poll != poll_timer_.GetCurrentDelay(); 613 poll != poll_timer_.GetCurrentDelay();
767 614
768 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) 615 if (force_reset && !rate_changed)
769 poll_timer_.Reset(); 616 poll_timer_.Reset();
770 617
771 if (!rate_changed) 618 if (!rate_changed)
772 return; 619 return;
773 620
774 // Adjust poll rate. 621 // Adjust poll rate.
775 poll_timer_.Stop(); 622 poll_timer_.Stop();
776 poll_timer_.Start(FROM_HERE, poll, this, 623 poll_timer_.Start(FROM_HERE, poll, this,
777 &SyncSchedulerImpl::PollTimerCallback); 624 &SyncSchedulerImpl::PollTimerCallback);
778 } 625 }
779 626
780 void SyncSchedulerImpl::RestartWaiting() { 627 void SyncSchedulerImpl::RestartWaiting() {
781 CHECK(wait_interval_.get()); 628 CHECK(wait_interval_.get());
782 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 629 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
630 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
631 SDVLOG(2) << "Starting WaitInterval timer of length "
632 << wait_interval_->length.InMilliseconds() << "ms.";
783 if (wait_interval_->mode == WaitInterval::THROTTLED) { 633 if (wait_interval_->mode == WaitInterval::THROTTLED) {
784 pending_wakeup_timer_.Start( 634 pending_wakeup_timer_.Start(
785 FROM_HERE, 635 FROM_HERE,
786 wait_interval_->length, 636 wait_interval_->length,
787 base::Bind(&SyncSchedulerImpl::Unthrottle, 637 base::Bind(&SyncSchedulerImpl::Unthrottle,
788 weak_ptr_factory_.GetWeakPtr())); 638 weak_ptr_factory_.GetWeakPtr()));
789 } else { 639 } else {
790 pending_wakeup_timer_.Start( 640 pending_wakeup_timer_.Start(
791 FROM_HERE, 641 FROM_HERE,
792 wait_interval_->length, 642 wait_interval_->length,
793 base::Bind(&SyncSchedulerImpl::TryCanaryJob, 643 base::Bind(&SyncSchedulerImpl::TryCanaryJob,
794 weak_ptr_factory_.GetWeakPtr())); 644 weak_ptr_factory_.GetWeakPtr()));
795 } 645 }
796 } 646 }
797 647
798 void SyncSchedulerImpl::HandleContinuationError( 648 void SyncSchedulerImpl::UpdateExponentialBackoff(
799 scoped_ptr<SyncSessionJob> old_job, 649 const sessions::ModelNeutralState& model_neutral_state) {
800 SyncSession* session) {
801 DCHECK(CalledOnValidThread()); 650 DCHECK(CalledOnValidThread());
802 651
803 TimeDelta length = delay_provider_->GetDelay( 652 TimeDelta length = delay_provider_->GetDelay(
804 IsBackingOff() ? wait_interval_->length : 653 IsBackingOff() ? wait_interval_->length :
805 delay_provider_->GetInitialDelay( 654 delay_provider_->GetInitialDelay(model_neutral_state));
806 session->status_controller().model_neutral_state()));
807
808 SDVLOG(2) << "In handle continuation error with "
809 << SyncSessionJob::GetPurposeString(old_job->purpose())
810 << " job. The time delta(ms) is "
811 << length.InMilliseconds();
812
813 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 655 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
814 length)); 656 length));
815 NotifyRetryTime(base::Time::Now() + length);
816 old_job->set_scheduled_start(TimeTicks::Now() + length);
817 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
818 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
819 // Config params should always get set.
820 DCHECK(!old_job->config_params().ready_task.is_null());
821 DCHECK(!pending_configure_job_);
822 pending_configure_job_ = old_job.Pass();
823 } else {
824 // We're not in configure mode so we should not have a configure job.
825 DCHECK(!pending_configure_job_);
826 DCHECK(!pending_nudge_job_);
827 pending_nudge_job_ = old_job.Pass();
828 }
829
830 RestartWaiting();
831 } 657 }
832 658
833 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 659 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
834 syncer_->RequestEarlyExit(); // Safe to call from any thread. 660 syncer_->RequestEarlyExit(); // Safe to call from any thread.
835 DCHECK(weak_handle_this_.IsInitialized()); 661 DCHECK(weak_handle_this_.IsInitialized());
836 SDVLOG(3) << "Posting StopImpl"; 662 SDVLOG(3) << "Posting StopImpl";
837 weak_handle_this_.Call(FROM_HERE, 663 weak_handle_this_.Call(FROM_HERE,
838 &SyncSchedulerImpl::StopImpl, 664 &SyncSchedulerImpl::StopImpl,
839 callback); 665 callback);
840 } 666 }
841 667
842 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 668 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
843 DCHECK(CalledOnValidThread()); 669 DCHECK(CalledOnValidThread());
844 SDVLOG(2) << "StopImpl called"; 670 SDVLOG(2) << "StopImpl called";
845 671
846 // Kill any in-flight method calls. 672 // Kill any in-flight method calls.
847 weak_ptr_factory_.InvalidateWeakPtrs(); 673 weak_ptr_factory_.InvalidateWeakPtrs();
848 wait_interval_.reset(); 674 wait_interval_.reset();
849 NotifyRetryTime(base::Time()); 675 NotifyRetryTime(base::Time());
850 poll_timer_.Stop(); 676 poll_timer_.Stop();
851 pending_wakeup_timer_.Stop(); 677 pending_wakeup_timer_.Stop();
852 pending_nudge_job_.reset(); 678 pending_configure_params_.reset();
853 pending_configure_job_.reset();
854 if (started_) { 679 if (started_) {
855 started_ = false; 680 started_ = false;
856 } 681 }
857 if (!callback.is_null()) 682 if (!callback.is_null())
858 callback.Run(); 683 callback.Run();
859 } 684 }
860 685
861 // This is the only place where we invoke DoSyncSessionJob with canary 686 // This is the only place where we invoke DoSyncSessionJob with canary
862 // privileges. Everyone else should use NORMAL_PRIORITY. 687 // privileges. Everyone else should use NORMAL_PRIORITY.
863 void SyncSchedulerImpl::TryCanaryJob() { 688 void SyncSchedulerImpl::TryCanaryJob() {
864 DCHECK(CalledOnValidThread()); 689 DCHECK(CalledOnValidThread());
865 690
866 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { 691 if (mode_ == CONFIGURATION_MODE && pending_configure_params_) {
867 SDVLOG(2) << "Found pending configure job; will run as canary"; 692 SDVLOG(2) << "Found pending configure job; will run as canary";
868 DoConfigurationSyncSessionJob(CANARY_PRIORITY); 693 DoConfigurationSyncSessionJob(CANARY_PRIORITY);
869 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { 694 } else if (mode_ == NORMAL_MODE && !nudge_tracker_.IsEmpty()) {
870 SDVLOG(2) << "Found pending nudge job; will run as canary"; 695 SDVLOG(2) << "Found pending nudge job; will run as canary";
871 DoNudgeSyncSessionJob(CANARY_PRIORITY); 696 DoNudgeSyncSessionJob(CANARY_PRIORITY);
872 } else { 697 } else {
873 SDVLOG(2) << "Found no work to do; will not run a canary"; 698 SDVLOG(2) << "Found no work to do; will not run a canary";
874 } 699 }
875 } 700 }
876 701
877 void SyncSchedulerImpl::PollTimerCallback() { 702 void SyncSchedulerImpl::PollTimerCallback() {
878 DCHECK(CalledOnValidThread()); 703 DCHECK(CalledOnValidThread());
879 if (no_scheduling_allowed_) { 704 if (no_scheduling_allowed_) {
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
995 820
996 #undef SDVLOG_LOC 821 #undef SDVLOG_LOC
997 822
998 #undef SDVLOG 823 #undef SDVLOG
999 824
1000 #undef SLOG 825 #undef SLOG
1001 826
1002 #undef ENUM_CASE 827 #undef ENUM_CASE
1003 828
1004 } // namespace syncer 829 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698