Chromium Code Reviews| Index: sync/engine/sync_scheduler.cc |
| diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc |
| index d528604bf3469e9a3a87385bdca459608b418d3b..7f087b8a22cd71327233b395a1344a43f9348408 100644 |
| --- a/sync/engine/sync_scheduler.cc |
| +++ b/sync/engine/sync_scheduler.cc |
| @@ -67,6 +67,24 @@ bool IsActionableError( |
| } |
| } // namespace |
| +ConfigureParams::ConfigureParams() |
| + : source(GetUpdatesCallerInfo::UNKNOWN), |
| + need_encryption_key(false) {} |
| +ConfigureParams::ConfigureParams( |
| + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, |
| + const syncable::ModelTypeSet& types_to_config, |
| + const browser_sync::ModelSafeRoutingInfo& routing_info, |
| + bool need_encryption_key, |
| + const base::Closure& ready_task) |
| + : source(source), |
| + types_to_config(types_to_config), |
| + routing_info(routing_info), |
| + need_encryption_key(need_encryption_key), |
| + ready_task(ready_task) { |
| + DCHECK(!ready_task.is_null()); |
| +} |
| +ConfigureParams::~ConfigureParams() {} |
| + |
| SyncScheduler::DelayProvider::DelayProvider() {} |
| SyncScheduler::DelayProvider::~DelayProvider() {} |
| @@ -99,11 +117,13 @@ SyncScheduler::SyncSessionJob::~SyncSessionJob() {} |
| SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
| base::TimeTicks start, |
| linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
| - const tracked_objects::Location& from_here) : purpose(purpose), |
| - scheduled_start(start), |
| - session(session), |
| - is_canary_job(is_canary_job), |
| - from_here(from_here) { |
| + ConfigureParams config_params, const tracked_objects::Location& from_here) |
| + : purpose(purpose), |
| + scheduled_start(start), |
| + session(session), |
| + is_canary_job(is_canary_job), |
| + config_params(config_params), |
| + from_here(from_here) { |
| } |
| const char* SyncScheduler::SyncSessionJob::GetPurposeString( |
| @@ -246,7 +266,7 @@ void SyncScheduler::UpdateServerConnectionManagerStatus( |
| connection_code_ = code; |
| } |
| -void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
| +void SyncScheduler::Start(Mode mode) { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| std::string thread_name = MessageLoop::current()->thread_name(); |
| if (thread_name.empty()) |
| @@ -263,8 +283,6 @@ void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
| Mode old_mode = mode_; |
| mode_ = mode; |
| AdjustPolling(NULL); // Will kick start poll timer if needed. |
| - if (!callback.is_null()) |
| - callback.Run(); |
| if (old_mode != mode_) { |
| // We just changed our mode. See if there are any pending jobs that we could |
| @@ -283,6 +301,113 @@ void SyncScheduler::SendInitialSnapshot() { |
| session_context_->NotifyListeners(event); |
| } |
| +namespace { |
| + |
| +// Helper to extract the routing info and workers corresponding to types in |
| +// |types| from |current_routes| and |current_workers|. |
| +void BuildModelSafeParams( |
| + const ModelTypeSet& types_to_config, |
| + const ModelSafeRoutingInfo& current_routes, |
| + const std::vector<ModelSafeWorker*>& current_workers, |
| + ModelSafeRoutingInfo* result_routes, |
| + std::vector<ModelSafeWorker*>* result_workers) { |
| + std::set<ModelSafeGroup> active_groups; |
| + active_groups.insert(GROUP_PASSIVE); |
| + for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good(); |
| + iter.Inc()) { |
| + syncable::ModelType type = iter.Get(); |
| + ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); |
| + DCHECK(route != current_routes.end()); |
| + ModelSafeGroup group = route->second; |
| + (*result_routes)[type] = group; |
| + active_groups.insert(group); |
| + } |
| + |
| + for(std::vector<ModelSafeWorker*>::const_iterator iter = |
| + current_workers.begin(); iter != current_workers.end(); ++iter) { |
| + if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) |
| + result_workers->push_back(*iter); |
| + } |
| +} |
| + |
| +} // namespace. |
| + |
| +bool SyncScheduler::ScheduleConfiguration(const ConfigureParams& params) { |
| + DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| + DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
| + DCHECK_EQ(CONFIGURATION_MODE, mode_); |
| + DCHECK(!params.ready_task.is_null()); |
| + SDVLOG(2) << "Reconfiguring syncer."; |
| + |
| + // Only one configuration is allowed at a time. Verify we're not waiting |
| + // for a pending configure job. |
| + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); |
| + |
| + // TODO(sync): now that ModelChanging commands only use those workers within |
| + // the routing info, we don't really need |restricted_workers|. Remove it. |
| + browser_sync::ModelSafeRoutingInfo restricted_routes; |
| + std::vector<ModelSafeWorker*> restricted_workers; |
| + BuildModelSafeParams(params.types_to_config, |
| + params.routing_info, |
| + session_context_->workers(), |
| + &restricted_routes, |
| + &restricted_workers); |
| + session_context_->set_routing_info(params.routing_info); |
| + |
| + // TODO(sync): if it's confirmed that Cleanup has no effect on non-configures, |
| + // remove this command and add a call to PurgeEntriesWithTypeIn here. This |
|
tim (not reviewing)
2012/06/14 00:26:53
nit - well, we don't want to poke the directory di
Nicolas Zea
2012/06/14 22:35:52
Removed comment. Note that this is being remove an
|
| + // will also allow us to get rid of previous_session_routing_info. |
| + SyncSessionJob cleanup_job( |
| + SyncSessionJob::CLEANUP_DISABLED_TYPES, |
| + TimeTicks::Now(), |
| + make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| + false, |
| + ConfigureParams(), |
| + FROM_HERE); |
| + DoSyncSessionJob(cleanup_job); |
|
tim (not reviewing)
2012/06/14 00:26:53
Hmm. So, this might choose not to actually execute
Nicolas Zea
2012/06/14 22:35:52
Done, and mentioned bug number for getting rid of
|
| + |
| + if (params.need_encryption_key) { |
| + // TODO(zea): implement in such a way that we can handle failures and the |
| + // subsequent retrys the scheduler might perform. |
| + NOTIMPLEMENTED(); |
|
tim (not reviewing)
2012/06/14 00:26:53
bug?
Nicolas Zea
2012/06/14 22:35:52
Done.
|
| + } |
| + |
| + // Only reconfigure if we have types to config. |
|
tim (not reviewing)
2012/06/14 00:26:53
It seems weird that this whole flow is called conf
Nicolas Zea
2012/06/14 22:35:52
Done.
|
| + if (!params.types_to_config.Empty()) { |
| + DCHECK(!restricted_routes.empty()); |
| + // TODO(tim): config-specific GetUpdatesCallerInfo value? |
|
tim (not reviewing)
2012/06/14 00:26:53
We have a config specific GetUpdatesCallerInfo...
Nicolas Zea
2012/06/14 22:35:52
Woops, I removed this is the other patch, but it c
|
| + linked_ptr<SyncSession> session(new SyncSession( |
| + session_context_, |
| + this, |
| + SyncSourceInfo(params.source, |
| + syncable::ModelTypePayloadMapFromRoutingInfo( |
| + restricted_routes, |
| + std::string())), |
| + restricted_routes, |
| + restricted_workers)); |
| + SyncSessionJob job(SyncSessionJob::CONFIGURATION, |
| + TimeTicks::Now(), |
| + session, |
| + false, |
| + params, |
| + FROM_HERE); |
| + DoSyncSessionJob(job); |
| + |
| + // If we failed, the job would have been saved as the pending configure |
| + // job and a wait interval would have been set. |
| + if (!session->Succeeded()) { |
| + DCHECK(wait_interval_.get() && |
| + wait_interval_->pending_configure_job.get()); |
| + return false; |
| + } |
| + } else { |
| + SDVLOG(2) << "No change in routing info, calling ready task directly."; |
| + params.ready_task.Run(); |
| + } |
| + |
| + return true; |
| +} |
| + |
| SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
| const SyncSessionJob& job) { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| @@ -381,7 +506,8 @@ void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
| s->delegate(), s->source(), s->routing_info(), s->workers())); |
| SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
| - make_linked_ptr(session.release()), false, job.from_here); |
| + make_linked_ptr(session.release()), false, |
| + ConfigureParams(), job.from_here); |
| pending_nudge_.reset(new SyncSessionJob(new_job)); |
| return; |
| @@ -427,11 +553,17 @@ void SyncScheduler::SaveJob(const SyncSessionJob& job) { |
| DCHECK(wait_interval_.get()); |
| DCHECK(mode_ == CONFIGURATION_MODE); |
| + // Config params should always get set. |
| + DCHECK(!job.config_params.ready_task.is_null()); |
| SyncSession* old = job.session.get(); |
| SyncSession* s(new SyncSession(session_context_, this, old->source(), |
| old->routing_info(), old->workers())); |
| - SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
| - make_linked_ptr(s), false, job.from_here); |
| + SyncSessionJob new_job(job.purpose, |
| + TimeTicks::Now(), |
| + make_linked_ptr(s), |
| + false, |
| + job.config_params, |
| + job.from_here); |
| wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
| } // drop the rest. |
| // TODO(sync): Is it okay to drop the rest? It's weird that |
| @@ -450,23 +582,16 @@ struct ModelSafeWorkerGroupIs { |
| void SyncScheduler::ClearUserData() { |
| DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA, TimeTicks::Now(), |
| + SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA, |
| + TimeTicks::Now(), |
| make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| false, |
| + ConfigureParams(), |
| FROM_HERE); |
| DoSyncSessionJob(job); |
| } |
| -void SyncScheduler::CleanupDisabledTypes() { |
| - DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
| - make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| - false, |
| - FROM_HERE); |
| - DoSyncSessionJob(job); |
| -} |
| - |
| void SyncScheduler::ScheduleNudgeAsync( |
| const TimeDelta& delay, |
| NudgeSource source, ModelTypeSet types, |
| @@ -524,7 +649,7 @@ void SyncScheduler::ScheduleNudgeImpl( |
| SyncSession* session(CreateSyncSession(info)); |
| SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
| make_linked_ptr(session), is_canary_job, |
| - nudge_location); |
| + ConfigureParams(), nudge_location); |
| session = NULL; |
| if (!ShouldRunJob(job)) |
| @@ -555,77 +680,6 @@ void SyncScheduler::ScheduleNudgeImpl( |
| ScheduleSyncSessionJob(job); |
| } |
| -// Helper to extract the routing info and workers corresponding to types in |
| -// |types| from |current_routes| and |current_workers|. |
| -void GetModelSafeParamsForTypes(ModelTypeSet types, |
| - const ModelSafeRoutingInfo& current_routes, |
| - const std::vector<ModelSafeWorker*>& current_workers, |
| - ModelSafeRoutingInfo* result_routes, |
| - std::vector<ModelSafeWorker*>* result_workers) { |
| - bool passive_group_added = false; |
| - |
| - typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
| - for (ModelTypeSet::Iterator it = types.First(); |
| - it.Good(); it.Inc()) { |
| - const syncable::ModelType t = it.Get(); |
| - ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); |
| - DCHECK(route != current_routes.end()); |
| - ModelSafeGroup group = route->second; |
| - |
| - (*result_routes)[t] = group; |
| - iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), |
| - ModelSafeWorkerGroupIs(group)); |
| - if (w_tmp_it != current_workers.end()) { |
| - iter result_workers_it = std::find_if( |
| - result_workers->begin(), result_workers->end(), |
| - ModelSafeWorkerGroupIs(group)); |
| - if (result_workers_it == result_workers->end()) |
| - result_workers->push_back(*w_tmp_it); |
| - |
| - if (group == GROUP_PASSIVE) |
| - passive_group_added = true; |
| - } else { |
| - NOTREACHED(); |
| - } |
| - } |
| - |
| - // Always add group passive. |
| - if (passive_group_added == false) { |
| - iter it = std::find_if(current_workers.begin(), current_workers.end(), |
| - ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
| - if (it != current_workers.end()) |
| - result_workers->push_back(*it); |
| - else |
| - NOTREACHED(); |
| - } |
| -} |
| - |
| -void SyncScheduler::ScheduleConfiguration( |
| - ModelTypeSet types, |
| - GetUpdatesCallerInfo::GetUpdatesSource source) { |
| - DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| - DCHECK(IsConfigRelatedUpdateSourceValue(source)); |
| - SDVLOG(2) << "Scheduling a config"; |
| - |
| - ModelSafeRoutingInfo routes; |
| - std::vector<ModelSafeWorker*> workers; |
| - GetModelSafeParamsForTypes(types, |
| - session_context_->routing_info(), |
| - session_context_->workers(), |
| - &routes, &workers); |
| - |
| - SyncSession* session = new SyncSession(session_context_, this, |
| - SyncSourceInfo(source, |
| - syncable::ModelTypePayloadMapFromRoutingInfo( |
| - routes, std::string())), |
| - routes, workers); |
| - SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), |
| - make_linked_ptr(session), |
| - false, |
| - FROM_HERE); |
| - DoSyncSessionJob(job); |
| -} |
| - |
| const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
| switch (mode) { |
| ENUM_CASE(CONFIGURATION_MODE); |
| @@ -845,6 +899,11 @@ void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { |
| if (old_job.session->SuccessfullyReachedServer()) |
| wait_interval_.reset(); |
| SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
| + |
| + // If this was a configuration job with a ready task, invoke it now that |
| + // we finished successfully. |
|
tim (not reviewing)
2012/06/14 00:26:53
Why do this in 'ScheduleNextSync'? It's not reall
Nicolas Zea
2012/06/14 22:35:52
Primarily just because this is where we were alrea
|
| + if (!old_job.config_params.ready_task.is_null()) |
| + old_job.config_params.ready_task.Run(); |
| return; |
| } |
| @@ -930,11 +989,15 @@ void SyncScheduler::HandleContinuationError( |
| wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| length)); |
| if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| + SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
| + // Config params should always get set. |
| + DCHECK(!old_job.config_params.ready_task.is_null()); |
| SyncSession* old = old_job.session.get(); |
| SyncSession* s(new SyncSession(session_context_, this, |
| old->source(), old->routing_info(), old->workers())); |
| SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
| - make_linked_ptr(s), false, FROM_HERE); |
| + make_linked_ptr(s), false, old_job.config_params, |
| + FROM_HERE); |
| wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
| } else { |
| // We are not in configuration mode. So wait_interval's pending job |
| @@ -1056,6 +1119,7 @@ void SyncScheduler::PollTimerCallback() { |
| SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), |
| make_linked_ptr(s), |
| false, |
| + ConfigureParams(), |
| FROM_HERE); |
| ScheduleSyncSessionJob(job); |