OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/bind_helpers.h" | |
12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
13 #include "base/location.h" | 14 #include "base/location.h" |
14 #include "base/logging.h" | 15 #include "base/logging.h" |
15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
76 : source(source), | 77 : source(source), |
77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
78 routing_info(routing_info), | 79 routing_info(routing_info), |
79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
81 } | 82 } |
82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
83 | 84 |
84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
86 had_nudge(false) { | 87 had_nudge(false), |
88 pending_configure_job(NULL) { | |
akalin
2012/09/25 22:37:24
three different styles for empty function bodies h
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
87 } | 89 } |
88 | 90 |
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
92 : mode(mode), had_nudge(false), length(length), | |
93 pending_configure_job(NULL) { } | |
94 | |
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
90 | 96 |
91 #define ENUM_CASE(x) case x: return #x; break; | 97 #define ENUM_CASE(x) case x: return #x; break; |
92 | 98 |
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
94 switch (mode) { | 100 switch (mode) { |
95 ENUM_CASE(UNKNOWN); | 101 ENUM_CASE(UNKNOWN); |
96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 102 ENUM_CASE(EXPONENTIAL_BACKOFF); |
97 ENUM_CASE(THROTTLED); | 103 ENUM_CASE(THROTTLED); |
98 } | 104 } |
99 NOTREACHED(); | 105 NOTREACHED(); |
100 return ""; | 106 return ""; |
101 } | 107 } |
102 | 108 |
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
104 : purpose(UNKNOWN), | |
105 is_canary_job(false) { | |
106 } | |
107 | |
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
109 | |
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
111 base::TimeTicks start, | |
112 linked_ptr<sessions::SyncSession> session, | |
113 bool is_canary_job, | |
114 const ConfigurationParams& config_params, | |
115 const tracked_objects::Location& from_here) | |
116 : purpose(purpose), | |
117 scheduled_start(start), | |
118 session(session), | |
119 is_canary_job(is_canary_job), | |
120 config_params(config_params), | |
121 from_here(from_here) { | |
122 } | |
123 | |
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
126 switch (purpose) { | |
127 ENUM_CASE(UNKNOWN); | |
128 ENUM_CASE(POLL); | |
129 ENUM_CASE(NUDGE); | |
130 ENUM_CASE(CONFIGURATION); | |
131 } | |
132 NOTREACHED(); | |
133 return ""; | |
134 } | |
135 | |
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
137 NudgeSource source) { | 110 NudgeSource source) { |
138 switch (source) { | 111 switch (source) { |
139 case NUDGE_SOURCE_NOTIFICATION: | 112 case NUDGE_SOURCE_NOTIFICATION: |
140 return GetUpdatesCallerInfo::NOTIFICATION; | 113 return GetUpdatesCallerInfo::NOTIFICATION; |
141 case NUDGE_SOURCE_LOCAL: | 114 case NUDGE_SOURCE_LOCAL: |
142 return GetUpdatesCallerInfo::LOCAL; | 115 return GetUpdatesCallerInfo::LOCAL; |
143 case NUDGE_SOURCE_CONTINUATION: | 116 case NUDGE_SOURCE_CONTINUATION: |
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 117 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
145 case NUDGE_SOURCE_LOCAL_REFRESH: | 118 case NUDGE_SOURCE_LOCAL_REFRESH: |
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 119 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
147 case NUDGE_SOURCE_UNKNOWN: | 120 case NUDGE_SOURCE_UNKNOWN: |
148 return GetUpdatesCallerInfo::UNKNOWN; | 121 return GetUpdatesCallerInfo::UNKNOWN; |
149 default: | 122 default: |
150 NOTREACHED(); | 123 NOTREACHED(); |
151 return GetUpdatesCallerInfo::UNKNOWN; | 124 return GetUpdatesCallerInfo::UNKNOWN; |
152 } | 125 } |
153 } | 126 } |
154 | 127 |
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
156 : mode(mode), had_nudge(false), length(length) { } | |
157 | |
158 // Helper macros to log with the syncer thread name; useful when there | 128 // Helper macros to log with the syncer thread name; useful when there |
159 // are multiple syncer threads involved. | 129 // are multiple syncer threads involved. |
160 | 130 |
161 #define SLOG(severity) LOG(severity) << name_ << ": " | 131 #define SLOG(severity) LOG(severity) << name_ << ": " |
162 | 132 |
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 133 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
164 | 134 |
165 #define SDVLOG_LOC(from_here, verbose_level) \ | 135 #define SDVLOG_LOC(from_here, verbose_level) \ |
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 136 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
167 | 137 |
(...skipping 30 matching lines...) Expand all Loading... | |
198 syncer_short_poll_interval_seconds_( | 168 syncer_short_poll_interval_seconds_( |
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 169 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
200 syncer_long_poll_interval_seconds_( | 170 syncer_long_poll_interval_seconds_( |
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 171 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
202 sessions_commit_delay_( | 172 sessions_commit_delay_( |
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 173 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
204 mode_(NORMAL_MODE), | 174 mode_(NORMAL_MODE), |
205 // Start with assuming everything is fine with the connection. | 175 // Start with assuming everything is fine with the connection. |
206 // At the end of the sync cycle we would have the correct status. | 176 // At the end of the sync cycle we would have the correct status. |
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 177 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
178 pending_nudge_(NULL), | |
208 delay_provider_(delay_provider), | 179 delay_provider_(delay_provider), |
209 syncer_(syncer), | 180 syncer_(syncer), |
210 session_context_(context), | 181 session_context_(context), |
211 no_scheduling_allowed_(false) { | 182 no_scheduling_allowed_(false) { |
212 DCHECK(sync_loop_); | 183 DCHECK(sync_loop_); |
213 } | 184 } |
214 | 185 |
215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 186 SyncSchedulerImpl::~SyncSchedulerImpl() { |
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 187 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
217 StopImpl(base::Closure()); | 188 StopImpl(base::Closure()); |
(...skipping 16 matching lines...) Expand all Loading... | |
234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 205 void SyncSchedulerImpl::OnConnectionStatusChange() { |
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 206 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
236 // Optimistically assume that the connection is fixed and try | 207 // Optimistically assume that the connection is fixed and try |
237 // connecting. | 208 // connecting. |
238 OnServerConnectionErrorFixed(); | 209 OnServerConnectionErrorFixed(); |
239 } | 210 } |
240 } | 211 } |
241 | 212 |
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 213 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 214 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
215 // There could be a pending nudge or configuration job in several cases: | |
216 // | |
217 // 1. We're in exponential backoff. | |
218 // 2. We're silenced / throttled. | |
219 // 3. A nudge was saved previously due to not having a valid auth token. | |
220 // 4. A nudge was scheduled + saved while in configuration mode. | |
221 // | |
222 // In all cases except (2), we want to retry contacting the server. We | |
223 // call DoCanaryJob to achieve this, and note that nothing -- not even a | |
224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | |
225 // has the authority to do that is the Unthrottle timer. | |
226 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
227 if (!pending.get()) | |
228 return; | |
229 | |
244 PostTask(FROM_HERE, "DoCanaryJob", | 230 PostTask(FROM_HERE, "DoCanaryJob", |
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 231 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
246 weak_ptr_factory_.GetWeakPtr())); | 232 weak_ptr_factory_.GetWeakPtr(), |
247 | 233 base::Passed(&pending))); |
248 } | 234 } |
249 | 235 |
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 236 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
251 HttpResponse::ServerConnectionCode code) { | 237 HttpResponse::ServerConnectionCode code) { |
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 238 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
253 SDVLOG(2) << "New server connection code: " | 239 SDVLOG(2) << "New server connection code: " |
254 << HttpResponse::GetServerConnectionCodeString(code); | 240 << HttpResponse::GetServerConnectionCodeString(code); |
255 | 241 |
256 connection_code_ = code; | 242 connection_code_ = code; |
257 } | 243 } |
(...skipping 15 matching lines...) Expand all Loading... | |
273 Mode old_mode = mode_; | 259 Mode old_mode = mode_; |
274 mode_ = mode; | 260 mode_ = mode; |
275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 261 AdjustPolling(NULL); // Will kick start poll timer if needed. |
276 | 262 |
277 if (old_mode != mode_) { | 263 if (old_mode != mode_) { |
278 // We just changed our mode. See if there are any pending jobs that we could | 264 // We just changed our mode. See if there are any pending jobs that we could |
279 // execute in the new mode. | 265 // execute in the new mode. |
280 if (mode_ == NORMAL_MODE) { | 266 if (mode_ == NORMAL_MODE) { |
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 267 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
282 // has not yet completed. | 268 // has not yet completed. |
283 DCHECK(!wait_interval_.get() || | 269 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
284 !wait_interval_->pending_configure_job.get()); | |
285 } | 270 } |
286 | 271 |
287 DoPendingJobIfPossible(false); | 272 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
273 if (pending.get()) { | |
274 // TODO(tim): We should be able to remove this... | |
275 scoped_ptr<SyncSession> session(CreateSyncSession( | |
276 pending->session()->source())); | |
277 // Also the routing info might have been changed since we cached the | |
278 // pending nudge. Update it by coalescing to the latest. | |
279 pending->mutable_session()->Coalesce(*(session)); | |
280 SDVLOG(2) << "Executing pending job. Good luck!"; | |
281 DoSyncSessionJob(pending.Pass()); | |
282 } | |
288 } | 283 } |
289 } | 284 } |
290 | 285 |
291 void SyncSchedulerImpl::SendInitialSnapshot() { | 286 void SyncSchedulerImpl::SendInitialSnapshot() { |
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 287 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 288 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 289 SyncSourceInfo(), ModelSafeRoutingInfo(), |
295 std::vector<ModelSafeWorker*>())); | 290 std::vector<ModelSafeWorker*>())); |
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 291 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
297 event.snapshot = dummy->TakeSnapshot(); | 292 event.snapshot = dummy->TakeSnapshot(); |
(...skipping 23 matching lines...) Expand all Loading... | |
321 bool SyncSchedulerImpl::ScheduleConfiguration( | 316 bool SyncSchedulerImpl::ScheduleConfiguration( |
322 const ConfigurationParams& params) { | 317 const ConfigurationParams& params) { |
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 318 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 319 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 320 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
326 DCHECK(!params.ready_task.is_null()); | 321 DCHECK(!params.ready_task.is_null()); |
327 SDVLOG(2) << "Reconfiguring syncer."; | 322 SDVLOG(2) << "Reconfiguring syncer."; |
328 | 323 |
329 // Only one configuration is allowed at a time. Verify we're not waiting | 324 // Only one configuration is allowed at a time. Verify we're not waiting |
330 // for a pending configure job. | 325 // for a pending configure job. |
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 326 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
332 | 327 |
333 ModelSafeRoutingInfo restricted_routes; | 328 ModelSafeRoutingInfo restricted_routes; |
334 BuildModelSafeParams(params.types_to_download, | 329 BuildModelSafeParams(params.types_to_download, |
335 params.routing_info, | 330 params.routing_info, |
336 &restricted_routes); | 331 &restricted_routes); |
337 session_context_->set_routing_info(params.routing_info); | 332 session_context_->set_routing_info(params.routing_info); |
338 | 333 |
339 // Only reconfigure if we have types to download. | 334 // Only reconfigure if we have types to download. |
340 if (!params.types_to_download.Empty()) { | 335 if (!params.types_to_download.Empty()) { |
341 DCHECK(!restricted_routes.empty()); | 336 DCHECK(!restricted_routes.empty()); |
342 linked_ptr<SyncSession> session(new SyncSession( | 337 scoped_ptr<SyncSession> session(new SyncSession( |
343 session_context_, | 338 session_context_, |
344 this, | 339 this, |
345 SyncSourceInfo(params.source, | 340 SyncSourceInfo(params.source, |
346 ModelSafeRoutingInfoToStateMap( | 341 ModelSafeRoutingInfoToStateMap( |
347 restricted_routes, | 342 restricted_routes, |
348 std::string())), | 343 std::string())), |
349 restricted_routes, | 344 restricted_routes, |
350 session_context_->workers())); | 345 session_context_->workers())); |
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 346 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
352 TimeTicks::Now(), | 347 SyncSessionJob::CONFIGURATION, |
353 session, | 348 TimeTicks::Now(), |
354 false, | 349 session.Pass(), |
355 params, | 350 params, |
356 FROM_HERE); | 351 FROM_HERE)); |
357 DoSyncSessionJob(job); | 352 bool succeeded = DoSyncSessionJob(job.Pass()); |
358 | 353 |
359 // If we failed, the job would have been saved as the pending configure | 354 // If we failed, the job would have been saved as the pending configure |
360 // job and a wait interval would have been set. | 355 // job and a wait interval would have been set. |
361 if (!session->Succeeded()) { | 356 if (!succeeded) { |
362 DCHECK(wait_interval_.get() && | 357 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
363 wait_interval_->pending_configure_job.get()); | |
364 return false; | 358 return false; |
365 } | 359 } |
366 } else { | 360 } else { |
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 361 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
368 params.ready_task.Run(); | 362 params.ready_task.Run(); |
369 } | 363 } |
370 | 364 |
371 return true; | 365 return true; |
372 } | 366 } |
373 | 367 |
374 SyncSchedulerImpl::JobProcessDecision | 368 SyncSchedulerImpl::JobProcessDecision |
375 SyncSchedulerImpl::DecideWhileInWaitInterval( | 369 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) { |
376 const SyncSessionJob& job) { | |
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 370 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
378 DCHECK(wait_interval_.get()); | 371 DCHECK(wait_interval_.get()); |
379 | 372 |
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 373 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
381 << WaitInterval::GetModeString(wait_interval_->mode) | 374 << WaitInterval::GetModeString(wait_interval_->mode) |
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 375 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
383 << (job.is_canary_job ? " (canary)" : ""); | 376 << (job->is_canary() ? " (canary)" : ""); |
384 | 377 |
385 if (job.purpose == SyncSessionJob::POLL) | 378 if (job->purpose() == SyncSessionJob::POLL) |
386 return DROP; | 379 return DROP; |
387 | 380 |
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 381 // If we save a job while in a WaitInterval, there is a well-defined moment |
389 job.purpose == SyncSessionJob::CONFIGURATION); | 382 // in time in the future when it makes sense for that SAVE-worthy job to try |
383 // running again -- the end of the WaitInterval. | |
384 DCHECK(job->purpose() == SyncSessionJob::NUDGE || | |
385 job->purpose() == SyncSessionJob::CONFIGURATION); | |
386 | |
387 // If throttled, there's a clock ticking to unthrottle. We want to get | |
388 // on the same train. | |
390 if (wait_interval_->mode == WaitInterval::THROTTLED) | 389 if (wait_interval_->mode == WaitInterval::THROTTLED) |
391 return SAVE; | 390 return SAVE; |
392 | 391 |
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 392 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
394 if (job.purpose == SyncSessionJob::NUDGE) { | 393 if (job->purpose() == SyncSessionJob::NUDGE) { |
395 if (mode_ == CONFIGURATION_MODE) | 394 if (mode_ == CONFIGURATION_MODE) |
396 return SAVE; | 395 return SAVE; |
397 | 396 |
398 // If we already had one nudge then just drop this nudge. We will retry | 397 // If we already had one nudge then just drop this nudge. We will retry |
399 // later when the timer runs out. | 398 // later when the timer runs out. |
400 if (!job.is_canary_job) | 399 if (!job->is_canary()) |
401 return wait_interval_->had_nudge ? DROP : CONTINUE; | 400 return wait_interval_->had_nudge ? DROP : CONTINUE; |
402 else // We are here because timer ran out. So retry. | 401 else // We are here because timer ran out. So retry. |
403 return CONTINUE; | 402 return CONTINUE; |
404 } | 403 } |
405 return job.is_canary_job ? CONTINUE : SAVE; | 404 return job->is_canary() ? CONTINUE : SAVE; |
406 } | 405 } |
407 | 406 |
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 407 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
409 const SyncSessionJob& job) { | 408 const SyncSessionJob* job) { |
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 409 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
411 | 410 |
412 // See if our type is throttled. | 411 // See if our type is throttled. |
413 ModelTypeSet throttled_types = | 412 ModelTypeSet throttled_types = |
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 413 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
415 if (job.purpose == SyncSessionJob::NUDGE && | 414 if (job->purpose() == SyncSessionJob::NUDGE && |
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 415 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
417 ModelTypeSet requested_types; | 416 ModelTypeSet requested_types; |
418 for (ModelTypeStateMap::const_iterator i = | 417 for (ModelTypeStateMap::const_iterator i = |
419 job.session->source().types.begin(); | 418 job->session()->source().types.begin(); |
420 i != job.session->source().types.end(); | 419 i != job->session()->source().types.end(); |
421 ++i) { | 420 ++i) { |
422 requested_types.Put(i->first); | 421 requested_types.Put(i->first); |
423 } | 422 } |
424 | 423 |
424 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
425 // a per-datatype "unthrottle" event as something that should force a | |
426 // canary job. For this reason, there's no good time to reschedule this job | |
427 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
428 // Note that there may already be such an event if we're in a WaitInterval, | |
429 // so we can retry it then. | |
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 430 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
rlarocque
2012/09/24 21:15:14
This isn't related to your change, but your commen
tim (not reviewing)
2012/10/08 00:20:03
Can you define 'forget'? The job would have accoun
rlarocque
2012/10/08 19:18:04
When we calculate the items to commit in GetCommit
tim (not reviewing)
2012/10/11 17:35:14
Hm, I see. Good point. I filed crbug.com/155296 t
| |
426 return SAVE; | 431 return SAVE; |
427 } | 432 } |
428 | 433 |
429 if (wait_interval_.get()) | 434 if (wait_interval_.get()) |
430 return DecideWhileInWaitInterval(job); | 435 return DecideWhileInWaitInterval(job); |
431 | 436 |
432 if (mode_ == CONFIGURATION_MODE) { | 437 if (mode_ == CONFIGURATION_MODE) { |
433 if (job.purpose == SyncSessionJob::NUDGE) | 438 if (job->purpose() == SyncSessionJob::NUDGE) |
434 return SAVE; | 439 return SAVE; // Running requires a mode switch. |
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 440 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
436 return CONTINUE; | 441 return CONTINUE; |
437 else | 442 else |
438 return DROP; | 443 return DROP; |
439 } | 444 } |
440 | 445 |
441 // We are in normal mode. | 446 // We are in normal mode. |
442 DCHECK_EQ(mode_, NORMAL_MODE); | 447 DCHECK_EQ(mode_, NORMAL_MODE); |
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 448 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION); |
444 | 449 |
445 // Note about some subtle scheduling semantics. | 450 // Note about some subtle scheduling semantics. |
446 // | 451 // |
447 // It's possible at this point that |job| is known to be unnecessary, and | 452 // It's possible at this point that |job| is known to be unnecessary, and |
448 // dropping it would be perfectly safe and correct. Consider | 453 // dropping it would be perfectly safe and correct. Consider |
449 // | 454 // |
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 455 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
451 // the time that the last successful all-datatype NUDGE completed. | 456 // the time that the last successful all-datatype NUDGE completed. |
452 // | 457 // |
453 // 2) |job| is a NUDGE (for any combination of types) with a | 458 // 2) |job| is a NUDGE (for any combination of types) with a |
(...skipping 22 matching lines...) Expand all Loading... | |
476 // * It's not strictly "impossible", but it would be reentrant and hence | 481 // * It's not strictly "impossible", but it would be reentrant and hence |
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 482 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
478 // legal side effect of any of the work being done as part of a sync cycle. | 483 // legal side effect of any of the work being done as part of a sync cycle. |
479 // See |no_scheduling_allowed_| for details. | 484 // See |no_scheduling_allowed_| for details. |
480 | 485 |
481 // Decision now rests on state of auth tokens. | 486 // Decision now rests on state of auth tokens. |
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 487 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
483 return CONTINUE; | 488 return CONTINUE; |
484 | 489 |
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 490 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 491 // Running the job would require updated auth, so we can't honour |
492 // job.scheduled_start(). | |
493 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
487 } | 494 } |
488 | 495 |
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 496 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) { |
490 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | |
492 if (pending_nudge_.get() == NULL) { | |
493 SDVLOG(2) << "Creating a pending nudge job"; | |
494 SyncSession* s = job.session.get(); | |
495 | |
496 // Get a fresh session with similar configuration as before (resets | |
497 // StatusController). | |
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | |
499 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
500 | |
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
502 make_linked_ptr(session.release()), false, | |
503 ConfigurationParams(), job.from_here); | |
504 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
505 return; | |
506 } | |
507 | |
508 SDVLOG(2) << "Coalescing a pending nudge"; | |
509 pending_nudge_->session->Coalesce(*(job.session.get())); | |
510 pending_nudge_->scheduled_start = job.scheduled_start; | |
511 | |
512 // Unfortunately the nudge location cannot be modified. So it stores the | |
513 // location of the first caller. | |
514 } | |
515 | |
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | |
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 497 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
518 DCHECK(started_); | 498 DCHECK(started_); |
519 | 499 |
520 JobProcessDecision decision = DecideOnJob(job); | 500 JobProcessDecision decision = DecideOnJob(job); |
521 SDVLOG(2) << "Should run " | 501 SDVLOG(2) << "Should run " |
522 << SyncSessionJob::GetPurposeString(job.purpose) | 502 << SyncSessionJob::GetPurposeString(job->purpose()) |
523 << " job in mode " << GetModeString(mode_) | 503 << " job " << job->session() |
504 << " in mode " << GetModeString(mode_) | |
524 << ": " << GetDecisionString(decision); | 505 << ": " << GetDecisionString(decision); |
525 if (decision != SAVE) | 506 if (decision != SAVE) |
rlarocque
2012/09/24 21:15:14
These lines have always bugged me. It's not clear
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
526 return decision == CONTINUE; | 507 return decision == CONTINUE; |
527 | 508 |
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 509 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
529 SyncSessionJob::CONFIGURATION); | 510 if (is_nudge && pending_nudge_) { |
511 SDVLOG(2) << "Coalescing a pending nudge"; | |
512 // TODO(tim): This basically means we never use the more-careful coalescing | |
rlarocque
2012/09/24 21:15:14
This is a regression, right? How/Why did this hap
tim (not reviewing)
2012/10/08 00:20:03
It is, but I suspect it has been this way for quit
rlarocque
2012/10/08 19:18:04
Sounds good. I was confused for a bit there; I di
| |
513 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
514 // times, because we're calling this function first. Pull this out | |
515 // into a function to coalesce + set start times and reuse. | |
516 pending_nudge_->mutable_session()->Coalesce(*(job->session())); | |
517 pending_nudge_->set_scheduled_start(job->scheduled_start()); | |
rlarocque
2012/09/24 21:15:14
When a user is actively using their browser, I exp
tim (not reviewing)
2012/10/08 00:20:03
Hmm. Afaik, this set_scheduled_start call doesn't
| |
518 return false; | |
519 } | |
530 | 520 |
531 SaveJob(job); | 521 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); |
rlarocque
2012/09/24 21:15:14
This makes me think ShouldRunJobSaveIfNecessary()
tim (not reviewing)
2012/10/08 00:20:03
Well, note the intended contract here is simpler t
| |
522 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
523 // This job should be made the new canary. | |
524 if (is_nudge) { | |
525 pending_nudge_ = job_to_save.get(); | |
526 } else { | |
527 SDVLOG(2) << "Saving a configuration job"; | |
528 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
529 DCHECK(!wait_interval_->pending_configure_job); | |
530 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
531 DCHECK(!job->config_params().ready_task.is_null()); | |
532 // The only nudge that could exist is a scheduled canary nudge. | |
533 DCHECK(!unscheduled_nudge_storage_.get()); | |
534 if (pending_nudge_) { | |
535 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
536 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); | |
rlarocque
2012/09/24 21:15:14
What effect do these two lines have?
tim (not reviewing)
2012/10/08 00:20:03
By "pre-empt" I mean cancel the pending nudge job
| |
537 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
538 } | |
539 wait_interval_->pending_configure_job = job_to_save.get(); | |
540 } | |
541 wait_interval_->length = | |
rlarocque
2012/09/24 21:15:14
Could this end up being zero or negative? What ef
tim (not reviewing)
2012/10/08 00:20:03
Hmph. Good catch - I had fixed this since I hit a
| |
542 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
543 RestartWaiting(job_to_save.Pass()); | |
544 return false; | |
545 } | |
546 | |
547 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
548 // when we're not in a WaitInterval. See bug 147736. | |
549 DCHECK(is_nudge); | |
550 // There may or may not be a pending_configure_job. Either way this nudge | |
551 // is unschedulable. | |
552 pending_nudge_ = job_to_save.get(); | |
553 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
532 return false; | 554 return false; |
533 } | 555 } |
534 | 556 |
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
536 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
537 if (job.purpose == SyncSessionJob::NUDGE) { | |
538 SDVLOG(2) << "Saving a nudge job"; | |
539 InitOrCoalescePendingJob(job); | |
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
541 SDVLOG(2) << "Saving a configuration job"; | |
542 DCHECK(wait_interval_.get()); | |
543 DCHECK(mode_ == CONFIGURATION_MODE); | |
544 | |
545 // Config params should always get set. | |
546 DCHECK(!job.config_params.ready_task.is_null()); | |
547 SyncSession* old = job.session.get(); | |
548 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
549 old->routing_info(), old->workers())); | |
550 SyncSessionJob new_job(job.purpose, | |
551 TimeTicks::Now(), | |
552 make_linked_ptr(s), | |
553 false, | |
554 job.config_params, | |
555 job.from_here); | |
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
557 } // drop the rest. | |
558 // TODO(sync): Is it okay to drop the rest? It's weird that | |
559 // SaveJob() only does what it says sometimes. (See | |
560 // http://crbug.com/90868.) | |
561 } | |
562 | |
563 // Functor for std::find_if to search by ModelSafeGroup. | 557 // Functor for std::find_if to search by ModelSafeGroup. |
564 struct ModelSafeWorkerGroupIs { | 558 struct ModelSafeWorkerGroupIs { |
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 559 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
566 bool operator()(ModelSafeWorker* w) { | 560 bool operator()(ModelSafeWorker* w) { |
567 return group == w->GetModelSafeGroup(); | 561 return group == w->GetModelSafeGroup(); |
568 } | 562 } |
569 ModelSafeGroup group; | 563 ModelSafeGroup group; |
570 }; | 564 }; |
571 | 565 |
572 void SyncSchedulerImpl::ScheduleNudgeAsync( | 566 void SyncSchedulerImpl::ScheduleNudgeAsync( |
573 const TimeDelta& delay, | 567 const TimeDelta& delay, |
574 NudgeSource source, ModelTypeSet types, | 568 NudgeSource source, ModelTypeSet types, |
575 const tracked_objects::Location& nudge_location) { | 569 const tracked_objects::Location& nudge_location) { |
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 570 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
577 SDVLOG_LOC(nudge_location, 2) | 571 SDVLOG_LOC(nudge_location, 2) |
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 572 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
579 << "source " << GetNudgeSourceString(source) << ", " | 573 << "source " << GetNudgeSourceString(source) << ", " |
580 << "types " << ModelTypeSetToString(types); | 574 << "types " << ModelTypeSetToString(types); |
581 | 575 |
582 ModelTypeStateMap type_state_map = | 576 ModelTypeStateMap type_state_map = |
583 ModelTypeSetToStateMap(types, std::string()); | 577 ModelTypeSetToStateMap(types, std::string()); |
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 578 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
585 GetUpdatesFromNudgeSource(source), | 579 GetUpdatesFromNudgeSource(source), |
586 type_state_map, | 580 type_state_map, |
587 false, | |
588 nudge_location); | 581 nudge_location); |
589 } | 582 } |
590 | 583 |
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 584 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
592 const TimeDelta& delay, | 585 const TimeDelta& delay, |
593 NudgeSource source, const ModelTypeStateMap& type_state_map, | 586 NudgeSource source, const ModelTypeStateMap& type_state_map, |
594 const tracked_objects::Location& nudge_location) { | 587 const tracked_objects::Location& nudge_location) { |
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 588 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
596 SDVLOG_LOC(nudge_location, 2) | 589 SDVLOG_LOC(nudge_location, 2) |
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 590 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
598 << "source " << GetNudgeSourceString(source) << ", " | 591 << "source " << GetNudgeSourceString(source) << ", " |
599 << "payloads " | 592 << "payloads " |
600 << ModelTypeStateMapToString(type_state_map); | 593 << ModelTypeStateMapToString(type_state_map); |
601 | 594 |
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 595 SyncSchedulerImpl::ScheduleNudgeImpl(delay, |
603 GetUpdatesFromNudgeSource(source), | 596 GetUpdatesFromNudgeSource(source), |
604 type_state_map, | 597 type_state_map, |
605 false, | |
606 nudge_location); | 598 nudge_location); |
607 } | 599 } |
608 | 600 |
609 void SyncSchedulerImpl::ScheduleNudgeImpl( | 601 void SyncSchedulerImpl::ScheduleNudgeImpl( |
610 const TimeDelta& delay, | 602 const TimeDelta& delay, |
611 GetUpdatesCallerInfo::GetUpdatesSource source, | 603 GetUpdatesCallerInfo::GetUpdatesSource source, |
612 const ModelTypeStateMap& type_state_map, | 604 const ModelTypeStateMap& type_state_map, |
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 605 const tracked_objects::Location& nudge_location) { |
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 606 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
615 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; | 607 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; |
616 | 608 |
617 SDVLOG_LOC(nudge_location, 2) | 609 SDVLOG_LOC(nudge_location, 2) |
618 << "In ScheduleNudgeImpl with delay " | 610 << "In ScheduleNudgeImpl with delay " |
619 << delay.InMilliseconds() << " ms, " | 611 << delay.InMilliseconds() << " ms, " |
620 << "source " << GetUpdatesSourceString(source) << ", " | 612 << "source " << GetUpdatesSourceString(source) << ", " |
621 << "payloads " | 613 << "payloads " |
622 << ModelTypeStateMapToString(type_state_map) | 614 << ModelTypeStateMapToString(type_state_map); |
623 << (is_canary_job ? " (canary)" : ""); | |
624 | 615 |
625 SyncSourceInfo info(source, type_state_map); | 616 SyncSourceInfo info(source, type_state_map); |
626 UpdateNudgeTimeRecords(info); | 617 UpdateNudgeTimeRecords(info); |
627 | 618 |
628 SyncSession* session(CreateSyncSession(info)); | 619 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 620 SyncSessionJob::NUDGE, |
630 make_linked_ptr(session), is_canary_job, | 621 TimeTicks::Now() + delay, |
631 ConfigurationParams(), nudge_location); | 622 CreateSyncSession(info).Pass(), |
623 ConfigurationParams(), | |
624 nudge_location)); | |
632 | 625 |
633 session = NULL; | 626 if (!ShouldRunJobSaveIfNecessary(job.get())) |
634 if (!ShouldRunJob(job)) | |
635 return; | 627 return; |
636 | 628 |
637 if (pending_nudge_.get()) { | 629 if (pending_nudge_) { |
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
640 return; | |
641 } | |
642 | |
643 SDVLOG(2) << "Coalescing pending nudge"; | |
644 pending_nudge_->session->Coalesce(*(job.session.get())); | |
645 | |
646 SDVLOG(2) << "Rescheduling pending nudge"; | 630 SDVLOG(2) << "Rescheduling pending nudge"; |
647 SyncSession* s = pending_nudge_->session.get(); | 631 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
648 job.session.reset(new SyncSession(s->context(), s->delegate(), | 632 // Choose the start time as the earliest of the 2. Note that this means |
649 s->source(), s->routing_info(), s->workers())); | 633 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
650 | 634 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
651 // Choose the start time as the earliest of the 2. | 635 // without waiting. |
652 job.scheduled_start = std::min(job.scheduled_start, | 636 pending_nudge_->set_scheduled_start( |
653 pending_nudge_->scheduled_start); | 637 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
654 pending_nudge_.reset(); | 638 // Abandon the old task by cloning and replacing the session. |
639 // It's possible that by "rescheduling" we're actually taking a job that | |
640 // was previously unscheduled and giving it wings, so take care to reset | |
641 // unscheduled nudge storage. | |
642 job = pending_nudge_->CloneAndAbandon(); | |
643 unscheduled_nudge_storage_.reset(); | |
644 pending_nudge_ = NULL; | |
655 } | 645 } |
656 | 646 |
657 // TODO(zea): Consider adding separate throttling/backoff for datatype | 647 // TODO(zea): Consider adding separate throttling/backoff for datatype |
658 // refresh requests. | 648 // refresh requests. |
659 ScheduleSyncSessionJob(job); | 649 ScheduleSyncSessionJob(job.Pass()); |
660 } | 650 } |
661 | 651 |
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 652 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
663 switch (mode) { | 653 switch (mode) { |
664 ENUM_CASE(CONFIGURATION_MODE); | 654 ENUM_CASE(CONFIGURATION_MODE); |
665 ENUM_CASE(NORMAL_MODE); | 655 ENUM_CASE(NORMAL_MODE); |
666 } | 656 } |
667 return ""; | 657 return ""; |
668 } | 658 } |
669 | 659 |
670 const char* SyncSchedulerImpl::GetDecisionString( | 660 const char* SyncSchedulerImpl::GetDecisionString( |
671 SyncSchedulerImpl::JobProcessDecision mode) { | 661 SyncSchedulerImpl::JobProcessDecision mode) { |
672 switch (mode) { | 662 switch (mode) { |
673 ENUM_CASE(CONTINUE); | 663 ENUM_CASE(CONTINUE); |
674 ENUM_CASE(SAVE); | 664 ENUM_CASE(SAVE); |
675 ENUM_CASE(DROP); | 665 ENUM_CASE(DROP); |
676 } | 666 } |
677 return ""; | 667 return ""; |
678 } | 668 } |
679 | 669 |
680 // static | |
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
682 SyncSessionJob::SyncSessionJobPurpose purpose, | |
683 SyncerStep* start, | |
684 SyncerStep* end) { | |
685 switch (purpose) { | |
686 case SyncSessionJob::CONFIGURATION: | |
687 *start = DOWNLOAD_UPDATES; | |
688 *end = APPLY_UPDATES; | |
689 return; | |
690 case SyncSessionJob::NUDGE: | |
691 case SyncSessionJob::POLL: | |
692 *start = SYNCER_BEGIN; | |
693 *end = SYNCER_END; | |
694 return; | |
695 default: | |
696 NOTREACHED(); | |
697 *start = SYNCER_END; | |
698 *end = SYNCER_END; | |
699 return; | |
700 } | |
701 } | |
702 | |
703 void SyncSchedulerImpl::PostTask( | 670 void SyncSchedulerImpl::PostTask( |
704 const tracked_objects::Location& from_here, | 671 const tracked_objects::Location& from_here, |
705 const char* name, const base::Closure& task) { | 672 const char* name, const base::Closure& task) { |
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 673 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 674 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
708 if (!started_) { | 675 if (!started_) { |
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 676 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
710 return; | 677 return; |
711 } | 678 } |
712 sync_loop_->PostTask(from_here, task); | 679 sync_loop_->PostTask(from_here, task); |
713 } | 680 } |
714 | 681 |
715 void SyncSchedulerImpl::PostDelayedTask( | 682 void SyncSchedulerImpl::PostDelayedTask( |
716 const tracked_objects::Location& from_here, | 683 const tracked_objects::Location& from_here, |
717 const char* name, const base::Closure& task, base::TimeDelta delay) { | 684 const char* name, const base::Closure& task, base::TimeDelta delay) { |
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 685 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
719 << delay.InMilliseconds() << " ms delay"; | 686 << delay.InMilliseconds() << " ms delay"; |
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 687 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
721 if (!started_) { | 688 if (!started_) { |
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 689 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
723 return; | 690 return; |
724 } | 691 } |
725 sync_loop_->PostDelayedTask(from_here, task, delay); | 692 sync_loop_->PostDelayedTask(from_here, task, delay); |
726 } | 693 } |
727 | 694 |
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 695 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
696 scoped_ptr<SyncSessionJob> job) { | |
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 697 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
730 if (no_scheduling_allowed_) { | 698 if (no_scheduling_allowed_) { |
731 NOTREACHED() << "Illegal to schedule job while session in progress."; | 699 NOTREACHED() << "Illegal to schedule job while session in progress."; |
732 return; | 700 return; |
733 } | 701 } |
734 | 702 |
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 703 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
736 if (delay < TimeDelta::FromMilliseconds(0)) | 704 if (delay < TimeDelta::FromMilliseconds(0)) |
737 delay = TimeDelta::FromMilliseconds(0); | 705 delay = TimeDelta::FromMilliseconds(0); |
738 SDVLOG_LOC(job.from_here, 2) | 706 SDVLOG_LOC(job->from_location(), 2) |
739 << "In ScheduleSyncSessionJob with " | 707 << "In ScheduleSyncSessionJob with " |
740 << SyncSessionJob::GetPurposeString(job.purpose) | 708 << SyncSessionJob::GetPurposeString(job->purpose()) |
741 << " job and " << delay.InMilliseconds() << " ms delay"; | 709 << " job and " << delay.InMilliseconds() << " ms delay"; |
742 | 710 |
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 711 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
744 job.purpose == SyncSessionJob::POLL); | 712 job->purpose() == SyncSessionJob::POLL); |
745 if (job.purpose == SyncSessionJob::NUDGE) { | 713 if (job->purpose() == SyncSessionJob::NUDGE) { |
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 714 SDVLOG_LOC(job->from_location(), 2) << "Resetting pending_nudge to "; |
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 715 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
748 job.session); | 716 job->session()); |
749 pending_nudge_.reset(new SyncSessionJob(job)); | 717 pending_nudge_ = job.get(); |
750 } | 718 } |
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 719 |
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 720 tracked_objects::Location loc(job->from_location()); |
753 weak_ptr_factory_.GetWeakPtr(), | 721 PostDelayedTask(loc, "DoSyncSessionJob", |
akalin
2012/09/25 22:37:24
why not just inline job->from_location() here? Or
tim (not reviewing)
2012/10/08 00:20:03
:( Because base::Passed takes ownership of the job
| |
754 job), | 722 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
755 delay); | 723 weak_ptr_factory_.GetWeakPtr(), |
724 base::Passed(&job)), | |
725 delay); | |
756 } | 726 } |
757 | 727 |
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 728 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 729 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
730 if (job->purpose() == SyncSessionJob::NUDGE) { | |
rlarocque
2012/09/24 21:15:14
I get the feeling that there's not much point in s
tim (not reviewing)
2012/10/08 00:20:03
There's still a chunk of common code, but also, fo
rlarocque
2012/10/08 19:18:04
I see your point, but I think the call sites shoul
tim (not reviewing)
2012/10/11 17:35:14
I'm not convinced we should do this yet. I still
rlarocque
2012/10/11 18:15:08
I think it would be easier to define a function "D
| |
731 if (pending_nudge_ == NULL || | |
732 pending_nudge_->session() != job->session()) { | |
733 // |job| is abandoned. | |
734 SDVLOG(2) << "Dropping a nudge in " | |
735 << "DoSyncSessionJob because another nudge was scheduled"; | |
736 return false; | |
737 } | |
738 pending_nudge_ = NULL; | |
739 | |
740 // Rebase the session with the latest model safe table and use it to purge | |
741 // and update any disabled or modified entries in the job. | |
742 job->mutable_session()->RebaseRoutingInfoWithLatest( | |
743 session_context_->routing_info(), session_context_->workers()); | |
744 } | |
760 | 745 |
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 746 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
762 if (!ShouldRunJob(job)) { | 747 GetUpdatesCallerInfo::GetUpdatesSource source( |
748 job->session()->source().updates_source); | |
749 if (!ShouldRunJobSaveIfNecessary(job.get())) { | |
763 SLOG(WARNING) | 750 SLOG(WARNING) |
764 << "Not executing " | 751 << "Not executing " |
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 752 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from " |
766 << GetUpdatesSourceString(job.session->source().updates_source); | 753 << GetUpdatesSourceString(source); |
767 return; | 754 return false; |
768 } | 755 } |
769 | 756 |
770 if (job.purpose == SyncSessionJob::NUDGE) { | |
771 if (pending_nudge_.get() == NULL || | |
772 pending_nudge_->session != job.session) { | |
773 SDVLOG(2) << "Dropping a nudge in " | |
774 << "DoSyncSessionJob because another nudge was scheduled"; | |
775 return; // Another nudge must have been scheduled in in the meantime. | |
776 } | |
777 pending_nudge_.reset(); | |
778 | |
779 // Create the session with the latest model safe table and use it to purge | |
780 // and update any disabled or modified entries in the job. | |
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
782 | |
783 job.session->RebaseRoutingInfoWithLatest(*session); | |
784 } | |
785 SDVLOG(2) << "DoSyncSessionJob with " | 757 SDVLOG(2) << "DoSyncSessionJob with " |
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 758 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
787 | |
788 SyncerStep begin(SYNCER_END); | |
789 SyncerStep end(SYNCER_END); | |
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
791 | 759 |
792 bool has_more_to_sync = true; | 760 bool has_more_to_sync = true; |
793 while (ShouldRunJob(job) && has_more_to_sync) { | 761 bool premature_exit = false; |
762 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) { | |
794 SDVLOG(2) << "Calling SyncShare."; | 763 SDVLOG(2) << "Calling SyncShare."; |
795 // Synchronously perform the sync session from this thread. | 764 // Synchronously perform the sync session from this thread. |
796 syncer_->SyncShare(job.session.get(), begin, end); | 765 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
797 has_more_to_sync = job.session->HasMoreToSync(); | 766 job->start_step(), |
767 job->end_step()); | |
768 | |
769 has_more_to_sync = job->session()->HasMoreToSync(); | |
798 if (has_more_to_sync) | 770 if (has_more_to_sync) |
799 job.session->PrepareForAnotherSyncCycle(); | 771 job->mutable_session()->PrepareForAnotherSyncCycle(); |
800 } | 772 } |
801 SDVLOG(2) << "Done SyncShare looping."; | 773 SDVLOG(2) << "Done SyncShare looping."; |
802 | 774 |
803 FinishSyncSessionJob(job); | 775 FinishSyncSessionJob(job.get(), premature_exit); |
akalin
2012/10/03 00:11:34
can you make SyncSessionJob::Finish() return the v
tim (not reviewing)
2012/10/08 00:20:03
Yeah, that works! Done.
| |
776 return job->Succeeded(); | |
804 } | 777 } |
805 | 778 |
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 779 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 780 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
808 | 781 |
809 // We are interested in recording time between local nudges for datatypes. | 782 // We are interested in recording time between local nudges for datatypes. |
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 783 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 784 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
812 return; | 785 return; |
813 | 786 |
814 base::TimeTicks now = TimeTicks::Now(); | 787 base::TimeTicks now = TimeTicks::Now(); |
815 // Update timing information for how often datatypes are triggering nudges. | 788 // Update timing information for how often datatypes are triggering nudges. |
816 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); | 789 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); |
817 iter != info.types.end(); | 790 iter != info.types.end(); |
818 ++iter) { | 791 ++iter) { |
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 792 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
820 last_local_nudges_by_model_type_[iter->first] = now; | 793 last_local_nudges_by_model_type_[iter->first] = now; |
821 if (previous.is_null()) | 794 if (previous.is_null()) |
822 continue; | 795 continue; |
823 | 796 |
824 #define PER_DATA_TYPE_MACRO(type_str) \ | 797 #define PER_DATA_TYPE_MACRO(type_str) \ |
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 798 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 799 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
827 #undef PER_DATA_TYPE_MACRO | 800 #undef PER_DATA_TYPE_MACRO |
828 } | 801 } |
829 } | 802 } |
830 | 803 |
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 804 void SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
805 bool exited_prematurely) { | |
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 806 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
833 // Now update the status of the connection from SCM. We need this to decide | 807 // Now update the status of the connection from SCM. We need this to decide |
834 // whether we need to save/run future jobs. The notifications from SCM are not | 808 // whether we need to save/run future jobs. The notifications from SCM are |
835 // reliable. | 809 // not reliable. |
836 // | 810 // |
837 // TODO(rlarocque): crbug.com/110954 | 811 // TODO(rlarocque): crbug.com/110954 |
838 // We should get rid of the notifications and it is probably not needed to | 812 // We should get rid of the notifications and it is probably not needed to |
839 // maintain this status variable in 2 places. We should query it directly from | 813 // maintain this status variable in 2 places. We should query it directly |
840 // SCM when needed. | 814 // from SCM when needed. |
841 ServerConnectionManager* scm = session_context_->connection_manager(); | 815 ServerConnectionManager* scm = session_context_->connection_manager(); |
842 UpdateServerConnectionManagerStatus(scm->server_status()); | 816 UpdateServerConnectionManagerStatus(scm->server_status()); |
843 | 817 |
844 if (IsSyncingCurrentlySilenced()) { | 818 if (IsSyncingCurrentlySilenced()) { |
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 819 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
846 // TODO(sync): Investigate whether we need to check job.purpose | 820 // If we're here, it's because |job| was silenced until a server specified |
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | 821 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
848 SaveJob(job); | 822 // any job through while in WaitInterval::THROTTLED). |
849 return; // Nothing to do. | 823 scoped_ptr<SyncSessionJob> clone = job->CloneAndAbandon(); |
akalin
2012/10/03 00:11:34
Assume you implement my comment above re. passing
tim (not reviewing)
2012/10/08 00:20:03
Interesting point. In general the logic is if you
| |
850 } else if (job.session->Succeeded() && | 824 if (job->purpose() == SyncSessionJob::NUDGE) |
851 !job.config_params.ready_task.is_null()) { | 825 pending_nudge_ = clone.get(); |
852 // If this was a configuration job with a ready task, invoke it now that | 826 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
853 // we finished successfully. | 827 wait_interval_->pending_configure_job = clone.get(); |
828 else | |
829 clone.reset(); // Unthrottling is enough, no need to force a canary. | |
830 | |
831 RestartWaiting(clone.Pass()); | |
832 return; | |
833 } | |
834 | |
835 // Let job know that we're through syncing (calling SyncShare) at this point. | |
836 { | |
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 837 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
855 job.config_params.ready_task.Run(); | 838 job->Finish(exited_prematurely); |
856 } | 839 } |
857 | 840 |
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 841 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
859 ScheduleNextSync(job); | 842 ScheduleNextSync(job); |
860 } | 843 } |
861 | 844 |
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 845 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob* old_job) { |
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 846 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
864 DCHECK(!old_job.session->HasMoreToSync()); | 847 DCHECK(!old_job->session()->HasMoreToSync()); |
865 | 848 |
866 AdjustPolling(&old_job); | 849 AdjustPolling(old_job); |
867 | 850 |
868 if (old_job.session->Succeeded()) { | 851 if (old_job->Succeeded()) { |
869 // Only reset backoff if we actually reached the server. | 852 // Only reset backoff if we actually reached the server. |
870 if (old_job.session->SuccessfullyReachedServer()) | 853 // It's possible that we reached the server on one attempt, then had an |
854 // error on the next (or didn't perform some of the server-communicating | |
855 // commands). We want to verify that, for all commands attempted, we | |
856 // successfully spoke with the server. Therefore, we verify no errors | |
857 // and at least one SYNCER_OK. | |
858 if (old_job->session()->DidReachServer()) | |
871 wait_interval_.reset(); | 859 wait_interval_.reset(); |
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 860 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
873 return; | 861 return; |
874 } | 862 } |
875 | 863 |
876 if (old_job.purpose == SyncSessionJob::POLL) { | 864 if (old_job->purpose() == SyncSessionJob::POLL) { |
877 return; // We don't retry POLL jobs. | 865 return; // We don't retry POLL jobs. |
878 } | 866 } |
879 | 867 |
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 868 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
881 // if we don't succeed. Some types of errors are not likely to disappear on | 869 // if we don't succeed. Some types of errors are not likely to disappear on |
882 // their own. With the return values now available in the old_job.session, we | 870 // their own. With the return values now available in the old_job.session, |
883 // should be able to detect such errors and only retry when we detect | 871 // we should be able to detect such errors and only retry when we detect |
884 // transient errors. | 872 // transient errors. |
885 | 873 |
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 874 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
887 mode_ == NORMAL_MODE) { | 875 mode_ == NORMAL_MODE) { |
888 // When in normal mode, we allow up to one nudge per backoff interval. It | 876 // When in normal mode, we allow up to one nudge per backoff interval. It |
889 // appears that this was our nudge for this interval, and it failed. | 877 // appears that this was our nudge for this interval, and it failed. |
890 // | 878 // |
891 // Note: This does not prevent us from running canary jobs. For example, an | 879 // Note: This does not prevent us from running canary jobs. For example, |
892 // IP address change might still result in another nudge being executed | 880 // an IP address change might still result in another nudge being executed |
893 // during this backoff interval. | 881 // during this backoff interval. |
894 SDVLOG(2) << "A nudge during backoff failed"; | 882 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
895 | 883 DCHECK_EQ(SyncSessionJob::NUDGE, old_job->purpose()); |
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
897 DCHECK(!wait_interval_->had_nudge); | 884 DCHECK(!wait_interval_->had_nudge); |
898 | 885 |
899 wait_interval_->had_nudge = true; | 886 wait_interval_->had_nudge = true; |
900 InitOrCoalescePendingJob(old_job); | 887 DCHECK(!pending_nudge_); |
901 RestartWaiting(); | 888 |
889 scoped_ptr<SyncSessionJob> new_job = old_job->Clone(); | |
akalin
2012/10/03 00:11:34
same question here -- can you pass ownership to th
tim (not reviewing)
2012/10/08 00:20:03
I did change this around a bit, see my response ab
| |
890 pending_nudge_ = new_job.get(); | |
891 RestartWaiting(new_job.Pass()); | |
902 } else { | 892 } else { |
903 // Either this is the first failure or a consecutive failure after our | 893 // Either this is the first failure or a consecutive failure after our |
904 // backoff timer expired. We handle it the same way in either case. | 894 // backoff timer expired. We handle it the same way in either case. |
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 895 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
906 HandleContinuationError(old_job); | 896 HandleContinuationError(old_job); |
907 } | 897 } |
908 } | 898 } |
909 | 899 |
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 900 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 901 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
912 | 902 |
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 903 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
914 syncer_short_poll_interval_seconds_ : | 904 syncer_short_poll_interval_seconds_ : |
915 syncer_long_poll_interval_seconds_; | 905 syncer_long_poll_interval_seconds_; |
916 bool rate_changed = !poll_timer_.IsRunning() || | 906 bool rate_changed = !poll_timer_.IsRunning() || |
917 poll != poll_timer_.GetCurrentDelay(); | 907 poll != poll_timer_.GetCurrentDelay(); |
918 | 908 |
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 909 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
920 poll_timer_.Reset(); | 910 poll_timer_.Reset(); |
921 | 911 |
922 if (!rate_changed) | 912 if (!rate_changed) |
923 return; | 913 return; |
924 | 914 |
925 // Adjust poll rate. | 915 // Adjust poll rate. |
926 poll_timer_.Stop(); | 916 poll_timer_.Stop(); |
927 poll_timer_.Start(FROM_HERE, poll, this, | 917 poll_timer_.Start(FROM_HERE, poll, this, |
928 &SyncSchedulerImpl::PollTimerCallback); | 918 &SyncSchedulerImpl::PollTimerCallback); |
929 } | 919 } |
930 | 920 |
931 void SyncSchedulerImpl::RestartWaiting() { | 921 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
932 CHECK(wait_interval_.get()); | 922 CHECK(wait_interval_.get()); |
933 wait_interval_->timer.Stop(); | 923 wait_interval_->timer.Stop(); |
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 924 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
935 this, &SyncSchedulerImpl::DoCanaryJob); | 925 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
926 base::Bind(&SyncSchedulerImpl::Unthrottle, | |
927 weak_ptr_factory_.GetWeakPtr(), | |
928 base::Passed(&job))); | |
929 } else { | |
930 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
931 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | |
932 weak_ptr_factory_.GetWeakPtr(), | |
933 base::Passed(&job))); | |
934 } | |
936 } | 935 } |
937 | 936 |
938 void SyncSchedulerImpl::HandleContinuationError( | 937 void SyncSchedulerImpl::HandleContinuationError( |
939 const SyncSessionJob& old_job) { | 938 const SyncSessionJob* old_job) { |
akalin
2012/10/03 00:11:34
similar to the above, can you pass ownership to th
tim (not reviewing)
2012/10/08 00:20:03
I'm going with the logic that a job has a well def
| |
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 939 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
941 if (DCHECK_IS_ON()) { | 940 if (DCHECK_IS_ON()) { |
942 if (IsBackingOff()) { | 941 if (IsBackingOff()) { |
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 942 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
944 } | 943 } |
945 } | 944 } |
946 | 945 |
947 TimeDelta length = delay_provider_->GetDelay( | 946 TimeDelta length = delay_provider_->GetDelay( |
948 IsBackingOff() ? wait_interval_->length : | 947 IsBackingOff() ? wait_interval_->length : |
949 delay_provider_->GetInitialDelay( | 948 delay_provider_->GetInitialDelay( |
950 old_job.session->status_controller().model_neutral_state())); | 949 old_job->session()->status_controller().model_neutral_state())); |
951 | 950 |
952 SDVLOG(2) << "In handle continuation error with " | 951 SDVLOG(2) << "In handle continuation error with " |
953 << SyncSessionJob::GetPurposeString(old_job.purpose) | 952 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
954 << " job. The time delta(ms) is " | 953 << " job. The time delta(ms) is " |
955 << length.InMilliseconds(); | 954 << length.InMilliseconds(); |
956 | 955 |
957 // This will reset the had_nudge variable as well. | 956 // This will reset the had_nudge variable as well. |
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 957 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
959 length)); | 958 length)); |
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 959 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); |
960 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
961 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 962 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
962 // Config params should always get set. | 963 // Config params should always get set. |
963 DCHECK(!old_job.config_params.ready_task.is_null()); | 964 DCHECK(!old_job->config_params().ready_task.is_null()); |
964 SyncSession* old = old_job.session.get(); | 965 wait_interval_->pending_configure_job = new_job.get(); |
965 SyncSession* s(new SyncSession(session_context_, this, | |
966 old->source(), old->routing_info(), old->workers())); | |
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
968 make_linked_ptr(s), false, old_job.config_params, | |
969 FROM_HERE); | |
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
971 } else { | 966 } else { |
972 // We are not in configuration mode. So wait_interval's pending job | 967 // We are not in configuration mode. So wait_interval's pending job |
973 // should be null. | 968 // should be null. |
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 969 DCHECK(wait_interval_->pending_configure_job == NULL); |
970 DCHECK(!pending_nudge_); | |
971 pending_nudge_ = new_job.get(); | |
972 } | |
975 | 973 |
976 // TODO(lipalani) - handle clear user data. | 974 RestartWaiting(new_job.Pass()); |
977 InitOrCoalescePendingJob(old_job); | |
978 } | |
979 RestartWaiting(); | |
980 } | 975 } |
981 | 976 |
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 977 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 978 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
984 DCHECK(weak_handle_this_.IsInitialized()); | 979 DCHECK(weak_handle_this_.IsInitialized()); |
985 SDVLOG(3) << "Posting StopImpl"; | 980 SDVLOG(3) << "Posting StopImpl"; |
986 weak_handle_this_.Call(FROM_HERE, | 981 weak_handle_this_.Call(FROM_HERE, |
987 &SyncSchedulerImpl::StopImpl, | 982 &SyncSchedulerImpl::StopImpl, |
988 callback); | 983 callback); |
989 } | 984 } |
990 | 985 |
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 986 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 987 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
993 SDVLOG(2) << "StopImpl called"; | 988 SDVLOG(2) << "StopImpl called"; |
994 | 989 |
995 // Kill any in-flight method calls. | 990 // Kill any in-flight method calls. |
996 weak_ptr_factory_.InvalidateWeakPtrs(); | 991 weak_ptr_factory_.InvalidateWeakPtrs(); |
997 wait_interval_.reset(); | 992 wait_interval_.reset(); |
998 poll_timer_.Stop(); | 993 poll_timer_.Stop(); |
999 if (started_) { | 994 if (started_) { |
1000 started_ = false; | 995 started_ = false; |
1001 } | 996 } |
1002 if (!callback.is_null()) | 997 if (!callback.is_null()) |
1003 callback.Run(); | 998 callback.Run(); |
1004 } | 999 } |
1005 | 1000 |
1006 void SyncSchedulerImpl::DoCanaryJob() { | 1001 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1002 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1008 SDVLOG(2) << "Do canary job"; | 1003 SDVLOG(2) << "Do canary job"; |
1009 DoPendingJobIfPossible(true); | 1004 |
1005 // Only set canary privlieges here, when we are about to run the job. This | |
akalin
2012/09/25 22:37:24
privileges
tim (not reviewing)
2012/10/08 00:20:03
Done.
| |
1006 // avoids confusion in managing canary bits during scheduling, when you | |
1007 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that | |
1008 // was scheduled as canary, and send it to an "unscheduled" state. | |
1009 to_be_canary->GrantCanaryPrivilege(); | |
1010 | |
1011 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { | |
1012 // TODO(tim): We should be able to remove this... | |
1013 scoped_ptr<SyncSession> temp = CreateSyncSession( | |
1014 to_be_canary->session()->source()).Pass(); | |
1015 // The routing info might have been changed since we cached the | |
1016 // pending nudge. Update it by coalescing to the latest. | |
1017 to_be_canary->mutable_session()->Coalesce(*(temp)); | |
1018 } | |
1019 DoSyncSessionJob(to_be_canary.Pass()); | |
1010 } | 1020 } |
1011 | 1021 |
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1022 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1023 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1014 SyncSessionJob* job_to_execute = NULL; | 1024 // If we find a scheduled pending_ job, abandon the old one and return a |
1025 // a clone. If unscheduled, just hand over ownership. | |
1026 scoped_ptr<SyncSessionJob> candidate; | |
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1027 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
1016 && wait_interval_->pending_configure_job.get()) { | 1028 && wait_interval_->pending_configure_job) { |
1017 SDVLOG(2) << "Found pending configure job"; | 1029 SDVLOG(2) << "Found pending configure job"; |
1018 job_to_execute = wait_interval_->pending_configure_job.get(); | 1030 candidate = |
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1031 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
1032 wait_interval_->pending_configure_job = candidate.get(); | |
1033 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
1020 SDVLOG(2) << "Found pending nudge job"; | 1034 SDVLOG(2) << "Found pending nudge job"; |
1021 | 1035 candidate = pending_nudge_->CloneAndAbandon(); |
1022 scoped_ptr<SyncSession> session(CreateSyncSession( | 1036 pending_nudge_ = candidate.get(); |
1023 pending_nudge_->session->source())); | 1037 unscheduled_nudge_storage_.reset(); |
1024 | |
1025 // Also the routing info might have been changed since we cached the | |
1026 // pending nudge. Update it by coalescing to the latest. | |
1027 pending_nudge_->session->Coalesce(*(session.get())); | |
1028 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
1029 job_to_execute = pending_nudge_.get(); | |
1030 } | 1038 } |
1031 | 1039 return candidate.Pass(); |
1032 if (job_to_execute != NULL) { | |
1033 SDVLOG(2) << "Executing pending job"; | |
1034 SyncSessionJob copy = *job_to_execute; | |
1035 copy.is_canary_job = is_canary_job; | |
1036 DoSyncSessionJob(copy); | |
1037 } | |
1038 } | 1040 } |
1039 | 1041 |
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1042 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
1041 const SyncSourceInfo& source) { | 1043 const SyncSourceInfo& source) { |
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1044 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1043 DVLOG(2) << "Creating sync session with routes " | 1045 DVLOG(2) << "Creating sync session with routes " |
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1046 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
1045 | 1047 |
1046 SyncSourceInfo info(source); | 1048 SyncSourceInfo info(source); |
1047 SyncSession* session(new SyncSession(session_context_, this, info, | 1049 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
1048 session_context_->routing_info(), session_context_->workers())); | 1050 session_context_->routing_info(), session_context_->workers())); |
1049 | |
1050 return session; | |
1051 } | 1051 } |
1052 | 1052 |
1053 void SyncSchedulerImpl::PollTimerCallback() { | 1053 void SyncSchedulerImpl::PollTimerCallback() { |
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1055 ModelSafeRoutingInfo r; | 1055 ModelSafeRoutingInfo r; |
1056 ModelTypeStateMap type_state_map = | 1056 ModelTypeStateMap type_state_map = |
1057 ModelSafeRoutingInfoToStateMap(r, std::string()); | 1057 ModelSafeRoutingInfoToStateMap(r, std::string()); |
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); | 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); |
1059 SyncSession* s = CreateSyncSession(info); | 1059 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
1060 | 1060 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1061 TimeTicks::Now(), |
1062 make_linked_ptr(s), | 1062 s.Pass(), |
1063 false, | 1063 ConfigurationParams(), |
1064 ConfigurationParams(), | 1064 FROM_HERE)); |
1065 FROM_HERE); | 1065 ScheduleSyncSessionJob(job.Pass()); |
1066 | |
1067 ScheduleSyncSessionJob(job); | |
1068 } | 1066 } |
1069 | 1067 |
1070 void SyncSchedulerImpl::Unthrottle() { | 1068 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1069 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1070 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1073 SDVLOG(2) << "Unthrottled."; | 1071 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
1074 DoCanaryJob(); | 1072 << "canary."; |
1073 if (to_be_canary.get()) | |
1074 DoCanaryJob(to_be_canary.Pass()); | |
1075 | |
1076 // TODO(tim): ?! This must have been broken. The way DecideOnJob works today | |
rlarocque
2012/09/24 21:15:14
Agreed, but this comment won't make much sense oth
tim (not reviewing)
2012/10/08 00:20:03
Ah, right. I had left this as a comment for mysel
rlarocque
2012/10/08 19:18:04
On second thought, this function resets the wait_i
tim (not reviewing)
2012/10/11 17:35:14
You're right. It should mean we're good to go on t
| |
1077 // canary privileges aren't enough to bypass a THROTTLED wait interval, which | |
1078 // would suggest we need to reset first (though trusting canary in Decide is | |
1079 // probably the "right" thing to do). | |
1075 wait_interval_.reset(); | 1080 wait_interval_.reset(); |
1076 } | 1081 } |
1077 | 1082 |
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1083 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1085 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1081 } | 1086 } |
1082 | 1087 |
1083 bool SyncSchedulerImpl::IsBackingOff() const { | 1088 bool SyncSchedulerImpl::IsBackingOff() const { |
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1089 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1085 return wait_interval_.get() && wait_interval_->mode == | 1090 return wait_interval_.get() && wait_interval_->mode == |
1086 WaitInterval::EXPONENTIAL_BACKOFF; | 1091 WaitInterval::EXPONENTIAL_BACKOFF; |
1087 } | 1092 } |
1088 | 1093 |
1089 void SyncSchedulerImpl::OnSilencedUntil( | 1094 void SyncSchedulerImpl::OnSilencedUntil( |
1090 const base::TimeTicks& silenced_until) { | 1095 const base::TimeTicks& silenced_until) { |
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1097 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1093 silenced_until - TimeTicks::Now())); | 1098 silenced_until - TimeTicks::Now())); |
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
1095 &SyncSchedulerImpl::Unthrottle); | |
1096 } | 1099 } |
1097 | 1100 |
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1101 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1102 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1100 return wait_interval_.get() && wait_interval_->mode == | 1103 return wait_interval_.get() && wait_interval_->mode == |
1101 WaitInterval::THROTTLED; | 1104 WaitInterval::THROTTLED; |
1102 } | 1105 } |
1103 | 1106 |
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1107 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1105 const base::TimeDelta& new_interval) { | 1108 const base::TimeDelta& new_interval) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1159 | 1162 |
1160 #undef SDVLOG_LOC | 1163 #undef SDVLOG_LOC |
1161 | 1164 |
1162 #undef SDVLOG | 1165 #undef SDVLOG |
1163 | 1166 |
1164 #undef SLOG | 1167 #undef SLOG |
1165 | 1168 |
1166 #undef ENUM_CASE | 1169 #undef ENUM_CASE |
1167 | 1170 |
1168 } // namespace syncer | 1171 } // namespace syncer |
OLD | NEW |