Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(339)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 1132013004: [Sync] Refactoring polling to be reliable. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Full patch Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h"
11 #include "base/bind.h" 10 #include "base/bind.h"
12 #include "base/bind_helpers.h" 11 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h" 12 #include "base/compiler_specific.h"
14 #include "base/location.h" 13 #include "base/location.h"
15 #include "base/logging.h" 14 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h" 16 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h" 17 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h" 18 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h" 19 #include "sync/protocol/sync.pb.h"
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, 148 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
150 BackoffDelayProvider* delay_provider, 149 BackoffDelayProvider* delay_provider,
151 sessions::SyncSessionContext* context, 150 sessions::SyncSessionContext* context,
152 Syncer* syncer) 151 Syncer* syncer)
153 : name_(name), 152 : name_(name),
154 started_(false), 153 started_(false),
155 syncer_short_poll_interval_seconds_( 154 syncer_short_poll_interval_seconds_(
156 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 155 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
157 syncer_long_poll_interval_seconds_( 156 syncer_long_poll_interval_seconds_(
158 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 157 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
159 mode_(NORMAL_MODE), 158 mode_(CONFIGURATION_MODE),
160 delay_provider_(delay_provider), 159 delay_provider_(delay_provider),
161 syncer_(syncer), 160 syncer_(syncer),
162 session_context_(context), 161 session_context_(context),
163 no_scheduling_allowed_(false),
164 do_poll_after_credentials_updated_(false),
165 next_sync_session_job_priority_(NORMAL_PRIORITY), 162 next_sync_session_job_priority_(NORMAL_PRIORITY),
166 weak_ptr_factory_(this), 163 weak_ptr_factory_(this),
167 weak_ptr_factory_for_weak_handle_(this) { 164 weak_ptr_factory_for_weak_handle_(this) {
168 weak_handle_this_ = MakeWeakHandle( 165 weak_handle_this_ = MakeWeakHandle(
169 weak_ptr_factory_for_weak_handle_.GetWeakPtr()); 166 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
170 } 167 }
171 168
172 SyncSchedulerImpl::~SyncSchedulerImpl() { 169 SyncSchedulerImpl::~SyncSchedulerImpl() {
173 DCHECK(CalledOnValidThread()); 170 DCHECK(CalledOnValidThread());
174 Stop(); 171 Stop();
(...skipping 25 matching lines...) Expand all
200 // 3. A nudge was saved previously due to not having a valid auth token. 197 // 3. A nudge was saved previously due to not having a valid auth token.
201 // 4. A nudge was scheduled + saved while in configuration mode. 198 // 4. A nudge was scheduled + saved while in configuration mode.
202 // 199 //
203 // In all cases except (2), we want to retry contacting the server. We 200 // In all cases except (2), we want to retry contacting the server. We
204 // call TryCanaryJob to achieve this, and note that nothing -- not even a 201 // call TryCanaryJob to achieve this, and note that nothing -- not even a
205 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 202 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
206 // has the authority to do that is the Unthrottle timer. 203 // has the authority to do that is the Unthrottle timer.
207 TryCanaryJob(); 204 TryCanaryJob();
208 } 205 }
209 206
210 void SyncSchedulerImpl::Start(Mode mode) { 207 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) {
211 DCHECK(CalledOnValidThread()); 208 DCHECK(CalledOnValidThread());
212 std::string thread_name = base::MessageLoop::current()->thread_name(); 209 std::string thread_name = base::MessageLoop::current()->thread_name();
213 if (thread_name.empty()) 210 if (thread_name.empty())
214 thread_name = "<Main thread>"; 211 thread_name = "<Main thread>";
215 SDVLOG(2) << "Start called from thread " 212 SDVLOG(2) << "Start called from thread "
216 << thread_name << " with mode " << GetModeString(mode); 213 << thread_name << " with mode " << GetModeString(mode);
217 if (!started_) { 214 if (!started_) {
218 started_ = true; 215 started_ = true;
219 SendInitialSnapshot(); 216 SendInitialSnapshot();
220 } 217 }
221 218
222 DCHECK(!session_context_->account_name().empty()); 219 DCHECK(!session_context_->account_name().empty());
223 DCHECK(syncer_.get()); 220 DCHECK(syncer_.get());
224 Mode old_mode = mode_; 221 Mode old_mode = mode_;
225 mode_ = mode; 222 mode_ = mode;
226 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. 223 // Only adjust the poll reset time if it was valid and in the past.
224 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) {
225 // Convert from base::Time to base::TimeTicks. The reason we use Time
226 // for persisting is that TimeTicks can stop making forward progress when
227 // the machine is suspended. This implies that on resume the client might
228 // actually have miss the real poll, unless the client is restarted. Fixing
229 // that would require using an AlarmTimer though, which is only supported
230 // on certain platforms.
231 last_poll_reset_ =
232 base::TimeTicks::Now() - (base::Time::Now() - last_poll_time);
233 }
227 234
228 if (old_mode != mode_ && mode_ == NORMAL_MODE) { 235 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
229 // We just got back to normal mode. Let's try to run the work that was 236 // We just got back to normal mode. Let's try to run the work that was
230 // queued up while we were configuring. 237 // queued up while we were configuring.
231 238
239 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
240
232 // Update our current time before checking IsRetryRequired(). 241 // Update our current time before checking IsRetryRequired().
233 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 242 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
234 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) { 243 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
235 TrySyncSessionJob(); 244 TrySyncSessionJob();
236 } 245 }
237 } 246 }
238 } 247 }
239 248
240 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { 249 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
241 ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); 250 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
299 pending_configure_params_.reset(new ConfigurationParams(params)); 308 pending_configure_params_.reset(new ConfigurationParams(params));
300 TrySyncSessionJob(); 309 TrySyncSessionJob();
301 } else { 310 } else {
302 SDVLOG(2) << "No change in routing info, calling ready task directly."; 311 SDVLOG(2) << "No change in routing info, calling ready task directly.";
303 params.ready_task.Run(); 312 params.ready_task.Run();
304 } 313 }
305 } 314 }
306 315
307 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { 316 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
308 DCHECK(CalledOnValidThread()); 317 DCHECK(CalledOnValidThread());
309 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) { 318 if (IsCurrentlyThrottled()) {
310 SDVLOG(1) << "Unable to run a job because we're throttled."; 319 SDVLOG(1) << "Unable to run a job because we're throttled.";
311 return false; 320 return false;
312 } 321 }
313 322
314 if (wait_interval_ 323 if (IsBackingOff() && priority != CANARY_PRIORITY) {
315 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
316 && priority != CANARY_PRIORITY) {
317 SDVLOG(1) << "Unable to run a job because we're backing off."; 324 SDVLOG(1) << "Unable to run a job because we're backing off.";
318 return false; 325 return false;
319 } 326 }
320 327
321 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 328 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
322 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; 329 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
323 return false; 330 return false;
324 } 331 }
325 332
326 return true; 333 return true;
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
397 nudge_tracker_.RecordInitialSyncRequired(model_type); 404 nudge_tracker_.RecordInitialSyncRequired(model_type);
398 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE); 405 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
399 } 406 }
400 407
401 // TODO(zea): Consider adding separate throttling/backoff for datatype 408 // TODO(zea): Consider adding separate throttling/backoff for datatype
402 // refresh requests. 409 // refresh requests.
403 void SyncSchedulerImpl::ScheduleNudgeImpl( 410 void SyncSchedulerImpl::ScheduleNudgeImpl(
404 const TimeDelta& delay, 411 const TimeDelta& delay,
405 const tracked_objects::Location& nudge_location) { 412 const tracked_objects::Location& nudge_location) {
406 DCHECK(CalledOnValidThread()); 413 DCHECK(CalledOnValidThread());
407 414 CHECK(!syncer_->IsSyncing());
408 if (no_scheduling_allowed_) {
409 NOTREACHED() << "Illegal to schedule job while session in progress.";
410 return;
411 }
412 415
413 if (!started_) { 416 if (!started_) {
414 SDVLOG_LOC(nudge_location, 2) 417 SDVLOG_LOC(nudge_location, 2)
415 << "Dropping nudge, scheduler is not running."; 418 << "Dropping nudge, scheduler is not running.";
416 return; 419 return;
417 } 420 }
418 421
419 SDVLOG_LOC(nudge_location, 2) 422 SDVLOG_LOC(nudge_location, 2)
420 << "In ScheduleNudgeImpl with delay " 423 << "In ScheduleNudgeImpl with delay "
421 << delay.InMilliseconds() << " ms"; 424 << delay.InMilliseconds() << " ms";
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); 460 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
458 } 461 }
459 462
460 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { 463 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
461 DCHECK(CalledOnValidThread()); 464 DCHECK(CalledOnValidThread());
462 DCHECK(CanRunNudgeJobNow(priority)); 465 DCHECK(CanRunNudgeJobNow(priority));
463 466
464 DVLOG(2) << "Will run normal mode sync cycle with types " 467 DVLOG(2) << "Will run normal mode sync cycle with types "
465 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 468 << ModelTypeSetToString(session_context_->GetEnabledTypes());
466 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 469 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
467 bool premature_exit = !syncer_->NormalSyncShare( 470 bool success = syncer_->NormalSyncShare(
468 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get()); 471 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get());
469 AdjustPolling(FORCE_RESET);
470 // Don't run poll job till the next time poll timer fires.
471 do_poll_after_credentials_updated_ = false;
472
473 bool success = !premature_exit
474 && !sessions::HasSyncerError(
475 session->status_controller().model_neutral_state());
476 472
477 if (success) { 473 if (success) {
478 // That cycle took care of any outstanding work we had. 474 // That cycle took care of any outstanding work we had.
479 SDVLOG(2) << "Nudge succeeded."; 475 SDVLOG(2) << "Nudge succeeded.";
480 nudge_tracker_.RecordSuccessfulSyncCycle(); 476 nudge_tracker_.RecordSuccessfulSyncCycle();
481 scheduled_nudge_time_ = base::TimeTicks(); 477 scheduled_nudge_time_ = base::TimeTicks();
478 HandleSuccess();
482 479
483 // If we're here, then we successfully reached the server. End all backoff. 480 // If this was a canary, we may need to restart the poll timer (the poll
484 wait_interval_.reset(); 481 // timer may have fired while the scheduler was in an error state, ignoring
485 NotifyRetryTime(base::Time()); 482 // the poll).
483 if (!poll_timer_.IsRunning()) {
484 SDVLOG(1) << "Canary succeeded, restarting polling.";
485 AdjustPolling(UPDATE_INTERVAL);
486 }
486 } else { 487 } else {
487 HandleFailure(session->status_controller().model_neutral_state()); 488 HandleFailure(session->status_controller().model_neutral_state());
488 } 489 }
489 } 490 }
490 491
491 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { 492 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
492 DCHECK(CalledOnValidThread()); 493 DCHECK(CalledOnValidThread());
493 DCHECK_EQ(mode_, CONFIGURATION_MODE); 494 DCHECK_EQ(mode_, CONFIGURATION_MODE);
494 DCHECK(pending_configure_params_ != NULL); 495 DCHECK(pending_configure_params_ != NULL);
495 496
496 if (!CanRunJobNow(priority)) { 497 if (!CanRunJobNow(priority)) {
497 SDVLOG(2) << "Unable to run configure job right now."; 498 SDVLOG(2) << "Unable to run configure job right now.";
498 if (!pending_configure_params_->retry_task.is_null()) { 499 if (!pending_configure_params_->retry_task.is_null()) {
499 pending_configure_params_->retry_task.Run(); 500 pending_configure_params_->retry_task.Run();
500 pending_configure_params_->retry_task.Reset(); 501 pending_configure_params_->retry_task.Reset();
501 } 502 }
502 return; 503 return;
503 } 504 }
504 505
505 SDVLOG(2) << "Will run configure SyncShare with types " 506 SDVLOG(2) << "Will run configure SyncShare with types "
506 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 507 << ModelTypeSetToString(session_context_->GetEnabledTypes());
507 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 508 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
508 bool premature_exit = !syncer_->ConfigureSyncShare( 509 bool success = syncer_->ConfigureSyncShare(
509 pending_configure_params_->types_to_download, 510 pending_configure_params_->types_to_download,
510 pending_configure_params_->source, 511 pending_configure_params_->source,
511 session.get()); 512 session.get());
512 AdjustPolling(FORCE_RESET);
513 // Don't run poll job till the next time poll timer fires.
514 do_poll_after_credentials_updated_ = false;
515
516 bool success = !premature_exit
517 && !sessions::HasSyncerError(
518 session->status_controller().model_neutral_state());
519 513
520 if (success) { 514 if (success) {
521 SDVLOG(2) << "Configure succeeded."; 515 SDVLOG(2) << "Configure succeeded.";
522 pending_configure_params_->ready_task.Run(); 516 pending_configure_params_->ready_task.Run();
523 pending_configure_params_.reset(); 517 pending_configure_params_.reset();
524 518 HandleSuccess();
525 // If we're here, then we successfully reached the server. End all backoff.
526 wait_interval_.reset();
527 NotifyRetryTime(base::Time());
528 } else { 519 } else {
529 HandleFailure(session->status_controller().model_neutral_state()); 520 HandleFailure(session->status_controller().model_neutral_state());
530 // Sync cycle might receive response from server that causes scheduler to 521 // Sync cycle might receive response from server that causes scheduler to
531 // stop and draws pending_configure_params_ invalid. 522 // stop and draws pending_configure_params_ invalid.
532 if (started_ && !pending_configure_params_->retry_task.is_null()) { 523 if (started_ && !pending_configure_params_->retry_task.is_null()) {
533 pending_configure_params_->retry_task.Run(); 524 pending_configure_params_->retry_task.Run();
534 pending_configure_params_->retry_task.Reset(); 525 pending_configure_params_->retry_task.Reset();
535 } 526 }
536 } 527 }
537 } 528 }
538 529
530 void SyncSchedulerImpl::HandleSuccess() {
531 // If we're here, then we successfully reached the server. End all backoff.
532 wait_interval_.reset();
533 NotifyRetryTime(base::Time());
534 }
535
539 void SyncSchedulerImpl::HandleFailure( 536 void SyncSchedulerImpl::HandleFailure(
540 const sessions::ModelNeutralState& model_neutral_state) { 537 const sessions::ModelNeutralState& model_neutral_state) {
541 if (IsCurrentlyThrottled()) { 538 if (IsCurrentlyThrottled()) {
542 SDVLOG(2) << "Was throttled during previous sync cycle."; 539 SDVLOG(2) << "Was throttled during previous sync cycle.";
543 RestartWaiting();
544 } else if (!IsBackingOff()) { 540 } else if (!IsBackingOff()) {
545 // Setup our backoff if this is our first such failure. 541 // Setup our backoff if this is our first such failure.
546 TimeDelta length = delay_provider_->GetDelay( 542 TimeDelta length = delay_provider_->GetDelay(
547 delay_provider_->GetInitialDelay(model_neutral_state)); 543 delay_provider_->GetInitialDelay(model_neutral_state));
548 wait_interval_.reset( 544 wait_interval_.reset(
549 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 545 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
550 SDVLOG(2) << "Sync cycle failed. Will back off for " 546 SDVLOG(2) << "Sync cycle failed. Will back off for "
551 << wait_interval_->length.InMilliseconds() << "ms."; 547 << wait_interval_->length.InMilliseconds() << "ms.";
552 RestartWaiting(); 548 } else {
549 // Increase our backoff interval and schedule another retry.
550 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
551 wait_interval_.reset(
552 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
553 SDVLOG(2) << "Sync cycle failed. Will back off for "
554 << wait_interval_->length.InMilliseconds() << "ms.";
555 }
556 RestartWaiting();
557 }
558
559 void SyncSchedulerImpl::DoPollSyncSessionJob() {
560 SDVLOG(2) << "Polling with types "
561 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
562 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
563 bool success = syncer_->PollSyncShare(
564 GetEnabledAndUnthrottledTypes(),
565 session.get());
566
567 // Only restart the timer if the poll succeeded. Otherwise rely on normal
568 // failure handling to retry with backoff.
569 if (success) {
570 AdjustPolling(FORCE_RESET);
571 HandleSuccess();
572 } else {
573 HandleFailure(session->status_controller().model_neutral_state());
553 } 574 }
554 } 575 }
555 576
556 void SyncSchedulerImpl::DoPollSyncSessionJob() {
557 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
558
559 SDVLOG(2) << "Polling with types "
560 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
561 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
562 syncer_->PollSyncShare(
563 GetEnabledAndUnthrottledTypes(),
564 session.get());
565
566 AdjustPolling(FORCE_RESET);
567
568 if (IsCurrentlyThrottled()) {
569 SDVLOG(2) << "Poll request got us throttled.";
570 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
571 // to do is start the timer.
572 RestartWaiting();
573 }
574 }
575
576 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { 577 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
577 DCHECK(CalledOnValidThread()); 578 DCHECK(CalledOnValidThread());
578 base::TimeTicks now = TimeTicks::Now(); 579 base::TimeTicks now = TimeTicks::Now();
579 // Update timing information for how often datatypes are triggering nudges. 580 // Update timing information for how often datatypes are triggering nudges.
580 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { 581 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
581 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; 582 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
582 last_local_nudges_by_model_type_[iter.Get()] = now; 583 last_local_nudges_by_model_type_[iter.Get()] = now;
583 if (previous.is_null()) 584 if (previous.is_null())
584 continue; 585 continue;
585 586
586 #define PER_DATA_TYPE_MACRO(type_str) \ 587 #define PER_DATA_TYPE_MACRO(type_str) \
587 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 588 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
588 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); 589 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
589 #undef PER_DATA_TYPE_MACRO 590 #undef PER_DATA_TYPE_MACRO
590 } 591 }
591 } 592 }
592 593
593 TimeDelta SyncSchedulerImpl::GetPollInterval() { 594 TimeDelta SyncSchedulerImpl::GetPollInterval() {
594 return (!session_context_->notifications_enabled() || 595 return (!session_context_->notifications_enabled() ||
595 !session_context_->ShouldFetchUpdatesBeforeCommit()) ? 596 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
596 syncer_short_poll_interval_seconds_ : 597 syncer_short_poll_interval_seconds_ :
597 syncer_long_poll_interval_seconds_; 598 syncer_long_poll_interval_seconds_;
598 } 599 }
599 600
600 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { 601 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
601 DCHECK(CalledOnValidThread()); 602 DCHECK(CalledOnValidThread());
603 if (!started_)
604 return;
602 605
603 TimeDelta poll = GetPollInterval(); 606 TimeDelta poll_interval = GetPollInterval();
604 bool rate_changed = !poll_timer_.IsRunning() || 607 TimeDelta poll_delay = poll_interval;
605 poll != poll_timer_.GetCurrentDelay(); 608 const TimeTicks now = TimeTicks::Now();
606 609
607 if (type == FORCE_RESET) { 610 if (type == UPDATE_INTERVAL) {
608 last_poll_reset_ = base::TimeTicks::Now(); 611 if (!last_poll_reset_.is_null()) {
609 if (!rate_changed) 612 // Override the delay based on the last successful poll time (if it was
610 poll_timer_.Reset(); 613 // set).
614 TimeTicks new_poll_time = poll_interval + last_poll_reset_;
615 poll_delay = new_poll_time - TimeTicks::Now();
616
617 if (poll_delay < TimeDelta()) {
618 // The desired poll time was in the past, so trigger a poll now (the
619 // timer will post the task asynchronously, so re-entrancy isn't an
620 // issue).
621 poll_delay = TimeDelta();
622 }
623 } else {
624 // There was no previous poll. Keep the delay set to the normal interval,
625 // as if we had just completed a poll.
626 DCHECK_EQ(GetPollInterval(), poll_delay);
627 last_poll_reset_ = now;
628 }
629 } else {
630 // Otherwise just restart the timer.
631 DCHECK_EQ(FORCE_RESET, type);
632 DCHECK_EQ(GetPollInterval(), poll_delay);
633 last_poll_reset_ = now;
611 } 634 }
612 635
613 if (!rate_changed) 636 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes()
614 return; 637 << " minutes.";
615 638
616 // Adjust poll rate. 639 // Adjust poll rate. Start will reset the timer if it was already running.
617 poll_timer_.Stop(); 640 poll_timer_.Start(FROM_HERE, poll_delay, this,
618 poll_timer_.Start(FROM_HERE, poll, this,
619 &SyncSchedulerImpl::PollTimerCallback); 641 &SyncSchedulerImpl::PollTimerCallback);
620 } 642 }
621 643
622 void SyncSchedulerImpl::RestartWaiting() { 644 void SyncSchedulerImpl::RestartWaiting() {
623 CHECK(wait_interval_.get()); 645 CHECK(wait_interval_.get());
624 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 646 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
625 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 647 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
626 SDVLOG(2) << "Starting WaitInterval timer of length " 648 SDVLOG(2) << "Starting WaitInterval timer of length "
627 << wait_interval_->length.InMilliseconds() << "ms."; 649 << wait_interval_->length.InMilliseconds() << "ms.";
628 if (wait_interval_->mode == WaitInterval::THROTTLED) { 650 if (wait_interval_->mode == WaitInterval::THROTTLED) {
(...skipping 23 matching lines...) Expand all
652 pending_wakeup_timer_.Stop(); 674 pending_wakeup_timer_.Stop();
653 pending_configure_params_.reset(); 675 pending_configure_params_.reset();
654 if (started_) 676 if (started_)
655 started_ = false; 677 started_ = false;
656 } 678 }
657 679
658 // This is the only place where we invoke DoSyncSessionJob with canary 680 // This is the only place where we invoke DoSyncSessionJob with canary
659 // privileges. Everyone else should use NORMAL_PRIORITY. 681 // privileges. Everyone else should use NORMAL_PRIORITY.
660 void SyncSchedulerImpl::TryCanaryJob() { 682 void SyncSchedulerImpl::TryCanaryJob() {
661 next_sync_session_job_priority_ = CANARY_PRIORITY; 683 next_sync_session_job_priority_ = CANARY_PRIORITY;
684 SDVLOG(2) << "Attempting canary job";
662 TrySyncSessionJob(); 685 TrySyncSessionJob();
663 } 686 }
664 687
665 void SyncSchedulerImpl::TrySyncSessionJob() { 688 void SyncSchedulerImpl::TrySyncSessionJob() {
666 // Post call to TrySyncSessionJobImpl on current thread. Later request for 689 // Post call to TrySyncSessionJobImpl on current thread. Later request for
667 // access token will be here. 690 // access token will be here.
668 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind( 691 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
669 &SyncSchedulerImpl::TrySyncSessionJobImpl, 692 &SyncSchedulerImpl::TrySyncSessionJobImpl,
670 weak_ptr_factory_.GetWeakPtr())); 693 weak_ptr_factory_.GetWeakPtr()));
671 } 694 }
672 695
673 void SyncSchedulerImpl::TrySyncSessionJobImpl() { 696 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
674 JobPriority priority = next_sync_session_job_priority_; 697 JobPriority priority = next_sync_session_job_priority_;
675 next_sync_session_job_priority_ = NORMAL_PRIORITY; 698 next_sync_session_job_priority_ = NORMAL_PRIORITY;
676 699
677 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 700 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
678 701
679 DCHECK(CalledOnValidThread()); 702 DCHECK(CalledOnValidThread());
680 if (mode_ == CONFIGURATION_MODE) { 703 if (mode_ == CONFIGURATION_MODE) {
681 if (pending_configure_params_) { 704 if (pending_configure_params_) {
682 SDVLOG(2) << "Found pending configure job"; 705 SDVLOG(2) << "Found pending configure job";
683 DoConfigurationSyncSessionJob(priority); 706 DoConfigurationSyncSessionJob(priority);
684 } 707 }
685 } else if (CanRunNudgeJobNow(priority)) { 708 } else if (CanRunNudgeJobNow(priority)) {
686 if (nudge_tracker_.IsSyncRequired()) { 709 if (nudge_tracker_.IsSyncRequired()) {
687 SDVLOG(2) << "Found pending nudge job"; 710 SDVLOG(2) << "Found pending nudge job";
688 DoNudgeSyncSessionJob(priority); 711 DoNudgeSyncSessionJob(priority);
689 } else if (do_poll_after_credentials_updated_ || 712 } else if (((base::TimeTicks::Now() - last_poll_reset_) >=
690 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) { 713 GetPollInterval())) {
714 SDVLOG(2) << "Found pending poll";
691 DoPollSyncSessionJob(); 715 DoPollSyncSessionJob();
692 // Poll timer fires infrequently. Usually by this time access token is
693 // already expired and poll job will fail with auth error. Set flag to
694 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
695 // will be called after access token is retrieved.
696 if (HttpResponse::SYNC_AUTH_ERROR ==
697 session_context_->connection_manager()->server_status()) {
698 do_poll_after_credentials_updated_ = true;
699 }
700 } 716 }
701 } 717 } else {
702 718 // We must be in an error state. Transitioning out of each of these
703 if (priority == CANARY_PRIORITY) { 719 // error states should trigger a canary job.
704 // If this is canary job then whatever result was don't run poll job till 720 DCHECK(IsCurrentlyThrottled() || IsBackingOff() ||
705 // the next time poll timer fires. 721 session_context_->connection_manager()->HasInvalidAuthToken());
706 do_poll_after_credentials_updated_ = false;
707 } 722 }
708 723
709 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) { 724 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
710 // If we succeeded, our wait interval would have been cleared. If it hasn't 725 // If we succeeded, our wait interval would have been cleared. If it hasn't
711 // been cleared, then we should increase our backoff interval and schedule 726 // been cleared, then we should increase our backoff interval and schedule
712 // another retry. 727 // another retry.
713 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); 728 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
714 wait_interval_.reset( 729 wait_interval_.reset(
715 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 730 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
716 SDVLOG(2) << "Sync cycle failed. Will back off for " 731 SDVLOG(2) << "Sync cycle failed. Will back off for "
717 << wait_interval_->length.InMilliseconds() << "ms."; 732 << wait_interval_->length.InMilliseconds() << "ms.";
718 RestartWaiting(); 733 RestartWaiting();
719 } 734 }
720 } 735 }
721 736
722 void SyncSchedulerImpl::PollTimerCallback() { 737 void SyncSchedulerImpl::PollTimerCallback() {
723 DCHECK(CalledOnValidThread()); 738 DCHECK(CalledOnValidThread());
724 if (no_scheduling_allowed_) { 739 CHECK(!syncer_->IsSyncing());
725 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
726 // functions that are called only on the sync thread. This function is also
727 // called only on the sync thread, and only when it is posted by an expiring
728 // timer. If we find that no_scheduling_allowed_ is set here, then
729 // something is very wrong. Maybe someone mistakenly called us directly, or
730 // mishandled the book-keeping for no_scheduling_allowed_.
731 NOTREACHED() << "Illegal to schedule job while session in progress.";
732 return;
733 }
734 740
735 TrySyncSessionJob(); 741 TrySyncSessionJob();
736 } 742 }
737 743
738 void SyncSchedulerImpl::RetryTimerCallback() { 744 void SyncSchedulerImpl::RetryTimerCallback() {
739 TrySyncSessionJob(); 745 TrySyncSessionJob();
740 } 746 }
741 747
742 void SyncSchedulerImpl::Unthrottle() { 748 void SyncSchedulerImpl::Unthrottle() {
743 DCHECK(CalledOnValidThread()); 749 DCHECK(CalledOnValidThread());
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
819 throttle_duration)); 825 throttle_duration));
820 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 826 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
821 NotifyThrottledTypesChanged(ModelTypeSet::All()); 827 NotifyThrottledTypesChanged(ModelTypeSet::All());
822 } 828 }
823 829
824 void SyncSchedulerImpl::OnTypesThrottled( 830 void SyncSchedulerImpl::OnTypesThrottled(
825 ModelTypeSet types, 831 ModelTypeSet types,
826 const base::TimeDelta& throttle_duration) { 832 const base::TimeDelta& throttle_duration) {
827 base::TimeTicks now = base::TimeTicks::Now(); 833 base::TimeTicks now = base::TimeTicks::Now();
828 834
835 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for "
836 << throttle_duration.InMinutes() << " minutes.";
837
829 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); 838 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
830 base::TimeDelta time_until_next_unthrottle = 839 base::TimeDelta time_until_next_unthrottle =
831 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 840 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
832 type_unthrottle_timer_.Start( 841 type_unthrottle_timer_.Start(
833 FROM_HERE, 842 FROM_HERE,
834 time_until_next_unthrottle, 843 time_until_next_unthrottle,
835 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 844 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
836 weak_ptr_factory_.GetWeakPtr(), 845 weak_ptr_factory_.GetWeakPtr(),
837 now + time_until_next_unthrottle)); 846 now + time_until_next_unthrottle));
838 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 847 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
839 } 848 }
840 849
841 bool SyncSchedulerImpl::IsCurrentlyThrottled() { 850 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
842 DCHECK(CalledOnValidThread()); 851 DCHECK(CalledOnValidThread());
843 return wait_interval_.get() && wait_interval_->mode == 852 return wait_interval_.get() && wait_interval_->mode ==
844 WaitInterval::THROTTLED; 853 WaitInterval::THROTTLED;
845 } 854 }
846 855
847 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 856 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
848 const base::TimeDelta& new_interval) { 857 const base::TimeDelta& new_interval) {
849 DCHECK(CalledOnValidThread()); 858 DCHECK(CalledOnValidThread());
859 if (new_interval == syncer_short_poll_interval_seconds_)
860 return;
861 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes()
862 << " minutes.";
850 syncer_short_poll_interval_seconds_ = new_interval; 863 syncer_short_poll_interval_seconds_ = new_interval;
864 AdjustPolling(UPDATE_INTERVAL);
851 } 865 }
852 866
853 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 867 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
854 const base::TimeDelta& new_interval) { 868 const base::TimeDelta& new_interval) {
855 DCHECK(CalledOnValidThread()); 869 DCHECK(CalledOnValidThread());
870 if (new_interval == syncer_long_poll_interval_seconds_)
871 return;
872 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes()
873 << " minutes.";
856 syncer_long_poll_interval_seconds_ = new_interval; 874 syncer_long_poll_interval_seconds_ = new_interval;
875 AdjustPolling(UPDATE_INTERVAL);
857 } 876 }
858 877
859 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( 878 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
860 const std::map<ModelType, base::TimeDelta>& nudge_delays) { 879 const std::map<ModelType, base::TimeDelta>& nudge_delays) {
861 DCHECK(CalledOnValidThread()); 880 DCHECK(CalledOnValidThread());
862 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); 881 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
863 } 882 }
864 883
865 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { 884 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
866 if (size > 0) 885 if (size > 0)
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
907 926
908 #undef SDVLOG_LOC 927 #undef SDVLOG_LOC
909 928
910 #undef SDVLOG 929 #undef SDVLOG
911 930
912 #undef SLOG 931 #undef SLOG
913 932
914 #undef ENUM_CASE 933 #undef ENUM_CASE
915 934
916 } // namespace syncer 935 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698