Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(410)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 10917234: sync: make scheduling logic and job ownership more obvious. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: tweak Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
12 #include "base/compiler_specific.h" 13 #include "base/compiler_specific.h"
13 #include "base/location.h" 14 #include "base/location.h"
14 #include "base/logging.h" 15 #include "base/logging.h"
15 #include "base/message_loop.h" 16 #include "base/message_loop.h"
16 #include "sync/engine/backoff_delay_provider.h" 17 #include "sync/engine/backoff_delay_provider.h"
17 #include "sync/engine/syncer.h" 18 #include "sync/engine/syncer.h"
18 #include "sync/engine/throttled_data_type_tracker.h" 19 #include "sync/engine/throttled_data_type_tracker.h"
19 #include "sync/protocol/proto_enum_conversions.h" 20 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h" 21 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h" 22 #include "sync/util/data_type_histogram.h"
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 : source(source), 77 : source(source),
77 types_to_download(types_to_download), 78 types_to_download(types_to_download),
78 routing_info(routing_info), 79 routing_info(routing_info),
79 ready_task(ready_task) { 80 ready_task(ready_task) {
80 DCHECK(!ready_task.is_null()); 81 DCHECK(!ready_task.is_null());
81 } 82 }
82 ConfigurationParams::~ConfigurationParams() {} 83 ConfigurationParams::~ConfigurationParams() {}
83 84
84 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 SyncSchedulerImpl::WaitInterval::WaitInterval()
85 : mode(UNKNOWN), 86 : mode(UNKNOWN),
86 had_nudge(false) { 87 had_nudge(false),
87 } 88 pending_configure_job(NULL) {}
89
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
91 : mode(mode), had_nudge(false), length(length),
92 pending_configure_job(NULL) {}
88 93
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
90 95
91 #define ENUM_CASE(x) case x: return #x; break; 96 #define ENUM_CASE(x) case x: return #x; break;
92 97
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
94 switch (mode) { 99 switch (mode) {
95 ENUM_CASE(UNKNOWN); 100 ENUM_CASE(UNKNOWN);
96 ENUM_CASE(EXPONENTIAL_BACKOFF); 101 ENUM_CASE(EXPONENTIAL_BACKOFF);
97 ENUM_CASE(THROTTLED); 102 ENUM_CASE(THROTTLED);
98 } 103 }
99 NOTREACHED(); 104 NOTREACHED();
100 return ""; 105 return "";
101 } 106 }
102 107
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
104 : purpose(UNKNOWN),
105 is_canary_job(false) {
106 }
107
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
109
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
111 base::TimeTicks start,
112 linked_ptr<sessions::SyncSession> session,
113 bool is_canary_job,
114 const ConfigurationParams& config_params,
115 const tracked_objects::Location& from_here)
116 : purpose(purpose),
117 scheduled_start(start),
118 session(session),
119 is_canary_job(is_canary_job),
120 config_params(config_params),
121 from_here(from_here) {
122 }
123
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
126 switch (purpose) {
127 ENUM_CASE(UNKNOWN);
128 ENUM_CASE(POLL);
129 ENUM_CASE(NUDGE);
130 ENUM_CASE(CONFIGURATION);
131 }
132 NOTREACHED();
133 return "";
134 }
135
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
137 NudgeSource source) { 109 NudgeSource source) {
138 switch (source) { 110 switch (source) {
139 case NUDGE_SOURCE_NOTIFICATION: 111 case NUDGE_SOURCE_NOTIFICATION:
140 return GetUpdatesCallerInfo::NOTIFICATION; 112 return GetUpdatesCallerInfo::NOTIFICATION;
141 case NUDGE_SOURCE_LOCAL: 113 case NUDGE_SOURCE_LOCAL:
142 return GetUpdatesCallerInfo::LOCAL; 114 return GetUpdatesCallerInfo::LOCAL;
143 case NUDGE_SOURCE_CONTINUATION: 115 case NUDGE_SOURCE_CONTINUATION:
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
145 case NUDGE_SOURCE_LOCAL_REFRESH: 117 case NUDGE_SOURCE_LOCAL_REFRESH:
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 118 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
147 case NUDGE_SOURCE_UNKNOWN: 119 case NUDGE_SOURCE_UNKNOWN:
148 return GetUpdatesCallerInfo::UNKNOWN; 120 return GetUpdatesCallerInfo::UNKNOWN;
149 default: 121 default:
150 NOTREACHED(); 122 NOTREACHED();
151 return GetUpdatesCallerInfo::UNKNOWN; 123 return GetUpdatesCallerInfo::UNKNOWN;
152 } 124 }
153 } 125 }
154 126
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
156 : mode(mode), had_nudge(false), length(length) { }
157
158 // Helper macros to log with the syncer thread name; useful when there 127 // Helper macros to log with the syncer thread name; useful when there
159 // are multiple syncer threads involved. 128 // are multiple syncer threads involved.
160 129
161 #define SLOG(severity) LOG(severity) << name_ << ": " 130 #define SLOG(severity) LOG(severity) << name_ << ": "
162 131
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
164 133
165 #define SDVLOG_LOC(from_here, verbose_level) \ 134 #define SDVLOG_LOC(from_here, verbose_level) \
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 135 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
167 136
(...skipping 30 matching lines...) Expand all
198 syncer_short_poll_interval_seconds_( 167 syncer_short_poll_interval_seconds_(
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
200 syncer_long_poll_interval_seconds_( 169 syncer_long_poll_interval_seconds_(
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
202 sessions_commit_delay_( 171 sessions_commit_delay_(
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
204 mode_(NORMAL_MODE), 173 mode_(NORMAL_MODE),
205 // Start with assuming everything is fine with the connection. 174 // Start with assuming everything is fine with the connection.
206 // At the end of the sync cycle we would have the correct status. 175 // At the end of the sync cycle we would have the correct status.
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), 176 connection_code_(HttpResponse::SERVER_CONNECTION_OK),
177 pending_nudge_(NULL),
208 delay_provider_(delay_provider), 178 delay_provider_(delay_provider),
209 syncer_(syncer), 179 syncer_(syncer),
210 session_context_(context), 180 session_context_(context),
211 no_scheduling_allowed_(false) { 181 no_scheduling_allowed_(false) {
212 DCHECK(sync_loop_); 182 DCHECK(sync_loop_);
213 } 183 }
214 184
215 SyncSchedulerImpl::~SyncSchedulerImpl() { 185 SyncSchedulerImpl::~SyncSchedulerImpl() {
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); 186 DCHECK_EQ(MessageLoop::current(), sync_loop_);
217 StopImpl(base::Closure()); 187 StopImpl(base::Closure());
(...skipping 16 matching lines...) Expand all
234 void SyncSchedulerImpl::OnConnectionStatusChange() { 204 void SyncSchedulerImpl::OnConnectionStatusChange() {
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { 205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
236 // Optimistically assume that the connection is fixed and try 206 // Optimistically assume that the connection is fixed and try
237 // connecting. 207 // connecting.
238 OnServerConnectionErrorFixed(); 208 OnServerConnectionErrorFixed();
239 } 209 }
240 } 210 }
241 211
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; 213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
214 // There could be a pending nudge or configuration job in several cases:
215 //
216 // 1. We're in exponential backoff.
217 // 2. We're silenced / throttled.
218 // 3. A nudge was saved previously due to not having a valid auth token.
219 // 4. A nudge was scheduled + saved while in configuration mode.
220 //
221 // In all cases except (2), we want to retry contacting the server. We
222 // call DoCanaryJob to achieve this, and note that nothing -- not even a
223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
224 // has the authority to do that is the Unthrottle timer.
225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
226 if (!pending.get())
227 return;
228
244 PostTask(FROM_HERE, "DoCanaryJob", 229 PostTask(FROM_HERE, "DoCanaryJob",
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, 230 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
246 weak_ptr_factory_.GetWeakPtr())); 231 weak_ptr_factory_.GetWeakPtr(),
247 232 base::Passed(&pending)));
248 } 233 }
249 234
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( 235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
251 HttpResponse::ServerConnectionCode code) { 236 HttpResponse::ServerConnectionCode code) {
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); 237 DCHECK_EQ(MessageLoop::current(), sync_loop_);
253 SDVLOG(2) << "New server connection code: " 238 SDVLOG(2) << "New server connection code: "
254 << HttpResponse::GetServerConnectionCodeString(code); 239 << HttpResponse::GetServerConnectionCodeString(code);
255 240
256 connection_code_ = code; 241 connection_code_ = code;
257 } 242 }
(...skipping 15 matching lines...) Expand all
273 Mode old_mode = mode_; 258 Mode old_mode = mode_;
274 mode_ = mode; 259 mode_ = mode;
275 AdjustPolling(NULL); // Will kick start poll timer if needed. 260 AdjustPolling(NULL); // Will kick start poll timer if needed.
276 261
277 if (old_mode != mode_) { 262 if (old_mode != mode_) {
278 // We just changed our mode. See if there are any pending jobs that we could 263 // We just changed our mode. See if there are any pending jobs that we could
279 // execute in the new mode. 264 // execute in the new mode.
280 if (mode_ == NORMAL_MODE) { 265 if (mode_ == NORMAL_MODE) {
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job 266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
282 // has not yet completed. 267 // has not yet completed.
283 DCHECK(!wait_interval_.get() || 268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
284 !wait_interval_->pending_configure_job.get());
285 } 269 }
286 270
287 DoPendingJobIfPossible(false); 271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
272 if (pending.get()) {
273 // TODO(tim): We should be able to remove this...
274 scoped_ptr<SyncSession> session(CreateSyncSession(
275 pending->session()->source()));
276 // Also the routing info might have been changed since we cached the
277 // pending nudge. Update it by coalescing to the latest.
278 pending->mutable_session()->Coalesce(*(session));
279 SDVLOG(2) << "Executing pending job. Good luck!";
280 DoSyncSessionJob(pending.Pass());
281 }
288 } 282 }
289 } 283 }
290 284
291 void SyncSchedulerImpl::SendInitialSnapshot() { 285 void SyncSchedulerImpl::SendInitialSnapshot() {
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); 286 DCHECK_EQ(MessageLoop::current(), sync_loop_);
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
294 SyncSourceInfo(), ModelSafeRoutingInfo(), 288 SyncSourceInfo(), ModelSafeRoutingInfo(),
295 std::vector<ModelSafeWorker*>())); 289 std::vector<ModelSafeWorker*>()));
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
297 event.snapshot = dummy->TakeSnapshot(); 291 event.snapshot = dummy->TakeSnapshot();
(...skipping 23 matching lines...) Expand all
321 bool SyncSchedulerImpl::ScheduleConfiguration( 315 bool SyncSchedulerImpl::ScheduleConfiguration(
322 const ConfigurationParams& params) { 316 const ConfigurationParams& params) {
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); 317 DCHECK_EQ(MessageLoop::current(), sync_loop_);
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); 319 DCHECK_EQ(CONFIGURATION_MODE, mode_);
326 DCHECK(!params.ready_task.is_null()); 320 DCHECK(!params.ready_task.is_null());
327 SDVLOG(2) << "Reconfiguring syncer."; 321 SDVLOG(2) << "Reconfiguring syncer.";
328 322
329 // Only one configuration is allowed at a time. Verify we're not waiting 323 // Only one configuration is allowed at a time. Verify we're not waiting
330 // for a pending configure job. 324 // for a pending configure job.
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); 325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
332 326
333 ModelSafeRoutingInfo restricted_routes; 327 ModelSafeRoutingInfo restricted_routes;
334 BuildModelSafeParams(params.types_to_download, 328 BuildModelSafeParams(params.types_to_download,
335 params.routing_info, 329 params.routing_info,
336 &restricted_routes); 330 &restricted_routes);
337 session_context_->set_routing_info(params.routing_info); 331 session_context_->set_routing_info(params.routing_info);
338 332
339 // Only reconfigure if we have types to download. 333 // Only reconfigure if we have types to download.
340 if (!params.types_to_download.Empty()) { 334 if (!params.types_to_download.Empty()) {
341 DCHECK(!restricted_routes.empty()); 335 DCHECK(!restricted_routes.empty());
342 linked_ptr<SyncSession> session(new SyncSession( 336 scoped_ptr<SyncSession> session(new SyncSession(
343 session_context_, 337 session_context_,
344 this, 338 this,
345 SyncSourceInfo(params.source, 339 SyncSourceInfo(params.source,
346 ModelSafeRoutingInfoToInvalidationMap( 340 ModelSafeRoutingInfoToInvalidationMap(
347 restricted_routes, 341 restricted_routes,
348 std::string())), 342 std::string())),
349 restricted_routes, 343 restricted_routes,
350 session_context_->workers())); 344 session_context_->workers()));
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, 345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
352 TimeTicks::Now(), 346 SyncSessionJob::CONFIGURATION,
353 session, 347 TimeTicks::Now(),
354 false, 348 session.Pass(),
355 params, 349 params,
356 FROM_HERE); 350 FROM_HERE));
357 DoSyncSessionJob(job); 351 bool succeeded = DoSyncSessionJob(job.Pass());
358 352
359 // If we failed, the job would have been saved as the pending configure 353 // If we failed, the job would have been saved as the pending configure
360 // job and a wait interval would have been set. 354 // job and a wait interval would have been set.
361 if (!session->Succeeded()) { 355 if (!succeeded) {
362 DCHECK(wait_interval_.get() && 356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
363 wait_interval_->pending_configure_job.get());
364 return false; 357 return false;
365 } 358 }
366 } else { 359 } else {
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; 360 SDVLOG(2) << "No change in routing info, calling ready task directly.";
368 params.ready_task.Run(); 361 params.ready_task.Run();
369 } 362 }
370 363
371 return true; 364 return true;
372 } 365 }
373 366
374 SyncSchedulerImpl::JobProcessDecision 367 SyncSchedulerImpl::JobProcessDecision
375 SyncSchedulerImpl::DecideWhileInWaitInterval( 368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) {
akalin 2012/10/17 17:39:17 why does this need a pointer?
tim (not reviewing) 2012/10/17 20:33:32 Every callsite has a pointer, so it seems simpler
akalin 2012/10/17 22:55:45 I don't know...const references are the usual styl
376 const SyncSessionJob& job) {
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); 369 DCHECK_EQ(MessageLoop::current(), sync_loop_);
378 DCHECK(wait_interval_.get()); 370 DCHECK(wait_interval_.get());
379 371
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
381 << WaitInterval::GetModeString(wait_interval_->mode) 373 << WaitInterval::GetModeString(wait_interval_->mode)
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") 374 << (wait_interval_->had_nudge ? " (had nudge)" : "")
383 << (job.is_canary_job ? " (canary)" : ""); 375 << (job->is_canary() ? " (canary)" : "");
384 376
385 if (job.purpose == SyncSessionJob::POLL) 377 if (job->purpose() == SyncSessionJob::POLL)
386 return DROP; 378 return DROP;
387 379
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || 380 // If we save a job while in a WaitInterval, there is a well-defined moment
389 job.purpose == SyncSessionJob::CONFIGURATION); 381 // in time in the future when it makes sense for that SAVE-worthy job to try
382 // running again -- the end of the WaitInterval.
383 DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
384 job->purpose() == SyncSessionJob::CONFIGURATION);
385
386 // If throttled, there's a clock ticking to unthrottle. We want to get
387 // on the same train.
390 if (wait_interval_->mode == WaitInterval::THROTTLED) 388 if (wait_interval_->mode == WaitInterval::THROTTLED)
391 return SAVE; 389 return SAVE;
392 390
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
394 if (job.purpose == SyncSessionJob::NUDGE) { 392 if (job->purpose() == SyncSessionJob::NUDGE) {
395 if (mode_ == CONFIGURATION_MODE) 393 if (mode_ == CONFIGURATION_MODE)
396 return SAVE; 394 return SAVE;
397 395
398 // If we already had one nudge then just drop this nudge. We will retry 396 // If we already had one nudge then just drop this nudge. We will retry
399 // later when the timer runs out. 397 // later when the timer runs out.
400 if (!job.is_canary_job) 398 if (!job->is_canary())
401 return wait_interval_->had_nudge ? DROP : CONTINUE; 399 return wait_interval_->had_nudge ? DROP : CONTINUE;
402 else // We are here because timer ran out. So retry. 400 else // We are here because timer ran out. So retry.
403 return CONTINUE; 401 return CONTINUE;
404 } 402 }
405 return job.is_canary_job ? CONTINUE : SAVE; 403 return job->is_canary() ? CONTINUE : SAVE;
406 } 404 }
407 405
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
409 const SyncSessionJob& job) { 407 const SyncSessionJob* job) {
akalin 2012/10/17 17:39:17 why does this need a pointer?
tim (not reviewing) 2012/10/17 20:33:32 Every callsite has a pointer, so it seems simpler
akalin 2012/10/17 22:55:45 same.
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); 408 DCHECK_EQ(MessageLoop::current(), sync_loop_);
411 409
412 // See if our type is throttled. 410 // See if our type is throttled.
413 ModelTypeSet throttled_types = 411 ModelTypeSet throttled_types =
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 412 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
415 if (job.purpose == SyncSessionJob::NUDGE && 413 if (job->purpose() == SyncSessionJob::NUDGE &&
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { 414 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
417 ModelTypeSet requested_types; 415 ModelTypeSet requested_types;
418 for (ModelTypeInvalidationMap::const_iterator i = 416 for (ModelTypeInvalidationMap::const_iterator i =
419 job.session->source().types.begin(); 417 job->session()->source().types.begin();
420 i != job.session->source().types.end(); 418 i != job->session()->source().types.end();
421 ++i) { 419 ++i) {
422 requested_types.Put(i->first); 420 requested_types.Put(i->first);
423 } 421 }
424 422
423 // If all types are throttled, do not CONTINUE. Today, we don't treat
424 // a per-datatype "unthrottle" event as something that should force a
425 // canary job. For this reason, there's no good time to reschedule this job
426 // to run -- we'll lazily wait for an independent event to trigger a sync.
427 // Note that there may already be such an event if we're in a WaitInterval,
428 // so we can retry it then.
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) 429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
426 return SAVE; 430 return SAVE;
427 } 431 }
428 432
429 if (wait_interval_.get()) 433 if (wait_interval_.get())
430 return DecideWhileInWaitInterval(job); 434 return DecideWhileInWaitInterval(job);
431 435
432 if (mode_ == CONFIGURATION_MODE) { 436 if (mode_ == CONFIGURATION_MODE) {
433 if (job.purpose == SyncSessionJob::NUDGE) 437 if (job->purpose() == SyncSessionJob::NUDGE)
434 return SAVE; 438 return SAVE; // Running requires a mode switch.
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) 439 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
436 return CONTINUE; 440 return CONTINUE;
437 else 441 else
438 return DROP; 442 return DROP;
439 } 443 }
440 444
441 // We are in normal mode. 445 // We are in normal mode.
442 DCHECK_EQ(mode_, NORMAL_MODE); 446 DCHECK_EQ(mode_, NORMAL_MODE);
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); 447 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION);
444 448
445 // Note about some subtle scheduling semantics. 449 // Note about some subtle scheduling semantics.
446 // 450 //
447 // It's possible at this point that |job| is known to be unnecessary, and 451 // It's possible at this point that |job| is known to be unnecessary, and
448 // dropping it would be perfectly safe and correct. Consider 452 // dropping it would be perfectly safe and correct. Consider
449 // 453 //
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than 454 // 1) |job| is a POLL with a |scheduled_start| time that is less than
451 // the time that the last successful all-datatype NUDGE completed. 455 // the time that the last successful all-datatype NUDGE completed.
452 // 456 //
453 // 2) |job| is a NUDGE (for any combination of types) with a 457 // 2) |job| is a NUDGE (for any combination of types) with a
(...skipping 22 matching lines...) Expand all
476 // * It's not strictly "impossible", but it would be reentrant and hence 480 // * It's not strictly "impossible", but it would be reentrant and hence
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a 481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a
478 // legal side effect of any of the work being done as part of a sync cycle. 482 // legal side effect of any of the work being done as part of a sync cycle.
479 // See |no_scheduling_allowed_| for details. 483 // See |no_scheduling_allowed_| for details.
480 484
481 // Decision now rests on state of auth tokens. 485 // Decision now rests on state of auth tokens.
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 486 if (!session_context_->connection_manager()->HasInvalidAuthToken())
483 return CONTINUE; 487 return CONTINUE;
484 488
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 489 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; 490 // Running the job would require updated auth, so we can't honour
491 // job.scheduled_start().
492 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
487 } 493 }
488 494
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { 495 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) {
akalin 2012/10/17 17:39:17 the name for this is unclear -- "ShouldRunJob" imp
tim (not reviewing) 2012/10/17 20:33:32 SaveIfNecessary works, although there's also the c
akalin 2012/10/17 22:55:45 I see. See my comment below.
490 DCHECK_EQ(MessageLoop::current(), sync_loop_);
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
492 if (pending_nudge_.get() == NULL) {
493 SDVLOG(2) << "Creating a pending nudge job";
494 SyncSession* s = job.session.get();
495
496 // Get a fresh session with similar configuration as before (resets
497 // StatusController).
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
499 s->delegate(), s->source(), s->routing_info(), s->workers()));
500
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
502 make_linked_ptr(session.release()), false,
503 ConfigurationParams(), job.from_here);
504 pending_nudge_.reset(new SyncSessionJob(new_job));
505 return;
506 }
507
508 SDVLOG(2) << "Coalescing a pending nudge";
509 pending_nudge_->session->Coalesce(*(job.session.get()));
510 pending_nudge_->scheduled_start = job.scheduled_start;
511
512 // Unfortunately the nudge location cannot be modified. So it stores the
513 // location of the first caller.
514 }
515
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) {
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); 496 DCHECK_EQ(MessageLoop::current(), sync_loop_);
518 DCHECK(started_); 497 DCHECK(started_);
519 498
520 JobProcessDecision decision = DecideOnJob(job); 499 JobProcessDecision decision = DecideOnJob(job);
521 SDVLOG(2) << "Should run " 500 SDVLOG(2) << "Should run "
522 << SyncSessionJob::GetPurposeString(job.purpose) 501 << SyncSessionJob::GetPurposeString(job->purpose())
523 << " job in mode " << GetModeString(mode_) 502 << " job " << job->session()
503 << " in mode " << GetModeString(mode_)
524 << ": " << GetDecisionString(decision); 504 << ": " << GetDecisionString(decision);
525 if (decision != SAVE) 505 if (decision == DROP)
526 return decision == CONTINUE; 506 return false;
507 if (decision == CONTINUE)
508 return true;
509 DCHECK_EQ(decision, SAVE);
527 510
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == 511 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
529 SyncSessionJob::CONFIGURATION); 512 if (is_nudge && pending_nudge_) {
513 SDVLOG(2) << "Coalescing a pending nudge";
514 // TODO(tim): This basically means we never use the more-careful coalescing
rlarocque 2012/10/08 19:18:05 After some more thought, I'm not sure this ever ma
515 // logic in ScheduleNudgeImpl that takes the min of the two nudge start
516 // times, because we're calling this function first. Pull this out
517 // into a function to coalesce + set start times and reuse.
518 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
519 return false;
520 }
530 521
531 SaveJob(job); 522 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon();
523 if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
524 // This job should be made the new canary.
525 if (is_nudge) {
526 pending_nudge_ = job_to_save.get();
527 } else {
528 SDVLOG(2) << "Saving a configuration job";
529 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
530 DCHECK(!wait_interval_->pending_configure_job);
531 DCHECK_EQ(mode_, CONFIGURATION_MODE);
532 DCHECK(!job->config_params().ready_task.is_null());
533 // The only nudge that could exist is a scheduled canary nudge.
534 DCHECK(!unscheduled_nudge_storage_.get());
535 if (pending_nudge_) {
536 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
537 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
538 pending_nudge_ = unscheduled_nudge_storage_.get();
539 }
540 wait_interval_->pending_configure_job = job_to_save.get();
541 }
542 TimeDelta length =
543 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
544 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
545 TimeDelta::FromSeconds(0) : length;
546 RestartWaiting(job_to_save.Pass());
547 return false;
548 }
549
550 // Note that today there are no cases where we SAVE a CONFIGURATION job
551 // when we're not in a WaitInterval. See bug 147736.
552 DCHECK(is_nudge);
553 // There may or may not be a pending_configure_job. Either way this nudge
554 // is unschedulable.
555 pending_nudge_ = job_to_save.get();
556 unscheduled_nudge_storage_ = job_to_save.Pass();
532 return false; 557 return false;
533 } 558 }
534 559
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
536 DCHECK_EQ(MessageLoop::current(), sync_loop_);
537 if (job.purpose == SyncSessionJob::NUDGE) {
538 SDVLOG(2) << "Saving a nudge job";
539 InitOrCoalescePendingJob(job);
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
541 SDVLOG(2) << "Saving a configuration job";
542 DCHECK(wait_interval_.get());
543 DCHECK(mode_ == CONFIGURATION_MODE);
544
545 // Config params should always get set.
546 DCHECK(!job.config_params.ready_task.is_null());
547 SyncSession* old = job.session.get();
548 SyncSession* s(new SyncSession(session_context_, this, old->source(),
549 old->routing_info(), old->workers()));
550 SyncSessionJob new_job(job.purpose,
551 TimeTicks::Now(),
552 make_linked_ptr(s),
553 false,
554 job.config_params,
555 job.from_here);
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
557 } // drop the rest.
558 // TODO(sync): Is it okay to drop the rest? It's weird that
559 // SaveJob() only does what it says sometimes. (See
560 // http://crbug.com/90868.)
561 }
562
563 // Functor for std::find_if to search by ModelSafeGroup. 560 // Functor for std::find_if to search by ModelSafeGroup.
564 struct ModelSafeWorkerGroupIs { 561 struct ModelSafeWorkerGroupIs {
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 562 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
566 bool operator()(ModelSafeWorker* w) { 563 bool operator()(ModelSafeWorker* w) {
567 return group == w->GetModelSafeGroup(); 564 return group == w->GetModelSafeGroup();
568 } 565 }
569 ModelSafeGroup group; 566 ModelSafeGroup group;
570 }; 567 };
571 568
572 void SyncSchedulerImpl::ScheduleNudgeAsync( 569 void SyncSchedulerImpl::ScheduleNudgeAsync(
573 const TimeDelta& delay, 570 const TimeDelta& desired_delay,
574 NudgeSource source, ModelTypeSet types, 571 NudgeSource source, ModelTypeSet types,
575 const tracked_objects::Location& nudge_location) { 572 const tracked_objects::Location& nudge_location) {
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); 573 DCHECK_EQ(MessageLoop::current(), sync_loop_);
577 SDVLOG_LOC(nudge_location, 2) 574 SDVLOG_LOC(nudge_location, 2)
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 575 << "Nudge scheduled with delay "
576 << desired_delay.InMilliseconds() << " ms, "
579 << "source " << GetNudgeSourceString(source) << ", " 577 << "source " << GetNudgeSourceString(source) << ", "
580 << "types " << ModelTypeSetToString(types); 578 << "types " << ModelTypeSetToString(types);
581 579
582 ModelTypeInvalidationMap invalidation_map = 580 ModelTypeInvalidationMap invalidation_map =
583 ModelTypeSetToInvalidationMap(types, std::string()); 581 ModelTypeSetToInvalidationMap(types, std::string());
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 582 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
585 GetUpdatesFromNudgeSource(source), 583 GetUpdatesFromNudgeSource(source),
586 invalidation_map, 584 invalidation_map,
587 false,
588 nudge_location); 585 nudge_location);
589 } 586 }
590 587
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( 588 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
592 const TimeDelta& delay, 589 const TimeDelta& desired_delay,
593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, 590 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
594 const tracked_objects::Location& nudge_location) { 591 const tracked_objects::Location& nudge_location) {
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); 592 DCHECK_EQ(MessageLoop::current(), sync_loop_);
596 SDVLOG_LOC(nudge_location, 2) 593 SDVLOG_LOC(nudge_location, 2)
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 594 << "Nudge scheduled with delay "
595 << desired_delay.InMilliseconds() << " ms, "
598 << "source " << GetNudgeSourceString(source) << ", " 596 << "source " << GetNudgeSourceString(source) << ", "
599 << "payloads " 597 << "payloads "
600 << ModelTypeInvalidationMapToString(invalidation_map); 598 << ModelTypeInvalidationMapToString(invalidation_map);
601 599
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 600 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
603 GetUpdatesFromNudgeSource(source), 601 GetUpdatesFromNudgeSource(source),
604 invalidation_map, 602 invalidation_map,
605 false,
606 nudge_location); 603 nudge_location);
607 } 604 }
608 605
609 void SyncSchedulerImpl::ScheduleNudgeImpl( 606 void SyncSchedulerImpl::ScheduleNudgeImpl(
610 const TimeDelta& delay, 607 const TimeDelta& delay,
611 GetUpdatesCallerInfo::GetUpdatesSource source, 608 GetUpdatesCallerInfo::GetUpdatesSource source,
612 const ModelTypeInvalidationMap& invalidation_map, 609 const ModelTypeInvalidationMap& invalidation_map,
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { 610 const tracked_objects::Location& nudge_location) {
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); 611 DCHECK_EQ(MessageLoop::current(), sync_loop_);
615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; 612 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
616 613
617 SDVLOG_LOC(nudge_location, 2) 614 SDVLOG_LOC(nudge_location, 2)
618 << "In ScheduleNudgeImpl with delay " 615 << "In ScheduleNudgeImpl with delay "
619 << delay.InMilliseconds() << " ms, " 616 << delay.InMilliseconds() << " ms, "
620 << "source " << GetUpdatesSourceString(source) << ", " 617 << "source " << GetUpdatesSourceString(source) << ", "
621 << "payloads " 618 << "payloads "
622 << ModelTypeInvalidationMapToString(invalidation_map) 619 << ModelTypeInvalidationMapToString(invalidation_map);
623 << (is_canary_job ? " (canary)" : "");
624 620
625 SyncSourceInfo info(source, invalidation_map); 621 SyncSourceInfo info(source, invalidation_map);
626 UpdateNudgeTimeRecords(info); 622 UpdateNudgeTimeRecords(info);
627 623
628 SyncSession* session(CreateSyncSession(info)); 624 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 625 SyncSessionJob::NUDGE,
630 make_linked_ptr(session), is_canary_job, 626 TimeTicks::Now() + delay,
631 ConfigurationParams(), nudge_location); 627 CreateSyncSession(info).Pass(),
628 ConfigurationParams(),
629 nudge_location));
632 630
633 session = NULL; 631 if (!ShouldRunJobSaveIfNecessary(job.get()))
akalin 2012/10/17 17:39:17 Naively, I'd expect to be able to split this call
tim (not reviewing) 2012/10/17 20:33:32 Yes. !ShouldRunJob doesn't mean you can forget ab
akalin 2012/10/17 22:55:45 I see. See comment below...
634 if (!ShouldRunJob(job))
635 return; 632 return;
636 633
637 if (pending_nudge_.get()) { 634 if (pending_nudge_) {
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
639 SDVLOG(2) << "Dropping the nudge because we are in backoff";
640 return;
641 }
642
643 SDVLOG(2) << "Coalescing pending nudge";
644 pending_nudge_->session->Coalesce(*(job.session.get()));
645
646 SDVLOG(2) << "Rescheduling pending nudge"; 635 SDVLOG(2) << "Rescheduling pending nudge";
647 SyncSession* s = pending_nudge_->session.get(); 636 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
648 job.session.reset(new SyncSession(s->context(), s->delegate(), 637 // Choose the start time as the earliest of the 2. Note that this means
649 s->source(), s->routing_info(), s->workers())); 638 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
650 639 // but a nudge is already scheduled to go out, we'll send the (tab) commit
651 // Choose the start time as the earliest of the 2. 640 // without waiting.
652 job.scheduled_start = std::min(job.scheduled_start, 641 pending_nudge_->set_scheduled_start(
653 pending_nudge_->scheduled_start); 642 std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
654 pending_nudge_.reset(); 643 // Abandon the old task by cloning and replacing the session.
644 // It's possible that by "rescheduling" we're actually taking a job that
645 // was previously unscheduled and giving it wings, so take care to reset
646 // unscheduled nudge storage.
647 job = pending_nudge_->CloneAndAbandon();
648 unscheduled_nudge_storage_.reset();
649 pending_nudge_ = NULL;
655 } 650 }
656 651
657 // TODO(zea): Consider adding separate throttling/backoff for datatype 652 // TODO(zea): Consider adding separate throttling/backoff for datatype
658 // refresh requests. 653 // refresh requests.
659 ScheduleSyncSessionJob(job); 654 ScheduleSyncSessionJob(job.Pass());
akalin 2012/10/17 17:39:17 Isn't the job abandoned at this point? Do we stil
tim (not reviewing) 2012/10/17 20:33:32 Err, did you mean to put this comment elsewhere? T
akalin 2012/10/17 22:55:45 Right, I was misunderstanding the flow. So it loo
660 } 655 }
661 656
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 657 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
663 switch (mode) { 658 switch (mode) {
664 ENUM_CASE(CONFIGURATION_MODE); 659 ENUM_CASE(CONFIGURATION_MODE);
665 ENUM_CASE(NORMAL_MODE); 660 ENUM_CASE(NORMAL_MODE);
666 } 661 }
667 return ""; 662 return "";
668 } 663 }
669 664
670 const char* SyncSchedulerImpl::GetDecisionString( 665 const char* SyncSchedulerImpl::GetDecisionString(
671 SyncSchedulerImpl::JobProcessDecision mode) { 666 SyncSchedulerImpl::JobProcessDecision mode) {
672 switch (mode) { 667 switch (mode) {
673 ENUM_CASE(CONTINUE); 668 ENUM_CASE(CONTINUE);
674 ENUM_CASE(SAVE); 669 ENUM_CASE(SAVE);
675 ENUM_CASE(DROP); 670 ENUM_CASE(DROP);
676 } 671 }
677 return ""; 672 return "";
678 } 673 }
679 674
680 // static
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose(
682 SyncSessionJob::SyncSessionJobPurpose purpose,
683 SyncerStep* start,
684 SyncerStep* end) {
685 switch (purpose) {
686 case SyncSessionJob::CONFIGURATION:
687 *start = DOWNLOAD_UPDATES;
688 *end = APPLY_UPDATES;
689 return;
690 case SyncSessionJob::NUDGE:
691 case SyncSessionJob::POLL:
692 *start = SYNCER_BEGIN;
693 *end = SYNCER_END;
694 return;
695 default:
696 NOTREACHED();
697 *start = SYNCER_END;
698 *end = SYNCER_END;
699 return;
700 }
701 }
702
703 void SyncSchedulerImpl::PostTask( 675 void SyncSchedulerImpl::PostTask(
704 const tracked_objects::Location& from_here, 676 const tracked_objects::Location& from_here,
705 const char* name, const base::Closure& task) { 677 const char* name, const base::Closure& task) {
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; 678 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); 679 DCHECK_EQ(MessageLoop::current(), sync_loop_);
708 if (!started_) { 680 if (!started_) {
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; 681 SDVLOG(1) << "Not posting task as scheduler is stopped.";
710 return; 682 return;
711 } 683 }
712 sync_loop_->PostTask(from_here, task); 684 sync_loop_->PostTask(from_here, task);
713 } 685 }
714 686
715 void SyncSchedulerImpl::PostDelayedTask( 687 void SyncSchedulerImpl::PostDelayedTask(
716 const tracked_objects::Location& from_here, 688 const tracked_objects::Location& from_here,
717 const char* name, const base::Closure& task, base::TimeDelta delay) { 689 const char* name, const base::Closure& task, base::TimeDelta delay) {
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " 690 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
719 << delay.InMilliseconds() << " ms delay"; 691 << delay.InMilliseconds() << " ms delay";
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); 692 DCHECK_EQ(MessageLoop::current(), sync_loop_);
721 if (!started_) { 693 if (!started_) {
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; 694 SDVLOG(1) << "Not posting task as scheduler is stopped.";
723 return; 695 return;
724 } 696 }
725 sync_loop_->PostDelayedTask(from_here, task, delay); 697 sync_loop_->PostDelayedTask(from_here, task, delay);
726 } 698 }
727 699
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { 700 void SyncSchedulerImpl::ScheduleSyncSessionJob(
701 scoped_ptr<SyncSessionJob> job) {
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); 702 DCHECK_EQ(MessageLoop::current(), sync_loop_);
730 if (no_scheduling_allowed_) { 703 if (no_scheduling_allowed_) {
731 NOTREACHED() << "Illegal to schedule job while session in progress."; 704 NOTREACHED() << "Illegal to schedule job while session in progress.";
732 return; 705 return;
733 } 706 }
734 707
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); 708 TimeDelta delay = job->scheduled_start() - TimeTicks::Now();
709 tracked_objects::Location loc(job->from_location());
736 if (delay < TimeDelta::FromMilliseconds(0)) 710 if (delay < TimeDelta::FromMilliseconds(0))
737 delay = TimeDelta::FromMilliseconds(0); 711 delay = TimeDelta::FromMilliseconds(0);
738 SDVLOG_LOC(job.from_here, 2) 712 SDVLOG_LOC(loc, 2)
739 << "In ScheduleSyncSessionJob with " 713 << "In ScheduleSyncSessionJob with "
740 << SyncSessionJob::GetPurposeString(job.purpose) 714 << SyncSessionJob::GetPurposeString(job->purpose())
741 << " job and " << delay.InMilliseconds() << " ms delay"; 715 << " job and " << delay.InMilliseconds() << " ms delay";
742 716
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || 717 DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
744 job.purpose == SyncSessionJob::POLL); 718 job->purpose() == SyncSessionJob::POLL);
745 if (job.purpose == SyncSessionJob::NUDGE) { 719 if (job->purpose() == SyncSessionJob::NUDGE) {
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; 720 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to ";
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == 721 DCHECK(!pending_nudge_ || pending_nudge_->session() ==
748 job.session); 722 job->session());
749 pending_nudge_.reset(new SyncSessionJob(job)); 723 pending_nudge_ = job.get();
750 } 724 }
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", 725
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, 726 PostDelayedTask(loc, "DoSyncSessionJob",
753 weak_ptr_factory_.GetWeakPtr(), 727 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
754 job), 728 weak_ptr_factory_.GetWeakPtr(),
755 delay); 729 base::Passed(&job)),
730 delay);
756 } 731 }
757 732
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { 733 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); 734 DCHECK_EQ(MessageLoop::current(), sync_loop_);
735 if (job->purpose() == SyncSessionJob::NUDGE) {
736 if (pending_nudge_ == NULL ||
737 pending_nudge_->session() != job->session()) {
738 // |job| is abandoned.
739 SDVLOG(2) << "Dropping a nudge in "
740 << "DoSyncSessionJob because another nudge was scheduled";
741 return false;
742 }
743 pending_nudge_ = NULL;
744
745 // Rebase the session with the latest model safe table and use it to purge
746 // and update any disabled or modified entries in the job.
747 job->mutable_session()->RebaseRoutingInfoWithLatest(
748 session_context_->routing_info(), session_context_->workers());
749 }
760 750
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); 751 AutoReset<bool> protector(&no_scheduling_allowed_, true);
762 if (!ShouldRunJob(job)) { 752 GetUpdatesCallerInfo::GetUpdatesSource source(
753 job->session()->source().updates_source);
754 if (!ShouldRunJobSaveIfNecessary(job.get())) {
763 SLOG(WARNING) 755 SLOG(WARNING)
764 << "Not executing " 756 << "Not executing "
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " 757 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from "
766 << GetUpdatesSourceString(job.session->source().updates_source); 758 << GetUpdatesSourceString(source);
767 return; 759 return false;
768 } 760 }
769 761
770 if (job.purpose == SyncSessionJob::NUDGE) {
771 if (pending_nudge_.get() == NULL ||
772 pending_nudge_->session != job.session) {
773 SDVLOG(2) << "Dropping a nudge in "
774 << "DoSyncSessionJob because another nudge was scheduled";
775 return; // Another nudge must have been scheduled in in the meantime.
776 }
777 pending_nudge_.reset();
778
779 // Create the session with the latest model safe table and use it to purge
780 // and update any disabled or modified entries in the job.
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
782
783 job.session->RebaseRoutingInfoWithLatest(*session);
784 }
785 SDVLOG(2) << "DoSyncSessionJob with " 762 SDVLOG(2) << "DoSyncSessionJob with "
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; 763 << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
787
788 SyncerStep begin(SYNCER_END);
789 SyncerStep end(SYNCER_END);
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
791 764
792 bool has_more_to_sync = true; 765 bool has_more_to_sync = true;
793 while (ShouldRunJob(job) && has_more_to_sync) { 766 bool premature_exit = false;
767 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) {
794 SDVLOG(2) << "Calling SyncShare."; 768 SDVLOG(2) << "Calling SyncShare.";
795 // Synchronously perform the sync session from this thread. 769 // Synchronously perform the sync session from this thread.
796 syncer_->SyncShare(job.session.get(), begin, end); 770 premature_exit = !syncer_->SyncShare(job->mutable_session(),
797 has_more_to_sync = job.session->HasMoreToSync(); 771 job->start_step(),
772 job->end_step());
773
774 has_more_to_sync = job->session()->HasMoreToSync();
798 if (has_more_to_sync) 775 if (has_more_to_sync)
799 job.session->PrepareForAnotherSyncCycle(); 776 job->mutable_session()->PrepareForAnotherSyncCycle();
800 } 777 }
801 SDVLOG(2) << "Done SyncShare looping."; 778 SDVLOG(2) << "Done SyncShare looping.";
802 779
803 FinishSyncSessionJob(job); 780 return FinishSyncSessionJob(job.Pass(), premature_exit);
804 } 781 }
805 782
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 783 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); 784 DCHECK_EQ(MessageLoop::current(), sync_loop_);
808 785
809 // We are interested in recording time between local nudges for datatypes. 786 // We are interested in recording time between local nudges for datatypes.
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 787 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 788 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
812 return; 789 return;
813 790
814 base::TimeTicks now = TimeTicks::Now(); 791 base::TimeTicks now = TimeTicks::Now();
815 // Update timing information for how often datatypes are triggering nudges. 792 // Update timing information for how often datatypes are triggering nudges.
816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); 793 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin();
817 iter != info.types.end(); 794 iter != info.types.end();
818 ++iter) { 795 ++iter) {
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 796 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
820 last_local_nudges_by_model_type_[iter->first] = now; 797 last_local_nudges_by_model_type_[iter->first] = now;
821 if (previous.is_null()) 798 if (previous.is_null())
822 continue; 799 continue;
823 800
824 #define PER_DATA_TYPE_MACRO(type_str) \ 801 #define PER_DATA_TYPE_MACRO(type_str) \
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 802 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 803 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
827 #undef PER_DATA_TYPE_MACRO 804 #undef PER_DATA_TYPE_MACRO
828 } 805 }
829 } 806 }
830 807
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { 808 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job,
809 bool exited_prematurely) {
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); 810 DCHECK_EQ(MessageLoop::current(), sync_loop_);
833 // Now update the status of the connection from SCM. We need this to decide 811 // Now update the status of the connection from SCM. We need this to decide
834 // whether we need to save/run future jobs. The notifications from SCM are not 812 // whether we need to save/run future jobs. The notifications from SCM are
835 // reliable. 813 // not reliable.
836 // 814 //
837 // TODO(rlarocque): crbug.com/110954 815 // TODO(rlarocque): crbug.com/110954
838 // We should get rid of the notifications and it is probably not needed to 816 // We should get rid of the notifications and it is probably not needed to
839 // maintain this status variable in 2 places. We should query it directly from 817 // maintain this status variable in 2 places. We should query it directly
840 // SCM when needed. 818 // from SCM when needed.
841 ServerConnectionManager* scm = session_context_->connection_manager(); 819 ServerConnectionManager* scm = session_context_->connection_manager();
842 UpdateServerConnectionManagerStatus(scm->server_status()); 820 UpdateServerConnectionManagerStatus(scm->server_status());
843 821
844 if (IsSyncingCurrentlySilenced()) { 822 // Let job know that we're through syncing (calling SyncShare) at this point.
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; 823 bool succeeded = false;
846 // TODO(sync): Investigate whether we need to check job.purpose 824 {
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
848 SaveJob(job);
849 return; // Nothing to do.
850 } else if (job.session->Succeeded() &&
851 !job.config_params.ready_task.is_null()) {
852 // If this was a configuration job with a ready task, invoke it now that
853 // we finished successfully.
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); 825 AutoReset<bool> protector(&no_scheduling_allowed_, true);
855 job.config_params.ready_task.Run(); 826 succeeded = job->Finish(exited_prematurely);
856 } 827 }
857 828
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; 829 SDVLOG(2) << "Updating the next polling time after SyncMain";
859 ScheduleNextSync(job); 830 ScheduleNextSync(job.Pass(), succeeded);
831 return succeeded;
860 } 832 }
861 833
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { 834 void SyncSchedulerImpl::ScheduleNextSync(
835 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) {
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); 836 DCHECK_EQ(MessageLoop::current(), sync_loop_);
864 DCHECK(!old_job.session->HasMoreToSync()); 837 DCHECK(!finished_job->session()->HasMoreToSync());
865 838
866 AdjustPolling(&old_job); 839 AdjustPolling(finished_job.get());
867 840
868 if (old_job.session->Succeeded()) { 841 if (succeeded) {
869 // Only reset backoff if we actually reached the server. 842 // Only reset backoff if we actually reached the server.
870 if (old_job.session->SuccessfullyReachedServer()) 843 // It's possible that we reached the server on one attempt, then had an
844 // error on the next (or didn't perform some of the server-communicating
845 // commands). We want to verify that, for all commands attempted, we
846 // successfully spoke with the server. Therefore, we verify no errors
847 // and at least one SYNCER_OK.
848 if (finished_job->session()->DidReachServer())
871 wait_interval_.reset(); 849 wait_interval_.reset();
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 850 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
873 return; 851 return;
874 } 852 }
875 853
876 if (old_job.purpose == SyncSessionJob::POLL) { 854 if (IsSyncingCurrentlySilenced()) {
855 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
856 // If we're here, it's because |job| was silenced until a server specified
857 // time. (Note, it had to be |job|, because DecideOnJob would not permit
858 // any job through while in WaitInterval::THROTTLED).
859 scoped_ptr<SyncSessionJob> clone = finished_job->Clone();
860 if (clone->purpose() == SyncSessionJob::NUDGE)
861 pending_nudge_ = clone.get();
862 else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
863 wait_interval_->pending_configure_job = clone.get();
864 else
865 clone.reset(); // Unthrottling is enough, no need to force a canary.
866
867 RestartWaiting(clone.Pass());
868 return;
869 }
870
871 if (finished_job->purpose() == SyncSessionJob::POLL) {
877 return; // We don't retry POLL jobs. 872 return; // We don't retry POLL jobs.
878 } 873 }
879 874
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 875 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
881 // if we don't succeed. Some types of errors are not likely to disappear on 876 // if we don't succeed. Some types of errors are not likely to disappear on
882 // their own. With the return values now available in the old_job.session, we 877 // their own. With the return values now available in the old_job.session,
883 // should be able to detect such errors and only retry when we detect 878 // we should be able to detect such errors and only retry when we detect
884 // transient errors. 879 // transient errors.
885 880
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && 881 if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
887 mode_ == NORMAL_MODE) { 882 mode_ == NORMAL_MODE) {
888 // When in normal mode, we allow up to one nudge per backoff interval. It 883 // When in normal mode, we allow up to one nudge per backoff interval. It
889 // appears that this was our nudge for this interval, and it failed. 884 // appears that this was our nudge for this interval, and it failed.
890 // 885 //
891 // Note: This does not prevent us from running canary jobs. For example, an 886 // Note: This does not prevent us from running canary jobs. For example,
892 // IP address change might still result in another nudge being executed 887 // an IP address change might still result in another nudge being executed
893 // during this backoff interval. 888 // during this backoff interval.
894 SDVLOG(2) << "A nudge during backoff failed"; 889 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
895 890 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
897 DCHECK(!wait_interval_->had_nudge); 891 DCHECK(!wait_interval_->had_nudge);
898 892
899 wait_interval_->had_nudge = true; 893 wait_interval_->had_nudge = true;
900 InitOrCoalescePendingJob(old_job); 894 DCHECK(!pending_nudge_);
901 RestartWaiting(); 895
896 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
897 pending_nudge_ = new_job.get();
898 RestartWaiting(new_job.Pass());
902 } else { 899 } else {
903 // Either this is the first failure or a consecutive failure after our 900 // Either this is the first failure or a consecutive failure after our
904 // backoff timer expired. We handle it the same way in either case. 901 // backoff timer expired. We handle it the same way in either case.
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; 902 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
906 HandleContinuationError(old_job); 903 HandleContinuationError(finished_job.get());
907 } 904 }
908 } 905 }
909 906
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 907 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); 908 DCHECK_EQ(MessageLoop::current(), sync_loop_);
912 909
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? 910 TimeDelta poll = (!session_context_->notifications_enabled()) ?
914 syncer_short_poll_interval_seconds_ : 911 syncer_short_poll_interval_seconds_ :
915 syncer_long_poll_interval_seconds_; 912 syncer_long_poll_interval_seconds_;
916 bool rate_changed = !poll_timer_.IsRunning() || 913 bool rate_changed = !poll_timer_.IsRunning() ||
917 poll != poll_timer_.GetCurrentDelay(); 914 poll != poll_timer_.GetCurrentDelay();
918 915
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) 916 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
920 poll_timer_.Reset(); 917 poll_timer_.Reset();
921 918
922 if (!rate_changed) 919 if (!rate_changed)
923 return; 920 return;
924 921
925 // Adjust poll rate. 922 // Adjust poll rate.
926 poll_timer_.Stop(); 923 poll_timer_.Stop();
927 poll_timer_.Start(FROM_HERE, poll, this, 924 poll_timer_.Start(FROM_HERE, poll, this,
928 &SyncSchedulerImpl::PollTimerCallback); 925 &SyncSchedulerImpl::PollTimerCallback);
929 } 926 }
930 927
931 void SyncSchedulerImpl::RestartWaiting() { 928 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
932 CHECK(wait_interval_.get()); 929 CHECK(wait_interval_.get());
933 wait_interval_->timer.Stop(); 930 wait_interval_->timer.Stop();
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, 931 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
935 this, &SyncSchedulerImpl::DoCanaryJob); 932 if (wait_interval_->mode == WaitInterval::THROTTLED) {
933 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
934 base::Bind(&SyncSchedulerImpl::Unthrottle,
935 weak_ptr_factory_.GetWeakPtr(),
936 base::Passed(&job)));
937 } else {
938 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
939 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
940 weak_ptr_factory_.GetWeakPtr(),
941 base::Passed(&job)));
942 }
936 } 943 }
937 944
938 void SyncSchedulerImpl::HandleContinuationError( 945 void SyncSchedulerImpl::HandleContinuationError(
939 const SyncSessionJob& old_job) { 946 const SyncSessionJob* old_job) {
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); 947 DCHECK_EQ(MessageLoop::current(), sync_loop_);
941 if (DCHECK_IS_ON()) { 948 if (DCHECK_IS_ON()) {
942 if (IsBackingOff()) { 949 if (IsBackingOff()) {
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); 950 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary());
944 } 951 }
945 } 952 }
946 953
947 TimeDelta length = delay_provider_->GetDelay( 954 TimeDelta length = delay_provider_->GetDelay(
948 IsBackingOff() ? wait_interval_->length : 955 IsBackingOff() ? wait_interval_->length :
949 delay_provider_->GetInitialDelay( 956 delay_provider_->GetInitialDelay(
950 old_job.session->status_controller().model_neutral_state())); 957 old_job->session()->status_controller().model_neutral_state()));
951 958
952 SDVLOG(2) << "In handle continuation error with " 959 SDVLOG(2) << "In handle continuation error with "
953 << SyncSessionJob::GetPurposeString(old_job.purpose) 960 << SyncSessionJob::GetPurposeString(old_job->purpose())
954 << " job. The time delta(ms) is " 961 << " job. The time delta(ms) is "
955 << length.InMilliseconds(); 962 << length.InMilliseconds();
956 963
957 // This will reset the had_nudge variable as well. 964 // This will reset the had_nudge variable as well.
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 965 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
959 length)); 966 length));
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 967 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE));
968 new_job->set_scheduled_start(TimeTicks::Now() + length);
969 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 970 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
962 // Config params should always get set. 971 // Config params should always get set.
963 DCHECK(!old_job.config_params.ready_task.is_null()); 972 DCHECK(!old_job->config_params().ready_task.is_null());
964 SyncSession* old = old_job.session.get(); 973 wait_interval_->pending_configure_job = new_job.get();
965 SyncSession* s(new SyncSession(session_context_, this,
966 old->source(), old->routing_info(), old->workers()));
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
968 make_linked_ptr(s), false, old_job.config_params,
969 FROM_HERE);
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
971 } else { 974 } else {
972 // We are not in configuration mode. So wait_interval's pending job 975 // We are not in configuration mode. So wait_interval's pending job
973 // should be null. 976 // should be null.
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 977 DCHECK(wait_interval_->pending_configure_job == NULL);
978 DCHECK(!pending_nudge_);
979 pending_nudge_ = new_job.get();
980 }
975 981
976 // TODO(lipalani) - handle clear user data. 982 RestartWaiting(new_job.Pass());
977 InitOrCoalescePendingJob(old_job);
978 }
979 RestartWaiting();
980 } 983 }
981 984
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 985 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. 986 syncer_->RequestEarlyExit(); // Safe to call from any thread.
984 DCHECK(weak_handle_this_.IsInitialized()); 987 DCHECK(weak_handle_this_.IsInitialized());
985 SDVLOG(3) << "Posting StopImpl"; 988 SDVLOG(3) << "Posting StopImpl";
986 weak_handle_this_.Call(FROM_HERE, 989 weak_handle_this_.Call(FROM_HERE,
987 &SyncSchedulerImpl::StopImpl, 990 &SyncSchedulerImpl::StopImpl,
988 callback); 991 callback);
989 } 992 }
990 993
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 994 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); 995 DCHECK_EQ(MessageLoop::current(), sync_loop_);
993 SDVLOG(2) << "StopImpl called"; 996 SDVLOG(2) << "StopImpl called";
994 997
995 // Kill any in-flight method calls. 998 // Kill any in-flight method calls.
996 weak_ptr_factory_.InvalidateWeakPtrs(); 999 weak_ptr_factory_.InvalidateWeakPtrs();
997 wait_interval_.reset(); 1000 wait_interval_.reset();
998 poll_timer_.Stop(); 1001 poll_timer_.Stop();
999 if (started_) { 1002 if (started_) {
1000 started_ = false; 1003 started_ = false;
1001 } 1004 }
1002 if (!callback.is_null()) 1005 if (!callback.is_null())
1003 callback.Run(); 1006 callback.Run();
1004 } 1007 }
1005 1008
1006 void SyncSchedulerImpl::DoCanaryJob() { 1009 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1010 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1008 SDVLOG(2) << "Do canary job"; 1011 SDVLOG(2) << "Do canary job";
1009 DoPendingJobIfPossible(true); 1012
1013 // Only set canary privileges here, when we are about to run the job. This
1014 // avoids confusion in managing canary bits during scheduling, when you
1015 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that
1016 // was scheduled as canary, and send it to an "unscheduled" state.
1017 to_be_canary->GrantCanaryPrivilege();
1018
1019 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
1020 // TODO(tim): We should be able to remove this...
1021 scoped_ptr<SyncSession> temp = CreateSyncSession(
1022 to_be_canary->session()->source()).Pass();
1023 // The routing info might have been changed since we cached the
1024 // pending nudge. Update it by coalescing to the latest.
1025 to_be_canary->mutable_session()->Coalesce(*(temp));
1026 }
1027 DoSyncSessionJob(to_be_canary.Pass());
1010 } 1028 }
1011 1029
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { 1030 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1031 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1014 SyncSessionJob* job_to_execute = NULL; 1032 // If we find a scheduled pending_ job, abandon the old one and return a
1033 // a clone. If unscheduled, just hand over ownership.
1034 scoped_ptr<SyncSessionJob> candidate;
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 1035 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
1016 && wait_interval_->pending_configure_job.get()) { 1036 && wait_interval_->pending_configure_job) {
1017 SDVLOG(2) << "Found pending configure job"; 1037 SDVLOG(2) << "Found pending configure job";
1018 job_to_execute = wait_interval_->pending_configure_job.get(); 1038 candidate =
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { 1039 wait_interval_->pending_configure_job->CloneAndAbandon().Pass();
1040 wait_interval_->pending_configure_job = candidate.get();
1041 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
1020 SDVLOG(2) << "Found pending nudge job"; 1042 SDVLOG(2) << "Found pending nudge job";
1021 1043 candidate = pending_nudge_->CloneAndAbandon();
1022 scoped_ptr<SyncSession> session(CreateSyncSession( 1044 pending_nudge_ = candidate.get();
1023 pending_nudge_->session->source())); 1045 unscheduled_nudge_storage_.reset();
1024
1025 // Also the routing info might have been changed since we cached the
1026 // pending nudge. Update it by coalescing to the latest.
1027 pending_nudge_->session->Coalesce(*(session.get()));
1028 // The pending nudge would be cleared in the DoSyncSessionJob function.
1029 job_to_execute = pending_nudge_.get();
1030 } 1046 }
1031 1047 return candidate.Pass();
1032 if (job_to_execute != NULL) {
1033 SDVLOG(2) << "Executing pending job";
1034 SyncSessionJob copy = *job_to_execute;
1035 copy.is_canary_job = is_canary_job;
1036 DoSyncSessionJob(copy);
1037 }
1038 } 1048 }
1039 1049
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( 1050 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession(
1041 const SyncSourceInfo& source) { 1051 const SyncSourceInfo& source) {
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1052 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1043 DVLOG(2) << "Creating sync session with routes " 1053 DVLOG(2) << "Creating sync session with routes "
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 1054 << ModelSafeRoutingInfoToString(session_context_->routing_info());
1045 1055
1046 SyncSourceInfo info(source); 1056 SyncSourceInfo info(source);
1047 SyncSession* session(new SyncSession(session_context_, this, info, 1057 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info,
1048 session_context_->routing_info(), session_context_->workers())); 1058 session_context_->routing_info(), session_context_->workers()));
1049
1050 return session;
1051 } 1059 }
1052 1060
1053 void SyncSchedulerImpl::PollTimerCallback() { 1061 void SyncSchedulerImpl::PollTimerCallback() {
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1062 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1055 ModelSafeRoutingInfo r; 1063 ModelSafeRoutingInfo r;
1056 ModelTypeInvalidationMap invalidation_map = 1064 ModelTypeInvalidationMap invalidation_map =
1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); 1065 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); 1066 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
1059 SyncSession* s = CreateSyncSession(info); 1067 scoped_ptr<SyncSession> s(CreateSyncSession(info));
1060 1068 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), 1069 TimeTicks::Now(),
1062 make_linked_ptr(s), 1070 s.Pass(),
1063 false, 1071 ConfigurationParams(),
1064 ConfigurationParams(), 1072 FROM_HERE));
1065 FROM_HERE); 1073 ScheduleSyncSessionJob(job.Pass());
1066
1067 ScheduleSyncSessionJob(job);
1068 } 1074 }
1069 1075
1070 void SyncSchedulerImpl::Unthrottle() { 1076 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1077 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1078 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1073 SDVLOG(2) << "Unthrottled."; 1079 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
1074 DoCanaryJob(); 1080 << "canary.";
1081 if (to_be_canary.get())
1082 DoCanaryJob(to_be_canary.Pass());
1083
1084 // TODO(tim): The way DecideOnJob works today, canary privileges aren't
1085 // enough to bypass a THROTTLED wait interval, which would suggest we need
1086 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is
1087 // probably the "right" thing to do). Bug 154216.
1075 wait_interval_.reset(); 1088 wait_interval_.reset();
1076 } 1089 }
1077 1090
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 1091 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1092 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); 1093 session_context_->NotifyListeners(SyncEngineEvent(cause));
1081 } 1094 }
1082 1095
1083 bool SyncSchedulerImpl::IsBackingOff() const { 1096 bool SyncSchedulerImpl::IsBackingOff() const {
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1097 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1085 return wait_interval_.get() && wait_interval_->mode == 1098 return wait_interval_.get() && wait_interval_->mode ==
1086 WaitInterval::EXPONENTIAL_BACKOFF; 1099 WaitInterval::EXPONENTIAL_BACKOFF;
1087 } 1100 }
1088 1101
1089 void SyncSchedulerImpl::OnSilencedUntil( 1102 void SyncSchedulerImpl::OnSilencedUntil(
1090 const base::TimeTicks& silenced_until) { 1103 const base::TimeTicks& silenced_until) {
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1104 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 1105 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
1093 silenced_until - TimeTicks::Now())); 1106 silenced_until - TimeTicks::Now()));
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
1095 &SyncSchedulerImpl::Unthrottle);
1096 } 1107 }
1097 1108
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { 1109 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1110 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1100 return wait_interval_.get() && wait_interval_->mode == 1111 return wait_interval_.get() && wait_interval_->mode ==
1101 WaitInterval::THROTTLED; 1112 WaitInterval::THROTTLED;
1102 } 1113 }
1103 1114
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 1115 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
1105 const base::TimeDelta& new_interval) { 1116 const base::TimeDelta& new_interval) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1159 1170
1160 #undef SDVLOG_LOC 1171 #undef SDVLOG_LOC
1161 1172
1162 #undef SDVLOG 1173 #undef SDVLOG
1163 1174
1164 #undef SLOG 1175 #undef SLOG
1165 1176
1166 #undef ENUM_CASE 1177 #undef ENUM_CASE
1167 1178
1168 } // namespace syncer 1179 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698