Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(164)

Unified Diff: sync/engine/sync_scheduler.cc

Issue 10483015: [Sync] Refactor sync configuration logic. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sync/engine/sync_scheduler.cc
diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc
index 1e9da41f81cace5e706faf0b40bddaa70188d474..38f7f41caacafcccc4b6121b5b53ccf29916a536 100644
--- a/sync/engine/sync_scheduler.cc
+++ b/sync/engine/sync_scheduler.cc
@@ -67,6 +67,25 @@ bool IsActionableError(
}
} // namespace
+SyncScheduler::ConfigureParams::ConfigureParams()
+ : source(GetUpdatesCallerInfo::UNKNOWN),
+ need_cleanup(false),
+ need_encryption_key(false) {}
+SyncScheduler::ConfigureParams::ConfigureParams(
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
+ const syncable::ModelTypeSet& types_to_config,
+ const browser_sync::ModelSafeRoutingInfo& routing_info,
+ bool need_cleanup,
+ bool need_encryption_key,
+ const base::Closure& ready_task)
+ : source(source),
+ types_to_config(types_to_config),
+ routing_info(routing_info),
+ need_cleanup(need_cleanup),
+ need_encryption_key(need_encryption_key),
+ ready_task(ready_task) {}
+SyncScheduler::ConfigureParams::~ConfigureParams() {}
+
SyncScheduler::DelayProvider::DelayProvider() {}
SyncScheduler::DelayProvider::~DelayProvider() {}
@@ -246,7 +265,7 @@ void SyncScheduler::UpdateServerConnectionManagerStatus(
connection_code_ = code;
}
-void SyncScheduler::Start(Mode mode, const base::Closure& callback) {
+void SyncScheduler::Start(Mode mode) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
std::string thread_name = MessageLoop::current()->thread_name();
if (thread_name.empty())
@@ -255,13 +274,9 @@ void SyncScheduler::Start(Mode mode, const base::Closure& callback) {
<< thread_name << " with mode " << GetModeString(mode);
if (!started_) {
started_ = true;
- PostTask(FROM_HERE, "SendInitialSnapshot",
- base::Bind(&SyncScheduler::SendInitialSnapshot,
- weak_ptr_factory_.GetWeakPtr()));
+ SendInitialSnapshot();
}
- PostTask(FROM_HERE, "StartImpl",
- base::Bind(&SyncScheduler::StartImpl,
- weak_ptr_factory_.GetWeakPtr(), mode, callback));
+ StartImpl(mode);
}
void SyncScheduler::SendInitialSnapshot() {
@@ -274,7 +289,7 @@ void SyncScheduler::SendInitialSnapshot() {
session_context_->NotifyListeners(event);
}
-void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) {
+void SyncScheduler::StartImpl(Mode mode) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
SDVLOG(2) << "In StartImpl with mode " << GetModeString(mode);
@@ -284,8 +299,6 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) {
Mode old_mode = mode_;
mode_ = mode;
AdjustPolling(NULL); // Will kick start poll timer if needed.
- if (!callback.is_null())
- callback.Run();
if (old_mode != mode_) {
// We just changed our mode. See if there are any pending jobs that we could
@@ -294,6 +307,108 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) {
}
}
+namespace {
+
+// Helper to extract the routing info and workers corresponding to types in
+// |types| from |current_routes| and |current_workers|.
+void BuildModelSafeParams(
+ const ModelTypeSet& types_to_config,
+ const ModelSafeRoutingInfo& current_routes,
+ const std::vector<ModelSafeWorker*>& current_workers,
+ ModelSafeRoutingInfo* result_routes,
+ std::vector<ModelSafeWorker*>* result_workers) {
+ std::set<ModelSafeGroup> active_groups;
+ active_groups.insert(GROUP_PASSIVE);
+ for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good();
+ iter.Inc()) {
+ syncable::ModelType type = iter.Get();
+ ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
+ DCHECK(route != current_routes.end());
+ ModelSafeGroup group = route->second;
+ (*result_routes)[type] = group;
+ active_groups.insert(group);
+ }
+
+ for(std::vector<ModelSafeWorker*>::const_iterator iter =
+ current_workers.begin(); iter != current_workers.end(); ++iter) {
+ if (active_groups.count((*iter)->GetModelSafeGroup()) > 0)
+ result_workers->push_back(*iter);
+ }
+}
+
+} // namespace.
+
+bool SyncScheduler::Configure(const ConfigureParams& params) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
+ DCHECK_EQ(CONFIGURATION_MODE, mode_);
+ SDVLOG(2) << "Reconfiguring syncer.";
+
+ // Only one configuration is allowed at a time. Verify we're not waiting
+ // for a pending configure job.
+ DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
+
+ // We set the routing info for all enabled types in the session context, but
+ // the session for this configuration only knows about the routing info for
+ // those types_to_config (set via restricted_routes and restricted_workers).
+ browser_sync::ModelSafeRoutingInfo restricted_routes;
+ std::vector<ModelSafeWorker*> restricted_workers;
+ BuildModelSafeParams(params.types_to_config,
+ params.routing_info,
+ session_context_->workers(),
+ &restricted_routes,
+ &restricted_workers);
+ const browser_sync::ModelSafeRoutingInfo& previous_routing_info =
+ session_context_->routing_info();
+ // TODO(zea): ensure that the routing info can't be set anywhere but here.
+ session_context_->set_routing_info(params.routing_info);
+
+ if (params.need_cleanup) {
rlarocque 2012/06/04 23:01:38 CleanupDisabledTypesCommand has some strange logic
tim (not reviewing) 2012/06/05 04:02:09 We do have a function that takes a 'types to clean
Nicolas Zea 2012/06/07 19:25:42 Discussed offline. I've removed this logic and now
+ SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(),
+ make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
+ false,
+ FROM_HERE);
+ DoSyncSessionJob(job);
+ }
+
+ if (params.need_encryption_key) {
+ // TODO(zea): implement in such a way that we can handle failures and the
+ // subsequent retry's the scheduler might perform.
+ NOTIMPLEMENTED();
+ }
+
+ // If the set of enabled types hasn't changed there's no need to configure.
+ if (params.routing_info != previous_routing_info) {
+ DCHECK(!restricted_routes.empty());
+ // TODO(tim): config-specific GetUpdatesCallerInfo value?
+ SyncSession* session = new SyncSession(
+ session_context_,
+ this,
+ SyncSourceInfo(params.source,
+ syncable::ModelTypePayloadMapFromRoutingInfo(
+ restricted_routes,
+ std::string())),
+ restricted_routes,
+ restricted_workers);
+ SyncSessionJob job(SyncSessionJob::CONFIGURATION,
rlarocque 2012/06/04 23:01:38 The arguments for converting SyncSessionJobs into
+ TimeTicks::Now(),
+ make_linked_ptr(session),
+ false,
+ FROM_HERE);
+ job.config_params = params;
+ DoSyncSessionJob(job);
+
+ // If we failed, the job would have been saved as the pending configure
+ // job and a wait interval would have been set.
+ if (wait_interval_.get() && wait_interval_->pending_configure_job.get())
+ return false;
+ } else {
+ params.ready_task.Run();
+ }
+
+ return true;
+}
+
SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval(
const SyncSessionJob& job) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
@@ -438,6 +553,8 @@ void SyncScheduler::SaveJob(const SyncSessionJob& job) {
DCHECK(wait_interval_.get());
DCHECK(mode_ == CONFIGURATION_MODE);
+ // Config params should always get set.
+ DCHECK(!job.config_params.ready_task.is_null());
SyncSession* old = job.session.get();
SyncSession* s(new SyncSession(session_context_, this, old->source(),
old->routing_info(), old->workers()));
@@ -459,6 +576,8 @@ struct ModelSafeWorkerGroupIs {
ModelSafeGroup group;
};
+// TODO(sync): Remove the *Impl methods for the other Schedule*
+// functions, too.
void SyncScheduler::ScheduleClearUserData() {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
PostTask(FROM_HERE, "ScheduleClearUserDataImpl",
@@ -466,17 +585,6 @@ void SyncScheduler::ScheduleClearUserData() {
weak_ptr_factory_.GetWeakPtr()));
}
-// TODO(sync): Remove the *Impl methods for the other Schedule*
-// functions, too.
-void SyncScheduler::ScheduleCleanupDisabledTypes() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(),
- make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
- false,
- FROM_HERE);
- ScheduleSyncSessionJob(job);
-}
-
void SyncScheduler::ScheduleNudge(
const TimeDelta& delay,
NudgeSource source, ModelTypeSet types,
@@ -581,93 +689,6 @@ void SyncScheduler::ScheduleNudgeImpl(
ScheduleSyncSessionJob(job);
}
-// Helper to extract the routing info and workers corresponding to types in
-// |types| from |current_routes| and |current_workers|.
-void GetModelSafeParamsForTypes(ModelTypeSet types,
- const ModelSafeRoutingInfo& current_routes,
- const std::vector<ModelSafeWorker*>& current_workers,
- ModelSafeRoutingInfo* result_routes,
- std::vector<ModelSafeWorker*>* result_workers) {
- bool passive_group_added = false;
-
- typedef std::vector<ModelSafeWorker*>::const_iterator iter;
- for (ModelTypeSet::Iterator it = types.First();
- it.Good(); it.Inc()) {
- const syncable::ModelType t = it.Get();
- ModelSafeRoutingInfo::const_iterator route = current_routes.find(t);
- DCHECK(route != current_routes.end());
- ModelSafeGroup group = route->second;
-
- (*result_routes)[t] = group;
- iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(),
- ModelSafeWorkerGroupIs(group));
- if (w_tmp_it != current_workers.end()) {
- iter result_workers_it = std::find_if(
- result_workers->begin(), result_workers->end(),
- ModelSafeWorkerGroupIs(group));
- if (result_workers_it == result_workers->end())
- result_workers->push_back(*w_tmp_it);
-
- if (group == GROUP_PASSIVE)
- passive_group_added = true;
- } else {
- NOTREACHED();
- }
- }
-
- // Always add group passive.
- if (passive_group_added == false) {
- iter it = std::find_if(current_workers.begin(), current_workers.end(),
- ModelSafeWorkerGroupIs(GROUP_PASSIVE));
- if (it != current_workers.end())
- result_workers->push_back(*it);
- else
- NOTREACHED();
- }
-}
-
-void SyncScheduler::ScheduleConfig(
- ModelTypeSet types,
- GetUpdatesCallerInfo::GetUpdatesSource source) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(IsConfigRelatedUpdateSourceValue(source));
- SDVLOG(2) << "Scheduling a config";
-
- ModelSafeRoutingInfo routes;
- std::vector<ModelSafeWorker*> workers;
- GetModelSafeParamsForTypes(types,
- session_context_->routing_info(),
- session_context_->workers(),
- &routes, &workers);
-
- PostTask(FROM_HERE, "ScheduleConfigImpl",
- base::Bind(&SyncScheduler::ScheduleConfigImpl,
- weak_ptr_factory_.GetWeakPtr(),
- routes,
- workers,
- source));
-}
-
-void SyncScheduler::ScheduleConfigImpl(
- const ModelSafeRoutingInfo& routing_info,
- const std::vector<ModelSafeWorker*>& workers,
- const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
-
- SDVLOG(2) << "In ScheduleConfigImpl";
- // TODO(tim): config-specific GetUpdatesCallerInfo value?
- SyncSession* session = new SyncSession(session_context_, this,
- SyncSourceInfo(source,
- syncable::ModelTypePayloadMapFromRoutingInfo(
- routing_info, std::string())),
- routing_info, workers);
- SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(),
- make_linked_ptr(session),
- false,
- FROM_HERE);
- ScheduleSyncSessionJob(job);
-}
-
const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) {
switch (mode) {
ENUM_CASE(CONFIGURATION_MODE);
@@ -889,6 +910,11 @@ void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) {
// special code here.
wait_interval_.reset();
SDVLOG(2) << "Job succeeded so not scheduling more jobs";
+
+ // If this was a configuration job with a ready task, invoke it now that
+ // we finished successfully.
+ if (!old_job.config_params.ready_task.is_null())
+ old_job.config_params.ready_task.Run();
return;
}
@@ -974,6 +1000,8 @@ void SyncScheduler::HandleContinuationError(
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
length));
if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
+ // Config params should always get set.
+ DCHECK(!old_job.config_params.ready_task.is_null());
SyncSession* old = old_job.session.get();
SyncSession* s(new SyncSession(session_context_, this,
old->source(), old->routing_info(), old->workers()));

Powered by Google App Engine
This is Rietveld 408576698