| Index: chrome/browser/sync/engine/syncer_thread.cc
|
| diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc
|
| index 5851c45364547141653b8f1f676654e79baf1b89..7966f6713fdd2a404be10aa3043891ee5096a80b 100644
|
| --- a/chrome/browser/sync/engine/syncer_thread.cc
|
| +++ b/chrome/browser/sync/engine/syncer_thread.cc
|
| @@ -5,27 +5,10 @@
|
| #include "chrome/browser/sync/engine/syncer_thread.h"
|
|
|
| #include <algorithm>
|
| -#include <queue>
|
| -#include <string>
|
| -#include <vector>
|
|
|
| #include "base/rand_util.h"
|
| -#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
|
| -#include "build/build_config.h"
|
| -#include "chrome/browser/sync/engine/model_safe_worker.h"
|
| -#include "chrome/browser/sync/engine/net/server_connection_manager.h"
|
| #include "chrome/browser/sync/engine/syncer.h"
|
| -#include "chrome/browser/sync/sessions/sync_session.h"
|
|
|
| -#if defined(OS_MACOSX)
|
| -#include <CoreFoundation/CFNumber.h>
|
| -#include <IOKit/IOTypes.h>
|
| -#include <IOKit/IOKitLib.h>
|
| -#endif
|
| -
|
| -using std::priority_queue;
|
| -using std::min;
|
| -using base::Time;
|
| using base::TimeDelta;
|
| using base::TimeTicks;
|
|
|
| @@ -35,861 +18,843 @@ using sessions::SyncSession;
|
| using sessions::SyncSessionSnapshot;
|
| using sessions::SyncSourceInfo;
|
| using syncable::ModelTypePayloadMap;
|
| +using syncable::ModelTypeBitSet;
|
| +using sync_pb::GetUpdatesCallerInfo;
|
|
|
| -// We use high values here to ensure that failure to receive poll updates from
|
| -// the server doesn't result in rapid-fire polling from the client due to low
|
| -// local limits.
|
| -const int SyncerThread::kDefaultShortPollIntervalSeconds = 3600 * 8;
|
| -const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600 * 12;
|
| -
|
| -// TODO(tim): This is used to regulate the short poll (when notifications are
|
| -// disabled) based on user idle time. If it is set to a smaller value than
|
| -// the short poll interval, it basically does nothing; for now, this is what
|
| -// we want and allows stronger control over the poll rate from the server. We
|
| -// should probably re-visit this code later and figure out if user idle time
|
| -// is really something we want and make sure it works, if it is.
|
| -const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000;
|
| -
|
| -// Backoff interval randomization factor.
|
| -static const int kBackoffRandomizationFactor = 2;
|
| -
|
| -const int SyncerThread::kMaxBackoffSeconds = 60 * 60 * 4; // 4 hours.
|
| -
|
| -SyncerThread::ProtectedFields::ProtectedFields()
|
| - : stop_syncer_thread_(false),
|
| - pause_requested_(false),
|
| - paused_(false),
|
| - syncer_(NULL),
|
| - connected_(false),
|
| - pending_nudge_source_(kUnknown) {}
|
| -
|
| -SyncerThread::ProtectedFields::~ProtectedFields() {}
|
| -
|
| -void SyncerThread::NudgeSyncerWithPayloads(
|
| - int milliseconds_from_now,
|
| - NudgeSource source,
|
| - const ModelTypePayloadMap& model_types_with_payloads) {
|
| - base::AutoLock lock(lock_);
|
| - if (vault_.syncer_ == NULL) {
|
| - return;
|
| - }
|
| +SyncerThread::DelayProvider::DelayProvider() {}
|
| +SyncerThread::DelayProvider::~DelayProvider() {}
|
|
|
| - NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads);
|
| -}
|
| +SyncerThread::WaitInterval::WaitInterval() {}
|
| +SyncerThread::WaitInterval::~WaitInterval() {}
|
|
|
| -void SyncerThread::NudgeSyncerWithDataTypes(
|
| - int milliseconds_from_now,
|
| - NudgeSource source,
|
| - const syncable::ModelTypeBitSet& model_types) {
|
| - base::AutoLock lock(lock_);
|
| - if (vault_.syncer_ == NULL) {
|
| - return;
|
| - }
|
| +SyncerThread::SyncSessionJob::SyncSessionJob() {}
|
| +SyncerThread::SyncSessionJob::~SyncSessionJob() {}
|
|
|
| - ModelTypePayloadMap model_types_with_payloads =
|
| - syncable::ModelTypePayloadMapFromBitSet(model_types, std::string());
|
| - NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads);
|
| +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
|
| + base::TimeTicks start,
|
| + linked_ptr<sessions::SyncSession> session, bool is_canary_job,
|
| + const tracked_objects::Location& nudge_location) : purpose(purpose),
|
| + scheduled_start(start),
|
| + session(session),
|
| + is_canary_job(is_canary_job),
|
| + nudge_location(nudge_location) {
|
| }
|
|
|
| -void SyncerThread::NudgeSyncer(
|
| - int milliseconds_from_now,
|
| +TimeDelta SyncerThread::DelayProvider::GetDelay(
|
| + const base::TimeDelta& last_delay) {
|
| + return SyncerThread::GetRecommendedDelay(last_delay);
|
| +}
|
| +
|
| +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
|
| NudgeSource source) {
|
| - base::AutoLock lock(lock_);
|
| - if (vault_.syncer_ == NULL) {
|
| - return;
|
| + switch (source) {
|
| + case NUDGE_SOURCE_NOTIFICATION:
|
| + return GetUpdatesCallerInfo::NOTIFICATION;
|
| + case NUDGE_SOURCE_LOCAL:
|
| + return GetUpdatesCallerInfo::LOCAL;
|
| + case NUDGE_SOURCE_CONTINUATION:
|
| + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
|
| + case NUDGE_SOURCE_UNKNOWN:
|
| + return GetUpdatesCallerInfo::UNKNOWN;
|
| + default:
|
| + NOTREACHED();
|
| + return GetUpdatesCallerInfo::UNKNOWN;
|
| }
|
| -
|
| - // Set all enabled datatypes.
|
| - ModelSafeRoutingInfo routes;
|
| - session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| - ModelTypePayloadMap model_types_with_payloads =
|
| - syncable::ModelTypePayloadMapFromRoutingInfo(routes, std::string());
|
| - NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads);
|
| }
|
|
|
| -SyncerThread::SyncerThread(sessions::SyncSessionContext* context)
|
| - : thread_main_started_(false, false),
|
| - thread_("SyncEngine_SyncerThread"),
|
| - vault_field_changed_(&lock_),
|
| - conn_mgr_hookup_(NULL),
|
| - syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
|
| - syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
|
| - syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
|
| - syncer_max_interval_(kDefaultMaxPollIntervalMs),
|
| - session_context_(context),
|
| - disable_idle_detection_(false) {
|
| - DCHECK(context);
|
| +SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
|
| + : mode(mode), had_nudge(false), length(length) { }
|
|
|
| - if (context->connection_manager())
|
| - WatchConnectionManager(context->connection_manager());
|
| +SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
|
| + Syncer* syncer)
|
| + : thread_("SyncEngine_SyncerThread"),
|
| + syncer_short_poll_interval_seconds_(
|
| + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
|
| + syncer_long_poll_interval_seconds_(
|
| + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
|
| + mode_(NORMAL_MODE),
|
| + server_connection_ok_(false),
|
| + delay_provider_(new DelayProvider()),
|
| + syncer_(syncer),
|
| + session_context_(context) {
|
| }
|
|
|
| SyncerThread::~SyncerThread() {
|
| - conn_mgr_hookup_.reset();
|
| - delete vault_.syncer_;
|
| - CHECK(!thread_.IsRunning());
|
| -}
|
| -
|
| -// Creates and starts a syncer thread.
|
| -// Returns true if it creates a thread or if there's currently a thread running
|
| -// and false otherwise.
|
| -bool SyncerThread::Start() {
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - if (thread_.IsRunning()) {
|
| - return true;
|
| - }
|
| + DCHECK(!thread_.IsRunning());
|
| +}
|
| +
|
| +void SyncerThread::CheckServerConnectionManagerStatus(
|
| + HttpResponse::ServerConnectionCode code) {
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << "Old mode: " << server_connection_ok_ << " Code: " << code;
|
| + // Note, be careful when adding cases here because if the SyncerThread
|
| + // thinks there is no valid connection as determined by this method, it
|
| + // will drop out of *all* forward progress sync loops (it won't poll and it
|
| + // will queue up Talk notifications but not actually call SyncShare) until
|
| + // some external action causes a ServerConnectionManager to broadcast that
|
| + // a valid connection has been re-established.
|
| + if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
|
| + HttpResponse::SYNC_AUTH_ERROR == code) {
|
| + server_connection_ok_ = false;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << " new mode:" << server_connection_ok_;
|
| + } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
|
| + server_connection_ok_ = true;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed."
|
| + << " new mode:" << server_connection_ok_;
|
| + DoCanaryJob();
|
| + }
|
| +}
|
|
|
| +void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread "
|
| + << MessageLoop::current()->thread_name();
|
| + if (!thread_.IsRunning()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode "
|
| + << mode;
|
| if (!thread_.Start()) {
|
| - return false;
|
| + NOTREACHED() << "Unable to start SyncerThread.";
|
| + return;
|
| }
|
| + WatchConnectionManager();
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::SendInitialSnapshot));
|
| }
|
|
|
| - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
|
| - &SyncerThread::ThreadMain));
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = "
|
| + << mode;
|
|
|
| - // Wait for notification that our task makes it safely onto the message
|
| - // loop before returning, so the caller can't call Stop before we're
|
| - // actually up and running. This is for consistency with the old pthread
|
| - // impl because pthread_create would do this in one step.
|
| - thread_main_started_.Wait();
|
| - VLOG(1) << "SyncerThread started.";
|
| - return true;
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback)));
|
| }
|
|
|
| -// Stop processing. A max wait of at least 2*server RTT time is recommended.
|
| -// Returns true if we stopped, false otherwise.
|
| -bool SyncerThread::Stop(int max_wait) {
|
| - RequestSyncerExitAndSetThreadStopConditions();
|
| -
|
| - // This will join, and finish when ThreadMain terminates.
|
| - thread_.Stop();
|
| - return true;
|
| +void SyncerThread::SendInitialSnapshot() {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
|
| + SyncSourceInfo(), ModelSafeRoutingInfo(),
|
| + std::vector<ModelSafeWorker*>()));
|
| + SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
|
| + sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
|
| + event.snapshot = &snapshot;
|
| + session_context_->NotifyListeners(event);
|
| }
|
|
|
| -void SyncerThread::RequestSyncerExitAndSetThreadStopConditions() {
|
| - {
|
| - base::AutoLock lock(lock_);
|
| - // If the thread has been started, then we either already have or are about
|
| - // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
|
| - // it to finish. If the thread has not been started --and we now own the
|
| - // lock-- then we can early out because the caller has not called Start().
|
| - if (!thread_.IsRunning())
|
| - return;
|
| +void SyncerThread::WatchConnectionManager() {
|
| + ServerConnectionManager* scm = session_context_->connection_manager();
|
| + CheckServerConnectionManagerStatus(scm->server_status());
|
| + scm->AddListener(this);
|
| +}
|
|
|
| - VLOG(1) << "SyncerThread::Stop - setting ThreadMain exit condition to true "
|
| - "(vault_.stop_syncer_thread_)";
|
| - // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
|
| - // below).
|
| - vault_.stop_syncer_thread_ = true;
|
| - if (NULL != vault_.syncer_) {
|
| - // Try to early exit the syncer itself, which could be looping inside
|
| - // SyncShare.
|
| - vault_.syncer_->RequestEarlyExit();
|
| - }
|
| +void SyncerThread::StartImpl(Mode mode,
|
| + linked_ptr<ModeChangeCallback> callback) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
|
| + << mode;
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + DCHECK(!session_context_->account_name().empty());
|
| + DCHECK(syncer_.get());
|
| + mode_ = mode;
|
| + AdjustPolling(NULL); // Will kick start poll timer if needed.
|
| + if (callback.get())
|
| + callback->Run();
|
|
|
| - // stop_syncer_thread_ is now true and the Syncer has been told to exit.
|
| - // We want to wake up all waiters so they can re-examine state. We signal,
|
| - // causing all waiters to try to re-acquire the lock, and then we release
|
| - // the lock, and join on our internal thread which should soon run off the
|
| - // end of ThreadMain.
|
| - vault_field_changed_.Broadcast();
|
| - }
|
| + // We just changed our mode. See if there are any pending jobs that we could
|
| + // execute in the new mode.
|
| + DoPendingJobIfPossible(false);
|
| }
|
|
|
| -bool SyncerThread::RequestPause() {
|
| - base::AutoLock lock(lock_);
|
| - if (vault_.pause_requested_ || vault_.paused_)
|
| - return false;
|
| +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
|
| + const SyncSessionJob& job) {
|
|
|
| - if (thread_.IsRunning()) {
|
| - // Set the pause request. The syncer thread will read this
|
| - // request, enter the paused state, and send the PAUSED
|
| - // notification.
|
| - vault_.pause_requested_ = true;
|
| - vault_field_changed_.Broadcast();
|
| - VLOG(1) << "Pause requested.";
|
| - } else {
|
| - // If the thread is not running, go directly into the paused state
|
| - // and notify.
|
| - EnterPausedState();
|
| - VLOG(1) << "Paused while not running.";
|
| - }
|
| - return true;
|
| -}
|
| + DCHECK(wait_interval_.get());
|
| + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
|
|
|
| -void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
|
| - session_context_->NotifyListeners(SyncEngineEvent(cause));
|
| -}
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : "
|
| + << wait_interval_->mode << "Wait interval had nudge : "
|
| + << wait_interval_->had_nudge << "is canary job : "
|
| + << job.is_canary_job;
|
|
|
| -bool SyncerThread::RequestResume() {
|
| - base::AutoLock lock(lock_);
|
| - // Only valid to request a resume when we are already paused or we
|
| - // have a pause pending.
|
| - if (!(vault_.paused_ || vault_.pause_requested_))
|
| - return false;
|
| -
|
| - if (thread_.IsRunning()) {
|
| - if (vault_.pause_requested_) {
|
| - // If pause was requested we have not yet paused. In this case,
|
| - // the resume cancels the pause request.
|
| - vault_.pause_requested_ = false;
|
| - vault_field_changed_.Broadcast();
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED);
|
| - VLOG(1) << "Pending pause canceled by resume.";
|
| - } else {
|
| - // Unpause and notify.
|
| - vault_.paused_ = false;
|
| - vault_field_changed_.Broadcast();
|
| - }
|
| - } else {
|
| - ExitPausedState();
|
| - VLOG(1) << "Resumed while not running.";
|
| - }
|
| - return true;
|
| -}
|
| + if (job.purpose == SyncSessionJob::POLL)
|
| + return DROP;
|
|
|
| -void SyncerThread::OnReceivedLongPollIntervalUpdate(
|
| - const base::TimeDelta& new_interval) {
|
| - syncer_long_poll_interval_seconds_ = static_cast<int>(
|
| - new_interval.InSeconds());
|
| -}
|
| + DCHECK(job.purpose == SyncSessionJob::NUDGE ||
|
| + job.purpose == SyncSessionJob::CONFIGURATION);
|
| + if (wait_interval_->mode == WaitInterval::THROTTLED)
|
| + return SAVE;
|
|
|
| -void SyncerThread::OnReceivedShortPollIntervalUpdate(
|
| - const base::TimeDelta& new_interval) {
|
| - syncer_short_poll_interval_seconds_ = static_cast<int>(
|
| - new_interval.InSeconds());
|
| -}
|
| + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| + if (mode_ == CONFIGURATION_MODE)
|
| + return SAVE;
|
|
|
| -void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
|
| - silenced_until_ = silenced_until;
|
| + // If we already had one nudge then just drop this nudge. We will retry
|
| + // later when the timer runs out.
|
| + return wait_interval_->had_nudge ? DROP : CONTINUE;
|
| + }
|
| + // This is a config job.
|
| + return job.is_canary_job ? CONTINUE : SAVE;
|
| }
|
|
|
| -bool SyncerThread::IsSyncingCurrentlySilenced() {
|
| - // We should ignore reads from silenced_until_ under ThreadSanitizer
|
| - // since this is a benign race.
|
| - ANNOTATE_IGNORE_READS_BEGIN();
|
| - bool ret = (silenced_until_ - TimeTicks::Now()) >= TimeDelta::FromSeconds(0);
|
| - ANNOTATE_IGNORE_READS_END();
|
| - return ret;
|
| -}
|
| +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
|
| + const SyncSessionJob& job) {
|
| + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
|
| + return CONTINUE;
|
|
|
| -void SyncerThread::OnShouldStopSyncingPermanently() {
|
| - RequestSyncerExitAndSetThreadStopConditions();
|
| - Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
|
| -}
|
| + if (wait_interval_.get())
|
| + return DecideWhileInWaitInterval(job);
|
|
|
| -void SyncerThread::ThreadMainLoop() {
|
| - // This is called with lock_ acquired.
|
| - lock_.AssertAcquired();
|
| - VLOG(1) << "In thread main loop.";
|
| -
|
| - // Use the short poll value by default.
|
| - vault_.current_wait_interval_.poll_delta =
|
| - TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
|
| - int user_idle_milliseconds = 0;
|
| - TimeTicks last_sync_time;
|
| - bool initial_sync_for_thread = true;
|
| - bool continue_sync_cycle = false;
|
| -
|
| -#if defined(OS_LINUX)
|
| - idle_query_.reset(new IdleQueryLinux());
|
| -#endif
|
| -
|
| - if (vault_.syncer_ == NULL) {
|
| - VLOG(1) << "Syncer thread waiting for database initialization.";
|
| - while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
|
| - vault_field_changed_.Wait();
|
| - VLOG_IF(1, !(vault_.syncer_ == NULL)) << "Syncer was found after DB "
|
| - "started.";
|
| + if (mode_ == CONFIGURATION_MODE) {
|
| + if (job.purpose == SyncSessionJob::NUDGE)
|
| + return SAVE;
|
| + else if (job.purpose == SyncSessionJob::CONFIGURATION)
|
| + return CONTINUE;
|
| + else
|
| + return DROP;
|
| }
|
|
|
| - while (!vault_.stop_syncer_thread_) {
|
| - // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
|
| - // below) because we cannot poll until these conditions are met, so we wait
|
| - // indefinitely.
|
| -
|
| - // If we are not connected, enter WaitUntilConnectedOrQuit() which
|
| - // will return only when the network is connected or a quit is
|
| - // requested. Note that it is possible to exit
|
| - // WaitUntilConnectedOrQuit() in the paused state which will be
|
| - // handled by the next statement.
|
| - if (!vault_.connected_ && !initial_sync_for_thread) {
|
| - WaitUntilConnectedOrQuit();
|
| - continue;
|
| - }
|
| + // We are in normal mode.
|
| + DCHECK_EQ(mode_, NORMAL_MODE);
|
| + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
|
|
|
| - // Check if we should be paused or if a pause was requested. Note
|
| - // that we don't check initial_sync_for_thread here since we want
|
| - // the pause to happen regardless if it is the initial sync or not.
|
| - if (vault_.pause_requested_ || vault_.paused_) {
|
| - PauseUntilResumedOrQuit();
|
| - continue;
|
| - }
|
| + // Freshness condition
|
| + if (job.scheduled_start < last_sync_session_end_time_) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Dropping job because of freshness";
|
| + return DROP;
|
| + }
|
|
|
| - const TimeTicks next_poll = last_sync_time +
|
| - vault_.current_wait_interval_.poll_delta;
|
| - bool throttled = vault_.current_wait_interval_.mode ==
|
| - WaitInterval::THROTTLED;
|
| - // If we are throttled, we must wait. Otherwise, wait until either the next
|
| - // nudge (if one exists) or the poll interval.
|
| - TimeTicks end_wait = next_poll;
|
| - if (!throttled && !vault_.pending_nudge_time_.is_null()) {
|
| - end_wait = std::min(end_wait, vault_.pending_nudge_time_);
|
| - }
|
| - VLOG(1) << "end_wait is " << end_wait.ToInternalValue()
|
| - << "\nnext_poll is " << next_poll.ToInternalValue();
|
| -
|
| - // We block until the CV is signaled (e.g a control field changed, loss of
|
| - // network connection, nudge, spurious, etc), or the poll interval elapses.
|
| - TimeDelta sleep_time = end_wait - TimeTicks::Now();
|
| - if (!initial_sync_for_thread && sleep_time > TimeDelta::FromSeconds(0)) {
|
| - vault_field_changed_.TimedWait(sleep_time);
|
| -
|
| - if (TimeTicks::Now() < end_wait) {
|
| - // Didn't timeout. Could be a spurious signal, or a signal corresponding
|
| - // to an actual change in one of our control fields. By continuing here
|
| - // we perform the typical "always recheck conditions when signaled",
|
| - // (typically handled by a while(condition_not_met) cv.wait() construct)
|
| - // because we jump to the top of the loop. The main difference is we
|
| - // recalculate the wait interval, but last_sync_time won't have changed.
|
| - // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
|
| - // off the queue and wait for that delta. If it was a spurious signal,
|
| - // we'll keep waiting for the same moment in time as we just were.
|
| - continue;
|
| - }
|
| - }
|
| + if (server_connection_ok_)
|
| + return CONTINUE;
|
|
|
| - // Handle a nudge, caused by either a notification or a local bookmark
|
| - // event. This will also update the source of the following SyncMain call.
|
| - VLOG(1) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
|
| - bool nudged = false;
|
| - scoped_ptr<SyncSession> session;
|
| - session.reset(SyncMain(vault_.syncer_,
|
| - throttled, continue_sync_cycle, &initial_sync_for_thread, &nudged));
|
| -
|
| - // Update timing information for how often these datatypes are triggering
|
| - // nudges.
|
| - base::TimeTicks now = TimeTicks::Now();
|
| - if (!last_sync_time.is_null()) {
|
| - ModelTypePayloadMap::const_iterator iter;
|
| - for (iter = session->source().types.begin();
|
| - iter != session->source().types.end();
|
| - ++iter) {
|
| - syncable::PostTimeToTypeHistogram(iter->first,
|
| - now - last_sync_time);
|
| - }
|
| - }
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Bad server connection. Using that to decide on job.";
|
| + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
|
| +}
|
| +
|
| +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
|
| + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
|
| + if (pending_nudge_.get() == NULL) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Creating a pending nudge job";
|
| + SyncSession* s = job.session.get();
|
| + scoped_ptr<SyncSession> session(new SyncSession(s->context(),
|
| + s->delegate(), s->source(), s->routing_info(), s->workers()));
|
|
|
| - last_sync_time = now;
|
| + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
|
| + make_linked_ptr(session.release()), false, job.nudge_location);
|
| + pending_nudge_.reset(new SyncSessionJob(new_job));
|
|
|
| - VLOG(1) << "Updating the next polling time after SyncMain";
|
| - vault_.current_wait_interval_ = CalculatePollingWaitTime(
|
| - static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()),
|
| - &user_idle_milliseconds, &continue_sync_cycle, nudged);
|
| + return;
|
| }
|
| -#if defined(OS_LINUX)
|
| - idle_query_.reset();
|
| -#endif
|
| -}
|
|
|
| -void SyncerThread::SetConnected(bool connected) {
|
| - DCHECK(!thread_.IsRunning());
|
| - vault_.connected_ = connected;
|
| -}
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
|
| + pending_nudge_->session->Coalesce(*(job.session.get()));
|
| + pending_nudge_->scheduled_start = job.scheduled_start;
|
|
|
| -void SyncerThread::SetSyncerPollingInterval(base::TimeDelta interval) {
|
| - // TODO(timsteele): Use TimeDelta internally.
|
| - syncer_polling_interval_ = static_cast<int>(interval.InSeconds());
|
| + // Unfortunately the nudge location cannot be modified. So it stores the
|
| + // location of the first caller.
|
| }
|
|
|
| -void SyncerThread::SetSyncerShortPollInterval(base::TimeDelta interval) {
|
| - // TODO(timsteele): Use TimeDelta internally.
|
| - syncer_short_poll_interval_seconds_ =
|
| - static_cast<int>(interval.InSeconds());
|
| -}
|
| +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
|
| + JobProcessDecision decision = DecideOnJob(job);
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: "
|
| + << decision << " Job purpose " << job.purpose << "mode " << mode_;
|
| + if (decision != SAVE)
|
| + return decision == CONTINUE;
|
|
|
| -void SyncerThread::WaitUntilConnectedOrQuit() {
|
| - VLOG(1) << "Syncer thread waiting for connection.";
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION);
|
| + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
|
| + SyncSessionJob::CONFIGURATION);
|
|
|
| - bool is_paused = vault_.paused_;
|
| + SaveJob(job);
|
| + return false;
|
| +}
|
|
|
| - while (!vault_.connected_ && !vault_.stop_syncer_thread_) {
|
| - if (!is_paused && vault_.pause_requested_) {
|
| - // If we get a pause request while waiting for a connection,
|
| - // enter the paused state.
|
| - EnterPausedState();
|
| - is_paused = true;
|
| - VLOG(1) << "Syncer thread entering disconnected pause.";
|
| - }
|
| +void SyncerThread::SaveJob(const SyncSessionJob& job) {
|
| + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job";
|
| + InitOrCoalescePendingJob(job);
|
| + } else if (job.purpose == SyncSessionJob::CONFIGURATION){
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job";
|
| + DCHECK(wait_interval_.get());
|
| + DCHECK(mode_ == CONFIGURATION_MODE);
|
|
|
| - if (is_paused && !vault_.paused_) {
|
| - ExitPausedState();
|
| - is_paused = false;
|
| - VLOG(1) << "Syncer thread exiting disconnected pause.";
|
| - }
|
| + SyncSession* old = job.session.get();
|
| + SyncSession* s(new SyncSession(session_context_.get(), this,
|
| + old->source(), old->routing_info(), old->workers()));
|
| + SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
|
| + make_linked_ptr(s), false, job.nudge_location);
|
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
|
| + } // drop the rest.
|
| +}
|
|
|
| - vault_field_changed_.Wait();
|
| +// Functor for std::find_if to search by ModelSafeGroup.
|
| +struct ModelSafeWorkerGroupIs {
|
| + explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
|
| + bool operator()(ModelSafeWorker* w) {
|
| + return group == w->GetModelSafeGroup();
|
| }
|
| + ModelSafeGroup group;
|
| +};
|
|
|
| - if (!vault_.stop_syncer_thread_) {
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_CONNECTED);
|
| - VLOG(1) << "Syncer thread found connection.";
|
| +void SyncerThread::ScheduleClearUserData() {
|
| + if (!thread_.IsRunning()) {
|
| + NOTREACHED();
|
| + return;
|
| }
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::ScheduleClearUserDataImpl));
|
| }
|
|
|
| -void SyncerThread::PauseUntilResumedOrQuit() {
|
| - VLOG(1) << "Syncer thread entering pause.";
|
| - // If pause was requested (rather than already being paused), send
|
| - // the PAUSED notification.
|
| - if (vault_.pause_requested_)
|
| - EnterPausedState();
|
| -
|
| - // Thread will get stuck here until either a resume is requested
|
| - // or shutdown is started.
|
| - while (vault_.paused_ && !vault_.stop_syncer_thread_)
|
| - vault_field_changed_.Wait();
|
| +void SyncerThread::ScheduleNudge(const TimeDelta& delay,
|
| + NudgeSource source, const ModelTypeBitSet& types,
|
| + const tracked_objects::Location& nudge_location) {
|
| + if (!thread_.IsRunning()) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
|
|
| - // Notify that we have resumed if we are not shutting down.
|
| - if (!vault_.stop_syncer_thread_)
|
| - ExitPausedState();
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled";
|
|
|
| - VLOG(1) << "Syncer thread exiting pause.";
|
| + ModelTypePayloadMap types_with_payloads =
|
| + syncable::ModelTypePayloadMapFromBitSet(types, std::string());
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::ScheduleNudgeImpl, delay,
|
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false,
|
| + nudge_location));
|
| }
|
|
|
| -void SyncerThread::EnterPausedState() {
|
| - lock_.AssertAcquired();
|
| - vault_.pause_requested_ = false;
|
| - vault_.paused_ = true;
|
| - vault_field_changed_.Broadcast();
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_PAUSED);
|
| -}
|
| +void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
|
| + NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
|
| + const tracked_objects::Location& nudge_location) {
|
| + if (!thread_.IsRunning()) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
|
|
|
| -void SyncerThread::ExitPausedState() {
|
| - lock_.AssertAcquired();
|
| - vault_.paused_ = false;
|
| - vault_field_changed_.Broadcast();
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED);
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::ScheduleNudgeImpl, delay,
|
| + GetUpdatesFromNudgeSource(source), types_with_payloads, false,
|
| + nudge_location));
|
| }
|
|
|
| -void SyncerThread::DisableIdleDetection() {
|
| - disable_idle_detection_ = true;
|
| +void SyncerThread::ScheduleClearUserDataImpl() {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + SyncSession* session = new SyncSession(session_context_.get(), this,
|
| + SyncSourceInfo(), ModelSafeRoutingInfo(),
|
| + std::vector<ModelSafeWorker*>());
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
|
| + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
|
| }
|
|
|
| -// We check how long the user's been idle and sync less often if the machine is
|
| -// not in use. The aim is to reduce server load.
|
| -SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime(
|
| - int last_poll_wait, // Time in seconds.
|
| - int* user_idle_milliseconds,
|
| - bool* continue_sync_cycle,
|
| - bool was_nudged) {
|
| - lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock.
|
| - WaitInterval return_interval;
|
| +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
|
| + GetUpdatesCallerInfo::GetUpdatesSource source,
|
| + const ModelTypePayloadMap& types_with_payloads,
|
| + bool is_canary_job, const tracked_objects::Location& nudge_location) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
|
|
| - // Server initiated throttling trumps everything.
|
| - if (!silenced_until_.is_null()) {
|
| - // We don't need to reset other state, it can continue where it left off.
|
| - return_interval.mode = WaitInterval::THROTTLED;
|
| - return_interval.poll_delta = silenced_until_ - TimeTicks::Now();
|
| - return return_interval;
|
| - }
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
|
| + // Note we currently nudge for all types regardless of the ones incurring
|
| + // the nudge. Doing different would throw off some syncer commands like
|
| + // CleanupDisabledTypes. We may want to change this in the future.
|
| + SyncSourceInfo info(source, types_with_payloads);
|
|
|
| - bool is_continuing_sync_cyle = *continue_sync_cycle;
|
| - *continue_sync_cycle = false;
|
| + SyncSession* session(CreateSyncSession(info));
|
| + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
|
| + make_linked_ptr(session), is_canary_job,
|
| + nudge_location);
|
|
|
| - // Determine if the syncer has unfinished work to do.
|
| - SyncSessionSnapshot* snapshot = session_context_->previous_session_snapshot();
|
| - const bool syncer_has_work_to_do = snapshot &&
|
| - (snapshot->num_server_changes_remaining > 0 ||
|
| - snapshot->unsynced_count > 0);
|
| - VLOG(1) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
|
| + session = NULL;
|
| + if (!ShouldRunJob(job))
|
| + return;
|
|
|
| - // First calculate the expected wait time, figuring in any backoff because of
|
| - // user idle time. next_wait is in seconds
|
| - syncer_polling_interval_ = (!session_context_->notifications_enabled()) ?
|
| - syncer_short_poll_interval_seconds_ :
|
| - syncer_long_poll_interval_seconds_;
|
| - int default_next_wait = syncer_polling_interval_;
|
| - return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait);
|
| -
|
| - if (syncer_has_work_to_do) {
|
| - // Provide exponential backoff due to consecutive errors, else attempt to
|
| - // complete the work as soon as possible.
|
| - if (is_continuing_sync_cyle) {
|
| - return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF;
|
| - if (was_nudged && vault_.current_wait_interval_.mode ==
|
| - WaitInterval::EXPONENTIAL_BACKOFF) {
|
| - // We were nudged, it failed, and we were already in backoff.
|
| - return_interval.had_nudge_during_backoff = true;
|
| - // Keep exponent for exponential backoff the same in this case.
|
| - return_interval.poll_delta = vault_.current_wait_interval_.poll_delta;
|
| - } else {
|
| - // We weren't nudged, or we were in a NORMAL wait interval until now.
|
| - return_interval.poll_delta = TimeDelta::FromSeconds(
|
| - GetRecommendedDelaySeconds(last_poll_wait));
|
| - }
|
| - } else {
|
| - // No consecutive error.
|
| - return_interval.poll_delta = TimeDelta::FromSeconds(
|
| - GetRecommendedDelaySeconds(0));
|
| + if (pending_nudge_.get()) {
|
| + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
|
| + << "we are in backoff";
|
| + return;
|
| }
|
| - *continue_sync_cycle = true;
|
| - } else if (!session_context_->notifications_enabled()) {
|
| - // Ensure that we start exponential backoff from our base polling
|
| - // interval when we are not continuing a sync cycle.
|
| - last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
|
| -
|
| - // Did the user start interacting with the computer again?
|
| - // If so, revise our idle time (and probably next_sync_time) downwards
|
| - int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
|
| - if (new_idle_time < *user_idle_milliseconds) {
|
| - *user_idle_milliseconds = new_idle_time;
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
|
| + pending_nudge_->session->Coalesce(*(job.session.get()));
|
| +
|
| + if (!IsBackingOff()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
|
| + << " we are not in backoff and the job was coalesced";
|
| + return;
|
| + } else {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Rescheduling pending nudge";
|
| + SyncSession* s = pending_nudge_->session.get();
|
| + job.session.reset(new SyncSession(s->context(), s->delegate(),
|
| + s->source(), s->routing_info(), s->workers()));
|
| + pending_nudge_.reset();
|
| }
|
| - return_interval.poll_delta = TimeDelta::FromMilliseconds(
|
| - CalculateSyncWaitTime(last_poll_wait * 1000,
|
| - *user_idle_milliseconds));
|
| - DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait);
|
| }
|
|
|
| - VLOG(1) << "Sync wait: idle " << default_next_wait
|
| - << " non-idle or backoff " << return_interval.poll_delta.InSeconds();
|
| -
|
| - return return_interval;
|
| + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
|
| + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
|
| + nudge_location);
|
| }
|
|
|
| -void SyncerThread::ThreadMain() {
|
| - base::AutoLock lock(lock_);
|
| - // Signal Start() to let it know we've made it safely onto the message loop,
|
| - // and unblock it's caller.
|
| - thread_main_started_.Signal();
|
| - ThreadMainLoop();
|
| - VLOG(1) << "Syncer thread ThreadMain is done.";
|
| - Notify(SyncEngineEvent::SYNCER_THREAD_EXITING);
|
| -}
|
| +// Helper to extract the routing info and workers corresponding to types in
|
| +// |types| from |registrar|.
|
| +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
|
| + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
|
| + std::vector<ModelSafeWorker*>* workers) {
|
| + ModelSafeRoutingInfo r_tmp;
|
| + std::vector<ModelSafeWorker*> w_tmp;
|
| + registrar->GetModelSafeRoutingInfo(&r_tmp);
|
| + registrar->GetWorkers(&w_tmp);
|
|
|
| -SyncSession* SyncerThread::SyncMain(Syncer* syncer, bool was_throttled,
|
| - bool continue_sync_cycle, bool* initial_sync_for_thread,
|
| - bool* was_nudged) {
|
| - CHECK(syncer);
|
| + typedef std::vector<ModelSafeWorker*>::const_iterator iter;
|
| + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
|
| + if (!types.test(i))
|
| + continue;
|
| + syncable::ModelType t = syncable::ModelTypeFromInt(i);
|
| + DCHECK_EQ(1U, r_tmp.count(t));
|
| + (*routes)[t] = r_tmp[t];
|
| + iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
|
| + ModelSafeWorkerGroupIs(r_tmp[t]));
|
| + if (it != w_tmp.end())
|
| + workers->push_back(*it);
|
| + else
|
| + NOTREACHED();
|
| + }
|
| +
|
| + iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
|
| + ModelSafeWorkerGroupIs(GROUP_PASSIVE));
|
| + if (it != w_tmp.end())
|
| + workers->push_back(*it);
|
| + else
|
| + NOTREACHED();
|
| +}
|
|
|
| - // Since we are initiating a new session for which we are the delegate, we
|
| - // are not currently silenced so reset this state for the next session which
|
| - // may need to use it.
|
| - silenced_until_ = base::TimeTicks();
|
| +void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
|
| + if (!thread_.IsRunning()) {
|
| + NOTREACHED();
|
| + return;
|
| + }
|
|
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config";
|
| ModelSafeRoutingInfo routes;
|
| std::vector<ModelSafeWorker*> workers;
|
| - session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| - session_context_->registrar()->GetWorkers(&workers);
|
| - SyncSourceInfo info(GetAndResetNudgeSource(was_throttled,
|
| - continue_sync_cycle, initial_sync_for_thread, was_nudged));
|
| - scoped_ptr<SyncSession> session;
|
| -
|
| - base::AutoUnlock unlock(lock_);
|
| - do {
|
| - session.reset(new SyncSession(session_context_.get(), this,
|
| - info, routes, workers));
|
| - VLOG(1) << "Calling SyncShare.";
|
| - syncer->SyncShare(session.get());
|
| - } while (session->HasMoreToSync() && silenced_until_.is_null());
|
| -
|
| - VLOG(1) << "Done calling SyncShare.";
|
| - return session.release();
|
| -}
|
| -
|
| -SyncSourceInfo SyncerThread::GetAndResetNudgeSource(bool was_throttled,
|
| - bool continue_sync_cycle,
|
| - bool* initial_sync,
|
| - bool* was_nudged) {
|
| - bool nudged = false;
|
| - NudgeSource nudge_source = kUnknown;
|
| - ModelTypePayloadMap model_types_with_payloads;
|
| - // Has the previous sync cycle completed?
|
| - if (continue_sync_cycle)
|
| - nudge_source = kContinuation;
|
| - // Update the nudge source if a new nudge has come through during the
|
| - // previous sync cycle.
|
| - if (!vault_.pending_nudge_time_.is_null()) {
|
| - if (!was_throttled) {
|
| - nudge_source = vault_.pending_nudge_source_;
|
| - model_types_with_payloads = vault_.pending_nudge_types_;
|
| - nudged = true;
|
| - }
|
| - VLOG(1) << "Clearing pending nudge from " << vault_.pending_nudge_source_
|
| - << " at tick " << vault_.pending_nudge_time_.ToInternalValue();
|
| - vault_.pending_nudge_source_ = kUnknown;
|
| - vault_.pending_nudge_types_.clear();
|
| - vault_.pending_nudge_time_ = base::TimeTicks();
|
| + GetModelSafeParamsForTypes(types, session_context_->registrar(),
|
| + &routes, &workers);
|
| +
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
|
| + this, &SyncerThread::ScheduleConfigImpl, routes, workers,
|
| + GetUpdatesCallerInfo::FIRST_UPDATE));
|
| +}
|
| +
|
| +void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
|
| + const std::vector<ModelSafeWorker*>& workers,
|
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
|
| + // TODO(tim): config-specific GetUpdatesCallerInfo value?
|
| + SyncSession* session = new SyncSession(session_context_.get(), this,
|
| + SyncSourceInfo(source,
|
| + syncable::ModelTypePayloadMapFromRoutingInfo(
|
| + routing_info, std::string())),
|
| + routing_info, workers);
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
|
| + SyncSessionJob::CONFIGURATION, session, FROM_HERE);
|
| +}
|
| +
|
| +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
|
| + SyncSessionJob::SyncSessionJobPurpose purpose,
|
| + sessions::SyncSession* session,
|
| + const tracked_objects::Location& nudge_location) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| +
|
| + SyncSessionJob job(purpose, TimeTicks::Now() + delay,
|
| + make_linked_ptr(session), false, nudge_location);
|
| + if (purpose == SyncSessionJob::NUDGE) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
|
| + << " ScheduleSyncSessionJob";
|
| + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
|
| + pending_nudge_.reset(new SyncSessionJob(job));
|
| + }
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Posting job to execute in DoSyncSessionJob. Job purpose "
|
| + << job.purpose;
|
| + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
|
| + &SyncerThread::DoSyncSessionJob, job),
|
| + delay.InMilliseconds());
|
| +}
|
| +
|
| +void SyncerThread::SetSyncerStepsForPurpose(
|
| + SyncSessionJob::SyncSessionJobPurpose purpose,
|
| + SyncerStep* start, SyncerStep* end) {
|
| + *end = SYNCER_END;
|
| + switch (purpose) {
|
| + case SyncSessionJob::CONFIGURATION:
|
| + *start = DOWNLOAD_UPDATES;
|
| + *end = APPLY_UPDATES;
|
| + return;
|
| + case SyncSessionJob::CLEAR_USER_DATA:
|
| + *start = CLEAR_PRIVATE_DATA;
|
| + return;
|
| + case SyncSessionJob::NUDGE:
|
| + case SyncSessionJob::POLL:
|
| + *start = SYNCER_BEGIN;
|
| + return;
|
| + default:
|
| + NOTREACHED();
|
| }
|
| +}
|
|
|
| - *was_nudged = nudged;
|
| -
|
| - // TODO(tim): Hack for bug 64136 to correctly tag continuations that result
|
| - // from syncer having more work to do. This will be handled properly with
|
| - // the message loop based syncer thread, bug 26339.
|
| - return MakeSyncSourceInfo(nudged || nudge_source == kContinuation,
|
| - nudge_source, model_types_with_payloads, initial_sync);
|
| -}
|
| -
|
| -SyncSourceInfo SyncerThread::MakeSyncSourceInfo(bool nudged,
|
| - NudgeSource nudge_source,
|
| - const ModelTypePayloadMap& model_types_with_payloads,
|
| - bool* initial_sync) {
|
| - sync_pb::GetUpdatesCallerInfo::GetUpdatesSource updates_source =
|
| - sync_pb::GetUpdatesCallerInfo::UNKNOWN;
|
| - if (*initial_sync) {
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
|
| - *initial_sync = false;
|
| - } else if (!nudged) {
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
|
| +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + if (!ShouldRunJob(job))
|
| + return;
|
| +
|
| + if (job.purpose == SyncSessionJob::NUDGE) {
|
| + DCHECK(pending_nudge_.get());
|
| + if (pending_nudge_->session != job.session)
|
| + return; // Another nudge must have been scheduled in in the meantime.
|
| + pending_nudge_.reset();
|
| + }
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
|
| + << job.purpose;
|
| +
|
| + SyncerStep begin(SYNCER_BEGIN);
|
| + SyncerStep end(SYNCER_END);
|
| + SetSyncerStepsForPurpose(job.purpose, &begin, &end);
|
| +
|
| + bool has_more_to_sync = true;
|
| + while (ShouldRunJob(job) && has_more_to_sync) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " SyncerThread: Calling SyncShare.";
|
| + // Synchronously perform the sync session from this thread.
|
| + syncer_->SyncShare(job.session.get(), begin, end);
|
| + has_more_to_sync = job.session->HasMoreToSync();
|
| + if (has_more_to_sync)
|
| + job.session->ResetTransientState();
|
| + }
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " SyncerThread: Done SyncShare looping.";
|
| + FinishSyncSessionJob(job);
|
| +}
|
| +
|
| +void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + // Whatever types were part of a configuration task will have had updates
|
| + // downloaded. For that reason, we make sure they get recorded in the
|
| + // event that they get disabled at a later time.
|
| + ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
|
| + if (!r.empty()) {
|
| + ModelSafeRoutingInfo temp_r;
|
| + ModelSafeRoutingInfo old_info(old_job.session->routing_info());
|
| + std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
|
| + std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
|
| + session_context_->set_previous_session_routing_info(temp_r);
|
| + }
|
| } else {
|
| - switch (nudge_source) {
|
| - case kNotification:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
|
| - break;
|
| - case kLocal:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
|
| - break;
|
| - case kContinuation:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
|
| - break;
|
| - case kClearPrivateData:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA;
|
| - break;
|
| - case kUnknown:
|
| - default:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
|
| - break;
|
| + session_context_->set_previous_session_routing_info(
|
| + old_job.session->routing_info());
|
| + }
|
| +}
|
| +
|
| +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + // Update timing information for how often datatypes are triggering nudges.
|
| + base::TimeTicks now = TimeTicks::Now();
|
| + if (!last_sync_session_end_time_.is_null()) {
|
| + ModelTypePayloadMap::const_iterator iter;
|
| + for (iter = job.session->source().types.begin();
|
| + iter != job.session->source().types.end();
|
| + ++iter) {
|
| + syncable::PostTimeToTypeHistogram(iter->first,
|
| + now - last_sync_session_end_time_);
|
| }
|
| }
|
| + last_sync_session_end_time_ = now;
|
| + UpdateCarryoverSessionState(job);
|
| + if (IsSyncingCurrentlySilenced()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " We are currently throttled. So not scheduling the next sync.";
|
| + SaveJob(job);
|
| + return; // Nothing to do.
|
| + }
|
|
|
| - ModelTypePayloadMap sync_source_types;
|
| - if (model_types_with_payloads.empty()) {
|
| - // No datatypes requested. This must be a poll so set all enabled datatypes.
|
| - ModelSafeRoutingInfo routes;
|
| - session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| - sync_source_types = syncable::ModelTypePayloadMapFromRoutingInfo(routes,
|
| - std::string());
|
| - } else {
|
| - sync_source_types = model_types_with_payloads;
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Updating the next polling time after SyncMain";
|
| + ScheduleNextSync(job);
|
| +}
|
| +
|
| +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + DCHECK(!old_job.session->HasMoreToSync());
|
| + // Note: |num_server_changes_remaining| > 0 here implies that we received a
|
| + // broken response while trying to download all updates, because the Syncer
|
| + // will loop until this value is exhausted. Also, if unsynced_handles exist
|
| + // but HasMoreToSync is false, this implies that the Syncer determined no
|
| + // forward progress was possible at this time (an error, such as an HTTP
|
| + // 500, is likely to have occurred during commit).
|
| + const bool work_to_do =
|
| + old_job.session->status_controller()->num_server_changes_remaining() > 0
|
| + || old_job.session->status_controller()->unsynced_handles().size() > 0;
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: "
|
| + << work_to_do;
|
| +
|
| + AdjustPolling(&old_job);
|
| +
|
| + // TODO(tim): Old impl had special code if notifications disabled. Needed?
|
| + if (!work_to_do) {
|
| + // Success implies backoff relief. Note that if this was a "one-off" job
|
| + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
|
| + // work_to_do before it ran this wont have changed, as jobs like this don't
|
| + // run a full sync cycle. So we don't need special code here.
|
| + wait_interval_.reset();
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Job suceeded so not scheduling more jobs";
|
| + return;
|
| }
|
|
|
| - return SyncSourceInfo(updates_source, sync_source_types);
|
| -}
|
| -
|
| -void SyncerThread::CreateSyncer(const std::string& dirname) {
|
| - base::AutoLock lock(lock_);
|
| - VLOG(1) << "Creating syncer up for: " << dirname;
|
| - // The underlying database structure is ready, and we should create
|
| - // the syncer.
|
| - CHECK(vault_.syncer_ == NULL);
|
| - vault_.syncer_ = new Syncer();
|
| - vault_field_changed_.Broadcast();
|
| -}
|
| -
|
| -// Sets |*connected| to false if it is currently true but |code| suggests that
|
| -// the current network configuration and/or auth state cannot be used to make
|
| -// forward progress, and user intervention (e.g changing server URL or auth
|
| -// credentials) is likely necessary. If |*connected| is false, set it to true
|
| -// if |code| suggests that we just recently made healthy contact with the
|
| -// server.
|
| -static inline void CheckConnected(bool* connected,
|
| - HttpResponse::ServerConnectionCode code,
|
| - base::ConditionVariable* condvar) {
|
| - if (*connected) {
|
| - // Note, be careful when adding cases here because if the SyncerThread
|
| - // thinks there is no valid connection as determined by this method, it
|
| - // will drop out of *all* forward progress sync loops (it won't poll and it
|
| - // will queue up Talk notifications but not actually call SyncShare) until
|
| - // some external action causes a ServerConnectionManager to broadcast that
|
| - // a valid connection has been re-established.
|
| - if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
|
| - HttpResponse::SYNC_AUTH_ERROR == code) {
|
| - *connected = false;
|
| - condvar->Broadcast();
|
| - }
|
| + if (old_job.session->source().updates_source ==
|
| + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Job failed with source continuation";
|
| + // We don't seem to have made forward progress. Start or extend backoff.
|
| + HandleConsecutiveContinuationError(old_job);
|
| + } else if (IsBackingOff()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " A nudge during backoff failed";
|
| + // We weren't continuing but we're in backoff; must have been a nudge.
|
| + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
|
| + DCHECK(!wait_interval_->had_nudge);
|
| + wait_interval_->had_nudge = true;
|
| + wait_interval_->timer.Reset();
|
| } else {
|
| - if (HttpResponse::SERVER_CONNECTION_OK == code) {
|
| - *connected = true;
|
| - condvar->Broadcast();
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " Failed. Schedule a job with continuation as source";
|
| + // We weren't continuing and we aren't in backoff. Schedule a normal
|
| + // continuation.
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + ScheduleConfigImpl(old_job.session->routing_info(),
|
| + old_job.session->workers(),
|
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
|
| + } else {
|
| + // For all other purposes(nudge and poll) we schedule a retry nudge.
|
| + ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
|
| + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
|
| + old_job.session->source().types, false, FROM_HERE);
|
| }
|
| }
|
| }
|
|
|
| -void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
|
| - conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
|
| - &SyncerThread::HandleServerConnectionEvent));
|
| - CheckConnected(&vault_.connected_, conn_mgr->server_status(),
|
| - &vault_field_changed_);
|
| +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
|
| + DCHECK(thread_.IsRunning());
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| +
|
| + TimeDelta poll = (!session_context_->notifications_enabled()) ?
|
| + syncer_short_poll_interval_seconds_ :
|
| + syncer_long_poll_interval_seconds_;
|
| + bool rate_changed = !poll_timer_.IsRunning() ||
|
| + poll != poll_timer_.GetCurrentDelay();
|
| +
|
| + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
|
| + poll_timer_.Reset();
|
| +
|
| + if (!rate_changed)
|
| + return;
|
| +
|
| + // Adjust poll rate.
|
| + poll_timer_.Stop();
|
| + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
|
| }
|
|
|
| -void SyncerThread::HandleServerConnectionEvent(
|
| - const ServerConnectionEvent& event) {
|
| - if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
|
| - base::AutoLock lock(lock_);
|
| - CheckConnected(&vault_.connected_, event.connection_code,
|
| - &vault_field_changed_);
|
| +void SyncerThread::HandleConsecutiveContinuationError(
|
| + const SyncSessionJob& old_job) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + // This if conditions should be compiled out in retail builds.
|
| + if (IsBackingOff()) {
|
| + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
|
| }
|
| + SyncSession* old = old_job.session.get();
|
| + SyncSession* s(new SyncSession(session_context_.get(), this,
|
| + old->source(), old->routing_info(), old->workers()));
|
| + TimeDelta length = delay_provider_->GetDelay(
|
| + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
|
| +
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " In handle continuation error. Old job purpose is "
|
| + << old_job.purpose;
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " In Handle continuation error. The time delta(ms) is: "
|
| + << length.InMilliseconds();
|
| +
|
| + // This will reset the had_nudge variable as well.
|
| + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
|
| + length));
|
| + if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
|
| + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
|
| + make_linked_ptr(s), false, FROM_HERE);
|
| + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
|
| + } else {
|
| + // We are not in configuration mode. So wait_interval's pending job
|
| + // should be null.
|
| + DCHECK(wait_interval_->pending_configure_job.get() == NULL);
|
| +
|
| + // TODO(lipalani) - handle clear user data.
|
| + InitOrCoalescePendingJob(old_job);
|
| + }
|
| + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
|
| }
|
|
|
| -int SyncerThread::GetRecommendedDelaySeconds(int base_delay_seconds) {
|
| - if (base_delay_seconds >= kMaxBackoffSeconds)
|
| - return kMaxBackoffSeconds;
|
| +// static
|
| +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
|
| + if (last_delay.InSeconds() >= kMaxBackoffSeconds)
|
| + return TimeDelta::FromSeconds(kMaxBackoffSeconds);
|
|
|
| // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
|
| - int backoff_s =
|
| - std::max(1, base_delay_seconds * kBackoffRandomizationFactor);
|
| + int64 backoff_s =
|
| + std::max(static_cast<int64>(1),
|
| + last_delay.InSeconds() * kBackoffRandomizationFactor);
|
|
|
| // Flip a coin to randomize backoff interval by +/- 50%.
|
| int rand_sign = base::RandInt(0, 1) * 2 - 1;
|
|
|
| // Truncation is adequate for rounding here.
|
| backoff_s = backoff_s +
|
| - (rand_sign * (base_delay_seconds / kBackoffRandomizationFactor));
|
| + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
|
|
|
| // Cap the backoff interval.
|
| - backoff_s = std::max(1, std::min(backoff_s, kMaxBackoffSeconds));
|
| + backoff_s = std::max(static_cast<int64>(1),
|
| + std::min(backoff_s, kMaxBackoffSeconds));
|
|
|
| - return backoff_s;
|
| + return TimeDelta::FromSeconds(backoff_s);
|
| }
|
|
|
| -// Inputs and return value in milliseconds.
|
| -int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
|
| - // syncer_polling_interval_ is in seconds
|
| - int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
|
| -
|
| - // This is our default and lower bound.
|
| - int next_wait = syncer_polling_interval_ms;
|
| -
|
| - // Get idle time, bounded by max wait.
|
| - int idle = min(user_idle_ms, syncer_max_interval_);
|
| +void SyncerThread::Stop() {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " stop called";
|
| + syncer_->RequestEarlyExit(); // Safe to call from any thread.
|
| + session_context_->connection_manager()->RemoveListener(this);
|
| + thread_.Stop();
|
| +}
|
|
|
| - // If the user has been idle for a while, we'll start decreasing the poll
|
| - // rate.
|
| - if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
|
| - next_wait = std::min(GetRecommendedDelaySeconds(
|
| - last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
|
| +void SyncerThread::DoCanaryJob() {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job";
|
| + DoPendingJobIfPossible(true);
|
| +}
|
| +
|
| +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
|
| + SyncSessionJob* job_to_execute = NULL;
|
| + if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
|
| + && wait_interval_->pending_configure_job.get()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job";
|
| + job_to_execute = wait_interval_->pending_configure_job.get();
|
| + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job";
|
| + // Pending jobs mostly have time from the past. Reset it so this job
|
| + // will get executed.
|
| + if (pending_nudge_->scheduled_start < TimeTicks::Now())
|
| + pending_nudge_->scheduled_start = TimeTicks::Now();
|
| +
|
| + scoped_ptr<SyncSession> session(CreateSyncSession(
|
| + pending_nudge_->session->source()));
|
| +
|
| + // Also the routing info might have been changed since we cached the
|
| + // pending nudge. Update it by coalescing to the latest.
|
| + pending_nudge_->session->Coalesce(*(session.get()));
|
| + // The pending nudge would be cleared in the DoSyncSessionJob function.
|
| + job_to_execute = pending_nudge_.get();
|
| }
|
|
|
| - return next_wait;
|
| -}
|
| -
|
| -// Called with mutex_ already locked.
|
| -void SyncerThread::NudgeSyncImpl(
|
| - int milliseconds_from_now,
|
| - NudgeSource source,
|
| - const ModelTypePayloadMap& model_types_with_payloads) {
|
| - // TODO(sync): Add the option to reset the backoff state machine.
|
| - // This is needed so nudges that are a result of the user's desire
|
| - // to download updates for a new data type can be satisfied quickly.
|
| - if (vault_.current_wait_interval_.mode == WaitInterval::THROTTLED ||
|
| - vault_.current_wait_interval_.had_nudge_during_backoff) {
|
| - // Drop nudges on the floor if we've already had one since starting this
|
| - // stage of exponential backoff or we are throttled.
|
| - return;
|
| + if (job_to_execute != NULL) {
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job";
|
| + SyncSessionJob copy = *job_to_execute;
|
| + copy.is_canary_job = is_canary_job;
|
| + DoSyncSessionJob(copy);
|
| }
|
| +}
|
|
|
| - // Union the current ModelTypePayloadMap with any from nudges that may have
|
| - // already posted (coalesce the nudge datatype information).
|
| - // TODO(tim): It seems weird to do this if the sources don't match up (e.g.
|
| - // if pending_source is kLocal and |source| is kClearPrivateData).
|
| - syncable::CoalescePayloads(&vault_.pending_nudge_types_,
|
| - model_types_with_payloads);
|
| -
|
| - const TimeTicks nudge_time = TimeTicks::Now() +
|
| - TimeDelta::FromMilliseconds(milliseconds_from_now);
|
| - if (nudge_time <= vault_.pending_nudge_time_) {
|
| - VLOG(1) << "Nudge for source " << source
|
| - << " dropped due to existing later pending nudge";
|
| - return;
|
| - }
|
| +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
|
| + ModelSafeRoutingInfo routes;
|
| + std::vector<ModelSafeWorker*> workers;
|
| + session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
|
| + session_context_->registrar()->GetWorkers(&workers);
|
| + SyncSourceInfo info(source);
|
|
|
| - VLOG(1) << "Replacing pending nudge for source " << source
|
| - << " at " << nudge_time.ToInternalValue();
|
| + SyncSession* session(new SyncSession(session_context_.get(), this, info,
|
| + routes, workers));
|
|
|
| - vault_.pending_nudge_source_ = source;
|
| - vault_.pending_nudge_time_ = nudge_time;
|
| - vault_field_changed_.Broadcast();
|
| + return session;
|
| }
|
|
|
| -void SyncerThread::SetNotificationsEnabled(bool notifications_enabled) {
|
| - base::AutoLock lock(lock_);
|
| - session_context_->set_notifications_enabled(notifications_enabled);
|
| +void SyncerThread::PollTimerCallback() {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + ModelSafeRoutingInfo r;
|
| + ModelTypePayloadMap types_with_payloads =
|
| + syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
|
| + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
|
| + SyncSession* s = CreateSyncSession(info);
|
| + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
|
| + FROM_HERE);
|
| }
|
|
|
| -// Returns the amount of time since the user last interacted with the computer,
|
| -// in milliseconds
|
| -int SyncerThread::UserIdleTime() {
|
| -#if defined(OS_WIN)
|
| - LASTINPUTINFO last_input_info;
|
| - last_input_info.cbSize = sizeof(LASTINPUTINFO);
|
| -
|
| - // Get time in windows ticks since system start of last activity.
|
| - BOOL b = ::GetLastInputInfo(&last_input_info);
|
| - if (b == TRUE)
|
| - return ::GetTickCount() - last_input_info.dwTime;
|
| -#elif defined(OS_MACOSX)
|
| - // It would be great to do something like:
|
| - //
|
| - // return 1000 *
|
| - // CGEventSourceSecondsSinceLastEventType(
|
| - // kCGEventSourceStateCombinedSessionState,
|
| - // kCGAnyInputEventType);
|
| - //
|
| - // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
|
| - // and can't link that high up the food chain. Thus this mucking in IOKit.
|
| -
|
| - io_service_t hid_service =
|
| - IOServiceGetMatchingService(kIOMasterPortDefault,
|
| - IOServiceMatching("IOHIDSystem"));
|
| - if (!hid_service) {
|
| - LOG(WARNING) << "Could not obtain IOHIDSystem";
|
| - return 0;
|
| - }
|
| +void SyncerThread::Unthrottle() {
|
| + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
|
| + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled..";
|
| + DoCanaryJob();
|
| + wait_interval_.reset();
|
| +}
|
|
|
| - CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
|
| - CFSTR("HIDIdleTime"),
|
| - kCFAllocatorDefault,
|
| - 0);
|
| - if (!object) {
|
| - LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
|
| - IOObjectRelease(hid_service);
|
| - return 0;
|
| - }
|
| +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + session_context_->NotifyListeners(SyncEngineEvent(cause));
|
| +}
|
|
|
| - int64 idle_time; // in nanoseconds
|
| - Boolean success = false;
|
| - if (CFGetTypeID(object) == CFNumberGetTypeID()) {
|
| - success = CFNumberGetValue((CFNumberRef)object,
|
| - kCFNumberSInt64Type,
|
| - &idle_time);
|
| - } else {
|
| - LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
|
| - }
|
| +bool SyncerThread::IsBackingOff() const {
|
| + return wait_interval_.get() && wait_interval_->mode ==
|
| + WaitInterval::EXPONENTIAL_BACKOFF;
|
| +}
|
| +
|
| +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
|
| + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
|
| + silenced_until - TimeTicks::Now()));
|
| + wait_interval_->timer.Start(wait_interval_->length, this,
|
| + &SyncerThread::Unthrottle);
|
| +}
|
|
|
| - CFRelease(object);
|
| - IOObjectRelease(hid_service);
|
| +bool SyncerThread::IsSyncingCurrentlySilenced() {
|
| + return wait_interval_.get() && wait_interval_->mode ==
|
| + WaitInterval::THROTTLED;
|
| +}
|
|
|
| - if (!success) {
|
| - LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
|
| - return 0;
|
| - }
|
| - return idle_time / 1000000; // nano to milli
|
| -#elif defined(OS_LINUX)
|
| - if (idle_query_.get())
|
| - return idle_query_->IdleTime();
|
| - return 0;
|
| -#else
|
| - static bool was_logged = false;
|
| - if (!was_logged) {
|
| - was_logged = true;
|
| - VLOG(1) << "UserIdleTime unimplemented on this platform, synchronization "
|
| - "will not throttle when user idle";
|
| - }
|
| -#endif
|
| +void SyncerThread::OnReceivedShortPollIntervalUpdate(
|
| + const base::TimeDelta& new_interval) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + syncer_short_poll_interval_seconds_ = new_interval;
|
| +}
|
|
|
| - return 0;
|
| +void SyncerThread::OnReceivedLongPollIntervalUpdate(
|
| + const base::TimeDelta& new_interval) {
|
| + DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
|
| + syncer_long_poll_interval_seconds_ = new_interval;
|
| +}
|
| +
|
| +void SyncerThread::OnShouldStopSyncingPermanently() {
|
| + VLOG(2) << "SyncerThread(" << this << ")"
|
| + << " OnShouldStopSyncingPermanently";
|
| + syncer_->RequestEarlyExit(); // Thread-safe.
|
| + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
|
| +}
|
| +
|
| +void SyncerThread::OnServerConnectionEvent(
|
| + const ServerConnectionEvent2& event) {
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
|
| + &SyncerThread::CheckServerConnectionManagerStatus,
|
| + event.connection_code));
|
| +}
|
| +
|
| +void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
|
| + session_context_->set_notifications_enabled(notifications_enabled);
|
| }
|
|
|
| -} // namespace browser_sync
|
| +} // browser_sync
|
|
|