Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 11342008: Revert 164565 - sync: make scheduling logic and job ownership more obvious. (Closed) Base URL: svn://svn.chromium.org/chrome/branches/1311/src/
Patch Set: Created 8 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h" 12 #include "base/compiler_specific.h"
14 #include "base/location.h" 13 #include "base/location.h"
15 #include "base/logging.h" 14 #include "base/logging.h"
16 #include "base/message_loop.h" 15 #include "base/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h" 16 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h" 17 #include "sync/engine/syncer.h"
19 #include "sync/engine/throttled_data_type_tracker.h" 18 #include "sync/engine/throttled_data_type_tracker.h"
20 #include "sync/protocol/proto_enum_conversions.h" 19 #include "sync/protocol/proto_enum_conversions.h"
21 #include "sync/protocol/sync.pb.h" 20 #include "sync/protocol/sync.pb.h"
22 #include "sync/util/data_type_histogram.h" 21 #include "sync/util/data_type_histogram.h"
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 : source(source), 76 : source(source),
78 types_to_download(types_to_download), 77 types_to_download(types_to_download),
79 routing_info(routing_info), 78 routing_info(routing_info),
80 ready_task(ready_task) { 79 ready_task(ready_task) {
81 DCHECK(!ready_task.is_null()); 80 DCHECK(!ready_task.is_null());
82 } 81 }
83 ConfigurationParams::~ConfigurationParams() {} 82 ConfigurationParams::~ConfigurationParams() {}
84 83
85 SyncSchedulerImpl::WaitInterval::WaitInterval() 84 SyncSchedulerImpl::WaitInterval::WaitInterval()
86 : mode(UNKNOWN), 85 : mode(UNKNOWN),
87 had_nudge(false), 86 had_nudge(false) {
88 pending_configure_job(NULL) {} 87 }
89
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
91 : mode(mode), had_nudge(false), length(length),
92 pending_configure_job(NULL) {}
93 88
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
95 90
96 #define ENUM_CASE(x) case x: return #x; break; 91 #define ENUM_CASE(x) case x: return #x; break;
97 92
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
99 switch (mode) { 94 switch (mode) {
100 ENUM_CASE(UNKNOWN); 95 ENUM_CASE(UNKNOWN);
101 ENUM_CASE(EXPONENTIAL_BACKOFF); 96 ENUM_CASE(EXPONENTIAL_BACKOFF);
102 ENUM_CASE(THROTTLED); 97 ENUM_CASE(THROTTLED);
103 } 98 }
104 NOTREACHED(); 99 NOTREACHED();
105 return ""; 100 return "";
106 } 101 }
107 102
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
104 : purpose(UNKNOWN),
105 is_canary_job(false) {
106 }
107
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
109
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
111 base::TimeTicks start,
112 linked_ptr<sessions::SyncSession> session,
113 bool is_canary_job,
114 const ConfigurationParams& config_params,
115 const tracked_objects::Location& from_here)
116 : purpose(purpose),
117 scheduled_start(start),
118 session(session),
119 is_canary_job(is_canary_job),
120 config_params(config_params),
121 from_here(from_here) {
122 }
123
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
126 switch (purpose) {
127 ENUM_CASE(UNKNOWN);
128 ENUM_CASE(POLL);
129 ENUM_CASE(NUDGE);
130 ENUM_CASE(CONFIGURATION);
131 }
132 NOTREACHED();
133 return "";
134 }
135
108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
109 NudgeSource source) { 137 NudgeSource source) {
110 switch (source) { 138 switch (source) {
111 case NUDGE_SOURCE_NOTIFICATION: 139 case NUDGE_SOURCE_NOTIFICATION:
112 return GetUpdatesCallerInfo::NOTIFICATION; 140 return GetUpdatesCallerInfo::NOTIFICATION;
113 case NUDGE_SOURCE_LOCAL: 141 case NUDGE_SOURCE_LOCAL:
114 return GetUpdatesCallerInfo::LOCAL; 142 return GetUpdatesCallerInfo::LOCAL;
115 case NUDGE_SOURCE_CONTINUATION: 143 case NUDGE_SOURCE_CONTINUATION:
116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
117 case NUDGE_SOURCE_LOCAL_REFRESH: 145 case NUDGE_SOURCE_LOCAL_REFRESH:
118 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 146 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
119 case NUDGE_SOURCE_UNKNOWN: 147 case NUDGE_SOURCE_UNKNOWN:
120 return GetUpdatesCallerInfo::UNKNOWN; 148 return GetUpdatesCallerInfo::UNKNOWN;
121 default: 149 default:
122 NOTREACHED(); 150 NOTREACHED();
123 return GetUpdatesCallerInfo::UNKNOWN; 151 return GetUpdatesCallerInfo::UNKNOWN;
124 } 152 }
125 } 153 }
126 154
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
156 : mode(mode), had_nudge(false), length(length) { }
157
127 // Helper macros to log with the syncer thread name; useful when there 158 // Helper macros to log with the syncer thread name; useful when there
128 // are multiple syncer threads involved. 159 // are multiple syncer threads involved.
129 160
130 #define SLOG(severity) LOG(severity) << name_ << ": " 161 #define SLOG(severity) LOG(severity) << name_ << ": "
131 162
132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
133 164
134 #define SDVLOG_LOC(from_here, verbose_level) \ 165 #define SDVLOG_LOC(from_here, verbose_level) \
135 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 166 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
136 167
(...skipping 30 matching lines...) Expand all
167 syncer_short_poll_interval_seconds_( 198 syncer_short_poll_interval_seconds_(
168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
169 syncer_long_poll_interval_seconds_( 200 syncer_long_poll_interval_seconds_(
170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
171 sessions_commit_delay_( 202 sessions_commit_delay_(
172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
173 mode_(NORMAL_MODE), 204 mode_(NORMAL_MODE),
174 // Start with assuming everything is fine with the connection. 205 // Start with assuming everything is fine with the connection.
175 // At the end of the sync cycle we would have the correct status. 206 // At the end of the sync cycle we would have the correct status.
176 connection_code_(HttpResponse::SERVER_CONNECTION_OK), 207 connection_code_(HttpResponse::SERVER_CONNECTION_OK),
177 pending_nudge_(NULL),
178 delay_provider_(delay_provider), 208 delay_provider_(delay_provider),
179 syncer_(syncer), 209 syncer_(syncer),
180 session_context_(context), 210 session_context_(context),
181 no_scheduling_allowed_(false) { 211 no_scheduling_allowed_(false) {
182 DCHECK(sync_loop_); 212 DCHECK(sync_loop_);
183 } 213 }
184 214
185 SyncSchedulerImpl::~SyncSchedulerImpl() { 215 SyncSchedulerImpl::~SyncSchedulerImpl() {
186 DCHECK_EQ(MessageLoop::current(), sync_loop_); 216 DCHECK_EQ(MessageLoop::current(), sync_loop_);
187 StopImpl(base::Closure()); 217 StopImpl(base::Closure());
(...skipping 16 matching lines...) Expand all
204 void SyncSchedulerImpl::OnConnectionStatusChange() { 234 void SyncSchedulerImpl::OnConnectionStatusChange() {
205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { 235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
206 // Optimistically assume that the connection is fixed and try 236 // Optimistically assume that the connection is fixed and try
207 // connecting. 237 // connecting.
208 OnServerConnectionErrorFixed(); 238 OnServerConnectionErrorFixed();
209 } 239 }
210 } 240 }
211 241
212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; 243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
214 // There could be a pending nudge or configuration job in several cases:
215 //
216 // 1. We're in exponential backoff.
217 // 2. We're silenced / throttled.
218 // 3. A nudge was saved previously due to not having a valid auth token.
219 // 4. A nudge was scheduled + saved while in configuration mode.
220 //
221 // In all cases except (2), we want to retry contacting the server. We
222 // call DoCanaryJob to achieve this, and note that nothing -- not even a
223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
224 // has the authority to do that is the Unthrottle timer.
225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
226 if (!pending.get())
227 return;
228
229 PostTask(FROM_HERE, "DoCanaryJob", 244 PostTask(FROM_HERE, "DoCanaryJob",
230 base::Bind(&SyncSchedulerImpl::DoCanaryJob, 245 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
231 weak_ptr_factory_.GetWeakPtr(), 246 weak_ptr_factory_.GetWeakPtr()));
232 base::Passed(&pending))); 247
233 } 248 }
234 249
235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( 250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
236 HttpResponse::ServerConnectionCode code) { 251 HttpResponse::ServerConnectionCode code) {
237 DCHECK_EQ(MessageLoop::current(), sync_loop_); 252 DCHECK_EQ(MessageLoop::current(), sync_loop_);
238 SDVLOG(2) << "New server connection code: " 253 SDVLOG(2) << "New server connection code: "
239 << HttpResponse::GetServerConnectionCodeString(code); 254 << HttpResponse::GetServerConnectionCodeString(code);
240 255
241 connection_code_ = code; 256 connection_code_ = code;
242 } 257 }
(...skipping 15 matching lines...) Expand all
258 Mode old_mode = mode_; 273 Mode old_mode = mode_;
259 mode_ = mode; 274 mode_ = mode;
260 AdjustPolling(NULL); // Will kick start poll timer if needed. 275 AdjustPolling(NULL); // Will kick start poll timer if needed.
261 276
262 if (old_mode != mode_) { 277 if (old_mode != mode_) {
263 // We just changed our mode. See if there are any pending jobs that we could 278 // We just changed our mode. See if there are any pending jobs that we could
264 // execute in the new mode. 279 // execute in the new mode.
265 if (mode_ == NORMAL_MODE) { 280 if (mode_ == NORMAL_MODE) {
266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job 281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
267 // has not yet completed. 282 // has not yet completed.
268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); 283 DCHECK(!wait_interval_.get() ||
284 !wait_interval_->pending_configure_job.get());
269 } 285 }
270 286
271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); 287 DoPendingJobIfPossible(false);
272 if (pending.get()) {
273 // TODO(tim): We should be able to remove this...
274 scoped_ptr<SyncSession> session(CreateSyncSession(
275 pending->session()->source()));
276 // Also the routing info might have been changed since we cached the
277 // pending nudge. Update it by coalescing to the latest.
278 pending->mutable_session()->Coalesce(*session);
279 SDVLOG(2) << "Executing pending job. Good luck!";
280 DoSyncSessionJob(pending.Pass());
281 }
282 } 288 }
283 } 289 }
284 290
285 void SyncSchedulerImpl::SendInitialSnapshot() { 291 void SyncSchedulerImpl::SendInitialSnapshot() {
286 DCHECK_EQ(MessageLoop::current(), sync_loop_); 292 DCHECK_EQ(MessageLoop::current(), sync_loop_);
287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
288 SyncSourceInfo(), ModelSafeRoutingInfo(), 294 SyncSourceInfo(), ModelSafeRoutingInfo(),
289 std::vector<ModelSafeWorker*>())); 295 std::vector<ModelSafeWorker*>()));
290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
291 event.snapshot = dummy->TakeSnapshot(); 297 event.snapshot = dummy->TakeSnapshot();
(...skipping 23 matching lines...) Expand all
315 bool SyncSchedulerImpl::ScheduleConfiguration( 321 bool SyncSchedulerImpl::ScheduleConfiguration(
316 const ConfigurationParams& params) { 322 const ConfigurationParams& params) {
317 DCHECK_EQ(MessageLoop::current(), sync_loop_); 323 DCHECK_EQ(MessageLoop::current(), sync_loop_);
318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
319 DCHECK_EQ(CONFIGURATION_MODE, mode_); 325 DCHECK_EQ(CONFIGURATION_MODE, mode_);
320 DCHECK(!params.ready_task.is_null()); 326 DCHECK(!params.ready_task.is_null());
321 SDVLOG(2) << "Reconfiguring syncer."; 327 SDVLOG(2) << "Reconfiguring syncer.";
322 328
323 // Only one configuration is allowed at a time. Verify we're not waiting 329 // Only one configuration is allowed at a time. Verify we're not waiting
324 // for a pending configure job. 330 // for a pending configure job.
325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); 331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
326 332
327 ModelSafeRoutingInfo restricted_routes; 333 ModelSafeRoutingInfo restricted_routes;
328 BuildModelSafeParams(params.types_to_download, 334 BuildModelSafeParams(params.types_to_download,
329 params.routing_info, 335 params.routing_info,
330 &restricted_routes); 336 &restricted_routes);
331 session_context_->set_routing_info(params.routing_info); 337 session_context_->set_routing_info(params.routing_info);
332 338
333 // Only reconfigure if we have types to download. 339 // Only reconfigure if we have types to download.
334 if (!params.types_to_download.Empty()) { 340 if (!params.types_to_download.Empty()) {
335 DCHECK(!restricted_routes.empty()); 341 DCHECK(!restricted_routes.empty());
336 scoped_ptr<SyncSession> session(new SyncSession( 342 linked_ptr<SyncSession> session(new SyncSession(
337 session_context_, 343 session_context_,
338 this, 344 this,
339 SyncSourceInfo(params.source, 345 SyncSourceInfo(params.source,
340 ModelSafeRoutingInfoToInvalidationMap( 346 ModelSafeRoutingInfoToInvalidationMap(
341 restricted_routes, 347 restricted_routes,
342 std::string())), 348 std::string())),
343 restricted_routes, 349 restricted_routes,
344 session_context_->workers())); 350 session_context_->workers()));
345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 351 SyncSessionJob job(SyncSessionJob::CONFIGURATION,
346 SyncSessionJob::CONFIGURATION, 352 TimeTicks::Now(),
347 TimeTicks::Now(), 353 session,
348 session.Pass(), 354 false,
349 params, 355 params,
350 FROM_HERE)); 356 FROM_HERE);
351 bool succeeded = DoSyncSessionJob(job.Pass()); 357 DoSyncSessionJob(job);
352 358
353 // If we failed, the job would have been saved as the pending configure 359 // If we failed, the job would have been saved as the pending configure
354 // job and a wait interval would have been set. 360 // job and a wait interval would have been set.
355 if (!succeeded) { 361 if (!session->Succeeded()) {
356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); 362 DCHECK(wait_interval_.get() &&
363 wait_interval_->pending_configure_job.get());
357 return false; 364 return false;
358 } 365 }
359 } else { 366 } else {
360 SDVLOG(2) << "No change in routing info, calling ready task directly."; 367 SDVLOG(2) << "No change in routing info, calling ready task directly.";
361 params.ready_task.Run(); 368 params.ready_task.Run();
362 } 369 }
363 370
364 return true; 371 return true;
365 } 372 }
366 373
367 SyncSchedulerImpl::JobProcessDecision 374 SyncSchedulerImpl::JobProcessDecision
368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) { 375 SyncSchedulerImpl::DecideWhileInWaitInterval(
376 const SyncSessionJob& job) {
369 DCHECK_EQ(MessageLoop::current(), sync_loop_); 377 DCHECK_EQ(MessageLoop::current(), sync_loop_);
370 DCHECK(wait_interval_.get()); 378 DCHECK(wait_interval_.get());
371 379
372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
373 << WaitInterval::GetModeString(wait_interval_->mode) 381 << WaitInterval::GetModeString(wait_interval_->mode)
374 << (wait_interval_->had_nudge ? " (had nudge)" : "") 382 << (wait_interval_->had_nudge ? " (had nudge)" : "")
375 << (job.is_canary() ? " (canary)" : ""); 383 << (job.is_canary_job ? " (canary)" : "");
376 384
377 if (job.purpose() == SyncSessionJob::POLL) 385 if (job.purpose == SyncSessionJob::POLL)
378 return DROP; 386 return DROP;
379 387
380 // If we save a job while in a WaitInterval, there is a well-defined moment 388 DCHECK(job.purpose == SyncSessionJob::NUDGE ||
381 // in time in the future when it makes sense for that SAVE-worthy job to try 389 job.purpose == SyncSessionJob::CONFIGURATION);
382 // running again -- the end of the WaitInterval.
383 DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
384 job.purpose() == SyncSessionJob::CONFIGURATION);
385
386 // If throttled, there's a clock ticking to unthrottle. We want to get
387 // on the same train.
388 if (wait_interval_->mode == WaitInterval::THROTTLED) 390 if (wait_interval_->mode == WaitInterval::THROTTLED)
389 return SAVE; 391 return SAVE;
390 392
391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
392 if (job.purpose() == SyncSessionJob::NUDGE) { 394 if (job.purpose == SyncSessionJob::NUDGE) {
393 if (mode_ == CONFIGURATION_MODE) 395 if (mode_ == CONFIGURATION_MODE)
394 return SAVE; 396 return SAVE;
395 397
396 // If we already had one nudge then just drop this nudge. We will retry 398 // If we already had one nudge then just drop this nudge. We will retry
397 // later when the timer runs out. 399 // later when the timer runs out.
398 if (!job.is_canary()) 400 if (!job.is_canary_job)
399 return wait_interval_->had_nudge ? DROP : CONTINUE; 401 return wait_interval_->had_nudge ? DROP : CONTINUE;
400 else // We are here because timer ran out. So retry. 402 else // We are here because timer ran out. So retry.
401 return CONTINUE; 403 return CONTINUE;
402 } 404 }
403 return job.is_canary() ? CONTINUE : SAVE; 405 return job.is_canary_job ? CONTINUE : SAVE;
404 } 406 }
405 407
406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
407 const SyncSessionJob& job) { 409 const SyncSessionJob& job) {
408 DCHECK_EQ(MessageLoop::current(), sync_loop_); 410 DCHECK_EQ(MessageLoop::current(), sync_loop_);
409 411
410 // See if our type is throttled. 412 // See if our type is throttled.
411 ModelTypeSet throttled_types = 413 ModelTypeSet throttled_types =
412 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 414 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
413 if (job.purpose() == SyncSessionJob::NUDGE && 415 if (job.purpose == SyncSessionJob::NUDGE &&
414 job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { 416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
415 ModelTypeSet requested_types; 417 ModelTypeSet requested_types;
416 for (ModelTypeInvalidationMap::const_iterator i = 418 for (ModelTypeInvalidationMap::const_iterator i =
417 job.session()->source().types.begin(); 419 job.session->source().types.begin();
418 i != job.session()->source().types.end(); 420 i != job.session->source().types.end();
419 ++i) { 421 ++i) {
420 requested_types.Put(i->first); 422 requested_types.Put(i->first);
421 } 423 }
422 424
423 // If all types are throttled, do not CONTINUE. Today, we don't treat
424 // a per-datatype "unthrottle" event as something that should force a
425 // canary job. For this reason, there's no good time to reschedule this job
426 // to run -- we'll lazily wait for an independent event to trigger a sync.
427 // Note that there may already be such an event if we're in a WaitInterval,
428 // so we can retry it then.
429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) 425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
430 return SAVE; 426 return SAVE;
431 } 427 }
432 428
433 if (wait_interval_.get()) 429 if (wait_interval_.get())
434 return DecideWhileInWaitInterval(job); 430 return DecideWhileInWaitInterval(job);
435 431
436 if (mode_ == CONFIGURATION_MODE) { 432 if (mode_ == CONFIGURATION_MODE) {
437 if (job.purpose() == SyncSessionJob::NUDGE) 433 if (job.purpose == SyncSessionJob::NUDGE)
438 return SAVE; // Running requires a mode switch. 434 return SAVE;
439 else if (job.purpose() == SyncSessionJob::CONFIGURATION) 435 else if (job.purpose == SyncSessionJob::CONFIGURATION)
440 return CONTINUE; 436 return CONTINUE;
441 else 437 else
442 return DROP; 438 return DROP;
443 } 439 }
444 440
445 // We are in normal mode. 441 // We are in normal mode.
446 DCHECK_EQ(mode_, NORMAL_MODE); 442 DCHECK_EQ(mode_, NORMAL_MODE);
447 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION); 443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
448 444
449 // Note about some subtle scheduling semantics. 445 // Note about some subtle scheduling semantics.
450 // 446 //
451 // It's possible at this point that |job| is known to be unnecessary, and 447 // It's possible at this point that |job| is known to be unnecessary, and
452 // dropping it would be perfectly safe and correct. Consider 448 // dropping it would be perfectly safe and correct. Consider
453 // 449 //
454 // 1) |job| is a POLL with a |scheduled_start| time that is less than 450 // 1) |job| is a POLL with a |scheduled_start| time that is less than
455 // the time that the last successful all-datatype NUDGE completed. 451 // the time that the last successful all-datatype NUDGE completed.
456 // 452 //
457 // 2) |job| is a NUDGE (for any combination of types) with a 453 // 2) |job| is a NUDGE (for any combination of types) with a
(...skipping 22 matching lines...) Expand all
480 // * It's not strictly "impossible", but it would be reentrant and hence 476 // * It's not strictly "impossible", but it would be reentrant and hence
481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a 477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a
482 // legal side effect of any of the work being done as part of a sync cycle. 478 // legal side effect of any of the work being done as part of a sync cycle.
483 // See |no_scheduling_allowed_| for details. 479 // See |no_scheduling_allowed_| for details.
484 480
485 // Decision now rests on state of auth tokens. 481 // Decision now rests on state of auth tokens.
486 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 482 if (!session_context_->connection_manager()->HasInvalidAuthToken())
487 return CONTINUE; 483 return CONTINUE;
488 484
489 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 485 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
490 // Running the job would require updated auth, so we can't honour 486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
491 // job.scheduled_start().
492 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
493 } 487 }
494 488
495 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { 489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) {
496 DCHECK_EQ(DecideOnJob(*job), SAVE); 490 DCHECK_EQ(MessageLoop::current(), sync_loop_);
497 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; 491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
498 if (is_nudge && pending_nudge_) { 492 if (pending_nudge_.get() == NULL) {
499 SDVLOG(2) << "Coalescing a pending nudge"; 493 SDVLOG(2) << "Creating a pending nudge job";
500 // TODO(tim): This basically means we never use the more-careful coalescing 494 SyncSession* s = job.session.get();
501 // logic in ScheduleNudgeImpl that takes the min of the two nudge start 495
502 // times, because we're calling this function first. Pull this out 496 // Get a fresh session with similar configuration as before (resets
503 // into a function to coalesce + set start times and reuse. 497 // StatusController).
504 pending_nudge_->mutable_session()->Coalesce(*(job->session())); 498 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
499 s->delegate(), s->source(), s->routing_info(), s->workers()));
500
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
502 make_linked_ptr(session.release()), false,
503 ConfigurationParams(), job.from_here);
504 pending_nudge_.reset(new SyncSessionJob(new_job));
505 return; 505 return;
506 } 506 }
507 507
508 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); 508 SDVLOG(2) << "Coalescing a pending nudge";
509 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { 509 pending_nudge_->session->Coalesce(*(job.session.get()));
510 // This job should be made the new canary. 510 pending_nudge_->scheduled_start = job.scheduled_start;
511 if (is_nudge) {
512 pending_nudge_ = job_to_save.get();
513 } else {
514 SDVLOG(2) << "Saving a configuration job";
515 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
516 DCHECK(!wait_interval_->pending_configure_job);
517 DCHECK_EQ(mode_, CONFIGURATION_MODE);
518 DCHECK(!job->config_params().ready_task.is_null());
519 // The only nudge that could exist is a scheduled canary nudge.
520 DCHECK(!unscheduled_nudge_storage_.get());
521 if (pending_nudge_) {
522 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
523 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
524 pending_nudge_ = unscheduled_nudge_storage_.get();
525 }
526 wait_interval_->pending_configure_job = job_to_save.get();
527 }
528 TimeDelta length =
529 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
530 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
531 TimeDelta::FromSeconds(0) : length;
532 RestartWaiting(job_to_save.Pass());
533 return;
534 }
535 511
536 // Note that today there are no cases where we SAVE a CONFIGURATION job 512 // Unfortunately the nudge location cannot be modified. So it stores the
537 // when we're not in a WaitInterval. See bug 147736. 513 // location of the first caller.
538 DCHECK(is_nudge); 514 }
539 // There may or may not be a pending_configure_job. Either way this nudge 515
540 // is unschedulable. 516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) {
541 pending_nudge_ = job_to_save.get(); 517 DCHECK_EQ(MessageLoop::current(), sync_loop_);
542 unscheduled_nudge_storage_ = job_to_save.Pass(); 518 DCHECK(started_);
519
520 JobProcessDecision decision = DecideOnJob(job);
521 SDVLOG(2) << "Should run "
522 << SyncSessionJob::GetPurposeString(job.purpose)
523 << " job in mode " << GetModeString(mode_)
524 << ": " << GetDecisionString(decision);
525 if (decision != SAVE)
526 return decision == CONTINUE;
527
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
529 SyncSessionJob::CONFIGURATION);
530
531 SaveJob(job);
532 return false;
533 }
534
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
536 DCHECK_EQ(MessageLoop::current(), sync_loop_);
537 if (job.purpose == SyncSessionJob::NUDGE) {
538 SDVLOG(2) << "Saving a nudge job";
539 InitOrCoalescePendingJob(job);
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
541 SDVLOG(2) << "Saving a configuration job";
542 DCHECK(wait_interval_.get());
543 DCHECK(mode_ == CONFIGURATION_MODE);
544
545 // Config params should always get set.
546 DCHECK(!job.config_params.ready_task.is_null());
547 SyncSession* old = job.session.get();
548 SyncSession* s(new SyncSession(session_context_, this, old->source(),
549 old->routing_info(), old->workers()));
550 SyncSessionJob new_job(job.purpose,
551 TimeTicks::Now(),
552 make_linked_ptr(s),
553 false,
554 job.config_params,
555 job.from_here);
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
557 } // drop the rest.
558 // TODO(sync): Is it okay to drop the rest? It's weird that
559 // SaveJob() only does what it says sometimes. (See
560 // http://crbug.com/90868.)
543 } 561 }
544 562
545 // Functor for std::find_if to search by ModelSafeGroup. 563 // Functor for std::find_if to search by ModelSafeGroup.
546 struct ModelSafeWorkerGroupIs { 564 struct ModelSafeWorkerGroupIs {
547 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
548 bool operator()(ModelSafeWorker* w) { 566 bool operator()(ModelSafeWorker* w) {
549 return group == w->GetModelSafeGroup(); 567 return group == w->GetModelSafeGroup();
550 } 568 }
551 ModelSafeGroup group; 569 ModelSafeGroup group;
552 }; 570 };
553 571
554 void SyncSchedulerImpl::ScheduleNudgeAsync( 572 void SyncSchedulerImpl::ScheduleNudgeAsync(
555 const TimeDelta& desired_delay, 573 const TimeDelta& delay,
556 NudgeSource source, ModelTypeSet types, 574 NudgeSource source, ModelTypeSet types,
557 const tracked_objects::Location& nudge_location) { 575 const tracked_objects::Location& nudge_location) {
558 DCHECK_EQ(MessageLoop::current(), sync_loop_); 576 DCHECK_EQ(MessageLoop::current(), sync_loop_);
559 SDVLOG_LOC(nudge_location, 2) 577 SDVLOG_LOC(nudge_location, 2)
560 << "Nudge scheduled with delay " 578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
561 << desired_delay.InMilliseconds() << " ms, "
562 << "source " << GetNudgeSourceString(source) << ", " 579 << "source " << GetNudgeSourceString(source) << ", "
563 << "types " << ModelTypeSetToString(types); 580 << "types " << ModelTypeSetToString(types);
564 581
565 ModelTypeInvalidationMap invalidation_map = 582 ModelTypeInvalidationMap invalidation_map =
566 ModelTypeSetToInvalidationMap(types, std::string()); 583 ModelTypeSetToInvalidationMap(types, std::string());
567 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, 584 SyncSchedulerImpl::ScheduleNudgeImpl(delay,
568 GetUpdatesFromNudgeSource(source), 585 GetUpdatesFromNudgeSource(source),
569 invalidation_map, 586 invalidation_map,
587 false,
570 nudge_location); 588 nudge_location);
571 } 589 }
572 590
573 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( 591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
574 const TimeDelta& desired_delay, 592 const TimeDelta& delay,
575 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, 593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
576 const tracked_objects::Location& nudge_location) { 594 const tracked_objects::Location& nudge_location) {
577 DCHECK_EQ(MessageLoop::current(), sync_loop_); 595 DCHECK_EQ(MessageLoop::current(), sync_loop_);
578 SDVLOG_LOC(nudge_location, 2) 596 SDVLOG_LOC(nudge_location, 2)
579 << "Nudge scheduled with delay " 597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
580 << desired_delay.InMilliseconds() << " ms, "
581 << "source " << GetNudgeSourceString(source) << ", " 598 << "source " << GetNudgeSourceString(source) << ", "
582 << "payloads " 599 << "payloads "
583 << ModelTypeInvalidationMapToString(invalidation_map); 600 << ModelTypeInvalidationMapToString(invalidation_map);
584 601
585 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, 602 SyncSchedulerImpl::ScheduleNudgeImpl(delay,
586 GetUpdatesFromNudgeSource(source), 603 GetUpdatesFromNudgeSource(source),
587 invalidation_map, 604 invalidation_map,
605 false,
588 nudge_location); 606 nudge_location);
589 } 607 }
590 608
591 void SyncSchedulerImpl::ScheduleNudgeImpl( 609 void SyncSchedulerImpl::ScheduleNudgeImpl(
592 const TimeDelta& delay, 610 const TimeDelta& delay,
593 GetUpdatesCallerInfo::GetUpdatesSource source, 611 GetUpdatesCallerInfo::GetUpdatesSource source,
594 const ModelTypeInvalidationMap& invalidation_map, 612 const ModelTypeInvalidationMap& invalidation_map,
595 const tracked_objects::Location& nudge_location) { 613 bool is_canary_job, const tracked_objects::Location& nudge_location) {
596 DCHECK_EQ(MessageLoop::current(), sync_loop_); 614 DCHECK_EQ(MessageLoop::current(), sync_loop_);
597 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; 615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
598 616
599 SDVLOG_LOC(nudge_location, 2) 617 SDVLOG_LOC(nudge_location, 2)
600 << "In ScheduleNudgeImpl with delay " 618 << "In ScheduleNudgeImpl with delay "
601 << delay.InMilliseconds() << " ms, " 619 << delay.InMilliseconds() << " ms, "
602 << "source " << GetUpdatesSourceString(source) << ", " 620 << "source " << GetUpdatesSourceString(source) << ", "
603 << "payloads " 621 << "payloads "
604 << ModelTypeInvalidationMapToString(invalidation_map); 622 << ModelTypeInvalidationMapToString(invalidation_map)
623 << (is_canary_job ? " (canary)" : "");
605 624
606 SyncSourceInfo info(source, invalidation_map); 625 SyncSourceInfo info(source, invalidation_map);
607 UpdateNudgeTimeRecords(info); 626 UpdateNudgeTimeRecords(info);
608 627
609 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 628 SyncSession* session(CreateSyncSession(info));
610 SyncSessionJob::NUDGE, 629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
611 TimeTicks::Now() + delay, 630 make_linked_ptr(session), is_canary_job,
612 CreateSyncSession(info).Pass(), 631 ConfigurationParams(), nudge_location);
613 ConfigurationParams(),
614 nudge_location));
615 632
616 JobProcessDecision decision = DecideOnJob(*job); 633 session = NULL;
617 SDVLOG(2) << "Should run " 634 if (!ShouldRunJob(job))
618 << SyncSessionJob::GetPurposeString(job->purpose()) 635 return;
619 << " job " << job->session() 636
620 << " in mode " << GetModeString(mode_) 637 if (pending_nudge_.get()) {
621 << ": " << GetDecisionString(decision); 638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
622 if (decision != CONTINUE) { 639 SDVLOG(2) << "Dropping the nudge because we are in backoff";
623 // End of the line, though we may save the job for later. 640 return;
624 if (decision == SAVE) {
625 HandleSaveJobDecision(job.Pass());
626 } else {
627 DCHECK_EQ(decision, DROP);
628 } 641 }
629 return;
630 }
631 642
632 if (pending_nudge_) { 643 SDVLOG(2) << "Coalescing pending nudge";
644 pending_nudge_->session->Coalesce(*(job.session.get()));
645
633 SDVLOG(2) << "Rescheduling pending nudge"; 646 SDVLOG(2) << "Rescheduling pending nudge";
634 pending_nudge_->mutable_session()->Coalesce(*(job->session())); 647 SyncSession* s = pending_nudge_->session.get();
635 // Choose the start time as the earliest of the 2. Note that this means 648 job.session.reset(new SyncSession(s->context(), s->delegate(),
636 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) 649 s->source(), s->routing_info(), s->workers()));
637 // but a nudge is already scheduled to go out, we'll send the (tab) commit 650
638 // without waiting. 651 // Choose the start time as the earliest of the 2.
639 pending_nudge_->set_scheduled_start( 652 job.scheduled_start = std::min(job.scheduled_start,
640 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); 653 pending_nudge_->scheduled_start);
641 // Abandon the old task by cloning and replacing the session. 654 pending_nudge_.reset();
642 // It's possible that by "rescheduling" we're actually taking a job that
643 // was previously unscheduled and giving it wings, so take care to reset
644 // unscheduled nudge storage.
645 job = pending_nudge_->CloneAndAbandon();
646 unscheduled_nudge_storage_.reset();
647 pending_nudge_ = NULL;
648 } 655 }
649 656
650 // TODO(zea): Consider adding separate throttling/backoff for datatype 657 // TODO(zea): Consider adding separate throttling/backoff for datatype
651 // refresh requests. 658 // refresh requests.
652 ScheduleSyncSessionJob(job.Pass()); 659 ScheduleSyncSessionJob(job);
653 } 660 }
654 661
655 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
656 switch (mode) { 663 switch (mode) {
657 ENUM_CASE(CONFIGURATION_MODE); 664 ENUM_CASE(CONFIGURATION_MODE);
658 ENUM_CASE(NORMAL_MODE); 665 ENUM_CASE(NORMAL_MODE);
659 } 666 }
660 return ""; 667 return "";
661 } 668 }
662 669
663 const char* SyncSchedulerImpl::GetDecisionString( 670 const char* SyncSchedulerImpl::GetDecisionString(
664 SyncSchedulerImpl::JobProcessDecision mode) { 671 SyncSchedulerImpl::JobProcessDecision mode) {
665 switch (mode) { 672 switch (mode) {
666 ENUM_CASE(CONTINUE); 673 ENUM_CASE(CONTINUE);
667 ENUM_CASE(SAVE); 674 ENUM_CASE(SAVE);
668 ENUM_CASE(DROP); 675 ENUM_CASE(DROP);
669 } 676 }
670 return ""; 677 return "";
671 } 678 }
672 679
680 // static
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose(
682 SyncSessionJob::SyncSessionJobPurpose purpose,
683 SyncerStep* start,
684 SyncerStep* end) {
685 switch (purpose) {
686 case SyncSessionJob::CONFIGURATION:
687 *start = DOWNLOAD_UPDATES;
688 *end = APPLY_UPDATES;
689 return;
690 case SyncSessionJob::NUDGE:
691 case SyncSessionJob::POLL:
692 *start = SYNCER_BEGIN;
693 *end = SYNCER_END;
694 return;
695 default:
696 NOTREACHED();
697 *start = SYNCER_END;
698 *end = SYNCER_END;
699 return;
700 }
701 }
702
673 void SyncSchedulerImpl::PostTask( 703 void SyncSchedulerImpl::PostTask(
674 const tracked_objects::Location& from_here, 704 const tracked_objects::Location& from_here,
675 const char* name, const base::Closure& task) { 705 const char* name, const base::Closure& task) {
676 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; 706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
677 DCHECK_EQ(MessageLoop::current(), sync_loop_); 707 DCHECK_EQ(MessageLoop::current(), sync_loop_);
678 if (!started_) { 708 if (!started_) {
679 SDVLOG(1) << "Not posting task as scheduler is stopped."; 709 SDVLOG(1) << "Not posting task as scheduler is stopped.";
680 return; 710 return;
681 } 711 }
682 sync_loop_->PostTask(from_here, task); 712 sync_loop_->PostTask(from_here, task);
683 } 713 }
684 714
685 void SyncSchedulerImpl::PostDelayedTask( 715 void SyncSchedulerImpl::PostDelayedTask(
686 const tracked_objects::Location& from_here, 716 const tracked_objects::Location& from_here,
687 const char* name, const base::Closure& task, base::TimeDelta delay) { 717 const char* name, const base::Closure& task, base::TimeDelta delay) {
688 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " 718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
689 << delay.InMilliseconds() << " ms delay"; 719 << delay.InMilliseconds() << " ms delay";
690 DCHECK_EQ(MessageLoop::current(), sync_loop_); 720 DCHECK_EQ(MessageLoop::current(), sync_loop_);
691 if (!started_) { 721 if (!started_) {
692 SDVLOG(1) << "Not posting task as scheduler is stopped."; 722 SDVLOG(1) << "Not posting task as scheduler is stopped.";
693 return; 723 return;
694 } 724 }
695 sync_loop_->PostDelayedTask(from_here, task, delay); 725 sync_loop_->PostDelayedTask(from_here, task, delay);
696 } 726 }
697 727
698 void SyncSchedulerImpl::ScheduleSyncSessionJob( 728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) {
699 scoped_ptr<SyncSessionJob> job) {
700 DCHECK_EQ(MessageLoop::current(), sync_loop_); 729 DCHECK_EQ(MessageLoop::current(), sync_loop_);
701 if (no_scheduling_allowed_) { 730 if (no_scheduling_allowed_) {
702 NOTREACHED() << "Illegal to schedule job while session in progress."; 731 NOTREACHED() << "Illegal to schedule job while session in progress.";
703 return; 732 return;
704 } 733 }
705 734
706 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); 735 TimeDelta delay = job.scheduled_start - TimeTicks::Now();
707 tracked_objects::Location loc(job->from_location());
708 if (delay < TimeDelta::FromMilliseconds(0)) 736 if (delay < TimeDelta::FromMilliseconds(0))
709 delay = TimeDelta::FromMilliseconds(0); 737 delay = TimeDelta::FromMilliseconds(0);
710 SDVLOG_LOC(loc, 2) 738 SDVLOG_LOC(job.from_here, 2)
711 << "In ScheduleSyncSessionJob with " 739 << "In ScheduleSyncSessionJob with "
712 << SyncSessionJob::GetPurposeString(job->purpose()) 740 << SyncSessionJob::GetPurposeString(job.purpose)
713 << " job and " << delay.InMilliseconds() << " ms delay"; 741 << " job and " << delay.InMilliseconds() << " ms delay";
714 742
715 DCHECK(job->purpose() == SyncSessionJob::NUDGE || 743 DCHECK(job.purpose == SyncSessionJob::NUDGE ||
716 job->purpose() == SyncSessionJob::POLL); 744 job.purpose == SyncSessionJob::POLL);
717 if (job->purpose() == SyncSessionJob::NUDGE) { 745 if (job.purpose == SyncSessionJob::NUDGE) {
718 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; 746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge";
719 DCHECK(!pending_nudge_ || pending_nudge_->session() == 747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() ==
720 job->session()); 748 job.session);
721 pending_nudge_ = job.get(); 749 pending_nudge_.reset(new SyncSessionJob(job));
750 }
751 PostDelayedTask(job.from_here, "DoSyncSessionJob",
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob,
753 weak_ptr_factory_.GetWeakPtr(),
754 job),
755 delay);
756 }
757
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) {
759 DCHECK_EQ(MessageLoop::current(), sync_loop_);
760
761 AutoReset<bool> protector(&no_scheduling_allowed_, true);
762 if (!ShouldRunJob(job)) {
763 SLOG(WARNING)
764 << "Not executing "
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from "
766 << GetUpdatesSourceString(job.session->source().updates_source);
767 return;
722 } 768 }
723 769
724 PostDelayedTask(loc, "DoSyncSessionJob", 770 if (job.purpose == SyncSessionJob::NUDGE) {
725 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), 771 if (pending_nudge_.get() == NULL ||
726 weak_ptr_factory_.GetWeakPtr(), 772 pending_nudge_->session != job.session) {
727 base::Passed(&job)),
728 delay);
729 }
730
731 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
732 DCHECK_EQ(MessageLoop::current(), sync_loop_);
733 if (job->purpose() == SyncSessionJob::NUDGE) {
734 if (pending_nudge_ == NULL ||
735 pending_nudge_->session() != job->session()) {
736 // |job| is abandoned.
737 SDVLOG(2) << "Dropping a nudge in " 773 SDVLOG(2) << "Dropping a nudge in "
738 << "DoSyncSessionJob because another nudge was scheduled"; 774 << "DoSyncSessionJob because another nudge was scheduled";
739 return false; 775 return; // Another nudge must have been scheduled in in the meantime.
740 } 776 }
741 pending_nudge_ = NULL; 777 pending_nudge_.reset();
742 778
743 // Rebase the session with the latest model safe table and use it to purge 779 // Create the session with the latest model safe table and use it to purge
744 // and update any disabled or modified entries in the job. 780 // and update any disabled or modified entries in the job.
745 job->mutable_session()->RebaseRoutingInfoWithLatest( 781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
746 session_context_->routing_info(), session_context_->workers()); 782
783 job.session->RebaseRoutingInfoWithLatest(*session);
747 } 784 }
785 SDVLOG(2) << "DoSyncSessionJob with "
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job";
748 787
749 AutoReset<bool> protector(&no_scheduling_allowed_, true); 788 SyncerStep begin(SYNCER_END);
750 JobProcessDecision decision = DecideOnJob(*job); 789 SyncerStep end(SYNCER_END);
751 SDVLOG(2) << "Should run " 790 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
752 << SyncSessionJob::GetPurposeString(job->purpose())
753 << " job " << job->session()
754 << " in mode " << GetModeString(mode_)
755 << " with source " << job->session()->source().updates_source
756 << ": " << GetDecisionString(decision);
757 if (decision != CONTINUE) {
758 if (decision == SAVE) {
759 HandleSaveJobDecision(job.Pass());
760 } else {
761 DCHECK_EQ(decision, DROP);
762 }
763 return false;
764 }
765
766 SDVLOG(2) << "DoSyncSessionJob with "
767 << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
768 791
769 bool has_more_to_sync = true; 792 bool has_more_to_sync = true;
770 bool premature_exit = false; 793 while (ShouldRunJob(job) && has_more_to_sync) {
771 while (DecideOnJob(*job) == CONTINUE && has_more_to_sync) {
772 SDVLOG(2) << "Calling SyncShare."; 794 SDVLOG(2) << "Calling SyncShare.";
773 // Synchronously perform the sync session from this thread. 795 // Synchronously perform the sync session from this thread.
774 premature_exit = !syncer_->SyncShare(job->mutable_session(), 796 syncer_->SyncShare(job.session.get(), begin, end);
775 job->start_step(), 797 has_more_to_sync = job.session->HasMoreToSync();
776 job->end_step());
777
778 has_more_to_sync = job->session()->HasMoreToSync();
779 if (has_more_to_sync) 798 if (has_more_to_sync)
780 job->mutable_session()->PrepareForAnotherSyncCycle(); 799 job.session->PrepareForAnotherSyncCycle();
781 } 800 }
782 SDVLOG(2) << "Done SyncShare looping."; 801 SDVLOG(2) << "Done SyncShare looping.";
783 802
784 return FinishSyncSessionJob(job.Pass(), premature_exit); 803 FinishSyncSessionJob(job);
785 } 804 }
786 805
787 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
788 DCHECK_EQ(MessageLoop::current(), sync_loop_); 807 DCHECK_EQ(MessageLoop::current(), sync_loop_);
789 808
790 // We are interested in recording time between local nudges for datatypes. 809 // We are interested in recording time between local nudges for datatypes.
791 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
792 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
793 return; 812 return;
794 813
795 base::TimeTicks now = TimeTicks::Now(); 814 base::TimeTicks now = TimeTicks::Now();
796 // Update timing information for how often datatypes are triggering nudges. 815 // Update timing information for how often datatypes are triggering nudges.
797 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); 816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin();
798 iter != info.types.end(); 817 iter != info.types.end();
799 ++iter) { 818 ++iter) {
800 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
801 last_local_nudges_by_model_type_[iter->first] = now; 820 last_local_nudges_by_model_type_[iter->first] = now;
802 if (previous.is_null()) 821 if (previous.is_null())
803 continue; 822 continue;
804 823
805 #define PER_DATA_TYPE_MACRO(type_str) \ 824 #define PER_DATA_TYPE_MACRO(type_str) \
806 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
807 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 826 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
808 #undef PER_DATA_TYPE_MACRO 827 #undef PER_DATA_TYPE_MACRO
809 } 828 }
810 } 829 }
811 830
812 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, 831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) {
813 bool exited_prematurely) {
814 DCHECK_EQ(MessageLoop::current(), sync_loop_); 832 DCHECK_EQ(MessageLoop::current(), sync_loop_);
815 // Now update the status of the connection from SCM. We need this to decide 833 // Now update the status of the connection from SCM. We need this to decide
816 // whether we need to save/run future jobs. The notifications from SCM are 834 // whether we need to save/run future jobs. The notifications from SCM are not
817 // not reliable. 835 // reliable.
818 // 836 //
819 // TODO(rlarocque): crbug.com/110954 837 // TODO(rlarocque): crbug.com/110954
820 // We should get rid of the notifications and it is probably not needed to 838 // We should get rid of the notifications and it is probably not needed to
821 // maintain this status variable in 2 places. We should query it directly 839 // maintain this status variable in 2 places. We should query it directly from
822 // from SCM when needed. 840 // SCM when needed.
823 ServerConnectionManager* scm = session_context_->connection_manager(); 841 ServerConnectionManager* scm = session_context_->connection_manager();
824 UpdateServerConnectionManagerStatus(scm->server_status()); 842 UpdateServerConnectionManagerStatus(scm->server_status());
825 843
826 // Let job know that we're through syncing (calling SyncShare) at this point. 844 if (IsSyncingCurrentlySilenced()) {
827 bool succeeded = false; 845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync.";
828 { 846 // TODO(sync): Investigate whether we need to check job.purpose
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
848 SaveJob(job);
849 return; // Nothing to do.
850 } else if (job.session->Succeeded() &&
851 !job.config_params.ready_task.is_null()) {
852 // If this was a configuration job with a ready task, invoke it now that
853 // we finished successfully.
829 AutoReset<bool> protector(&no_scheduling_allowed_, true); 854 AutoReset<bool> protector(&no_scheduling_allowed_, true);
830 succeeded = job->Finish(exited_prematurely); 855 job.config_params.ready_task.Run();
831 } 856 }
832 857
833 SDVLOG(2) << "Updating the next polling time after SyncMain"; 858 SDVLOG(2) << "Updating the next polling time after SyncMain";
834 ScheduleNextSync(job.Pass(), succeeded); 859 ScheduleNextSync(job);
835 return succeeded;
836 } 860 }
837 861
838 void SyncSchedulerImpl::ScheduleNextSync( 862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) {
839 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) {
840 DCHECK_EQ(MessageLoop::current(), sync_loop_); 863 DCHECK_EQ(MessageLoop::current(), sync_loop_);
841 DCHECK(!finished_job->session()->HasMoreToSync()); 864 DCHECK(!old_job.session->HasMoreToSync());
842 865
843 AdjustPolling(finished_job.get()); 866 AdjustPolling(&old_job);
844 867
845 if (succeeded) { 868 if (old_job.session->Succeeded()) {
846 // Only reset backoff if we actually reached the server. 869 // Only reset backoff if we actually reached the server.
847 // It's possible that we reached the server on one attempt, then had an 870 if (old_job.session->SuccessfullyReachedServer())
848 // error on the next (or didn't perform some of the server-communicating
849 // commands). We want to verify that, for all commands attempted, we
850 // successfully spoke with the server. Therefore, we verify no errors
851 // and at least one SYNCER_OK.
852 if (finished_job->session()->DidReachServer())
853 wait_interval_.reset(); 871 wait_interval_.reset();
854 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 872 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
855 return; 873 return;
856 } 874 }
857 875
858 if (IsSyncingCurrentlySilenced()) { 876 if (old_job.purpose == SyncSessionJob::POLL) {
859 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
860 // If we're here, it's because |job| was silenced until a server specified
861 // time. (Note, it had to be |job|, because DecideOnJob would not permit
862 // any job through while in WaitInterval::THROTTLED).
863 scoped_ptr<SyncSessionJob> clone = finished_job->Clone();
864 if (clone->purpose() == SyncSessionJob::NUDGE)
865 pending_nudge_ = clone.get();
866 else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
867 wait_interval_->pending_configure_job = clone.get();
868 else
869 clone.reset(); // Unthrottling is enough, no need to force a canary.
870
871 RestartWaiting(clone.Pass());
872 return;
873 }
874
875 if (finished_job->purpose() == SyncSessionJob::POLL) {
876 return; // We don't retry POLL jobs. 877 return; // We don't retry POLL jobs.
877 } 878 }
878 879
879 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
880 // if we don't succeed. Some types of errors are not likely to disappear on 881 // if we don't succeed. Some types of errors are not likely to disappear on
881 // their own. With the return values now available in the old_job.session, 882 // their own. With the return values now available in the old_job.session, we
882 // we should be able to detect such errors and only retry when we detect 883 // should be able to detect such errors and only retry when we detect
883 // transient errors. 884 // transient errors.
884 885
885 if (IsBackingOff() && wait_interval_->timer.IsRunning() && 886 if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
886 mode_ == NORMAL_MODE) { 887 mode_ == NORMAL_MODE) {
887 // When in normal mode, we allow up to one nudge per backoff interval. It 888 // When in normal mode, we allow up to one nudge per backoff interval. It
888 // appears that this was our nudge for this interval, and it failed. 889 // appears that this was our nudge for this interval, and it failed.
889 // 890 //
890 // Note: This does not prevent us from running canary jobs. For example, 891 // Note: This does not prevent us from running canary jobs. For example, an
891 // an IP address change might still result in another nudge being executed 892 // IP address change might still result in another nudge being executed
892 // during this backoff interval. 893 // during this backoff interval.
893 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; 894 SDVLOG(2) << "A nudge during backoff failed";
894 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); 895
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
895 DCHECK(!wait_interval_->had_nudge); 897 DCHECK(!wait_interval_->had_nudge);
896 898
897 wait_interval_->had_nudge = true; 899 wait_interval_->had_nudge = true;
898 DCHECK(!pending_nudge_); 900 InitOrCoalescePendingJob(old_job);
899 901 RestartWaiting();
900 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
901 pending_nudge_ = new_job.get();
902 RestartWaiting(new_job.Pass());
903 } else { 902 } else {
904 // Either this is the first failure or a consecutive failure after our 903 // Either this is the first failure or a consecutive failure after our
905 // backoff timer expired. We handle it the same way in either case. 904 // backoff timer expired. We handle it the same way in either case.
906 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; 905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
907 HandleContinuationError(finished_job.Pass()); 906 HandleContinuationError(old_job);
908 } 907 }
909 } 908 }
910 909
911 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); 911 DCHECK_EQ(MessageLoop::current(), sync_loop_);
913 912
914 TimeDelta poll = (!session_context_->notifications_enabled()) ? 913 TimeDelta poll = (!session_context_->notifications_enabled()) ?
915 syncer_short_poll_interval_seconds_ : 914 syncer_short_poll_interval_seconds_ :
916 syncer_long_poll_interval_seconds_; 915 syncer_long_poll_interval_seconds_;
917 bool rate_changed = !poll_timer_.IsRunning() || 916 bool rate_changed = !poll_timer_.IsRunning() ||
918 poll != poll_timer_.GetCurrentDelay(); 917 poll != poll_timer_.GetCurrentDelay();
919 918
920 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) 919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
921 poll_timer_.Reset(); 920 poll_timer_.Reset();
922 921
923 if (!rate_changed) 922 if (!rate_changed)
924 return; 923 return;
925 924
926 // Adjust poll rate. 925 // Adjust poll rate.
927 poll_timer_.Stop(); 926 poll_timer_.Stop();
928 poll_timer_.Start(FROM_HERE, poll, this, 927 poll_timer_.Start(FROM_HERE, poll, this,
929 &SyncSchedulerImpl::PollTimerCallback); 928 &SyncSchedulerImpl::PollTimerCallback);
930 } 929 }
931 930
932 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { 931 void SyncSchedulerImpl::RestartWaiting() {
933 CHECK(wait_interval_.get()); 932 CHECK(wait_interval_.get());
934 wait_interval_->timer.Stop(); 933 wait_interval_->timer.Stop();
935 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
936 if (wait_interval_->mode == WaitInterval::THROTTLED) { 935 this, &SyncSchedulerImpl::DoCanaryJob);
937 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
938 base::Bind(&SyncSchedulerImpl::Unthrottle,
939 weak_ptr_factory_.GetWeakPtr(),
940 base::Passed(&job)));
941 } else {
942 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
943 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
944 weak_ptr_factory_.GetWeakPtr(),
945 base::Passed(&job)));
946 }
947 } 936 }
948 937
949 void SyncSchedulerImpl::HandleContinuationError( 938 void SyncSchedulerImpl::HandleContinuationError(
950 scoped_ptr<SyncSessionJob> old_job) { 939 const SyncSessionJob& old_job) {
951 DCHECK_EQ(MessageLoop::current(), sync_loop_); 940 DCHECK_EQ(MessageLoop::current(), sync_loop_);
952 if (DCHECK_IS_ON()) { 941 if (DCHECK_IS_ON()) {
953 if (IsBackingOff()) { 942 if (IsBackingOff()) {
954 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); 943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
955 } 944 }
956 } 945 }
957 946
958 TimeDelta length = delay_provider_->GetDelay( 947 TimeDelta length = delay_provider_->GetDelay(
959 IsBackingOff() ? wait_interval_->length : 948 IsBackingOff() ? wait_interval_->length :
960 delay_provider_->GetInitialDelay( 949 delay_provider_->GetInitialDelay(
961 old_job->session()->status_controller().model_neutral_state())); 950 old_job.session->status_controller().model_neutral_state()));
962 951
963 SDVLOG(2) << "In handle continuation error with " 952 SDVLOG(2) << "In handle continuation error with "
964 << SyncSessionJob::GetPurposeString(old_job->purpose()) 953 << SyncSessionJob::GetPurposeString(old_job.purpose)
965 << " job. The time delta(ms) is " 954 << " job. The time delta(ms) is "
966 << length.InMilliseconds(); 955 << length.InMilliseconds();
967 956
968 // This will reset the had_nudge variable as well. 957 // This will reset the had_nudge variable as well.
969 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
970 length)); 959 length));
971 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); 960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
972 new_job->set_scheduled_start(TimeTicks::Now() + length);
973 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
974 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 961 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
975 // Config params should always get set. 962 // Config params should always get set.
976 DCHECK(!old_job->config_params().ready_task.is_null()); 963 DCHECK(!old_job.config_params.ready_task.is_null());
977 wait_interval_->pending_configure_job = new_job.get(); 964 SyncSession* old = old_job.session.get();
965 SyncSession* s(new SyncSession(session_context_, this,
966 old->source(), old->routing_info(), old->workers()));
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
968 make_linked_ptr(s), false, old_job.config_params,
969 FROM_HERE);
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
978 } else { 971 } else {
979 // We are not in configuration mode. So wait_interval's pending job 972 // We are not in configuration mode. So wait_interval's pending job
980 // should be null. 973 // should be null.
981 DCHECK(wait_interval_->pending_configure_job == NULL); 974 DCHECK(wait_interval_->pending_configure_job.get() == NULL);
982 DCHECK(!pending_nudge_); 975
983 pending_nudge_ = new_job.get(); 976 // TODO(lipalani) - handle clear user data.
977 InitOrCoalescePendingJob(old_job);
984 } 978 }
985 979 RestartWaiting();
986 RestartWaiting(new_job.Pass());
987 } 980 }
988 981
989 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
990 syncer_->RequestEarlyExit(); // Safe to call from any thread. 983 syncer_->RequestEarlyExit(); // Safe to call from any thread.
991 DCHECK(weak_handle_this_.IsInitialized()); 984 DCHECK(weak_handle_this_.IsInitialized());
992 SDVLOG(3) << "Posting StopImpl"; 985 SDVLOG(3) << "Posting StopImpl";
993 weak_handle_this_.Call(FROM_HERE, 986 weak_handle_this_.Call(FROM_HERE,
994 &SyncSchedulerImpl::StopImpl, 987 &SyncSchedulerImpl::StopImpl,
995 callback); 988 callback);
996 } 989 }
997 990
998 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
999 DCHECK_EQ(MessageLoop::current(), sync_loop_); 992 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1000 SDVLOG(2) << "StopImpl called"; 993 SDVLOG(2) << "StopImpl called";
1001 994
1002 // Kill any in-flight method calls. 995 // Kill any in-flight method calls.
1003 weak_ptr_factory_.InvalidateWeakPtrs(); 996 weak_ptr_factory_.InvalidateWeakPtrs();
1004 wait_interval_.reset(); 997 wait_interval_.reset();
1005 poll_timer_.Stop(); 998 poll_timer_.Stop();
1006 if (started_) { 999 if (started_) {
1007 started_ = false; 1000 started_ = false;
1008 } 1001 }
1009 if (!callback.is_null()) 1002 if (!callback.is_null())
1010 callback.Run(); 1003 callback.Run();
1011 } 1004 }
1012 1005
1013 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { 1006 void SyncSchedulerImpl::DoCanaryJob() {
1014 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1007 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1015 SDVLOG(2) << "Do canary job"; 1008 SDVLOG(2) << "Do canary job";
1016 1009 DoPendingJobIfPossible(true);
1017 // Only set canary privileges here, when we are about to run the job. This
1018 // avoids confusion in managing canary bits during scheduling, when you
1019 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that
1020 // was scheduled as canary, and send it to an "unscheduled" state.
1021 to_be_canary->GrantCanaryPrivilege();
1022
1023 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
1024 // TODO(tim): We should be able to remove this...
1025 scoped_ptr<SyncSession> temp = CreateSyncSession(
1026 to_be_canary->session()->source()).Pass();
1027 // The routing info might have been changed since we cached the
1028 // pending nudge. Update it by coalescing to the latest.
1029 to_be_canary->mutable_session()->Coalesce(*(temp));
1030 }
1031 DoSyncSessionJob(to_be_canary.Pass());
1032 } 1010 }
1033 1011
1034 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { 1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) {
1035 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1013 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1036 // If we find a scheduled pending_ job, abandon the old one and return a 1014 SyncSessionJob* job_to_execute = NULL;
1037 // a clone. If unscheduled, just hand over ownership.
1038 scoped_ptr<SyncSessionJob> candidate;
1039 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
1040 && wait_interval_->pending_configure_job) { 1016 && wait_interval_->pending_configure_job.get()) {
1041 SDVLOG(2) << "Found pending configure job"; 1017 SDVLOG(2) << "Found pending configure job";
1042 candidate = 1018 job_to_execute = wait_interval_->pending_configure_job.get();
1043 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); 1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
1044 wait_interval_->pending_configure_job = candidate.get();
1045 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
1046 SDVLOG(2) << "Found pending nudge job"; 1020 SDVLOG(2) << "Found pending nudge job";
1047 candidate = pending_nudge_->CloneAndAbandon(); 1021
1048 pending_nudge_ = candidate.get(); 1022 scoped_ptr<SyncSession> session(CreateSyncSession(
1049 unscheduled_nudge_storage_.reset(); 1023 pending_nudge_->session->source()));
1024
1025 // Also the routing info might have been changed since we cached the
1026 // pending nudge. Update it by coalescing to the latest.
1027 pending_nudge_->session->Coalesce(*(session.get()));
1028 // The pending nudge would be cleared in the DoSyncSessionJob function.
1029 job_to_execute = pending_nudge_.get();
1050 } 1030 }
1051 return candidate.Pass(); 1031
1032 if (job_to_execute != NULL) {
1033 SDVLOG(2) << "Executing pending job";
1034 SyncSessionJob copy = *job_to_execute;
1035 copy.is_canary_job = is_canary_job;
1036 DoSyncSessionJob(copy);
1037 }
1052 } 1038 }
1053 1039
1054 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( 1040 SyncSession* SyncSchedulerImpl::CreateSyncSession(
1055 const SyncSourceInfo& source) { 1041 const SyncSourceInfo& source) {
1056 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1042 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1057 DVLOG(2) << "Creating sync session with routes " 1043 DVLOG(2) << "Creating sync session with routes "
1058 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 1044 << ModelSafeRoutingInfoToString(session_context_->routing_info());
1059 1045
1060 SyncSourceInfo info(source); 1046 SyncSourceInfo info(source);
1061 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, 1047 SyncSession* session(new SyncSession(session_context_, this, info,
1062 session_context_->routing_info(), session_context_->workers())); 1048 session_context_->routing_info(), session_context_->workers()));
1049
1050 return session;
1063 } 1051 }
1064 1052
1065 void SyncSchedulerImpl::PollTimerCallback() { 1053 void SyncSchedulerImpl::PollTimerCallback() {
1066 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1067 ModelSafeRoutingInfo r; 1055 ModelSafeRoutingInfo r;
1068 ModelTypeInvalidationMap invalidation_map = 1056 ModelTypeInvalidationMap invalidation_map =
1069 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); 1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
1070 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
1071 scoped_ptr<SyncSession> s(CreateSyncSession(info)); 1059 SyncSession* s = CreateSyncSession(info);
1072 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, 1060
1073 TimeTicks::Now(), 1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
1074 s.Pass(), 1062 make_linked_ptr(s),
1075 ConfigurationParams(), 1063 false,
1076 FROM_HERE)); 1064 ConfigurationParams(),
1077 ScheduleSyncSessionJob(job.Pass()); 1065 FROM_HERE);
1066
1067 ScheduleSyncSessionJob(job);
1078 } 1068 }
1079 1069
1080 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { 1070 void SyncSchedulerImpl::Unthrottle() {
1081 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1071 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1082 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1083 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") 1073 SDVLOG(2) << "Unthrottled.";
1084 << "canary."; 1074 DoCanaryJob();
1085 if (to_be_canary.get())
1086 DoCanaryJob(to_be_canary.Pass());
1087
1088 // TODO(tim): The way DecideOnJob works today, canary privileges aren't
1089 // enough to bypass a THROTTLED wait interval, which would suggest we need
1090 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is
1091 // probably the "right" thing to do). Bug 154216.
1092 wait_interval_.reset(); 1075 wait_interval_.reset();
1093 } 1076 }
1094 1077
1095 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1079 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1097 session_context_->NotifyListeners(SyncEngineEvent(cause)); 1080 session_context_->NotifyListeners(SyncEngineEvent(cause));
1098 } 1081 }
1099 1082
1100 bool SyncSchedulerImpl::IsBackingOff() const { 1083 bool SyncSchedulerImpl::IsBackingOff() const {
1101 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1102 return wait_interval_.get() && wait_interval_->mode == 1085 return wait_interval_.get() && wait_interval_->mode ==
1103 WaitInterval::EXPONENTIAL_BACKOFF; 1086 WaitInterval::EXPONENTIAL_BACKOFF;
1104 } 1087 }
1105 1088
1106 void SyncSchedulerImpl::OnSilencedUntil( 1089 void SyncSchedulerImpl::OnSilencedUntil(
1107 const base::TimeTicks& silenced_until) { 1090 const base::TimeTicks& silenced_until) {
1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1091 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1109 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
1110 silenced_until - TimeTicks::Now())); 1093 silenced_until - TimeTicks::Now()));
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
1095 &SyncSchedulerImpl::Unthrottle);
1111 } 1096 }
1112 1097
1113 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { 1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
1114 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1099 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1115 return wait_interval_.get() && wait_interval_->mode == 1100 return wait_interval_.get() && wait_interval_->mode ==
1116 WaitInterval::THROTTLED; 1101 WaitInterval::THROTTLED;
1117 } 1102 }
1118 1103
1119 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
1120 const base::TimeDelta& new_interval) { 1105 const base::TimeDelta& new_interval) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1174 1159
1175 #undef SDVLOG_LOC 1160 #undef SDVLOG_LOC
1176 1161
1177 #undef SDVLOG 1162 #undef SDVLOG
1178 1163
1179 #undef SLOG 1164 #undef SLOG
1180 1165
1181 #undef ENUM_CASE 1166 #undef ENUM_CASE
1182 1167
1183 } // namespace syncer 1168 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698