Chromium Code Reviews| Index: sync/engine/sync_scheduler.cc |
| diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc |
| index 1e9da41f81cace5e706faf0b40bddaa70188d474..38f7f41caacafcccc4b6121b5b53ccf29916a536 100644 |
| --- a/sync/engine/sync_scheduler.cc |
| +++ b/sync/engine/sync_scheduler.cc |
| @@ -67,6 +67,25 @@ bool IsActionableError( |
| } |
| } // namespace |
| +SyncScheduler::ConfigureParams::ConfigureParams() |
| + : source(GetUpdatesCallerInfo::UNKNOWN), |
| + need_cleanup(false), |
| + need_encryption_key(false) {} |
| +SyncScheduler::ConfigureParams::ConfigureParams( |
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, |
| + const syncable::ModelTypeSet& types_to_config, |
| + const browser_sync::ModelSafeRoutingInfo& routing_info, |
| + bool need_cleanup, |
| + bool need_encryption_key, |
| + const base::Closure& ready_task) |
| + : source(source), |
| + types_to_config(types_to_config), |
| + routing_info(routing_info), |
| + need_cleanup(need_cleanup), |
| + need_encryption_key(need_encryption_key), |
| + ready_task(ready_task) {} |
| +SyncScheduler::ConfigureParams::~ConfigureParams() {} |
| + |
| SyncScheduler::DelayProvider::DelayProvider() {} |
| SyncScheduler::DelayProvider::~DelayProvider() {} |
| @@ -246,7 +265,7 @@ void SyncScheduler::UpdateServerConnectionManagerStatus( |
| connection_code_ = code; |
| } |
| -void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
| +void SyncScheduler::Start(Mode mode) { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| std::string thread_name = MessageLoop::current()->thread_name(); |
| if (thread_name.empty()) |
| @@ -255,13 +274,9 @@ void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
| << thread_name << " with mode " << GetModeString(mode); |
| if (!started_) { |
| started_ = true; |
| - PostTask(FROM_HERE, "SendInitialSnapshot", |
| - base::Bind(&SyncScheduler::SendInitialSnapshot, |
| - weak_ptr_factory_.GetWeakPtr())); |
| + SendInitialSnapshot(); |
| } |
| - PostTask(FROM_HERE, "StartImpl", |
| - base::Bind(&SyncScheduler::StartImpl, |
| - weak_ptr_factory_.GetWeakPtr(), mode, callback)); |
| + StartImpl(mode); |
| } |
| void SyncScheduler::SendInitialSnapshot() { |
| @@ -274,7 +289,7 @@ void SyncScheduler::SendInitialSnapshot() { |
| session_context_->NotifyListeners(event); |
| } |
| -void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
| +void SyncScheduler::StartImpl(Mode mode) { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| SDVLOG(2) << "In StartImpl with mode " << GetModeString(mode); |
| @@ -284,8 +299,6 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
| Mode old_mode = mode_; |
| mode_ = mode; |
| AdjustPolling(NULL); // Will kick start poll timer if needed. |
| - if (!callback.is_null()) |
| - callback.Run(); |
| if (old_mode != mode_) { |
| // We just changed our mode. See if there are any pending jobs that we could |
| @@ -294,6 +307,108 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
| } |
| } |
| +namespace { |
| + |
| +// Helper to extract the routing info and workers corresponding to types in |
| +// |types| from |current_routes| and |current_workers|. |
| +void BuildModelSafeParams( |
| + const ModelTypeSet& types_to_config, |
| + const ModelSafeRoutingInfo& current_routes, |
| + const std::vector<ModelSafeWorker*>& current_workers, |
| + ModelSafeRoutingInfo* result_routes, |
| + std::vector<ModelSafeWorker*>* result_workers) { |
| + std::set<ModelSafeGroup> active_groups; |
| + active_groups.insert(GROUP_PASSIVE); |
| + for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good(); |
| + iter.Inc()) { |
| + syncable::ModelType type = iter.Get(); |
| + ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); |
| + DCHECK(route != current_routes.end()); |
| + ModelSafeGroup group = route->second; |
| + (*result_routes)[type] = group; |
| + active_groups.insert(group); |
| + } |
| + |
| + for(std::vector<ModelSafeWorker*>::const_iterator iter = |
| + current_workers.begin(); iter != current_workers.end(); ++iter) { |
| + if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) |
| + result_workers->push_back(*iter); |
| + } |
| +} |
| + |
| +} // namespace. |
| + |
| +bool SyncScheduler::Configure(const ConfigureParams& params) { |
| + DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| + DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
| + DCHECK_EQ(CONFIGURATION_MODE, mode_); |
| + SDVLOG(2) << "Reconfiguring syncer."; |
| + |
| + // Only one configuration is allowed at a time. Verify we're not waiting |
| + // for a pending configure job. |
| + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); |
| + |
| + // We set the routing info for all enabled types in the session context, but |
| + // the session for this configuration only knows about the routing info for |
| + // those types_to_config (set via restricted_routes and restricted_workers). |
| + browser_sync::ModelSafeRoutingInfo restricted_routes; |
| + std::vector<ModelSafeWorker*> restricted_workers; |
| + BuildModelSafeParams(params.types_to_config, |
| + params.routing_info, |
| + session_context_->workers(), |
| + &restricted_routes, |
| + &restricted_workers); |
| + const browser_sync::ModelSafeRoutingInfo& previous_routing_info = |
| + session_context_->routing_info(); |
| + // TODO(zea): ensure that the routing info can't be set anywhere but here. |
| + session_context_->set_routing_info(params.routing_info); |
| + |
| + if (params.need_cleanup) { |
|
rlarocque
2012/06/04 23:01:38
CleanupDisabledTypesCommand has some strange logic
tim (not reviewing)
2012/06/05 04:02:09
We do have a function that takes a 'types to clean
Nicolas Zea
2012/06/07 19:25:42
Discussed offline. I've removed this logic and now
|
| + SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
| + make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| + false, |
| + FROM_HERE); |
| + DoSyncSessionJob(job); |
| + } |
| + |
| + if (params.need_encryption_key) { |
| + // TODO(zea): implement in such a way that we can handle failures and the |
| + // subsequent retry's the scheduler might perform. |
| + NOTIMPLEMENTED(); |
| + } |
| + |
| + // If the set of enabled types hasn't changed there's no need to configure. |
| + if (params.routing_info != previous_routing_info) { |
| + DCHECK(!restricted_routes.empty()); |
| + // TODO(tim): config-specific GetUpdatesCallerInfo value? |
| + SyncSession* session = new SyncSession( |
| + session_context_, |
| + this, |
| + SyncSourceInfo(params.source, |
| + syncable::ModelTypePayloadMapFromRoutingInfo( |
| + restricted_routes, |
| + std::string())), |
| + restricted_routes, |
| + restricted_workers); |
| + SyncSessionJob job(SyncSessionJob::CONFIGURATION, |
|
rlarocque
2012/06/04 23:01:38
The arguments for converting SyncSessionJobs into
|
| + TimeTicks::Now(), |
| + make_linked_ptr(session), |
| + false, |
| + FROM_HERE); |
| + job.config_params = params; |
| + DoSyncSessionJob(job); |
| + |
| + // If we failed, the job would have been saved as the pending configure |
| + // job and a wait interval would have been set. |
| + if (wait_interval_.get() && wait_interval_->pending_configure_job.get()) |
| + return false; |
| + } else { |
| + params.ready_task.Run(); |
| + } |
| + |
| + return true; |
| +} |
| + |
| SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
| const SyncSessionJob& job) { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| @@ -438,6 +553,8 @@ void SyncScheduler::SaveJob(const SyncSessionJob& job) { |
| DCHECK(wait_interval_.get()); |
| DCHECK(mode_ == CONFIGURATION_MODE); |
| + // Config params should always get set. |
| + DCHECK(!job.config_params.ready_task.is_null()); |
| SyncSession* old = job.session.get(); |
| SyncSession* s(new SyncSession(session_context_, this, old->source(), |
| old->routing_info(), old->workers())); |
| @@ -459,6 +576,8 @@ struct ModelSafeWorkerGroupIs { |
| ModelSafeGroup group; |
| }; |
| +// TODO(sync): Remove the *Impl methods for the other Schedule* |
| +// functions, too. |
| void SyncScheduler::ScheduleClearUserData() { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| PostTask(FROM_HERE, "ScheduleClearUserDataImpl", |
| @@ -466,17 +585,6 @@ void SyncScheduler::ScheduleClearUserData() { |
| weak_ptr_factory_.GetWeakPtr())); |
| } |
| -// TODO(sync): Remove the *Impl methods for the other Schedule* |
| -// functions, too. |
| -void SyncScheduler::ScheduleCleanupDisabledTypes() { |
| - DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
| - make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| - false, |
| - FROM_HERE); |
| - ScheduleSyncSessionJob(job); |
| -} |
| - |
| void SyncScheduler::ScheduleNudge( |
| const TimeDelta& delay, |
| NudgeSource source, ModelTypeSet types, |
| @@ -581,93 +689,6 @@ void SyncScheduler::ScheduleNudgeImpl( |
| ScheduleSyncSessionJob(job); |
| } |
| -// Helper to extract the routing info and workers corresponding to types in |
| -// |types| from |current_routes| and |current_workers|. |
| -void GetModelSafeParamsForTypes(ModelTypeSet types, |
| - const ModelSafeRoutingInfo& current_routes, |
| - const std::vector<ModelSafeWorker*>& current_workers, |
| - ModelSafeRoutingInfo* result_routes, |
| - std::vector<ModelSafeWorker*>* result_workers) { |
| - bool passive_group_added = false; |
| - |
| - typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
| - for (ModelTypeSet::Iterator it = types.First(); |
| - it.Good(); it.Inc()) { |
| - const syncable::ModelType t = it.Get(); |
| - ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); |
| - DCHECK(route != current_routes.end()); |
| - ModelSafeGroup group = route->second; |
| - |
| - (*result_routes)[t] = group; |
| - iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), |
| - ModelSafeWorkerGroupIs(group)); |
| - if (w_tmp_it != current_workers.end()) { |
| - iter result_workers_it = std::find_if( |
| - result_workers->begin(), result_workers->end(), |
| - ModelSafeWorkerGroupIs(group)); |
| - if (result_workers_it == result_workers->end()) |
| - result_workers->push_back(*w_tmp_it); |
| - |
| - if (group == GROUP_PASSIVE) |
| - passive_group_added = true; |
| - } else { |
| - NOTREACHED(); |
| - } |
| - } |
| - |
| - // Always add group passive. |
| - if (passive_group_added == false) { |
| - iter it = std::find_if(current_workers.begin(), current_workers.end(), |
| - ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
| - if (it != current_workers.end()) |
| - result_workers->push_back(*it); |
| - else |
| - NOTREACHED(); |
| - } |
| -} |
| - |
| -void SyncScheduler::ScheduleConfig( |
| - ModelTypeSet types, |
| - GetUpdatesCallerInfo::GetUpdatesSource source) { |
| - DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - DCHECK(IsConfigRelatedUpdateSourceValue(source)); |
| - SDVLOG(2) << "Scheduling a config"; |
| - |
| - ModelSafeRoutingInfo routes; |
| - std::vector<ModelSafeWorker*> workers; |
| - GetModelSafeParamsForTypes(types, |
| - session_context_->routing_info(), |
| - session_context_->workers(), |
| - &routes, &workers); |
| - |
| - PostTask(FROM_HERE, "ScheduleConfigImpl", |
| - base::Bind(&SyncScheduler::ScheduleConfigImpl, |
| - weak_ptr_factory_.GetWeakPtr(), |
| - routes, |
| - workers, |
| - source)); |
| -} |
| - |
| -void SyncScheduler::ScheduleConfigImpl( |
| - const ModelSafeRoutingInfo& routing_info, |
| - const std::vector<ModelSafeWorker*>& workers, |
| - const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
| - DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - |
| - SDVLOG(2) << "In ScheduleConfigImpl"; |
| - // TODO(tim): config-specific GetUpdatesCallerInfo value? |
| - SyncSession* session = new SyncSession(session_context_, this, |
| - SyncSourceInfo(source, |
| - syncable::ModelTypePayloadMapFromRoutingInfo( |
| - routing_info, std::string())), |
| - routing_info, workers); |
| - SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), |
| - make_linked_ptr(session), |
| - false, |
| - FROM_HERE); |
| - ScheduleSyncSessionJob(job); |
| -} |
| - |
| const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
| switch (mode) { |
| ENUM_CASE(CONFIGURATION_MODE); |
| @@ -889,6 +910,11 @@ void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { |
| // special code here. |
| wait_interval_.reset(); |
| SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
| + |
| + // If this was a configuration job with a ready task, invoke it now that |
| + // we finished successfully. |
| + if (!old_job.config_params.ready_task.is_null()) |
| + old_job.config_params.ready_task.Run(); |
| return; |
| } |
| @@ -974,6 +1000,8 @@ void SyncScheduler::HandleContinuationError( |
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| length)); |
| if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| + // Config params should always get set. |
| + DCHECK(!old_job.config_params.ready_task.is_null()); |
| SyncSession* old = old_job.session.get(); |
| SyncSession* s(new SyncSession(session_context_, this, |
| old->source(), old->routing_info(), old->workers())); |