OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/bind_helpers.h" | |
12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
13 #include "base/location.h" | 14 #include "base/location.h" |
14 #include "base/logging.h" | 15 #include "base/logging.h" |
15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
76 : source(source), | 77 : source(source), |
77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
78 routing_info(routing_info), | 79 routing_info(routing_info), |
79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
81 } | 82 } |
82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
83 | 84 |
84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
86 had_nudge(false) { | 87 had_nudge(false), |
87 } | 88 pending_configure_job(NULL) {} |
89 | |
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
91 : mode(mode), had_nudge(false), length(length), | |
92 pending_configure_job(NULL) {} | |
88 | 93 |
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
90 | 95 |
91 #define ENUM_CASE(x) case x: return #x; break; | 96 #define ENUM_CASE(x) case x: return #x; break; |
92 | 97 |
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
94 switch (mode) { | 99 switch (mode) { |
95 ENUM_CASE(UNKNOWN); | 100 ENUM_CASE(UNKNOWN); |
96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 101 ENUM_CASE(EXPONENTIAL_BACKOFF); |
97 ENUM_CASE(THROTTLED); | 102 ENUM_CASE(THROTTLED); |
98 } | 103 } |
99 NOTREACHED(); | 104 NOTREACHED(); |
100 return ""; | 105 return ""; |
101 } | 106 } |
102 | 107 |
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
104 : purpose(UNKNOWN), | |
105 is_canary_job(false) { | |
106 } | |
107 | |
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
109 | |
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
111 base::TimeTicks start, | |
112 linked_ptr<sessions::SyncSession> session, | |
113 bool is_canary_job, | |
114 const ConfigurationParams& config_params, | |
115 const tracked_objects::Location& from_here) | |
116 : purpose(purpose), | |
117 scheduled_start(start), | |
118 session(session), | |
119 is_canary_job(is_canary_job), | |
120 config_params(config_params), | |
121 from_here(from_here) { | |
122 } | |
123 | |
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
126 switch (purpose) { | |
127 ENUM_CASE(UNKNOWN); | |
128 ENUM_CASE(POLL); | |
129 ENUM_CASE(NUDGE); | |
130 ENUM_CASE(CONFIGURATION); | |
131 } | |
132 NOTREACHED(); | |
133 return ""; | |
134 } | |
135 | |
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
137 NudgeSource source) { | 109 NudgeSource source) { |
138 switch (source) { | 110 switch (source) { |
139 case NUDGE_SOURCE_NOTIFICATION: | 111 case NUDGE_SOURCE_NOTIFICATION: |
140 return GetUpdatesCallerInfo::NOTIFICATION; | 112 return GetUpdatesCallerInfo::NOTIFICATION; |
141 case NUDGE_SOURCE_LOCAL: | 113 case NUDGE_SOURCE_LOCAL: |
142 return GetUpdatesCallerInfo::LOCAL; | 114 return GetUpdatesCallerInfo::LOCAL; |
143 case NUDGE_SOURCE_CONTINUATION: | 115 case NUDGE_SOURCE_CONTINUATION: |
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
145 case NUDGE_SOURCE_LOCAL_REFRESH: | 117 case NUDGE_SOURCE_LOCAL_REFRESH: |
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 118 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
147 case NUDGE_SOURCE_UNKNOWN: | 119 case NUDGE_SOURCE_UNKNOWN: |
148 return GetUpdatesCallerInfo::UNKNOWN; | 120 return GetUpdatesCallerInfo::UNKNOWN; |
149 default: | 121 default: |
150 NOTREACHED(); | 122 NOTREACHED(); |
151 return GetUpdatesCallerInfo::UNKNOWN; | 123 return GetUpdatesCallerInfo::UNKNOWN; |
152 } | 124 } |
153 } | 125 } |
154 | 126 |
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
156 : mode(mode), had_nudge(false), length(length) { } | |
157 | |
158 // Helper macros to log with the syncer thread name; useful when there | 127 // Helper macros to log with the syncer thread name; useful when there |
159 // are multiple syncer threads involved. | 128 // are multiple syncer threads involved. |
160 | 129 |
161 #define SLOG(severity) LOG(severity) << name_ << ": " | 130 #define SLOG(severity) LOG(severity) << name_ << ": " |
162 | 131 |
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
164 | 133 |
165 #define SDVLOG_LOC(from_here, verbose_level) \ | 134 #define SDVLOG_LOC(from_here, verbose_level) \ |
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 135 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
167 | 136 |
(...skipping 30 matching lines...) Expand all Loading... | |
198 syncer_short_poll_interval_seconds_( | 167 syncer_short_poll_interval_seconds_( |
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
200 syncer_long_poll_interval_seconds_( | 169 syncer_long_poll_interval_seconds_( |
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
202 sessions_commit_delay_( | 171 sessions_commit_delay_( |
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
204 mode_(NORMAL_MODE), | 173 mode_(NORMAL_MODE), |
205 // Start with assuming everything is fine with the connection. | 174 // Start with assuming everything is fine with the connection. |
206 // At the end of the sync cycle we would have the correct status. | 175 // At the end of the sync cycle we would have the correct status. |
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 176 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
177 pending_nudge_(NULL), | |
208 delay_provider_(delay_provider), | 178 delay_provider_(delay_provider), |
209 syncer_(syncer), | 179 syncer_(syncer), |
210 session_context_(context), | 180 session_context_(context), |
211 no_scheduling_allowed_(false) { | 181 no_scheduling_allowed_(false) { |
212 DCHECK(sync_loop_); | 182 DCHECK(sync_loop_); |
213 } | 183 } |
214 | 184 |
215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 185 SyncSchedulerImpl::~SyncSchedulerImpl() { |
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 186 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
217 StopImpl(base::Closure()); | 187 StopImpl(base::Closure()); |
(...skipping 16 matching lines...) Expand all Loading... | |
234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 204 void SyncSchedulerImpl::OnConnectionStatusChange() { |
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
236 // Optimistically assume that the connection is fixed and try | 206 // Optimistically assume that the connection is fixed and try |
237 // connecting. | 207 // connecting. |
238 OnServerConnectionErrorFixed(); | 208 OnServerConnectionErrorFixed(); |
239 } | 209 } |
240 } | 210 } |
241 | 211 |
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
214 // There could be a pending nudge or configuration job in several cases: | |
215 // | |
216 // 1. We're in exponential backoff. | |
217 // 2. We're silenced / throttled. | |
218 // 3. A nudge was saved previously due to not having a valid auth token. | |
219 // 4. A nudge was scheduled + saved while in configuration mode. | |
220 // | |
221 // In all cases except (2), we want to retry contacting the server. We | |
222 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
224 // has the authority to do that is the Unthrottle timer. | |
225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
226 if (!pending.get()) | |
227 return; | |
228 | |
244 PostTask(FROM_HERE, "DoCanaryJob", | 229 PostTask(FROM_HERE, "DoCanaryJob", |
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 230 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
246 weak_ptr_factory_.GetWeakPtr())); | 231 weak_ptr_factory_.GetWeakPtr(), |
247 | 232 base::Passed(&pending))); |
248 } | 233 } |
249 | 234 |
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
251 HttpResponse::ServerConnectionCode code) { | 236 HttpResponse::ServerConnectionCode code) { |
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 237 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
253 SDVLOG(2) << "New server connection code: " | 238 SDVLOG(2) << "New server connection code: " |
254 << HttpResponse::GetServerConnectionCodeString(code); | 239 << HttpResponse::GetServerConnectionCodeString(code); |
255 | 240 |
256 connection_code_ = code; | 241 connection_code_ = code; |
257 } | 242 } |
(...skipping 15 matching lines...) Expand all Loading... | |
273 Mode old_mode = mode_; | 258 Mode old_mode = mode_; |
274 mode_ = mode; | 259 mode_ = mode; |
275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 260 AdjustPolling(NULL); // Will kick start poll timer if needed. |
276 | 261 |
277 if (old_mode != mode_) { | 262 if (old_mode != mode_) { |
278 // We just changed our mode. See if there are any pending jobs that we could | 263 // We just changed our mode. See if there are any pending jobs that we could |
279 // execute in the new mode. | 264 // execute in the new mode. |
280 if (mode_ == NORMAL_MODE) { | 265 if (mode_ == NORMAL_MODE) { |
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
282 // has not yet completed. | 267 // has not yet completed. |
283 DCHECK(!wait_interval_.get() || | 268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
284 !wait_interval_->pending_configure_job.get()); | |
285 } | 269 } |
286 | 270 |
287 DoPendingJobIfPossible(false); | 271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
272 if (pending.get()) { | |
273 // TODO(tim): We should be able to remove this... | |
274 scoped_ptr<SyncSession> session(CreateSyncSession( | |
275 pending->session()->source())); | |
276 // Also the routing info might have been changed since we cached the | |
277 // pending nudge. Update it by coalescing to the latest. | |
278 pending->mutable_session()->Coalesce(*(session)); | |
279 SDVLOG(2) << "Executing pending job. Good luck!"; | |
280 DoSyncSessionJob(pending.Pass()); | |
281 } | |
288 } | 282 } |
289 } | 283 } |
290 | 284 |
291 void SyncSchedulerImpl::SendInitialSnapshot() { | 285 void SyncSchedulerImpl::SendInitialSnapshot() { |
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 286 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 288 SyncSourceInfo(), ModelSafeRoutingInfo(), |
295 std::vector<ModelSafeWorker*>())); | 289 std::vector<ModelSafeWorker*>())); |
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
297 event.snapshot = dummy->TakeSnapshot(); | 291 event.snapshot = dummy->TakeSnapshot(); |
(...skipping 23 matching lines...) Expand all Loading... | |
321 bool SyncSchedulerImpl::ScheduleConfiguration( | 315 bool SyncSchedulerImpl::ScheduleConfiguration( |
322 const ConfigurationParams& params) { | 316 const ConfigurationParams& params) { |
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 317 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 319 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
326 DCHECK(!params.ready_task.is_null()); | 320 DCHECK(!params.ready_task.is_null()); |
327 SDVLOG(2) << "Reconfiguring syncer."; | 321 SDVLOG(2) << "Reconfiguring syncer."; |
328 | 322 |
329 // Only one configuration is allowed at a time. Verify we're not waiting | 323 // Only one configuration is allowed at a time. Verify we're not waiting |
330 // for a pending configure job. | 324 // for a pending configure job. |
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
332 | 326 |
333 ModelSafeRoutingInfo restricted_routes; | 327 ModelSafeRoutingInfo restricted_routes; |
334 BuildModelSafeParams(params.types_to_download, | 328 BuildModelSafeParams(params.types_to_download, |
335 params.routing_info, | 329 params.routing_info, |
336 &restricted_routes); | 330 &restricted_routes); |
337 session_context_->set_routing_info(params.routing_info); | 331 session_context_->set_routing_info(params.routing_info); |
338 | 332 |
339 // Only reconfigure if we have types to download. | 333 // Only reconfigure if we have types to download. |
340 if (!params.types_to_download.Empty()) { | 334 if (!params.types_to_download.Empty()) { |
341 DCHECK(!restricted_routes.empty()); | 335 DCHECK(!restricted_routes.empty()); |
342 linked_ptr<SyncSession> session(new SyncSession( | 336 scoped_ptr<SyncSession> session(new SyncSession( |
343 session_context_, | 337 session_context_, |
344 this, | 338 this, |
345 SyncSourceInfo(params.source, | 339 SyncSourceInfo(params.source, |
346 ModelSafeRoutingInfoToInvalidationMap( | 340 ModelSafeRoutingInfoToInvalidationMap( |
347 restricted_routes, | 341 restricted_routes, |
348 std::string())), | 342 std::string())), |
349 restricted_routes, | 343 restricted_routes, |
350 session_context_->workers())); | 344 session_context_->workers())); |
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
352 TimeTicks::Now(), | 346 SyncSessionJob::CONFIGURATION, |
353 session, | 347 TimeTicks::Now(), |
354 false, | 348 session.Pass(), |
355 params, | 349 params, |
356 FROM_HERE); | 350 FROM_HERE)); |
357 DoSyncSessionJob(job); | 351 bool succeeded = DoSyncSessionJob(job.Pass()); |
358 | 352 |
359 // If we failed, the job would have been saved as the pending configure | 353 // If we failed, the job would have been saved as the pending configure |
360 // job and a wait interval would have been set. | 354 // job and a wait interval would have been set. |
361 if (!session->Succeeded()) { | 355 if (!succeeded) { |
362 DCHECK(wait_interval_.get() && | 356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
363 wait_interval_->pending_configure_job.get()); | |
364 return false; | 357 return false; |
365 } | 358 } |
366 } else { | 359 } else { |
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 360 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
368 params.ready_task.Run(); | 361 params.ready_task.Run(); |
369 } | 362 } |
370 | 363 |
371 return true; | 364 return true; |
372 } | 365 } |
373 | 366 |
374 SyncSchedulerImpl::JobProcessDecision | 367 SyncSchedulerImpl::JobProcessDecision |
375 SyncSchedulerImpl::DecideWhileInWaitInterval( | 368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) { |
akalin
2012/10/17 17:39:17
why does this need a pointer?
tim (not reviewing)
2012/10/17 20:33:32
Every callsite has a pointer, so it seems simpler
akalin
2012/10/17 22:55:45
I don't know...const references are the usual styl
| |
376 const SyncSessionJob& job) { | |
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 369 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
378 DCHECK(wait_interval_.get()); | 370 DCHECK(wait_interval_.get()); |
379 | 371 |
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
381 << WaitInterval::GetModeString(wait_interval_->mode) | 373 << WaitInterval::GetModeString(wait_interval_->mode) |
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 374 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
383 << (job.is_canary_job ? " (canary)" : ""); | 375 << (job->is_canary() ? " (canary)" : ""); |
384 | 376 |
385 if (job.purpose == SyncSessionJob::POLL) | 377 if (job->purpose() == SyncSessionJob::POLL) |
386 return DROP; | 378 return DROP; |
387 | 379 |
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 380 // If we save a job while in a WaitInterval, there is a well-defined moment |
389 job.purpose == SyncSessionJob::CONFIGURATION); | 381 // in time in the future when it makes sense for that SAVE-worthy job to try |
382 // running again -- the end of the WaitInterval. | |
383 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | |
384 job->purpose() == SyncSessionJob::CONFIGURATION); | |
385 | |
386 // If throttled, there's a clock ticking to unthrottle. We want to get | |
387 // on the same train. | |
390 if (wait_interval_->mode == WaitInterval::THROTTLED) | 388 if (wait_interval_->mode == WaitInterval::THROTTLED) |
391 return SAVE; | 389 return SAVE; |
392 | 390 |
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
394 if (job.purpose == SyncSessionJob::NUDGE) { | 392 if (job->purpose() == SyncSessionJob::NUDGE) { |
395 if (mode_ == CONFIGURATION_MODE) | 393 if (mode_ == CONFIGURATION_MODE) |
396 return SAVE; | 394 return SAVE; |
397 | 395 |
398 // If we already had one nudge then just drop this nudge. We will retry | 396 // If we already had one nudge then just drop this nudge. We will retry |
399 // later when the timer runs out. | 397 // later when the timer runs out. |
400 if (!job.is_canary_job) | 398 if (!job->is_canary()) |
401 return wait_interval_->had_nudge ? DROP : CONTINUE; | 399 return wait_interval_->had_nudge ? DROP : CONTINUE; |
402 else // We are here because timer ran out. So retry. | 400 else // We are here because timer ran out. So retry. |
403 return CONTINUE; | 401 return CONTINUE; |
404 } | 402 } |
405 return job.is_canary_job ? CONTINUE : SAVE; | 403 return job->is_canary() ? CONTINUE : SAVE; |
406 } | 404 } |
407 | 405 |
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
409 const SyncSessionJob& job) { | 407 const SyncSessionJob* job) { |
akalin
2012/10/17 17:39:17
why does this need a pointer?
tim (not reviewing)
2012/10/17 20:33:32
Every callsite has a pointer, so it seems simpler
akalin
2012/10/17 22:55:45
same.
| |
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 408 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
411 | 409 |
412 // See if our type is throttled. | 410 // See if our type is throttled. |
413 ModelTypeSet throttled_types = | 411 ModelTypeSet throttled_types = |
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 412 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
415 if (job.purpose == SyncSessionJob::NUDGE && | 413 if (job->purpose() == SyncSessionJob::NUDGE && |
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 414 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
417 ModelTypeSet requested_types; | 415 ModelTypeSet requested_types; |
418 for (ModelTypeInvalidationMap::const_iterator i = | 416 for (ModelTypeInvalidationMap::const_iterator i = |
419 job.session->source().types.begin(); | 417 job->session()->source().types.begin(); |
420 i != job.session->source().types.end(); | 418 i != job->session()->source().types.end(); |
421 ++i) { | 419 ++i) { |
422 requested_types.Put(i->first); | 420 requested_types.Put(i->first); |
423 } | 421 } |
424 | 422 |
423 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
424 // a per-datatype "unthrottle" event as something that should force a | |
425 // canary job. For this reason, there's no good time to reschedule this job | |
426 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
427 // Note that there may already be such an event if we're in a WaitInterval, | |
428 // so we can retry it then. | |
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
426 return SAVE; | 430 return SAVE; |
427 } | 431 } |
428 | 432 |
429 if (wait_interval_.get()) | 433 if (wait_interval_.get()) |
430 return DecideWhileInWaitInterval(job); | 434 return DecideWhileInWaitInterval(job); |
431 | 435 |
432 if (mode_ == CONFIGURATION_MODE) { | 436 if (mode_ == CONFIGURATION_MODE) { |
433 if (job.purpose == SyncSessionJob::NUDGE) | 437 if (job->purpose() == SyncSessionJob::NUDGE) |
434 return SAVE; | 438 return SAVE; // Running requires a mode switch. |
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 439 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
436 return CONTINUE; | 440 return CONTINUE; |
437 else | 441 else |
438 return DROP; | 442 return DROP; |
439 } | 443 } |
440 | 444 |
441 // We are in normal mode. | 445 // We are in normal mode. |
442 DCHECK_EQ(mode_, NORMAL_MODE); | 446 DCHECK_EQ(mode_, NORMAL_MODE); |
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 447 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION); |
444 | 448 |
445 // Note about some subtle scheduling semantics. | 449 // Note about some subtle scheduling semantics. |
446 // | 450 // |
447 // It's possible at this point that |job| is known to be unnecessary, and | 451 // It's possible at this point that |job| is known to be unnecessary, and |
448 // dropping it would be perfectly safe and correct. Consider | 452 // dropping it would be perfectly safe and correct. Consider |
449 // | 453 // |
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 454 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
451 // the time that the last successful all-datatype NUDGE completed. | 455 // the time that the last successful all-datatype NUDGE completed. |
452 // | 456 // |
453 // 2) |job| is a NUDGE (for any combination of types) with a | 457 // 2) |job| is a NUDGE (for any combination of types) with a |
(...skipping 22 matching lines...) Expand all Loading... | |
476 // * It's not strictly "impossible", but it would be reentrant and hence | 480 // * It's not strictly "impossible", but it would be reentrant and hence |
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
478 // legal side effect of any of the work being done as part of a sync cycle. | 482 // legal side effect of any of the work being done as part of a sync cycle. |
479 // See |no_scheduling_allowed_| for details. | 483 // See |no_scheduling_allowed_| for details. |
480 | 484 |
481 // Decision now rests on state of auth tokens. | 485 // Decision now rests on state of auth tokens. |
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 486 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
483 return CONTINUE; | 487 return CONTINUE; |
484 | 488 |
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 489 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 490 // Running the job would require updated auth, so we can't honour |
491 // job.scheduled_start(). | |
492 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
487 } | 493 } |
488 | 494 |
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 495 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) { |
akalin
2012/10/17 17:39:17
the name for this is unclear -- "ShouldRunJob" imp
tim (not reviewing)
2012/10/17 20:33:32
SaveIfNecessary works, although there's also the c
akalin
2012/10/17 22:55:45
I see. See my comment below.
| |
490 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | |
492 if (pending_nudge_.get() == NULL) { | |
493 SDVLOG(2) << "Creating a pending nudge job"; | |
494 SyncSession* s = job.session.get(); | |
495 | |
496 // Get a fresh session with similar configuration as before (resets | |
497 // StatusController). | |
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | |
499 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
500 | |
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
502 make_linked_ptr(session.release()), false, | |
503 ConfigurationParams(), job.from_here); | |
504 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
505 return; | |
506 } | |
507 | |
508 SDVLOG(2) << "Coalescing a pending nudge"; | |
509 pending_nudge_->session->Coalesce(*(job.session.get())); | |
510 pending_nudge_->scheduled_start = job.scheduled_start; | |
511 | |
512 // Unfortunately the nudge location cannot be modified. So it stores the | |
513 // location of the first caller. | |
514 } | |
515 | |
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | |
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 496 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
518 DCHECK(started_); | 497 DCHECK(started_); |
519 | 498 |
520 JobProcessDecision decision = DecideOnJob(job); | 499 JobProcessDecision decision = DecideOnJob(job); |
521 SDVLOG(2) << "Should run " | 500 SDVLOG(2) << "Should run " |
522 << SyncSessionJob::GetPurposeString(job.purpose) | 501 << SyncSessionJob::GetPurposeString(job->purpose()) |
523 << " job in mode " << GetModeString(mode_) | 502 << " job " << job->session() |
503 << " in mode " << GetModeString(mode_) | |
524 << ": " << GetDecisionString(decision); | 504 << ": " << GetDecisionString(decision); |
525 if (decision != SAVE) | 505 if (decision == DROP) |
526 return decision == CONTINUE; | 506 return false; |
507 if (decision == CONTINUE) | |
508 return true; | |
509 DCHECK_EQ(decision, SAVE); | |
527 | 510 |
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 511 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
529 SyncSessionJob::CONFIGURATION); | 512 if (is_nudge && pending_nudge_) { |
513 SDVLOG(2) << "Coalescing a pending nudge"; | |
514 // TODO(tim): This basically means we never use the more-careful coalescing | |
rlarocque
2012/10/08 19:18:05
After some more thought, I'm not sure this ever ma
| |
515 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
516 // times, because we're calling this function first. Pull this out | |
517 // into a function to coalesce + set start times and reuse. | |
518 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | |
519 return false; | |
520 } | |
530 | 521 |
531 SaveJob(job); | 522 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); |
523 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
524 // This job should be made the new canary. | |
525 if (is_nudge) { | |
526 pending_nudge_ = job_to_save.get(); | |
527 } else { | |
528 SDVLOG(2) << "Saving a configuration job"; | |
529 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
530 DCHECK(!wait_interval_->pending_configure_job); | |
531 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
532 DCHECK(!job->config_params().ready_task.is_null()); | |
533 // The only nudge that could exist is a scheduled canary nudge. | |
534 DCHECK(!unscheduled_nudge_storage_.get()); | |
535 if (pending_nudge_) { | |
536 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
537 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
538 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
539 } | |
540 wait_interval_->pending_configure_job = job_to_save.get(); | |
541 } | |
542 TimeDelta length = | |
543 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
544 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
545 TimeDelta::FromSeconds(0) : length; | |
546 RestartWaiting(job_to_save.Pass()); | |
547 return false; | |
548 } | |
549 | |
550 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
551 // when we're not in a WaitInterval. See bug 147736. | |
552 DCHECK(is_nudge); | |
553 // There may or may not be a pending_configure_job. Either way this nudge | |
554 // is unschedulable. | |
555 pending_nudge_ = job_to_save.get(); | |
556 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
532 return false; | 557 return false; |
533 } | 558 } |
534 | 559 |
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
536 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
537 if (job.purpose == SyncSessionJob::NUDGE) { | |
538 SDVLOG(2) << "Saving a nudge job"; | |
539 InitOrCoalescePendingJob(job); | |
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
541 SDVLOG(2) << "Saving a configuration job"; | |
542 DCHECK(wait_interval_.get()); | |
543 DCHECK(mode_ == CONFIGURATION_MODE); | |
544 | |
545 // Config params should always get set. | |
546 DCHECK(!job.config_params.ready_task.is_null()); | |
547 SyncSession* old = job.session.get(); | |
548 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
549 old->routing_info(), old->workers())); | |
550 SyncSessionJob new_job(job.purpose, | |
551 TimeTicks::Now(), | |
552 make_linked_ptr(s), | |
553 false, | |
554 job.config_params, | |
555 job.from_here); | |
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
557 } // drop the rest. | |
558 // TODO(sync): Is it okay to drop the rest? It's weird that | |
559 // SaveJob() only does what it says sometimes. (See | |
560 // http://crbug.com/90868.) | |
561 } | |
562 | |
563 // Functor for std::find_if to search by ModelSafeGroup. | 560 // Functor for std::find_if to search by ModelSafeGroup. |
564 struct ModelSafeWorkerGroupIs { | 561 struct ModelSafeWorkerGroupIs { |
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 562 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
566 bool operator()(ModelSafeWorker* w) { | 563 bool operator()(ModelSafeWorker* w) { |
567 return group == w->GetModelSafeGroup(); | 564 return group == w->GetModelSafeGroup(); |
568 } | 565 } |
569 ModelSafeGroup group; | 566 ModelSafeGroup group; |
570 }; | 567 }; |
571 | 568 |
572 void SyncSchedulerImpl::ScheduleNudgeAsync( | 569 void SyncSchedulerImpl::ScheduleNudgeAsync( |
573 const TimeDelta& delay, | 570 const TimeDelta& desired_delay, |
574 NudgeSource source, ModelTypeSet types, | 571 NudgeSource source, ModelTypeSet types, |
575 const tracked_objects::Location& nudge_location) { | 572 const tracked_objects::Location& nudge_location) { |
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 573 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
577 SDVLOG_LOC(nudge_location, 2) | 574 SDVLOG_LOC(nudge_location, 2) |
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 575 << "Nudge scheduled with delay " |
576 << desired_delay.InMilliseconds() << " ms, " | |
579 << "source " << GetNudgeSourceString(source) << ", " | 577 << "source " << GetNudgeSourceString(source) << ", " |
580 << "types " << ModelTypeSetToString(types); | 578 << "types " << ModelTypeSetToString(types); |
581 | 579 |
582 ModelTypeInvalidationMap invalidation_map = | 580 ModelTypeInvalidationMap invalidation_map = |
583 ModelTypeSetToInvalidationMap(types, std::string()); | 581 ModelTypeSetToInvalidationMap(types, std::string()); |
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 582 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
585 GetUpdatesFromNudgeSource(source), | 583 GetUpdatesFromNudgeSource(source), |
586 invalidation_map, | 584 invalidation_map, |
587 false, | |
588 nudge_location); | 585 nudge_location); |
589 } | 586 } |
590 | 587 |
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 588 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
592 const TimeDelta& delay, | 589 const TimeDelta& desired_delay, |
593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, | 590 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
594 const tracked_objects::Location& nudge_location) { | 591 const tracked_objects::Location& nudge_location) { |
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 592 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
596 SDVLOG_LOC(nudge_location, 2) | 593 SDVLOG_LOC(nudge_location, 2) |
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 594 << "Nudge scheduled with delay " |
595 << desired_delay.InMilliseconds() << " ms, " | |
598 << "source " << GetNudgeSourceString(source) << ", " | 596 << "source " << GetNudgeSourceString(source) << ", " |
599 << "payloads " | 597 << "payloads " |
600 << ModelTypeInvalidationMapToString(invalidation_map); | 598 << ModelTypeInvalidationMapToString(invalidation_map); |
601 | 599 |
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 600 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
603 GetUpdatesFromNudgeSource(source), | 601 GetUpdatesFromNudgeSource(source), |
604 invalidation_map, | 602 invalidation_map, |
605 false, | |
606 nudge_location); | 603 nudge_location); |
607 } | 604 } |
608 | 605 |
609 void SyncSchedulerImpl::ScheduleNudgeImpl( | 606 void SyncSchedulerImpl::ScheduleNudgeImpl( |
610 const TimeDelta& delay, | 607 const TimeDelta& delay, |
611 GetUpdatesCallerInfo::GetUpdatesSource source, | 608 GetUpdatesCallerInfo::GetUpdatesSource source, |
612 const ModelTypeInvalidationMap& invalidation_map, | 609 const ModelTypeInvalidationMap& invalidation_map, |
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 610 const tracked_objects::Location& nudge_location) { |
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 611 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; | 612 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
616 | 613 |
617 SDVLOG_LOC(nudge_location, 2) | 614 SDVLOG_LOC(nudge_location, 2) |
618 << "In ScheduleNudgeImpl with delay " | 615 << "In ScheduleNudgeImpl with delay " |
619 << delay.InMilliseconds() << " ms, " | 616 << delay.InMilliseconds() << " ms, " |
620 << "source " << GetUpdatesSourceString(source) << ", " | 617 << "source " << GetUpdatesSourceString(source) << ", " |
621 << "payloads " | 618 << "payloads " |
622 << ModelTypeInvalidationMapToString(invalidation_map) | 619 << ModelTypeInvalidationMapToString(invalidation_map); |
623 << (is_canary_job ? " (canary)" : ""); | |
624 | 620 |
625 SyncSourceInfo info(source, invalidation_map); | 621 SyncSourceInfo info(source, invalidation_map); |
626 UpdateNudgeTimeRecords(info); | 622 UpdateNudgeTimeRecords(info); |
627 | 623 |
628 SyncSession* session(CreateSyncSession(info)); | 624 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 625 SyncSessionJob::NUDGE, |
630 make_linked_ptr(session), is_canary_job, | 626 TimeTicks::Now() + delay, |
631 ConfigurationParams(), nudge_location); | 627 CreateSyncSession(info).Pass(), |
628 ConfigurationParams(), | |
629 nudge_location)); | |
632 | 630 |
633 session = NULL; | 631 if (!ShouldRunJobSaveIfNecessary(job.get())) |
akalin
2012/10/17 17:39:17
Naively, I'd expect to be able to split this call
tim (not reviewing)
2012/10/17 20:33:32
Yes. !ShouldRunJob doesn't mean you can forget ab
akalin
2012/10/17 22:55:45
I see. See comment below...
| |
634 if (!ShouldRunJob(job)) | |
635 return; | 632 return; |
636 | 633 |
637 if (pending_nudge_.get()) { | 634 if (pending_nudge_) { |
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
640 return; | |
641 } | |
642 | |
643 SDVLOG(2) << "Coalescing pending nudge"; | |
644 pending_nudge_->session->Coalesce(*(job.session.get())); | |
645 | |
646 SDVLOG(2) << "Rescheduling pending nudge"; | 635 SDVLOG(2) << "Rescheduling pending nudge"; |
647 SyncSession* s = pending_nudge_->session.get(); | 636 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
648 job.session.reset(new SyncSession(s->context(), s->delegate(), | 637 // Choose the start time as the earliest of the 2. Note that this means |
649 s->source(), s->routing_info(), s->workers())); | 638 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
650 | 639 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
651 // Choose the start time as the earliest of the 2. | 640 // without waiting. |
652 job.scheduled_start = std::min(job.scheduled_start, | 641 pending_nudge_->set_scheduled_start( |
653 pending_nudge_->scheduled_start); | 642 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
654 pending_nudge_.reset(); | 643 // Abandon the old task by cloning and replacing the session. |
644 // It's possible that by "rescheduling" we're actually taking a job that | |
645 // was previously unscheduled and giving it wings, so take care to reset | |
646 // unscheduled nudge storage. | |
647 job = pending_nudge_->CloneAndAbandon(); | |
648 unscheduled_nudge_storage_.reset(); | |
649 pending_nudge_ = NULL; | |
655 } | 650 } |
656 | 651 |
657 // TODO(zea): Consider adding separate throttling/backoff for datatype | 652 // TODO(zea): Consider adding separate throttling/backoff for datatype |
658 // refresh requests. | 653 // refresh requests. |
659 ScheduleSyncSessionJob(job); | 654 ScheduleSyncSessionJob(job.Pass()); |
akalin
2012/10/17 17:39:17
Isn't the job abandoned at this point? Do we stil
tim (not reviewing)
2012/10/17 20:33:32
Err, did you mean to put this comment elsewhere? T
akalin
2012/10/17 22:55:45
Right, I was misunderstanding the flow.
So it loo
| |
660 } | 655 } |
661 | 656 |
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 657 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
663 switch (mode) { | 658 switch (mode) { |
664 ENUM_CASE(CONFIGURATION_MODE); | 659 ENUM_CASE(CONFIGURATION_MODE); |
665 ENUM_CASE(NORMAL_MODE); | 660 ENUM_CASE(NORMAL_MODE); |
666 } | 661 } |
667 return ""; | 662 return ""; |
668 } | 663 } |
669 | 664 |
670 const char* SyncSchedulerImpl::GetDecisionString( | 665 const char* SyncSchedulerImpl::GetDecisionString( |
671 SyncSchedulerImpl::JobProcessDecision mode) { | 666 SyncSchedulerImpl::JobProcessDecision mode) { |
672 switch (mode) { | 667 switch (mode) { |
673 ENUM_CASE(CONTINUE); | 668 ENUM_CASE(CONTINUE); |
674 ENUM_CASE(SAVE); | 669 ENUM_CASE(SAVE); |
675 ENUM_CASE(DROP); | 670 ENUM_CASE(DROP); |
676 } | 671 } |
677 return ""; | 672 return ""; |
678 } | 673 } |
679 | 674 |
680 // static | |
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
682 SyncSessionJob::SyncSessionJobPurpose purpose, | |
683 SyncerStep* start, | |
684 SyncerStep* end) { | |
685 switch (purpose) { | |
686 case SyncSessionJob::CONFIGURATION: | |
687 *start = DOWNLOAD_UPDATES; | |
688 *end = APPLY_UPDATES; | |
689 return; | |
690 case SyncSessionJob::NUDGE: | |
691 case SyncSessionJob::POLL: | |
692 *start = SYNCER_BEGIN; | |
693 *end = SYNCER_END; | |
694 return; | |
695 default: | |
696 NOTREACHED(); | |
697 *start = SYNCER_END; | |
698 *end = SYNCER_END; | |
699 return; | |
700 } | |
701 } | |
702 | |
703 void SyncSchedulerImpl::PostTask( | 675 void SyncSchedulerImpl::PostTask( |
704 const tracked_objects::Location& from_here, | 676 const tracked_objects::Location& from_here, |
705 const char* name, const base::Closure& task) { | 677 const char* name, const base::Closure& task) { |
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 678 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 679 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
708 if (!started_) { | 680 if (!started_) { |
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 681 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
710 return; | 682 return; |
711 } | 683 } |
712 sync_loop_->PostTask(from_here, task); | 684 sync_loop_->PostTask(from_here, task); |
713 } | 685 } |
714 | 686 |
715 void SyncSchedulerImpl::PostDelayedTask( | 687 void SyncSchedulerImpl::PostDelayedTask( |
716 const tracked_objects::Location& from_here, | 688 const tracked_objects::Location& from_here, |
717 const char* name, const base::Closure& task, base::TimeDelta delay) { | 689 const char* name, const base::Closure& task, base::TimeDelta delay) { |
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 690 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
719 << delay.InMilliseconds() << " ms delay"; | 691 << delay.InMilliseconds() << " ms delay"; |
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 692 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
721 if (!started_) { | 693 if (!started_) { |
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 694 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
723 return; | 695 return; |
724 } | 696 } |
725 sync_loop_->PostDelayedTask(from_here, task, delay); | 697 sync_loop_->PostDelayedTask(from_here, task, delay); |
726 } | 698 } |
727 | 699 |
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 700 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
701 scoped_ptr<SyncSessionJob> job) { | |
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 702 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
730 if (no_scheduling_allowed_) { | 703 if (no_scheduling_allowed_) { |
731 NOTREACHED() << "Illegal to schedule job while session in progress."; | 704 NOTREACHED() << "Illegal to schedule job while session in progress."; |
732 return; | 705 return; |
733 } | 706 } |
734 | 707 |
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 708 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
709 tracked_objects::Location loc(job->from_location()); | |
736 if (delay < TimeDelta::FromMilliseconds(0)) | 710 if (delay < TimeDelta::FromMilliseconds(0)) |
737 delay = TimeDelta::FromMilliseconds(0); | 711 delay = TimeDelta::FromMilliseconds(0); |
738 SDVLOG_LOC(job.from_here, 2) | 712 SDVLOG_LOC(loc, 2) |
739 << "In ScheduleSyncSessionJob with " | 713 << "In ScheduleSyncSessionJob with " |
740 << SyncSessionJob::GetPurposeString(job.purpose) | 714 << SyncSessionJob::GetPurposeString(job->purpose()) |
741 << " job and " << delay.InMilliseconds() << " ms delay"; | 715 << " job and " << delay.InMilliseconds() << " ms delay"; |
742 | 716 |
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 717 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
744 job.purpose == SyncSessionJob::POLL); | 718 job->purpose() == SyncSessionJob::POLL); |
745 if (job.purpose == SyncSessionJob::NUDGE) { | 719 if (job->purpose() == SyncSessionJob::NUDGE) { |
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 720 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; |
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 721 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
748 job.session); | 722 job->session()); |
749 pending_nudge_.reset(new SyncSessionJob(job)); | 723 pending_nudge_ = job.get(); |
750 } | 724 } |
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 725 |
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 726 PostDelayedTask(loc, "DoSyncSessionJob", |
753 weak_ptr_factory_.GetWeakPtr(), | 727 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
754 job), | 728 weak_ptr_factory_.GetWeakPtr(), |
755 delay); | 729 base::Passed(&job)), |
730 delay); | |
756 } | 731 } |
757 | 732 |
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 733 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 734 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
735 if (job->purpose() == SyncSessionJob::NUDGE) { | |
736 if (pending_nudge_ == NULL || | |
737 pending_nudge_->session() != job->session()) { | |
738 // |job| is abandoned. | |
739 SDVLOG(2) << "Dropping a nudge in " | |
740 << "DoSyncSessionJob because another nudge was scheduled"; | |
741 return false; | |
742 } | |
743 pending_nudge_ = NULL; | |
744 | |
745 // Rebase the session with the latest model safe table and use it to purge | |
746 // and update any disabled or modified entries in the job. | |
747 job->mutable_session()->RebaseRoutingInfoWithLatest( | |
748 session_context_->routing_info(), session_context_->workers()); | |
749 } | |
760 | 750 |
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 751 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
762 if (!ShouldRunJob(job)) { | 752 GetUpdatesCallerInfo::GetUpdatesSource source( |
753 job->session()->source().updates_source); | |
754 if (!ShouldRunJobSaveIfNecessary(job.get())) { | |
763 SLOG(WARNING) | 755 SLOG(WARNING) |
764 << "Not executing " | 756 << "Not executing " |
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 757 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from " |
766 << GetUpdatesSourceString(job.session->source().updates_source); | 758 << GetUpdatesSourceString(source); |
767 return; | 759 return false; |
768 } | 760 } |
769 | 761 |
770 if (job.purpose == SyncSessionJob::NUDGE) { | |
771 if (pending_nudge_.get() == NULL || | |
772 pending_nudge_->session != job.session) { | |
773 SDVLOG(2) << "Dropping a nudge in " | |
774 << "DoSyncSessionJob because another nudge was scheduled"; | |
775 return; // Another nudge must have been scheduled in in the meantime. | |
776 } | |
777 pending_nudge_.reset(); | |
778 | |
779 // Create the session with the latest model safe table and use it to purge | |
780 // and update any disabled or modified entries in the job. | |
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
782 | |
783 job.session->RebaseRoutingInfoWithLatest(*session); | |
784 } | |
785 SDVLOG(2) << "DoSyncSessionJob with " | 762 SDVLOG(2) << "DoSyncSessionJob with " |
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 763 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
787 | |
788 SyncerStep begin(SYNCER_END); | |
789 SyncerStep end(SYNCER_END); | |
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
791 | 764 |
792 bool has_more_to_sync = true; | 765 bool has_more_to_sync = true; |
793 while (ShouldRunJob(job) && has_more_to_sync) { | 766 bool premature_exit = false; |
767 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) { | |
794 SDVLOG(2) << "Calling SyncShare."; | 768 SDVLOG(2) << "Calling SyncShare."; |
795 // Synchronously perform the sync session from this thread. | 769 // Synchronously perform the sync session from this thread. |
796 syncer_->SyncShare(job.session.get(), begin, end); | 770 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
797 has_more_to_sync = job.session->HasMoreToSync(); | 771 job->start_step(), |
772 job->end_step()); | |
773 | |
774 has_more_to_sync = job->session()->HasMoreToSync(); | |
798 if (has_more_to_sync) | 775 if (has_more_to_sync) |
799 job.session->PrepareForAnotherSyncCycle(); | 776 job->mutable_session()->PrepareForAnotherSyncCycle(); |
800 } | 777 } |
801 SDVLOG(2) << "Done SyncShare looping."; | 778 SDVLOG(2) << "Done SyncShare looping."; |
802 | 779 |
803 FinishSyncSessionJob(job); | 780 return FinishSyncSessionJob(job.Pass(), premature_exit); |
804 } | 781 } |
805 | 782 |
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 783 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 784 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
808 | 785 |
809 // We are interested in recording time between local nudges for datatypes. | 786 // We are interested in recording time between local nudges for datatypes. |
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 787 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 788 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
812 return; | 789 return; |
813 | 790 |
814 base::TimeTicks now = TimeTicks::Now(); | 791 base::TimeTicks now = TimeTicks::Now(); |
815 // Update timing information for how often datatypes are triggering nudges. | 792 // Update timing information for how often datatypes are triggering nudges. |
816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 793 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
817 iter != info.types.end(); | 794 iter != info.types.end(); |
818 ++iter) { | 795 ++iter) { |
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 796 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
820 last_local_nudges_by_model_type_[iter->first] = now; | 797 last_local_nudges_by_model_type_[iter->first] = now; |
821 if (previous.is_null()) | 798 if (previous.is_null()) |
822 continue; | 799 continue; |
823 | 800 |
824 #define PER_DATA_TYPE_MACRO(type_str) \ | 801 #define PER_DATA_TYPE_MACRO(type_str) \ |
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 802 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 803 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
827 #undef PER_DATA_TYPE_MACRO | 804 #undef PER_DATA_TYPE_MACRO |
828 } | 805 } |
829 } | 806 } |
830 | 807 |
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 808 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
809 bool exited_prematurely) { | |
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 810 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
833 // Now update the status of the connection from SCM. We need this to decide | 811 // Now update the status of the connection from SCM. We need this to decide |
834 // whether we need to save/run future jobs. The notifications from SCM are not | 812 // whether we need to save/run future jobs. The notifications from SCM are |
835 // reliable. | 813 // not reliable. |
836 // | 814 // |
837 // TODO(rlarocque): crbug.com/110954 | 815 // TODO(rlarocque): crbug.com/110954 |
838 // We should get rid of the notifications and it is probably not needed to | 816 // We should get rid of the notifications and it is probably not needed to |
839 // maintain this status variable in 2 places. We should query it directly from | 817 // maintain this status variable in 2 places. We should query it directly |
840 // SCM when needed. | 818 // from SCM when needed. |
841 ServerConnectionManager* scm = session_context_->connection_manager(); | 819 ServerConnectionManager* scm = session_context_->connection_manager(); |
842 UpdateServerConnectionManagerStatus(scm->server_status()); | 820 UpdateServerConnectionManagerStatus(scm->server_status()); |
843 | 821 |
844 if (IsSyncingCurrentlySilenced()) { | 822 // Let job know that we're through syncing (calling SyncShare) at this point. |
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 823 bool succeeded = false; |
846 // TODO(sync): Investigate whether we need to check job.purpose | 824 { |
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | |
848 SaveJob(job); | |
849 return; // Nothing to do. | |
850 } else if (job.session->Succeeded() && | |
851 !job.config_params.ready_task.is_null()) { | |
852 // If this was a configuration job with a ready task, invoke it now that | |
853 // we finished successfully. | |
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 825 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
855 job.config_params.ready_task.Run(); | 826 succeeded = job->Finish(exited_prematurely); |
856 } | 827 } |
857 | 828 |
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 829 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
859 ScheduleNextSync(job); | 830 ScheduleNextSync(job.Pass(), succeeded); |
831 return succeeded; | |
860 } | 832 } |
861 | 833 |
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 834 void SyncSchedulerImpl::ScheduleNextSync( |
835 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) { | |
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 836 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
864 DCHECK(!old_job.session->HasMoreToSync()); | 837 DCHECK(!finished_job->session()->HasMoreToSync()); |
865 | 838 |
866 AdjustPolling(&old_job); | 839 AdjustPolling(finished_job.get()); |
867 | 840 |
868 if (old_job.session->Succeeded()) { | 841 if (succeeded) { |
869 // Only reset backoff if we actually reached the server. | 842 // Only reset backoff if we actually reached the server. |
870 if (old_job.session->SuccessfullyReachedServer()) | 843 // It's possible that we reached the server on one attempt, then had an |
844 // error on the next (or didn't perform some of the server-communicating | |
845 // commands). We want to verify that, for all commands attempted, we | |
846 // successfully spoke with the server. Therefore, we verify no errors | |
847 // and at least one SYNCER_OK. | |
848 if (finished_job->session()->DidReachServer()) | |
871 wait_interval_.reset(); | 849 wait_interval_.reset(); |
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 850 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
873 return; | 851 return; |
874 } | 852 } |
875 | 853 |
876 if (old_job.purpose == SyncSessionJob::POLL) { | 854 if (IsSyncingCurrentlySilenced()) { |
855 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | |
856 // If we're here, it's because |job| was silenced until a server specified | |
857 // time. (Note, it had to be |job|, because DecideOnJob would not permit | |
858 // any job through while in WaitInterval::THROTTLED). | |
859 scoped_ptr<SyncSessionJob> clone = finished_job->Clone(); | |
860 if (clone->purpose() == SyncSessionJob::NUDGE) | |
861 pending_nudge_ = clone.get(); | |
862 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | |
863 wait_interval_->pending_configure_job = clone.get(); | |
864 else | |
865 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
866 | |
867 RestartWaiting(clone.Pass()); | |
868 return; | |
869 } | |
870 | |
871 if (finished_job->purpose() == SyncSessionJob::POLL) { | |
877 return; // We don't retry POLL jobs. | 872 return; // We don't retry POLL jobs. |
878 } | 873 } |
879 | 874 |
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 875 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
881 // if we don't succeed. Some types of errors are not likely to disappear on | 876 // if we don't succeed. Some types of errors are not likely to disappear on |
882 // their own. With the return values now available in the old_job.session, we | 877 // their own. With the return values now available in the old_job.session, |
883 // should be able to detect such errors and only retry when we detect | 878 // we should be able to detect such errors and only retry when we detect |
884 // transient errors. | 879 // transient errors. |
885 | 880 |
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 881 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
887 mode_ == NORMAL_MODE) { | 882 mode_ == NORMAL_MODE) { |
888 // When in normal mode, we allow up to one nudge per backoff interval. It | 883 // When in normal mode, we allow up to one nudge per backoff interval. It |
889 // appears that this was our nudge for this interval, and it failed. | 884 // appears that this was our nudge for this interval, and it failed. |
890 // | 885 // |
891 // Note: This does not prevent us from running canary jobs. For example, an | 886 // Note: This does not prevent us from running canary jobs. For example, |
892 // IP address change might still result in another nudge being executed | 887 // an IP address change might still result in another nudge being executed |
893 // during this backoff interval. | 888 // during this backoff interval. |
894 SDVLOG(2) << "A nudge during backoff failed"; | 889 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
895 | 890 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
897 DCHECK(!wait_interval_->had_nudge); | 891 DCHECK(!wait_interval_->had_nudge); |
898 | 892 |
899 wait_interval_->had_nudge = true; | 893 wait_interval_->had_nudge = true; |
900 InitOrCoalescePendingJob(old_job); | 894 DCHECK(!pending_nudge_); |
901 RestartWaiting(); | 895 |
896 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | |
897 pending_nudge_ = new_job.get(); | |
898 RestartWaiting(new_job.Pass()); | |
902 } else { | 899 } else { |
903 // Either this is the first failure or a consecutive failure after our | 900 // Either this is the first failure or a consecutive failure after our |
904 // backoff timer expired. We handle it the same way in either case. | 901 // backoff timer expired. We handle it the same way in either case. |
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 902 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
906 HandleContinuationError(old_job); | 903 HandleContinuationError(finished_job.get()); |
907 } | 904 } |
908 } | 905 } |
909 | 906 |
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 907 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 908 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
912 | 909 |
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 910 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
914 syncer_short_poll_interval_seconds_ : | 911 syncer_short_poll_interval_seconds_ : |
915 syncer_long_poll_interval_seconds_; | 912 syncer_long_poll_interval_seconds_; |
916 bool rate_changed = !poll_timer_.IsRunning() || | 913 bool rate_changed = !poll_timer_.IsRunning() || |
917 poll != poll_timer_.GetCurrentDelay(); | 914 poll != poll_timer_.GetCurrentDelay(); |
918 | 915 |
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 916 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
920 poll_timer_.Reset(); | 917 poll_timer_.Reset(); |
921 | 918 |
922 if (!rate_changed) | 919 if (!rate_changed) |
923 return; | 920 return; |
924 | 921 |
925 // Adjust poll rate. | 922 // Adjust poll rate. |
926 poll_timer_.Stop(); | 923 poll_timer_.Stop(); |
927 poll_timer_.Start(FROM_HERE, poll, this, | 924 poll_timer_.Start(FROM_HERE, poll, this, |
928 &SyncSchedulerImpl::PollTimerCallback); | 925 &SyncSchedulerImpl::PollTimerCallback); |
929 } | 926 } |
930 | 927 |
931 void SyncSchedulerImpl::RestartWaiting() { | 928 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
932 CHECK(wait_interval_.get()); | 929 CHECK(wait_interval_.get()); |
933 wait_interval_->timer.Stop(); | 930 wait_interval_->timer.Stop(); |
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 931 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
935 this, &SyncSchedulerImpl::DoCanaryJob); | 932 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
933 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
934 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
935 weak_ptr_factory_.GetWeakPtr(), | |
936 base::Passed(&job))); | |
937 } else { | |
938 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
939 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
940 weak_ptr_factory_.GetWeakPtr(), | |
941 base::Passed(&job))); | |
942 } | |
936 } | 943 } |
937 | 944 |
938 void SyncSchedulerImpl::HandleContinuationError( | 945 void SyncSchedulerImpl::HandleContinuationError( |
939 const SyncSessionJob& old_job) { | 946 const SyncSessionJob* old_job) { |
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 947 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
941 if (DCHECK_IS_ON()) { | 948 if (DCHECK_IS_ON()) { |
942 if (IsBackingOff()) { | 949 if (IsBackingOff()) { |
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 950 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
944 } | 951 } |
945 } | 952 } |
946 | 953 |
947 TimeDelta length = delay_provider_->GetDelay( | 954 TimeDelta length = delay_provider_->GetDelay( |
948 IsBackingOff() ? wait_interval_->length : | 955 IsBackingOff() ? wait_interval_->length : |
949 delay_provider_->GetInitialDelay( | 956 delay_provider_->GetInitialDelay( |
950 old_job.session->status_controller().model_neutral_state())); | 957 old_job->session()->status_controller().model_neutral_state())); |
951 | 958 |
952 SDVLOG(2) << "In handle continuation error with " | 959 SDVLOG(2) << "In handle continuation error with " |
953 << SyncSessionJob::GetPurposeString(old_job.purpose) | 960 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
954 << " job. The time delta(ms) is " | 961 << " job. The time delta(ms) is " |
955 << length.InMilliseconds(); | 962 << length.InMilliseconds(); |
956 | 963 |
957 // This will reset the had_nudge variable as well. | 964 // This will reset the had_nudge variable as well. |
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 965 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
959 length)); | 966 length)); |
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 967 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); |
968 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
969 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 970 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
962 // Config params should always get set. | 971 // Config params should always get set. |
963 DCHECK(!old_job.config_params.ready_task.is_null()); | 972 DCHECK(!old_job->config_params().ready_task.is_null()); |
964 SyncSession* old = old_job.session.get(); | 973 wait_interval_->pending_configure_job = new_job.get(); |
965 SyncSession* s(new SyncSession(session_context_, this, | |
966 old->source(), old->routing_info(), old->workers())); | |
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
968 make_linked_ptr(s), false, old_job.config_params, | |
969 FROM_HERE); | |
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
971 } else { | 974 } else { |
972 // We are not in configuration mode. So wait_interval's pending job | 975 // We are not in configuration mode. So wait_interval's pending job |
973 // should be null. | 976 // should be null. |
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 977 DCHECK(wait_interval_->pending_configure_job == NULL); |
978 DCHECK(!pending_nudge_); | |
979 pending_nudge_ = new_job.get(); | |
980 } | |
975 | 981 |
976 // TODO(lipalani) - handle clear user data. | 982 RestartWaiting(new_job.Pass()); |
977 InitOrCoalescePendingJob(old_job); | |
978 } | |
979 RestartWaiting(); | |
980 } | 983 } |
981 | 984 |
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 985 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 986 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
984 DCHECK(weak_handle_this_.IsInitialized()); | 987 DCHECK(weak_handle_this_.IsInitialized()); |
985 SDVLOG(3) << "Posting StopImpl"; | 988 SDVLOG(3) << "Posting StopImpl"; |
986 weak_handle_this_.Call(FROM_HERE, | 989 weak_handle_this_.Call(FROM_HERE, |
987 &SyncSchedulerImpl::StopImpl, | 990 &SyncSchedulerImpl::StopImpl, |
988 callback); | 991 callback); |
989 } | 992 } |
990 | 993 |
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 994 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 995 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
993 SDVLOG(2) << "StopImpl called"; | 996 SDVLOG(2) << "StopImpl called"; |
994 | 997 |
995 // Kill any in-flight method calls. | 998 // Kill any in-flight method calls. |
996 weak_ptr_factory_.InvalidateWeakPtrs(); | 999 weak_ptr_factory_.InvalidateWeakPtrs(); |
997 wait_interval_.reset(); | 1000 wait_interval_.reset(); |
998 poll_timer_.Stop(); | 1001 poll_timer_.Stop(); |
999 if (started_) { | 1002 if (started_) { |
1000 started_ = false; | 1003 started_ = false; |
1001 } | 1004 } |
1002 if (!callback.is_null()) | 1005 if (!callback.is_null()) |
1003 callback.Run(); | 1006 callback.Run(); |
1004 } | 1007 } |
1005 | 1008 |
1006 void SyncSchedulerImpl::DoCanaryJob() { | 1009 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1010 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1008 SDVLOG(2) << "Do canary job"; | 1011 SDVLOG(2) << "Do canary job"; |
1009 DoPendingJobIfPossible(true); | 1012 |
1013 // Only set canary privileges here, when we are about to run the job. This | |
1014 // avoids confusion in managing canary bits during scheduling, when you | |
1015 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
1016 // was scheduled as canary, and send it to an "unscheduled" state. | |
1017 to_be_canary->GrantCanaryPrivilege(); | |
1018 | |
1019 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
1020 // TODO(tim): We should be able to remove this... | |
1021 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
1022 to_be_canary->session()->source()).Pass(); | |
1023 // The routing info might have been changed since we cached the | |
1024 // pending nudge. Update it by coalescing to the latest. | |
1025 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
1026 } | |
1027 DoSyncSessionJob(to_be_canary.Pass()); | |
1010 } | 1028 } |
1011 | 1029 |
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1030 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1014 SyncSessionJob* job_to_execute = NULL; | 1032 // If we find a scheduled pending_ job, abandon the old one and return a |
1033 // a clone. If unscheduled, just hand over ownership. | |
1034 scoped_ptr<SyncSessionJob> candidate; | |
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1035 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
1016 && wait_interval_->pending_configure_job.get()) { | 1036 && wait_interval_->pending_configure_job) { |
1017 SDVLOG(2) << "Found pending configure job"; | 1037 SDVLOG(2) << "Found pending configure job"; |
1018 job_to_execute = wait_interval_->pending_configure_job.get(); | 1038 candidate = |
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1039 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
1040 wait_interval_->pending_configure_job = candidate.get(); | |
1041 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
1020 SDVLOG(2) << "Found pending nudge job"; | 1042 SDVLOG(2) << "Found pending nudge job"; |
1021 | 1043 candidate = pending_nudge_->CloneAndAbandon(); |
1022 scoped_ptr<SyncSession> session(CreateSyncSession( | 1044 pending_nudge_ = candidate.get(); |
1023 pending_nudge_->session->source())); | 1045 unscheduled_nudge_storage_.reset(); |
1024 | |
1025 // Also the routing info might have been changed since we cached the | |
1026 // pending nudge. Update it by coalescing to the latest. | |
1027 pending_nudge_->session->Coalesce(*(session.get())); | |
1028 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
1029 job_to_execute = pending_nudge_.get(); | |
1030 } | 1046 } |
1031 | 1047 return candidate.Pass(); |
1032 if (job_to_execute != NULL) { | |
1033 SDVLOG(2) << "Executing pending job"; | |
1034 SyncSessionJob copy = *job_to_execute; | |
1035 copy.is_canary_job = is_canary_job; | |
1036 DoSyncSessionJob(copy); | |
1037 } | |
1038 } | 1048 } |
1039 | 1049 |
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1050 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
1041 const SyncSourceInfo& source) { | 1051 const SyncSourceInfo& source) { |
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1052 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1043 DVLOG(2) << "Creating sync session with routes " | 1053 DVLOG(2) << "Creating sync session with routes " |
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1054 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
1045 | 1055 |
1046 SyncSourceInfo info(source); | 1056 SyncSourceInfo info(source); |
1047 SyncSession* session(new SyncSession(session_context_, this, info, | 1057 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
1048 session_context_->routing_info(), session_context_->workers())); | 1058 session_context_->routing_info(), session_context_->workers())); |
1049 | |
1050 return session; | |
1051 } | 1059 } |
1052 | 1060 |
1053 void SyncSchedulerImpl::PollTimerCallback() { | 1061 void SyncSchedulerImpl::PollTimerCallback() { |
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1062 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1055 ModelSafeRoutingInfo r; | 1063 ModelSafeRoutingInfo r; |
1056 ModelTypeInvalidationMap invalidation_map = | 1064 ModelTypeInvalidationMap invalidation_map = |
1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | 1065 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | 1066 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
1059 SyncSession* s = CreateSyncSession(info); | 1067 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
1060 | 1068 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1069 TimeTicks::Now(), |
1062 make_linked_ptr(s), | 1070 s.Pass(), |
1063 false, | 1071 ConfigurationParams(), |
1064 ConfigurationParams(), | 1072 FROM_HERE)); |
1065 FROM_HERE); | 1073 ScheduleSyncSessionJob(job.Pass()); |
1066 | |
1067 ScheduleSyncSessionJob(job); | |
1068 } | 1074 } |
1069 | 1075 |
1070 void SyncSchedulerImpl::Unthrottle() { | 1076 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1077 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1078 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1073 SDVLOG(2) << "Unthrottled."; | 1079 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
1074 DoCanaryJob(); | 1080 << "canary."; |
1081 if (to_be_canary.get()) | |
1082 DoCanaryJob(to_be_canary.Pass()); | |
1083 | |
1084 // TODO(tim): The way DecideOnJob works today, canary privileges aren't | |
1085 // enough to bypass a THROTTLED wait interval, which would suggest we need | |
1086 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is | |
1087 // probably the "right" thing to do). Bug 154216. | |
1075 wait_interval_.reset(); | 1088 wait_interval_.reset(); |
1076 } | 1089 } |
1077 | 1090 |
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1091 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1092 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1093 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1081 } | 1094 } |
1082 | 1095 |
1083 bool SyncSchedulerImpl::IsBackingOff() const { | 1096 bool SyncSchedulerImpl::IsBackingOff() const { |
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1097 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1085 return wait_interval_.get() && wait_interval_->mode == | 1098 return wait_interval_.get() && wait_interval_->mode == |
1086 WaitInterval::EXPONENTIAL_BACKOFF; | 1099 WaitInterval::EXPONENTIAL_BACKOFF; |
1087 } | 1100 } |
1088 | 1101 |
1089 void SyncSchedulerImpl::OnSilencedUntil( | 1102 void SyncSchedulerImpl::OnSilencedUntil( |
1090 const base::TimeTicks& silenced_until) { | 1103 const base::TimeTicks& silenced_until) { |
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1104 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1105 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1093 silenced_until - TimeTicks::Now())); | 1106 silenced_until - TimeTicks::Now())); |
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
1095 &SyncSchedulerImpl::Unthrottle); | |
1096 } | 1107 } |
1097 | 1108 |
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1109 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1110 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1100 return wait_interval_.get() && wait_interval_->mode == | 1111 return wait_interval_.get() && wait_interval_->mode == |
1101 WaitInterval::THROTTLED; | 1112 WaitInterval::THROTTLED; |
1102 } | 1113 } |
1103 | 1114 |
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1115 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1105 const base::TimeDelta& new_interval) { | 1116 const base::TimeDelta& new_interval) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1159 | 1170 |
1160 #undef SDVLOG_LOC | 1171 #undef SDVLOG_LOC |
1161 | 1172 |
1162 #undef SDVLOG | 1173 #undef SDVLOG |
1163 | 1174 |
1164 #undef SLOG | 1175 #undef SLOG |
1165 | 1176 |
1166 #undef ENUM_CASE | 1177 #undef ENUM_CASE |
1167 | 1178 |
1168 } // namespace syncer | 1179 } // namespace syncer |
OLD | NEW |