OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/sync/engine_impl/sync_scheduler_impl.h" | 5 #include "components/sync/engine_impl/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
142 mode_(CONFIGURATION_MODE), | 142 mode_(CONFIGURATION_MODE), |
143 delay_provider_(delay_provider), | 143 delay_provider_(delay_provider), |
144 syncer_(syncer), | 144 syncer_(syncer), |
145 cycle_context_(context), | 145 cycle_context_(context), |
146 next_sync_cycle_job_priority_(NORMAL_PRIORITY), | 146 next_sync_cycle_job_priority_(NORMAL_PRIORITY), |
147 ignore_auth_credentials_(ignore_auth_credentials), | 147 ignore_auth_credentials_(ignore_auth_credentials), |
148 weak_ptr_factory_(this) {} | 148 weak_ptr_factory_(this) {} |
149 | 149 |
150 SyncSchedulerImpl::~SyncSchedulerImpl() { | 150 SyncSchedulerImpl::~SyncSchedulerImpl() { |
151 DCHECK(CalledOnValidThread()); | 151 DCHECK(CalledOnValidThread()); |
152 | |
152 Stop(); | 153 Stop(); |
153 } | 154 } |
154 | 155 |
155 void SyncSchedulerImpl::OnCredentialsUpdated() { | 156 void SyncSchedulerImpl::OnCredentialsUpdated() { |
156 DCHECK(CalledOnValidThread()); | 157 DCHECK(CalledOnValidThread()); |
157 | 158 |
158 if (HttpResponse::SYNC_AUTH_ERROR == | 159 if (HttpResponse::SYNC_AUTH_ERROR == |
159 cycle_context_->connection_manager()->server_status()) { | 160 cycle_context_->connection_manager()->server_status()) { |
160 OnServerConnectionErrorFixed(); | 161 OnServerConnectionErrorFixed(); |
161 } | 162 } |
162 } | 163 } |
163 | 164 |
164 void SyncSchedulerImpl::OnConnectionStatusChange() { | 165 void SyncSchedulerImpl::OnConnectionStatusChange() { |
166 DCHECK(CalledOnValidThread()); | |
167 | |
165 if (HttpResponse::CONNECTION_UNAVAILABLE == | 168 if (HttpResponse::CONNECTION_UNAVAILABLE == |
166 cycle_context_->connection_manager()->server_status()) { | 169 cycle_context_->connection_manager()->server_status()) { |
167 // Optimistically assume that the connection is fixed and try | 170 // Optimistically assume that the connection is fixed and try |
168 // connecting. | 171 // connecting. |
169 OnServerConnectionErrorFixed(); | 172 OnServerConnectionErrorFixed(); |
170 } | 173 } |
171 } | 174 } |
172 | 175 |
173 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 176 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
174 // There could be a pending nudge or configuration job in several cases: | 177 // There could be a pending nudge or configuration job in several cases: |
175 // | 178 // |
176 // 1. We're in exponential backoff. | 179 // 1. We're in exponential backoff. |
177 // 2. We're silenced / throttled. | 180 // 2. We're silenced / throttled. |
178 // 3. A nudge was saved previously due to not having a valid auth token. | 181 // 3. A nudge was saved previously due to not having a valid auth token. |
179 // 4. A nudge was scheduled + saved while in configuration mode. | 182 // 4. A nudge was scheduled + saved while in configuration mode. |
180 // | 183 // |
181 // In all cases except (2), we want to retry contacting the server. We | 184 // In all cases except (2), we want to retry contacting the server. We |
182 // call TryCanaryJob to achieve this, and note that nothing -- not even a | 185 // call TryCanaryJob to achieve this, and note that nothing -- not even a |
183 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | 186 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
184 // has the authority to do that is the Unthrottle timer. | 187 // has the authority to do that is the Unthrottle timer. |
185 TryCanaryJob(); | 188 TryCanaryJob(); |
186 } | 189 } |
187 | 190 |
188 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) { | 191 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) { |
189 DCHECK(CalledOnValidThread()); | 192 DCHECK(CalledOnValidThread()); |
193 | |
190 std::string thread_name = base::PlatformThread::GetName(); | 194 std::string thread_name = base::PlatformThread::GetName(); |
191 if (thread_name.empty()) | 195 if (thread_name.empty()) |
192 thread_name = "<Main thread>"; | 196 thread_name = "<Main thread>"; |
193 SDVLOG(2) << "Start called from thread " << thread_name << " with mode " | 197 SDVLOG(2) << "Start called from thread " << thread_name << " with mode " |
194 << GetModeString(mode); | 198 << GetModeString(mode); |
195 if (!started_) { | 199 if (!started_) { |
196 started_ = true; | 200 started_ = true; |
197 SendInitialSnapshot(); | 201 SendInitialSnapshot(); |
198 } | 202 } |
199 | 203 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
232 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnblockedTypes() { | 236 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnblockedTypes() { |
233 ModelTypeSet enabled_types = cycle_context_->GetEnabledTypes(); | 237 ModelTypeSet enabled_types = cycle_context_->GetEnabledTypes(); |
234 ModelTypeSet enabled_protocol_types = | 238 ModelTypeSet enabled_protocol_types = |
235 Intersection(ProtocolTypes(), enabled_types); | 239 Intersection(ProtocolTypes(), enabled_types); |
236 ModelTypeSet blocked_types = nudge_tracker_.GetBlockedTypes(); | 240 ModelTypeSet blocked_types = nudge_tracker_.GetBlockedTypes(); |
237 return Difference(enabled_protocol_types, blocked_types); | 241 return Difference(enabled_protocol_types, blocked_types); |
238 } | 242 } |
239 | 243 |
240 void SyncSchedulerImpl::SendInitialSnapshot() { | 244 void SyncSchedulerImpl::SendInitialSnapshot() { |
241 DCHECK(CalledOnValidThread()); | 245 DCHECK(CalledOnValidThread()); |
246 | |
242 std::unique_ptr<SyncCycle> dummy(SyncCycle::Build(cycle_context_, this)); | 247 std::unique_ptr<SyncCycle> dummy(SyncCycle::Build(cycle_context_, this)); |
243 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); | 248 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); |
244 event.snapshot = dummy->TakeSnapshot(); | 249 event.snapshot = dummy->TakeSnapshot(); |
245 for (auto& observer : *cycle_context_->listeners()) | 250 for (auto& observer : *cycle_context_->listeners()) |
246 observer.OnSyncCycleEvent(event); | 251 observer.OnSyncCycleEvent(event); |
247 } | 252 } |
248 | 253 |
249 void SyncSchedulerImpl::ScheduleConfiguration( | 254 void SyncSchedulerImpl::ScheduleConfiguration( |
250 const ConfigurationParams& params) { | 255 const ConfigurationParams& params) { |
251 DCHECK(CalledOnValidThread()); | 256 DCHECK(CalledOnValidThread()); |
(...skipping 16 matching lines...) Expand all Loading... | |
268 params.ready_task.Run(); | 273 params.ready_task.Run(); |
269 } | 274 } |
270 } | 275 } |
271 | 276 |
272 void SyncSchedulerImpl::ScheduleClearServerData(const ClearParams& params) { | 277 void SyncSchedulerImpl::ScheduleClearServerData(const ClearParams& params) { |
273 DCHECK(CalledOnValidThread()); | 278 DCHECK(CalledOnValidThread()); |
274 DCHECK_EQ(CLEAR_SERVER_DATA_MODE, mode_); | 279 DCHECK_EQ(CLEAR_SERVER_DATA_MODE, mode_); |
275 DCHECK(!pending_configure_params_); | 280 DCHECK(!pending_configure_params_); |
276 DCHECK(!params.report_success_task.is_null()); | 281 DCHECK(!params.report_success_task.is_null()); |
277 CHECK(started_) << "Scheduler must be running to clear."; | 282 CHECK(started_) << "Scheduler must be running to clear."; |
283 | |
278 pending_clear_params_ = base::MakeUnique<ClearParams>(params); | 284 pending_clear_params_ = base::MakeUnique<ClearParams>(params); |
279 TrySyncCycleJob(); | 285 TrySyncCycleJob(); |
280 } | 286 } |
281 | 287 |
282 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { | 288 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { |
283 DCHECK(CalledOnValidThread()); | 289 DCHECK(CalledOnValidThread()); |
290 | |
284 if (IsCurrentlyThrottled()) { | 291 if (IsCurrentlyThrottled()) { |
285 SDVLOG(1) << "Unable to run a job because we're throttled."; | 292 SDVLOG(1) << "Unable to run a job because we're throttled."; |
286 return false; | 293 return false; |
287 } | 294 } |
288 | 295 |
289 if (IsBackingOff() && priority != CANARY_PRIORITY) { | 296 if (IsBackingOff() && priority != CANARY_PRIORITY) { |
290 SDVLOG(1) << "Unable to run a job because we're backing off."; | 297 SDVLOG(1) << "Unable to run a job because we're backing off."; |
291 return false; | 298 return false; |
292 } | 299 } |
293 | 300 |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
385 << "Dropping nudge, scheduler is not running."; | 392 << "Dropping nudge, scheduler is not running."; |
386 return; | 393 return; |
387 } | 394 } |
388 | 395 |
389 SDVLOG_LOC(nudge_location, 2) << "In ScheduleNudgeImpl with delay " | 396 SDVLOG_LOC(nudge_location, 2) << "In ScheduleNudgeImpl with delay " |
390 << delay.InMilliseconds() << " ms"; | 397 << delay.InMilliseconds() << " ms"; |
391 | 398 |
392 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) | 399 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) |
393 return; | 400 return; |
394 | 401 |
395 TimeTicks incoming_run_time = TimeTicks::Now() + delay; | 402 if (!IsEarlierThanCurrentPendingJob(delay)) { |
396 if (pending_wakeup_timer_.IsRunning() && | |
397 (pending_wakeup_timer_.desired_run_time() < incoming_run_time)) { | |
398 // Old job arrives sooner than this one. Don't reschedule it. | 403 // Old job arrives sooner than this one. Don't reschedule it. |
399 return; | 404 return; |
400 } | 405 } |
401 | 406 |
402 // Either there is no existing nudge in flight or the incoming nudge should be | 407 // Either there is no existing nudge in flight or the incoming nudge should be |
403 // made to arrive first (preempt) the existing nudge. We reschedule in either | 408 // made to arrive first (preempt) the existing nudge. We reschedule in either |
404 // case. | 409 // case. |
405 SDVLOG_LOC(nudge_location, 2) << "Scheduling a nudge with " | 410 SDVLOG_LOC(nudge_location, 2) << "Scheduling a nudge with " |
406 << delay.InMilliseconds() << " ms delay"; | 411 << delay.InMilliseconds() << " ms delay"; |
407 pending_wakeup_timer_.Start( | 412 pending_wakeup_timer_.Start( |
408 nudge_location, delay, base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, | 413 nudge_location, delay, base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, |
409 weak_ptr_factory_.GetWeakPtr())); | 414 weak_ptr_factory_.GetWeakPtr())); |
410 } | 415 } |
411 | 416 |
412 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 417 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
413 switch (mode) { | 418 switch (mode) { |
414 ENUM_CASE(CONFIGURATION_MODE); | 419 ENUM_CASE(CONFIGURATION_MODE); |
415 ENUM_CASE(CLEAR_SERVER_DATA_MODE); | 420 ENUM_CASE(CLEAR_SERVER_DATA_MODE); |
416 ENUM_CASE(NORMAL_MODE); | 421 ENUM_CASE(NORMAL_MODE); |
417 } | 422 } |
418 return ""; | 423 return ""; |
419 } | 424 } |
420 | 425 |
421 void SyncSchedulerImpl::SetDefaultNudgeDelay(TimeDelta delay_ms) { | 426 void SyncSchedulerImpl::SetDefaultNudgeDelay(TimeDelta delay_ms) { |
422 DCHECK(CalledOnValidThread()); | 427 DCHECK(CalledOnValidThread()); |
428 | |
423 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); | 429 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); |
424 } | 430 } |
425 | 431 |
426 void SyncSchedulerImpl::DoNudgeSyncCycleJob(JobPriority priority) { | 432 void SyncSchedulerImpl::DoNudgeSyncCycleJob(JobPriority priority) { |
427 DCHECK(CalledOnValidThread()); | 433 DCHECK(CalledOnValidThread()); |
428 DCHECK(CanRunNudgeJobNow(priority)); | 434 DCHECK(CanRunNudgeJobNow(priority)); |
429 | 435 |
430 DVLOG(2) << "Will run normal mode sync cycle with types " | 436 DVLOG(2) << "Will run normal mode sync cycle with types " |
431 << ModelTypeSetToString(GetEnabledAndUnblockedTypes()); | 437 << ModelTypeSetToString(GetEnabledAndUnblockedTypes()); |
432 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); | 438 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
525 SDVLOG(2) << "Sync cycle failed. Will back off for " | 531 SDVLOG(2) << "Sync cycle failed. Will back off for " |
526 << wait_interval_->length.InMilliseconds() << "ms."; | 532 << wait_interval_->length.InMilliseconds() << "ms."; |
527 } else { | 533 } else { |
528 // Increase our backoff interval and schedule another retry. | 534 // Increase our backoff interval and schedule another retry. |
529 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); | 535 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); |
530 wait_interval_ = base::MakeUnique<WaitInterval>( | 536 wait_interval_ = base::MakeUnique<WaitInterval>( |
531 WaitInterval::EXPONENTIAL_BACKOFF, length); | 537 WaitInterval::EXPONENTIAL_BACKOFF, length); |
532 SDVLOG(2) << "Sync cycle failed. Will back off for " | 538 SDVLOG(2) << "Sync cycle failed. Will back off for " |
533 << wait_interval_->length.InMilliseconds() << "ms."; | 539 << wait_interval_->length.InMilliseconds() << "ms."; |
534 } | 540 } |
535 RestartWaiting(); | |
536 } | 541 } |
537 | 542 |
538 void SyncSchedulerImpl::DoPollSyncCycleJob() { | 543 void SyncSchedulerImpl::DoPollSyncCycleJob() { |
539 SDVLOG(2) << "Polling with types " | 544 SDVLOG(2) << "Polling with types " |
540 << ModelTypeSetToString(GetEnabledAndUnblockedTypes()); | 545 << ModelTypeSetToString(GetEnabledAndUnblockedTypes()); |
541 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); | 546 std::unique_ptr<SyncCycle> cycle(SyncCycle::Build(cycle_context_, this)); |
542 bool success = | 547 bool success = |
543 syncer_->PollSyncShare(GetEnabledAndUnblockedTypes(), cycle.get()); | 548 syncer_->PollSyncShare(GetEnabledAndUnblockedTypes(), cycle.get()); |
544 | 549 |
545 // Only restart the timer if the poll succeeded. Otherwise rely on normal | 550 // Only restart the timer if the poll succeeded. Otherwise rely on normal |
546 // failure handling to retry with backoff. | 551 // failure handling to retry with backoff. |
547 if (success) { | 552 if (success) { |
548 AdjustPolling(FORCE_RESET); | 553 AdjustPolling(FORCE_RESET); |
549 HandleSuccess(); | 554 HandleSuccess(); |
550 } else { | 555 } else { |
551 HandleFailure(cycle->status_controller().model_neutral_state()); | 556 HandleFailure(cycle->status_controller().model_neutral_state()); |
552 } | 557 } |
553 } | 558 } |
554 | 559 |
555 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { | 560 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { |
556 DCHECK(CalledOnValidThread()); | 561 DCHECK(CalledOnValidThread()); |
562 | |
557 TimeTicks now = TimeTicks::Now(); | 563 TimeTicks now = TimeTicks::Now(); |
558 // Update timing information for how often datatypes are triggering nudges. | 564 // Update timing information for how often datatypes are triggering nudges. |
559 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { | 565 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { |
560 TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; | 566 TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; |
561 last_local_nudges_by_model_type_[iter.Get()] = now; | 567 last_local_nudges_by_model_type_[iter.Get()] = now; |
562 if (previous.is_null()) | 568 if (previous.is_null()) |
563 continue; | 569 continue; |
564 | 570 |
565 #define PER_DATA_TYPE_MACRO(type_str) \ | 571 #define PER_DATA_TYPE_MACRO(type_str) \ |
566 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 572 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
567 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); | 573 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); |
568 #undef PER_DATA_TYPE_MACRO | 574 #undef PER_DATA_TYPE_MACRO |
569 } | 575 } |
570 } | 576 } |
571 | 577 |
572 TimeDelta SyncSchedulerImpl::GetPollInterval() { | 578 TimeDelta SyncSchedulerImpl::GetPollInterval() { |
573 return (!cycle_context_->notifications_enabled() || | 579 return (!cycle_context_->notifications_enabled() || |
574 !cycle_context_->ShouldFetchUpdatesBeforeCommit()) | 580 !cycle_context_->ShouldFetchUpdatesBeforeCommit()) |
575 ? syncer_short_poll_interval_seconds_ | 581 ? syncer_short_poll_interval_seconds_ |
576 : syncer_long_poll_interval_seconds_; | 582 : syncer_long_poll_interval_seconds_; |
577 } | 583 } |
578 | 584 |
579 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { | 585 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { |
580 DCHECK(CalledOnValidThread()); | 586 DCHECK(CalledOnValidThread()); |
587 | |
581 if (!started_) | 588 if (!started_) |
582 return; | 589 return; |
583 | 590 |
584 TimeDelta poll_interval = GetPollInterval(); | 591 TimeDelta poll_interval = GetPollInterval(); |
585 TimeDelta poll_delay = poll_interval; | 592 TimeDelta poll_delay = poll_interval; |
586 const TimeTicks now = TimeTicks::Now(); | 593 const TimeTicks now = TimeTicks::Now(); |
587 | 594 |
588 if (type == UPDATE_INTERVAL) { | 595 if (type == UPDATE_INTERVAL) { |
589 if (!last_poll_reset_.is_null()) { | 596 if (!last_poll_reset_.is_null()) { |
590 // Override the delay based on the last successful poll time (if it was | 597 // Override the delay based on the last successful poll time (if it was |
(...skipping 23 matching lines...) Expand all Loading... | |
614 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes() | 621 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes() |
615 << " minutes."; | 622 << " minutes."; |
616 | 623 |
617 // Adjust poll rate. Start will reset the timer if it was already running. | 624 // Adjust poll rate. Start will reset the timer if it was already running. |
618 poll_timer_.Start(FROM_HERE, poll_delay, this, | 625 poll_timer_.Start(FROM_HERE, poll_delay, this, |
619 &SyncSchedulerImpl::PollTimerCallback); | 626 &SyncSchedulerImpl::PollTimerCallback); |
620 } | 627 } |
621 | 628 |
622 void SyncSchedulerImpl::RestartWaiting() { | 629 void SyncSchedulerImpl::RestartWaiting() { |
623 if (wait_interval_.get()) { | 630 if (wait_interval_.get()) { |
624 // Global throttling or backoff | 631 // Global throttling or backoff. |
632 if (!IsEarlierThanCurrentPendingJob(wait_interval_->length)) { | |
633 // Since RestartWaiting() is called in TrySyncCycleJobImpl(), we should | |
skym
2017/05/01 17:59:14
I don't understand what this comment is saying, or
Gang Wu
2017/05/01 23:01:41
Done.
| |
634 // not overwrite existing the scheduled job, otherwise the unblock jobs | |
635 // will be keep push away and never be run. | |
636 return; | |
637 } | |
625 NotifyRetryTime(base::Time::Now() + wait_interval_->length); | 638 NotifyRetryTime(base::Time::Now() + wait_interval_->length); |
626 SDVLOG(2) << "Starting WaitInterval timer of length " | 639 SDVLOG(2) << "Starting WaitInterval timer of length " |
627 << wait_interval_->length.InMilliseconds() << "ms."; | 640 << wait_interval_->length.InMilliseconds() << "ms."; |
628 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 641 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
629 pending_wakeup_timer_.Start(FROM_HERE, wait_interval_->length, | 642 pending_wakeup_timer_.Start(FROM_HERE, wait_interval_->length, |
630 base::Bind(&SyncSchedulerImpl::Unthrottle, | 643 base::Bind(&SyncSchedulerImpl::Unthrottle, |
631 weak_ptr_factory_.GetWeakPtr())); | 644 weak_ptr_factory_.GetWeakPtr())); |
632 } else { | 645 } else { |
633 pending_wakeup_timer_.Start( | 646 pending_wakeup_timer_.Start( |
634 FROM_HERE, wait_interval_->length, | 647 FROM_HERE, wait_interval_->length, |
635 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, | 648 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, |
636 weak_ptr_factory_.GetWeakPtr())); | 649 weak_ptr_factory_.GetWeakPtr())); |
637 } | 650 } |
638 } else if (nudge_tracker_.IsAnyTypeBlocked()) { | 651 } else if (nudge_tracker_.IsAnyTypeBlocked()) { |
639 // Per-datatype throttled or backed off. | 652 // Per-datatype throttled or backed off. |
640 TimeDelta time_until_next_unblock = | 653 TimeDelta time_until_next_unblock = |
641 nudge_tracker_.GetTimeUntilNextUnblock(); | 654 nudge_tracker_.GetTimeUntilNextUnblock(); |
655 if (!IsEarlierThanCurrentPendingJob(time_until_next_unblock)) { | |
656 return; | |
657 } | |
642 pending_wakeup_timer_.Start(FROM_HERE, time_until_next_unblock, | 658 pending_wakeup_timer_.Start(FROM_HERE, time_until_next_unblock, |
643 base::Bind(&SyncSchedulerImpl::OnTypesUnblocked, | 659 base::Bind(&SyncSchedulerImpl::OnTypesUnblocked, |
644 weak_ptr_factory_.GetWeakPtr())); | 660 weak_ptr_factory_.GetWeakPtr())); |
645 } | 661 } |
646 } | 662 } |
647 | 663 |
648 void SyncSchedulerImpl::Stop() { | 664 void SyncSchedulerImpl::Stop() { |
649 DCHECK(CalledOnValidThread()); | 665 DCHECK(CalledOnValidThread()); |
650 SDVLOG(2) << "Stop called"; | 666 SDVLOG(2) << "Stop called"; |
651 | 667 |
(...skipping 19 matching lines...) Expand all Loading... | |
671 | 687 |
672 void SyncSchedulerImpl::TrySyncCycleJob() { | 688 void SyncSchedulerImpl::TrySyncCycleJob() { |
673 // Post call to TrySyncCycleJobImpl on current thread. Later request for | 689 // Post call to TrySyncCycleJobImpl on current thread. Later request for |
674 // access token will be here. | 690 // access token will be here. |
675 base::ThreadTaskRunnerHandle::Get()->PostTask( | 691 base::ThreadTaskRunnerHandle::Get()->PostTask( |
676 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncCycleJobImpl, | 692 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncCycleJobImpl, |
677 weak_ptr_factory_.GetWeakPtr())); | 693 weak_ptr_factory_.GetWeakPtr())); |
678 } | 694 } |
679 | 695 |
680 void SyncSchedulerImpl::TrySyncCycleJobImpl() { | 696 void SyncSchedulerImpl::TrySyncCycleJobImpl() { |
697 DCHECK(CalledOnValidThread()); | |
698 | |
681 JobPriority priority = next_sync_cycle_job_priority_; | 699 JobPriority priority = next_sync_cycle_job_priority_; |
682 next_sync_cycle_job_priority_ = NORMAL_PRIORITY; | 700 next_sync_cycle_job_priority_ = NORMAL_PRIORITY; |
683 | 701 |
684 nudge_tracker_.SetSyncCycleStartTime(TimeTicks::Now()); | 702 nudge_tracker_.SetSyncCycleStartTime(TimeTicks::Now()); |
685 | 703 |
686 DCHECK(CalledOnValidThread()); | |
687 if (mode_ == CONFIGURATION_MODE) { | 704 if (mode_ == CONFIGURATION_MODE) { |
688 if (pending_configure_params_) { | 705 if (pending_configure_params_) { |
689 SDVLOG(2) << "Found pending configure job"; | 706 SDVLOG(2) << "Found pending configure job"; |
690 DoConfigurationSyncCycleJob(priority); | 707 DoConfigurationSyncCycleJob(priority); |
691 } | 708 } |
692 } else if (mode_ == CLEAR_SERVER_DATA_MODE) { | 709 } else if (mode_ == CLEAR_SERVER_DATA_MODE) { |
693 if (pending_clear_params_) { | 710 if (pending_clear_params_) { |
694 DoClearServerDataSyncCycleJob(priority); | 711 DoClearServerDataSyncCycleJob(priority); |
695 } | 712 } |
696 } else if (CanRunNudgeJobNow(priority)) { | 713 } else if (CanRunNudgeJobNow(priority)) { |
697 if (nudge_tracker_.IsSyncRequired()) { | 714 if (nudge_tracker_.IsSyncRequired()) { |
698 SDVLOG(2) << "Found pending nudge job"; | 715 SDVLOG(2) << "Found pending nudge job"; |
699 DoNudgeSyncCycleJob(priority); | 716 DoNudgeSyncCycleJob(priority); |
700 } else if (((TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) { | 717 } else if (((TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) { |
701 SDVLOG(2) << "Found pending poll"; | 718 SDVLOG(2) << "Found pending poll"; |
702 DoPollSyncCycleJob(); | 719 DoPollSyncCycleJob(); |
703 } | 720 } |
704 } else { | 721 } else { |
705 // We must be in an error state. Transitioning out of each of these | 722 // We must be in an error state. Transitioning out of each of these |
706 // error states should trigger a canary job. | 723 // error states should trigger a canary job. |
707 DCHECK(IsCurrentlyThrottled() || IsBackingOff() || | 724 DCHECK(IsCurrentlyThrottled() || IsBackingOff() || |
708 cycle_context_->connection_manager()->HasInvalidAuthToken()); | 725 cycle_context_->connection_manager()->HasInvalidAuthToken()); |
709 } | 726 } |
710 | 727 |
711 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) { | 728 RestartWaiting(); |
skym
2017/05/01 17:59:14
We should explain somewhere what the overall strat
Gang Wu
2017/05/01 23:01:41
Done.
| |
712 // If we succeeded, our wait interval would have been cleared. If it hasn't | |
713 // been cleared, then we should increase our backoff interval and schedule | |
714 // another retry. | |
715 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); | |
716 wait_interval_ = base::MakeUnique<WaitInterval>( | |
717 WaitInterval::EXPONENTIAL_BACKOFF, length); | |
718 SDVLOG(2) << "Sync cycle failed. Will back off for " | |
719 << wait_interval_->length.InMilliseconds() << "ms."; | |
720 RestartWaiting(); | |
721 } | |
722 } | 729 } |
723 | 730 |
724 void SyncSchedulerImpl::PollTimerCallback() { | 731 void SyncSchedulerImpl::PollTimerCallback() { |
725 DCHECK(CalledOnValidThread()); | 732 DCHECK(CalledOnValidThread()); |
726 CHECK(!syncer_->IsSyncing()); | 733 CHECK(!syncer_->IsSyncing()); |
727 | 734 |
728 TrySyncCycleJob(); | 735 TrySyncCycleJob(); |
729 } | 736 } |
730 | 737 |
731 void SyncSchedulerImpl::RetryTimerCallback() { | 738 void SyncSchedulerImpl::RetryTimerCallback() { |
(...skipping 12 matching lines...) Expand all Loading... | |
744 // We treat this as a 'canary' in the sense that it was originally scheduled | 751 // We treat this as a 'canary' in the sense that it was originally scheduled |
745 // to run some time ago, failed, and we now want to retry, versus a job that | 752 // to run some time ago, failed, and we now want to retry, versus a job that |
746 // was just created (e.g via ScheduleNudgeImpl). The main implication is | 753 // was just created (e.g via ScheduleNudgeImpl). The main implication is |
747 // that we're careful to update routing info (etc) with such potentially | 754 // that we're careful to update routing info (etc) with such potentially |
748 // stale canary jobs. | 755 // stale canary jobs. |
749 TryCanaryJob(); | 756 TryCanaryJob(); |
750 } | 757 } |
751 | 758 |
752 void SyncSchedulerImpl::OnTypesUnblocked() { | 759 void SyncSchedulerImpl::OnTypesUnblocked() { |
753 DCHECK(CalledOnValidThread()); | 760 DCHECK(CalledOnValidThread()); |
761 | |
754 nudge_tracker_.UpdateTypeThrottlingAndBackoffState(); | 762 nudge_tracker_.UpdateTypeThrottlingAndBackoffState(); |
755 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); | 763 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); |
756 | 764 |
757 RestartWaiting(); | |
758 | |
759 // Maybe this is a good time to run a nudge job. Let's try it. | 765 // Maybe this is a good time to run a nudge job. Let's try it. |
766 // If not a good time, reschedule a new run. | |
760 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) | 767 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) |
761 TrySyncCycleJob(); | 768 TrySyncCycleJob(); |
769 else | |
770 RestartWaiting(); | |
762 } | 771 } |
763 | 772 |
764 void SyncSchedulerImpl::PerformDelayedNudge() { | 773 void SyncSchedulerImpl::PerformDelayedNudge() { |
765 // Circumstances may have changed since we scheduled this delayed nudge. | 774 // Circumstances may have changed since we scheduled this delayed nudge. |
766 // We must check to see if it's OK to run the job before we do so. | 775 // We must check to see if it's OK to run the job before we do so. |
767 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) | 776 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) |
768 TrySyncCycleJob(); | 777 TrySyncCycleJob(); |
769 | 778 |
770 // We're not responsible for setting up any retries here. The functions that | 779 // We're not responsible for setting up any retries here. The functions that |
771 // first put us into a state that prevents successful sync cycles (eg. global | 780 // first put us into a state that prevents successful sync cycles (eg. global |
(...skipping 26 matching lines...) Expand all Loading... | |
798 } | 807 } |
799 | 808 |
800 for (auto& observer : *cycle_context_->listeners()) { | 809 for (auto& observer : *cycle_context_->listeners()) { |
801 observer.OnThrottledTypesChanged(throttled_types); | 810 observer.OnThrottledTypesChanged(throttled_types); |
802 observer.OnBackedOffTypesChanged(backed_off_types); | 811 observer.OnBackedOffTypesChanged(backed_off_types); |
803 } | 812 } |
804 } | 813 } |
805 | 814 |
806 bool SyncSchedulerImpl::IsBackingOff() const { | 815 bool SyncSchedulerImpl::IsBackingOff() const { |
807 DCHECK(CalledOnValidThread()); | 816 DCHECK(CalledOnValidThread()); |
817 | |
808 return wait_interval_.get() && | 818 return wait_interval_.get() && |
809 wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF; | 819 wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF; |
810 } | 820 } |
811 | 821 |
812 void SyncSchedulerImpl::OnThrottled(const TimeDelta& throttle_duration) { | 822 void SyncSchedulerImpl::OnThrottled(const TimeDelta& throttle_duration) { |
813 DCHECK(CalledOnValidThread()); | 823 DCHECK(CalledOnValidThread()); |
824 | |
814 wait_interval_ = base::MakeUnique<WaitInterval>(WaitInterval::THROTTLED, | 825 wait_interval_ = base::MakeUnique<WaitInterval>(WaitInterval::THROTTLED, |
815 throttle_duration); | 826 throttle_duration); |
816 NotifyRetryTime(base::Time::Now() + wait_interval_->length); | 827 NotifyRetryTime(base::Time::Now() + wait_interval_->length); |
817 | 828 |
818 for (auto& observer : *cycle_context_->listeners()) { | 829 for (auto& observer : *cycle_context_->listeners()) { |
819 observer.OnThrottledTypesChanged(ModelTypeSet::All()); | 830 observer.OnThrottledTypesChanged(ModelTypeSet::All()); |
820 } | 831 } |
821 } | 832 } |
822 | 833 |
823 void SyncSchedulerImpl::OnTypesThrottled(ModelTypeSet types, | 834 void SyncSchedulerImpl::OnTypesThrottled(ModelTypeSet types, |
824 const TimeDelta& throttle_duration) { | 835 const TimeDelta& throttle_duration) { |
836 DCHECK(CalledOnValidThread()); | |
837 | |
825 TimeTicks now = TimeTicks::Now(); | 838 TimeTicks now = TimeTicks::Now(); |
826 | 839 |
827 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for " | 840 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for " |
828 << throttle_duration.InMinutes() << " minutes."; | 841 << throttle_duration.InMinutes() << " minutes."; |
829 | 842 |
830 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); | 843 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); |
831 RestartWaiting(); | |
832 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); | 844 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); |
833 } | 845 } |
834 | 846 |
835 void SyncSchedulerImpl::OnTypesBackedOff(ModelTypeSet types) { | 847 void SyncSchedulerImpl::OnTypesBackedOff(ModelTypeSet types) { |
848 DCHECK(CalledOnValidThread()); | |
849 | |
836 TimeTicks now = TimeTicks::Now(); | 850 TimeTicks now = TimeTicks::Now(); |
837 | 851 |
838 for (ModelTypeSet::Iterator type = types.First(); type.Good(); type.Inc()) { | 852 for (ModelTypeSet::Iterator type = types.First(); type.Good(); type.Inc()) { |
839 TimeDelta last_backoff_time = | 853 TimeDelta last_backoff_time = |
840 TimeDelta::FromSeconds(kInitialBackoffRetrySeconds); | 854 TimeDelta::FromSeconds(kInitialBackoffRetrySeconds); |
841 if (nudge_tracker_.GetTypeBlockingMode(type.Get()) == | 855 if (nudge_tracker_.GetTypeBlockingMode(type.Get()) == |
842 WaitInterval::EXPONENTIAL_BACKOFF_RETRYING) { | 856 WaitInterval::EXPONENTIAL_BACKOFF_RETRYING) { |
843 last_backoff_time = nudge_tracker_.GetTypeLastBackoffInterval(type.Get()); | 857 last_backoff_time = nudge_tracker_.GetTypeLastBackoffInterval(type.Get()); |
844 } | 858 } |
845 | 859 |
846 TimeDelta length = delay_provider_->GetDelay(last_backoff_time); | 860 TimeDelta length = delay_provider_->GetDelay(last_backoff_time); |
847 nudge_tracker_.SetTypeBackedOff(type.Get(), length, now); | 861 nudge_tracker_.SetTypeBackedOff(type.Get(), length, now); |
848 SDVLOG(1) << "Backing off " << ModelTypeToString(type.Get()) << " for " | 862 SDVLOG(1) << "Backing off " << ModelTypeToString(type.Get()) << " for " |
849 << length.InSeconds() << " second."; | 863 << length.InSeconds() << " second."; |
850 } | 864 } |
851 RestartWaiting(); | |
852 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); | 865 NotifyBlockedTypesChanged(nudge_tracker_.GetBlockedTypes()); |
853 } | 866 } |
854 | 867 |
855 bool SyncSchedulerImpl::IsCurrentlyThrottled() { | 868 bool SyncSchedulerImpl::IsCurrentlyThrottled() { |
856 DCHECK(CalledOnValidThread()); | 869 DCHECK(CalledOnValidThread()); |
870 | |
857 return wait_interval_.get() && | 871 return wait_interval_.get() && |
858 wait_interval_->mode == WaitInterval::THROTTLED; | 872 wait_interval_->mode == WaitInterval::THROTTLED; |
859 } | 873 } |
860 | 874 |
861 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 875 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
862 const TimeDelta& new_interval) { | 876 const TimeDelta& new_interval) { |
863 DCHECK(CalledOnValidThread()); | 877 DCHECK(CalledOnValidThread()); |
878 | |
864 if (new_interval == syncer_short_poll_interval_seconds_) | 879 if (new_interval == syncer_short_poll_interval_seconds_) |
865 return; | 880 return; |
866 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes() | 881 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes() |
867 << " minutes."; | 882 << " minutes."; |
868 syncer_short_poll_interval_seconds_ = new_interval; | 883 syncer_short_poll_interval_seconds_ = new_interval; |
869 AdjustPolling(UPDATE_INTERVAL); | 884 AdjustPolling(UPDATE_INTERVAL); |
870 } | 885 } |
871 | 886 |
872 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( | 887 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( |
873 const TimeDelta& new_interval) { | 888 const TimeDelta& new_interval) { |
874 DCHECK(CalledOnValidThread()); | 889 DCHECK(CalledOnValidThread()); |
890 | |
875 if (new_interval == syncer_long_poll_interval_seconds_) | 891 if (new_interval == syncer_long_poll_interval_seconds_) |
876 return; | 892 return; |
877 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes() | 893 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes() |
878 << " minutes."; | 894 << " minutes."; |
879 syncer_long_poll_interval_seconds_ = new_interval; | 895 syncer_long_poll_interval_seconds_ = new_interval; |
880 AdjustPolling(UPDATE_INTERVAL); | 896 AdjustPolling(UPDATE_INTERVAL); |
881 } | 897 } |
882 | 898 |
883 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( | 899 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( |
884 const std::map<ModelType, TimeDelta>& nudge_delays) { | 900 const std::map<ModelType, TimeDelta>& nudge_delays) { |
885 DCHECK(CalledOnValidThread()); | 901 DCHECK(CalledOnValidThread()); |
902 | |
886 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); | 903 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); |
887 } | 904 } |
888 | 905 |
889 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { | 906 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { |
907 DCHECK(CalledOnValidThread()); | |
908 | |
890 if (size > 0) | 909 if (size > 0) |
891 nudge_tracker_.SetHintBufferSize(size); | 910 nudge_tracker_.SetHintBufferSize(size); |
892 else | 911 else |
893 NOTREACHED() << "Hint buffer size should be > 0."; | 912 NOTREACHED() << "Hint buffer size should be > 0."; |
894 } | 913 } |
895 | 914 |
896 void SyncSchedulerImpl::OnSyncProtocolError( | 915 void SyncSchedulerImpl::OnSyncProtocolError( |
897 const SyncProtocolError& sync_protocol_error) { | 916 const SyncProtocolError& sync_protocol_error) { |
898 DCHECK(CalledOnValidThread()); | 917 DCHECK(CalledOnValidThread()); |
918 | |
899 if (ShouldRequestEarlyExit(sync_protocol_error)) { | 919 if (ShouldRequestEarlyExit(sync_protocol_error)) { |
900 SDVLOG(2) << "Sync Scheduler requesting early exit."; | 920 SDVLOG(2) << "Sync Scheduler requesting early exit."; |
901 Stop(); | 921 Stop(); |
902 } | 922 } |
903 if (IsActionableError(sync_protocol_error)) { | 923 if (IsActionableError(sync_protocol_error)) { |
904 SDVLOG(2) << "OnActionableError"; | 924 SDVLOG(2) << "OnActionableError"; |
905 for (auto& observer : *cycle_context_->listeners()) | 925 for (auto& observer : *cycle_context_->listeners()) |
906 observer.OnActionableError(sync_protocol_error); | 926 observer.OnActionableError(sync_protocol_error); |
907 } | 927 } |
908 } | 928 } |
909 | 929 |
910 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const TimeDelta& delay) { | 930 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const TimeDelta& delay) { |
931 DCHECK(CalledOnValidThread()); | |
932 | |
911 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); | 933 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); |
912 retry_timer_.Start(FROM_HERE, delay, this, | 934 retry_timer_.Start(FROM_HERE, delay, this, |
913 &SyncSchedulerImpl::RetryTimerCallback); | 935 &SyncSchedulerImpl::RetryTimerCallback); |
914 } | 936 } |
915 | 937 |
916 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { | 938 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { |
939 DCHECK(CalledOnValidThread()); | |
940 | |
917 for (auto& observer : *cycle_context_->listeners()) | 941 for (auto& observer : *cycle_context_->listeners()) |
918 observer.OnMigrationRequested(types); | 942 observer.OnMigrationRequested(types); |
919 } | 943 } |
920 | 944 |
921 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { | 945 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { |
922 DCHECK(CalledOnValidThread()); | 946 DCHECK(CalledOnValidThread()); |
947 | |
923 cycle_context_->set_notifications_enabled(notifications_enabled); | 948 cycle_context_->set_notifications_enabled(notifications_enabled); |
924 if (notifications_enabled) | 949 if (notifications_enabled) |
925 nudge_tracker_.OnInvalidationsEnabled(); | 950 nudge_tracker_.OnInvalidationsEnabled(); |
926 else | 951 else |
927 nudge_tracker_.OnInvalidationsDisabled(); | 952 nudge_tracker_.OnInvalidationsDisabled(); |
928 } | 953 } |
929 | 954 |
955 bool SyncSchedulerImpl::IsEarlierThanCurrentPendingJob(const TimeDelta& delay) { | |
956 TimeTicks incoming_run_time = TimeTicks::Now() + delay; | |
957 if (pending_wakeup_timer_.IsRunning() && | |
958 (pending_wakeup_timer_.desired_run_time() < incoming_run_time)) { | |
959 // Old job arrives sooner than this one. | |
960 return false; | |
961 } | |
962 return true; | |
963 } | |
964 | |
930 #undef SDVLOG_LOC | 965 #undef SDVLOG_LOC |
931 | 966 |
932 #undef SDVLOG | 967 #undef SDVLOG |
933 | 968 |
934 #undef SLOG | 969 #undef SLOG |
935 | 970 |
936 #undef ENUM_CASE | 971 #undef ENUM_CASE |
937 | 972 |
938 } // namespace syncer | 973 } // namespace syncer |
OLD | NEW |