OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/bind_helpers.h" | |
12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
13 #include "base/location.h" | 14 #include "base/location.h" |
14 #include "base/logging.h" | 15 #include "base/logging.h" |
15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
76 : source(source), | 77 : source(source), |
77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
78 routing_info(routing_info), | 79 routing_info(routing_info), |
79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
81 } | 82 } |
82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
83 | 84 |
84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
86 had_nudge(false) { | 87 had_nudge(false), |
88 pending_configure_job(NULL) { | |
87 } | 89 } |
88 | 90 |
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
92 : mode(mode), had_nudge(false), length(length), | |
93 pending_configure_job(NULL) { } | |
94 | |
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
90 | 96 |
91 #define ENUM_CASE(x) case x: return #x; break; | 97 #define ENUM_CASE(x) case x: return #x; break; |
92 | 98 |
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
94 switch (mode) { | 100 switch (mode) { |
95 ENUM_CASE(UNKNOWN); | 101 ENUM_CASE(UNKNOWN); |
96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 102 ENUM_CASE(EXPONENTIAL_BACKOFF); |
97 ENUM_CASE(THROTTLED); | 103 ENUM_CASE(THROTTLED); |
98 } | 104 } |
99 NOTREACHED(); | 105 NOTREACHED(); |
100 return ""; | 106 return ""; |
101 } | 107 } |
102 | 108 |
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
104 : purpose(UNKNOWN), | |
105 is_canary_job(false) { | |
106 } | |
107 | |
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
109 | |
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
111 base::TimeTicks start, | |
112 linked_ptr<sessions::SyncSession> session, | |
113 bool is_canary_job, | |
114 const ConfigurationParams& config_params, | |
115 const tracked_objects::Location& from_here) | |
116 : purpose(purpose), | |
117 scheduled_start(start), | |
118 session(session), | |
119 is_canary_job(is_canary_job), | |
120 config_params(config_params), | |
121 from_here(from_here) { | |
122 } | |
123 | |
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
126 switch (purpose) { | |
127 ENUM_CASE(UNKNOWN); | |
128 ENUM_CASE(POLL); | |
129 ENUM_CASE(NUDGE); | |
130 ENUM_CASE(CONFIGURATION); | |
131 } | |
132 NOTREACHED(); | |
133 return ""; | |
134 } | |
135 | |
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
137 NudgeSource source) { | 110 NudgeSource source) { |
138 switch (source) { | 111 switch (source) { |
139 case NUDGE_SOURCE_NOTIFICATION: | 112 case NUDGE_SOURCE_NOTIFICATION: |
140 return GetUpdatesCallerInfo::NOTIFICATION; | 113 return GetUpdatesCallerInfo::NOTIFICATION; |
141 case NUDGE_SOURCE_LOCAL: | 114 case NUDGE_SOURCE_LOCAL: |
142 return GetUpdatesCallerInfo::LOCAL; | 115 return GetUpdatesCallerInfo::LOCAL; |
143 case NUDGE_SOURCE_CONTINUATION: | 116 case NUDGE_SOURCE_CONTINUATION: |
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 117 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
145 case NUDGE_SOURCE_LOCAL_REFRESH: | 118 case NUDGE_SOURCE_LOCAL_REFRESH: |
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 119 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
147 case NUDGE_SOURCE_UNKNOWN: | 120 case NUDGE_SOURCE_UNKNOWN: |
148 return GetUpdatesCallerInfo::UNKNOWN; | 121 return GetUpdatesCallerInfo::UNKNOWN; |
149 default: | 122 default: |
150 NOTREACHED(); | 123 NOTREACHED(); |
151 return GetUpdatesCallerInfo::UNKNOWN; | 124 return GetUpdatesCallerInfo::UNKNOWN; |
152 } | 125 } |
153 } | 126 } |
154 | 127 |
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
156 : mode(mode), had_nudge(false), length(length) { } | |
157 | |
158 // Helper macros to log with the syncer thread name; useful when there | 128 // Helper macros to log with the syncer thread name; useful when there |
159 // are multiple syncer threads involved. | 129 // are multiple syncer threads involved. |
160 | 130 |
161 #define SLOG(severity) LOG(severity) << name_ << ": " | 131 #define SLOG(severity) LOG(severity) << name_ << ": " |
162 | 132 |
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 133 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
164 | 134 |
165 #define SDVLOG_LOC(from_here, verbose_level) \ | 135 #define SDVLOG_LOC(from_here, verbose_level) \ |
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 136 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
167 | 137 |
(...skipping 30 matching lines...) Expand all Loading... | |
198 syncer_short_poll_interval_seconds_( | 168 syncer_short_poll_interval_seconds_( |
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 169 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
200 syncer_long_poll_interval_seconds_( | 170 syncer_long_poll_interval_seconds_( |
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 171 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
202 sessions_commit_delay_( | 172 sessions_commit_delay_( |
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 173 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
204 mode_(NORMAL_MODE), | 174 mode_(NORMAL_MODE), |
205 // Start with assuming everything is fine with the connection. | 175 // Start with assuming everything is fine with the connection. |
206 // At the end of the sync cycle we would have the correct status. | 176 // At the end of the sync cycle we would have the correct status. |
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 177 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
178 pending_nudge_(NULL), | |
208 delay_provider_(delay_provider), | 179 delay_provider_(delay_provider), |
209 syncer_(syncer), | 180 syncer_(syncer), |
210 session_context_(context), | 181 session_context_(context), |
211 no_scheduling_allowed_(false) { | 182 no_scheduling_allowed_(false) { |
212 DCHECK(sync_loop_); | 183 DCHECK(sync_loop_); |
213 } | 184 } |
214 | 185 |
215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 186 SyncSchedulerImpl::~SyncSchedulerImpl() { |
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 187 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
217 StopImpl(base::Closure()); | 188 StopImpl(base::Closure()); |
(...skipping 16 matching lines...) Expand all Loading... | |
234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 205 void SyncSchedulerImpl::OnConnectionStatusChange() { |
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 206 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
236 // Optimistically assume that the connection is fixed and try | 207 // Optimistically assume that the connection is fixed and try |
237 // connecting. | 208 // connecting. |
238 OnServerConnectionErrorFixed(); | 209 OnServerConnectionErrorFixed(); |
239 } | 210 } |
240 } | 211 } |
241 | 212 |
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 213 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 214 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
215 // There could be a pending nudge or configuration job in several cases: | |
216 // | |
217 // 1. We're in exponential backoff. | |
218 // 2. We're silenced / throttled. | |
219 // 3. A nudge was saved previously due to not having a valid auth token. | |
220 // 4. A nudge was scheduled + saved while in configuration mode. | |
221 // | |
222 // In all cases except (2), we want to retry contacting the server. We | |
223 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
225 // has the authority to do that is the Unthrottle timer. | |
226 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
227 if (!pending.get()) | |
228 return; | |
229 | |
244 PostTask(FROM_HERE, "DoCanaryJob", | 230 PostTask(FROM_HERE, "DoCanaryJob", |
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 231 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
246 weak_ptr_factory_.GetWeakPtr())); | 232 weak_ptr_factory_.GetWeakPtr(), |
247 | 233 base::Passed(&pending))); |
248 } | 234 } |
249 | 235 |
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 236 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
251 HttpResponse::ServerConnectionCode code) { | 237 HttpResponse::ServerConnectionCode code) { |
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 238 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
253 SDVLOG(2) << "New server connection code: " | 239 SDVLOG(2) << "New server connection code: " |
254 << HttpResponse::GetServerConnectionCodeString(code); | 240 << HttpResponse::GetServerConnectionCodeString(code); |
255 | 241 |
256 connection_code_ = code; | 242 connection_code_ = code; |
257 } | 243 } |
(...skipping 15 matching lines...) Expand all Loading... | |
273 Mode old_mode = mode_; | 259 Mode old_mode = mode_; |
274 mode_ = mode; | 260 mode_ = mode; |
275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 261 AdjustPolling(NULL); // Will kick start poll timer if needed. |
276 | 262 |
277 if (old_mode != mode_) { | 263 if (old_mode != mode_) { |
278 // We just changed our mode. See if there are any pending jobs that we could | 264 // We just changed our mode. See if there are any pending jobs that we could |
279 // execute in the new mode. | 265 // execute in the new mode. |
280 if (mode_ == NORMAL_MODE) { | 266 if (mode_ == NORMAL_MODE) { |
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 267 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
282 // has not yet completed. | 268 // has not yet completed. |
283 DCHECK(!wait_interval_.get() || | 269 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
284 !wait_interval_->pending_configure_job.get()); | |
285 } | 270 } |
286 | 271 |
287 DoPendingJobIfPossible(false); | 272 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
273 if (pending.get()) { | |
274 // FIXME: Remove this CreateSyncSession call... | |
275 scoped_ptr<SyncSession> session(CreateSyncSession( | |
276 pending->session()->source())); | |
277 // Also the routing info might have been changed since we cached the | |
278 // pending nudge. Update it by coalescing to the latest. | |
279 pending->mutable_session()->Coalesce(*(session)); | |
280 SDVLOG(2) << "Executing pending job. Good luck!"; | |
281 DoSyncSessionJob(pending.Pass()); | |
282 } | |
288 } | 283 } |
289 } | 284 } |
290 | 285 |
291 void SyncSchedulerImpl::SendInitialSnapshot() { | 286 void SyncSchedulerImpl::SendInitialSnapshot() { |
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 287 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 288 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 289 SyncSourceInfo(), ModelSafeRoutingInfo(), |
295 std::vector<ModelSafeWorker*>())); | 290 std::vector<ModelSafeWorker*>())); |
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 291 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
297 event.snapshot = dummy->TakeSnapshot(); | 292 event.snapshot = dummy->TakeSnapshot(); |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
332 bool SyncSchedulerImpl::ScheduleConfiguration( | 327 bool SyncSchedulerImpl::ScheduleConfiguration( |
333 const ConfigurationParams& params) { | 328 const ConfigurationParams& params) { |
334 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 329 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
335 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 330 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
336 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 331 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
337 DCHECK(!params.ready_task.is_null()); | 332 DCHECK(!params.ready_task.is_null()); |
338 SDVLOG(2) << "Reconfiguring syncer."; | 333 SDVLOG(2) << "Reconfiguring syncer."; |
339 | 334 |
340 // Only one configuration is allowed at a time. Verify we're not waiting | 335 // Only one configuration is allowed at a time. Verify we're not waiting |
341 // for a pending configure job. | 336 // for a pending configure job. |
342 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 337 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
343 | 338 |
344 // TODO(sync): now that ModelChanging commands only use those workers within | 339 // TODO(sync): now that ModelChanging commands only use those workers within |
345 // the routing info, we don't really need |restricted_workers|. Remove it. | 340 // the routing info, we don't really need |restricted_workers|. Remove it. |
346 // crbug.com/133030 | 341 // crbug.com/133030 |
347 ModelSafeRoutingInfo restricted_routes; | 342 ModelSafeRoutingInfo restricted_routes; |
348 std::vector<ModelSafeWorker*> restricted_workers; | 343 std::vector<ModelSafeWorker*> restricted_workers; |
349 BuildModelSafeParams(params.types_to_download, | 344 BuildModelSafeParams(params.types_to_download, |
350 params.routing_info, | 345 params.routing_info, |
351 session_context_->workers(), | 346 session_context_->workers(), |
352 &restricted_routes, | 347 &restricted_routes, |
353 &restricted_workers); | 348 &restricted_workers); |
354 session_context_->set_routing_info(params.routing_info); | 349 session_context_->set_routing_info(params.routing_info); |
355 | 350 |
356 // Only reconfigure if we have types to download. | 351 // Only reconfigure if we have types to download. |
357 if (!params.types_to_download.Empty()) { | 352 if (!params.types_to_download.Empty()) { |
358 DCHECK(!restricted_routes.empty()); | 353 DCHECK(!restricted_routes.empty()); |
359 linked_ptr<SyncSession> session(new SyncSession( | 354 scoped_ptr<SyncSession> session(new SyncSession( |
360 session_context_, | 355 session_context_, |
361 this, | 356 this, |
362 SyncSourceInfo(params.source, | 357 SyncSourceInfo(params.source, |
363 ModelSafeRoutingInfoToStateMap( | 358 ModelSafeRoutingInfoToStateMap( |
364 restricted_routes, | 359 restricted_routes, |
365 std::string())), | 360 std::string())), |
366 restricted_routes, | 361 restricted_routes, |
367 restricted_workers)); | 362 restricted_workers)); |
368 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 363 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
369 TimeTicks::Now(), | 364 SyncSessionJob::CONFIGURATION, |
370 session, | 365 TimeTicks::Now(), |
371 false, | 366 session.Pass(), |
372 params, | 367 params, |
373 FROM_HERE); | 368 FROM_HERE)); |
374 DoSyncSessionJob(job); | 369 bool succeeded = DoSyncSessionJob(job.Pass()); |
375 | 370 |
376 // If we failed, the job would have been saved as the pending configure | 371 // If we failed, the job would have been saved as the pending configure |
377 // job and a wait interval would have been set. | 372 // job and a wait interval would have been set. |
378 if (!session->Succeeded()) { | 373 if (!succeeded) { |
379 DCHECK(wait_interval_.get() && | 374 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
380 wait_interval_->pending_configure_job.get()); | |
381 return false; | 375 return false; |
382 } | 376 } |
383 } else { | 377 } else { |
384 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 378 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
385 params.ready_task.Run(); | 379 params.ready_task.Run(); |
386 } | 380 } |
387 | 381 |
388 return true; | 382 return true; |
389 } | 383 } |
390 | 384 |
391 SyncSchedulerImpl::JobProcessDecision | 385 SyncSchedulerImpl::JobProcessDecision |
392 SyncSchedulerImpl::DecideWhileInWaitInterval( | 386 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) { |
393 const SyncSessionJob& job) { | |
394 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 387 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
395 DCHECK(wait_interval_.get()); | 388 DCHECK(wait_interval_.get()); |
396 | 389 |
397 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 390 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
398 << WaitInterval::GetModeString(wait_interval_->mode) | 391 << WaitInterval::GetModeString(wait_interval_->mode) |
399 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 392 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
400 << (job.is_canary_job ? " (canary)" : ""); | 393 << (job->is_canary() ? " (canary)" : ""); |
401 | 394 |
402 if (job.purpose == SyncSessionJob::POLL) | 395 if (job->purpose() == SyncSessionJob::POLL) |
403 return DROP; | 396 return DROP; |
404 | 397 |
405 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 398 // If we save a job while in a WaitInterval, there is a well-defined moment |
406 job.purpose == SyncSessionJob::CONFIGURATION); | 399 // in time in the future when it makes sense for that SAVE-worthy job to try |
400 // running again -- the end of the WaitInterval. | |
401 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | |
402 job->purpose() == SyncSessionJob::CONFIGURATION); | |
403 | |
404 // If throttled, there's a clock ticking to unthrottle. We want to get | |
405 // on the same train. | |
407 if (wait_interval_->mode == WaitInterval::THROTTLED) | 406 if (wait_interval_->mode == WaitInterval::THROTTLED) |
408 return SAVE; | 407 return SAVE; |
409 | 408 |
410 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 409 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
411 if (job.purpose == SyncSessionJob::NUDGE) { | 410 if (job->purpose() == SyncSessionJob::NUDGE) { |
412 if (mode_ == CONFIGURATION_MODE) | 411 if (mode_ == CONFIGURATION_MODE) |
413 return SAVE; | 412 return SAVE; |
414 | 413 |
415 // If we already had one nudge then just drop this nudge. We will retry | 414 // If we already had one nudge then just drop this nudge. We will retry |
416 // later when the timer runs out. | 415 // later when the timer runs out. |
417 if (!job.is_canary_job) | 416 if (!job->is_canary()) |
418 return wait_interval_->had_nudge ? DROP : CONTINUE; | 417 return wait_interval_->had_nudge ? DROP : CONTINUE; |
419 else // We are here because timer ran out. So retry. | 418 else // We are here because timer ran out. So retry. |
420 return CONTINUE; | 419 return CONTINUE; |
421 } | 420 } |
422 return job.is_canary_job ? CONTINUE : SAVE; | 421 return job->is_canary() ? CONTINUE : SAVE; |
423 } | 422 } |
424 | 423 |
425 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 424 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
426 const SyncSessionJob& job) { | 425 const SyncSessionJob* job) { |
427 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 426 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
428 | 427 |
429 // See if our type is throttled. | 428 // See if our type is throttled. |
430 ModelTypeSet throttled_types = | 429 ModelTypeSet throttled_types = |
431 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 430 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
432 if (job.purpose == SyncSessionJob::NUDGE && | 431 if (job->purpose() == SyncSessionJob::NUDGE && |
433 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 432 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
434 ModelTypeSet requested_types; | 433 ModelTypeSet requested_types; |
435 for (ModelTypeStateMap::const_iterator i = | 434 for (ModelTypeStateMap::const_iterator i = |
436 job.session->source().types.begin(); | 435 job->session()->source().types.begin(); |
437 i != job.session->source().types.end(); | 436 i != job->session()->source().types.end(); |
438 ++i) { | 437 ++i) { |
439 requested_types.Put(i->first); | 438 requested_types.Put(i->first); |
440 } | 439 } |
441 | 440 |
441 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
442 // a per-datatype "unthrottle" event as something that should force a | |
443 // canary job. For this reason, there's no good time to reschedule this job | |
444 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
445 // Note that there may already be such an event if we're in a WaitInterval, | |
446 // so we can retry it then. | |
442 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 447 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
443 return SAVE; | 448 return SAVE; |
444 } | 449 } |
445 | 450 |
446 if (wait_interval_.get()) | 451 if (wait_interval_.get()) |
447 return DecideWhileInWaitInterval(job); | 452 return DecideWhileInWaitInterval(job); |
448 | 453 |
449 if (mode_ == CONFIGURATION_MODE) { | 454 if (mode_ == CONFIGURATION_MODE) { |
450 if (job.purpose == SyncSessionJob::NUDGE) | 455 if (job->purpose() == SyncSessionJob::NUDGE) |
451 return SAVE; | 456 return SAVE; // Running requires a mode switch. |
452 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 457 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
453 return CONTINUE; | 458 return CONTINUE; |
454 else | 459 else |
455 return DROP; | 460 return DROP; |
456 } | 461 } |
457 | 462 |
458 // We are in normal mode. | 463 // We are in normal mode. |
459 DCHECK_EQ(mode_, NORMAL_MODE); | 464 DCHECK_EQ(mode_, NORMAL_MODE); |
460 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 465 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION); |
461 | 466 |
462 // Note about some subtle scheduling semantics. | 467 // Note about some subtle scheduling semantics. |
463 // | 468 // |
464 // It's possible at this point that |job| is known to be unnecessary, and | 469 // It's possible at this point that |job| is known to be unnecessary, and |
465 // dropping it would be perfectly safe and correct. Consider | 470 // dropping it would be perfectly safe and correct. Consider |
466 // | 471 // |
467 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 472 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
468 // the time that the last successful all-datatype NUDGE completed. | 473 // the time that the last successful all-datatype NUDGE completed. |
469 // | 474 // |
470 // 2) |job| is a NUDGE (for any combination of types) with a | 475 // 2) |job| is a NUDGE (for any combination of types) with a |
(...skipping 22 matching lines...) Expand all Loading... | |
493 // * It's not strictly "impossible", but it would be reentrant and hence | 498 // * It's not strictly "impossible", but it would be reentrant and hence |
494 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 499 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
495 // legal side effect of any of the work being done as part of a sync cycle. | 500 // legal side effect of any of the work being done as part of a sync cycle. |
496 // See |no_scheduling_allowed_| for details. | 501 // See |no_scheduling_allowed_| for details. |
497 | 502 |
498 // Decision now rests on state of auth tokens. | 503 // Decision now rests on state of auth tokens. |
499 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 504 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
500 return CONTINUE; | 505 return CONTINUE; |
501 | 506 |
502 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 507 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
503 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 508 // Running the job would require updated auth, so we can't honour |
509 // job.scheduled_start(). | |
510 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
504 } | 511 } |
505 | 512 |
506 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 513 bool SyncSchedulerImpl::ShouldRunJob(SyncSessionJob* job) { |
507 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
508 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | |
509 if (pending_nudge_.get() == NULL) { | |
510 SDVLOG(2) << "Creating a pending nudge job"; | |
511 SyncSession* s = job.session.get(); | |
512 | |
513 // Get a fresh session with similar configuration as before (resets | |
514 // StatusController). | |
515 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | |
516 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
517 | |
518 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
519 make_linked_ptr(session.release()), false, | |
520 ConfigurationParams(), job.from_here); | |
521 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
522 return; | |
523 } | |
524 | |
525 SDVLOG(2) << "Coalescing a pending nudge"; | |
526 pending_nudge_->session->Coalesce(*(job.session.get())); | |
527 pending_nudge_->scheduled_start = job.scheduled_start; | |
528 | |
529 // Unfortunately the nudge location cannot be modified. So it stores the | |
530 // location of the first caller. | |
531 } | |
532 | |
533 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | |
534 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 514 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
535 DCHECK(started_); | 515 DCHECK(started_); |
536 | 516 |
537 JobProcessDecision decision = DecideOnJob(job); | 517 JobProcessDecision decision = DecideOnJob(job); |
538 SDVLOG(2) << "Should run " | 518 SDVLOG(2) << "Should run " |
539 << SyncSessionJob::GetPurposeString(job.purpose) | 519 << SyncSessionJob::GetPurposeString(job->purpose()) |
540 << " job in mode " << GetModeString(mode_) | 520 << " job " << job->session() |
521 << " in mode " << GetModeString(mode_) | |
541 << ": " << GetDecisionString(decision); | 522 << ": " << GetDecisionString(decision); |
542 if (decision != SAVE) | 523 if (decision != SAVE) |
543 return decision == CONTINUE; | 524 return decision == CONTINUE; |
544 | 525 |
545 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 526 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
546 SyncSessionJob::CONFIGURATION); | 527 if (is_nudge && pending_nudge_) { |
528 SDVLOG(2) << "Coalescing a pending nudge"; | |
529 // TODO(tim): This basically means we never use the more-careful coalescing | |
530 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
531 // times, because we're calling ShouldRunJob first. Pull this out | |
532 // into a function to coalesce + set start times and reuse. | |
533 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | |
534 pending_nudge_->set_scheduled_start(job->scheduled_start()); | |
535 return false; | |
536 } | |
547 | 537 |
548 SaveJob(job); | 538 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon().Pass(); |
539 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
540 // This job should be made the new canary. | |
541 if (is_nudge) { | |
542 pending_nudge_ = job_to_save.get(); | |
543 } else { | |
544 SDVLOG(2) << "Saving a configuration job"; | |
545 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
546 DCHECK(!wait_interval_->pending_configure_job); | |
547 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
548 DCHECK(!job->config_params().ready_task.is_null()); | |
549 // The only nudge that could exist is a scheduled canary nudge. | |
550 DCHECK(!unscheduled_nudge_storage_.get()); | |
551 if (pending_nudge_) { | |
552 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
553 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
554 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
555 } | |
556 wait_interval_->pending_configure_job = job_to_save.get(); | |
557 } | |
558 wait_interval_->length = | |
559 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
560 RestartWaiting(job_to_save.Pass()); | |
561 return false; | |
562 } | |
563 | |
564 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
565 // when we're not in a WaitInterval. See bug 147736. | |
566 DCHECK(is_nudge); | |
567 // There may or may not be a pending_configure_job. Either way this nudge | |
568 // is unschedulable. | |
569 pending_nudge_ = job_to_save.get(); | |
570 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
549 return false; | 571 return false; |
550 } | 572 } |
551 | 573 |
552 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
553 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
554 if (job.purpose == SyncSessionJob::NUDGE) { | |
555 SDVLOG(2) << "Saving a nudge job"; | |
556 InitOrCoalescePendingJob(job); | |
557 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
558 SDVLOG(2) << "Saving a configuration job"; | |
559 DCHECK(wait_interval_.get()); | |
560 DCHECK(mode_ == CONFIGURATION_MODE); | |
561 | |
562 // Config params should always get set. | |
563 DCHECK(!job.config_params.ready_task.is_null()); | |
564 SyncSession* old = job.session.get(); | |
565 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
566 old->routing_info(), old->workers())); | |
567 SyncSessionJob new_job(job.purpose, | |
568 TimeTicks::Now(), | |
569 make_linked_ptr(s), | |
570 false, | |
571 job.config_params, | |
572 job.from_here); | |
573 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
574 } // drop the rest. | |
575 // TODO(sync): Is it okay to drop the rest? It's weird that | |
576 // SaveJob() only does what it says sometimes. (See | |
577 // http://crbug.com/90868.) | |
578 } | |
579 | |
580 // Functor for std::find_if to search by ModelSafeGroup. | 574 // Functor for std::find_if to search by ModelSafeGroup. |
581 struct ModelSafeWorkerGroupIs { | 575 struct ModelSafeWorkerGroupIs { |
582 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 576 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
583 bool operator()(ModelSafeWorker* w) { | 577 bool operator()(ModelSafeWorker* w) { |
584 return group == w->GetModelSafeGroup(); | 578 return group == w->GetModelSafeGroup(); |
585 } | 579 } |
586 ModelSafeGroup group; | 580 ModelSafeGroup group; |
587 }; | 581 }; |
588 | 582 |
589 void SyncSchedulerImpl::ScheduleNudgeAsync( | 583 void SyncSchedulerImpl::ScheduleNudgeAsync( |
590 const TimeDelta& delay, | 584 const TimeDelta& delay, |
591 NudgeSource source, ModelTypeSet types, | 585 NudgeSource source, ModelTypeSet types, |
592 const tracked_objects::Location& nudge_location) { | 586 const tracked_objects::Location& nudge_location) { |
593 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 587 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
594 SDVLOG_LOC(nudge_location, 2) | 588 SDVLOG_LOC(nudge_location, 2) |
595 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 589 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
596 << "source " << GetNudgeSourceString(source) << ", " | 590 << "source " << GetNudgeSourceString(source) << ", " |
597 << "types " << ModelTypeSetToString(types); | 591 << "types " << ModelTypeSetToString(types); |
598 | 592 |
599 ModelTypeStateMap type_state_map = | 593 ModelTypeStateMap type_state_map = |
600 ModelTypeSetToStateMap(types, std::string()); | 594 ModelTypeSetToStateMap(types, std::string()); |
601 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 595 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
602 GetUpdatesFromNudgeSource(source), | 596 GetUpdatesFromNudgeSource(source), |
603 type_state_map, | 597 type_state_map, |
604 false, | |
605 nudge_location); | 598 nudge_location); |
606 } | 599 } |
607 | 600 |
608 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 601 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
609 const TimeDelta& delay, | 602 const TimeDelta& delay, |
610 NudgeSource source, const ModelTypeStateMap& type_state_map, | 603 NudgeSource source, const ModelTypeStateMap& type_state_map, |
611 const tracked_objects::Location& nudge_location) { | 604 const tracked_objects::Location& nudge_location) { |
612 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 605 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
613 SDVLOG_LOC(nudge_location, 2) | 606 SDVLOG_LOC(nudge_location, 2) |
614 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 607 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
615 << "source " << GetNudgeSourceString(source) << ", " | 608 << "source " << GetNudgeSourceString(source) << ", " |
616 << "payloads " | 609 << "payloads " |
617 << ModelTypeStateMapToString(type_state_map); | 610 << ModelTypeStateMapToString(type_state_map); |
618 | 611 |
619 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 612 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
620 GetUpdatesFromNudgeSource(source), | 613 GetUpdatesFromNudgeSource(source), |
621 type_state_map, | 614 type_state_map, |
622 false, | |
623 nudge_location); | 615 nudge_location); |
624 } | 616 } |
625 | 617 |
626 void SyncSchedulerImpl::ScheduleNudgeImpl( | 618 void SyncSchedulerImpl::ScheduleNudgeImpl( |
627 const TimeDelta& delay, | 619 const TimeDelta& delay, |
628 GetUpdatesCallerInfo::GetUpdatesSource source, | 620 GetUpdatesCallerInfo::GetUpdatesSource source, |
629 const ModelTypeStateMap& type_state_map, | 621 const ModelTypeStateMap& type_state_map, |
630 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 622 const tracked_objects::Location& nudge_location) { |
631 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 623 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
632 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; | 624 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; |
633 | 625 |
634 SDVLOG_LOC(nudge_location, 2) | 626 SDVLOG_LOC(nudge_location, 2) |
635 << "In ScheduleNudgeImpl with delay " | 627 << "In ScheduleNudgeImpl with delay " |
636 << delay.InMilliseconds() << " ms, " | 628 << delay.InMilliseconds() << " ms, " |
637 << "source " << GetUpdatesSourceString(source) << ", " | 629 << "source " << GetUpdatesSourceString(source) << ", " |
638 << "payloads " | 630 << "payloads " |
639 << ModelTypeStateMapToString(type_state_map) | 631 << ModelTypeStateMapToString(type_state_map); |
640 << (is_canary_job ? " (canary)" : ""); | |
641 | 632 |
642 SyncSourceInfo info(source, type_state_map); | 633 SyncSourceInfo info(source, type_state_map); |
643 UpdateNudgeTimeRecords(info); | 634 UpdateNudgeTimeRecords(info); |
644 | 635 |
645 SyncSession* session(CreateSyncSession(info)); | 636 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
646 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 637 SyncSessionJob::NUDGE, |
647 make_linked_ptr(session), is_canary_job, | 638 TimeTicks::Now() + delay, |
648 ConfigurationParams(), nudge_location); | 639 CreateSyncSession(info).Pass(), |
640 ConfigurationParams(), | |
641 nudge_location)); | |
649 | 642 |
650 session = NULL; | 643 if (!ShouldRunJob(job.get())) |
651 if (!ShouldRunJob(job)) | |
652 return; | 644 return; |
653 | 645 |
654 if (pending_nudge_.get()) { | 646 if (pending_nudge_) { |
655 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
tim (not reviewing)
2012/09/13 09:00:47
Two cases - 1) this nudge was scheduled to start
| |
656 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
657 return; | |
658 } | |
659 | |
660 SDVLOG(2) << "Coalescing pending nudge"; | |
661 pending_nudge_->session->Coalesce(*(job.session.get())); | |
662 | |
663 SDVLOG(2) << "Rescheduling pending nudge"; | 647 SDVLOG(2) << "Rescheduling pending nudge"; |
664 SyncSession* s = pending_nudge_->session.get(); | 648 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
665 job.session.reset(new SyncSession(s->context(), s->delegate(), | 649 // Choose the start time as the earliest of the 2. Note that this means |
666 s->source(), s->routing_info(), s->workers())); | 650 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
667 | 651 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
668 // Choose the start time as the earliest of the 2. | 652 // without waiting. |
669 job.scheduled_start = std::min(job.scheduled_start, | 653 pending_nudge_->set_scheduled_start( |
670 pending_nudge_->scheduled_start); | 654 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
671 pending_nudge_.reset(); | 655 // Abandon the old task by cloning and replacing the session. |
656 // It's possible that by "rescheduling" we're actually taking a job that | |
657 // was previously unscheduled and giving it wings, so take care to reset | |
658 // unscheduled nudge storage. | |
659 job = pending_nudge_->CloneAndAbandon(); | |
660 unscheduled_nudge_storage_.reset(); | |
661 pending_nudge_ = NULL; | |
672 } | 662 } |
673 | 663 |
674 // TODO(zea): Consider adding separate throttling/backoff for datatype | 664 // TODO(zea): Consider adding separate throttling/backoff for datatype |
675 // refresh requests. | 665 // refresh requests. |
676 ScheduleSyncSessionJob(job); | 666 ScheduleSyncSessionJob(job.Pass()); |
677 } | 667 } |
678 | 668 |
679 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 669 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
680 switch (mode) { | 670 switch (mode) { |
681 ENUM_CASE(CONFIGURATION_MODE); | 671 ENUM_CASE(CONFIGURATION_MODE); |
682 ENUM_CASE(NORMAL_MODE); | 672 ENUM_CASE(NORMAL_MODE); |
683 } | 673 } |
684 return ""; | 674 return ""; |
685 } | 675 } |
686 | 676 |
687 const char* SyncSchedulerImpl::GetDecisionString( | 677 const char* SyncSchedulerImpl::GetDecisionString( |
688 SyncSchedulerImpl::JobProcessDecision mode) { | 678 SyncSchedulerImpl::JobProcessDecision mode) { |
689 switch (mode) { | 679 switch (mode) { |
690 ENUM_CASE(CONTINUE); | 680 ENUM_CASE(CONTINUE); |
691 ENUM_CASE(SAVE); | 681 ENUM_CASE(SAVE); |
692 ENUM_CASE(DROP); | 682 ENUM_CASE(DROP); |
693 } | 683 } |
694 return ""; | 684 return ""; |
695 } | 685 } |
696 | 686 |
697 // static | |
698 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
699 SyncSessionJob::SyncSessionJobPurpose purpose, | |
700 SyncerStep* start, | |
701 SyncerStep* end) { | |
702 switch (purpose) { | |
703 case SyncSessionJob::CONFIGURATION: | |
704 *start = DOWNLOAD_UPDATES; | |
705 *end = APPLY_UPDATES; | |
706 return; | |
707 case SyncSessionJob::NUDGE: | |
708 case SyncSessionJob::POLL: | |
709 *start = SYNCER_BEGIN; | |
710 *end = SYNCER_END; | |
711 return; | |
712 default: | |
713 NOTREACHED(); | |
714 *start = SYNCER_END; | |
715 *end = SYNCER_END; | |
716 return; | |
717 } | |
718 } | |
719 | |
720 void SyncSchedulerImpl::PostTask( | 687 void SyncSchedulerImpl::PostTask( |
721 const tracked_objects::Location& from_here, | 688 const tracked_objects::Location& from_here, |
722 const char* name, const base::Closure& task) { | 689 const char* name, const base::Closure& task) { |
723 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 690 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
724 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 691 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
725 if (!started_) { | 692 if (!started_) { |
726 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 693 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
727 return; | 694 return; |
728 } | 695 } |
729 sync_loop_->PostTask(from_here, task); | 696 sync_loop_->PostTask(from_here, task); |
730 } | 697 } |
731 | 698 |
732 void SyncSchedulerImpl::PostDelayedTask( | 699 void SyncSchedulerImpl::PostDelayedTask( |
733 const tracked_objects::Location& from_here, | 700 const tracked_objects::Location& from_here, |
734 const char* name, const base::Closure& task, base::TimeDelta delay) { | 701 const char* name, const base::Closure& task, base::TimeDelta delay) { |
735 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 702 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
736 << delay.InMilliseconds() << " ms delay"; | 703 << delay.InMilliseconds() << " ms delay"; |
737 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 704 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
738 if (!started_) { | 705 if (!started_) { |
739 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 706 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
740 return; | 707 return; |
741 } | 708 } |
742 sync_loop_->PostDelayedTask(from_here, task, delay); | 709 sync_loop_->PostDelayedTask(from_here, task, delay); |
743 } | 710 } |
744 | 711 |
745 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 712 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
713 scoped_ptr<SyncSessionJob> job) { | |
746 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 714 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
747 if (no_scheduling_allowed_) { | 715 if (no_scheduling_allowed_) { |
748 NOTREACHED() << "Illegal to schedule job while session in progress."; | 716 NOTREACHED() << "Illegal to schedule job while session in progress."; |
749 return; | 717 return; |
750 } | 718 } |
751 | 719 |
752 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 720 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
753 if (delay < TimeDelta::FromMilliseconds(0)) | 721 if (delay < TimeDelta::FromMilliseconds(0)) |
754 delay = TimeDelta::FromMilliseconds(0); | 722 delay = TimeDelta::FromMilliseconds(0); |
755 SDVLOG_LOC(job.from_here, 2) | 723 SDVLOG_LOC(job->from_location(), 2) |
756 << "In ScheduleSyncSessionJob with " | 724 << "In ScheduleSyncSessionJob with " |
757 << SyncSessionJob::GetPurposeString(job.purpose) | 725 << SyncSessionJob::GetPurposeString(job->purpose()) |
758 << " job and " << delay.InMilliseconds() << " ms delay"; | 726 << " job and " << delay.InMilliseconds() << " ms delay"; |
759 | 727 |
760 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 728 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
761 job.purpose == SyncSessionJob::POLL); | 729 job->purpose() == SyncSessionJob::POLL); |
762 if (job.purpose == SyncSessionJob::NUDGE) { | 730 if (job->purpose() == SyncSessionJob::NUDGE) { |
763 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 731 SDVLOG_LOC(job->from_location(), 2) << "Resetting pending_nudge to "; |
764 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 732 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
765 job.session); | 733 job->session()); |
766 pending_nudge_.reset(new SyncSessionJob(job)); | 734 pending_nudge_ = job.get(); |
767 } | 735 } |
768 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 736 |
769 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 737 tracked_objects::Location loc(job->from_location()); |
770 weak_ptr_factory_.GetWeakPtr(), | 738 PostDelayedTask(loc, "DoSyncSessionJob", |
771 job), | 739 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
772 delay); | 740 weak_ptr_factory_.GetWeakPtr(), |
741 base::Passed(&job)), | |
742 delay); | |
773 } | 743 } |
774 | 744 |
775 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 745 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
776 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 746 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
747 if (job->purpose() == SyncSessionJob::NUDGE) { | |
748 if (pending_nudge_ == NULL || | |
749 pending_nudge_->session() != job->session()) { | |
750 // |job| is abandoned. | |
751 SDVLOG(2) << "Dropping a nudge in " | |
752 << "DoSyncSessionJob because another nudge was scheduled"; | |
753 return false; | |
754 } | |
755 pending_nudge_ = NULL; | |
756 | |
757 // Rebase the session with the latest model safe table and use it to purge | |
758 // and update any disabled or modified entries in the job. | |
759 job->mutable_session()->RebaseRoutingInfoWithLatest( | |
760 session_context_->routing_info(), session_context_->workers()); | |
761 } | |
777 | 762 |
778 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 763 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
779 if (!ShouldRunJob(job)) { | 764 GetUpdatesCallerInfo::GetUpdatesSource source( |
765 job->session()->source().updates_source); | |
766 if (!ShouldRunJob(job.get())) { | |
780 SLOG(WARNING) | 767 SLOG(WARNING) |
781 << "Not executing " | 768 << "Not executing " |
782 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 769 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from " |
783 << GetUpdatesSourceString(job.session->source().updates_source); | 770 << GetUpdatesSourceString(source); |
784 return; | 771 return false; |
785 } | 772 } |
786 | 773 |
787 if (job.purpose == SyncSessionJob::NUDGE) { | |
788 if (pending_nudge_.get() == NULL || | |
789 pending_nudge_->session != job.session) { | |
790 SDVLOG(2) << "Dropping a nudge in " | |
791 << "DoSyncSessionJob because another nudge was scheduled"; | |
792 return; // Another nudge must have been scheduled in in the meantime. | |
793 } | |
794 pending_nudge_.reset(); | |
795 | |
796 // Create the session with the latest model safe table and use it to purge | |
797 // and update any disabled or modified entries in the job. | |
798 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
799 | |
800 job.session->RebaseRoutingInfoWithLatest(*session); | |
801 } | |
802 SDVLOG(2) << "DoSyncSessionJob with " | 774 SDVLOG(2) << "DoSyncSessionJob with " |
803 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 775 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
804 | |
805 SyncerStep begin(SYNCER_END); | |
806 SyncerStep end(SYNCER_END); | |
807 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
808 | 776 |
809 bool has_more_to_sync = true; | 777 bool has_more_to_sync = true; |
810 while (ShouldRunJob(job) && has_more_to_sync) { | 778 bool premature_exit = false; |
779 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) { | |
811 SDVLOG(2) << "Calling SyncShare."; | 780 SDVLOG(2) << "Calling SyncShare."; |
812 // Synchronously perform the sync session from this thread. | 781 // Synchronously perform the sync session from this thread. |
813 syncer_->SyncShare(job.session.get(), begin, end); | 782 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
814 has_more_to_sync = job.session->HasMoreToSync(); | 783 job->start_step(), |
784 job->end_step()); | |
785 | |
786 has_more_to_sync = job->session()->HasMoreToSync(); | |
815 if (has_more_to_sync) | 787 if (has_more_to_sync) |
816 job.session->PrepareForAnotherSyncCycle(); | 788 job->mutable_session()->PrepareForAnotherSyncCycle(); |
817 } | 789 } |
818 SDVLOG(2) << "Done SyncShare looping."; | 790 SDVLOG(2) << "Done SyncShare looping."; |
819 | 791 |
820 FinishSyncSessionJob(job); | 792 FinishSyncSessionJob(job.get(), premature_exit); |
793 return job->Succeeded(); | |
821 } | 794 } |
822 | 795 |
823 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 796 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
824 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 797 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
825 | 798 |
826 // We are interested in recording time between local nudges for datatypes. | 799 // We are interested in recording time between local nudges for datatypes. |
827 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 800 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
828 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 801 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
829 return; | 802 return; |
830 | 803 |
831 base::TimeTicks now = TimeTicks::Now(); | 804 base::TimeTicks now = TimeTicks::Now(); |
832 // Update timing information for how often datatypes are triggering nudges. | 805 // Update timing information for how often datatypes are triggering nudges. |
833 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); | 806 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); |
834 iter != info.types.end(); | 807 iter != info.types.end(); |
835 ++iter) { | 808 ++iter) { |
836 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 809 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
837 last_local_nudges_by_model_type_[iter->first] = now; | 810 last_local_nudges_by_model_type_[iter->first] = now; |
838 if (previous.is_null()) | 811 if (previous.is_null()) |
839 continue; | 812 continue; |
840 | 813 |
841 #define PER_DATA_TYPE_MACRO(type_str) \ | 814 #define PER_DATA_TYPE_MACRO(type_str) \ |
842 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 815 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
843 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 816 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
844 #undef PER_DATA_TYPE_MACRO | 817 #undef PER_DATA_TYPE_MACRO |
845 } | 818 } |
846 } | 819 } |
847 | 820 |
848 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 821 void SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
822 bool exited_prematurely) { | |
849 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 823 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
850 // Now update the status of the connection from SCM. We need this to decide | 824 // Now update the status of the connection from SCM. We need this to decide |
851 // whether we need to save/run future jobs. The notifications from SCM are not | 825 // whether we need to save/run future jobs. The notifications from SCM are |
852 // reliable. | 826 // not reliable. |
853 // | 827 // |
854 // TODO(rlarocque): crbug.com/110954 | 828 // TODO(rlarocque): crbug.com/110954 |
855 // We should get rid of the notifications and it is probably not needed to | 829 // We should get rid of the notifications and it is probably not needed to |
856 // maintain this status variable in 2 places. We should query it directly from | 830 // maintain this status variable in 2 places. We should query it directly |
857 // SCM when needed. | 831 // from SCM when needed. |
858 ServerConnectionManager* scm = session_context_->connection_manager(); | 832 ServerConnectionManager* scm = session_context_->connection_manager(); |
859 UpdateServerConnectionManagerStatus(scm->server_status()); | 833 UpdateServerConnectionManagerStatus(scm->server_status()); |
860 | 834 |
861 if (IsSyncingCurrentlySilenced()) { | 835 if (IsSyncingCurrentlySilenced()) { |
862 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 836 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
863 // TODO(sync): Investigate whether we need to check job.purpose | 837 // If we're here, it's because |job| was silenced until a server specified |
864 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | 838 // time. (Note, it had to be |job|, because ShouldRunJob would not permit |
865 SaveJob(job); | 839 // any job through while in WaitInterval::THROTTLED). |
866 return; // Nothing to do. | 840 scoped_ptr<SyncSessionJob> clone = job->CloneAndAbandon(); |
867 } else if (job.session->Succeeded() && | 841 if (job->purpose() == SyncSessionJob::NUDGE) |
868 !job.config_params.ready_task.is_null()) { | 842 pending_nudge_ = clone.get(); |
869 // If this was a configuration job with a ready task, invoke it now that | 843 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
870 // we finished successfully. | 844 wait_interval_->pending_configure_job = clone.get(); |
845 else | |
846 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
847 | |
848 RestartWaiting(clone.Pass()); | |
849 return; | |
850 } | |
851 | |
852 // Let job know that we're through syncing (calling SyncShare) at this point. | |
853 { | |
871 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 854 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
872 job.config_params.ready_task.Run(); | 855 job->Finish(exited_prematurely); |
873 } | 856 } |
874 | 857 |
875 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 858 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
876 ScheduleNextSync(job); | 859 ScheduleNextSync(job); |
877 } | 860 } |
878 | 861 |
879 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob* old_job) { |
880 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 863 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
881 DCHECK(!old_job.session->HasMoreToSync()); | 864 DCHECK(!old_job->session()->HasMoreToSync()); |
882 | 865 |
883 AdjustPolling(&old_job); | 866 AdjustPolling(old_job); |
884 | 867 |
885 if (old_job.session->Succeeded()) { | 868 if (old_job->Succeeded()) { |
886 // Only reset backoff if we actually reached the server. | 869 // Only reset backoff if we actually reached the server. |
887 if (old_job.session->SuccessfullyReachedServer()) | 870 // It's possible that we reached the server on one attempt, then had an |
871 // error on the next (or didn't perform some of the server-communicating | |
872 // commands). We want to verify that, for all commands attempted, we | |
873 // successfully spoke with the server. Therefore, we verify no errors | |
874 // and at least one SYNCER_OK. | |
875 if (old_job->session()->DidReachServer()) | |
888 wait_interval_.reset(); | 876 wait_interval_.reset(); |
889 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 877 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
890 return; | 878 return; |
891 } | 879 } |
892 | 880 |
893 if (old_job.purpose == SyncSessionJob::POLL) { | 881 if (old_job->purpose() == SyncSessionJob::POLL) { |
894 return; // We don't retry POLL jobs. | 882 return; // We don't retry POLL jobs. |
895 } | 883 } |
896 | 884 |
897 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 885 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
898 // if we don't succeed. Some types of errors are not likely to disappear on | 886 // if we don't succeed. Some types of errors are not likely to disappear on |
899 // their own. With the return values now available in the old_job.session, we | 887 // their own. With the return values now available in the old_job.session, |
900 // should be able to detect such errors and only retry when we detect | 888 // we should be able to detect such errors and only retry when we detect |
901 // transient errors. | 889 // transient errors. |
902 | 890 |
903 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 891 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
904 mode_ == NORMAL_MODE) { | 892 mode_ == NORMAL_MODE) { |
905 // When in normal mode, we allow up to one nudge per backoff interval. It | 893 // When in normal mode, we allow up to one nudge per backoff interval. It |
906 // appears that this was our nudge for this interval, and it failed. | 894 // appears that this was our nudge for this interval, and it failed. |
907 // | 895 // |
908 // Note: This does not prevent us from running canary jobs. For example, an | 896 // Note: This does not prevent us from running canary jobs. For example, |
909 // IP address change might still result in another nudge being executed | 897 // an IP address change might still result in another nudge being executed |
910 // during this backoff interval. | 898 // during this backoff interval. |
911 SDVLOG(2) << "A nudge during backoff failed"; | 899 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
912 | 900 DCHECK_EQ(SyncSessionJob::NUDGE, old_job->purpose()); |
913 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
914 DCHECK(!wait_interval_->had_nudge); | 901 DCHECK(!wait_interval_->had_nudge); |
915 | 902 |
916 wait_interval_->had_nudge = true; | 903 wait_interval_->had_nudge = true; |
917 InitOrCoalescePendingJob(old_job); | 904 DCHECK(!pending_nudge_); |
918 RestartWaiting(); | 905 |
906 scoped_ptr<SyncSessionJob> new_job = old_job->Clone(); | |
907 pending_nudge_ = new_job.get(); | |
908 RestartWaiting(new_job.Pass()); | |
919 } else { | 909 } else { |
920 // Either this is the first failure or a consecutive failure after our | 910 // Either this is the first failure or a consecutive failure after our |
921 // backoff timer expired. We handle it the same way in either case. | 911 // backoff timer expired. We handle it the same way in either case. |
922 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 912 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
923 HandleContinuationError(old_job); | 913 HandleContinuationError(old_job); |
924 } | 914 } |
925 } | 915 } |
926 | 916 |
927 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 917 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
928 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 918 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
929 | 919 |
930 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 920 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
931 syncer_short_poll_interval_seconds_ : | 921 syncer_short_poll_interval_seconds_ : |
932 syncer_long_poll_interval_seconds_; | 922 syncer_long_poll_interval_seconds_; |
933 bool rate_changed = !poll_timer_.IsRunning() || | 923 bool rate_changed = !poll_timer_.IsRunning() || |
934 poll != poll_timer_.GetCurrentDelay(); | 924 poll != poll_timer_.GetCurrentDelay(); |
935 | 925 |
936 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 926 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
937 poll_timer_.Reset(); | 927 poll_timer_.Reset(); |
938 | 928 |
939 if (!rate_changed) | 929 if (!rate_changed) |
940 return; | 930 return; |
941 | 931 |
942 // Adjust poll rate. | 932 // Adjust poll rate. |
943 poll_timer_.Stop(); | 933 poll_timer_.Stop(); |
944 poll_timer_.Start(FROM_HERE, poll, this, | 934 poll_timer_.Start(FROM_HERE, poll, this, |
945 &SyncSchedulerImpl::PollTimerCallback); | 935 &SyncSchedulerImpl::PollTimerCallback); |
946 } | 936 } |
947 | 937 |
948 void SyncSchedulerImpl::RestartWaiting() { | 938 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
949 CHECK(wait_interval_.get()); | 939 CHECK(wait_interval_.get()); |
950 wait_interval_->timer.Stop(); | 940 wait_interval_->timer.Stop(); |
951 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 941 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
952 this, &SyncSchedulerImpl::DoCanaryJob); | 942 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
943 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
944 weak_ptr_factory_.GetWeakPtr(), | |
945 base::Passed(&job))); | |
946 } else { | |
947 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
948 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
949 weak_ptr_factory_.GetWeakPtr(), | |
950 base::Passed(&job))); | |
951 } | |
953 } | 952 } |
954 | 953 |
955 void SyncSchedulerImpl::HandleContinuationError( | 954 void SyncSchedulerImpl::HandleContinuationError( |
956 const SyncSessionJob& old_job) { | 955 const SyncSessionJob* old_job) { |
957 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 956 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
958 if (DCHECK_IS_ON()) { | 957 if (DCHECK_IS_ON()) { |
959 if (IsBackingOff()) { | 958 if (IsBackingOff()) { |
960 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 959 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
961 } | 960 } |
962 } | 961 } |
963 | 962 |
964 TimeDelta length = delay_provider_->GetDelay( | 963 TimeDelta length = delay_provider_->GetDelay( |
965 IsBackingOff() ? wait_interval_->length : | 964 IsBackingOff() ? wait_interval_->length : |
966 delay_provider_->GetInitialDelay( | 965 delay_provider_->GetInitialDelay( |
967 old_job.session->status_controller().model_neutral_state())); | 966 old_job->session()->status_controller().model_neutral_state())); |
968 | 967 |
969 SDVLOG(2) << "In handle continuation error with " | 968 SDVLOG(2) << "In handle continuation error with " |
970 << SyncSessionJob::GetPurposeString(old_job.purpose) | 969 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
971 << " job. The time delta(ms) is " | 970 << " job. The time delta(ms) is " |
972 << length.InMilliseconds(); | 971 << length.InMilliseconds(); |
973 | 972 |
974 // This will reset the had_nudge variable as well. | 973 // This will reset the had_nudge variable as well. |
975 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 974 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
976 length)); | 975 length)); |
977 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 976 scoped_ptr<SyncSessionJob> new_job( |
977 old_job->CloneFromLocation(FROM_HERE).Pass()); | |
978 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
979 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
978 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 980 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
979 // Config params should always get set. | 981 // Config params should always get set. |
980 DCHECK(!old_job.config_params.ready_task.is_null()); | 982 DCHECK(!old_job->config_params().ready_task.is_null()); |
981 SyncSession* old = old_job.session.get(); | 983 wait_interval_->pending_configure_job = new_job.get(); |
982 SyncSession* s(new SyncSession(session_context_, this, | |
983 old->source(), old->routing_info(), old->workers())); | |
984 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
985 make_linked_ptr(s), false, old_job.config_params, | |
986 FROM_HERE); | |
987 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
988 } else { | 984 } else { |
989 // We are not in configuration mode. So wait_interval's pending job | 985 // We are not in configuration mode. So wait_interval's pending job |
990 // should be null. | 986 // should be null. |
991 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 987 DCHECK(wait_interval_->pending_configure_job == NULL); |
988 DCHECK(!pending_nudge_); | |
989 pending_nudge_ = new_job.get(); | |
990 } | |
992 | 991 |
993 // TODO(lipalani) - handle clear user data. | 992 RestartWaiting(new_job.Pass()); |
994 InitOrCoalescePendingJob(old_job); | |
995 } | |
996 RestartWaiting(); | |
997 } | 993 } |
998 | 994 |
999 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 995 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
1000 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 996 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
1001 DCHECK(weak_handle_this_.IsInitialized()); | 997 DCHECK(weak_handle_this_.IsInitialized()); |
1002 SDVLOG(3) << "Posting StopImpl"; | 998 SDVLOG(3) << "Posting StopImpl"; |
1003 weak_handle_this_.Call(FROM_HERE, | 999 weak_handle_this_.Call(FROM_HERE, |
1004 &SyncSchedulerImpl::StopImpl, | 1000 &SyncSchedulerImpl::StopImpl, |
1005 callback); | 1001 callback); |
1006 } | 1002 } |
1007 | 1003 |
1008 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 1004 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
1009 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1005 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1010 SDVLOG(2) << "StopImpl called"; | 1006 SDVLOG(2) << "StopImpl called"; |
1011 | 1007 |
1012 // Kill any in-flight method calls. | 1008 // Kill any in-flight method calls. |
1013 weak_ptr_factory_.InvalidateWeakPtrs(); | 1009 weak_ptr_factory_.InvalidateWeakPtrs(); |
1014 wait_interval_.reset(); | 1010 wait_interval_.reset(); |
1015 poll_timer_.Stop(); | 1011 poll_timer_.Stop(); |
1016 if (started_) { | 1012 if (started_) { |
1017 started_ = false; | 1013 started_ = false; |
1018 } | 1014 } |
1019 if (!callback.is_null()) | 1015 if (!callback.is_null()) |
1020 callback.Run(); | 1016 callback.Run(); |
1021 } | 1017 } |
1022 | 1018 |
1023 void SyncSchedulerImpl::DoCanaryJob() { | 1019 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
1024 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1020 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1025 SDVLOG(2) << "Do canary job"; | 1021 SDVLOG(2) << "Do canary job"; |
1026 DoPendingJobIfPossible(true); | 1022 |
1023 // Only set canary privlieges here, when we are about to run the job. This | |
1024 // avoids confusion in managing canary bits during scheduling, when you | |
1025 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
1026 // was scheduled as canary, and send it to an "unscheduled" state. | |
1027 to_be_canary->GrantCanaryPrivilege(); | |
1028 | |
1029 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
1030 // FIXME: Remove this CreateSyncSession call.. | |
1031 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
1032 to_be_canary->session()->source()).Pass(); | |
1033 // The routing info might have been changed since we cached the | |
1034 // pending nudge. Update it by coalescing to the latest. | |
1035 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
1036 } | |
1037 DoSyncSessionJob(to_be_canary.Pass()); | |
1027 } | 1038 } |
1028 | 1039 |
1029 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1040 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
1030 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1041 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1031 SyncSessionJob* job_to_execute = NULL; | 1042 // If we find a scheduled pending_ job, abandon the old one and return a |
1043 // a clone. If unscheduled, just hand over ownership. | |
1044 scoped_ptr<SyncSessionJob> candidate; | |
1032 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1045 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
1033 && wait_interval_->pending_configure_job.get()) { | 1046 && wait_interval_->pending_configure_job) { |
1034 SDVLOG(2) << "Found pending configure job"; | 1047 SDVLOG(2) << "Found pending configure job"; |
1035 job_to_execute = wait_interval_->pending_configure_job.get(); | 1048 candidate = |
1036 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1049 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
1050 wait_interval_->pending_configure_job = candidate.get(); | |
1051 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
1037 SDVLOG(2) << "Found pending nudge job"; | 1052 SDVLOG(2) << "Found pending nudge job"; |
1038 | 1053 candidate = pending_nudge_->CloneAndAbandon().Pass(); |
1039 scoped_ptr<SyncSession> session(CreateSyncSession( | 1054 pending_nudge_ = candidate.get(); |
1040 pending_nudge_->session->source())); | 1055 unscheduled_nudge_storage_.reset(); |
1041 | |
1042 // Also the routing info might have been changed since we cached the | |
1043 // pending nudge. Update it by coalescing to the latest. | |
1044 pending_nudge_->session->Coalesce(*(session.get())); | |
1045 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
1046 job_to_execute = pending_nudge_.get(); | |
1047 } | 1056 } |
1048 | 1057 return candidate.Pass(); |
1049 if (job_to_execute != NULL) { | |
1050 SDVLOG(2) << "Executing pending job"; | |
1051 SyncSessionJob copy = *job_to_execute; | |
1052 copy.is_canary_job = is_canary_job; | |
1053 DoSyncSessionJob(copy); | |
1054 } | |
1055 } | 1058 } |
1056 | 1059 |
1057 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1060 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
1058 const SyncSourceInfo& source) { | 1061 const SyncSourceInfo& source) { |
1059 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1062 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1060 DVLOG(2) << "Creating sync session with routes " | 1063 DVLOG(2) << "Creating sync session with routes " |
1061 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1064 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
1062 | 1065 |
1063 SyncSourceInfo info(source); | 1066 SyncSourceInfo info(source); |
1064 SyncSession* session(new SyncSession(session_context_, this, info, | 1067 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
1065 session_context_->routing_info(), session_context_->workers())); | 1068 session_context_->routing_info(), session_context_->workers())); |
1066 | |
1067 return session; | |
1068 } | 1069 } |
1069 | 1070 |
1070 void SyncSchedulerImpl::PollTimerCallback() { | 1071 void SyncSchedulerImpl::PollTimerCallback() { |
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1072 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1072 ModelSafeRoutingInfo r; | 1073 ModelSafeRoutingInfo r; |
1073 ModelTypeStateMap type_state_map = | 1074 ModelTypeStateMap type_state_map = |
1074 ModelSafeRoutingInfoToStateMap(r, std::string()); | 1075 ModelSafeRoutingInfoToStateMap(r, std::string()); |
1075 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); | 1076 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); |
1076 SyncSession* s = CreateSyncSession(info); | 1077 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
1077 | 1078 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
1078 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1079 TimeTicks::Now(), |
1079 make_linked_ptr(s), | 1080 s.Pass(), |
1080 false, | 1081 ConfigurationParams(), |
1081 ConfigurationParams(), | 1082 FROM_HERE)); |
1082 FROM_HERE); | 1083 ScheduleSyncSessionJob(job.Pass()); |
1083 | |
1084 ScheduleSyncSessionJob(job); | |
1085 } | 1084 } |
1086 | 1085 |
1087 void SyncSchedulerImpl::Unthrottle() { | 1086 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
1088 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1087 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1089 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1088 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1090 SDVLOG(2) << "Unthrottled."; | 1089 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
1091 DoCanaryJob(); | 1090 << "canary."; |
1091 if (to_be_canary.get()) | |
1092 DoCanaryJob(to_be_canary.Pass()); | |
1093 | |
1094 // TODO(tim): ?! This must have been broken. The way DecideOnJob works today | |
1095 // canary privileges aren't enough to bypass a THROTTLED wait interval, which | |
1096 // would suggest we need to reset first (though trusting canary in Decide is | |
1097 // probably the "right" thing to do). | |
1092 wait_interval_.reset(); | 1098 wait_interval_.reset(); |
1093 } | 1099 } |
1094 | 1100 |
1095 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1101 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1102 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1097 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1103 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1098 } | 1104 } |
1099 | 1105 |
1100 bool SyncSchedulerImpl::IsBackingOff() const { | 1106 bool SyncSchedulerImpl::IsBackingOff() const { |
1101 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1107 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1102 return wait_interval_.get() && wait_interval_->mode == | 1108 return wait_interval_.get() && wait_interval_->mode == |
1103 WaitInterval::EXPONENTIAL_BACKOFF; | 1109 WaitInterval::EXPONENTIAL_BACKOFF; |
1104 } | 1110 } |
1105 | 1111 |
1106 void SyncSchedulerImpl::OnSilencedUntil( | 1112 void SyncSchedulerImpl::OnSilencedUntil( |
1107 const base::TimeTicks& silenced_until) { | 1113 const base::TimeTicks& silenced_until) { |
1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1114 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1109 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1115 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1110 silenced_until - TimeTicks::Now())); | 1116 silenced_until - TimeTicks::Now())); |
1111 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
1112 &SyncSchedulerImpl::Unthrottle); | |
1113 } | 1117 } |
1114 | 1118 |
1115 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1119 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1116 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1120 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1117 return wait_interval_.get() && wait_interval_->mode == | 1121 return wait_interval_.get() && wait_interval_->mode == |
1118 WaitInterval::THROTTLED; | 1122 WaitInterval::THROTTLED; |
1119 } | 1123 } |
1120 | 1124 |
1121 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1125 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1122 const base::TimeDelta& new_interval) { | 1126 const base::TimeDelta& new_interval) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1176 | 1180 |
1177 #undef SDVLOG_LOC | 1181 #undef SDVLOG_LOC |
1178 | 1182 |
1179 #undef SDVLOG | 1183 #undef SDVLOG |
1180 | 1184 |
1181 #undef SLOG | 1185 #undef SLOG |
1182 | 1186 |
1183 #undef ENUM_CASE | 1187 #undef ENUM_CASE |
1184 | 1188 |
1185 } // namespace syncer | 1189 } // namespace syncer |
OLD | NEW |