Index: chrome/browser/sync/engine/syncer_thread.cc |
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc |
index 5851c45364547141653b8f1f676654e79baf1b89..7966f6713fdd2a404be10aa3043891ee5096a80b 100644 |
--- a/chrome/browser/sync/engine/syncer_thread.cc |
+++ b/chrome/browser/sync/engine/syncer_thread.cc |
@@ -5,27 +5,10 @@ |
#include "chrome/browser/sync/engine/syncer_thread.h" |
#include <algorithm> |
-#include <queue> |
-#include <string> |
-#include <vector> |
#include "base/rand_util.h" |
-#include "base/third_party/dynamic_annotations/dynamic_annotations.h" |
-#include "build/build_config.h" |
-#include "chrome/browser/sync/engine/model_safe_worker.h" |
-#include "chrome/browser/sync/engine/net/server_connection_manager.h" |
#include "chrome/browser/sync/engine/syncer.h" |
-#include "chrome/browser/sync/sessions/sync_session.h" |
-#if defined(OS_MACOSX) |
-#include <CoreFoundation/CFNumber.h> |
-#include <IOKit/IOTypes.h> |
-#include <IOKit/IOKitLib.h> |
-#endif |
- |
-using std::priority_queue; |
-using std::min; |
-using base::Time; |
using base::TimeDelta; |
using base::TimeTicks; |
@@ -35,861 +18,843 @@ using sessions::SyncSession; |
using sessions::SyncSessionSnapshot; |
using sessions::SyncSourceInfo; |
using syncable::ModelTypePayloadMap; |
+using syncable::ModelTypeBitSet; |
+using sync_pb::GetUpdatesCallerInfo; |
-// We use high values here to ensure that failure to receive poll updates from |
-// the server doesn't result in rapid-fire polling from the client due to low |
-// local limits. |
-const int SyncerThread::kDefaultShortPollIntervalSeconds = 3600 * 8; |
-const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600 * 12; |
- |
-// TODO(tim): This is used to regulate the short poll (when notifications are |
-// disabled) based on user idle time. If it is set to a smaller value than |
-// the short poll interval, it basically does nothing; for now, this is what |
-// we want and allows stronger control over the poll rate from the server. We |
-// should probably re-visit this code later and figure out if user idle time |
-// is really something we want and make sure it works, if it is. |
-const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000; |
- |
-// Backoff interval randomization factor. |
-static const int kBackoffRandomizationFactor = 2; |
- |
-const int SyncerThread::kMaxBackoffSeconds = 60 * 60 * 4; // 4 hours. |
- |
-SyncerThread::ProtectedFields::ProtectedFields() |
- : stop_syncer_thread_(false), |
- pause_requested_(false), |
- paused_(false), |
- syncer_(NULL), |
- connected_(false), |
- pending_nudge_source_(kUnknown) {} |
- |
-SyncerThread::ProtectedFields::~ProtectedFields() {} |
- |
-void SyncerThread::NudgeSyncerWithPayloads( |
- int milliseconds_from_now, |
- NudgeSource source, |
- const ModelTypePayloadMap& model_types_with_payloads) { |
- base::AutoLock lock(lock_); |
- if (vault_.syncer_ == NULL) { |
- return; |
- } |
+SyncerThread::DelayProvider::DelayProvider() {} |
+SyncerThread::DelayProvider::~DelayProvider() {} |
- NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); |
-} |
+SyncerThread::WaitInterval::WaitInterval() {} |
+SyncerThread::WaitInterval::~WaitInterval() {} |
-void SyncerThread::NudgeSyncerWithDataTypes( |
- int milliseconds_from_now, |
- NudgeSource source, |
- const syncable::ModelTypeBitSet& model_types) { |
- base::AutoLock lock(lock_); |
- if (vault_.syncer_ == NULL) { |
- return; |
- } |
+SyncerThread::SyncSessionJob::SyncSessionJob() {} |
+SyncerThread::SyncSessionJob::~SyncSessionJob() {} |
- ModelTypePayloadMap model_types_with_payloads = |
- syncable::ModelTypePayloadMapFromBitSet(model_types, std::string()); |
- NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); |
+SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
+ base::TimeTicks start, |
+ linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
+ const tracked_objects::Location& nudge_location) : purpose(purpose), |
+ scheduled_start(start), |
+ session(session), |
+ is_canary_job(is_canary_job), |
+ nudge_location(nudge_location) { |
} |
-void SyncerThread::NudgeSyncer( |
- int milliseconds_from_now, |
+TimeDelta SyncerThread::DelayProvider::GetDelay( |
+ const base::TimeDelta& last_delay) { |
+ return SyncerThread::GetRecommendedDelay(last_delay); |
+} |
+ |
+GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
NudgeSource source) { |
- base::AutoLock lock(lock_); |
- if (vault_.syncer_ == NULL) { |
- return; |
+ switch (source) { |
+ case NUDGE_SOURCE_NOTIFICATION: |
+ return GetUpdatesCallerInfo::NOTIFICATION; |
+ case NUDGE_SOURCE_LOCAL: |
+ return GetUpdatesCallerInfo::LOCAL; |
+ case NUDGE_SOURCE_CONTINUATION: |
+ return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
+ case NUDGE_SOURCE_UNKNOWN: |
+ return GetUpdatesCallerInfo::UNKNOWN; |
+ default: |
+ NOTREACHED(); |
+ return GetUpdatesCallerInfo::UNKNOWN; |
} |
- |
- // Set all enabled datatypes. |
- ModelSafeRoutingInfo routes; |
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
- ModelTypePayloadMap model_types_with_payloads = |
- syncable::ModelTypePayloadMapFromRoutingInfo(routes, std::string()); |
- NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); |
} |
-SyncerThread::SyncerThread(sessions::SyncSessionContext* context) |
- : thread_main_started_(false, false), |
- thread_("SyncEngine_SyncerThread"), |
- vault_field_changed_(&lock_), |
- conn_mgr_hookup_(NULL), |
- syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), |
- syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), |
- syncer_polling_interval_(kDefaultShortPollIntervalSeconds), |
- syncer_max_interval_(kDefaultMaxPollIntervalMs), |
- session_context_(context), |
- disable_idle_detection_(false) { |
- DCHECK(context); |
+SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
+ : mode(mode), had_nudge(false), length(length) { } |
- if (context->connection_manager()) |
- WatchConnectionManager(context->connection_manager()); |
+SyncerThread::SyncerThread(sessions::SyncSessionContext* context, |
+ Syncer* syncer) |
+ : thread_("SyncEngine_SyncerThread"), |
+ syncer_short_poll_interval_seconds_( |
+ TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
+ syncer_long_poll_interval_seconds_( |
+ TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
+ mode_(NORMAL_MODE), |
+ server_connection_ok_(false), |
+ delay_provider_(new DelayProvider()), |
+ syncer_(syncer), |
+ session_context_(context) { |
} |
SyncerThread::~SyncerThread() { |
- conn_mgr_hookup_.reset(); |
- delete vault_.syncer_; |
- CHECK(!thread_.IsRunning()); |
-} |
- |
-// Creates and starts a syncer thread. |
-// Returns true if it creates a thread or if there's currently a thread running |
-// and false otherwise. |
-bool SyncerThread::Start() { |
- { |
- base::AutoLock lock(lock_); |
- if (thread_.IsRunning()) { |
- return true; |
- } |
+ DCHECK(!thread_.IsRunning()); |
+} |
+ |
+void SyncerThread::CheckServerConnectionManagerStatus( |
+ HttpResponse::ServerConnectionCode code) { |
+ |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
+ << "Old mode: " << server_connection_ok_ << " Code: " << code; |
+ // Note, be careful when adding cases here because if the SyncerThread |
+ // thinks there is no valid connection as determined by this method, it |
+ // will drop out of *all* forward progress sync loops (it won't poll and it |
+ // will queue up Talk notifications but not actually call SyncShare) until |
+ // some external action causes a ServerConnectionManager to broadcast that |
+ // a valid connection has been re-established. |
+ if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
+ HttpResponse::SYNC_AUTH_ERROR == code) { |
+ server_connection_ok_ = false; |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
+ << " new mode:" << server_connection_ok_; |
+ } else if (HttpResponse::SERVER_CONNECTION_OK == code) { |
+ server_connection_ok_ = true; |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." |
+ << " new mode:" << server_connection_ok_; |
+ DoCanaryJob(); |
+ } |
+} |
+void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " |
+ << MessageLoop::current()->thread_name(); |
+ if (!thread_.IsRunning()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " |
+ << mode; |
if (!thread_.Start()) { |
- return false; |
+ NOTREACHED() << "Unable to start SyncerThread."; |
+ return; |
} |
+ WatchConnectionManager(); |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::SendInitialSnapshot)); |
} |
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
- &SyncerThread::ThreadMain)); |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " |
+ << mode; |
- // Wait for notification that our task makes it safely onto the message |
- // loop before returning, so the caller can't call Stop before we're |
- // actually up and running. This is for consistency with the old pthread |
- // impl because pthread_create would do this in one step. |
- thread_main_started_.Wait(); |
- VLOG(1) << "SyncerThread started."; |
- return true; |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); |
} |
-// Stop processing. A max wait of at least 2*server RTT time is recommended. |
-// Returns true if we stopped, false otherwise. |
-bool SyncerThread::Stop(int max_wait) { |
- RequestSyncerExitAndSetThreadStopConditions(); |
- |
- // This will join, and finish when ThreadMain terminates. |
- thread_.Stop(); |
- return true; |
+void SyncerThread::SendInitialSnapshot() { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, |
+ SyncSourceInfo(), ModelSafeRoutingInfo(), |
+ std::vector<ModelSafeWorker*>())); |
+ SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
+ sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); |
+ event.snapshot = &snapshot; |
+ session_context_->NotifyListeners(event); |
} |
-void SyncerThread::RequestSyncerExitAndSetThreadStopConditions() { |
- { |
- base::AutoLock lock(lock_); |
- // If the thread has been started, then we either already have or are about |
- // to enter ThreadMainLoop so we have to proceed with shutdown and wait for |
- // it to finish. If the thread has not been started --and we now own the |
- // lock-- then we can early out because the caller has not called Start(). |
- if (!thread_.IsRunning()) |
- return; |
+void SyncerThread::WatchConnectionManager() { |
+ ServerConnectionManager* scm = session_context_->connection_manager(); |
+ CheckServerConnectionManagerStatus(scm->server_status()); |
+ scm->AddListener(this); |
+} |
- VLOG(1) << "SyncerThread::Stop - setting ThreadMain exit condition to true " |
- "(vault_.stop_syncer_thread_)"; |
- // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit |
- // below). |
- vault_.stop_syncer_thread_ = true; |
- if (NULL != vault_.syncer_) { |
- // Try to early exit the syncer itself, which could be looping inside |
- // SyncShare. |
- vault_.syncer_->RequestEarlyExit(); |
- } |
+void SyncerThread::StartImpl(Mode mode, |
+ linked_ptr<ModeChangeCallback> callback) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " |
+ << mode; |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ DCHECK(!session_context_->account_name().empty()); |
+ DCHECK(syncer_.get()); |
+ mode_ = mode; |
+ AdjustPolling(NULL); // Will kick start poll timer if needed. |
+ if (callback.get()) |
+ callback->Run(); |
- // stop_syncer_thread_ is now true and the Syncer has been told to exit. |
- // We want to wake up all waiters so they can re-examine state. We signal, |
- // causing all waiters to try to re-acquire the lock, and then we release |
- // the lock, and join on our internal thread which should soon run off the |
- // end of ThreadMain. |
- vault_field_changed_.Broadcast(); |
- } |
+ // We just changed our mode. See if there are any pending jobs that we could |
+ // execute in the new mode. |
+ DoPendingJobIfPossible(false); |
} |
-bool SyncerThread::RequestPause() { |
- base::AutoLock lock(lock_); |
- if (vault_.pause_requested_ || vault_.paused_) |
- return false; |
+SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( |
+ const SyncSessionJob& job) { |
- if (thread_.IsRunning()) { |
- // Set the pause request. The syncer thread will read this |
- // request, enter the paused state, and send the PAUSED |
- // notification. |
- vault_.pause_requested_ = true; |
- vault_field_changed_.Broadcast(); |
- VLOG(1) << "Pause requested."; |
- } else { |
- // If the thread is not running, go directly into the paused state |
- // and notify. |
- EnterPausedState(); |
- VLOG(1) << "Paused while not running."; |
- } |
- return true; |
-} |
+ DCHECK(wait_interval_.get()); |
+ DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
-void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
- session_context_->NotifyListeners(SyncEngineEvent(cause)); |
-} |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " |
+ << wait_interval_->mode << "Wait interval had nudge : " |
+ << wait_interval_->had_nudge << "is canary job : " |
+ << job.is_canary_job; |
-bool SyncerThread::RequestResume() { |
- base::AutoLock lock(lock_); |
- // Only valid to request a resume when we are already paused or we |
- // have a pause pending. |
- if (!(vault_.paused_ || vault_.pause_requested_)) |
- return false; |
- |
- if (thread_.IsRunning()) { |
- if (vault_.pause_requested_) { |
- // If pause was requested we have not yet paused. In this case, |
- // the resume cancels the pause request. |
- vault_.pause_requested_ = false; |
- vault_field_changed_.Broadcast(); |
- Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED); |
- VLOG(1) << "Pending pause canceled by resume."; |
- } else { |
- // Unpause and notify. |
- vault_.paused_ = false; |
- vault_field_changed_.Broadcast(); |
- } |
- } else { |
- ExitPausedState(); |
- VLOG(1) << "Resumed while not running."; |
- } |
- return true; |
-} |
+ if (job.purpose == SyncSessionJob::POLL) |
+ return DROP; |
-void SyncerThread::OnReceivedLongPollIntervalUpdate( |
- const base::TimeDelta& new_interval) { |
- syncer_long_poll_interval_seconds_ = static_cast<int>( |
- new_interval.InSeconds()); |
-} |
+ DCHECK(job.purpose == SyncSessionJob::NUDGE || |
+ job.purpose == SyncSessionJob::CONFIGURATION); |
+ if (wait_interval_->mode == WaitInterval::THROTTLED) |
+ return SAVE; |
-void SyncerThread::OnReceivedShortPollIntervalUpdate( |
- const base::TimeDelta& new_interval) { |
- syncer_short_poll_interval_seconds_ = static_cast<int>( |
- new_interval.InSeconds()); |
-} |
+ DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
+ if (job.purpose == SyncSessionJob::NUDGE) { |
+ if (mode_ == CONFIGURATION_MODE) |
+ return SAVE; |
-void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { |
- silenced_until_ = silenced_until; |
+ // If we already had one nudge then just drop this nudge. We will retry |
+ // later when the timer runs out. |
+ return wait_interval_->had_nudge ? DROP : CONTINUE; |
+ } |
+ // This is a config job. |
+ return job.is_canary_job ? CONTINUE : SAVE; |
} |
-bool SyncerThread::IsSyncingCurrentlySilenced() { |
- // We should ignore reads from silenced_until_ under ThreadSanitizer |
- // since this is a benign race. |
- ANNOTATE_IGNORE_READS_BEGIN(); |
- bool ret = (silenced_until_ - TimeTicks::Now()) >= TimeDelta::FromSeconds(0); |
- ANNOTATE_IGNORE_READS_END(); |
- return ret; |
-} |
+SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( |
+ const SyncSessionJob& job) { |
+ if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) |
+ return CONTINUE; |
-void SyncerThread::OnShouldStopSyncingPermanently() { |
- RequestSyncerExitAndSetThreadStopConditions(); |
- Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
-} |
+ if (wait_interval_.get()) |
+ return DecideWhileInWaitInterval(job); |
-void SyncerThread::ThreadMainLoop() { |
- // This is called with lock_ acquired. |
- lock_.AssertAcquired(); |
- VLOG(1) << "In thread main loop."; |
- |
- // Use the short poll value by default. |
- vault_.current_wait_interval_.poll_delta = |
- TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); |
- int user_idle_milliseconds = 0; |
- TimeTicks last_sync_time; |
- bool initial_sync_for_thread = true; |
- bool continue_sync_cycle = false; |
- |
-#if defined(OS_LINUX) |
- idle_query_.reset(new IdleQueryLinux()); |
-#endif |
- |
- if (vault_.syncer_ == NULL) { |
- VLOG(1) << "Syncer thread waiting for database initialization."; |
- while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) |
- vault_field_changed_.Wait(); |
- VLOG_IF(1, !(vault_.syncer_ == NULL)) << "Syncer was found after DB " |
- "started."; |
+ if (mode_ == CONFIGURATION_MODE) { |
+ if (job.purpose == SyncSessionJob::NUDGE) |
+ return SAVE; |
+ else if (job.purpose == SyncSessionJob::CONFIGURATION) |
+ return CONTINUE; |
+ else |
+ return DROP; |
} |
- while (!vault_.stop_syncer_thread_) { |
- // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as |
- // below) because we cannot poll until these conditions are met, so we wait |
- // indefinitely. |
- |
- // If we are not connected, enter WaitUntilConnectedOrQuit() which |
- // will return only when the network is connected or a quit is |
- // requested. Note that it is possible to exit |
- // WaitUntilConnectedOrQuit() in the paused state which will be |
- // handled by the next statement. |
- if (!vault_.connected_ && !initial_sync_for_thread) { |
- WaitUntilConnectedOrQuit(); |
- continue; |
- } |
+ // We are in normal mode. |
+ DCHECK_EQ(mode_, NORMAL_MODE); |
+ DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
- // Check if we should be paused or if a pause was requested. Note |
- // that we don't check initial_sync_for_thread here since we want |
- // the pause to happen regardless if it is the initial sync or not. |
- if (vault_.pause_requested_ || vault_.paused_) { |
- PauseUntilResumedOrQuit(); |
- continue; |
- } |
+ // Freshness condition |
+ if (job.scheduled_start < last_sync_session_end_time_) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Dropping job because of freshness"; |
+ return DROP; |
+ } |
- const TimeTicks next_poll = last_sync_time + |
- vault_.current_wait_interval_.poll_delta; |
- bool throttled = vault_.current_wait_interval_.mode == |
- WaitInterval::THROTTLED; |
- // If we are throttled, we must wait. Otherwise, wait until either the next |
- // nudge (if one exists) or the poll interval. |
- TimeTicks end_wait = next_poll; |
- if (!throttled && !vault_.pending_nudge_time_.is_null()) { |
- end_wait = std::min(end_wait, vault_.pending_nudge_time_); |
- } |
- VLOG(1) << "end_wait is " << end_wait.ToInternalValue() |
- << "\nnext_poll is " << next_poll.ToInternalValue(); |
- |
- // We block until the CV is signaled (e.g a control field changed, loss of |
- // network connection, nudge, spurious, etc), or the poll interval elapses. |
- TimeDelta sleep_time = end_wait - TimeTicks::Now(); |
- if (!initial_sync_for_thread && sleep_time > TimeDelta::FromSeconds(0)) { |
- vault_field_changed_.TimedWait(sleep_time); |
- |
- if (TimeTicks::Now() < end_wait) { |
- // Didn't timeout. Could be a spurious signal, or a signal corresponding |
- // to an actual change in one of our control fields. By continuing here |
- // we perform the typical "always recheck conditions when signaled", |
- // (typically handled by a while(condition_not_met) cv.wait() construct) |
- // because we jump to the top of the loop. The main difference is we |
- // recalculate the wait interval, but last_sync_time won't have changed. |
- // So if we were signaled by a nudge (for ex.) we'll grab the new nudge |
- // off the queue and wait for that delta. If it was a spurious signal, |
- // we'll keep waiting for the same moment in time as we just were. |
- continue; |
- } |
- } |
+ if (server_connection_ok_) |
+ return CONTINUE; |
- // Handle a nudge, caused by either a notification or a local bookmark |
- // event. This will also update the source of the following SyncMain call. |
- VLOG(1) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); |
- bool nudged = false; |
- scoped_ptr<SyncSession> session; |
- session.reset(SyncMain(vault_.syncer_, |
- throttled, continue_sync_cycle, &initial_sync_for_thread, &nudged)); |
- |
- // Update timing information for how often these datatypes are triggering |
- // nudges. |
- base::TimeTicks now = TimeTicks::Now(); |
- if (!last_sync_time.is_null()) { |
- ModelTypePayloadMap::const_iterator iter; |
- for (iter = session->source().types.begin(); |
- iter != session->source().types.end(); |
- ++iter) { |
- syncable::PostTimeToTypeHistogram(iter->first, |
- now - last_sync_time); |
- } |
- } |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Bad server connection. Using that to decide on job."; |
+ return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
+} |
+ |
+void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
+ DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
+ if (pending_nudge_.get() == NULL) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Creating a pending nudge job"; |
+ SyncSession* s = job.session.get(); |
+ scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
+ s->delegate(), s->source(), s->routing_info(), s->workers())); |
- last_sync_time = now; |
+ SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
+ make_linked_ptr(session.release()), false, job.nudge_location); |
+ pending_nudge_.reset(new SyncSessionJob(new_job)); |
- VLOG(1) << "Updating the next polling time after SyncMain"; |
- vault_.current_wait_interval_ = CalculatePollingWaitTime( |
- static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()), |
- &user_idle_milliseconds, &continue_sync_cycle, nudged); |
+ return; |
} |
-#if defined(OS_LINUX) |
- idle_query_.reset(); |
-#endif |
-} |
-void SyncerThread::SetConnected(bool connected) { |
- DCHECK(!thread_.IsRunning()); |
- vault_.connected_ = connected; |
-} |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; |
+ pending_nudge_->session->Coalesce(*(job.session.get())); |
+ pending_nudge_->scheduled_start = job.scheduled_start; |
-void SyncerThread::SetSyncerPollingInterval(base::TimeDelta interval) { |
- // TODO(timsteele): Use TimeDelta internally. |
- syncer_polling_interval_ = static_cast<int>(interval.InSeconds()); |
+ // Unfortunately the nudge location cannot be modified. So it stores the |
+ // location of the first caller. |
} |
-void SyncerThread::SetSyncerShortPollInterval(base::TimeDelta interval) { |
- // TODO(timsteele): Use TimeDelta internally. |
- syncer_short_poll_interval_seconds_ = |
- static_cast<int>(interval.InSeconds()); |
-} |
+bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { |
+ JobProcessDecision decision = DecideOnJob(job); |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " |
+ << decision << " Job purpose " << job.purpose << "mode " << mode_; |
+ if (decision != SAVE) |
+ return decision == CONTINUE; |
-void SyncerThread::WaitUntilConnectedOrQuit() { |
- VLOG(1) << "Syncer thread waiting for connection."; |
- Notify(SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION); |
+ DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
+ SyncSessionJob::CONFIGURATION); |
- bool is_paused = vault_.paused_; |
+ SaveJob(job); |
+ return false; |
+} |
- while (!vault_.connected_ && !vault_.stop_syncer_thread_) { |
- if (!is_paused && vault_.pause_requested_) { |
- // If we get a pause request while waiting for a connection, |
- // enter the paused state. |
- EnterPausedState(); |
- is_paused = true; |
- VLOG(1) << "Syncer thread entering disconnected pause."; |
- } |
+void SyncerThread::SaveJob(const SyncSessionJob& job) { |
+ DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); |
+ if (job.purpose == SyncSessionJob::NUDGE) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; |
+ InitOrCoalescePendingJob(job); |
+ } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; |
+ DCHECK(wait_interval_.get()); |
+ DCHECK(mode_ == CONFIGURATION_MODE); |
- if (is_paused && !vault_.paused_) { |
- ExitPausedState(); |
- is_paused = false; |
- VLOG(1) << "Syncer thread exiting disconnected pause."; |
- } |
+ SyncSession* old = job.session.get(); |
+ SyncSession* s(new SyncSession(session_context_.get(), this, |
+ old->source(), old->routing_info(), old->workers())); |
+ SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
+ make_linked_ptr(s), false, job.nudge_location); |
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
+ } // drop the rest. |
+} |
- vault_field_changed_.Wait(); |
+// Functor for std::find_if to search by ModelSafeGroup. |
+struct ModelSafeWorkerGroupIs { |
+ explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
+ bool operator()(ModelSafeWorker* w) { |
+ return group == w->GetModelSafeGroup(); |
} |
+ ModelSafeGroup group; |
+}; |
- if (!vault_.stop_syncer_thread_) { |
- Notify(SyncEngineEvent::SYNCER_THREAD_CONNECTED); |
- VLOG(1) << "Syncer thread found connection."; |
+void SyncerThread::ScheduleClearUserData() { |
+ if (!thread_.IsRunning()) { |
+ NOTREACHED(); |
+ return; |
} |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::ScheduleClearUserDataImpl)); |
} |
-void SyncerThread::PauseUntilResumedOrQuit() { |
- VLOG(1) << "Syncer thread entering pause."; |
- // If pause was requested (rather than already being paused), send |
- // the PAUSED notification. |
- if (vault_.pause_requested_) |
- EnterPausedState(); |
- |
- // Thread will get stuck here until either a resume is requested |
- // or shutdown is started. |
- while (vault_.paused_ && !vault_.stop_syncer_thread_) |
- vault_field_changed_.Wait(); |
+void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
+ NudgeSource source, const ModelTypeBitSet& types, |
+ const tracked_objects::Location& nudge_location) { |
+ if (!thread_.IsRunning()) { |
+ NOTREACHED(); |
+ return; |
+ } |
- // Notify that we have resumed if we are not shutting down. |
- if (!vault_.stop_syncer_thread_) |
- ExitPausedState(); |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; |
- VLOG(1) << "Syncer thread exiting pause."; |
+ ModelTypePayloadMap types_with_payloads = |
+ syncable::ModelTypePayloadMapFromBitSet(types, std::string()); |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::ScheduleNudgeImpl, delay, |
+ GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
+ nudge_location)); |
} |
-void SyncerThread::EnterPausedState() { |
- lock_.AssertAcquired(); |
- vault_.pause_requested_ = false; |
- vault_.paused_ = true; |
- vault_field_changed_.Broadcast(); |
- Notify(SyncEngineEvent::SYNCER_THREAD_PAUSED); |
-} |
+void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
+ NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
+ const tracked_objects::Location& nudge_location) { |
+ if (!thread_.IsRunning()) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; |
-void SyncerThread::ExitPausedState() { |
- lock_.AssertAcquired(); |
- vault_.paused_ = false; |
- vault_field_changed_.Broadcast(); |
- Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED); |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::ScheduleNudgeImpl, delay, |
+ GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
+ nudge_location)); |
} |
-void SyncerThread::DisableIdleDetection() { |
- disable_idle_detection_ = true; |
+void SyncerThread::ScheduleClearUserDataImpl() { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ SyncSession* session = new SyncSession(session_context_.get(), this, |
+ SyncSourceInfo(), ModelSafeRoutingInfo(), |
+ std::vector<ModelSafeWorker*>()); |
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
+ SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); |
} |
-// We check how long the user's been idle and sync less often if the machine is |
-// not in use. The aim is to reduce server load. |
-SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime( |
- int last_poll_wait, // Time in seconds. |
- int* user_idle_milliseconds, |
- bool* continue_sync_cycle, |
- bool was_nudged) { |
- lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock. |
- WaitInterval return_interval; |
+void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
+ GetUpdatesCallerInfo::GetUpdatesSource source, |
+ const ModelTypePayloadMap& types_with_payloads, |
+ bool is_canary_job, const tracked_objects::Location& nudge_location) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
- // Server initiated throttling trumps everything. |
- if (!silenced_until_.is_null()) { |
- // We don't need to reset other state, it can continue where it left off. |
- return_interval.mode = WaitInterval::THROTTLED; |
- return_interval.poll_delta = silenced_until_ - TimeTicks::Now(); |
- return return_interval; |
- } |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; |
+ // Note we currently nudge for all types regardless of the ones incurring |
+ // the nudge. Doing different would throw off some syncer commands like |
+ // CleanupDisabledTypes. We may want to change this in the future. |
+ SyncSourceInfo info(source, types_with_payloads); |
- bool is_continuing_sync_cyle = *continue_sync_cycle; |
- *continue_sync_cycle = false; |
+ SyncSession* session(CreateSyncSession(info)); |
+ SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
+ make_linked_ptr(session), is_canary_job, |
+ nudge_location); |
- // Determine if the syncer has unfinished work to do. |
- SyncSessionSnapshot* snapshot = session_context_->previous_session_snapshot(); |
- const bool syncer_has_work_to_do = snapshot && |
- (snapshot->num_server_changes_remaining > 0 || |
- snapshot->unsynced_count > 0); |
- VLOG(1) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |
+ session = NULL; |
+ if (!ShouldRunJob(job)) |
+ return; |
- // First calculate the expected wait time, figuring in any backoff because of |
- // user idle time. next_wait is in seconds |
- syncer_polling_interval_ = (!session_context_->notifications_enabled()) ? |
- syncer_short_poll_interval_seconds_ : |
- syncer_long_poll_interval_seconds_; |
- int default_next_wait = syncer_polling_interval_; |
- return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait); |
- |
- if (syncer_has_work_to_do) { |
- // Provide exponential backoff due to consecutive errors, else attempt to |
- // complete the work as soon as possible. |
- if (is_continuing_sync_cyle) { |
- return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF; |
- if (was_nudged && vault_.current_wait_interval_.mode == |
- WaitInterval::EXPONENTIAL_BACKOFF) { |
- // We were nudged, it failed, and we were already in backoff. |
- return_interval.had_nudge_during_backoff = true; |
- // Keep exponent for exponential backoff the same in this case. |
- return_interval.poll_delta = vault_.current_wait_interval_.poll_delta; |
- } else { |
- // We weren't nudged, or we were in a NORMAL wait interval until now. |
- return_interval.poll_delta = TimeDelta::FromSeconds( |
- GetRecommendedDelaySeconds(last_poll_wait)); |
- } |
- } else { |
- // No consecutive error. |
- return_interval.poll_delta = TimeDelta::FromSeconds( |
- GetRecommendedDelaySeconds(0)); |
+ if (pending_nudge_.get()) { |
+ if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" |
+ << "we are in backoff"; |
+ return; |
} |
- *continue_sync_cycle = true; |
- } else if (!session_context_->notifications_enabled()) { |
- // Ensure that we start exponential backoff from our base polling |
- // interval when we are not continuing a sync cycle. |
- last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); |
- |
- // Did the user start interacting with the computer again? |
- // If so, revise our idle time (and probably next_sync_time) downwards |
- int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); |
- if (new_idle_time < *user_idle_milliseconds) { |
- *user_idle_milliseconds = new_idle_time; |
+ |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; |
+ pending_nudge_->session->Coalesce(*(job.session.get())); |
+ |
+ if (!IsBackingOff()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" |
+ << " we are not in backoff and the job was coalesced"; |
+ return; |
+ } else { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Rescheduling pending nudge"; |
+ SyncSession* s = pending_nudge_->session.get(); |
+ job.session.reset(new SyncSession(s->context(), s->delegate(), |
+ s->source(), s->routing_info(), s->workers())); |
+ pending_nudge_.reset(); |
} |
- return_interval.poll_delta = TimeDelta::FromMilliseconds( |
- CalculateSyncWaitTime(last_poll_wait * 1000, |
- *user_idle_milliseconds)); |
- DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait); |
} |
- VLOG(1) << "Sync wait: idle " << default_next_wait |
- << " non-idle or backoff " << return_interval.poll_delta.InSeconds(); |
- |
- return return_interval; |
+ // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
+ ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), |
+ nudge_location); |
} |
-void SyncerThread::ThreadMain() { |
- base::AutoLock lock(lock_); |
- // Signal Start() to let it know we've made it safely onto the message loop, |
- // and unblock it's caller. |
- thread_main_started_.Signal(); |
- ThreadMainLoop(); |
- VLOG(1) << "Syncer thread ThreadMain is done."; |
- Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); |
-} |
+// Helper to extract the routing info and workers corresponding to types in |
+// |types| from |registrar|. |
+void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, |
+ ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, |
+ std::vector<ModelSafeWorker*>* workers) { |
+ ModelSafeRoutingInfo r_tmp; |
+ std::vector<ModelSafeWorker*> w_tmp; |
+ registrar->GetModelSafeRoutingInfo(&r_tmp); |
+ registrar->GetWorkers(&w_tmp); |
-SyncSession* SyncerThread::SyncMain(Syncer* syncer, bool was_throttled, |
- bool continue_sync_cycle, bool* initial_sync_for_thread, |
- bool* was_nudged) { |
- CHECK(syncer); |
+ typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
+ for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { |
+ if (!types.test(i)) |
+ continue; |
+ syncable::ModelType t = syncable::ModelTypeFromInt(i); |
+ DCHECK_EQ(1U, r_tmp.count(t)); |
+ (*routes)[t] = r_tmp[t]; |
+ iter it = std::find_if(w_tmp.begin(), w_tmp.end(), |
+ ModelSafeWorkerGroupIs(r_tmp[t])); |
+ if (it != w_tmp.end()) |
+ workers->push_back(*it); |
+ else |
+ NOTREACHED(); |
+ } |
+ |
+ iter it = std::find_if(w_tmp.begin(), w_tmp.end(), |
+ ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
+ if (it != w_tmp.end()) |
+ workers->push_back(*it); |
+ else |
+ NOTREACHED(); |
+} |
- // Since we are initiating a new session for which we are the delegate, we |
- // are not currently silenced so reset this state for the next session which |
- // may need to use it. |
- silenced_until_ = base::TimeTicks(); |
+void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { |
+ if (!thread_.IsRunning()) { |
+ NOTREACHED(); |
+ return; |
+ } |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; |
ModelSafeRoutingInfo routes; |
std::vector<ModelSafeWorker*> workers; |
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
- session_context_->registrar()->GetWorkers(&workers); |
- SyncSourceInfo info(GetAndResetNudgeSource(was_throttled, |
- continue_sync_cycle, initial_sync_for_thread, was_nudged)); |
- scoped_ptr<SyncSession> session; |
- |
- base::AutoUnlock unlock(lock_); |
- do { |
- session.reset(new SyncSession(session_context_.get(), this, |
- info, routes, workers)); |
- VLOG(1) << "Calling SyncShare."; |
- syncer->SyncShare(session.get()); |
- } while (session->HasMoreToSync() && silenced_until_.is_null()); |
- |
- VLOG(1) << "Done calling SyncShare."; |
- return session.release(); |
-} |
- |
-SyncSourceInfo SyncerThread::GetAndResetNudgeSource(bool was_throttled, |
- bool continue_sync_cycle, |
- bool* initial_sync, |
- bool* was_nudged) { |
- bool nudged = false; |
- NudgeSource nudge_source = kUnknown; |
- ModelTypePayloadMap model_types_with_payloads; |
- // Has the previous sync cycle completed? |
- if (continue_sync_cycle) |
- nudge_source = kContinuation; |
- // Update the nudge source if a new nudge has come through during the |
- // previous sync cycle. |
- if (!vault_.pending_nudge_time_.is_null()) { |
- if (!was_throttled) { |
- nudge_source = vault_.pending_nudge_source_; |
- model_types_with_payloads = vault_.pending_nudge_types_; |
- nudged = true; |
- } |
- VLOG(1) << "Clearing pending nudge from " << vault_.pending_nudge_source_ |
- << " at tick " << vault_.pending_nudge_time_.ToInternalValue(); |
- vault_.pending_nudge_source_ = kUnknown; |
- vault_.pending_nudge_types_.clear(); |
- vault_.pending_nudge_time_ = base::TimeTicks(); |
+ GetModelSafeParamsForTypes(types, session_context_->registrar(), |
+ &routes, &workers); |
+ |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
+ this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
+ GetUpdatesCallerInfo::FIRST_UPDATE)); |
+} |
+ |
+void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
+ const std::vector<ModelSafeWorker*>& workers, |
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ |
+ VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; |
+ // TODO(tim): config-specific GetUpdatesCallerInfo value? |
+ SyncSession* session = new SyncSession(session_context_.get(), this, |
+ SyncSourceInfo(source, |
+ syncable::ModelTypePayloadMapFromRoutingInfo( |
+ routing_info, std::string())), |
+ routing_info, workers); |
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
+ SyncSessionJob::CONFIGURATION, session, FROM_HERE); |
+} |
+ |
+void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
+ SyncSessionJob::SyncSessionJobPurpose purpose, |
+ sessions::SyncSession* session, |
+ const tracked_objects::Location& nudge_location) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ |
+ SyncSessionJob job(purpose, TimeTicks::Now() + delay, |
+ make_linked_ptr(session), false, nudge_location); |
+ if (purpose == SyncSessionJob::NUDGE) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" |
+ << " ScheduleSyncSessionJob"; |
+ DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); |
+ pending_nudge_.reset(new SyncSessionJob(job)); |
+ } |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Posting job to execute in DoSyncSessionJob. Job purpose " |
+ << job.purpose; |
+ MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
+ &SyncerThread::DoSyncSessionJob, job), |
+ delay.InMilliseconds()); |
+} |
+ |
+void SyncerThread::SetSyncerStepsForPurpose( |
+ SyncSessionJob::SyncSessionJobPurpose purpose, |
+ SyncerStep* start, SyncerStep* end) { |
+ *end = SYNCER_END; |
+ switch (purpose) { |
+ case SyncSessionJob::CONFIGURATION: |
+ *start = DOWNLOAD_UPDATES; |
+ *end = APPLY_UPDATES; |
+ return; |
+ case SyncSessionJob::CLEAR_USER_DATA: |
+ *start = CLEAR_PRIVATE_DATA; |
+ return; |
+ case SyncSessionJob::NUDGE: |
+ case SyncSessionJob::POLL: |
+ *start = SYNCER_BEGIN; |
+ return; |
+ default: |
+ NOTREACHED(); |
} |
+} |
- *was_nudged = nudged; |
- |
- // TODO(tim): Hack for bug 64136 to correctly tag continuations that result |
- // from syncer having more work to do. This will be handled properly with |
- // the message loop based syncer thread, bug 26339. |
- return MakeSyncSourceInfo(nudged || nudge_source == kContinuation, |
- nudge_source, model_types_with_payloads, initial_sync); |
-} |
- |
-SyncSourceInfo SyncerThread::MakeSyncSourceInfo(bool nudged, |
- NudgeSource nudge_source, |
- const ModelTypePayloadMap& model_types_with_payloads, |
- bool* initial_sync) { |
- sync_pb::GetUpdatesCallerInfo::GetUpdatesSource updates_source = |
- sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
- if (*initial_sync) { |
- updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
- *initial_sync = false; |
- } else if (!nudged) { |
- updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; |
+void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ if (!ShouldRunJob(job)) |
+ return; |
+ |
+ if (job.purpose == SyncSessionJob::NUDGE) { |
+ DCHECK(pending_nudge_.get()); |
+ if (pending_nudge_->session != job.session) |
+ return; // Another nudge must have been scheduled in in the meantime. |
+ pending_nudge_.reset(); |
+ } |
+ VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " |
+ << job.purpose; |
+ |
+ SyncerStep begin(SYNCER_BEGIN); |
+ SyncerStep end(SYNCER_END); |
+ SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
+ |
+ bool has_more_to_sync = true; |
+ while (ShouldRunJob(job) && has_more_to_sync) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " SyncerThread: Calling SyncShare."; |
+ // Synchronously perform the sync session from this thread. |
+ syncer_->SyncShare(job.session.get(), begin, end); |
+ has_more_to_sync = job.session->HasMoreToSync(); |
+ if (has_more_to_sync) |
+ job.session->ResetTransientState(); |
+ } |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " SyncerThread: Done SyncShare looping."; |
+ FinishSyncSessionJob(job); |
+} |
+ |
+void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { |
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
+ // Whatever types were part of a configuration task will have had updates |
+ // downloaded. For that reason, we make sure they get recorded in the |
+ // event that they get disabled at a later time. |
+ ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); |
+ if (!r.empty()) { |
+ ModelSafeRoutingInfo temp_r; |
+ ModelSafeRoutingInfo old_info(old_job.session->routing_info()); |
+ std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), |
+ std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); |
+ session_context_->set_previous_session_routing_info(temp_r); |
+ } |
} else { |
- switch (nudge_source) { |
- case kNotification: |
- updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; |
- break; |
- case kLocal: |
- updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; |
- break; |
- case kContinuation: |
- updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
- break; |
- case kClearPrivateData: |
- updates_source = sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; |
- break; |
- case kUnknown: |
- default: |
- updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
- break; |
+ session_context_->set_previous_session_routing_info( |
+ old_job.session->routing_info()); |
+ } |
+} |
+ |
+void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ // Update timing information for how often datatypes are triggering nudges. |
+ base::TimeTicks now = TimeTicks::Now(); |
+ if (!last_sync_session_end_time_.is_null()) { |
+ ModelTypePayloadMap::const_iterator iter; |
+ for (iter = job.session->source().types.begin(); |
+ iter != job.session->source().types.end(); |
+ ++iter) { |
+ syncable::PostTimeToTypeHistogram(iter->first, |
+ now - last_sync_session_end_time_); |
} |
} |
+ last_sync_session_end_time_ = now; |
+ UpdateCarryoverSessionState(job); |
+ if (IsSyncingCurrentlySilenced()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " We are currently throttled. So not scheduling the next sync."; |
+ SaveJob(job); |
+ return; // Nothing to do. |
+ } |
- ModelTypePayloadMap sync_source_types; |
- if (model_types_with_payloads.empty()) { |
- // No datatypes requested. This must be a poll so set all enabled datatypes. |
- ModelSafeRoutingInfo routes; |
- session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
- sync_source_types = syncable::ModelTypePayloadMapFromRoutingInfo(routes, |
- std::string()); |
- } else { |
- sync_source_types = model_types_with_payloads; |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Updating the next polling time after SyncMain"; |
+ ScheduleNextSync(job); |
+} |
+ |
+void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ DCHECK(!old_job.session->HasMoreToSync()); |
+ // Note: |num_server_changes_remaining| > 0 here implies that we received a |
+ // broken response while trying to download all updates, because the Syncer |
+ // will loop until this value is exhausted. Also, if unsynced_handles exist |
+ // but HasMoreToSync is false, this implies that the Syncer determined no |
+ // forward progress was possible at this time (an error, such as an HTTP |
+ // 500, is likely to have occurred during commit). |
+ const bool work_to_do = |
+ old_job.session->status_controller()->num_server_changes_remaining() > 0 |
+ || old_job.session->status_controller()->unsynced_handles().size() > 0; |
+ VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " |
+ << work_to_do; |
+ |
+ AdjustPolling(&old_job); |
+ |
+ // TODO(tim): Old impl had special code if notifications disabled. Needed? |
+ if (!work_to_do) { |
+ // Success implies backoff relief. Note that if this was a "one-off" job |
+ // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was |
+ // work_to_do before it ran this wont have changed, as jobs like this don't |
+ // run a full sync cycle. So we don't need special code here. |
+ wait_interval_.reset(); |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Job suceeded so not scheduling more jobs"; |
+ return; |
} |
- return SyncSourceInfo(updates_source, sync_source_types); |
-} |
- |
-void SyncerThread::CreateSyncer(const std::string& dirname) { |
- base::AutoLock lock(lock_); |
- VLOG(1) << "Creating syncer up for: " << dirname; |
- // The underlying database structure is ready, and we should create |
- // the syncer. |
- CHECK(vault_.syncer_ == NULL); |
- vault_.syncer_ = new Syncer(); |
- vault_field_changed_.Broadcast(); |
-} |
- |
-// Sets |*connected| to false if it is currently true but |code| suggests that |
-// the current network configuration and/or auth state cannot be used to make |
-// forward progress, and user intervention (e.g changing server URL or auth |
-// credentials) is likely necessary. If |*connected| is false, set it to true |
-// if |code| suggests that we just recently made healthy contact with the |
-// server. |
-static inline void CheckConnected(bool* connected, |
- HttpResponse::ServerConnectionCode code, |
- base::ConditionVariable* condvar) { |
- if (*connected) { |
- // Note, be careful when adding cases here because if the SyncerThread |
- // thinks there is no valid connection as determined by this method, it |
- // will drop out of *all* forward progress sync loops (it won't poll and it |
- // will queue up Talk notifications but not actually call SyncShare) until |
- // some external action causes a ServerConnectionManager to broadcast that |
- // a valid connection has been re-established. |
- if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
- HttpResponse::SYNC_AUTH_ERROR == code) { |
- *connected = false; |
- condvar->Broadcast(); |
- } |
+ if (old_job.session->source().updates_source == |
+ GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Job failed with source continuation"; |
+ // We don't seem to have made forward progress. Start or extend backoff. |
+ HandleConsecutiveContinuationError(old_job); |
+ } else if (IsBackingOff()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " A nudge during backoff failed"; |
+ // We weren't continuing but we're in backoff; must have been a nudge. |
+ DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
+ DCHECK(!wait_interval_->had_nudge); |
+ wait_interval_->had_nudge = true; |
+ wait_interval_->timer.Reset(); |
} else { |
- if (HttpResponse::SERVER_CONNECTION_OK == code) { |
- *connected = true; |
- condvar->Broadcast(); |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " Failed. Schedule a job with continuation as source"; |
+ // We weren't continuing and we aren't in backoff. Schedule a normal |
+ // continuation. |
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
+ ScheduleConfigImpl(old_job.session->routing_info(), |
+ old_job.session->workers(), |
+ GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
+ } else { |
+ // For all other purposes(nudge and poll) we schedule a retry nudge. |
+ ScheduleNudgeImpl(TimeDelta::FromSeconds(0), |
+ GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), |
+ old_job.session->source().types, false, FROM_HERE); |
} |
} |
} |
-void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { |
- conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
- &SyncerThread::HandleServerConnectionEvent)); |
- CheckConnected(&vault_.connected_, conn_mgr->server_status(), |
- &vault_field_changed_); |
+void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { |
+ DCHECK(thread_.IsRunning()); |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ |
+ TimeDelta poll = (!session_context_->notifications_enabled()) ? |
+ syncer_short_poll_interval_seconds_ : |
+ syncer_long_poll_interval_seconds_; |
+ bool rate_changed = !poll_timer_.IsRunning() || |
+ poll != poll_timer_.GetCurrentDelay(); |
+ |
+ if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) |
+ poll_timer_.Reset(); |
+ |
+ if (!rate_changed) |
+ return; |
+ |
+ // Adjust poll rate. |
+ poll_timer_.Stop(); |
+ poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); |
} |
-void SyncerThread::HandleServerConnectionEvent( |
- const ServerConnectionEvent& event) { |
- if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
- base::AutoLock lock(lock_); |
- CheckConnected(&vault_.connected_, event.connection_code, |
- &vault_field_changed_); |
+void SyncerThread::HandleConsecutiveContinuationError( |
+ const SyncSessionJob& old_job) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ // This if conditions should be compiled out in retail builds. |
+ if (IsBackingOff()) { |
+ DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
} |
+ SyncSession* old = old_job.session.get(); |
+ SyncSession* s(new SyncSession(session_context_.get(), this, |
+ old->source(), old->routing_info(), old->workers())); |
+ TimeDelta length = delay_provider_->GetDelay( |
+ IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
+ |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " In handle continuation error. Old job purpose is " |
+ << old_job.purpose; |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " In Handle continuation error. The time delta(ms) is: " |
+ << length.InMilliseconds(); |
+ |
+ // This will reset the had_nudge variable as well. |
+ wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
+ length)); |
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
+ SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
+ make_linked_ptr(s), false, FROM_HERE); |
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
+ } else { |
+ // We are not in configuration mode. So wait_interval's pending job |
+ // should be null. |
+ DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
+ |
+ // TODO(lipalani) - handle clear user data. |
+ InitOrCoalescePendingJob(old_job); |
+ } |
+ wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); |
} |
-int SyncerThread::GetRecommendedDelaySeconds(int base_delay_seconds) { |
- if (base_delay_seconds >= kMaxBackoffSeconds) |
- return kMaxBackoffSeconds; |
+// static |
+TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { |
+ if (last_delay.InSeconds() >= kMaxBackoffSeconds) |
+ return TimeDelta::FromSeconds(kMaxBackoffSeconds); |
// This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 |
- int backoff_s = |
- std::max(1, base_delay_seconds * kBackoffRandomizationFactor); |
+ int64 backoff_s = |
+ std::max(static_cast<int64>(1), |
+ last_delay.InSeconds() * kBackoffRandomizationFactor); |
// Flip a coin to randomize backoff interval by +/- 50%. |
int rand_sign = base::RandInt(0, 1) * 2 - 1; |
// Truncation is adequate for rounding here. |
backoff_s = backoff_s + |
- (rand_sign * (base_delay_seconds / kBackoffRandomizationFactor)); |
+ (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); |
// Cap the backoff interval. |
- backoff_s = std::max(1, std::min(backoff_s, kMaxBackoffSeconds)); |
+ backoff_s = std::max(static_cast<int64>(1), |
+ std::min(backoff_s, kMaxBackoffSeconds)); |
- return backoff_s; |
+ return TimeDelta::FromSeconds(backoff_s); |
} |
-// Inputs and return value in milliseconds. |
-int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { |
- // syncer_polling_interval_ is in seconds |
- int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; |
- |
- // This is our default and lower bound. |
- int next_wait = syncer_polling_interval_ms; |
- |
- // Get idle time, bounded by max wait. |
- int idle = min(user_idle_ms, syncer_max_interval_); |
+void SyncerThread::Stop() { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; |
+ syncer_->RequestEarlyExit(); // Safe to call from any thread. |
+ session_context_->connection_manager()->RemoveListener(this); |
+ thread_.Stop(); |
+} |
- // If the user has been idle for a while, we'll start decreasing the poll |
- // rate. |
- if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { |
- next_wait = std::min(GetRecommendedDelaySeconds( |
- last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |
+void SyncerThread::DoCanaryJob() { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; |
+ DoPendingJobIfPossible(true); |
+} |
+ |
+void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { |
+ SyncSessionJob* job_to_execute = NULL; |
+ if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
+ && wait_interval_->pending_configure_job.get()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; |
+ job_to_execute = wait_interval_->pending_configure_job.get(); |
+ } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; |
+ // Pending jobs mostly have time from the past. Reset it so this job |
+ // will get executed. |
+ if (pending_nudge_->scheduled_start < TimeTicks::Now()) |
+ pending_nudge_->scheduled_start = TimeTicks::Now(); |
+ |
+ scoped_ptr<SyncSession> session(CreateSyncSession( |
+ pending_nudge_->session->source())); |
+ |
+ // Also the routing info might have been changed since we cached the |
+ // pending nudge. Update it by coalescing to the latest. |
+ pending_nudge_->session->Coalesce(*(session.get())); |
+ // The pending nudge would be cleared in the DoSyncSessionJob function. |
+ job_to_execute = pending_nudge_.get(); |
} |
- return next_wait; |
-} |
- |
-// Called with mutex_ already locked. |
-void SyncerThread::NudgeSyncImpl( |
- int milliseconds_from_now, |
- NudgeSource source, |
- const ModelTypePayloadMap& model_types_with_payloads) { |
- // TODO(sync): Add the option to reset the backoff state machine. |
- // This is needed so nudges that are a result of the user's desire |
- // to download updates for a new data type can be satisfied quickly. |
- if (vault_.current_wait_interval_.mode == WaitInterval::THROTTLED || |
- vault_.current_wait_interval_.had_nudge_during_backoff) { |
- // Drop nudges on the floor if we've already had one since starting this |
- // stage of exponential backoff or we are throttled. |
- return; |
+ if (job_to_execute != NULL) { |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; |
+ SyncSessionJob copy = *job_to_execute; |
+ copy.is_canary_job = is_canary_job; |
+ DoSyncSessionJob(copy); |
} |
+} |
- // Union the current ModelTypePayloadMap with any from nudges that may have |
- // already posted (coalesce the nudge datatype information). |
- // TODO(tim): It seems weird to do this if the sources don't match up (e.g. |
- // if pending_source is kLocal and |source| is kClearPrivateData). |
- syncable::CoalescePayloads(&vault_.pending_nudge_types_, |
- model_types_with_payloads); |
- |
- const TimeTicks nudge_time = TimeTicks::Now() + |
- TimeDelta::FromMilliseconds(milliseconds_from_now); |
- if (nudge_time <= vault_.pending_nudge_time_) { |
- VLOG(1) << "Nudge for source " << source |
- << " dropped due to existing later pending nudge"; |
- return; |
- } |
+SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { |
+ ModelSafeRoutingInfo routes; |
+ std::vector<ModelSafeWorker*> workers; |
+ session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
+ session_context_->registrar()->GetWorkers(&workers); |
+ SyncSourceInfo info(source); |
- VLOG(1) << "Replacing pending nudge for source " << source |
- << " at " << nudge_time.ToInternalValue(); |
+ SyncSession* session(new SyncSession(session_context_.get(), this, info, |
+ routes, workers)); |
- vault_.pending_nudge_source_ = source; |
- vault_.pending_nudge_time_ = nudge_time; |
- vault_field_changed_.Broadcast(); |
+ return session; |
} |
-void SyncerThread::SetNotificationsEnabled(bool notifications_enabled) { |
- base::AutoLock lock(lock_); |
- session_context_->set_notifications_enabled(notifications_enabled); |
+void SyncerThread::PollTimerCallback() { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ ModelSafeRoutingInfo r; |
+ ModelTypePayloadMap types_with_payloads = |
+ syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
+ SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
+ SyncSession* s = CreateSyncSession(info); |
+ ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, |
+ FROM_HERE); |
} |
-// Returns the amount of time since the user last interacted with the computer, |
-// in milliseconds |
-int SyncerThread::UserIdleTime() { |
-#if defined(OS_WIN) |
- LASTINPUTINFO last_input_info; |
- last_input_info.cbSize = sizeof(LASTINPUTINFO); |
- |
- // Get time in windows ticks since system start of last activity. |
- BOOL b = ::GetLastInputInfo(&last_input_info); |
- if (b == TRUE) |
- return ::GetTickCount() - last_input_info.dwTime; |
-#elif defined(OS_MACOSX) |
- // It would be great to do something like: |
- // |
- // return 1000 * |
- // CGEventSourceSecondsSinceLastEventType( |
- // kCGEventSourceStateCombinedSessionState, |
- // kCGAnyInputEventType); |
- // |
- // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon |
- // and can't link that high up the food chain. Thus this mucking in IOKit. |
- |
- io_service_t hid_service = |
- IOServiceGetMatchingService(kIOMasterPortDefault, |
- IOServiceMatching("IOHIDSystem")); |
- if (!hid_service) { |
- LOG(WARNING) << "Could not obtain IOHIDSystem"; |
- return 0; |
- } |
+void SyncerThread::Unthrottle() { |
+ DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
+ VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; |
+ DoCanaryJob(); |
+ wait_interval_.reset(); |
+} |
- CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, |
- CFSTR("HIDIdleTime"), |
- kCFAllocatorDefault, |
- 0); |
- if (!object) { |
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; |
- IOObjectRelease(hid_service); |
- return 0; |
- } |
+void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ session_context_->NotifyListeners(SyncEngineEvent(cause)); |
+} |
- int64 idle_time; // in nanoseconds |
- Boolean success = false; |
- if (CFGetTypeID(object) == CFNumberGetTypeID()) { |
- success = CFNumberGetValue((CFNumberRef)object, |
- kCFNumberSInt64Type, |
- &idle_time); |
- } else { |
- LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; |
- } |
+bool SyncerThread::IsBackingOff() const { |
+ return wait_interval_.get() && wait_interval_->mode == |
+ WaitInterval::EXPONENTIAL_BACKOFF; |
+} |
+ |
+void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { |
+ wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
+ silenced_until - TimeTicks::Now())); |
+ wait_interval_->timer.Start(wait_interval_->length, this, |
+ &SyncerThread::Unthrottle); |
+} |
- CFRelease(object); |
- IOObjectRelease(hid_service); |
+bool SyncerThread::IsSyncingCurrentlySilenced() { |
+ return wait_interval_.get() && wait_interval_->mode == |
+ WaitInterval::THROTTLED; |
+} |
- if (!success) { |
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; |
- return 0; |
- } |
- return idle_time / 1000000; // nano to milli |
-#elif defined(OS_LINUX) |
- if (idle_query_.get()) |
- return idle_query_->IdleTime(); |
- return 0; |
-#else |
- static bool was_logged = false; |
- if (!was_logged) { |
- was_logged = true; |
- VLOG(1) << "UserIdleTime unimplemented on this platform, synchronization " |
- "will not throttle when user idle"; |
- } |
-#endif |
+void SyncerThread::OnReceivedShortPollIntervalUpdate( |
+ const base::TimeDelta& new_interval) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ syncer_short_poll_interval_seconds_ = new_interval; |
+} |
- return 0; |
+void SyncerThread::OnReceivedLongPollIntervalUpdate( |
+ const base::TimeDelta& new_interval) { |
+ DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
+ syncer_long_poll_interval_seconds_ = new_interval; |
+} |
+ |
+void SyncerThread::OnShouldStopSyncingPermanently() { |
+ VLOG(2) << "SyncerThread(" << this << ")" |
+ << " OnShouldStopSyncingPermanently"; |
+ syncer_->RequestEarlyExit(); // Thread-safe. |
+ Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
+} |
+ |
+void SyncerThread::OnServerConnectionEvent( |
+ const ServerConnectionEvent2& event) { |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
+ &SyncerThread::CheckServerConnectionManagerStatus, |
+ event.connection_code)); |
+} |
+ |
+void SyncerThread::set_notifications_enabled(bool notifications_enabled) { |
+ session_context_->set_notifications_enabled(notifications_enabled); |
} |
-} // namespace browser_sync |
+} // browser_sync |