Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(227)

Side by Side Diff: components/sync/engine_impl/sync_scheduler_impl.cc

Issue 2475043002: [Sync] Sync client should to exponential backoff when receive partial failure (Closed)
Patch Set: code review Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/sync/engine_impl/sync_scheduler_impl.h" 5 #include "components/sync/engine_impl/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 #include <utility> 9 #include <utility>
10 10
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
81 } 81 }
82 82
83 void RunAndReset(base::Closure* task) { 83 void RunAndReset(base::Closure* task) {
84 DCHECK(task); 84 DCHECK(task);
85 if (task->is_null()) 85 if (task->is_null())
86 return; 86 return;
87 task->Run(); 87 task->Run();
88 task->Reset(); 88 task->Reset();
89 } 89 }
90 90
91 #define ENUM_CASE(x) \
92 case x: \
93 return #x; \
94 break;
95
91 } // namespace 96 } // namespace
92 97
93 ConfigurationParams::ConfigurationParams() 98 ConfigurationParams::ConfigurationParams()
94 : source(GetUpdatesCallerInfo::UNKNOWN) {} 99 : source(GetUpdatesCallerInfo::UNKNOWN) {}
95 ConfigurationParams::ConfigurationParams( 100 ConfigurationParams::ConfigurationParams(
96 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, 101 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
97 ModelTypeSet types_to_download, 102 ModelTypeSet types_to_download,
98 const ModelSafeRoutingInfo& routing_info, 103 const ModelSafeRoutingInfo& routing_info,
99 const base::Closure& ready_task, 104 const base::Closure& ready_task,
100 const base::Closure& retry_task) 105 const base::Closure& retry_task)
101 : source(source), 106 : source(source),
102 types_to_download(types_to_download), 107 types_to_download(types_to_download),
103 routing_info(routing_info), 108 routing_info(routing_info),
104 ready_task(ready_task), 109 ready_task(ready_task),
105 retry_task(retry_task) { 110 retry_task(retry_task) {
106 DCHECK(!ready_task.is_null()); 111 DCHECK(!ready_task.is_null());
107 } 112 }
108 ConfigurationParams::ConfigurationParams(const ConfigurationParams& other) = 113 ConfigurationParams::ConfigurationParams(const ConfigurationParams& other) =
109 default; 114 default;
110 ConfigurationParams::~ConfigurationParams() {} 115 ConfigurationParams::~ConfigurationParams() {}
111 116
112 ClearParams::ClearParams(const base::Closure& report_success_task) 117 ClearParams::ClearParams(const base::Closure& report_success_task)
113 : report_success_task(report_success_task) { 118 : report_success_task(report_success_task) {
114 DCHECK(!report_success_task.is_null()); 119 DCHECK(!report_success_task.is_null());
115 } 120 }
116 ClearParams::ClearParams(const ClearParams& other) = default; 121 ClearParams::ClearParams(const ClearParams& other) = default;
117 ClearParams::~ClearParams() {} 122 ClearParams::~ClearParams() {}
118 123
119 SyncSchedulerImpl::WaitInterval::WaitInterval() : mode(UNKNOWN) {}
120
121 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
122 : mode(mode), length(length) {}
123
124 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
125
126 #define ENUM_CASE(x) \
127 case x: \
128 return #x; \
129 break;
130
131 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
132 switch (mode) {
133 ENUM_CASE(UNKNOWN);
134 ENUM_CASE(EXPONENTIAL_BACKOFF);
135 ENUM_CASE(THROTTLED);
136 }
137 NOTREACHED();
138 return "";
139 }
140
141 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 124 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
142 NudgeSource source) { 125 NudgeSource source) {
143 switch (source) { 126 switch (source) {
144 case NUDGE_SOURCE_NOTIFICATION: 127 case NUDGE_SOURCE_NOTIFICATION:
145 return GetUpdatesCallerInfo::NOTIFICATION; 128 return GetUpdatesCallerInfo::NOTIFICATION;
146 case NUDGE_SOURCE_LOCAL: 129 case NUDGE_SOURCE_LOCAL:
147 return GetUpdatesCallerInfo::LOCAL; 130 return GetUpdatesCallerInfo::LOCAL;
148 case NUDGE_SOURCE_LOCAL_REFRESH: 131 case NUDGE_SOURCE_LOCAL_REFRESH:
149 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 132 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
150 case NUDGE_SOURCE_UNKNOWN: 133 case NUDGE_SOURCE_UNKNOWN:
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 Mode old_mode = mode_; 227 Mode old_mode = mode_;
245 mode_ = mode; 228 mode_ = mode;
246 // Only adjust the poll reset time if it was valid and in the past. 229 // Only adjust the poll reset time if it was valid and in the past.
247 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) { 230 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) {
248 // Convert from base::Time to base::TimeTicks. The reason we use Time 231 // Convert from base::Time to base::TimeTicks. The reason we use Time
249 // for persisting is that TimeTicks can stop making forward progress when 232 // for persisting is that TimeTicks can stop making forward progress when
250 // the machine is suspended. This implies that on resume the client might 233 // the machine is suspended. This implies that on resume the client might
251 // actually have miss the real poll, unless the client is restarted. Fixing 234 // actually have miss the real poll, unless the client is restarted. Fixing
252 // that would require using an AlarmTimer though, which is only supported 235 // that would require using an AlarmTimer though, which is only supported
253 // on certain platforms. 236 // on certain platforms.
254 last_poll_reset_ = 237 last_poll_reset_ = TimeTicks::Now() - (base::Time::Now() - last_poll_time);
255 base::TimeTicks::Now() - (base::Time::Now() - last_poll_time);
256 } 238 }
257 239
258 if (old_mode != mode_ && mode_ == NORMAL_MODE) { 240 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
259 // We just got back to normal mode. Let's try to run the work that was 241 // We just got back to normal mode. Let's try to run the work that was
260 // queued up while we were configuring. 242 // queued up while we were configuring.
261 243
262 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. 244 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
263 245
264 // Update our current time before checking IsRetryRequired(). 246 // Update our current time before checking IsRetryRequired().
265 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 247 nudge_tracker_.SetSyncCycleStartTime(TimeTicks::Now());
266 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) { 248 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
267 TrySyncCycleJob(); 249 TrySyncCycleJob();
268 } 250 }
269 } 251 }
270 } 252 }
271 253
272 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { 254 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnblockedTypes() {
273 ModelTypeSet enabled_types = cycle_context_->GetEnabledTypes(); 255 ModelTypeSet enabled_types = cycle_context_->GetEnabledTypes();
274 ModelTypeSet enabled_protocol_types = 256 ModelTypeSet enabled_protocol_types =
275 Intersection(ProtocolTypes(), enabled_types); 257 Intersection(ProtocolTypes(), enabled_types);
276 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes(); 258 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
277 return Difference(enabled_protocol_types, throttled_types); 259 ModelTypeSet backed_off_types = nudge_tracker_.GetBackedOffTypes();
260 return Difference(enabled_protocol_types,
261 Union(throttled_types, backed_off_types));
278 } 262 }
279 263
280 void SyncSchedulerImpl::SendInitialSnapshot() { 264 void SyncSchedulerImpl::SendInitialSnapshot() {
281 DCHECK(CalledOnValidThread()); 265 DCHECK(CalledOnValidThread());
282 std::unique_ptr<SyncCycle> dummy(SyncCycle::Build(cycle_context_, this)); 266 std::unique_ptr<SyncCycle> dummy(SyncCycle::Build(cycle_context_, this));
283 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); 267 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
284 event.snapshot = dummy->TakeSnapshot(); 268 event.snapshot = dummy->TakeSnapshot();
285 for (auto& observer : *cycle_context_->listeners()) 269 for (auto& observer : *cycle_context_->listeners())
286 observer.OnSyncCycleEvent(event); 270 observer.OnSyncCycleEvent(event);
287 } 271 }
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 371
388 void SyncSchedulerImpl::ScheduleLocalNudge( 372 void SyncSchedulerImpl::ScheduleLocalNudge(
389 ModelTypeSet types, 373 ModelTypeSet types,
390 const tracked_objects::Location& nudge_location) { 374 const tracked_objects::Location& nudge_location) {
391 DCHECK(CalledOnValidThread()); 375 DCHECK(CalledOnValidThread());
392 DCHECK(!types.Empty()); 376 DCHECK(!types.Empty());
393 377
394 SDVLOG_LOC(nudge_location, 2) << "Scheduling sync because of local change to " 378 SDVLOG_LOC(nudge_location, 2) << "Scheduling sync because of local change to "
395 << ModelTypeSetToString(types); 379 << ModelTypeSetToString(types);
396 UpdateNudgeTimeRecords(types); 380 UpdateNudgeTimeRecords(types);
397 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types); 381 TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
398 ScheduleNudgeImpl(nudge_delay, nudge_location); 382 ScheduleNudgeImpl(nudge_delay, nudge_location);
399 } 383 }
400 384
401 void SyncSchedulerImpl::ScheduleLocalRefreshRequest( 385 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
402 ModelTypeSet types, 386 ModelTypeSet types,
403 const tracked_objects::Location& nudge_location) { 387 const tracked_objects::Location& nudge_location) {
404 DCHECK(CalledOnValidThread()); 388 DCHECK(CalledOnValidThread());
405 DCHECK(!types.Empty()); 389 DCHECK(!types.Empty());
406 390
407 SDVLOG_LOC(nudge_location, 2) 391 SDVLOG_LOC(nudge_location, 2)
408 << "Scheduling sync because of local refresh request for " 392 << "Scheduling sync because of local refresh request for "
409 << ModelTypeSetToString(types); 393 << ModelTypeSetToString(types);
410 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types); 394 TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
411 ScheduleNudgeImpl(nudge_delay, nudge_location); 395 ScheduleNudgeImpl(nudge_delay, nudge_location);
412 } 396 }
413 397
414 void SyncSchedulerImpl::ScheduleInvalidationNudge( 398 void SyncSchedulerImpl::ScheduleInvalidationNudge(
415 ModelType model_type, 399 ModelType model_type,
416 std::unique_ptr<InvalidationInterface> invalidation, 400 std::unique_ptr<InvalidationInterface> invalidation,
417 const tracked_objects::Location& nudge_location) { 401 const tracked_objects::Location& nudge_location) {
418 DCHECK(CalledOnValidThread()); 402 DCHECK(CalledOnValidThread());
419 403
420 SDVLOG_LOC(nudge_location, 2) 404 SDVLOG_LOC(nudge_location, 2)
421 << "Scheduling sync because we received invalidation for " 405 << "Scheduling sync because we received invalidation for "
422 << ModelTypeToString(model_type); 406 << ModelTypeToString(model_type);
423 base::TimeDelta nudge_delay = nudge_tracker_.RecordRemoteInvalidation( 407 TimeDelta nudge_delay = nudge_tracker_.RecordRemoteInvalidation(
424 model_type, std::move(invalidation)); 408 model_type, std::move(invalidation));
425 ScheduleNudgeImpl(nudge_delay, nudge_location); 409 ScheduleNudgeImpl(nudge_delay, nudge_location);
426 } 410 }
427 411
428 void SyncSchedulerImpl::ScheduleInitialSyncNudge(ModelType model_type) { 412 void SyncSchedulerImpl::ScheduleInitialSyncNudge(ModelType model_type) {
429 DCHECK(CalledOnValidThread()); 413 DCHECK(CalledOnValidThread());
430 414
431 SDVLOG(2) << "Scheduling non-blocking initial sync for " 415 SDVLOG(2) << "Scheduling non-blocking initial sync for "
432 << ModelTypeToString(model_type); 416 << ModelTypeToString(model_type);
433 nudge_tracker_.RecordInitialSyncRequired(model_type); 417 nudge_tracker_.RecordInitialSyncRequired(model_type);
(...skipping 14 matching lines...) Expand all
448 return; 432 return;
449 } 433 }
450 434
451 SDVLOG_LOC(nudge_location, 2) << "In ScheduleNudgeImpl with delay " 435 SDVLOG_LOC(nudge_location, 2) << "In ScheduleNudgeImpl with delay "
452 << delay.InMilliseconds() << " ms"; 436 << delay.InMilliseconds() << " ms";
453 437
454 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) 438 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
455 return; 439 return;
456 440
457 TimeTicks incoming_run_time = TimeTicks::Now() + delay; 441 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
458 if (!scheduled_nudge_time_.is_null() && 442 if (pending_wakeup_timer_.IsRunning() &&
459 (scheduled_nudge_time_ < incoming_run_time)) { 443 (pending_wakeup_timer_.desired_run_time() < incoming_run_time)) {
460 // Old job arrives sooner than this one. Don't reschedule it. 444 // Old job arrives sooner than this one. Don't reschedule it.
461 return; 445 return;
462 } 446 }
463 447
464 // Either there is no existing nudge in flight or the incoming nudge should be 448 // Either there is no existing nudge in flight or the incoming nudge should be
465 // made to arrive first (preempt) the existing nudge. We reschedule in either 449 // made to arrive first (preempt) the existing nudge. We reschedule in either
466 // case. 450 // case.
467 SDVLOG_LOC(nudge_location, 2) << "Scheduling a nudge with " 451 SDVLOG_LOC(nudge_location, 2) << "Scheduling a nudge with "
468 << delay.InMilliseconds() << " ms delay"; 452 << delay.InMilliseconds() << " ms delay";
469 scheduled_nudge_time_ = incoming_run_time;
470 pending_wakeup_timer_.Start( 453 pending_wakeup_timer_.Start(
471 nudge_location, delay, base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, 454 nudge_location, delay, base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
472 weak_ptr_factory_.GetWeakPtr())); 455 weak_ptr_factory_.GetWeakPtr()));
473 } 456 }
474 457
475 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 458 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
476 switch (mode) { 459 switch (mode) {
477 ENUM_CASE(CONFIGURATION_MODE); 460 ENUM_CASE(CONFIGURATION_MODE);
478 ENUM_CASE(CLEAR_SERVER_DATA_MODE); 461 ENUM_CASE(CLEAR_SERVER_DATA_MODE);
479 ENUM_CASE(NORMAL_MODE); 462 ENUM_CASE(NORMAL_MODE);
480 } 463 }
481 return ""; 464 return "";
482 } 465 }
483 466
484 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) { 467 void SyncSchedulerImpl::SetDefaultNudgeDelay(TimeDelta delay_ms) {
485 DCHECK(CalledOnValidThread()); 468 DCHECK(CalledOnValidThread());
486 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); 469 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
487 } 470 }
488 471
489 void SyncSchedulerImpl::DoNudgeSyncCycleJob(JobPriority priority) { 472 void SyncSchedulerImpl::DoNudgeSyncCycleJob(JobPriority priority) {
490 DCHECK(CalledOnValidThread()); 473 DCHECK(CalledOnValidThread());
491 DCHECK(CanRunNudgeJobNow(priority)); 474 DCHECK(CanRunNudgeJobNow(priority));
492 475
493 DVLOG(2) << "Will run normal mode sync cycle with types " 476 DVLOG(2) << "Will run normal mode sync cycle with types "
494 << ModelTypeSetToString(cycle_context_->GetEnabledTypes()); 477 << ModelTypeSetToString(cycle_context_->GetEnabledTypes());
495 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); 478 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this));
496 bool success = syncer_->NormalSyncShare(GetEnabledAndUnthrottledTypes(), 479 bool success = syncer_->NormalSyncShare(GetEnabledAndUnblockedTypes(),
497 &nudge_tracker_, cycle.get()); 480 &nudge_tracker_, cycle.get());
498 481
499 if (success) { 482 if (success) {
500 // That cycle took care of any outstanding work we had. 483 // That cycle took care of any outstanding work we had.
501 SDVLOG(2) << "Nudge succeeded."; 484 SDVLOG(2) << "Nudge succeeded.";
502 nudge_tracker_.RecordSuccessfulSyncCycle(); 485 nudge_tracker_.RecordSuccessfulSyncCycle();
503 scheduled_nudge_time_ = base::TimeTicks();
504 HandleSuccess(); 486 HandleSuccess();
505 487
506 // If this was a canary, we may need to restart the poll timer (the poll 488 // If this was a canary, we may need to restart the poll timer (the poll
507 // timer may have fired while the scheduler was in an error state, ignoring 489 // timer may have fired while the scheduler was in an error state, ignoring
508 // the poll). 490 // the poll).
509 if (!poll_timer_.IsRunning()) { 491 if (!poll_timer_.IsRunning()) {
510 SDVLOG(1) << "Canary succeeded, restarting polling."; 492 SDVLOG(1) << "Canary succeeded, restarting polling.";
511 AdjustPolling(UPDATE_INTERVAL); 493 AdjustPolling(UPDATE_INTERVAL);
512 } 494 }
513 } else { 495 } else {
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
594 wait_interval_ = base::MakeUnique<WaitInterval>( 576 wait_interval_ = base::MakeUnique<WaitInterval>(
595 WaitInterval::EXPONENTIAL_BACKOFF, length); 577 WaitInterval::EXPONENTIAL_BACKOFF, length);
596 SDVLOG(2) << "Sync cycle failed. Will back off for " 578 SDVLOG(2) << "Sync cycle failed. Will back off for "
597 << wait_interval_->length.InMilliseconds() << "ms."; 579 << wait_interval_->length.InMilliseconds() << "ms.";
598 } 580 }
599 RestartWaiting(); 581 RestartWaiting();
600 } 582 }
601 583
602 void SyncSchedulerImpl::DoPollSyncCycleJob() { 584 void SyncSchedulerImpl::DoPollSyncCycleJob() {
603 SDVLOG(2) << "Polling with types " 585 SDVLOG(2) << "Polling with types "
604 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes()); 586 << ModelTypeSetToString(GetEnabledAndUnblockedTypes());
605 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); 587 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this));
606 bool success = 588 bool success =
607 syncer_->PollSyncShare(GetEnabledAndUnthrottledTypes(), cycle.get()); 589 syncer_->PollSyncShare(GetEnabledAndUnblockedTypes(), cycle.get());
608 590
609 // Only restart the timer if the poll succeeded. Otherwise rely on normal 591 // Only restart the timer if the poll succeeded. Otherwise rely on normal
610 // failure handling to retry with backoff. 592 // failure handling to retry with backoff.
611 if (success) { 593 if (success) {
612 AdjustPolling(FORCE_RESET); 594 AdjustPolling(FORCE_RESET);
613 HandleSuccess(); 595 HandleSuccess();
614 } else { 596 } else {
615 HandleFailure(cycle->status_controller().model_neutral_state()); 597 HandleFailure(cycle->status_controller().model_neutral_state());
616 } 598 }
617 } 599 }
618 600
619 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { 601 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
620 DCHECK(CalledOnValidThread()); 602 DCHECK(CalledOnValidThread());
621 base::TimeTicks now = TimeTicks::Now(); 603 TimeTicks now = TimeTicks::Now();
622 // Update timing information for how often datatypes are triggering nudges. 604 // Update timing information for how often datatypes are triggering nudges.
623 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { 605 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
624 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; 606 TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
625 last_local_nudges_by_model_type_[iter.Get()] = now; 607 last_local_nudges_by_model_type_[iter.Get()] = now;
626 if (previous.is_null()) 608 if (previous.is_null())
627 continue; 609 continue;
628 610
629 #define PER_DATA_TYPE_MACRO(type_str) \ 611 #define PER_DATA_TYPE_MACRO(type_str) \
630 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 612 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
631 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); 613 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
632 #undef PER_DATA_TYPE_MACRO 614 #undef PER_DATA_TYPE_MACRO
633 } 615 }
634 } 616 }
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
677 659
678 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes() 660 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes()
679 << " minutes."; 661 << " minutes.";
680 662
681 // Adjust poll rate. Start will reset the timer if it was already running. 663 // Adjust poll rate. Start will reset the timer if it was already running.
682 poll_timer_.Start(FROM_HERE, poll_delay, this, 664 poll_timer_.Start(FROM_HERE, poll_delay, this,
683 &SyncSchedulerImpl::PollTimerCallback); 665 &SyncSchedulerImpl::PollTimerCallback);
684 } 666 }
685 667
686 void SyncSchedulerImpl::RestartWaiting() { 668 void SyncSchedulerImpl::RestartWaiting() {
687 CHECK(wait_interval_.get()); 669 if (wait_interval_.get()) {
688 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 670 // Global throttling or backoff
689 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 671 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
690 SDVLOG(2) << "Starting WaitInterval timer of length " 672 SDVLOG(2) << "Starting WaitInterval timer of length "
691 << wait_interval_->length.InMilliseconds() << "ms."; 673 << wait_interval_->length.InMilliseconds() << "ms.";
692 if (wait_interval_->mode == WaitInterval::THROTTLED) { 674 if (wait_interval_->mode == WaitInterval::THROTTLED) {
693 pending_wakeup_timer_.Start(FROM_HERE, wait_interval_->length, 675 pending_wakeup_timer_.Start(FROM_HERE, wait_interval_->length,
694 base::Bind(&SyncSchedulerImpl::Unthrottle, 676 base::Bind(&SyncSchedulerImpl::Unthrottle,
677 weak_ptr_factory_.GetWeakPtr()));
678 } else {
679 pending_wakeup_timer_.Start(
680 FROM_HERE, wait_interval_->length,
681 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
682 weak_ptr_factory_.GetWeakPtr()));
683 }
684 } else if (nudge_tracker_.IsAnyTypeThrottled() ||
685 nudge_tracker_.IsAnyTypeBackedOff()) {
686 // Per-datatype throttled or backed off.
687 TimeDelta time_until_next_unblock =
688 nudge_tracker_.GetTimeUntilNextUnblock();
689 pending_wakeup_timer_.Start(FROM_HERE, time_until_next_unblock,
690 base::Bind(&SyncSchedulerImpl::OnTypesUnblocked,
695 weak_ptr_factory_.GetWeakPtr())); 691 weak_ptr_factory_.GetWeakPtr()));
696 } else {
697 pending_wakeup_timer_.Start(
698 FROM_HERE, wait_interval_->length,
699 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
700 weak_ptr_factory_.GetWeakPtr()));
701 } 692 }
702 } 693 }
703 694
704 void SyncSchedulerImpl::Stop() { 695 void SyncSchedulerImpl::Stop() {
705 DCHECK(CalledOnValidThread()); 696 DCHECK(CalledOnValidThread());
706 SDVLOG(2) << "Stop called"; 697 SDVLOG(2) << "Stop called";
707 698
708 // Kill any in-flight method calls. 699 // Kill any in-flight method calls.
709 weak_ptr_factory_.InvalidateWeakPtrs(); 700 weak_ptr_factory_.InvalidateWeakPtrs();
710 wait_interval_.reset(); 701 wait_interval_.reset();
(...skipping 19 matching lines...) Expand all
730 // access token will be here. 721 // access token will be here.
731 base::ThreadTaskRunnerHandle::Get()->PostTask( 722 base::ThreadTaskRunnerHandle::Get()->PostTask(
732 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncCycleJobImpl, 723 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncCycleJobImpl,
733 weak_ptr_factory_.GetWeakPtr())); 724 weak_ptr_factory_.GetWeakPtr()));
734 } 725 }
735 726
736 void SyncSchedulerImpl::TrySyncCycleJobImpl() { 727 void SyncSchedulerImpl::TrySyncCycleJobImpl() {
737 JobPriority priority = next_sync_cycle_job_priority_; 728 JobPriority priority = next_sync_cycle_job_priority_;
738 next_sync_cycle_job_priority_ = NORMAL_PRIORITY; 729 next_sync_cycle_job_priority_ = NORMAL_PRIORITY;
739 730
740 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 731 nudge_tracker_.SetSyncCycleStartTime(TimeTicks::Now());
741 732
742 DCHECK(CalledOnValidThread()); 733 DCHECK(CalledOnValidThread());
743 if (mode_ == CONFIGURATION_MODE) { 734 if (mode_ == CONFIGURATION_MODE) {
744 if (pending_configure_params_) { 735 if (pending_configure_params_) {
745 SDVLOG(2) << "Found pending configure job"; 736 SDVLOG(2) << "Found pending configure job";
746 DoConfigurationSyncCycleJob(priority); 737 DoConfigurationSyncCycleJob(priority);
747 } 738 }
748 } else if (mode_ == CLEAR_SERVER_DATA_MODE) { 739 } else if (mode_ == CLEAR_SERVER_DATA_MODE) {
749 if (pending_clear_params_) { 740 if (pending_clear_params_) {
750 DoClearServerDataSyncCycleJob(priority); 741 DoClearServerDataSyncCycleJob(priority);
751 } 742 }
752 } else if (CanRunNudgeJobNow(priority)) { 743 } else if (CanRunNudgeJobNow(priority)) {
753 if (nudge_tracker_.IsSyncRequired()) { 744 if (nudge_tracker_.IsSyncRequired()) {
754 SDVLOG(2) << "Found pending nudge job"; 745 SDVLOG(2) << "Found pending nudge job";
755 DoNudgeSyncCycleJob(priority); 746 DoNudgeSyncCycleJob(priority);
756 } else if (((base::TimeTicks::Now() - last_poll_reset_) >= 747 } else if (((TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
757 GetPollInterval())) {
758 SDVLOG(2) << "Found pending poll"; 748 SDVLOG(2) << "Found pending poll";
759 DoPollSyncCycleJob(); 749 DoPollSyncCycleJob();
760 } 750 }
761 } else { 751 } else {
762 // We must be in an error state. Transitioning out of each of these 752 // We must be in an error state. Transitioning out of each of these
763 // error states should trigger a canary job. 753 // error states should trigger a canary job.
764 DCHECK(IsCurrentlyThrottled() || IsBackingOff() || 754 DCHECK(IsCurrentlyThrottled() || IsBackingOff() ||
765 cycle_context_->connection_manager()->HasInvalidAuthToken()); 755 cycle_context_->connection_manager()->HasInvalidAuthToken());
766 } 756 }
767 757
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
799 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 789 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
800 790
801 // We treat this as a 'canary' in the sense that it was originally scheduled 791 // We treat this as a 'canary' in the sense that it was originally scheduled
802 // to run some time ago, failed, and we now want to retry, versus a job that 792 // to run some time ago, failed, and we now want to retry, versus a job that
803 // was just created (e.g via ScheduleNudgeImpl). The main implication is 793 // was just created (e.g via ScheduleNudgeImpl). The main implication is
804 // that we're careful to update routing info (etc) with such potentially 794 // that we're careful to update routing info (etc) with such potentially
805 // stale canary jobs. 795 // stale canary jobs.
806 TryCanaryJob(); 796 TryCanaryJob();
807 } 797 }
808 798
809 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) { 799 void SyncSchedulerImpl::OnTypesUnblocked() {
810 DCHECK(CalledOnValidThread()); 800 DCHECK(CalledOnValidThread());
811 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time); 801 nudge_tracker_.UpdateTypeThrottlingAndBackoffState();
Nicolas Zea 2016/11/10 23:28:21 See my other comment about this method. I think th
Gang Wu 2016/11/11 19:15:30 Yes, just added another blocking state, EXPONENTIA
812 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 802 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
803 NotifyBackedOffTypesChanged(nudge_tracker_.GetBackedOffTypes());
813 804
814 if (nudge_tracker_.IsAnyTypeThrottled()) { 805 RestartWaiting();
815 const base::TimeTicks now = base::TimeTicks::Now();
816 base::TimeDelta time_until_next_unthrottle =
817 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
818 type_unthrottle_timer_.Start(FROM_HERE, time_until_next_unthrottle,
819 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
820 weak_ptr_factory_.GetWeakPtr(),
821 now + time_until_next_unthrottle));
822 }
823 806
824 // Maybe this is a good time to run a nudge job. Let's try it. 807 // Maybe this is a good time to run a nudge job. Let's try it.
825 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) 808 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
826 TrySyncCycleJob(); 809 TrySyncCycleJob();
827 } 810 }
828 811
829 void SyncSchedulerImpl::PerformDelayedNudge() { 812 void SyncSchedulerImpl::PerformDelayedNudge() {
830 // Circumstances may have changed since we scheduled this delayed nudge. 813 // Circumstances may have changed since we scheduled this delayed nudge.
831 // We must check to see if it's OK to run the job before we do so. 814 // We must check to see if it's OK to run the job before we do so.
832 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) 815 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
(...skipping 13 matching lines...) Expand all
846 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 829 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
847 for (auto& observer : *cycle_context_->listeners()) 830 for (auto& observer : *cycle_context_->listeners())
848 observer.OnRetryTimeChanged(retry_time); 831 observer.OnRetryTimeChanged(retry_time);
849 } 832 }
850 833
851 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) { 834 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
852 for (auto& observer : *cycle_context_->listeners()) 835 for (auto& observer : *cycle_context_->listeners())
853 observer.OnThrottledTypesChanged(types); 836 observer.OnThrottledTypesChanged(types);
854 } 837 }
855 838
839 void SyncSchedulerImpl::NotifyBackedOffTypesChanged(ModelTypeSet types) {
840 for (auto& observer : *cycle_context_->listeners())
841 observer.OnBackedOffTypesChanged(types);
842 }
843
856 bool SyncSchedulerImpl::IsBackingOff() const { 844 bool SyncSchedulerImpl::IsBackingOff() const {
857 DCHECK(CalledOnValidThread()); 845 DCHECK(CalledOnValidThread());
858 return wait_interval_.get() && 846 return wait_interval_.get() &&
859 wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF; 847 wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF;
860 } 848 }
861 849
862 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) { 850 void SyncSchedulerImpl::OnThrottled(const TimeDelta& throttle_duration) {
863 DCHECK(CalledOnValidThread()); 851 DCHECK(CalledOnValidThread());
864 wait_interval_ = base::MakeUnique<WaitInterval>(WaitInterval::THROTTLED, 852 wait_interval_ = base::MakeUnique<WaitInterval>(WaitInterval::THROTTLED,
865 throttle_duration); 853 throttle_duration);
866 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 854 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
867 NotifyThrottledTypesChanged(ModelTypeSet::All()); 855 NotifyThrottledTypesChanged(ModelTypeSet::All());
868 } 856 }
869 857
870 void SyncSchedulerImpl::OnTypesThrottled( 858 void SyncSchedulerImpl::OnTypesThrottled(ModelTypeSet types,
871 ModelTypeSet types, 859 const TimeDelta& throttle_duration) {
872 const base::TimeDelta& throttle_duration) { 860 TimeTicks now = TimeTicks::Now();
873 base::TimeTicks now = base::TimeTicks::Now();
874 861
875 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for " 862 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for "
876 << throttle_duration.InMinutes() << " minutes."; 863 << throttle_duration.InMinutes() << " minutes.";
877 864
878 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); 865 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
879 base::TimeDelta time_until_next_unthrottle = 866 RestartWaiting();
880 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
881 type_unthrottle_timer_.Start(FROM_HERE, time_until_next_unthrottle,
882 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
883 weak_ptr_factory_.GetWeakPtr(),
884 now + time_until_next_unthrottle));
885 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 867 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
886 } 868 }
887 869
870 void SyncSchedulerImpl::OnTypesBackedOff(ModelTypeSet types) {
871 TimeTicks now = TimeTicks::Now();
872
873 for (ModelTypeSet::Iterator type = types.First(); type.Good(); type.Inc()) {
874 TimeDelta last_backoff_time =
875 TimeDelta::FromSeconds(kInitialBackoffRetrySeconds);
876 if (nudge_tracker_.IsTypeBackedOff(type.Get())) {
877 last_backoff_time = nudge_tracker_.GetTypeLastBackoffInterval(type.Get());
878 }
879
880 TimeDelta length = delay_provider_->GetDelay(last_backoff_time);
881 nudge_tracker_.SetTypeBackedOff(type.Get(), length, now);
882 SDVLOG(1) << "Backing off " << ModelTypeToString(type.Get()) << " for "
883 << length.InSeconds() << " second.";
884 }
885 RestartWaiting();
886 NotifyBackedOffTypesChanged(nudge_tracker_.GetBackedOffTypes());
887 }
888
888 bool SyncSchedulerImpl::IsCurrentlyThrottled() { 889 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
889 DCHECK(CalledOnValidThread()); 890 DCHECK(CalledOnValidThread());
890 return wait_interval_.get() && 891 return wait_interval_.get() &&
891 wait_interval_->mode == WaitInterval::THROTTLED; 892 wait_interval_->mode == WaitInterval::THROTTLED;
892 } 893 }
893 894
894 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 895 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
895 const base::TimeDelta& new_interval) { 896 const TimeDelta& new_interval) {
896 DCHECK(CalledOnValidThread()); 897 DCHECK(CalledOnValidThread());
897 if (new_interval == syncer_short_poll_interval_seconds_) 898 if (new_interval == syncer_short_poll_interval_seconds_)
898 return; 899 return;
899 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes() 900 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes()
900 << " minutes."; 901 << " minutes.";
901 syncer_short_poll_interval_seconds_ = new_interval; 902 syncer_short_poll_interval_seconds_ = new_interval;
902 AdjustPolling(UPDATE_INTERVAL); 903 AdjustPolling(UPDATE_INTERVAL);
903 } 904 }
904 905
905 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 906 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
906 const base::TimeDelta& new_interval) { 907 const TimeDelta& new_interval) {
907 DCHECK(CalledOnValidThread()); 908 DCHECK(CalledOnValidThread());
908 if (new_interval == syncer_long_poll_interval_seconds_) 909 if (new_interval == syncer_long_poll_interval_seconds_)
909 return; 910 return;
910 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes() 911 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes()
911 << " minutes."; 912 << " minutes.";
912 syncer_long_poll_interval_seconds_ = new_interval; 913 syncer_long_poll_interval_seconds_ = new_interval;
913 AdjustPolling(UPDATE_INTERVAL); 914 AdjustPolling(UPDATE_INTERVAL);
914 } 915 }
915 916
916 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( 917 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
917 const std::map<ModelType, base::TimeDelta>& nudge_delays) { 918 const std::map<ModelType, TimeDelta>& nudge_delays) {
918 DCHECK(CalledOnValidThread()); 919 DCHECK(CalledOnValidThread());
919 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); 920 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
920 } 921 }
921 922
922 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { 923 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
923 if (size > 0) 924 if (size > 0)
924 nudge_tracker_.SetHintBufferSize(size); 925 nudge_tracker_.SetHintBufferSize(size);
925 else 926 else
926 NOTREACHED() << "Hint buffer size should be > 0."; 927 NOTREACHED() << "Hint buffer size should be > 0.";
927 } 928 }
928 929
929 void SyncSchedulerImpl::OnSyncProtocolError( 930 void SyncSchedulerImpl::OnSyncProtocolError(
930 const SyncProtocolError& sync_protocol_error) { 931 const SyncProtocolError& sync_protocol_error) {
931 DCHECK(CalledOnValidThread()); 932 DCHECK(CalledOnValidThread());
932 if (ShouldRequestEarlyExit(sync_protocol_error)) { 933 if (ShouldRequestEarlyExit(sync_protocol_error)) {
933 SDVLOG(2) << "Sync Scheduler requesting early exit."; 934 SDVLOG(2) << "Sync Scheduler requesting early exit.";
934 Stop(); 935 Stop();
935 } 936 }
936 if (IsActionableError(sync_protocol_error)) { 937 if (IsActionableError(sync_protocol_error)) {
937 SDVLOG(2) << "OnActionableError"; 938 SDVLOG(2) << "OnActionableError";
938 for (auto& observer : *cycle_context_->listeners()) 939 for (auto& observer : *cycle_context_->listeners())
939 observer.OnActionableError(sync_protocol_error); 940 observer.OnActionableError(sync_protocol_error);
940 } 941 }
941 } 942 }
942 943
943 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) { 944 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const TimeDelta& delay) {
944 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); 945 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
945 retry_timer_.Start(FROM_HERE, delay, this, 946 retry_timer_.Start(FROM_HERE, delay, this,
946 &SyncSchedulerImpl::RetryTimerCallback); 947 &SyncSchedulerImpl::RetryTimerCallback);
947 } 948 }
948 949
949 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { 950 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
950 for (auto& observer : *cycle_context_->listeners()) 951 for (auto& observer : *cycle_context_->listeners())
951 observer.OnMigrationRequested(types); 952 observer.OnMigrationRequested(types);
952 } 953 }
953 954
954 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { 955 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
955 DCHECK(CalledOnValidThread()); 956 DCHECK(CalledOnValidThread());
956 cycle_context_->set_notifications_enabled(notifications_enabled); 957 cycle_context_->set_notifications_enabled(notifications_enabled);
957 if (notifications_enabled) 958 if (notifications_enabled)
958 nudge_tracker_.OnInvalidationsEnabled(); 959 nudge_tracker_.OnInvalidationsEnabled();
959 else 960 else
960 nudge_tracker_.OnInvalidationsDisabled(); 961 nudge_tracker_.OnInvalidationsDisabled();
961 } 962 }
962 963
963 #undef SDVLOG_LOC 964 #undef SDVLOG_LOC
964 965
965 #undef SDVLOG 966 #undef SDVLOG
966 967
967 #undef SLOG 968 #undef SLOG
968 969
969 #undef ENUM_CASE 970 #undef ENUM_CASE
970 971
971 } // namespace syncer 972 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698