OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/bind_helpers.h" | |
13 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
14 #include "base/location.h" | 13 #include "base/location.h" |
15 #include "base/logging.h" | 14 #include "base/logging.h" |
16 #include "base/message_loop.h" | 15 #include "base/message_loop.h" |
17 #include "sync/engine/backoff_delay_provider.h" | 16 #include "sync/engine/backoff_delay_provider.h" |
18 #include "sync/engine/syncer.h" | 17 #include "sync/engine/syncer.h" |
19 #include "sync/engine/throttled_data_type_tracker.h" | 18 #include "sync/engine/throttled_data_type_tracker.h" |
20 #include "sync/protocol/proto_enum_conversions.h" | 19 #include "sync/protocol/proto_enum_conversions.h" |
21 #include "sync/protocol/sync.pb.h" | 20 #include "sync/protocol/sync.pb.h" |
22 #include "sync/util/data_type_histogram.h" | 21 #include "sync/util/data_type_histogram.h" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
77 : source(source), | 76 : source(source), |
78 types_to_download(types_to_download), | 77 types_to_download(types_to_download), |
79 routing_info(routing_info), | 78 routing_info(routing_info), |
80 ready_task(ready_task) { | 79 ready_task(ready_task) { |
81 DCHECK(!ready_task.is_null()); | 80 DCHECK(!ready_task.is_null()); |
82 } | 81 } |
83 ConfigurationParams::~ConfigurationParams() {} | 82 ConfigurationParams::~ConfigurationParams() {} |
84 | 83 |
85 SyncSchedulerImpl::WaitInterval::WaitInterval() | 84 SyncSchedulerImpl::WaitInterval::WaitInterval() |
86 : mode(UNKNOWN), | 85 : mode(UNKNOWN), |
87 had_nudge(false), | 86 had_nudge(false) { |
88 pending_configure_job(NULL) {} | 87 } |
89 | |
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
91 : mode(mode), had_nudge(false), length(length), | |
92 pending_configure_job(NULL) {} | |
93 | 88 |
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
95 | 90 |
96 #define ENUM_CASE(x) case x: return #x; break; | 91 #define ENUM_CASE(x) case x: return #x; break; |
97 | 92 |
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
99 switch (mode) { | 94 switch (mode) { |
100 ENUM_CASE(UNKNOWN); | 95 ENUM_CASE(UNKNOWN); |
101 ENUM_CASE(EXPONENTIAL_BACKOFF); | 96 ENUM_CASE(EXPONENTIAL_BACKOFF); |
102 ENUM_CASE(THROTTLED); | 97 ENUM_CASE(THROTTLED); |
103 } | 98 } |
104 NOTREACHED(); | 99 NOTREACHED(); |
105 return ""; | 100 return ""; |
106 } | 101 } |
107 | 102 |
| 103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() |
| 104 : purpose(UNKNOWN), |
| 105 is_canary_job(false) { |
| 106 } |
| 107 |
| 108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} |
| 109 |
| 110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
| 111 base::TimeTicks start, |
| 112 linked_ptr<sessions::SyncSession> session, |
| 113 bool is_canary_job, |
| 114 const ConfigurationParams& config_params, |
| 115 const tracked_objects::Location& from_here) |
| 116 : purpose(purpose), |
| 117 scheduled_start(start), |
| 118 session(session), |
| 119 is_canary_job(is_canary_job), |
| 120 config_params(config_params), |
| 121 from_here(from_here) { |
| 122 } |
| 123 |
| 124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( |
| 125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { |
| 126 switch (purpose) { |
| 127 ENUM_CASE(UNKNOWN); |
| 128 ENUM_CASE(POLL); |
| 129 ENUM_CASE(NUDGE); |
| 130 ENUM_CASE(CONFIGURATION); |
| 131 } |
| 132 NOTREACHED(); |
| 133 return ""; |
| 134 } |
| 135 |
108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
109 NudgeSource source) { | 137 NudgeSource source) { |
110 switch (source) { | 138 switch (source) { |
111 case NUDGE_SOURCE_NOTIFICATION: | 139 case NUDGE_SOURCE_NOTIFICATION: |
112 return GetUpdatesCallerInfo::NOTIFICATION; | 140 return GetUpdatesCallerInfo::NOTIFICATION; |
113 case NUDGE_SOURCE_LOCAL: | 141 case NUDGE_SOURCE_LOCAL: |
114 return GetUpdatesCallerInfo::LOCAL; | 142 return GetUpdatesCallerInfo::LOCAL; |
115 case NUDGE_SOURCE_CONTINUATION: | 143 case NUDGE_SOURCE_CONTINUATION: |
116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
117 case NUDGE_SOURCE_LOCAL_REFRESH: | 145 case NUDGE_SOURCE_LOCAL_REFRESH: |
118 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
119 case NUDGE_SOURCE_UNKNOWN: | 147 case NUDGE_SOURCE_UNKNOWN: |
120 return GetUpdatesCallerInfo::UNKNOWN; | 148 return GetUpdatesCallerInfo::UNKNOWN; |
121 default: | 149 default: |
122 NOTREACHED(); | 150 NOTREACHED(); |
123 return GetUpdatesCallerInfo::UNKNOWN; | 151 return GetUpdatesCallerInfo::UNKNOWN; |
124 } | 152 } |
125 } | 153 } |
126 | 154 |
| 155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
| 156 : mode(mode), had_nudge(false), length(length) { } |
| 157 |
127 // Helper macros to log with the syncer thread name; useful when there | 158 // Helper macros to log with the syncer thread name; useful when there |
128 // are multiple syncer threads involved. | 159 // are multiple syncer threads involved. |
129 | 160 |
130 #define SLOG(severity) LOG(severity) << name_ << ": " | 161 #define SLOG(severity) LOG(severity) << name_ << ": " |
131 | 162 |
132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
133 | 164 |
134 #define SDVLOG_LOC(from_here, verbose_level) \ | 165 #define SDVLOG_LOC(from_here, verbose_level) \ |
135 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
136 | 167 |
(...skipping 30 matching lines...) Expand all Loading... |
167 syncer_short_poll_interval_seconds_( | 198 syncer_short_poll_interval_seconds_( |
168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
169 syncer_long_poll_interval_seconds_( | 200 syncer_long_poll_interval_seconds_( |
170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
171 sessions_commit_delay_( | 202 sessions_commit_delay_( |
172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
173 mode_(NORMAL_MODE), | 204 mode_(NORMAL_MODE), |
174 // Start with assuming everything is fine with the connection. | 205 // Start with assuming everything is fine with the connection. |
175 // At the end of the sync cycle we would have the correct status. | 206 // At the end of the sync cycle we would have the correct status. |
176 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
177 pending_nudge_(NULL), | |
178 delay_provider_(delay_provider), | 208 delay_provider_(delay_provider), |
179 syncer_(syncer), | 209 syncer_(syncer), |
180 session_context_(context), | 210 session_context_(context), |
181 no_scheduling_allowed_(false) { | 211 no_scheduling_allowed_(false) { |
182 DCHECK(sync_loop_); | 212 DCHECK(sync_loop_); |
183 } | 213 } |
184 | 214 |
185 SyncSchedulerImpl::~SyncSchedulerImpl() { | 215 SyncSchedulerImpl::~SyncSchedulerImpl() { |
186 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 216 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
187 StopImpl(base::Closure()); | 217 StopImpl(base::Closure()); |
(...skipping 16 matching lines...) Expand all Loading... |
204 void SyncSchedulerImpl::OnConnectionStatusChange() { | 234 void SyncSchedulerImpl::OnConnectionStatusChange() { |
205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
206 // Optimistically assume that the connection is fixed and try | 236 // Optimistically assume that the connection is fixed and try |
207 // connecting. | 237 // connecting. |
208 OnServerConnectionErrorFixed(); | 238 OnServerConnectionErrorFixed(); |
209 } | 239 } |
210 } | 240 } |
211 | 241 |
212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
214 // There could be a pending nudge or configuration job in several cases: | |
215 // | |
216 // 1. We're in exponential backoff. | |
217 // 2. We're silenced / throttled. | |
218 // 3. A nudge was saved previously due to not having a valid auth token. | |
219 // 4. A nudge was scheduled + saved while in configuration mode. | |
220 // | |
221 // In all cases except (2), we want to retry contacting the server. We | |
222 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
224 // has the authority to do that is the Unthrottle timer. | |
225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
226 if (!pending.get()) | |
227 return; | |
228 | |
229 PostTask(FROM_HERE, "DoCanaryJob", | 244 PostTask(FROM_HERE, "DoCanaryJob", |
230 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
231 weak_ptr_factory_.GetWeakPtr(), | 246 weak_ptr_factory_.GetWeakPtr())); |
232 base::Passed(&pending))); | 247 |
233 } | 248 } |
234 | 249 |
235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
236 HttpResponse::ServerConnectionCode code) { | 251 HttpResponse::ServerConnectionCode code) { |
237 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 252 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
238 SDVLOG(2) << "New server connection code: " | 253 SDVLOG(2) << "New server connection code: " |
239 << HttpResponse::GetServerConnectionCodeString(code); | 254 << HttpResponse::GetServerConnectionCodeString(code); |
240 | 255 |
241 connection_code_ = code; | 256 connection_code_ = code; |
242 } | 257 } |
(...skipping 15 matching lines...) Expand all Loading... |
258 Mode old_mode = mode_; | 273 Mode old_mode = mode_; |
259 mode_ = mode; | 274 mode_ = mode; |
260 AdjustPolling(NULL); // Will kick start poll timer if needed. | 275 AdjustPolling(NULL); // Will kick start poll timer if needed. |
261 | 276 |
262 if (old_mode != mode_) { | 277 if (old_mode != mode_) { |
263 // We just changed our mode. See if there are any pending jobs that we could | 278 // We just changed our mode. See if there are any pending jobs that we could |
264 // execute in the new mode. | 279 // execute in the new mode. |
265 if (mode_ == NORMAL_MODE) { | 280 if (mode_ == NORMAL_MODE) { |
266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
267 // has not yet completed. | 282 // has not yet completed. |
268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | 283 DCHECK(!wait_interval_.get() || |
| 284 !wait_interval_->pending_configure_job.get()); |
269 } | 285 } |
270 | 286 |
271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | 287 DoPendingJobIfPossible(false); |
272 if (pending.get()) { | |
273 // TODO(tim): We should be able to remove this... | |
274 scoped_ptr<SyncSession> session(CreateSyncSession( | |
275 pending->session()->source())); | |
276 // Also the routing info might have been changed since we cached the | |
277 // pending nudge. Update it by coalescing to the latest. | |
278 pending->mutable_session()->Coalesce(*session); | |
279 SDVLOG(2) << "Executing pending job. Good luck!"; | |
280 DoSyncSessionJob(pending.Pass()); | |
281 } | |
282 } | 288 } |
283 } | 289 } |
284 | 290 |
285 void SyncSchedulerImpl::SendInitialSnapshot() { | 291 void SyncSchedulerImpl::SendInitialSnapshot() { |
286 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 292 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
288 SyncSourceInfo(), ModelSafeRoutingInfo(), | 294 SyncSourceInfo(), ModelSafeRoutingInfo(), |
289 std::vector<ModelSafeWorker*>())); | 295 std::vector<ModelSafeWorker*>())); |
290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
291 event.snapshot = dummy->TakeSnapshot(); | 297 event.snapshot = dummy->TakeSnapshot(); |
(...skipping 23 matching lines...) Expand all Loading... |
315 bool SyncSchedulerImpl::ScheduleConfiguration( | 321 bool SyncSchedulerImpl::ScheduleConfiguration( |
316 const ConfigurationParams& params) { | 322 const ConfigurationParams& params) { |
317 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 323 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
319 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 325 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
320 DCHECK(!params.ready_task.is_null()); | 326 DCHECK(!params.ready_task.is_null()); |
321 SDVLOG(2) << "Reconfiguring syncer."; | 327 SDVLOG(2) << "Reconfiguring syncer."; |
322 | 328 |
323 // Only one configuration is allowed at a time. Verify we're not waiting | 329 // Only one configuration is allowed at a time. Verify we're not waiting |
324 // for a pending configure job. | 330 // for a pending configure job. |
325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | 331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); |
326 | 332 |
327 ModelSafeRoutingInfo restricted_routes; | 333 ModelSafeRoutingInfo restricted_routes; |
328 BuildModelSafeParams(params.types_to_download, | 334 BuildModelSafeParams(params.types_to_download, |
329 params.routing_info, | 335 params.routing_info, |
330 &restricted_routes); | 336 &restricted_routes); |
331 session_context_->set_routing_info(params.routing_info); | 337 session_context_->set_routing_info(params.routing_info); |
332 | 338 |
333 // Only reconfigure if we have types to download. | 339 // Only reconfigure if we have types to download. |
334 if (!params.types_to_download.Empty()) { | 340 if (!params.types_to_download.Empty()) { |
335 DCHECK(!restricted_routes.empty()); | 341 DCHECK(!restricted_routes.empty()); |
336 scoped_ptr<SyncSession> session(new SyncSession( | 342 linked_ptr<SyncSession> session(new SyncSession( |
337 session_context_, | 343 session_context_, |
338 this, | 344 this, |
339 SyncSourceInfo(params.source, | 345 SyncSourceInfo(params.source, |
340 ModelSafeRoutingInfoToInvalidationMap( | 346 ModelSafeRoutingInfoToInvalidationMap( |
341 restricted_routes, | 347 restricted_routes, |
342 std::string())), | 348 std::string())), |
343 restricted_routes, | 349 restricted_routes, |
344 session_context_->workers())); | 350 session_context_->workers())); |
345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, |
346 SyncSessionJob::CONFIGURATION, | 352 TimeTicks::Now(), |
347 TimeTicks::Now(), | 353 session, |
348 session.Pass(), | 354 false, |
349 params, | 355 params, |
350 FROM_HERE)); | 356 FROM_HERE); |
351 bool succeeded = DoSyncSessionJob(job.Pass()); | 357 DoSyncSessionJob(job); |
352 | 358 |
353 // If we failed, the job would have been saved as the pending configure | 359 // If we failed, the job would have been saved as the pending configure |
354 // job and a wait interval would have been set. | 360 // job and a wait interval would have been set. |
355 if (!succeeded) { | 361 if (!session->Succeeded()) { |
356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); | 362 DCHECK(wait_interval_.get() && |
| 363 wait_interval_->pending_configure_job.get()); |
357 return false; | 364 return false; |
358 } | 365 } |
359 } else { | 366 } else { |
360 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 367 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
361 params.ready_task.Run(); | 368 params.ready_task.Run(); |
362 } | 369 } |
363 | 370 |
364 return true; | 371 return true; |
365 } | 372 } |
366 | 373 |
367 SyncSchedulerImpl::JobProcessDecision | 374 SyncSchedulerImpl::JobProcessDecision |
368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) { | 375 SyncSchedulerImpl::DecideWhileInWaitInterval( |
| 376 const SyncSessionJob& job) { |
369 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 377 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
370 DCHECK(wait_interval_.get()); | 378 DCHECK(wait_interval_.get()); |
371 | 379 |
372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
373 << WaitInterval::GetModeString(wait_interval_->mode) | 381 << WaitInterval::GetModeString(wait_interval_->mode) |
374 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 382 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
375 << (job.is_canary() ? " (canary)" : ""); | 383 << (job.is_canary_job ? " (canary)" : ""); |
376 | 384 |
377 if (job.purpose() == SyncSessionJob::POLL) | 385 if (job.purpose == SyncSessionJob::POLL) |
378 return DROP; | 386 return DROP; |
379 | 387 |
380 // If we save a job while in a WaitInterval, there is a well-defined moment | 388 DCHECK(job.purpose == SyncSessionJob::NUDGE || |
381 // in time in the future when it makes sense for that SAVE-worthy job to try | 389 job.purpose == SyncSessionJob::CONFIGURATION); |
382 // running again -- the end of the WaitInterval. | |
383 DCHECK(job.purpose() == SyncSessionJob::NUDGE || | |
384 job.purpose() == SyncSessionJob::CONFIGURATION); | |
385 | |
386 // If throttled, there's a clock ticking to unthrottle. We want to get | |
387 // on the same train. | |
388 if (wait_interval_->mode == WaitInterval::THROTTLED) | 390 if (wait_interval_->mode == WaitInterval::THROTTLED) |
389 return SAVE; | 391 return SAVE; |
390 | 392 |
391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
392 if (job.purpose() == SyncSessionJob::NUDGE) { | 394 if (job.purpose == SyncSessionJob::NUDGE) { |
393 if (mode_ == CONFIGURATION_MODE) | 395 if (mode_ == CONFIGURATION_MODE) |
394 return SAVE; | 396 return SAVE; |
395 | 397 |
396 // If we already had one nudge then just drop this nudge. We will retry | 398 // If we already had one nudge then just drop this nudge. We will retry |
397 // later when the timer runs out. | 399 // later when the timer runs out. |
398 if (!job.is_canary()) | 400 if (!job.is_canary_job) |
399 return wait_interval_->had_nudge ? DROP : CONTINUE; | 401 return wait_interval_->had_nudge ? DROP : CONTINUE; |
400 else // We are here because timer ran out. So retry. | 402 else // We are here because timer ran out. So retry. |
401 return CONTINUE; | 403 return CONTINUE; |
402 } | 404 } |
403 return job.is_canary() ? CONTINUE : SAVE; | 405 return job.is_canary_job ? CONTINUE : SAVE; |
404 } | 406 } |
405 | 407 |
406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
407 const SyncSessionJob& job) { | 409 const SyncSessionJob& job) { |
408 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 410 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
409 | 411 |
410 // See if our type is throttled. | 412 // See if our type is throttled. |
411 ModelTypeSet throttled_types = | 413 ModelTypeSet throttled_types = |
412 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
413 if (job.purpose() == SyncSessionJob::NUDGE && | 415 if (job.purpose == SyncSessionJob::NUDGE && |
414 job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
415 ModelTypeSet requested_types; | 417 ModelTypeSet requested_types; |
416 for (ModelTypeInvalidationMap::const_iterator i = | 418 for (ModelTypeInvalidationMap::const_iterator i = |
417 job.session()->source().types.begin(); | 419 job.session->source().types.begin(); |
418 i != job.session()->source().types.end(); | 420 i != job.session->source().types.end(); |
419 ++i) { | 421 ++i) { |
420 requested_types.Put(i->first); | 422 requested_types.Put(i->first); |
421 } | 423 } |
422 | 424 |
423 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
424 // a per-datatype "unthrottle" event as something that should force a | |
425 // canary job. For this reason, there's no good time to reschedule this job | |
426 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
427 // Note that there may already be such an event if we're in a WaitInterval, | |
428 // so we can retry it then. | |
429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
430 return SAVE; | 426 return SAVE; |
431 } | 427 } |
432 | 428 |
433 if (wait_interval_.get()) | 429 if (wait_interval_.get()) |
434 return DecideWhileInWaitInterval(job); | 430 return DecideWhileInWaitInterval(job); |
435 | 431 |
436 if (mode_ == CONFIGURATION_MODE) { | 432 if (mode_ == CONFIGURATION_MODE) { |
437 if (job.purpose() == SyncSessionJob::NUDGE) | 433 if (job.purpose == SyncSessionJob::NUDGE) |
438 return SAVE; // Running requires a mode switch. | 434 return SAVE; |
439 else if (job.purpose() == SyncSessionJob::CONFIGURATION) | 435 else if (job.purpose == SyncSessionJob::CONFIGURATION) |
440 return CONTINUE; | 436 return CONTINUE; |
441 else | 437 else |
442 return DROP; | 438 return DROP; |
443 } | 439 } |
444 | 440 |
445 // We are in normal mode. | 441 // We are in normal mode. |
446 DCHECK_EQ(mode_, NORMAL_MODE); | 442 DCHECK_EQ(mode_, NORMAL_MODE); |
447 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION); | 443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
448 | 444 |
449 // Note about some subtle scheduling semantics. | 445 // Note about some subtle scheduling semantics. |
450 // | 446 // |
451 // It's possible at this point that |job| is known to be unnecessary, and | 447 // It's possible at this point that |job| is known to be unnecessary, and |
452 // dropping it would be perfectly safe and correct. Consider | 448 // dropping it would be perfectly safe and correct. Consider |
453 // | 449 // |
454 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 450 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
455 // the time that the last successful all-datatype NUDGE completed. | 451 // the time that the last successful all-datatype NUDGE completed. |
456 // | 452 // |
457 // 2) |job| is a NUDGE (for any combination of types) with a | 453 // 2) |job| is a NUDGE (for any combination of types) with a |
(...skipping 22 matching lines...) Expand all Loading... |
480 // * It's not strictly "impossible", but it would be reentrant and hence | 476 // * It's not strictly "impossible", but it would be reentrant and hence |
481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
482 // legal side effect of any of the work being done as part of a sync cycle. | 478 // legal side effect of any of the work being done as part of a sync cycle. |
483 // See |no_scheduling_allowed_| for details. | 479 // See |no_scheduling_allowed_| for details. |
484 | 480 |
485 // Decision now rests on state of auth tokens. | 481 // Decision now rests on state of auth tokens. |
486 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
487 return CONTINUE; | 483 return CONTINUE; |
488 | 484 |
489 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
490 // Running the job would require updated auth, so we can't honour | 486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
491 // job.scheduled_start(). | |
492 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
493 } | 487 } |
494 | 488 |
495 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { | 489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
496 DCHECK_EQ(DecideOnJob(*job), SAVE); | 490 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
497 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; | 491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
498 if (is_nudge && pending_nudge_) { | 492 if (pending_nudge_.get() == NULL) { |
499 SDVLOG(2) << "Coalescing a pending nudge"; | 493 SDVLOG(2) << "Creating a pending nudge job"; |
500 // TODO(tim): This basically means we never use the more-careful coalescing | 494 SyncSession* s = job.session.get(); |
501 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | 495 |
502 // times, because we're calling this function first. Pull this out | 496 // Get a fresh session with similar configuration as before (resets |
503 // into a function to coalesce + set start times and reuse. | 497 // StatusController). |
504 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | 498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
| 499 s->delegate(), s->source(), s->routing_info(), s->workers())); |
| 500 |
| 501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
| 502 make_linked_ptr(session.release()), false, |
| 503 ConfigurationParams(), job.from_here); |
| 504 pending_nudge_.reset(new SyncSessionJob(new_job)); |
505 return; | 505 return; |
506 } | 506 } |
507 | 507 |
508 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); | 508 SDVLOG(2) << "Coalescing a pending nudge"; |
509 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | 509 pending_nudge_->session->Coalesce(*(job.session.get())); |
510 // This job should be made the new canary. | 510 pending_nudge_->scheduled_start = job.scheduled_start; |
511 if (is_nudge) { | |
512 pending_nudge_ = job_to_save.get(); | |
513 } else { | |
514 SDVLOG(2) << "Saving a configuration job"; | |
515 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
516 DCHECK(!wait_interval_->pending_configure_job); | |
517 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
518 DCHECK(!job->config_params().ready_task.is_null()); | |
519 // The only nudge that could exist is a scheduled canary nudge. | |
520 DCHECK(!unscheduled_nudge_storage_.get()); | |
521 if (pending_nudge_) { | |
522 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
523 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
524 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
525 } | |
526 wait_interval_->pending_configure_job = job_to_save.get(); | |
527 } | |
528 TimeDelta length = | |
529 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
530 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
531 TimeDelta::FromSeconds(0) : length; | |
532 RestartWaiting(job_to_save.Pass()); | |
533 return; | |
534 } | |
535 | 511 |
536 // Note that today there are no cases where we SAVE a CONFIGURATION job | 512 // Unfortunately the nudge location cannot be modified. So it stores the |
537 // when we're not in a WaitInterval. See bug 147736. | 513 // location of the first caller. |
538 DCHECK(is_nudge); | 514 } |
539 // There may or may not be a pending_configure_job. Either way this nudge | 515 |
540 // is unschedulable. | 516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { |
541 pending_nudge_ = job_to_save.get(); | 517 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
542 unscheduled_nudge_storage_ = job_to_save.Pass(); | 518 DCHECK(started_); |
| 519 |
| 520 JobProcessDecision decision = DecideOnJob(job); |
| 521 SDVLOG(2) << "Should run " |
| 522 << SyncSessionJob::GetPurposeString(job.purpose) |
| 523 << " job in mode " << GetModeString(mode_) |
| 524 << ": " << GetDecisionString(decision); |
| 525 if (decision != SAVE) |
| 526 return decision == CONTINUE; |
| 527 |
| 528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
| 529 SyncSessionJob::CONFIGURATION); |
| 530 |
| 531 SaveJob(job); |
| 532 return false; |
| 533 } |
| 534 |
| 535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { |
| 536 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 537 if (job.purpose == SyncSessionJob::NUDGE) { |
| 538 SDVLOG(2) << "Saving a nudge job"; |
| 539 InitOrCoalescePendingJob(job); |
| 540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
| 541 SDVLOG(2) << "Saving a configuration job"; |
| 542 DCHECK(wait_interval_.get()); |
| 543 DCHECK(mode_ == CONFIGURATION_MODE); |
| 544 |
| 545 // Config params should always get set. |
| 546 DCHECK(!job.config_params.ready_task.is_null()); |
| 547 SyncSession* old = job.session.get(); |
| 548 SyncSession* s(new SyncSession(session_context_, this, old->source(), |
| 549 old->routing_info(), old->workers())); |
| 550 SyncSessionJob new_job(job.purpose, |
| 551 TimeTicks::Now(), |
| 552 make_linked_ptr(s), |
| 553 false, |
| 554 job.config_params, |
| 555 job.from_here); |
| 556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
| 557 } // drop the rest. |
| 558 // TODO(sync): Is it okay to drop the rest? It's weird that |
| 559 // SaveJob() only does what it says sometimes. (See |
| 560 // http://crbug.com/90868.) |
543 } | 561 } |
544 | 562 |
545 // Functor for std::find_if to search by ModelSafeGroup. | 563 // Functor for std::find_if to search by ModelSafeGroup. |
546 struct ModelSafeWorkerGroupIs { | 564 struct ModelSafeWorkerGroupIs { |
547 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
548 bool operator()(ModelSafeWorker* w) { | 566 bool operator()(ModelSafeWorker* w) { |
549 return group == w->GetModelSafeGroup(); | 567 return group == w->GetModelSafeGroup(); |
550 } | 568 } |
551 ModelSafeGroup group; | 569 ModelSafeGroup group; |
552 }; | 570 }; |
553 | 571 |
554 void SyncSchedulerImpl::ScheduleNudgeAsync( | 572 void SyncSchedulerImpl::ScheduleNudgeAsync( |
555 const TimeDelta& desired_delay, | 573 const TimeDelta& delay, |
556 NudgeSource source, ModelTypeSet types, | 574 NudgeSource source, ModelTypeSet types, |
557 const tracked_objects::Location& nudge_location) { | 575 const tracked_objects::Location& nudge_location) { |
558 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 576 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
559 SDVLOG_LOC(nudge_location, 2) | 577 SDVLOG_LOC(nudge_location, 2) |
560 << "Nudge scheduled with delay " | 578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
561 << desired_delay.InMilliseconds() << " ms, " | |
562 << "source " << GetNudgeSourceString(source) << ", " | 579 << "source " << GetNudgeSourceString(source) << ", " |
563 << "types " << ModelTypeSetToString(types); | 580 << "types " << ModelTypeSetToString(types); |
564 | 581 |
565 ModelTypeInvalidationMap invalidation_map = | 582 ModelTypeInvalidationMap invalidation_map = |
566 ModelTypeSetToInvalidationMap(types, std::string()); | 583 ModelTypeSetToInvalidationMap(types, std::string()); |
567 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, | 584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
568 GetUpdatesFromNudgeSource(source), | 585 GetUpdatesFromNudgeSource(source), |
569 invalidation_map, | 586 invalidation_map, |
| 587 false, |
570 nudge_location); | 588 nudge_location); |
571 } | 589 } |
572 | 590 |
573 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
574 const TimeDelta& desired_delay, | 592 const TimeDelta& delay, |
575 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, | 593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
576 const tracked_objects::Location& nudge_location) { | 594 const tracked_objects::Location& nudge_location) { |
577 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 595 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
578 SDVLOG_LOC(nudge_location, 2) | 596 SDVLOG_LOC(nudge_location, 2) |
579 << "Nudge scheduled with delay " | 597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
580 << desired_delay.InMilliseconds() << " ms, " | |
581 << "source " << GetNudgeSourceString(source) << ", " | 598 << "source " << GetNudgeSourceString(source) << ", " |
582 << "payloads " | 599 << "payloads " |
583 << ModelTypeInvalidationMapToString(invalidation_map); | 600 << ModelTypeInvalidationMapToString(invalidation_map); |
584 | 601 |
585 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, | 602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
586 GetUpdatesFromNudgeSource(source), | 603 GetUpdatesFromNudgeSource(source), |
587 invalidation_map, | 604 invalidation_map, |
| 605 false, |
588 nudge_location); | 606 nudge_location); |
589 } | 607 } |
590 | 608 |
591 void SyncSchedulerImpl::ScheduleNudgeImpl( | 609 void SyncSchedulerImpl::ScheduleNudgeImpl( |
592 const TimeDelta& delay, | 610 const TimeDelta& delay, |
593 GetUpdatesCallerInfo::GetUpdatesSource source, | 611 GetUpdatesCallerInfo::GetUpdatesSource source, |
594 const ModelTypeInvalidationMap& invalidation_map, | 612 const ModelTypeInvalidationMap& invalidation_map, |
595 const tracked_objects::Location& nudge_location) { | 613 bool is_canary_job, const tracked_objects::Location& nudge_location) { |
596 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 614 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
597 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; | 615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
598 | 616 |
599 SDVLOG_LOC(nudge_location, 2) | 617 SDVLOG_LOC(nudge_location, 2) |
600 << "In ScheduleNudgeImpl with delay " | 618 << "In ScheduleNudgeImpl with delay " |
601 << delay.InMilliseconds() << " ms, " | 619 << delay.InMilliseconds() << " ms, " |
602 << "source " << GetUpdatesSourceString(source) << ", " | 620 << "source " << GetUpdatesSourceString(source) << ", " |
603 << "payloads " | 621 << "payloads " |
604 << ModelTypeInvalidationMapToString(invalidation_map); | 622 << ModelTypeInvalidationMapToString(invalidation_map) |
| 623 << (is_canary_job ? " (canary)" : ""); |
605 | 624 |
606 SyncSourceInfo info(source, invalidation_map); | 625 SyncSourceInfo info(source, invalidation_map); |
607 UpdateNudgeTimeRecords(info); | 626 UpdateNudgeTimeRecords(info); |
608 | 627 |
609 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 628 SyncSession* session(CreateSyncSession(info)); |
610 SyncSessionJob::NUDGE, | 629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
611 TimeTicks::Now() + delay, | 630 make_linked_ptr(session), is_canary_job, |
612 CreateSyncSession(info).Pass(), | 631 ConfigurationParams(), nudge_location); |
613 ConfigurationParams(), | |
614 nudge_location)); | |
615 | 632 |
616 JobProcessDecision decision = DecideOnJob(*job); | 633 session = NULL; |
617 SDVLOG(2) << "Should run " | 634 if (!ShouldRunJob(job)) |
618 << SyncSessionJob::GetPurposeString(job->purpose()) | 635 return; |
619 << " job " << job->session() | 636 |
620 << " in mode " << GetModeString(mode_) | 637 if (pending_nudge_.get()) { |
621 << ": " << GetDecisionString(decision); | 638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
622 if (decision != CONTINUE) { | 639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; |
623 // End of the line, though we may save the job for later. | 640 return; |
624 if (decision == SAVE) { | |
625 HandleSaveJobDecision(job.Pass()); | |
626 } else { | |
627 DCHECK_EQ(decision, DROP); | |
628 } | 641 } |
629 return; | |
630 } | |
631 | 642 |
632 if (pending_nudge_) { | 643 SDVLOG(2) << "Coalescing pending nudge"; |
| 644 pending_nudge_->session->Coalesce(*(job.session.get())); |
| 645 |
633 SDVLOG(2) << "Rescheduling pending nudge"; | 646 SDVLOG(2) << "Rescheduling pending nudge"; |
634 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | 647 SyncSession* s = pending_nudge_->session.get(); |
635 // Choose the start time as the earliest of the 2. Note that this means | 648 job.session.reset(new SyncSession(s->context(), s->delegate(), |
636 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) | 649 s->source(), s->routing_info(), s->workers())); |
637 // but a nudge is already scheduled to go out, we'll send the (tab) commit | 650 |
638 // without waiting. | 651 // Choose the start time as the earliest of the 2. |
639 pending_nudge_->set_scheduled_start( | 652 job.scheduled_start = std::min(job.scheduled_start, |
640 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); | 653 pending_nudge_->scheduled_start); |
641 // Abandon the old task by cloning and replacing the session. | 654 pending_nudge_.reset(); |
642 // It's possible that by "rescheduling" we're actually taking a job that | |
643 // was previously unscheduled and giving it wings, so take care to reset | |
644 // unscheduled nudge storage. | |
645 job = pending_nudge_->CloneAndAbandon(); | |
646 unscheduled_nudge_storage_.reset(); | |
647 pending_nudge_ = NULL; | |
648 } | 655 } |
649 | 656 |
650 // TODO(zea): Consider adding separate throttling/backoff for datatype | 657 // TODO(zea): Consider adding separate throttling/backoff for datatype |
651 // refresh requests. | 658 // refresh requests. |
652 ScheduleSyncSessionJob(job.Pass()); | 659 ScheduleSyncSessionJob(job); |
653 } | 660 } |
654 | 661 |
655 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
656 switch (mode) { | 663 switch (mode) { |
657 ENUM_CASE(CONFIGURATION_MODE); | 664 ENUM_CASE(CONFIGURATION_MODE); |
658 ENUM_CASE(NORMAL_MODE); | 665 ENUM_CASE(NORMAL_MODE); |
659 } | 666 } |
660 return ""; | 667 return ""; |
661 } | 668 } |
662 | 669 |
663 const char* SyncSchedulerImpl::GetDecisionString( | 670 const char* SyncSchedulerImpl::GetDecisionString( |
664 SyncSchedulerImpl::JobProcessDecision mode) { | 671 SyncSchedulerImpl::JobProcessDecision mode) { |
665 switch (mode) { | 672 switch (mode) { |
666 ENUM_CASE(CONTINUE); | 673 ENUM_CASE(CONTINUE); |
667 ENUM_CASE(SAVE); | 674 ENUM_CASE(SAVE); |
668 ENUM_CASE(DROP); | 675 ENUM_CASE(DROP); |
669 } | 676 } |
670 return ""; | 677 return ""; |
671 } | 678 } |
672 | 679 |
| 680 // static |
| 681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( |
| 682 SyncSessionJob::SyncSessionJobPurpose purpose, |
| 683 SyncerStep* start, |
| 684 SyncerStep* end) { |
| 685 switch (purpose) { |
| 686 case SyncSessionJob::CONFIGURATION: |
| 687 *start = DOWNLOAD_UPDATES; |
| 688 *end = APPLY_UPDATES; |
| 689 return; |
| 690 case SyncSessionJob::NUDGE: |
| 691 case SyncSessionJob::POLL: |
| 692 *start = SYNCER_BEGIN; |
| 693 *end = SYNCER_END; |
| 694 return; |
| 695 default: |
| 696 NOTREACHED(); |
| 697 *start = SYNCER_END; |
| 698 *end = SYNCER_END; |
| 699 return; |
| 700 } |
| 701 } |
| 702 |
673 void SyncSchedulerImpl::PostTask( | 703 void SyncSchedulerImpl::PostTask( |
674 const tracked_objects::Location& from_here, | 704 const tracked_objects::Location& from_here, |
675 const char* name, const base::Closure& task) { | 705 const char* name, const base::Closure& task) { |
676 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
677 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 707 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
678 if (!started_) { | 708 if (!started_) { |
679 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 709 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
680 return; | 710 return; |
681 } | 711 } |
682 sync_loop_->PostTask(from_here, task); | 712 sync_loop_->PostTask(from_here, task); |
683 } | 713 } |
684 | 714 |
685 void SyncSchedulerImpl::PostDelayedTask( | 715 void SyncSchedulerImpl::PostDelayedTask( |
686 const tracked_objects::Location& from_here, | 716 const tracked_objects::Location& from_here, |
687 const char* name, const base::Closure& task, base::TimeDelta delay) { | 717 const char* name, const base::Closure& task, base::TimeDelta delay) { |
688 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
689 << delay.InMilliseconds() << " ms delay"; | 719 << delay.InMilliseconds() << " ms delay"; |
690 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 720 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
691 if (!started_) { | 721 if (!started_) { |
692 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 722 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
693 return; | 723 return; |
694 } | 724 } |
695 sync_loop_->PostDelayedTask(from_here, task, delay); | 725 sync_loop_->PostDelayedTask(from_here, task, delay); |
696 } | 726 } |
697 | 727 |
698 void SyncSchedulerImpl::ScheduleSyncSessionJob( | 728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { |
699 scoped_ptr<SyncSessionJob> job) { | |
700 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 729 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
701 if (no_scheduling_allowed_) { | 730 if (no_scheduling_allowed_) { |
702 NOTREACHED() << "Illegal to schedule job while session in progress."; | 731 NOTREACHED() << "Illegal to schedule job while session in progress."; |
703 return; | 732 return; |
704 } | 733 } |
705 | 734 |
706 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); | 735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); |
707 tracked_objects::Location loc(job->from_location()); | |
708 if (delay < TimeDelta::FromMilliseconds(0)) | 736 if (delay < TimeDelta::FromMilliseconds(0)) |
709 delay = TimeDelta::FromMilliseconds(0); | 737 delay = TimeDelta::FromMilliseconds(0); |
710 SDVLOG_LOC(loc, 2) | 738 SDVLOG_LOC(job.from_here, 2) |
711 << "In ScheduleSyncSessionJob with " | 739 << "In ScheduleSyncSessionJob with " |
712 << SyncSessionJob::GetPurposeString(job->purpose()) | 740 << SyncSessionJob::GetPurposeString(job.purpose) |
713 << " job and " << delay.InMilliseconds() << " ms delay"; | 741 << " job and " << delay.InMilliseconds() << " ms delay"; |
714 | 742 |
715 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | 743 DCHECK(job.purpose == SyncSessionJob::NUDGE || |
716 job->purpose() == SyncSessionJob::POLL); | 744 job.purpose == SyncSessionJob::POLL); |
717 if (job->purpose() == SyncSessionJob::NUDGE) { | 745 if (job.purpose == SyncSessionJob::NUDGE) { |
718 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; | 746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; |
719 DCHECK(!pending_nudge_ || pending_nudge_->session() == | 747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == |
720 job->session()); | 748 job.session); |
721 pending_nudge_ = job.get(); | 749 pending_nudge_.reset(new SyncSessionJob(job)); |
| 750 } |
| 751 PostDelayedTask(job.from_here, "DoSyncSessionJob", |
| 752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, |
| 753 weak_ptr_factory_.GetWeakPtr(), |
| 754 job), |
| 755 delay); |
| 756 } |
| 757 |
| 758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { |
| 759 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 760 |
| 761 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
| 762 if (!ShouldRunJob(job)) { |
| 763 SLOG(WARNING) |
| 764 << "Not executing " |
| 765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " |
| 766 << GetUpdatesSourceString(job.session->source().updates_source); |
| 767 return; |
722 } | 768 } |
723 | 769 |
724 PostDelayedTask(loc, "DoSyncSessionJob", | 770 if (job.purpose == SyncSessionJob::NUDGE) { |
725 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), | 771 if (pending_nudge_.get() == NULL || |
726 weak_ptr_factory_.GetWeakPtr(), | 772 pending_nudge_->session != job.session) { |
727 base::Passed(&job)), | |
728 delay); | |
729 } | |
730 | |
731 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { | |
732 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
733 if (job->purpose() == SyncSessionJob::NUDGE) { | |
734 if (pending_nudge_ == NULL || | |
735 pending_nudge_->session() != job->session()) { | |
736 // |job| is abandoned. | |
737 SDVLOG(2) << "Dropping a nudge in " | 773 SDVLOG(2) << "Dropping a nudge in " |
738 << "DoSyncSessionJob because another nudge was scheduled"; | 774 << "DoSyncSessionJob because another nudge was scheduled"; |
739 return false; | 775 return; // Another nudge must have been scheduled in in the meantime. |
740 } | 776 } |
741 pending_nudge_ = NULL; | 777 pending_nudge_.reset(); |
742 | 778 |
743 // Rebase the session with the latest model safe table and use it to purge | 779 // Create the session with the latest model safe table and use it to purge |
744 // and update any disabled or modified entries in the job. | 780 // and update any disabled or modified entries in the job. |
745 job->mutable_session()->RebaseRoutingInfoWithLatest( | 781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); |
746 session_context_->routing_info(), session_context_->workers()); | 782 |
| 783 job.session->RebaseRoutingInfoWithLatest(*session); |
747 } | 784 } |
| 785 SDVLOG(2) << "DoSyncSessionJob with " |
| 786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; |
748 | 787 |
749 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 788 SyncerStep begin(SYNCER_END); |
750 JobProcessDecision decision = DecideOnJob(*job); | 789 SyncerStep end(SYNCER_END); |
751 SDVLOG(2) << "Should run " | 790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
752 << SyncSessionJob::GetPurposeString(job->purpose()) | |
753 << " job " << job->session() | |
754 << " in mode " << GetModeString(mode_) | |
755 << " with source " << job->session()->source().updates_source | |
756 << ": " << GetDecisionString(decision); | |
757 if (decision != CONTINUE) { | |
758 if (decision == SAVE) { | |
759 HandleSaveJobDecision(job.Pass()); | |
760 } else { | |
761 DCHECK_EQ(decision, DROP); | |
762 } | |
763 return false; | |
764 } | |
765 | |
766 SDVLOG(2) << "DoSyncSessionJob with " | |
767 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; | |
768 | 791 |
769 bool has_more_to_sync = true; | 792 bool has_more_to_sync = true; |
770 bool premature_exit = false; | 793 while (ShouldRunJob(job) && has_more_to_sync) { |
771 while (DecideOnJob(*job) == CONTINUE && has_more_to_sync) { | |
772 SDVLOG(2) << "Calling SyncShare."; | 794 SDVLOG(2) << "Calling SyncShare."; |
773 // Synchronously perform the sync session from this thread. | 795 // Synchronously perform the sync session from this thread. |
774 premature_exit = !syncer_->SyncShare(job->mutable_session(), | 796 syncer_->SyncShare(job.session.get(), begin, end); |
775 job->start_step(), | 797 has_more_to_sync = job.session->HasMoreToSync(); |
776 job->end_step()); | |
777 | |
778 has_more_to_sync = job->session()->HasMoreToSync(); | |
779 if (has_more_to_sync) | 798 if (has_more_to_sync) |
780 job->mutable_session()->PrepareForAnotherSyncCycle(); | 799 job.session->PrepareForAnotherSyncCycle(); |
781 } | 800 } |
782 SDVLOG(2) << "Done SyncShare looping."; | 801 SDVLOG(2) << "Done SyncShare looping."; |
783 | 802 |
784 return FinishSyncSessionJob(job.Pass(), premature_exit); | 803 FinishSyncSessionJob(job); |
785 } | 804 } |
786 | 805 |
787 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
788 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 807 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
789 | 808 |
790 // We are interested in recording time between local nudges for datatypes. | 809 // We are interested in recording time between local nudges for datatypes. |
791 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
792 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
793 return; | 812 return; |
794 | 813 |
795 base::TimeTicks now = TimeTicks::Now(); | 814 base::TimeTicks now = TimeTicks::Now(); |
796 // Update timing information for how often datatypes are triggering nudges. | 815 // Update timing information for how often datatypes are triggering nudges. |
797 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
798 iter != info.types.end(); | 817 iter != info.types.end(); |
799 ++iter) { | 818 ++iter) { |
800 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
801 last_local_nudges_by_model_type_[iter->first] = now; | 820 last_local_nudges_by_model_type_[iter->first] = now; |
802 if (previous.is_null()) | 821 if (previous.is_null()) |
803 continue; | 822 continue; |
804 | 823 |
805 #define PER_DATA_TYPE_MACRO(type_str) \ | 824 #define PER_DATA_TYPE_MACRO(type_str) \ |
806 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
807 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
808 #undef PER_DATA_TYPE_MACRO | 827 #undef PER_DATA_TYPE_MACRO |
809 } | 828 } |
810 } | 829 } |
811 | 830 |
812 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, | 831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { |
813 bool exited_prematurely) { | |
814 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 832 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
815 // Now update the status of the connection from SCM. We need this to decide | 833 // Now update the status of the connection from SCM. We need this to decide |
816 // whether we need to save/run future jobs. The notifications from SCM are | 834 // whether we need to save/run future jobs. The notifications from SCM are not |
817 // not reliable. | 835 // reliable. |
818 // | 836 // |
819 // TODO(rlarocque): crbug.com/110954 | 837 // TODO(rlarocque): crbug.com/110954 |
820 // We should get rid of the notifications and it is probably not needed to | 838 // We should get rid of the notifications and it is probably not needed to |
821 // maintain this status variable in 2 places. We should query it directly | 839 // maintain this status variable in 2 places. We should query it directly from |
822 // from SCM when needed. | 840 // SCM when needed. |
823 ServerConnectionManager* scm = session_context_->connection_manager(); | 841 ServerConnectionManager* scm = session_context_->connection_manager(); |
824 UpdateServerConnectionManagerStatus(scm->server_status()); | 842 UpdateServerConnectionManagerStatus(scm->server_status()); |
825 | 843 |
826 // Let job know that we're through syncing (calling SyncShare) at this point. | 844 if (IsSyncingCurrentlySilenced()) { |
827 bool succeeded = false; | 845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; |
828 { | 846 // TODO(sync): Investigate whether we need to check job.purpose |
| 847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) |
| 848 SaveJob(job); |
| 849 return; // Nothing to do. |
| 850 } else if (job.session->Succeeded() && |
| 851 !job.config_params.ready_task.is_null()) { |
| 852 // If this was a configuration job with a ready task, invoke it now that |
| 853 // we finished successfully. |
829 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 854 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
830 succeeded = job->Finish(exited_prematurely); | 855 job.config_params.ready_task.Run(); |
831 } | 856 } |
832 | 857 |
833 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 858 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
834 ScheduleNextSync(job.Pass(), succeeded); | 859 ScheduleNextSync(job); |
835 return succeeded; | |
836 } | 860 } |
837 | 861 |
838 void SyncSchedulerImpl::ScheduleNextSync( | 862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { |
839 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) { | |
840 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 863 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
841 DCHECK(!finished_job->session()->HasMoreToSync()); | 864 DCHECK(!old_job.session->HasMoreToSync()); |
842 | 865 |
843 AdjustPolling(finished_job.get()); | 866 AdjustPolling(&old_job); |
844 | 867 |
845 if (succeeded) { | 868 if (old_job.session->Succeeded()) { |
846 // Only reset backoff if we actually reached the server. | 869 // Only reset backoff if we actually reached the server. |
847 // It's possible that we reached the server on one attempt, then had an | 870 if (old_job.session->SuccessfullyReachedServer()) |
848 // error on the next (or didn't perform some of the server-communicating | |
849 // commands). We want to verify that, for all commands attempted, we | |
850 // successfully spoke with the server. Therefore, we verify no errors | |
851 // and at least one SYNCER_OK. | |
852 if (finished_job->session()->DidReachServer()) | |
853 wait_interval_.reset(); | 871 wait_interval_.reset(); |
854 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
855 return; | 873 return; |
856 } | 874 } |
857 | 875 |
858 if (IsSyncingCurrentlySilenced()) { | 876 if (old_job.purpose == SyncSessionJob::POLL) { |
859 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | |
860 // If we're here, it's because |job| was silenced until a server specified | |
861 // time. (Note, it had to be |job|, because DecideOnJob would not permit | |
862 // any job through while in WaitInterval::THROTTLED). | |
863 scoped_ptr<SyncSessionJob> clone = finished_job->Clone(); | |
864 if (clone->purpose() == SyncSessionJob::NUDGE) | |
865 pending_nudge_ = clone.get(); | |
866 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | |
867 wait_interval_->pending_configure_job = clone.get(); | |
868 else | |
869 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
870 | |
871 RestartWaiting(clone.Pass()); | |
872 return; | |
873 } | |
874 | |
875 if (finished_job->purpose() == SyncSessionJob::POLL) { | |
876 return; // We don't retry POLL jobs. | 877 return; // We don't retry POLL jobs. |
877 } | 878 } |
878 | 879 |
879 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
880 // if we don't succeed. Some types of errors are not likely to disappear on | 881 // if we don't succeed. Some types of errors are not likely to disappear on |
881 // their own. With the return values now available in the old_job.session, | 882 // their own. With the return values now available in the old_job.session, we |
882 // we should be able to detect such errors and only retry when we detect | 883 // should be able to detect such errors and only retry when we detect |
883 // transient errors. | 884 // transient errors. |
884 | 885 |
885 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
886 mode_ == NORMAL_MODE) { | 887 mode_ == NORMAL_MODE) { |
887 // When in normal mode, we allow up to one nudge per backoff interval. It | 888 // When in normal mode, we allow up to one nudge per backoff interval. It |
888 // appears that this was our nudge for this interval, and it failed. | 889 // appears that this was our nudge for this interval, and it failed. |
889 // | 890 // |
890 // Note: This does not prevent us from running canary jobs. For example, | 891 // Note: This does not prevent us from running canary jobs. For example, an |
891 // an IP address change might still result in another nudge being executed | 892 // IP address change might still result in another nudge being executed |
892 // during this backoff interval. | 893 // during this backoff interval. |
893 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; | 894 SDVLOG(2) << "A nudge during backoff failed"; |
894 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); | 895 |
| 896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
895 DCHECK(!wait_interval_->had_nudge); | 897 DCHECK(!wait_interval_->had_nudge); |
896 | 898 |
897 wait_interval_->had_nudge = true; | 899 wait_interval_->had_nudge = true; |
898 DCHECK(!pending_nudge_); | 900 InitOrCoalescePendingJob(old_job); |
899 | 901 RestartWaiting(); |
900 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | |
901 pending_nudge_ = new_job.get(); | |
902 RestartWaiting(new_job.Pass()); | |
903 } else { | 902 } else { |
904 // Either this is the first failure or a consecutive failure after our | 903 // Either this is the first failure or a consecutive failure after our |
905 // backoff timer expired. We handle it the same way in either case. | 904 // backoff timer expired. We handle it the same way in either case. |
906 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
907 HandleContinuationError(finished_job.Pass()); | 906 HandleContinuationError(old_job); |
908 } | 907 } |
909 } | 908 } |
910 | 909 |
911 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 911 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
913 | 912 |
914 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 913 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
915 syncer_short_poll_interval_seconds_ : | 914 syncer_short_poll_interval_seconds_ : |
916 syncer_long_poll_interval_seconds_; | 915 syncer_long_poll_interval_seconds_; |
917 bool rate_changed = !poll_timer_.IsRunning() || | 916 bool rate_changed = !poll_timer_.IsRunning() || |
918 poll != poll_timer_.GetCurrentDelay(); | 917 poll != poll_timer_.GetCurrentDelay(); |
919 | 918 |
920 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) | 919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) |
921 poll_timer_.Reset(); | 920 poll_timer_.Reset(); |
922 | 921 |
923 if (!rate_changed) | 922 if (!rate_changed) |
924 return; | 923 return; |
925 | 924 |
926 // Adjust poll rate. | 925 // Adjust poll rate. |
927 poll_timer_.Stop(); | 926 poll_timer_.Stop(); |
928 poll_timer_.Start(FROM_HERE, poll, this, | 927 poll_timer_.Start(FROM_HERE, poll, this, |
929 &SyncSchedulerImpl::PollTimerCallback); | 928 &SyncSchedulerImpl::PollTimerCallback); |
930 } | 929 } |
931 | 930 |
932 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { | 931 void SyncSchedulerImpl::RestartWaiting() { |
933 CHECK(wait_interval_.get()); | 932 CHECK(wait_interval_.get()); |
934 wait_interval_->timer.Stop(); | 933 wait_interval_->timer.Stop(); |
935 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | 934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
936 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 935 this, &SyncSchedulerImpl::DoCanaryJob); |
937 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
938 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
939 weak_ptr_factory_.GetWeakPtr(), | |
940 base::Passed(&job))); | |
941 } else { | |
942 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
943 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
944 weak_ptr_factory_.GetWeakPtr(), | |
945 base::Passed(&job))); | |
946 } | |
947 } | 936 } |
948 | 937 |
949 void SyncSchedulerImpl::HandleContinuationError( | 938 void SyncSchedulerImpl::HandleContinuationError( |
950 scoped_ptr<SyncSessionJob> old_job) { | 939 const SyncSessionJob& old_job) { |
951 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 940 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
952 if (DCHECK_IS_ON()) { | 941 if (DCHECK_IS_ON()) { |
953 if (IsBackingOff()) { | 942 if (IsBackingOff()) { |
954 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); | 943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
955 } | 944 } |
956 } | 945 } |
957 | 946 |
958 TimeDelta length = delay_provider_->GetDelay( | 947 TimeDelta length = delay_provider_->GetDelay( |
959 IsBackingOff() ? wait_interval_->length : | 948 IsBackingOff() ? wait_interval_->length : |
960 delay_provider_->GetInitialDelay( | 949 delay_provider_->GetInitialDelay( |
961 old_job->session()->status_controller().model_neutral_state())); | 950 old_job.session->status_controller().model_neutral_state())); |
962 | 951 |
963 SDVLOG(2) << "In handle continuation error with " | 952 SDVLOG(2) << "In handle continuation error with " |
964 << SyncSessionJob::GetPurposeString(old_job->purpose()) | 953 << SyncSessionJob::GetPurposeString(old_job.purpose) |
965 << " job. The time delta(ms) is " | 954 << " job. The time delta(ms) is " |
966 << length.InMilliseconds(); | 955 << length.InMilliseconds(); |
967 | 956 |
968 // This will reset the had_nudge variable as well. | 957 // This will reset the had_nudge variable as well. |
969 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
970 length)); | 959 length)); |
971 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); | 960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
972 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
973 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
974 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
975 // Config params should always get set. | 962 // Config params should always get set. |
976 DCHECK(!old_job->config_params().ready_task.is_null()); | 963 DCHECK(!old_job.config_params.ready_task.is_null()); |
977 wait_interval_->pending_configure_job = new_job.get(); | 964 SyncSession* old = old_job.session.get(); |
| 965 SyncSession* s(new SyncSession(session_context_, this, |
| 966 old->source(), old->routing_info(), old->workers())); |
| 967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
| 968 make_linked_ptr(s), false, old_job.config_params, |
| 969 FROM_HERE); |
| 970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
978 } else { | 971 } else { |
979 // We are not in configuration mode. So wait_interval's pending job | 972 // We are not in configuration mode. So wait_interval's pending job |
980 // should be null. | 973 // should be null. |
981 DCHECK(wait_interval_->pending_configure_job == NULL); | 974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
982 DCHECK(!pending_nudge_); | 975 |
983 pending_nudge_ = new_job.get(); | 976 // TODO(lipalani) - handle clear user data. |
| 977 InitOrCoalescePendingJob(old_job); |
984 } | 978 } |
985 | 979 RestartWaiting(); |
986 RestartWaiting(new_job.Pass()); | |
987 } | 980 } |
988 | 981 |
989 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
990 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 983 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
991 DCHECK(weak_handle_this_.IsInitialized()); | 984 DCHECK(weak_handle_this_.IsInitialized()); |
992 SDVLOG(3) << "Posting StopImpl"; | 985 SDVLOG(3) << "Posting StopImpl"; |
993 weak_handle_this_.Call(FROM_HERE, | 986 weak_handle_this_.Call(FROM_HERE, |
994 &SyncSchedulerImpl::StopImpl, | 987 &SyncSchedulerImpl::StopImpl, |
995 callback); | 988 callback); |
996 } | 989 } |
997 | 990 |
998 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
999 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 992 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1000 SDVLOG(2) << "StopImpl called"; | 993 SDVLOG(2) << "StopImpl called"; |
1001 | 994 |
1002 // Kill any in-flight method calls. | 995 // Kill any in-flight method calls. |
1003 weak_ptr_factory_.InvalidateWeakPtrs(); | 996 weak_ptr_factory_.InvalidateWeakPtrs(); |
1004 wait_interval_.reset(); | 997 wait_interval_.reset(); |
1005 poll_timer_.Stop(); | 998 poll_timer_.Stop(); |
1006 if (started_) { | 999 if (started_) { |
1007 started_ = false; | 1000 started_ = false; |
1008 } | 1001 } |
1009 if (!callback.is_null()) | 1002 if (!callback.is_null()) |
1010 callback.Run(); | 1003 callback.Run(); |
1011 } | 1004 } |
1012 | 1005 |
1013 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { | 1006 void SyncSchedulerImpl::DoCanaryJob() { |
1014 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1015 SDVLOG(2) << "Do canary job"; | 1008 SDVLOG(2) << "Do canary job"; |
1016 | 1009 DoPendingJobIfPossible(true); |
1017 // Only set canary privileges here, when we are about to run the job. This | |
1018 // avoids confusion in managing canary bits during scheduling, when you | |
1019 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
1020 // was scheduled as canary, and send it to an "unscheduled" state. | |
1021 to_be_canary->GrantCanaryPrivilege(); | |
1022 | |
1023 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
1024 // TODO(tim): We should be able to remove this... | |
1025 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
1026 to_be_canary->session()->source()).Pass(); | |
1027 // The routing info might have been changed since we cached the | |
1028 // pending nudge. Update it by coalescing to the latest. | |
1029 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
1030 } | |
1031 DoSyncSessionJob(to_be_canary.Pass()); | |
1032 } | 1010 } |
1033 | 1011 |
1034 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { | 1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { |
1035 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1036 // If we find a scheduled pending_ job, abandon the old one and return a | 1014 SyncSessionJob* job_to_execute = NULL; |
1037 // a clone. If unscheduled, just hand over ownership. | |
1038 scoped_ptr<SyncSessionJob> candidate; | |
1039 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
1040 && wait_interval_->pending_configure_job) { | 1016 && wait_interval_->pending_configure_job.get()) { |
1041 SDVLOG(2) << "Found pending configure job"; | 1017 SDVLOG(2) << "Found pending configure job"; |
1042 candidate = | 1018 job_to_execute = wait_interval_->pending_configure_job.get(); |
1043 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); | 1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
1044 wait_interval_->pending_configure_job = candidate.get(); | |
1045 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
1046 SDVLOG(2) << "Found pending nudge job"; | 1020 SDVLOG(2) << "Found pending nudge job"; |
1047 candidate = pending_nudge_->CloneAndAbandon(); | 1021 |
1048 pending_nudge_ = candidate.get(); | 1022 scoped_ptr<SyncSession> session(CreateSyncSession( |
1049 unscheduled_nudge_storage_.reset(); | 1023 pending_nudge_->session->source())); |
| 1024 |
| 1025 // Also the routing info might have been changed since we cached the |
| 1026 // pending nudge. Update it by coalescing to the latest. |
| 1027 pending_nudge_->session->Coalesce(*(session.get())); |
| 1028 // The pending nudge would be cleared in the DoSyncSessionJob function. |
| 1029 job_to_execute = pending_nudge_.get(); |
1050 } | 1030 } |
1051 return candidate.Pass(); | 1031 |
| 1032 if (job_to_execute != NULL) { |
| 1033 SDVLOG(2) << "Executing pending job"; |
| 1034 SyncSessionJob copy = *job_to_execute; |
| 1035 copy.is_canary_job = is_canary_job; |
| 1036 DoSyncSessionJob(copy); |
| 1037 } |
1052 } | 1038 } |
1053 | 1039 |
1054 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( | 1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( |
1055 const SyncSourceInfo& source) { | 1041 const SyncSourceInfo& source) { |
1056 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1057 DVLOG(2) << "Creating sync session with routes " | 1043 DVLOG(2) << "Creating sync session with routes " |
1058 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
1059 | 1045 |
1060 SyncSourceInfo info(source); | 1046 SyncSourceInfo info(source); |
1061 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, | 1047 SyncSession* session(new SyncSession(session_context_, this, info, |
1062 session_context_->routing_info(), session_context_->workers())); | 1048 session_context_->routing_info(), session_context_->workers())); |
| 1049 |
| 1050 return session; |
1063 } | 1051 } |
1064 | 1052 |
1065 void SyncSchedulerImpl::PollTimerCallback() { | 1053 void SyncSchedulerImpl::PollTimerCallback() { |
1066 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1067 ModelSafeRoutingInfo r; | 1055 ModelSafeRoutingInfo r; |
1068 ModelTypeInvalidationMap invalidation_map = | 1056 ModelTypeInvalidationMap invalidation_map = |
1069 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | 1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
1070 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
1071 scoped_ptr<SyncSession> s(CreateSyncSession(info)); | 1059 SyncSession* s = CreateSyncSession(info); |
1072 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | 1060 |
1073 TimeTicks::Now(), | 1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), |
1074 s.Pass(), | 1062 make_linked_ptr(s), |
1075 ConfigurationParams(), | 1063 false, |
1076 FROM_HERE)); | 1064 ConfigurationParams(), |
1077 ScheduleSyncSessionJob(job.Pass()); | 1065 FROM_HERE); |
| 1066 |
| 1067 ScheduleSyncSessionJob(job); |
1078 } | 1068 } |
1079 | 1069 |
1080 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { | 1070 void SyncSchedulerImpl::Unthrottle() { |
1081 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1082 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1083 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") | 1073 SDVLOG(2) << "Unthrottled."; |
1084 << "canary."; | 1074 DoCanaryJob(); |
1085 if (to_be_canary.get()) | |
1086 DoCanaryJob(to_be_canary.Pass()); | |
1087 | |
1088 // TODO(tim): The way DecideOnJob works today, canary privileges aren't | |
1089 // enough to bypass a THROTTLED wait interval, which would suggest we need | |
1090 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is | |
1091 // probably the "right" thing to do). Bug 154216. | |
1092 wait_interval_.reset(); | 1075 wait_interval_.reset(); |
1093 } | 1076 } |
1094 | 1077 |
1095 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1097 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1098 } | 1081 } |
1099 | 1082 |
1100 bool SyncSchedulerImpl::IsBackingOff() const { | 1083 bool SyncSchedulerImpl::IsBackingOff() const { |
1101 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1102 return wait_interval_.get() && wait_interval_->mode == | 1085 return wait_interval_.get() && wait_interval_->mode == |
1103 WaitInterval::EXPONENTIAL_BACKOFF; | 1086 WaitInterval::EXPONENTIAL_BACKOFF; |
1104 } | 1087 } |
1105 | 1088 |
1106 void SyncSchedulerImpl::OnSilencedUntil( | 1089 void SyncSchedulerImpl::OnSilencedUntil( |
1107 const base::TimeTicks& silenced_until) { | 1090 const base::TimeTicks& silenced_until) { |
1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1109 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1110 silenced_until - TimeTicks::Now())); | 1093 silenced_until - TimeTicks::Now())); |
| 1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, |
| 1095 &SyncSchedulerImpl::Unthrottle); |
1111 } | 1096 } |
1112 | 1097 |
1113 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1114 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1115 return wait_interval_.get() && wait_interval_->mode == | 1100 return wait_interval_.get() && wait_interval_->mode == |
1116 WaitInterval::THROTTLED; | 1101 WaitInterval::THROTTLED; |
1117 } | 1102 } |
1118 | 1103 |
1119 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1120 const base::TimeDelta& new_interval) { | 1105 const base::TimeDelta& new_interval) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1174 | 1159 |
1175 #undef SDVLOG_LOC | 1160 #undef SDVLOG_LOC |
1176 | 1161 |
1177 #undef SDVLOG | 1162 #undef SDVLOG |
1178 | 1163 |
1179 #undef SLOG | 1164 #undef SLOG |
1180 | 1165 |
1181 #undef ENUM_CASE | 1166 #undef ENUM_CASE |
1182 | 1167 |
1183 } // namespace syncer | 1168 } // namespace syncer |
OLD | NEW |